sqlmesh.core.analytics
"""Anonymized usage analytics: the shared collector and the reporting decorators."""

from __future__ import annotations

import atexit
import os
import typing as t
from functools import wraps

from sqlmesh.core.analytics.collector import AnalyticsCollector
from sqlmesh.core.analytics.dispatcher import AsyncEventDispatcher, NoopEventDispatcher
from sqlmesh.utils import str_to_bool

if t.TYPE_CHECKING:
    import sys

    if sys.version_info >= (3, 10):
        from typing import ParamSpec
    else:
        from typing_extensions import ParamSpec

    _P = ParamSpec("_P")
    _T = t.TypeVar("_T")


def init_collector() -> AnalyticsCollector:
    """Build the analytics collector used by this module.

    A ``NoopEventDispatcher`` is used when the environment variable
    ``SQLMESH__DISABLE_ANONYMIZED_ANALYTICS`` parses as true via ``str_to_bool``
    (it defaults to ``"false"``); otherwise an ``AsyncEventDispatcher`` is used.

    Returns:
        A new ``AnalyticsCollector`` wired to the selected dispatcher.
    """
    dispatcher = (
        NoopEventDispatcher()
        if str_to_bool(os.getenv("SQLMESH__DISABLE_ANONYMIZED_ANALYTICS", "false"))
        else AsyncEventDispatcher()
    )
    return AnalyticsCollector(dispatcher=dispatcher)


# Module-wide collector shared by the decorators below.
collector = init_collector()


# Shut the initial collector down with flush=True when the interpreter exits.
atexit.register(collector.shutdown, flush=True)


def disable_analytics() -> None:
    """Shut down the current collector without flushing and swap in a no-op one."""
    global collector
    collector.shutdown(flush=False)
    collector = AnalyticsCollector(dispatcher=NoopEventDispatcher())


def cli_analytics(func: t.Callable[_P, _T]) -> t.Callable[_P, _T]:
    """Decorate a click command so that each invocation is reported to the collector.

    The wrapper walks the current click context and all of its parents, gathering
    the names of parameters whose source is the command line and the chain of
    command names. Invocations that have ``"github"`` among their parent commands
    are reported through ``collector.on_cicd_command``; everything else goes
    through ``collector.on_cli_command``. The wrapped function is then called
    with the original arguments and its result is returned.

    Args:
        func: The command callback to wrap.

    Returns:
        The wrapping callable, carrying ``func``'s metadata via ``functools.wraps``.
    """

    @wraps(func)
    def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
        import click
        from click.core import ParameterSource

        arg_keys = set()
        command_chain = []

        cli_context: click.Context = click.get_current_context()
        cli_context_cursor: t.Optional[click.Context] = cli_context
        while cli_context_cursor:
            # Only parameter names are collected, and only those given on the command line.
            arg_keys |= {
                k
                for k, source in cli_context_cursor._parameter_source.items()
                if source == ParameterSource.COMMANDLINE
            }
            command_chain.append(cli_context_cursor.info_name)
            cli_context_cursor = cli_context_cursor.parent

        # The chain is ordered innermost-first: invoked command, then its parents.
        command_name, *parent_command_names = command_chain

        common_context: t.Dict[str, t.Any] = {
            "command_name": command_name,
            "command_args": arg_keys,
            "parent_command_names": parent_command_names,
        }

        if "github" in parent_command_names:
            cicd_bot_config = None
            github_controller = cli_context.obj.get("github")
            if github_controller:
                cicd_bot_config = github_controller._context.config.cicd_bot
            collector.on_cicd_command(**common_context, cicd_bot_config=cicd_bot_config)
        else:
            collector.on_cli_command(**common_context)

        return func(*args, **kwargs)

    return wrapper


def python_api_analytics(func: t.Callable[_P, _T]) -> t.Callable[_P, _T]:
    """Decorate a Python API entry point so that direct calls are reported.

    The wrapper inspects the call stack and skips reporting when any frame comes
    from a file whose path contains ``click/`` or from the ``sqlmesh.magics``
    module, because CLI and magics usage are reported separately. Otherwise it
    calls ``collector.on_python_api_command`` with the function name and the
    keyword arguments. The wrapped function is always called afterwards.

    Args:
        func: The API function to wrap.

    Returns:
        The wrapping callable, carrying ``func``'s metadata via ``functools.wraps``.
    """

    @wraps(func)
    def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
        import inspect

        from sqlmesh import magics

        try:
            # context=0 skips loading source lines; only the filenames are needed.
            stack = inspect.stack(0)
        except Exception:
            # Stack introspection is best-effort; without it the call is reported.
            stack = []

        should_log = True
        for frame in stack:
            # Normalize separators so the click check also matches Windows paths.
            normalized_filename = frame.filename.replace("\\", "/")
            if "click/" in normalized_filename or frame.filename == magics.__file__:
                # Magics and CLI are reported separately.
                should_log = False
                break

        if should_log:
            collector.on_python_api_command(command_name=func.__name__, command_args=kwargs)

        return func(*args, **kwargs)

    return wrapper
collector =
<sqlmesh.core.analytics.collector.AnalyticsCollector object>
def
disable_analytics() -> None:
def
cli_analytics(func: Callable[~_P, ~_T]) -> Callable[~_P, ~_T]:
def cli_analytics(func: t.Callable[_P, _T]) -> t.Callable[_P, _T]:
    """Wrap a click command callback so every invocation is reported to the collector.

    Before delegating to ``func``, the wrapper climbs from the current click
    context through each parent context and records:

    * the names of parameters whose source is ``ParameterSource.COMMANDLINE``;
    * the ``info_name`` of each context, innermost first.

    When ``"github"`` is one of the parent command names the event is sent with
    ``collector.on_cicd_command`` (including the CI/CD bot config if a GitHub
    controller is stored on the context object); otherwise it is sent with
    ``collector.on_cli_command``.

    Args:
        func: The command callback to wrap.

    Returns:
        The wrapping callable, carrying ``func``'s metadata via ``functools.wraps``.
    """

    @wraps(func)
    def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
        import click
        from click.core import ParameterSource

        cli_arg_names = set()
        names_innermost_first = []

        current_ctx: click.Context = click.get_current_context()

        # Climb from the invoked command up through every parent group.
        node: t.Optional[click.Context] = current_ctx
        while node:
            for param_name, origin in node._parameter_source.items():
                if origin == ParameterSource.COMMANDLINE:
                    cli_arg_names.add(param_name)
            names_innermost_first.append(node.info_name)
            node = node.parent

        command_name, *parent_command_names = names_innermost_first

        if "github" not in parent_command_names:
            collector.on_cli_command(
                command_name=command_name,
                command_args=cli_arg_names,
                parent_command_names=parent_command_names,
            )
        else:
            controller = current_ctx.obj.get("github")
            bot_config = controller._context.config.cicd_bot if controller else None
            collector.on_cicd_command(
                command_name=command_name,
                command_args=cli_arg_names,
                parent_command_names=parent_command_names,
                cicd_bot_config=bot_config,
            )

        return func(*args, **kwargs)

    return wrapper
def
python_api_analytics(func: Callable[~_P, ~_T]) -> Callable[~_P, ~_T]:
def python_api_analytics(func: t.Callable[_P, _T]) -> t.Callable[_P, _T]:
    """Decorate a Python API entry point so that direct calls are reported.

    The wrapper inspects the call stack and skips reporting when any frame comes
    from a file whose path contains ``click/`` or from the ``sqlmesh.magics``
    module, because CLI and magics usage are reported separately. Otherwise it
    calls ``collector.on_python_api_command`` with the function name and the
    keyword arguments. The wrapped function is always called afterwards.

    Args:
        func: The API function to wrap.

    Returns:
        The wrapping callable, carrying ``func``'s metadata via ``functools.wraps``.
    """

    @wraps(func)
    def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
        import inspect

        from sqlmesh import magics

        try:
            # context=0 skips loading source lines; only the filenames are needed.
            stack = inspect.stack(0)
        except Exception:
            # Stack introspection is best-effort; without it the call is reported.
            stack = []

        should_log = True
        for frame in stack:
            # Normalize separators so the click check also matches Windows paths.
            normalized_filename = frame.filename.replace("\\", "/")
            if "click/" in normalized_filename or frame.filename == magics.__file__:
                # Magics and CLI are reported separately.
                should_log = False
                break

        if should_log:
            collector.on_python_api_command(command_name=func.__name__, command_args=kwargs)

        return func(*args, **kwargs)

    return wrapper