Edit on GitHub

sqlmesh.core.analytics

  1from __future__ import annotations
  2
  3import atexit
  4import os
  5import typing as t
  6from functools import wraps
  7
  8from sqlmesh.core.analytics.collector import AnalyticsCollector
  9from sqlmesh.core.analytics.dispatcher import AsyncEventDispatcher, NoopEventDispatcher
 10from sqlmesh.utils import str_to_bool
 11
 12if t.TYPE_CHECKING:
 13    import sys
 14
 15    if sys.version_info >= (3, 10):
 16        from typing import ParamSpec
 17    else:
 18        from typing_extensions import ParamSpec
 19
 20    _P = ParamSpec("_P")
 21    _T = t.TypeVar("_T")
 22
 23
 24def init_collector() -> AnalyticsCollector:
 25    dispatcher = (
 26        NoopEventDispatcher()
 27        if str_to_bool(os.getenv("SQLMESH__DISABLE_ANONYMIZED_ANALYTICS", "false"))
 28        else AsyncEventDispatcher()
 29    )
 30    return AnalyticsCollector(dispatcher=dispatcher)
 31
 32
 33collector = init_collector()
 34
 35
 36atexit.register(collector.shutdown, flush=True)
 37
 38
 39def disable_analytics() -> None:
 40    global collector
 41    collector.shutdown(flush=False)
 42    collector = AnalyticsCollector(dispatcher=NoopEventDispatcher())
 43
 44
 45def cli_analytics(func: t.Callable[_P, _T]) -> t.Callable[_P, _T]:
 46    @wraps(func)
 47    def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
 48        import click
 49        from click.core import ParameterSource
 50
 51        arg_keys = set()
 52        command_chain = []
 53
 54        cli_context: click.Context = click.get_current_context()
 55        cli_context_cursor: t.Optional[click.Context] = cli_context
 56        while cli_context_cursor:
 57            arg_keys |= {
 58                k
 59                for k, source in cli_context_cursor._parameter_source.items()
 60                if source == ParameterSource.COMMANDLINE
 61            }
 62            command_chain.append(cli_context_cursor.info_name)
 63            cli_context_cursor = cli_context_cursor.parent
 64
 65        command_name, *parent_command_names = command_chain
 66
 67        common_context: t.Dict[str, t.Any] = {
 68            "command_name": command_name,
 69            "command_args": arg_keys,
 70            "parent_command_names": parent_command_names,
 71        }
 72
 73        if "github" in parent_command_names:
 74            cicd_bot_config = None
 75            github_controller = cli_context.obj.get("github")
 76            if github_controller:
 77                cicd_bot_config = github_controller._context.config.cicd_bot
 78            collector.on_cicd_command(**common_context, cicd_bot_config=cicd_bot_config)
 79        else:
 80            collector.on_cli_command(**common_context)
 81
 82        return func(*args, **kwargs)
 83
 84    return wrapper
 85
 86
 87def python_api_analytics(func: t.Callable[_P, _T]) -> t.Callable[_P, _T]:
 88    @wraps(func)
 89    def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
 90        import inspect
 91
 92        from sqlmesh import magics
 93
 94        should_log = True
 95
 96        try:
 97            stack = inspect.stack()
 98        except Exception:
 99            stack = []
100
101        for frame in stack:
102            if "click/" in frame.filename or frame.filename == magics.__file__:
103                # Magics and CLI are reported separately.
104                should_log = False
105                break
106
107        if should_log:
108            collector.on_python_api_command(command_name=func.__name__, command_args=kwargs)
109
110        return func(*args, **kwargs)
111
112    return wrapper
def init_collector() -> sqlmesh.core.analytics.collector.AnalyticsCollector:
25def init_collector() -> AnalyticsCollector:
26    dispatcher = (
27        NoopEventDispatcher()
28        if str_to_bool(os.getenv("SQLMESH__DISABLE_ANONYMIZED_ANALYTICS", "false"))
29        else AsyncEventDispatcher()
30    )
31    return AnalyticsCollector(dispatcher=dispatcher)
def disable_analytics() -> None:
40def disable_analytics() -> None:
41    global collector
42    collector.shutdown(flush=False)
43    collector = AnalyticsCollector(dispatcher=NoopEventDispatcher())
def cli_analytics(func: Callable[~_P, ~_T]) -> Callable[~_P, ~_T]:
46def cli_analytics(func: t.Callable[_P, _T]) -> t.Callable[_P, _T]:
47    @wraps(func)
48    def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
49        import click
50        from click.core import ParameterSource
51
52        arg_keys = set()
53        command_chain = []
54
55        cli_context: click.Context = click.get_current_context()
56        cli_context_cursor: t.Optional[click.Context] = cli_context
57        while cli_context_cursor:
58            arg_keys |= {
59                k
60                for k, source in cli_context_cursor._parameter_source.items()
61                if source == ParameterSource.COMMANDLINE
62            }
63            command_chain.append(cli_context_cursor.info_name)
64            cli_context_cursor = cli_context_cursor.parent
65
66        command_name, *parent_command_names = command_chain
67
68        common_context: t.Dict[str, t.Any] = {
69            "command_name": command_name,
70            "command_args": arg_keys,
71            "parent_command_names": parent_command_names,
72        }
73
74        if "github" in parent_command_names:
75            cicd_bot_config = None
76            github_controller = cli_context.obj.get("github")
77            if github_controller:
78                cicd_bot_config = github_controller._context.config.cicd_bot
79            collector.on_cicd_command(**common_context, cicd_bot_config=cicd_bot_config)
80        else:
81            collector.on_cli_command(**common_context)
82
83        return func(*args, **kwargs)
84
85    return wrapper
def python_api_analytics(func: Callable[~_P, ~_T]) -> Callable[~_P, ~_T]:
 88def python_api_analytics(func: t.Callable[_P, _T]) -> t.Callable[_P, _T]:
 89    @wraps(func)
 90    def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
 91        import inspect
 92
 93        from sqlmesh import magics
 94
 95        should_log = True
 96
 97        try:
 98            stack = inspect.stack()
 99        except Exception:
100            stack = []
101
102        for frame in stack:
103            if "click/" in frame.filename or frame.filename == magics.__file__:
104                # Magics and CLI are reported separately.
105                should_log = False
106                break
107
108        if should_log:
109            collector.on_python_api_command(command_name=func.__name__, command_args=kwargs)
110
111        return func(*args, **kwargs)
112
113    return wrapper