Edit on GitHub

sqlmesh.core.analytics.collector

  1from __future__ import annotations
  2
  3import itertools
  4import json
  5import typing as t
  6from functools import cached_property
  7from pathlib import Path
  8
  9from sqlmesh.core import constants as c
 10from sqlmesh.core.analytics.dispatcher import AsyncEventDispatcher, EventDispatcher
 11from sqlmesh.utils import random_id
 12from sqlmesh.utils.date import now_timestamp
 13from sqlmesh.utils.hashing import md5
 14from sqlmesh.utils.pydantic import PydanticModel
 15from sqlmesh.utils.yaml import dump as yaml_dump
 16from sqlmesh.utils.yaml import load as yaml_load
 17
 18if t.TYPE_CHECKING:
 19    from sqlmesh.cicd.config import CICDBotConfig
 20    from sqlmesh.core.plan import EvaluatablePlan
 21    from sqlmesh.core.snapshot import Snapshot
 22
 23
 24class AnalyticsCollector:
 25    def __init__(
 26        self,
 27        dispatcher: t.Optional[EventDispatcher] = None,
 28        sqlmesh_path: t.Optional[Path] = None,
 29    ):
 30        self._dispatcher = dispatcher or AsyncEventDispatcher()
 31        self._sqlmesh_path = sqlmesh_path or c.SQLMESH_PATH
 32        self._process_id = random_id()
 33        self._seq_num = itertools.count()
 34
 35    def on_cicd_command(
 36        self,
 37        *,
 38        command_name: str,
 39        command_args: t.Collection[str],
 40        parent_command_names: t.Collection[str],
 41        cicd_bot_config: t.Optional[CICDBotConfig],
 42    ) -> None:
 43        """Called when a CICD command is executed.
 44
 45        Args:
 46            command_name: The name of the command.
 47            command_args: The arguments of the command.
 48            parent_command_names: The names of the parent commands.
 49            cicd_config: The CICD bot configuration.
 50        """
 51        additional_args = {}
 52        if cicd_bot_config is not None and getattr(cicd_bot_config, "FIELDS_FOR_ANALYTICS", None):
 53            additional_args["cicd_bot_config"] = cicd_bot_config.dict(
 54                include=cicd_bot_config.FIELDS_FOR_ANALYTICS, mode="json"
 55            )
 56        self._on_command(
 57            "CICD_COMMAND",
 58            command_name,
 59            command_args,
 60            parent_command_names=parent_command_names,
 61            **additional_args,
 62        )
 63
 64    def on_cli_command(
 65        self,
 66        *,
 67        command_name: str,
 68        command_args: t.Collection[str],
 69        parent_command_names: t.Collection[str],
 70    ) -> None:
 71        """Called when a CLI command is executed.
 72
 73        Args:
 74            command_name: The name of the command.
 75            command_args: The arguments of the command.
 76            parent_command_names: The names of the parent commands.
 77        """
 78        self._on_command(
 79            "CLI_COMMAND", command_name, command_args, parent_command_names=parent_command_names
 80        )
 81
 82    def on_magic_command(self, *, command_name: str, command_args: t.Collection[str]) -> None:
 83        """Called when a Notebook magic command is executed.
 84
 85        Args:
 86            command_name: The name of the command.
 87            command_args: The arguments of the command.
 88        """
 89        self._on_command("MAGIC_COMMAND", command_name, command_args)
 90
 91    def on_python_api_command(self, *, command_name: str, command_args: t.Collection[str]) -> None:
 92        """Called when a Python method is called directly.
 93
 94        Args:
 95            command_name: The name of the command.
 96            command_args: The arguments of the command.
 97        """
 98        self._on_command("PYTHON_API_COMMAND", command_name, command_args)
 99
100    def on_project_loaded(
101        self,
102        *,
103        project_type: str,
104        models_count: int,
105        audits_count: int,
106        standalone_audits_count: int,
107        macros_count: int,
108        jinja_macros_count: int,
109        load_time_sec: float,
110        state_sync_fingerprint: str,
111        project_name: str,
112    ) -> None:
113        """Called when a project is loaded.
114
115        Args:
116            project_type: The type of the project. Eg. "dbt" or "native".
117            models_count: The number of models in the project.
118            audits_count: The number of audits in the project.
119            standalone_audits_count: The number of standalone audits in the project.
120            macros_count: The number of macros in the project.
121            jinja_macros_count: The number of Jinja macros in the project.
122            load_time_sec: The time it took to load the project in (fractional) seconds.
123            state_sync_fingerprint: The fingerprint of the state sync configuration.
124            project_name: The name of the project.
125        """
126        project_type = project_type.lower()
127        event_data = {
128            "project_type": project_type,
129            "models_count": models_count,
130            "audits_count": audits_count,
131            "standalone_audits_count": standalone_audits_count,
132            "macros_count": macros_count,
133            "jinja_macros_count": jinja_macros_count,
134            "load_time_ms": int(load_time_sec * 1000),
135            "state_sync_fingerprint": state_sync_fingerprint,
136            "project_name_hash": _anonymize(project_name),
137        }
138
139        if project_type in {c.DBT, c.HYBRID}:
140            from dbt.version import __version__ as dbt_version
141
142            event_data["dbt_version"] = dbt_version
143
144        self._add_event("PROJECT_LOADED", event_data)
145
146    def on_plan_apply_start(
147        self,
148        *,
149        plan: EvaluatablePlan,
150        engine_type: t.Optional[str],
151        state_sync_type: t.Optional[str],
152        scheduler_type: str,
153    ) -> None:
154        """Called after the plan application starts.
155
156        Args:
157            plan: The plan that is being applied.
158            engine_type: The type of the target engine.
159            state_sync_type: The type of the engine used to store the SQLMesh state.
160            scheduler_type: The type of the scheduler being used. Eg. "builtin".
161        """
162        self._add_event(
163            "PLAN_APPLY_START",
164            {
165                "plan_id": plan.plan_id,
166                "engine_type": engine_type.lower() if engine_type is not None else None,
167                "state_sync_type": state_sync_type.lower() if state_sync_type is not None else None,
168                "scheduler_type": scheduler_type.lower(),
169                "is_dev": plan.is_dev,
170                "skip_backfill": plan.skip_backfill,
171                "no_gaps": plan.no_gaps,
172                "forward_only": plan.forward_only,
173                "ensure_finalized_snapshots": plan.ensure_finalized_snapshots,
174                "has_restatements": bool(plan.restatements),
175                "directly_modified_count": len(plan.directly_modified_snapshots),
176                "indirectly_modified_count": len(
177                    {
178                        s_id
179                        for s_ids in plan.indirectly_modified_snapshots.values()
180                        for s_id in s_ids
181                    }
182                ),
183                "environment_name_hash": _anonymize(plan.environment.name),
184            },
185        )
186
187    def on_plan_apply_end(self, *, plan_id: str, error: t.Optional[t.Any] = None) -> None:
188        """Called after the plan application ends.
189
190        Args:
191            plan_id: The ID of the plan that was applied.
192            error: The error that occurred during plan application, if any.
193        """
194        self._add_event(
195            "PLAN_APPLY_END",
196            {
197                "plan_id": plan_id,
198                "succeeded": error is None,
199                "error": type(error).__name__ if error else None,
200            },
201        )
202
203    def on_snapshots_created(self, *, new_snapshots: t.Collection[Snapshot], plan_id: str) -> None:
204        """Called after new snapshots were created and stored in the SQLMesh state.
205
206        Args:
207            new_snapshots: The list of new snapshots.
208            plan_id: The ID of the plan that created the snapshots.
209        """
210        if not new_snapshots:
211            return
212        snapshots = []
213        for snapshot in new_snapshots:
214            snapshots.append(
215                {
216                    "name_hash": _anonymize(snapshot.name),
217                    "identifier": snapshot.identifier,
218                    "version": snapshot.version,
219                    "node_type": snapshot.node_type.lower(),
220                    "model_kind": snapshot.model.kind.name.value.lower()
221                    if snapshot.is_model
222                    else None,
223                    "is_sql": snapshot.model.is_sql if snapshot.is_model else None,
224                    "change_category": (
225                        snapshot.change_category.name.lower() if snapshot.change_category else None
226                    ),
227                    "dialect": getattr(snapshot.node, "dialect", None),
228                    "audits_count": len(snapshot.model.audits) if snapshot.is_model else None,
229                    "effective_from_set": snapshot.effective_from is not None,
230                }
231            )
232        self._add_event("SNAPSHOTS_CREATED", {"plan_id": plan_id, "snapshots": snapshots})
233
234    def on_run_start(self, *, engine_type: str, state_sync_type: str) -> str:
235        """Called after a run starts.
236
237        Args:
238            engine_type: The type of the target engine.
239            state_sync_type: The type of the engine used to store the SQLMesh state.
240
241        Returns:
242            The run ID.
243        """
244        run_id = random_id()
245        self._add_event(
246            "RUN_START",
247            {
248                "run_id": run_id,
249                "engine_type": engine_type.lower(),
250                "state_sync_type": state_sync_type.lower(),
251            },
252        )
253        return run_id
254
255    def on_run_end(
256        self, *, run_id: str, succeeded: bool, interrupted: bool, error: t.Optional[t.Any] = None
257    ) -> None:
258        """Called after a run ends.
259
260        Args:
261            run_id: The ID of the run.
262            succeeded: Whether the run succeeded.
263            interrupted: Whether the run was interrupted.
264            error: The error that occurred during the run, if any.
265        """
266        self._add_event(
267            "RUN_END",
268            {
269                "run_id": run_id,
270                "succeeded": succeeded,
271                "interrupted": interrupted,
272                "error": type(error).__name__ if error else None,
273            },
274        )
275
276    def on_migration_end(
277        self,
278        *,
279        from_sqlmesh_version: str,
280        state_sync_type: str,
281        migration_time_sec: float,
282        error: t.Optional[t.Any] = None,
283    ) -> None:
284        """Called after the migration of the SQLMesh state ends.
285
286        Args:
287            from_sqlmesh_version: The version of SQLMesh from which the migration started.
288            state_sync_type: The type of the engine used to store the SQLMesh state.
289            migration_time_sec: The time it took to migrate the SQLMesh state in (fractional) seconds.
290            error: The error that occurred during the migration, if any.
291        """
292        self._add_event(
293            "MIGRATION_END",
294            {
295                "from_sqlmesh_version": from_sqlmesh_version,
296                "state_sync_type": state_sync_type,
297                "succeeded": error is None,
298                "error": type(error).__name__ if error else None,
299                "migration_time_ms": int(migration_time_sec * 1000),
300            },
301        )
302
303    def flush(self) -> None:
304        """Flushes the events to the dispatcher."""
305        self._dispatcher.flush()
306
307    def shutdown(self, flush: bool = True) -> None:
308        """Shuts down the collector."""
309        self._dispatcher.shutdown(flush=flush)
310
311    def _on_command(
312        self,
313        event_type: str,
314        command_name: str,
315        command_args: t.Collection[str],
316        **kwargs: t.Any,
317    ) -> None:
318        event = {
319            "command_name": command_name,
320            "command_args": list(command_args),
321            **kwargs,
322        }
323        self._add_event(event_type, event)
324
325    def _add_event(self, event_type: str, event: t.Dict[str, t.Any]) -> None:
326        self._dispatcher.add_event(
327            {
328                "user_id": self._user.id,
329                "process_id": self._process_id,
330                "seq_num": next(self._seq_num),
331                "event_type": event_type,
332                "client_ts": now_timestamp(),
333                "event": json.dumps(event),
334            }
335        )
336
337    @cached_property
338    def _user(self) -> User:
339        return User.load_or_create(self._sqlmesh_path)
340
341
class User(PydanticModel):
    """The user record persisted as ``user.yaml`` inside the SQLMesh directory."""

    # Identifier of the user.
    id: str

    @classmethod
    def load_or_create(cls, sqlmesh_path: Path) -> User:
        """Load the user stored under ``sqlmesh_path`` or create a new one.

        The record is read from ``<sqlmesh_path>/user.yaml``. When that file is
        missing or empty, a user with a random ID is created and written to it.

        Args:
            sqlmesh_path: The directory holding ``user.yaml``. It is created
                (including parents) if it does not exist.

        Returns:
            The loaded or newly created user.
        """
        # ``exist_ok=True`` already makes this a no-op for an existing directory.
        sqlmesh_path.mkdir(parents=True, exist_ok=True)

        user_path = sqlmesh_path / "user.yaml"
        if user_path.exists():
            raw_user = yaml_load(user_path, raise_if_empty=False)
            if raw_user:
                return cls.parse_obj(raw_user)

        # Instantiate through ``cls`` (as the load path does) so that subclasses
        # get an instance of their own type.
        user = cls(id=random_id())
        with user_path.open("w", encoding="utf-8") as fd:
            yaml_dump(user.dict(mode="json"), stream=fd)
        return user
360
361
def _anonymize(value: str) -> str:
    """Return the digest produced by ``md5`` for ``value`` wrapped in a one-element list."""
    digest = md5([value])
    return digest
class AnalyticsCollector:
 25class AnalyticsCollector:
 26    def __init__(
 27        self,
 28        dispatcher: t.Optional[EventDispatcher] = None,
 29        sqlmesh_path: t.Optional[Path] = None,
 30    ):
 31        self._dispatcher = dispatcher or AsyncEventDispatcher()
 32        self._sqlmesh_path = sqlmesh_path or c.SQLMESH_PATH
 33        self._process_id = random_id()
 34        self._seq_num = itertools.count()
 35
 36    def on_cicd_command(
 37        self,
 38        *,
 39        command_name: str,
 40        command_args: t.Collection[str],
 41        parent_command_names: t.Collection[str],
 42        cicd_bot_config: t.Optional[CICDBotConfig],
 43    ) -> None:
 44        """Called when a CICD command is executed.
 45
 46        Args:
 47            command_name: The name of the command.
 48            command_args: The arguments of the command.
 49            parent_command_names: The names of the parent commands.
 50            cicd_config: The CICD bot configuration.
 51        """
 52        additional_args = {}
 53        if cicd_bot_config is not None and getattr(cicd_bot_config, "FIELDS_FOR_ANALYTICS", None):
 54            additional_args["cicd_bot_config"] = cicd_bot_config.dict(
 55                include=cicd_bot_config.FIELDS_FOR_ANALYTICS, mode="json"
 56            )
 57        self._on_command(
 58            "CICD_COMMAND",
 59            command_name,
 60            command_args,
 61            parent_command_names=parent_command_names,
 62            **additional_args,
 63        )
 64
 65    def on_cli_command(
 66        self,
 67        *,
 68        command_name: str,
 69        command_args: t.Collection[str],
 70        parent_command_names: t.Collection[str],
 71    ) -> None:
 72        """Called when a CLI command is executed.
 73
 74        Args:
 75            command_name: The name of the command.
 76            command_args: The arguments of the command.
 77            parent_command_names: The names of the parent commands.
 78        """
 79        self._on_command(
 80            "CLI_COMMAND", command_name, command_args, parent_command_names=parent_command_names
 81        )
 82
 83    def on_magic_command(self, *, command_name: str, command_args: t.Collection[str]) -> None:
 84        """Called when a Notebook magic command is executed.
 85
 86        Args:
 87            command_name: The name of the command.
 88            command_args: The arguments of the command.
 89        """
 90        self._on_command("MAGIC_COMMAND", command_name, command_args)
 91
 92    def on_python_api_command(self, *, command_name: str, command_args: t.Collection[str]) -> None:
 93        """Called when a Python method is called directly.
 94
 95        Args:
 96            command_name: The name of the command.
 97            command_args: The arguments of the command.
 98        """
 99        self._on_command("PYTHON_API_COMMAND", command_name, command_args)
100
101    def on_project_loaded(
102        self,
103        *,
104        project_type: str,
105        models_count: int,
106        audits_count: int,
107        standalone_audits_count: int,
108        macros_count: int,
109        jinja_macros_count: int,
110        load_time_sec: float,
111        state_sync_fingerprint: str,
112        project_name: str,
113    ) -> None:
114        """Called when a project is loaded.
115
116        Args:
117            project_type: The type of the project. Eg. "dbt" or "native".
118            models_count: The number of models in the project.
119            audits_count: The number of audits in the project.
120            standalone_audits_count: The number of standalone audits in the project.
121            macros_count: The number of macros in the project.
122            jinja_macros_count: The number of Jinja macros in the project.
123            load_time_sec: The time it took to load the project in (fractional) seconds.
124            state_sync_fingerprint: The fingerprint of the state sync configuration.
125            project_name: The name of the project.
126        """
127        project_type = project_type.lower()
128        event_data = {
129            "project_type": project_type,
130            "models_count": models_count,
131            "audits_count": audits_count,
132            "standalone_audits_count": standalone_audits_count,
133            "macros_count": macros_count,
134            "jinja_macros_count": jinja_macros_count,
135            "load_time_ms": int(load_time_sec * 1000),
136            "state_sync_fingerprint": state_sync_fingerprint,
137            "project_name_hash": _anonymize(project_name),
138        }
139
140        if project_type in {c.DBT, c.HYBRID}:
141            from dbt.version import __version__ as dbt_version
142
143            event_data["dbt_version"] = dbt_version
144
145        self._add_event("PROJECT_LOADED", event_data)
146
147    def on_plan_apply_start(
148        self,
149        *,
150        plan: EvaluatablePlan,
151        engine_type: t.Optional[str],
152        state_sync_type: t.Optional[str],
153        scheduler_type: str,
154    ) -> None:
155        """Called after the plan application starts.
156
157        Args:
158            plan: The plan that is being applied.
159            engine_type: The type of the target engine.
160            state_sync_type: The type of the engine used to store the SQLMesh state.
161            scheduler_type: The type of the scheduler being used. Eg. "builtin".
162        """
163        self._add_event(
164            "PLAN_APPLY_START",
165            {
166                "plan_id": plan.plan_id,
167                "engine_type": engine_type.lower() if engine_type is not None else None,
168                "state_sync_type": state_sync_type.lower() if state_sync_type is not None else None,
169                "scheduler_type": scheduler_type.lower(),
170                "is_dev": plan.is_dev,
171                "skip_backfill": plan.skip_backfill,
172                "no_gaps": plan.no_gaps,
173                "forward_only": plan.forward_only,
174                "ensure_finalized_snapshots": plan.ensure_finalized_snapshots,
175                "has_restatements": bool(plan.restatements),
176                "directly_modified_count": len(plan.directly_modified_snapshots),
177                "indirectly_modified_count": len(
178                    {
179                        s_id
180                        for s_ids in plan.indirectly_modified_snapshots.values()
181                        for s_id in s_ids
182                    }
183                ),
184                "environment_name_hash": _anonymize(plan.environment.name),
185            },
186        )
187
188    def on_plan_apply_end(self, *, plan_id: str, error: t.Optional[t.Any] = None) -> None:
189        """Called after the plan application ends.
190
191        Args:
192            plan_id: The ID of the plan that was applied.
193            error: The error that occurred during plan application, if any.
194        """
195        self._add_event(
196            "PLAN_APPLY_END",
197            {
198                "plan_id": plan_id,
199                "succeeded": error is None,
200                "error": type(error).__name__ if error else None,
201            },
202        )
203
204    def on_snapshots_created(self, *, new_snapshots: t.Collection[Snapshot], plan_id: str) -> None:
205        """Called after new snapshots were created and stored in the SQLMesh state.
206
207        Args:
208            new_snapshots: The list of new snapshots.
209            plan_id: The ID of the plan that created the snapshots.
210        """
211        if not new_snapshots:
212            return
213        snapshots = []
214        for snapshot in new_snapshots:
215            snapshots.append(
216                {
217                    "name_hash": _anonymize(snapshot.name),
218                    "identifier": snapshot.identifier,
219                    "version": snapshot.version,
220                    "node_type": snapshot.node_type.lower(),
221                    "model_kind": snapshot.model.kind.name.value.lower()
222                    if snapshot.is_model
223                    else None,
224                    "is_sql": snapshot.model.is_sql if snapshot.is_model else None,
225                    "change_category": (
226                        snapshot.change_category.name.lower() if snapshot.change_category else None
227                    ),
228                    "dialect": getattr(snapshot.node, "dialect", None),
229                    "audits_count": len(snapshot.model.audits) if snapshot.is_model else None,
230                    "effective_from_set": snapshot.effective_from is not None,
231                }
232            )
233        self._add_event("SNAPSHOTS_CREATED", {"plan_id": plan_id, "snapshots": snapshots})
234
235    def on_run_start(self, *, engine_type: str, state_sync_type: str) -> str:
236        """Called after a run starts.
237
238        Args:
239            engine_type: The type of the target engine.
240            state_sync_type: The type of the engine used to store the SQLMesh state.
241
242        Returns:
243            The run ID.
244        """
245        run_id = random_id()
246        self._add_event(
247            "RUN_START",
248            {
249                "run_id": run_id,
250                "engine_type": engine_type.lower(),
251                "state_sync_type": state_sync_type.lower(),
252            },
253        )
254        return run_id
255
256    def on_run_end(
257        self, *, run_id: str, succeeded: bool, interrupted: bool, error: t.Optional[t.Any] = None
258    ) -> None:
259        """Called after a run ends.
260
261        Args:
262            run_id: The ID of the run.
263            succeeded: Whether the run succeeded.
264            interrupted: Whether the run was interrupted.
265            error: The error that occurred during the run, if any.
266        """
267        self._add_event(
268            "RUN_END",
269            {
270                "run_id": run_id,
271                "succeeded": succeeded,
272                "interrupted": interrupted,
273                "error": type(error).__name__ if error else None,
274            },
275        )
276
277    def on_migration_end(
278        self,
279        *,
280        from_sqlmesh_version: str,
281        state_sync_type: str,
282        migration_time_sec: float,
283        error: t.Optional[t.Any] = None,
284    ) -> None:
285        """Called after the migration of the SQLMesh state ends.
286
287        Args:
288            from_sqlmesh_version: The version of SQLMesh from which the migration started.
289            state_sync_type: The type of the engine used to store the SQLMesh state.
290            migration_time_sec: The time it took to migrate the SQLMesh state in (fractional) seconds.
291            error: The error that occurred during the migration, if any.
292        """
293        self._add_event(
294            "MIGRATION_END",
295            {
296                "from_sqlmesh_version": from_sqlmesh_version,
297                "state_sync_type": state_sync_type,
298                "succeeded": error is None,
299                "error": type(error).__name__ if error else None,
300                "migration_time_ms": int(migration_time_sec * 1000),
301            },
302        )
303
304    def flush(self) -> None:
305        """Flushes the events to the dispatcher."""
306        self._dispatcher.flush()
307
308    def shutdown(self, flush: bool = True) -> None:
309        """Shuts down the collector."""
310        self._dispatcher.shutdown(flush=flush)
311
312    def _on_command(
313        self,
314        event_type: str,
315        command_name: str,
316        command_args: t.Collection[str],
317        **kwargs: t.Any,
318    ) -> None:
319        event = {
320            "command_name": command_name,
321            "command_args": list(command_args),
322            **kwargs,
323        }
324        self._add_event(event_type, event)
325
326    def _add_event(self, event_type: str, event: t.Dict[str, t.Any]) -> None:
327        self._dispatcher.add_event(
328            {
329                "user_id": self._user.id,
330                "process_id": self._process_id,
331                "seq_num": next(self._seq_num),
332                "event_type": event_type,
333                "client_ts": now_timestamp(),
334                "event": json.dumps(event),
335            }
336        )
337
338    @cached_property
339    def _user(self) -> User:
340        return User.load_or_create(self._sqlmesh_path)
AnalyticsCollector( dispatcher: Optional[sqlmesh.core.analytics.dispatcher.EventDispatcher] = None, sqlmesh_path: Optional[pathlib.Path] = None)
26    def __init__(
27        self,
28        dispatcher: t.Optional[EventDispatcher] = None,
29        sqlmesh_path: t.Optional[Path] = None,
30    ):
31        self._dispatcher = dispatcher or AsyncEventDispatcher()
32        self._sqlmesh_path = sqlmesh_path or c.SQLMESH_PATH
33        self._process_id = random_id()
34        self._seq_num = itertools.count()
def on_cicd_command( self, *, command_name: str, command_args: Collection[str], parent_command_names: Collection[str], cicd_bot_config: Optional[sqlmesh.integrations.github.cicd.config.GithubCICDBotConfig]) -> None:
36    def on_cicd_command(
37        self,
38        *,
39        command_name: str,
40        command_args: t.Collection[str],
41        parent_command_names: t.Collection[str],
42        cicd_bot_config: t.Optional[CICDBotConfig],
43    ) -> None:
44        """Called when a CICD command is executed.
45
46        Args:
47            command_name: The name of the command.
48            command_args: The arguments of the command.
49            parent_command_names: The names of the parent commands.
50            cicd_config: The CICD bot configuration.
51        """
52        additional_args = {}
53        if cicd_bot_config is not None and getattr(cicd_bot_config, "FIELDS_FOR_ANALYTICS", None):
54            additional_args["cicd_bot_config"] = cicd_bot_config.dict(
55                include=cicd_bot_config.FIELDS_FOR_ANALYTICS, mode="json"
56            )
57        self._on_command(
58            "CICD_COMMAND",
59            command_name,
60            command_args,
61            parent_command_names=parent_command_names,
62            **additional_args,
63        )

Called when a CICD command is executed.

Arguments:
  • command_name: The name of the command.
  • command_args: The arguments of the command.
  • parent_command_names: The names of the parent commands.
  • cicd_bot_config: The CICD bot configuration.
def on_cli_command( self, *, command_name: str, command_args: Collection[str], parent_command_names: Collection[str]) -> None:
65    def on_cli_command(
66        self,
67        *,
68        command_name: str,
69        command_args: t.Collection[str],
70        parent_command_names: t.Collection[str],
71    ) -> None:
72        """Called when a CLI command is executed.
73
74        Args:
75            command_name: The name of the command.
76            command_args: The arguments of the command.
77            parent_command_names: The names of the parent commands.
78        """
79        self._on_command(
80            "CLI_COMMAND", command_name, command_args, parent_command_names=parent_command_names
81        )

Called when a CLI command is executed.

Arguments:
  • command_name: The name of the command.
  • command_args: The arguments of the command.
  • parent_command_names: The names of the parent commands.
def on_magic_command(self, *, command_name: str, command_args: Collection[str]) -> None:
83    def on_magic_command(self, *, command_name: str, command_args: t.Collection[str]) -> None:
84        """Called when a Notebook magic command is executed.
85
86        Args:
87            command_name: The name of the command.
88            command_args: The arguments of the command.
89        """
90        self._on_command("MAGIC_COMMAND", command_name, command_args)

Called when a Notebook magic command is executed.

Arguments:
  • command_name: The name of the command.
  • command_args: The arguments of the command.
def on_python_api_command(self, *, command_name: str, command_args: Collection[str]) -> None:
92    def on_python_api_command(self, *, command_name: str, command_args: t.Collection[str]) -> None:
93        """Called when a Python method is called directly.
94
95        Args:
96            command_name: The name of the command.
97            command_args: The arguments of the command.
98        """
99        self._on_command("PYTHON_API_COMMAND", command_name, command_args)

Called when a Python method is called directly.

Arguments:
  • command_name: The name of the command.
  • command_args: The arguments of the command.
def on_project_loaded( self, *, project_type: str, models_count: int, audits_count: int, standalone_audits_count: int, macros_count: int, jinja_macros_count: int, load_time_sec: float, state_sync_fingerprint: str, project_name: str) -> None:
101    def on_project_loaded(
102        self,
103        *,
104        project_type: str,
105        models_count: int,
106        audits_count: int,
107        standalone_audits_count: int,
108        macros_count: int,
109        jinja_macros_count: int,
110        load_time_sec: float,
111        state_sync_fingerprint: str,
112        project_name: str,
113    ) -> None:
114        """Called when a project is loaded.
115
116        Args:
117            project_type: The type of the project. Eg. "dbt" or "native".
118            models_count: The number of models in the project.
119            audits_count: The number of audits in the project.
120            standalone_audits_count: The number of standalone audits in the project.
121            macros_count: The number of macros in the project.
122            jinja_macros_count: The number of Jinja macros in the project.
123            load_time_sec: The time it took to load the project in (fractional) seconds.
124            state_sync_fingerprint: The fingerprint of the state sync configuration.
125            project_name: The name of the project.
126        """
127        project_type = project_type.lower()
128        event_data = {
129            "project_type": project_type,
130            "models_count": models_count,
131            "audits_count": audits_count,
132            "standalone_audits_count": standalone_audits_count,
133            "macros_count": macros_count,
134            "jinja_macros_count": jinja_macros_count,
135            "load_time_ms": int(load_time_sec * 1000),
136            "state_sync_fingerprint": state_sync_fingerprint,
137            "project_name_hash": _anonymize(project_name),
138        }
139
140        if project_type in {c.DBT, c.HYBRID}:
141            from dbt.version import __version__ as dbt_version
142
143            event_data["dbt_version"] = dbt_version
144
145        self._add_event("PROJECT_LOADED", event_data)

Called when a project is loaded.

Arguments:
  • project_type: The type of the project. E.g. "dbt" or "native".
  • models_count: The number of models in the project.
  • audits_count: The number of audits in the project.
  • standalone_audits_count: The number of standalone audits in the project.
  • macros_count: The number of macros in the project.
  • jinja_macros_count: The number of Jinja macros in the project.
  • load_time_sec: The time it took to load the project in (fractional) seconds.
  • state_sync_fingerprint: The fingerprint of the state sync configuration.
  • project_name: The name of the project.
def on_plan_apply_start( self, *, plan: sqlmesh.core.plan.definition.EvaluatablePlan, engine_type: Optional[str], state_sync_type: Optional[str], scheduler_type: str) -> None:
147    def on_plan_apply_start(
148        self,
149        *,
150        plan: EvaluatablePlan,
151        engine_type: t.Optional[str],
152        state_sync_type: t.Optional[str],
153        scheduler_type: str,
154    ) -> None:
155        """Called after the plan application starts.
156
157        Args:
158            plan: The plan that is being applied.
159            engine_type: The type of the target engine.
160            state_sync_type: The type of the engine used to store the SQLMesh state.
161            scheduler_type: The type of the scheduler being used. Eg. "builtin".
162        """
163        self._add_event(
164            "PLAN_APPLY_START",
165            {
166                "plan_id": plan.plan_id,
167                "engine_type": engine_type.lower() if engine_type is not None else None,
168                "state_sync_type": state_sync_type.lower() if state_sync_type is not None else None,
169                "scheduler_type": scheduler_type.lower(),
170                "is_dev": plan.is_dev,
171                "skip_backfill": plan.skip_backfill,
172                "no_gaps": plan.no_gaps,
173                "forward_only": plan.forward_only,
174                "ensure_finalized_snapshots": plan.ensure_finalized_snapshots,
175                "has_restatements": bool(plan.restatements),
176                "directly_modified_count": len(plan.directly_modified_snapshots),
177                "indirectly_modified_count": len(
178                    {
179                        s_id
180                        for s_ids in plan.indirectly_modified_snapshots.values()
181                        for s_id in s_ids
182                    }
183                ),
184                "environment_name_hash": _anonymize(plan.environment.name),
185            },
186        )

Called after the plan application starts.

Arguments:
  • plan: The plan that is being applied.
  • engine_type: The type of the target engine.
  • state_sync_type: The type of the engine used to store the SQLMesh state.
  • scheduler_type: The type of the scheduler being used. E.g. "builtin".
def on_plan_apply_end(self, *, plan_id: str, error: Optional[Any] = None) -> None:
188    def on_plan_apply_end(self, *, plan_id: str, error: t.Optional[t.Any] = None) -> None:
189        """Called after the plan application ends.
190
191        Args:
192            plan_id: The ID of the plan that was applied.
193            error: The error that occurred during plan application, if any.
194        """
195        self._add_event(
196            "PLAN_APPLY_END",
197            {
198                "plan_id": plan_id,
199                "succeeded": error is None,
200                "error": type(error).__name__ if error else None,
201            },
202        )

Called after the plan application ends.

Arguments:
  • plan_id: The ID of the plan that was applied.
  • error: The error that occurred during plan application, if any.
def on_snapshots_created( self, *, new_snapshots: Collection[sqlmesh.core.snapshot.definition.Snapshot], plan_id: str) -> None:
204    def on_snapshots_created(self, *, new_snapshots: t.Collection[Snapshot], plan_id: str) -> None:
205        """Called after new snapshots were created and stored in the SQLMesh state.
206
207        Args:
208            new_snapshots: The list of new snapshots.
209            plan_id: The ID of the plan that created the snapshots.
210        """
211        if not new_snapshots:
212            return
213        snapshots = []
214        for snapshot in new_snapshots:
215            snapshots.append(
216                {
217                    "name_hash": _anonymize(snapshot.name),
218                    "identifier": snapshot.identifier,
219                    "version": snapshot.version,
220                    "node_type": snapshot.node_type.lower(),
221                    "model_kind": snapshot.model.kind.name.value.lower()
222                    if snapshot.is_model
223                    else None,
224                    "is_sql": snapshot.model.is_sql if snapshot.is_model else None,
225                    "change_category": (
226                        snapshot.change_category.name.lower() if snapshot.change_category else None
227                    ),
228                    "dialect": getattr(snapshot.node, "dialect", None),
229                    "audits_count": len(snapshot.model.audits) if snapshot.is_model else None,
230                    "effective_from_set": snapshot.effective_from is not None,
231                }
232            )
233        self._add_event("SNAPSHOTS_CREATED", {"plan_id": plan_id, "snapshots": snapshots})

Called after new snapshots were created and stored in the SQLMesh state.

Arguments:
  • new_snapshots: The list of new snapshots.
  • plan_id: The ID of the plan that created the snapshots.
def on_run_start(self, *, engine_type: str, state_sync_type: str) -> str:
235    def on_run_start(self, *, engine_type: str, state_sync_type: str) -> str:
236        """Called after a run starts.
237
238        Args:
239            engine_type: The type of the target engine.
240            state_sync_type: The type of the engine used to store the SQLMesh state.
241
242        Returns:
243            The run ID.
244        """
245        run_id = random_id()
246        self._add_event(
247            "RUN_START",
248            {
249                "run_id": run_id,
250                "engine_type": engine_type.lower(),
251                "state_sync_type": state_sync_type.lower(),
252            },
253        )
254        return run_id

Called after a run starts.

Arguments:
  • engine_type: The type of the target engine.
  • state_sync_type: The type of the engine used to store the SQLMesh state.
Returns:

The run ID.

def on_run_end( self, *, run_id: str, succeeded: bool, interrupted: bool, error: Optional[Any] = None) -> None:
256    def on_run_end(
257        self, *, run_id: str, succeeded: bool, interrupted: bool, error: t.Optional[t.Any] = None
258    ) -> None:
259        """Called after a run ends.
260
261        Args:
262            run_id: The ID of the run.
263            succeeded: Whether the run succeeded.
264            interrupted: Whether the run was interrupted.
265            error: The error that occurred during the run, if any.
266        """
267        self._add_event(
268            "RUN_END",
269            {
270                "run_id": run_id,
271                "succeeded": succeeded,
272                "interrupted": interrupted,
273                "error": type(error).__name__ if error else None,
274            },
275        )

Called after a run ends.

Arguments:
  • run_id: The ID of the run.
  • succeeded: Whether the run succeeded.
  • interrupted: Whether the run was interrupted.
  • error: The error that occurred during the run, if any.
def on_migration_end( self, *, from_sqlmesh_version: str, state_sync_type: str, migration_time_sec: float, error: Optional[Any] = None) -> None:
277    def on_migration_end(
278        self,
279        *,
280        from_sqlmesh_version: str,
281        state_sync_type: str,
282        migration_time_sec: float,
283        error: t.Optional[t.Any] = None,
284    ) -> None:
285        """Called after the migration of the SQLMesh state ends.
286
287        Args:
288            from_sqlmesh_version: The version of SQLMesh from which the migration started.
289            state_sync_type: The type of the engine used to store the SQLMesh state.
290            migration_time_sec: The time it took to migrate the SQLMesh state in (fractional) seconds.
291            error: The error that occurred during the migration, if any.
292        """
293        self._add_event(
294            "MIGRATION_END",
295            {
296                "from_sqlmesh_version": from_sqlmesh_version,
297                "state_sync_type": state_sync_type,
298                "succeeded": error is None,
299                "error": type(error).__name__ if error else None,
300                "migration_time_ms": int(migration_time_sec * 1000),
301            },
302        )

Called after the migration of the SQLMesh state ends.

Arguments:
  • from_sqlmesh_version: The version of SQLMesh from which the migration started.
  • state_sync_type: The type of the engine used to store the SQLMesh state.
  • migration_time_sec: The time it took to migrate the SQLMesh state in (fractional) seconds.
  • error: The error that occurred during the migration, if any.
def flush(self) -> None:
304    def flush(self) -> None:
305        """Flushes the events to the dispatcher."""
306        self._dispatcher.flush()

Flushes the events to the dispatcher.

def shutdown(self, flush: bool = True) -> None:
308    def shutdown(self, flush: bool = True) -> None:
309        """Shuts down the collector."""
310        self._dispatcher.shutdown(flush=flush)

Shuts down the collector.

class User(sqlmesh.utils.pydantic.PydanticModel):
class User(PydanticModel):
    """User identity persisted as ``user.yaml`` inside the SQLMesh directory."""

    # Identifier of the user; generated with ``random_id()`` when nothing is stored yet.
    id: str

    @classmethod
    def load_or_create(cls, sqlmesh_path: Path) -> User:
        """Load the user stored under ``sqlmesh_path``, creating one if needed.

        When ``user.yaml`` is missing or empty, a new user with a random ID is
        created and written to that file.

        Args:
            sqlmesh_path: Directory that holds (or will hold) ``user.yaml``.
                It is created, including parents, if it does not exist.

        Returns:
            The loaded or newly created user.
        """
        # exist_ok makes this a no-op for an existing directory, so a separate
        # exists() pre-check is redundant (and racy).
        sqlmesh_path.mkdir(parents=True, exist_ok=True)

        user_path = sqlmesh_path / "user.yaml"
        if user_path.exists():
            raw_user = yaml_load(user_path, raise_if_empty=False)
            if raw_user:
                return cls.parse_obj(raw_user)

        # Build through `cls`, as the load path does, so a subclass gets its
        # own type back on both paths.
        user = cls(id=random_id())
        with user_path.open("w") as fd:
            yaml_dump(user.dict(mode="json"), stream=fd)
        return user

Usage Documentation: Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
id: str
@classmethod
def load_or_create(cls, sqlmesh_path: pathlib.Path) -> User:
346    @classmethod
347    def load_or_create(cls, sqlmesh_path: Path) -> User:
348        if not sqlmesh_path.exists():
349            sqlmesh_path.mkdir(parents=True, exist_ok=True)
350
351        user_path = sqlmesh_path / "user.yaml"
352        if user_path.exists():
353            raw_user = yaml_load(user_path, raise_if_empty=False)
354            if raw_user:
355                return cls.parse_obj(raw_user)
356
357        user = User(id=random_id())
358        with user_path.open("w") as fd:
359            yaml_dump(user.dict(mode="json"), stream=fd)
360        return user
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields