sqlmesh.core.analytics.collector
1from __future__ import annotations 2 3import itertools 4import json 5import typing as t 6from functools import cached_property 7from pathlib import Path 8 9from sqlmesh.core import constants as c 10from sqlmesh.core.analytics.dispatcher import AsyncEventDispatcher, EventDispatcher 11from sqlmesh.utils import random_id 12from sqlmesh.utils.date import now_timestamp 13from sqlmesh.utils.hashing import md5 14from sqlmesh.utils.pydantic import PydanticModel 15from sqlmesh.utils.yaml import dump as yaml_dump 16from sqlmesh.utils.yaml import load as yaml_load 17 18if t.TYPE_CHECKING: 19 from sqlmesh.cicd.config import CICDBotConfig 20 from sqlmesh.core.plan import EvaluatablePlan 21 from sqlmesh.core.snapshot import Snapshot 22 23 24class AnalyticsCollector: 25 def __init__( 26 self, 27 dispatcher: t.Optional[EventDispatcher] = None, 28 sqlmesh_path: t.Optional[Path] = None, 29 ): 30 self._dispatcher = dispatcher or AsyncEventDispatcher() 31 self._sqlmesh_path = sqlmesh_path or c.SQLMESH_PATH 32 self._process_id = random_id() 33 self._seq_num = itertools.count() 34 35 def on_cicd_command( 36 self, 37 *, 38 command_name: str, 39 command_args: t.Collection[str], 40 parent_command_names: t.Collection[str], 41 cicd_bot_config: t.Optional[CICDBotConfig], 42 ) -> None: 43 """Called when a CICD command is executed. 44 45 Args: 46 command_name: The name of the command. 47 command_args: The arguments of the command. 48 parent_command_names: The names of the parent commands. 49 cicd_config: The CICD bot configuration. 
50 """ 51 additional_args = {} 52 if cicd_bot_config is not None and getattr(cicd_bot_config, "FIELDS_FOR_ANALYTICS", None): 53 additional_args["cicd_bot_config"] = cicd_bot_config.dict( 54 include=cicd_bot_config.FIELDS_FOR_ANALYTICS, mode="json" 55 ) 56 self._on_command( 57 "CICD_COMMAND", 58 command_name, 59 command_args, 60 parent_command_names=parent_command_names, 61 **additional_args, 62 ) 63 64 def on_cli_command( 65 self, 66 *, 67 command_name: str, 68 command_args: t.Collection[str], 69 parent_command_names: t.Collection[str], 70 ) -> None: 71 """Called when a CLI command is executed. 72 73 Args: 74 command_name: The name of the command. 75 command_args: The arguments of the command. 76 parent_command_names: The names of the parent commands. 77 """ 78 self._on_command( 79 "CLI_COMMAND", command_name, command_args, parent_command_names=parent_command_names 80 ) 81 82 def on_magic_command(self, *, command_name: str, command_args: t.Collection[str]) -> None: 83 """Called when a Notebook magic command is executed. 84 85 Args: 86 command_name: The name of the command. 87 command_args: The arguments of the command. 88 """ 89 self._on_command("MAGIC_COMMAND", command_name, command_args) 90 91 def on_python_api_command(self, *, command_name: str, command_args: t.Collection[str]) -> None: 92 """Called when a Python method is called directly. 93 94 Args: 95 command_name: The name of the command. 96 command_args: The arguments of the command. 97 """ 98 self._on_command("PYTHON_API_COMMAND", command_name, command_args) 99 100 def on_project_loaded( 101 self, 102 *, 103 project_type: str, 104 models_count: int, 105 audits_count: int, 106 standalone_audits_count: int, 107 macros_count: int, 108 jinja_macros_count: int, 109 load_time_sec: float, 110 state_sync_fingerprint: str, 111 project_name: str, 112 ) -> None: 113 """Called when a project is loaded. 114 115 Args: 116 project_type: The type of the project. Eg. "dbt" or "native". 
117 models_count: The number of models in the project. 118 audits_count: The number of audits in the project. 119 standalone_audits_count: The number of standalone audits in the project. 120 macros_count: The number of macros in the project. 121 jinja_macros_count: The number of Jinja macros in the project. 122 load_time_sec: The time it took to load the project in (fractional) seconds. 123 state_sync_fingerprint: The fingerprint of the state sync configuration. 124 project_name: The name of the project. 125 """ 126 project_type = project_type.lower() 127 event_data = { 128 "project_type": project_type, 129 "models_count": models_count, 130 "audits_count": audits_count, 131 "standalone_audits_count": standalone_audits_count, 132 "macros_count": macros_count, 133 "jinja_macros_count": jinja_macros_count, 134 "load_time_ms": int(load_time_sec * 1000), 135 "state_sync_fingerprint": state_sync_fingerprint, 136 "project_name_hash": _anonymize(project_name), 137 } 138 139 if project_type in {c.DBT, c.HYBRID}: 140 from dbt.version import __version__ as dbt_version 141 142 event_data["dbt_version"] = dbt_version 143 144 self._add_event("PROJECT_LOADED", event_data) 145 146 def on_plan_apply_start( 147 self, 148 *, 149 plan: EvaluatablePlan, 150 engine_type: t.Optional[str], 151 state_sync_type: t.Optional[str], 152 scheduler_type: str, 153 ) -> None: 154 """Called after the plan application starts. 155 156 Args: 157 plan: The plan that is being applied. 158 engine_type: The type of the target engine. 159 state_sync_type: The type of the engine used to store the SQLMesh state. 160 scheduler_type: The type of the scheduler being used. Eg. "builtin". 
161 """ 162 self._add_event( 163 "PLAN_APPLY_START", 164 { 165 "plan_id": plan.plan_id, 166 "engine_type": engine_type.lower() if engine_type is not None else None, 167 "state_sync_type": state_sync_type.lower() if state_sync_type is not None else None, 168 "scheduler_type": scheduler_type.lower(), 169 "is_dev": plan.is_dev, 170 "skip_backfill": plan.skip_backfill, 171 "no_gaps": plan.no_gaps, 172 "forward_only": plan.forward_only, 173 "ensure_finalized_snapshots": plan.ensure_finalized_snapshots, 174 "has_restatements": bool(plan.restatements), 175 "directly_modified_count": len(plan.directly_modified_snapshots), 176 "indirectly_modified_count": len( 177 { 178 s_id 179 for s_ids in plan.indirectly_modified_snapshots.values() 180 for s_id in s_ids 181 } 182 ), 183 "environment_name_hash": _anonymize(plan.environment.name), 184 }, 185 ) 186 187 def on_plan_apply_end(self, *, plan_id: str, error: t.Optional[t.Any] = None) -> None: 188 """Called after the plan application ends. 189 190 Args: 191 plan_id: The ID of the plan that was applied. 192 error: The error that occurred during plan application, if any. 193 """ 194 self._add_event( 195 "PLAN_APPLY_END", 196 { 197 "plan_id": plan_id, 198 "succeeded": error is None, 199 "error": type(error).__name__ if error else None, 200 }, 201 ) 202 203 def on_snapshots_created(self, *, new_snapshots: t.Collection[Snapshot], plan_id: str) -> None: 204 """Called after new snapshots were created and stored in the SQLMesh state. 205 206 Args: 207 new_snapshots: The list of new snapshots. 208 plan_id: The ID of the plan that created the snapshots. 
209 """ 210 if not new_snapshots: 211 return 212 snapshots = [] 213 for snapshot in new_snapshots: 214 snapshots.append( 215 { 216 "name_hash": _anonymize(snapshot.name), 217 "identifier": snapshot.identifier, 218 "version": snapshot.version, 219 "node_type": snapshot.node_type.lower(), 220 "model_kind": snapshot.model.kind.name.value.lower() 221 if snapshot.is_model 222 else None, 223 "is_sql": snapshot.model.is_sql if snapshot.is_model else None, 224 "change_category": ( 225 snapshot.change_category.name.lower() if snapshot.change_category else None 226 ), 227 "dialect": getattr(snapshot.node, "dialect", None), 228 "audits_count": len(snapshot.model.audits) if snapshot.is_model else None, 229 "effective_from_set": snapshot.effective_from is not None, 230 } 231 ) 232 self._add_event("SNAPSHOTS_CREATED", {"plan_id": plan_id, "snapshots": snapshots}) 233 234 def on_run_start(self, *, engine_type: str, state_sync_type: str) -> str: 235 """Called after a run starts. 236 237 Args: 238 engine_type: The type of the target engine. 239 state_sync_type: The type of the engine used to store the SQLMesh state. 240 241 Returns: 242 The run ID. 243 """ 244 run_id = random_id() 245 self._add_event( 246 "RUN_START", 247 { 248 "run_id": run_id, 249 "engine_type": engine_type.lower(), 250 "state_sync_type": state_sync_type.lower(), 251 }, 252 ) 253 return run_id 254 255 def on_run_end( 256 self, *, run_id: str, succeeded: bool, interrupted: bool, error: t.Optional[t.Any] = None 257 ) -> None: 258 """Called after a run ends. 259 260 Args: 261 run_id: The ID of the run. 262 succeeded: Whether the run succeeded. 263 interrupted: Whether the run was interrupted. 264 error: The error that occurred during the run, if any. 
265 """ 266 self._add_event( 267 "RUN_END", 268 { 269 "run_id": run_id, 270 "succeeded": succeeded, 271 "interrupted": interrupted, 272 "error": type(error).__name__ if error else None, 273 }, 274 ) 275 276 def on_migration_end( 277 self, 278 *, 279 from_sqlmesh_version: str, 280 state_sync_type: str, 281 migration_time_sec: float, 282 error: t.Optional[t.Any] = None, 283 ) -> None: 284 """Called after the migration of the SQLMesh state ends. 285 286 Args: 287 from_sqlmesh_version: The version of SQLMesh from which the migration started. 288 state_sync_type: The type of the engine used to store the SQLMesh state. 289 migration_time_sec: The time it took to migrate the SQLMesh state in (fractional) seconds. 290 error: The error that occurred during the migration, if any. 291 """ 292 self._add_event( 293 "MIGRATION_END", 294 { 295 "from_sqlmesh_version": from_sqlmesh_version, 296 "state_sync_type": state_sync_type, 297 "succeeded": error is None, 298 "error": type(error).__name__ if error else None, 299 "migration_time_ms": int(migration_time_sec * 1000), 300 }, 301 ) 302 303 def flush(self) -> None: 304 """Flushes the events to the dispatcher.""" 305 self._dispatcher.flush() 306 307 def shutdown(self, flush: bool = True) -> None: 308 """Shuts down the collector.""" 309 self._dispatcher.shutdown(flush=flush) 310 311 def _on_command( 312 self, 313 event_type: str, 314 command_name: str, 315 command_args: t.Collection[str], 316 **kwargs: t.Any, 317 ) -> None: 318 event = { 319 "command_name": command_name, 320 "command_args": list(command_args), 321 **kwargs, 322 } 323 self._add_event(event_type, event) 324 325 def _add_event(self, event_type: str, event: t.Dict[str, t.Any]) -> None: 326 self._dispatcher.add_event( 327 { 328 "user_id": self._user.id, 329 "process_id": self._process_id, 330 "seq_num": next(self._seq_num), 331 "event_type": event_type, 332 "client_ts": now_timestamp(), 333 "event": json.dumps(event), 334 } 335 ) 336 337 @cached_property 338 def 
_user(self) -> User: 339 return User.load_or_create(self._sqlmesh_path) 340 341 342class User(PydanticModel): 343 id: str 344 345 @classmethod 346 def load_or_create(cls, sqlmesh_path: Path) -> User: 347 if not sqlmesh_path.exists(): 348 sqlmesh_path.mkdir(parents=True, exist_ok=True) 349 350 user_path = sqlmesh_path / "user.yaml" 351 if user_path.exists(): 352 raw_user = yaml_load(user_path, raise_if_empty=False) 353 if raw_user: 354 return cls.parse_obj(raw_user) 355 356 user = User(id=random_id()) 357 with user_path.open("w") as fd: 358 yaml_dump(user.dict(mode="json"), stream=fd) 359 return user 360 361 362def _anonymize(value: str) -> str: 363 return md5([value])
class
AnalyticsCollector:
class AnalyticsCollector:
    """Builds analytics events and hands them to an event dispatcher.

    Each event is wrapped in a common envelope containing:
    - the user ID from ``<sqlmesh_path>/user.yaml``, created on first use;
    - a random per-collector process ID;
    - a sequence number drawn from an ``itertools.count()`` counter;
    - the client timestamp;
    - the JSON-encoded payload.

    Project, environment and model names are only reported as hashes (see ``_anonymize``).
    """

    def __init__(
        self,
        dispatcher: t.Optional[EventDispatcher] = None,
        sqlmesh_path: t.Optional[Path] = None,
    ):
        """Initializes the collector.

        Args:
            dispatcher: Receives the events. Defaults to a new ``AsyncEventDispatcher``.
            sqlmesh_path: Directory that holds ``user.yaml``. Defaults to ``c.SQLMESH_PATH``.
        """
        self._dispatcher = dispatcher or AsyncEventDispatcher()
        self._sqlmesh_path = sqlmesh_path or c.SQLMESH_PATH
        self._process_id = random_id()
        self._seq_num = itertools.count()

    def on_cicd_command(
        self,
        *,
        command_name: str,
        command_args: t.Collection[str],
        parent_command_names: t.Collection[str],
        cicd_bot_config: t.Optional[CICDBotConfig],
    ) -> None:
        """Called when a CICD command is executed.

        Args:
            command_name: The name of the command.
            command_args: The arguments of the command.
            parent_command_names: The names of the parent commands.
            cicd_bot_config: The CICD bot configuration, if any. Only the fields listed in its
                ``FIELDS_FOR_ANALYTICS`` attribute are included in the event.
        """
        additional_args: t.Dict[str, t.Any] = {}
        # Only a whitelisted subset of the bot config is reported. Configs without the
        # attribute, or with an empty one, contribute nothing.
        if cicd_bot_config is not None and getattr(cicd_bot_config, "FIELDS_FOR_ANALYTICS", None):
            additional_args["cicd_bot_config"] = cicd_bot_config.dict(
                include=cicd_bot_config.FIELDS_FOR_ANALYTICS, mode="json"
            )
        self._on_command(
            "CICD_COMMAND",
            command_name,
            command_args,
            parent_command_names=parent_command_names,
            **additional_args,
        )

    def on_cli_command(
        self,
        *,
        command_name: str,
        command_args: t.Collection[str],
        parent_command_names: t.Collection[str],
    ) -> None:
        """Called when a CLI command is executed.

        Args:
            command_name: The name of the command.
            command_args: The arguments of the command.
            parent_command_names: The names of the parent commands.
        """
        self._on_command(
            "CLI_COMMAND", command_name, command_args, parent_command_names=parent_command_names
        )

    def on_magic_command(self, *, command_name: str, command_args: t.Collection[str]) -> None:
        """Called when a Notebook magic command is executed.

        Args:
            command_name: The name of the command.
            command_args: The arguments of the command.
        """
        self._on_command("MAGIC_COMMAND", command_name, command_args)

    def on_python_api_command(self, *, command_name: str, command_args: t.Collection[str]) -> None:
        """Called when a Python method is called directly.

        Args:
            command_name: The name of the command.
            command_args: The arguments of the command.
        """
        self._on_command("PYTHON_API_COMMAND", command_name, command_args)

    def on_project_loaded(
        self,
        *,
        project_type: str,
        models_count: int,
        audits_count: int,
        standalone_audits_count: int,
        macros_count: int,
        jinja_macros_count: int,
        load_time_sec: float,
        state_sync_fingerprint: str,
        project_name: str,
    ) -> None:
        """Called when a project is loaded.

        Args:
            project_type: The type of the project, e.g. "dbt" or "native". Lowercased before use.
            models_count: The number of models in the project.
            audits_count: The number of audits in the project.
            standalone_audits_count: The number of standalone audits in the project.
            macros_count: The number of macros in the project.
            jinja_macros_count: The number of Jinja macros in the project.
            load_time_sec: The time it took to load the project in (fractional) seconds.
            state_sync_fingerprint: The fingerprint of the state sync configuration.
            project_name: The name of the project. Only its hash is reported.
        """
        project_type = project_type.lower()
        event_data = {
            "project_type": project_type,
            "models_count": models_count,
            "audits_count": audits_count,
            "standalone_audits_count": standalone_audits_count,
            "macros_count": macros_count,
            "jinja_macros_count": jinja_macros_count,
            "load_time_ms": int(load_time_sec * 1000),
            "state_sync_fingerprint": state_sync_fingerprint,
            "project_name_hash": _anonymize(project_name),
        }

        if project_type in {c.DBT, c.HYBRID}:
            # Imported only for dbt/hybrid projects. Presumably dbt is an optional
            # dependency — NOTE(review): confirm.
            from dbt.version import __version__ as dbt_version

            event_data["dbt_version"] = dbt_version

        self._add_event("PROJECT_LOADED", event_data)

    def on_plan_apply_start(
        self,
        *,
        plan: EvaluatablePlan,
        engine_type: t.Optional[str],
        state_sync_type: t.Optional[str],
        scheduler_type: str,
    ) -> None:
        """Called after the plan application starts.

        Args:
            plan: The plan that is being applied.
            engine_type: The type of the target engine.
            state_sync_type: The type of the engine used to store the SQLMesh state.
            scheduler_type: The type of the scheduler being used, e.g. "builtin".
        """
        self._add_event(
            "PLAN_APPLY_START",
            {
                "plan_id": plan.plan_id,
                "engine_type": engine_type.lower() if engine_type is not None else None,
                "state_sync_type": state_sync_type.lower() if state_sync_type is not None else None,
                "scheduler_type": scheduler_type.lower(),
                "is_dev": plan.is_dev,
                "skip_backfill": plan.skip_backfill,
                "no_gaps": plan.no_gaps,
                "forward_only": plan.forward_only,
                "ensure_finalized_snapshots": plan.ensure_finalized_snapshots,
                "has_restatements": bool(plan.restatements),
                "directly_modified_count": len(plan.directly_modified_snapshots),
                # Count distinct snapshot IDs across all values of the mapping.
                "indirectly_modified_count": len(
                    {
                        s_id
                        for s_ids in plan.indirectly_modified_snapshots.values()
                        for s_id in s_ids
                    }
                ),
                "environment_name_hash": _anonymize(plan.environment.name),
            },
        )

    def on_plan_apply_end(self, *, plan_id: str, error: t.Optional[t.Any] = None) -> None:
        """Called after the plan application ends.

        Args:
            plan_id: The ID of the plan that was applied.
            error: The error that occurred during plan application, if any. Only its type name
                is reported.
        """
        self._add_event(
            "PLAN_APPLY_END",
            {
                "plan_id": plan_id,
                "succeeded": error is None,
                # Same None test as "succeeded", so a falsy error object is still reported.
                "error": type(error).__name__ if error is not None else None,
            },
        )

    def on_snapshots_created(self, *, new_snapshots: t.Collection[Snapshot], plan_id: str) -> None:
        """Called after new snapshots were created and stored in the SQLMesh state.

        Nothing is emitted when ``new_snapshots`` is empty.

        Args:
            new_snapshots: The new snapshots.
            plan_id: The ID of the plan that created the snapshots.
        """
        if not new_snapshots:
            return
        snapshots = []
        for snapshot in new_snapshots:
            snapshots.append(
                {
                    "name_hash": _anonymize(snapshot.name),
                    "identifier": snapshot.identifier,
                    "version": snapshot.version,
                    "node_type": snapshot.node_type.lower(),
                    # Model-only attributes are reported as None for non-model nodes.
                    "model_kind": snapshot.model.kind.name.value.lower()
                    if snapshot.is_model
                    else None,
                    "is_sql": snapshot.model.is_sql if snapshot.is_model else None,
                    "change_category": (
                        snapshot.change_category.name.lower() if snapshot.change_category else None
                    ),
                    "dialect": getattr(snapshot.node, "dialect", None),
                    "audits_count": len(snapshot.model.audits) if snapshot.is_model else None,
                    "effective_from_set": snapshot.effective_from is not None,
                }
            )
        self._add_event("SNAPSHOTS_CREATED", {"plan_id": plan_id, "snapshots": snapshots})

    def on_run_start(self, *, engine_type: str, state_sync_type: str) -> str:
        """Called after a run starts.

        Args:
            engine_type: The type of the target engine.
            state_sync_type: The type of the engine used to store the SQLMesh state.

        Returns:
            The run ID, a fresh random ID to be passed back to ``on_run_end``.
        """
        run_id = random_id()
        self._add_event(
            "RUN_START",
            {
                "run_id": run_id,
                "engine_type": engine_type.lower(),
                "state_sync_type": state_sync_type.lower(),
            },
        )
        return run_id

    def on_run_end(
        self, *, run_id: str, succeeded: bool, interrupted: bool, error: t.Optional[t.Any] = None
    ) -> None:
        """Called after a run ends.

        Args:
            run_id: The ID of the run.
            succeeded: Whether the run succeeded.
            interrupted: Whether the run was interrupted.
            error: The error that occurred during the run, if any. Only its type name is reported.
        """
        self._add_event(
            "RUN_END",
            {
                "run_id": run_id,
                "succeeded": succeeded,
                "interrupted": interrupted,
                "error": type(error).__name__ if error is not None else None,
            },
        )

    def on_migration_end(
        self,
        *,
        from_sqlmesh_version: str,
        state_sync_type: str,
        migration_time_sec: float,
        error: t.Optional[t.Any] = None,
    ) -> None:
        """Called after the migration of the SQLMesh state ends.

        Args:
            from_sqlmesh_version: The version of SQLMesh from which the migration started.
            state_sync_type: The type of the engine used to store the SQLMesh state.
            migration_time_sec: The time it took to migrate the SQLMesh state in (fractional) seconds.
            error: The error that occurred during the migration, if any. Only its type name is
                reported.
        """
        self._add_event(
            "MIGRATION_END",
            {
                "from_sqlmesh_version": from_sqlmesh_version,
                # Lowercased like the state sync type in the PLAN_APPLY_START / RUN_START events.
                "state_sync_type": state_sync_type.lower(),
                "succeeded": error is None,
                "error": type(error).__name__ if error is not None else None,
                "migration_time_ms": int(migration_time_sec * 1000),
            },
        )

    def flush(self) -> None:
        """Flushes the events to the dispatcher."""
        self._dispatcher.flush()

    def shutdown(self, flush: bool = True) -> None:
        """Shuts down the collector by shutting down its dispatcher.

        Args:
            flush: Passed through to the dispatcher's ``shutdown``.
        """
        self._dispatcher.shutdown(flush=flush)

    def _on_command(
        self,
        event_type: str,
        command_name: str,
        command_args: t.Collection[str],
        **kwargs: t.Any,
    ) -> None:
        """Emits a command event. Extra keyword arguments are merged into the payload."""
        event = {
            "command_name": command_name,
            "command_args": list(command_args),
            **kwargs,
        }
        self._add_event(event_type, event)

    def _add_event(self, event_type: str, event: t.Dict[str, t.Any]) -> None:
        """Wraps ``event`` in the common envelope and passes it to the dispatcher.

        The payload is serialized with ``json.dumps``, so it must be JSON-serializable.
        """
        self._dispatcher.add_event(
            {
                "user_id": self._user.id,
                "process_id": self._process_id,
                "seq_num": next(self._seq_num),
                "event_type": event_type,
                "client_ts": now_timestamp(),
                "event": json.dumps(event),
            }
        )

    @cached_property
    def _user(self) -> User:
        """The analytics user. Loaded or created on first access, then cached on the instance."""
        return User.load_or_create(self._sqlmesh_path)
AnalyticsCollector( dispatcher: Optional[sqlmesh.core.analytics.dispatcher.EventDispatcher] = None, sqlmesh_path: Optional[pathlib.Path] = None)
def __init__(
    self,
    dispatcher: t.Optional[EventDispatcher] = None,
    sqlmesh_path: t.Optional[Path] = None,
):
    """Sets up the collector's dispatcher, state directory, process ID and event counter.

    Args:
        dispatcher: Receives the events. A new ``AsyncEventDispatcher`` is used when falsy.
        sqlmesh_path: Directory holding ``user.yaml``. ``c.SQLMESH_PATH`` is used when falsy.
    """
    self._dispatcher = dispatcher if dispatcher else AsyncEventDispatcher()
    self._sqlmesh_path = sqlmesh_path if sqlmesh_path else c.SQLMESH_PATH
    # Random ID shared by every event emitted through this collector instance.
    self._process_id = random_id()
    # Yields 0, 1, 2, ... — one sequence number per emitted event.
    self._seq_num = itertools.count()
def
on_cicd_command( self, *, command_name: str, command_args: Collection[str], parent_command_names: Collection[str], cicd_bot_config: Optional[sqlmesh.integrations.github.cicd.config.GithubCICDBotConfig]) -> None:
def on_cicd_command(
    self,
    *,
    command_name: str,
    command_args: t.Collection[str],
    parent_command_names: t.Collection[str],
    cicd_bot_config: t.Optional[CICDBotConfig],
) -> None:
    """Called when a CICD command is executed.

    Args:
        command_name: The name of the command.
        command_args: The arguments of the command.
        parent_command_names: The names of the parent commands.
        cicd_bot_config: The CICD bot configuration, if any. Only the fields listed in its
            ``FIELDS_FOR_ANALYTICS`` attribute are included in the event.
    """
    additional_args: t.Dict[str, t.Any] = {}
    # Only a whitelisted subset of the bot config is reported. Configs without the
    # attribute, or with an empty one, contribute nothing.
    if cicd_bot_config is not None and getattr(cicd_bot_config, "FIELDS_FOR_ANALYTICS", None):
        additional_args["cicd_bot_config"] = cicd_bot_config.dict(
            include=cicd_bot_config.FIELDS_FOR_ANALYTICS, mode="json"
        )
    self._on_command(
        "CICD_COMMAND",
        command_name,
        command_args,
        parent_command_names=parent_command_names,
        **additional_args,
    )
Called when a CICD command is executed.
Arguments:
- command_name: The name of the command.
- command_args: The arguments of the command.
- parent_command_names: The names of the parent commands.
- cicd_bot_config: The CICD bot configuration.
def
on_cli_command( self, *, command_name: str, command_args: Collection[str], parent_command_names: Collection[str]) -> None:
def on_cli_command(
    self,
    *,
    command_name: str,
    command_args: t.Collection[str],
    parent_command_names: t.Collection[str],
) -> None:
    """Records a CLI_COMMAND event for a command run from the command line.

    Args:
        command_name: Name of the invoked command.
        command_args: Arguments the command was invoked with.
        parent_command_names: Names of the enclosing command groups.
    """
    kind = "CLI_COMMAND"
    self._on_command(kind, command_name, command_args, parent_command_names=parent_command_names)
Called when a CLI command is executed.
Arguments:
- command_name: The name of the command.
- command_args: The arguments of the command.
- parent_command_names: The names of the parent commands.
def
on_magic_command(self, *, command_name: str, command_args: Collection[str]) -> None:
def on_magic_command(self, *, command_name: str, command_args: t.Collection[str]) -> None:
    """Records a MAGIC_COMMAND event for a notebook magic invocation.

    Args:
        command_name: Name of the magic command.
        command_args: Arguments passed to the magic command.
    """
    kind = "MAGIC_COMMAND"
    self._on_command(kind, command_name, command_args)
Called when a Notebook magic command is executed.
Arguments:
- command_name: The name of the command.
- command_args: The arguments of the command.
def
on_python_api_command(self, *, command_name: str, command_args: Collection[str]) -> None:
def on_python_api_command(self, *, command_name: str, command_args: t.Collection[str]) -> None:
    """Records a PYTHON_API_COMMAND event for a direct Python API call.

    Args:
        command_name: Name of the called method.
        command_args: Arguments the method was called with.
    """
    kind = "PYTHON_API_COMMAND"
    self._on_command(kind, command_name, command_args)
Called when a Python method is called directly.
Arguments:
- command_name: The name of the command.
- command_args: The arguments of the command.
def
on_project_loaded( self, *, project_type: str, models_count: int, audits_count: int, standalone_audits_count: int, macros_count: int, jinja_macros_count: int, load_time_sec: float, state_sync_fingerprint: str, project_name: str) -> None:
def on_project_loaded(
    self,
    *,
    project_type: str,
    models_count: int,
    audits_count: int,
    standalone_audits_count: int,
    macros_count: int,
    jinja_macros_count: int,
    load_time_sec: float,
    state_sync_fingerprint: str,
    project_name: str,
) -> None:
    """Records a PROJECT_LOADED event with the size and load time of a project.

    Args:
        project_type: Kind of project, e.g. "dbt" or "native". Reported lowercased.
        models_count: Number of models in the project.
        audits_count: Number of audits in the project.
        standalone_audits_count: Number of standalone audits in the project.
        macros_count: Number of macros in the project.
        jinja_macros_count: Number of Jinja macros in the project.
        load_time_sec: Project load time in (fractional) seconds. Reported in whole milliseconds.
        state_sync_fingerprint: Fingerprint of the state sync configuration.
        project_name: Project name. Only its hash is reported.
    """
    normalized_type = project_type.lower()
    load_time_ms = int(load_time_sec * 1000)

    payload = dict(
        project_type=normalized_type,
        models_count=models_count,
        audits_count=audits_count,
        standalone_audits_count=standalone_audits_count,
        macros_count=macros_count,
        jinja_macros_count=jinja_macros_count,
        load_time_ms=load_time_ms,
        state_sync_fingerprint=state_sync_fingerprint,
        project_name_hash=_anonymize(project_name),
    )

    uses_dbt = normalized_type in {c.DBT, c.HYBRID}
    if uses_dbt:
        # dbt is only imported for dbt/hybrid projects.
        from dbt.version import __version__ as dbt_version

        payload["dbt_version"] = dbt_version

    self._add_event("PROJECT_LOADED", payload)
Called when a project is loaded.
Arguments:
- project_type: The type of the project, e.g. "dbt" or "native".
- models_count: The number of models in the project.
- audits_count: The number of audits in the project.
- standalone_audits_count: The number of standalone audits in the project.
- macros_count: The number of macros in the project.
- jinja_macros_count: The number of Jinja macros in the project.
- load_time_sec: The time it took to load the project in (fractional) seconds.
- state_sync_fingerprint: The fingerprint of the state sync configuration.
- project_name: The name of the project.
def
on_plan_apply_start( self, *, plan: sqlmesh.core.plan.definition.EvaluatablePlan, engine_type: Optional[str], state_sync_type: Optional[str], scheduler_type: str) -> None:
def on_plan_apply_start(
    self,
    *,
    plan: EvaluatablePlan,
    engine_type: t.Optional[str],
    state_sync_type: t.Optional[str],
    scheduler_type: str,
) -> None:
    """Records a PLAN_APPLY_START event describing the plan about to be applied.

    Args:
        plan: The plan being applied.
        engine_type: Target engine type, or None. Reported lowercased.
        state_sync_type: Type of the engine storing the SQLMesh state, or None. Reported lowercased.
        scheduler_type: Scheduler in use, e.g. "builtin". Reported lowercased.
    """

    def lowered(value: t.Optional[str]) -> t.Optional[str]:
        return None if value is None else value.lower()

    payload = {
        "plan_id": plan.plan_id,
        "engine_type": lowered(engine_type),
        "state_sync_type": lowered(state_sync_type),
        "scheduler_type": scheduler_type.lower(),
        "is_dev": plan.is_dev,
        "skip_backfill": plan.skip_backfill,
        "no_gaps": plan.no_gaps,
        "forward_only": plan.forward_only,
        "ensure_finalized_snapshots": plan.ensure_finalized_snapshots,
        "has_restatements": bool(plan.restatements),
        "directly_modified_count": len(plan.directly_modified_snapshots),
        # Distinct snapshot IDs across every value collection of the mapping.
        "indirectly_modified_count": len(
            set(itertools_chain(plan.indirectly_modified_snapshots.values()))
        ),
        "environment_name_hash": _anonymize(plan.environment.name),
    }
    self._add_event("PLAN_APPLY_START", payload)


def itertools_chain(groups: t.Iterable[t.Iterable[t.Any]]) -> t.Iterator[t.Any]:
    """Flattens one level of nesting, yielding every item of every group in order."""
    for group in groups:
        yield from group
Called after the plan application starts.
Arguments:
- plan: The plan that is being applied.
- engine_type: The type of the target engine.
- state_sync_type: The type of the engine used to store the SQLMesh state.
- scheduler_type: The type of the scheduler being used, e.g. "builtin".
def
on_plan_apply_end(self, *, plan_id: str, error: Optional[Any] = None) -> None:
def on_plan_apply_end(self, *, plan_id: str, error: t.Optional[t.Any] = None) -> None:
    """Called after the plan application ends.

    Args:
        plan_id: The ID of the plan that was applied.
        error: The error that occurred during plan application, if any. Only its type name
            is reported.
    """
    self._add_event(
        "PLAN_APPLY_END",
        {
            "plan_id": plan_id,
            "succeeded": error is None,
            # Same None test as "succeeded", so a falsy error object is still reported.
            "error": type(error).__name__ if error is not None else None,
        },
    )
Called after the plan application ends.
Arguments:
- plan_id: The ID of the plan that was applied.
- error: The error that occurred during plan application, if any.
def
on_snapshots_created( self, *, new_snapshots: Collection[sqlmesh.core.snapshot.definition.Snapshot], plan_id: str) -> None:
def on_snapshots_created(self, *, new_snapshots: t.Collection[Snapshot], plan_id: str) -> None:
    """Records a SNAPSHOTS_CREATED event summarizing newly stored snapshots.

    No event is emitted when ``new_snapshots`` is empty.

    Args:
        new_snapshots: The snapshots that were just created.
        plan_id: The ID of the plan that created them.
    """
    if not new_snapshots:
        return

    def describe(snapshot: Snapshot) -> t.Dict[str, t.Any]:
        # Model-only attributes are reported as None for non-model nodes.
        is_model = snapshot.is_model
        category = snapshot.change_category
        return {
            "name_hash": _anonymize(snapshot.name),
            "identifier": snapshot.identifier,
            "version": snapshot.version,
            "node_type": snapshot.node_type.lower(),
            "model_kind": snapshot.model.kind.name.value.lower() if is_model else None,
            "is_sql": snapshot.model.is_sql if is_model else None,
            "change_category": category.name.lower() if category else None,
            "dialect": getattr(snapshot.node, "dialect", None),
            "audits_count": len(snapshot.model.audits) if is_model else None,
            "effective_from_set": snapshot.effective_from is not None,
        }

    summaries = [describe(snapshot) for snapshot in new_snapshots]
    self._add_event("SNAPSHOTS_CREATED", {"plan_id": plan_id, "snapshots": summaries})
Called after new snapshots were created and stored in the SQLMesh state.
Arguments:
- new_snapshots: The list of new snapshots.
- plan_id: The ID of the plan that created the snapshots.
def
on_run_start(self, *, engine_type: str, state_sync_type: str) -> str:
def on_run_start(self, *, engine_type: str, state_sync_type: str) -> str:
    """Records a RUN_START event and hands back the identifier of the new run.

    Args:
        engine_type: Target engine type. Reported lowercased.
        state_sync_type: Type of the engine storing the SQLMesh state. Reported lowercased.

    Returns:
        A freshly generated random run ID.
    """
    new_run_id = random_id()
    payload = dict(
        run_id=new_run_id,
        engine_type=engine_type.lower(),
        state_sync_type=state_sync_type.lower(),
    )
    self._add_event("RUN_START", payload)
    return new_run_id
Called after a run starts.
Arguments:
- engine_type: The type of the target engine.
- state_sync_type: The type of the engine used to store the SQLMesh state.
Returns:
The run ID.
def
on_run_end( self, *, run_id: str, succeeded: bool, interrupted: bool, error: Optional[Any] = None) -> None:
def on_run_end(
    self, *, run_id: str, succeeded: bool, interrupted: bool, error: t.Optional[t.Any] = None
) -> None:
    """Called after a run ends.

    Records a ``RUN_END`` event. Only the class name of ``error`` is reported, never its
    message.

    Args:
        run_id: The ID of the run.
        succeeded: Whether the run succeeded.
        interrupted: Whether the run was interrupted.
        error: The error that occurred during the run, if any. ``None`` means no error.
    """
    self._add_event(
        "RUN_END",
        {
            "run_id": run_id,
            "succeeded": succeeded,
            "interrupted": interrupted,
            # Compare against None instead of relying on truthiness, so that an error object
            # that happens to evaluate as falsy (e.g. defines __bool__/__len__) is still reported.
            "error": type(error).__name__ if error is not None else None,
        },
    )
Called after a run ends.
Arguments:
- run_id: The ID of the run.
- succeeded: Whether the run succeeded.
- interrupted: Whether the run was interrupted.
- error: The error that occurred during the run, if any.
def on_migration_end(self, *, from_sqlmesh_version: str, state_sync_type: str, migration_time_sec: float, error: Optional[Any] = None) -> None:
def on_migration_end(
    self,
    *,
    from_sqlmesh_version: str,
    state_sync_type: str,
    migration_time_sec: float,
    error: t.Optional[t.Any] = None,
) -> None:
    """Called after the migration of the SQLMesh state ends.

    Records a ``MIGRATION_END`` event. The migration is considered successful exactly when
    ``error`` is ``None``. Only the error's class name is reported, and the duration is
    converted to whole milliseconds (truncated by ``int``).

    Args:
        from_sqlmesh_version: The version of SQLMesh from which the migration started.
        state_sync_type: The type of the engine used to store the SQLMesh state.
        migration_time_sec: The time it took to migrate the SQLMesh state in (fractional) seconds.
        error: The error that occurred during the migration, if any. ``None`` means no error.
    """
    # Use one None-based check for both fields so "succeeded" and "error" can never disagree,
    # even if the error object happens to evaluate as falsy.
    has_error = error is not None
    self._add_event(
        "MIGRATION_END",
        {
            "from_sqlmesh_version": from_sqlmesh_version,
            "state_sync_type": state_sync_type,
            "succeeded": not has_error,
            "error": type(error).__name__ if has_error else None,
            "migration_time_ms": int(migration_time_sec * 1000),
        },
    )
Called after the migration of the SQLMesh state ends.
Arguments:
- from_sqlmesh_version: The version of SQLMesh from which the migration started.
- state_sync_type: The type of the engine used to store the SQLMesh state.
- migration_time_sec: The time it took to migrate the SQLMesh state in (fractional) seconds.
- error: The error that occurred during the migration, if any.
class User(PydanticModel):
    """The local user's identity, persisted as ``user.yaml`` inside the SQLMesh directory."""

    # Randomly generated identifier, written to ``user.yaml`` on first use.
    id: str

    @classmethod
    def load_or_create(cls, sqlmesh_path: Path) -> User:
        """Load the user from ``<sqlmesh_path>/user.yaml``, creating it if necessary.

        The directory is created (including parents) when it does not exist. If the file is
        missing or empty, a new user with a randomly generated ID is written to it and returned.

        Args:
            sqlmesh_path: The directory in which ``user.yaml`` is stored.

        Returns:
            The loaded or newly created user, as an instance of ``cls``.
        """
        if not sqlmesh_path.exists():
            sqlmesh_path.mkdir(parents=True, exist_ok=True)

        user_path = sqlmesh_path / "user.yaml"
        if user_path.exists():
            raw_user = yaml_load(user_path, raise_if_empty=False)
            # An empty file yields a falsy value; fall through and regenerate the user.
            if raw_user:
                return cls.parse_obj(raw_user)

        # Construct via cls (not the hard-coded User) so that the create path returns the same
        # type as the load path above, including for subclasses.
        user = cls(id=random_id())
        with user_path.open("w") as fd:
            yaml_dump(user.dict(mode="json"), stream=fd)
        return user
Usage documentation: Models.
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` signature (`inspect.Signature`) of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to `__args__`, `__origin__`, and `__parameters__` in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a `RootModel` (`pydantic.root_model.RootModel`).
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding `FieldInfo` (`pydantic.fields.FieldInfo`) objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding `ComputedFieldInfo` (`pydantic.fields.ComputedFieldInfo`) objects.
- __pydantic_extra__: A dictionary containing extra values, if `extra` (`pydantic.config.ConfigDict.extra`) is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
@classmethod
def load_or_create(cls, sqlmesh_path: Path) -> User:
    """Load the user from ``<sqlmesh_path>/user.yaml``, creating it if necessary.

    The directory is created (including parents) when it does not exist. If the file is
    missing or empty, a new user with a randomly generated ID is written to it and returned.

    Args:
        sqlmesh_path: The directory in which ``user.yaml`` is stored.

    Returns:
        The loaded or newly created user, as an instance of ``cls``.
    """
    if not sqlmesh_path.exists():
        sqlmesh_path.mkdir(parents=True, exist_ok=True)

    user_path = sqlmesh_path / "user.yaml"
    if user_path.exists():
        raw_user = yaml_load(user_path, raise_if_empty=False)
        # An empty file yields a falsy value; fall through and regenerate the user.
        if raw_user:
            return cls.parse_obj(raw_user)

    # Construct via cls (not the hard-coded User) so that the create path returns the same
    # type as the load path above, including for subclasses.
    user = cls(id=random_id())
    with user_path.open("w") as fd:
        yaml_dump(user.dict(mode="json"), stream=fd)
    return user
model_config =
{'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs