sqlmesh.core.notification_target
1from __future__ import annotations 2 3import functools 4import smtplib 5import sys 6import typing as t 7from email.message import EmailMessage 8from enum import Enum 9 10from pydantic import Field, SecretStr 11 12from sqlmesh.core.console import Console, get_console 13from sqlmesh.integrations import slack 14from sqlmesh.utils.errors import AuditError, ConfigError, MissingDependencyError 15from sqlmesh.utils.pydantic import PydanticModel 16 17if sys.version_info >= (3, 8): 18 from typing import Literal 19else: 20 from typing_extensions import Literal 21 22if sys.version_info >= (3, 9): 23 from typing import Annotated 24else: 25 from typing_extensions import Annotated 26 27if t.TYPE_CHECKING: 28 from slack_sdk import WebClient, WebhookClient 29 30 31def _sqlmesh_version() -> str: 32 try: 33 from sqlmesh import __version__ 34 35 return __version__ 36 except ImportError: 37 return "0.0.0" 38 39 40NOTIFICATION_FUNCTIONS: t.Dict[NotificationEvent, str] = {} 41 42 43class NotificationStatus(str, Enum): 44 SUCCESS = "success" 45 FAILURE = "failure" 46 WARNING = "warning" 47 INFO = "info" 48 PROGRESS = "progress" 49 50 @property 51 def is_success(self) -> bool: 52 return self == NotificationStatus.SUCCESS 53 54 @property 55 def is_failure(self) -> bool: 56 return self == NotificationStatus.FAILURE 57 58 @property 59 def is_info(self) -> bool: 60 return self == NotificationStatus.INFO 61 62 @property 63 def is_warning(self) -> bool: 64 return self == NotificationStatus.WARNING 65 66 @property 67 def is_progress(self) -> bool: 68 return self == NotificationStatus.PROGRESS 69 70 71class NotificationEvent(str, Enum): 72 APPLY_START = "apply_start" 73 APPLY_END = "apply_end" 74 RUN_START = "run_start" 75 RUN_END = "run_end" 76 MIGRATION_START = "migration_start" 77 MIGRATION_END = "migration_end" 78 APPLY_FAILURE = "apply_failure" 79 RUN_FAILURE = "run_failure" 80 AUDIT_FAILURE = "audit_failure" 81 MIGRATION_FAILURE = "migration_failure" 82 83 84def notify(event: NotificationEvent) -> t.Callable: 85 """Decorator used to register 'notify' methods and the events they correspond to.""" 86 87 def decorator(f: t.Callable) -> t.Callable: 88 @functools.wraps(f) 89 def wrapper(*args: t.List[t.Any], **kwargs: t.Dict[str, t.Any]) -> None: 90 return f(*args, **kwargs) 91 92 NOTIFICATION_FUNCTIONS[event] = f.__name__ 93 return wrapper 94 95 return decorator 96 97 98class BaseNotificationTarget(PydanticModel, frozen=True): 99 """ 100 Base notification target model. Provides a command for sending notifications that is currently only used 101 by the built-in scheduler. Other schedulers like Airflow use the configuration of the target itself 102 to create the notification constructs appropriate for the scheduler. 103 """ 104 105 type_: str 106 notify_on: t.FrozenSet[NotificationEvent] = frozenset() 107 108 def send(self, notification_status: NotificationStatus, msg: str, **kwargs: t.Any) -> None: 109 """Sends notification with the provided message. 110 111 Args: 112 notification_status: The status of the notification. One of: success, failure, warning, info, or progress. 113 msg: The message to send. 114 """ 115 116 @notify(NotificationEvent.APPLY_START) 117 def notify_apply_start(self, environment: str, plan_id: str) -> None: 118 """Notify when an apply starts. 119 120 Args: 121 environment: The target environment of the plan. 122 plan_id: plan_id that is being applied. 123 """ 124 self.send( 125 NotificationStatus.INFO, 126 f"Plan {plan_id} apply started for environment `{environment}`.", 127 ) 128 129 @notify(NotificationEvent.APPLY_END) 130 def notify_apply_end(self, environment: str, plan_id: str) -> None: 131 """Notify when an apply ends. 132 133 Args: 134 environment: The target environment of the plan. 135 plan_id: plan_id that was applied. 136 """ 137 self.send( 138 NotificationStatus.SUCCESS, 139 f"Plan {plan_id} apply finished for environment `{environment}`.", 140 ) 141 142 @notify(NotificationEvent.RUN_START) 143 def notify_run_start(self, environment: str) -> None: 144 """Notify when a SQLMesh run starts. 145 146 Args: 147 environment: The target environment of the run. 148 """ 149 self.send(NotificationStatus.INFO, f"SQLMesh run started for environment `{environment}`.") 150 151 @notify(NotificationEvent.RUN_END) 152 def notify_run_end(self, environment: str) -> None: 153 """Notify when a SQLMesh run ends. 154 155 Args: 156 environment: The target environment of the run. 157 """ 158 self.send( 159 NotificationStatus.SUCCESS, f"SQLMesh run finished for environment `{environment}`." 160 ) 161 162 @notify(NotificationEvent.MIGRATION_START) 163 def notify_migration_start(self) -> None: 164 """Notify when a SQLMesh migration starts.""" 165 self.send(NotificationStatus.INFO, "SQLMesh migration started.") 166 167 @notify(NotificationEvent.MIGRATION_END) 168 def notify_migration_end(self) -> None: 169 """Notify when a SQLMesh migration ends.""" 170 self.send(NotificationStatus.SUCCESS, "SQLMesh migration finished.") 171 172 @notify(NotificationEvent.APPLY_FAILURE) 173 def notify_apply_failure(self, environment: str, plan_id: str, exc: str) -> None: 174 """Notify in the case of an apply failure. 175 176 Args: 177 environment: The target environment of the run. 178 plan_id: The plan id of the failed apply 179 exc: The exception stack trace. 180 """ 181 self.send( 182 NotificationStatus.FAILURE, 183 f"Plan {plan_id} in environment `{environment}` apply failed.", 184 exc=exc, 185 ) 186 187 @notify(NotificationEvent.RUN_FAILURE) 188 def notify_run_failure(self, exc: str) -> None: 189 """Notify in the case of a run failure. 190 191 Args: 192 exc: The exception stack trace. 193 """ 194 self.send(NotificationStatus.FAILURE, "SQLMesh run failed.", exc=exc) 195 196 @notify(NotificationEvent.AUDIT_FAILURE) 197 def notify_audit_failure(self, audit_error: AuditError) -> None: 198 """Notify in the case of an audit failure. 199 200 Args: 201 audit_error: The AuditError object. 202 """ 203 self.send(NotificationStatus.FAILURE, "Audit failure.", audit_error=audit_error) 204 205 @notify(NotificationEvent.MIGRATION_FAILURE) 206 def notify_migration_failure(self, exc: str) -> None: 207 """Notify in the case of a migration failure. 208 209 Args: 210 exc: The exception stack trace. 211 """ 212 self.send(NotificationStatus.FAILURE, "SQLMesh migration failed.", exc=exc) 213 214 @property 215 def is_configured(self) -> bool: 216 return True 217 218 219class BaseTextBasedNotificationTarget(BaseNotificationTarget): 220 """ 221 A base class for unstructured notification targets (e.g.: console, email, etc.) 222 """ 223 224 def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None: 225 """Send the notification message as text.""" 226 227 def send( 228 self, 229 notification_status: NotificationStatus, 230 msg: str, 231 audit_error: t.Optional[AuditError] = None, 232 exc: t.Optional[str] = None, 233 **kwargs: t.Any, 234 ) -> None: 235 error = None 236 if audit_error: 237 error = str(audit_error) 238 elif exc: 239 error = exc 240 241 self.send_text_message(notification_status, msg if error is None else f"{msg}\n{error}") 242 243 244class ConsoleNotificationTarget(BaseTextBasedNotificationTarget): 245 """ 246 Example console notification target. Keeping this around for testing purposes. 247 """ 248 249 type_: Literal["console"] = Field(alias="type", default="console") 250 _console: t.Optional[Console] = None 251 252 @property 253 def console(self) -> Console: 254 if not self._console: 255 self._console = get_console() 256 return self._console 257 258 def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None: 259 if notification_status.is_success: 260 self.console.log_success(msg) 261 elif notification_status.is_failure: 262 self.console.log_error(msg) 263 else: 264 self.console.log_status_update(msg) 265 266 267class BaseSlackNotificationTarget(BaseNotificationTarget): 268 def send( 269 self, 270 notification_status: NotificationStatus, 271 msg: str, 272 audit_error: t.Optional[AuditError] = None, 273 exc: t.Optional[str] = None, 274 **kwargs: t.Any, 275 ) -> None: 276 277 status_emoji = { 278 NotificationStatus.PROGRESS: slack.SlackAlertIcon.START, 279 NotificationStatus.SUCCESS: slack.SlackAlertIcon.SUCCESS, 280 NotificationStatus.FAILURE: slack.SlackAlertIcon.FAILURE, 281 NotificationStatus.WARNING: slack.SlackAlertIcon.WARNING, 282 NotificationStatus.INFO: slack.SlackAlertIcon.INFO, 283 } 284 285 composed = slack.message().add_primary_blocks( 286 slack.header_block(f"{status_emoji[notification_status]} SQLMesh Notification"), 287 slack.context_block(f"*Status:* `{notification_status.value}`"), 288 slack.divider_block(), 289 slack.text_section_block(f"*Message*: {msg}"), 290 ) 291 292 details = [] 293 if audit_error: 294 details = [ 295 slack.fields_section_block( 296 f"*Audit*: `{audit_error.audit_name}`", 297 f"*Model*: `{audit_error.model_name}`", 298 f"*Count*: `{audit_error.count}`", 299 ), 300 slack.preformatted_rich_text_block(audit_error.sql(pretty=True)), 301 ] 302 elif exc: 303 details = [slack.preformatted_rich_text_block(exc)] 304 305 composed.add_primary_blocks( 306 *details, 307 slack.divider_block(), 308 slack.context_block( 309 f"*SQLMesh Version:* {_sqlmesh_version()}", f"*Python Version:* {sys.version}" 310 ), 311 ) 312 313 self._send_slack_message( 314 composed=composed.slack_message, 315 ) 316 317 def _send_slack_message(self, composed: slack.TSlackMessage) -> None: 318 """Send a composed message Slack. 319 320 Args: 321 composed: the formatted message to send to Slack 322 """ 323 324 325class SlackWebhookNotificationTarget(BaseSlackNotificationTarget): 326 url: t.Optional[str] = None 327 type_: Literal["slack_webhook"] = Field(alias="type", default="slack_webhook") 328 _client: t.Optional[WebhookClient] = None 329 330 @property 331 def client(self) -> WebhookClient: 332 if not self._client: 333 try: 334 from slack_sdk import WebhookClient 335 except ModuleNotFoundError as e: 336 raise MissingDependencyError( 337 "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them." 338 ) from e 339 340 if not self.url: 341 raise ConfigError("Missing Slack webhook URL") 342 343 self._client = WebhookClient(url=self.url) 344 return self._client 345 346 def _send_slack_message(self, composed: slack.TSlackMessage) -> None: 347 self.client.send( 348 blocks=composed["blocks"], 349 attachments=composed["attachments"], # type: ignore 350 ) 351 352 @property 353 def is_configured(self) -> bool: 354 return bool(self.url) 355 356 357class SlackApiNotificationTarget(BaseSlackNotificationTarget): 358 token: t.Optional[str] = None 359 channel: t.Optional[str] = None 360 type_: Literal["slack_api"] = Field(alias="type", default="slack_api") 361 _client: t.Optional[WebClient] = None 362 363 @property 364 def client(self) -> WebClient: 365 if not self._client: 366 try: 367 from slack_sdk import WebClient 368 except ModuleNotFoundError as e: 369 raise MissingDependencyError( 370 "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them." 371 ) from e 372 373 self._client = WebClient(token=self.token) 374 return self._client 375 376 def _send_slack_message(self, composed: slack.TSlackMessage) -> None: 377 if not self.channel: 378 raise ConfigError("Missing Slack channel for notification") 379 380 self.client.chat_postMessage( 381 channel=self.channel, 382 blocks=composed["blocks"], 383 attachments=composed["attachments"], # type: ignore 384 ) 385 386 @property 387 def is_configured(self) -> bool: 388 return all((self.token, self.channel)) 389 390 391class BasicSMTPNotificationTarget(BaseTextBasedNotificationTarget): 392 host: t.Optional[str] = None 393 port: int = 465 394 user: t.Optional[str] = None 395 password: t.Optional[SecretStr] = None 396 sender: t.Optional[str] = None 397 recipients: t.Optional[t.FrozenSet[str]] = None 398 subject: t.Optional[str] = "SQLMesh Notification" 399 type_: Literal["smtp"] = Field(alias="type", default="smtp") 400 401 def send_text_message( 402 self, 403 notification_status: NotificationStatus, 404 msg: str, 405 ) -> None: 406 if not self.host: 407 raise ConfigError("Missing SMTP host for notification") 408 409 email = EmailMessage() 410 email["Subject"] = self.subject 411 email["To"] = ",".join(self.recipients or []) 412 email["From"] = self.sender 413 email.set_content(msg) 414 with smtplib.SMTP_SSL(host=self.host, port=self.port) as smtp: 415 if self.user and self.password: 416 smtp.login(user=self.user, password=self.password.get_secret_value()) 417 smtp.send_message(email) 418 419 @property 420 def is_configured(self) -> bool: 421 return all((self.host, self.user, self.password, self.sender)) 422 423 424NotificationTarget = Annotated[ 425 t.Union[ 426 BasicSMTPNotificationTarget, 427 ConsoleNotificationTarget, 428 SlackApiNotificationTarget, 429 SlackWebhookNotificationTarget, 430 ], 431 Field(discriminator="type_"), 432] 433 434 435class NotificationTargetManager: 436 """Wrapper around a list of notification targets. 437 438 Calling a notification target's "notify_" method on this object will call it 439 on all registered notification targets. 440 """ 441 442 def __init__( 443 self, 444 notification_targets: t.Dict[NotificationEvent, t.Set[NotificationTarget]] | None = None, 445 user_notification_targets: t.Dict[str, t.Set[NotificationTarget]] | None = None, 446 username: str | None = None, 447 ) -> None: 448 self.notification_targets = notification_targets or {} 449 self.user_notification_targets = user_notification_targets or {} 450 self.username = username 451 452 def notify(self, event: NotificationEvent, *args: t.Any, **kwargs: t.Any) -> None: 453 """Call the 'notify_`event`' function of all notification targets that care about the event.""" 454 if self.username: 455 self.notify_user(event, self.username, *args, **kwargs) 456 else: 457 for notification_target in self.notification_targets.get(event, set()): 458 notify_func = self._get_notification_function(notification_target, event) 459 notify_func(*args, **kwargs) 460 461 def notify_user( 462 self, event: NotificationEvent, username: str, *args: t.Any, **kwargs: t.Any 463 ) -> None: 464 """Call the 'notify_`event`' function of the user's notification targets that care about the event.""" 465 notification_targets = self.user_notification_targets.get(username, set()) 466 for notification_target in notification_targets: 467 if event in notification_target.notify_on: 468 notify_func = self._get_notification_function(notification_target, event) 469 notify_func(*args, **kwargs) 470 471 def _get_notification_function( 472 self, notification_target: NotificationTarget, event: NotificationEvent 473 ) -> t.Callable: 474 """Lookup the registered function for a notification event""" 475 func_name = NOTIFICATION_FUNCTIONS[event] 476 return getattr(notification_target, func_name)
44class NotificationStatus(str, Enum): 45 SUCCESS = "success" 46 FAILURE = "failure" 47 WARNING = "warning" 48 INFO = "info" 49 PROGRESS = "progress" 50 51 @property 52 def is_success(self) -> bool: 53 return self == NotificationStatus.SUCCESS 54 55 @property 56 def is_failure(self) -> bool: 57 return self == NotificationStatus.FAILURE 58 59 @property 60 def is_info(self) -> bool: 61 return self == NotificationStatus.INFO 62 63 @property 64 def is_warning(self) -> bool: 65 return self == NotificationStatus.WARNING 66 67 @property 68 def is_progress(self) -> bool: 69 return self == NotificationStatus.PROGRESS
An enumeration.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
72class NotificationEvent(str, Enum): 73 APPLY_START = "apply_start" 74 APPLY_END = "apply_end" 75 RUN_START = "run_start" 76 RUN_END = "run_end" 77 MIGRATION_START = "migration_start" 78 MIGRATION_END = "migration_end" 79 APPLY_FAILURE = "apply_failure" 80 RUN_FAILURE = "run_failure" 81 AUDIT_FAILURE = "audit_failure" 82 MIGRATION_FAILURE = "migration_failure"
An enumeration.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
85def notify(event: NotificationEvent) -> t.Callable: 86 """Decorator used to register 'notify' methods and the events they correspond to.""" 87 88 def decorator(f: t.Callable) -> t.Callable: 89 @functools.wraps(f) 90 def wrapper(*args: t.List[t.Any], **kwargs: t.Dict[str, t.Any]) -> None: 91 return f(*args, **kwargs) 92 93 NOTIFICATION_FUNCTIONS[event] = f.__name__ 94 return wrapper 95 96 return decorator
Decorator used to register 'notify' methods and the events they correspond to.
99class BaseNotificationTarget(PydanticModel, frozen=True): 100 """ 101 Base notification target model. Provides a command for sending notifications that is currently only used 102 by the built-in scheduler. Other schedulers like Airflow use the configuration of the target itself 103 to create the notification constructs appropriate for the scheduler. 104 """ 105 106 type_: str 107 notify_on: t.FrozenSet[NotificationEvent] = frozenset() 108 109 def send(self, notification_status: NotificationStatus, msg: str, **kwargs: t.Any) -> None: 110 """Sends notification with the provided message. 111 112 Args: 113 notification_status: The status of the notification. One of: success, failure, warning, info, or progress. 114 msg: The message to send. 115 """ 116 117 @notify(NotificationEvent.APPLY_START) 118 def notify_apply_start(self, environment: str, plan_id: str) -> None: 119 """Notify when an apply starts. 120 121 Args: 122 environment: The target environment of the plan. 123 plan_id: plan_id that is being applied. 124 """ 125 self.send( 126 NotificationStatus.INFO, 127 f"Plan {plan_id} apply started for environment `{environment}`.", 128 ) 129 130 @notify(NotificationEvent.APPLY_END) 131 def notify_apply_end(self, environment: str, plan_id: str) -> None: 132 """Notify when an apply ends. 133 134 Args: 135 environment: The target environment of the plan. 136 plan_id: plan_id that was applied. 137 """ 138 self.send( 139 NotificationStatus.SUCCESS, 140 f"Plan {plan_id} apply finished for environment `{environment}`.", 141 ) 142 143 @notify(NotificationEvent.RUN_START) 144 def notify_run_start(self, environment: str) -> None: 145 """Notify when a SQLMesh run starts. 146 147 Args: 148 environment: The target environment of the run. 149 """ 150 self.send(NotificationStatus.INFO, f"SQLMesh run started for environment `{environment}`.") 151 152 @notify(NotificationEvent.RUN_END) 153 def notify_run_end(self, environment: str) -> None: 154 """Notify when a SQLMesh run ends. 155 156 Args: 157 environment: The target environment of the run. 158 """ 159 self.send( 160 NotificationStatus.SUCCESS, f"SQLMesh run finished for environment `{environment}`." 161 ) 162 163 @notify(NotificationEvent.MIGRATION_START) 164 def notify_migration_start(self) -> None: 165 """Notify when a SQLMesh migration starts.""" 166 self.send(NotificationStatus.INFO, "SQLMesh migration started.") 167 168 @notify(NotificationEvent.MIGRATION_END) 169 def notify_migration_end(self) -> None: 170 """Notify when a SQLMesh migration ends.""" 171 self.send(NotificationStatus.SUCCESS, "SQLMesh migration finished.") 172 173 @notify(NotificationEvent.APPLY_FAILURE) 174 def notify_apply_failure(self, environment: str, plan_id: str, exc: str) -> None: 175 """Notify in the case of an apply failure. 176 177 Args: 178 environment: The target environment of the run. 179 plan_id: The plan id of the failed apply 180 exc: The exception stack trace. 181 """ 182 self.send( 183 NotificationStatus.FAILURE, 184 f"Plan {plan_id} in environment `{environment}` apply failed.", 185 exc=exc, 186 ) 187 188 @notify(NotificationEvent.RUN_FAILURE) 189 def notify_run_failure(self, exc: str) -> None: 190 """Notify in the case of a run failure. 191 192 Args: 193 exc: The exception stack trace. 194 """ 195 self.send(NotificationStatus.FAILURE, "SQLMesh run failed.", exc=exc) 196 197 @notify(NotificationEvent.AUDIT_FAILURE) 198 def notify_audit_failure(self, audit_error: AuditError) -> None: 199 """Notify in the case of an audit failure. 200 201 Args: 202 audit_error: The AuditError object. 203 """ 204 self.send(NotificationStatus.FAILURE, "Audit failure.", audit_error=audit_error) 205 206 @notify(NotificationEvent.MIGRATION_FAILURE) 207 def notify_migration_failure(self, exc: str) -> None: 208 """Notify in the case of a migration failure. 209 210 Args: 211 exc: The exception stack trace. 212 """ 213 self.send(NotificationStatus.FAILURE, "SQLMesh migration failed.", exc=exc) 214 215 @property 216 def is_configured(self) -> bool: 217 return True
Base notification target model. Provides a command for sending notifications that is currently only used by the built-in scheduler. Other schedulers like Airflow use the configuration of the target itself to create the notification constructs appropriate for the scheduler.
109 def send(self, notification_status: NotificationStatus, msg: str, **kwargs: t.Any) -> None: 110 """Sends notification with the provided message. 111 112 Args: 113 notification_status: The status of the notification. One of: success, failure, warning, info, or progress. 114 msg: The message to send. 115 """
Sends notification with the provided message.
Arguments:
- notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
- msg: The message to send.
117 @notify(NotificationEvent.APPLY_START) 118 def notify_apply_start(self, environment: str, plan_id: str) -> None: 119 """Notify when an apply starts. 120 121 Args: 122 environment: The target environment of the plan. 123 plan_id: plan_id that is being applied. 124 """ 125 self.send( 126 NotificationStatus.INFO, 127 f"Plan {plan_id} apply started for environment `{environment}`.", 128 )
Notify when an apply starts.
Arguments:
- environment: The target environment of the plan.
- plan_id: plan_id that is being applied.
130 @notify(NotificationEvent.APPLY_END) 131 def notify_apply_end(self, environment: str, plan_id: str) -> None: 132 """Notify when an apply ends. 133 134 Args: 135 environment: The target environment of the plan. 136 plan_id: plan_id that was applied. 137 """ 138 self.send( 139 NotificationStatus.SUCCESS, 140 f"Plan {plan_id} apply finished for environment `{environment}`.", 141 )
Notify when an apply ends.
Arguments:
- environment: The target environment of the plan.
- plan_id: plan_id that was applied.
143 @notify(NotificationEvent.RUN_START) 144 def notify_run_start(self, environment: str) -> None: 145 """Notify when a SQLMesh run starts. 146 147 Args: 148 environment: The target environment of the run. 149 """ 150 self.send(NotificationStatus.INFO, f"SQLMesh run started for environment `{environment}`.")
Notify when a SQLMesh run starts.
Arguments:
- environment: The target environment of the run.
152 @notify(NotificationEvent.RUN_END) 153 def notify_run_end(self, environment: str) -> None: 154 """Notify when a SQLMesh run ends. 155 156 Args: 157 environment: The target environment of the run. 158 """ 159 self.send( 160 NotificationStatus.SUCCESS, f"SQLMesh run finished for environment `{environment}`." 161 )
Notify when a SQLMesh run ends.
Arguments:
- environment: The target environment of the run.
163 @notify(NotificationEvent.MIGRATION_START) 164 def notify_migration_start(self) -> None: 165 """Notify when a SQLMesh migration starts.""" 166 self.send(NotificationStatus.INFO, "SQLMesh migration started.")
Notify when a SQLMesh migration starts.
168 @notify(NotificationEvent.MIGRATION_END) 169 def notify_migration_end(self) -> None: 170 """Notify when a SQLMesh migration ends.""" 171 self.send(NotificationStatus.SUCCESS, "SQLMesh migration finished.")
Notify when a SQLMesh migration ends.
173 @notify(NotificationEvent.APPLY_FAILURE) 174 def notify_apply_failure(self, environment: str, plan_id: str, exc: str) -> None: 175 """Notify in the case of an apply failure. 176 177 Args: 178 environment: The target environment of the run. 179 plan_id: The plan id of the failed apply 180 exc: The exception stack trace. 181 """ 182 self.send( 183 NotificationStatus.FAILURE, 184 f"Plan {plan_id} in environment `{environment}` apply failed.", 185 exc=exc, 186 )
Notify in the case of an apply failure.
Arguments:
- environment: The target environment of the run.
- plan_id: The plan id of the failed apply
- exc: The exception stack trace.
188 @notify(NotificationEvent.RUN_FAILURE) 189 def notify_run_failure(self, exc: str) -> None: 190 """Notify in the case of a run failure. 191 192 Args: 193 exc: The exception stack trace. 194 """ 195 self.send(NotificationStatus.FAILURE, "SQLMesh run failed.", exc=exc)
Notify in the case of a run failure.
Arguments:
- exc: The exception stack trace.
197 @notify(NotificationEvent.AUDIT_FAILURE) 198 def notify_audit_failure(self, audit_error: AuditError) -> None: 199 """Notify in the case of an audit failure. 200 201 Args: 202 audit_error: The AuditError object. 203 """ 204 self.send(NotificationStatus.FAILURE, "Audit failure.", audit_error=audit_error)
Notify in the case of an audit failure.
Arguments:
- audit_error: The AuditError object.
206 @notify(NotificationEvent.MIGRATION_FAILURE) 207 def notify_migration_failure(self, exc: str) -> None: 208 """Notify in the case of a migration failure. 209 210 Args: 211 exc: The exception stack trace. 212 """ 213 self.send(NotificationStatus.FAILURE, "SQLMesh migration failed.", exc=exc)
Notify in the case of a migration failure.
Arguments:
- exc: The exception stack trace.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
220class BaseTextBasedNotificationTarget(BaseNotificationTarget): 221 """ 222 A base class for unstructured notification targets (e.g.: console, email, etc.) 223 """ 224 225 def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None: 226 """Send the notification message as text.""" 227 228 def send( 229 self, 230 notification_status: NotificationStatus, 231 msg: str, 232 audit_error: t.Optional[AuditError] = None, 233 exc: t.Optional[str] = None, 234 **kwargs: t.Any, 235 ) -> None: 236 error = None 237 if audit_error: 238 error = str(audit_error) 239 elif exc: 240 error = exc 241 242 self.send_text_message(notification_status, msg if error is None else f"{msg}\n{error}")
A base class for unstructured notification targets (e.g.: console, email, etc.)
225 def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None: 226 """Send the notification message as text."""
Send the notification message as text.
228 def send( 229 self, 230 notification_status: NotificationStatus, 231 msg: str, 232 audit_error: t.Optional[AuditError] = None, 233 exc: t.Optional[str] = None, 234 **kwargs: t.Any, 235 ) -> None: 236 error = None 237 if audit_error: 238 error = str(audit_error) 239 elif exc: 240 error = exc 241 242 self.send_text_message(notification_status, msg if error is None else f"{msg}\n{error}")
Sends notification with the provided message.
Arguments:
- notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
- msg: The message to send.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
245class ConsoleNotificationTarget(BaseTextBasedNotificationTarget): 246 """ 247 Example console notification target. Keeping this around for testing purposes. 248 """ 249 250 type_: Literal["console"] = Field(alias="type", default="console") 251 _console: t.Optional[Console] = None 252 253 @property 254 def console(self) -> Console: 255 if not self._console: 256 self._console = get_console() 257 return self._console 258 259 def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None: 260 if notification_status.is_success: 261 self.console.log_success(msg) 262 elif notification_status.is_failure: 263 self.console.log_error(msg) 264 else: 265 self.console.log_status_update(msg)
Example console notification target. Keeping this around for testing purposes.
259 def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None: 260 if notification_status.is_success: 261 self.console.log_success(msg) 262 elif notification_status.is_failure: 263 self.console.log_error(msg) 264 else: 265 self.console.log_status_update(msg)
Send the notification message as text.
102 def wrapped_model_post_init(self: BaseModel, __context: Any) -> None: 103 """We need to both initialize private attributes and call the user-defined model_post_init 104 method. 105 """ 106 init_private_attributes(self, __context) 107 original_model_post_init(self, __context)
Override this method to perform additional initialization after __init__
and model_construct
.
This is useful if you want to do some validation that requires the entire model to be initialized.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
268class BaseSlackNotificationTarget(BaseNotificationTarget): 269 def send( 270 self, 271 notification_status: NotificationStatus, 272 msg: str, 273 audit_error: t.Optional[AuditError] = None, 274 exc: t.Optional[str] = None, 275 **kwargs: t.Any, 276 ) -> None: 277 278 status_emoji = { 279 NotificationStatus.PROGRESS: slack.SlackAlertIcon.START, 280 NotificationStatus.SUCCESS: slack.SlackAlertIcon.SUCCESS, 281 NotificationStatus.FAILURE: slack.SlackAlertIcon.FAILURE, 282 NotificationStatus.WARNING: slack.SlackAlertIcon.WARNING, 283 NotificationStatus.INFO: slack.SlackAlertIcon.INFO, 284 } 285 286 composed = slack.message().add_primary_blocks( 287 slack.header_block(f"{status_emoji[notification_status]} SQLMesh Notification"), 288 slack.context_block(f"*Status:* `{notification_status.value}`"), 289 slack.divider_block(), 290 slack.text_section_block(f"*Message*: {msg}"), 291 ) 292 293 details = [] 294 if audit_error: 295 details = [ 296 slack.fields_section_block( 297 f"*Audit*: `{audit_error.audit_name}`", 298 f"*Model*: `{audit_error.model_name}`", 299 f"*Count*: `{audit_error.count}`", 300 ), 301 slack.preformatted_rich_text_block(audit_error.sql(pretty=True)), 302 ] 303 elif exc: 304 details = [slack.preformatted_rich_text_block(exc)] 305 306 composed.add_primary_blocks( 307 *details, 308 slack.divider_block(), 309 slack.context_block( 310 f"*SQLMesh Version:* {_sqlmesh_version()}", f"*Python Version:* {sys.version}" 311 ), 312 ) 313 314 self._send_slack_message( 315 composed=composed.slack_message, 316 ) 317 318 def _send_slack_message(self, composed: slack.TSlackMessage) -> None: 319 """Send a composed message Slack. 320 321 Args: 322 composed: the formatted message to send to Slack 323 """
Base notification target model. Provides a command for sending notifications that is currently only used by the built-in scheduler. Other schedulers like Airflow use the configuration of the target itself to create the notification constructs appropriate for the scheduler.
269 def send( 270 self, 271 notification_status: NotificationStatus, 272 msg: str, 273 audit_error: t.Optional[AuditError] = None, 274 exc: t.Optional[str] = None, 275 **kwargs: t.Any, 276 ) -> None: 277 278 status_emoji = { 279 NotificationStatus.PROGRESS: slack.SlackAlertIcon.START, 280 NotificationStatus.SUCCESS: slack.SlackAlertIcon.SUCCESS, 281 NotificationStatus.FAILURE: slack.SlackAlertIcon.FAILURE, 282 NotificationStatus.WARNING: slack.SlackAlertIcon.WARNING, 283 NotificationStatus.INFO: slack.SlackAlertIcon.INFO, 284 } 285 286 composed = slack.message().add_primary_blocks( 287 slack.header_block(f"{status_emoji[notification_status]} SQLMesh Notification"), 288 slack.context_block(f"*Status:* `{notification_status.value}`"), 289 slack.divider_block(), 290 slack.text_section_block(f"*Message*: {msg}"), 291 ) 292 293 details = [] 294 if audit_error: 295 details = [ 296 slack.fields_section_block( 297 f"*Audit*: `{audit_error.audit_name}`", 298 f"*Model*: `{audit_error.model_name}`", 299 f"*Count*: `{audit_error.count}`", 300 ), 301 slack.preformatted_rich_text_block(audit_error.sql(pretty=True)), 302 ] 303 elif exc: 304 details = [slack.preformatted_rich_text_block(exc)] 305 306 composed.add_primary_blocks( 307 *details, 308 slack.divider_block(), 309 slack.context_block( 310 f"*SQLMesh Version:* {_sqlmesh_version()}", f"*Python Version:* {sys.version}" 311 ), 312 ) 313 314 self._send_slack_message( 315 composed=composed.slack_message, 316 )
Sends notification with the provided message.
Arguments:
- notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
- msg: The message to send.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
326class SlackWebhookNotificationTarget(BaseSlackNotificationTarget): 327 url: t.Optional[str] = None 328 type_: Literal["slack_webhook"] = Field(alias="type", default="slack_webhook") 329 _client: t.Optional[WebhookClient] = None 330 331 @property 332 def client(self) -> WebhookClient: 333 if not self._client: 334 try: 335 from slack_sdk import WebhookClient 336 except ModuleNotFoundError as e: 337 raise MissingDependencyError( 338 "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them." 339 ) from e 340 341 if not self.url: 342 raise ConfigError("Missing Slack webhook URL") 343 344 self._client = WebhookClient(url=self.url) 345 return self._client 346 347 def _send_slack_message(self, composed: slack.TSlackMessage) -> None: 348 self.client.send( 349 blocks=composed["blocks"], 350 attachments=composed["attachments"], # type: ignore 351 ) 352 353 @property 354 def is_configured(self) -> bool: 355 return bool(self.url)
Base notification target model. Provides a command for sending notifications that is currently only used by the built-in scheduler. Other schedulers like Airflow use the configuration of the target itself to create the notification constructs appropriate for the scheduler.
102 def wrapped_model_post_init(self: BaseModel, __context: Any) -> None: 103 """We need to both initialize private attributes and call the user-defined model_post_init 104 method. 105 """ 106 init_private_attributes(self, __context) 107 original_model_post_init(self, __context)
Override this method to perform additional initialization after __init__
and model_construct
.
This is useful if you want to do some validation that requires the entire model to be initialized.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
358class SlackApiNotificationTarget(BaseSlackNotificationTarget): 359 token: t.Optional[str] = None 360 channel: t.Optional[str] = None 361 type_: Literal["slack_api"] = Field(alias="type", default="slack_api") 362 _client: t.Optional[WebClient] = None 363 364 @property 365 def client(self) -> WebClient: 366 if not self._client: 367 try: 368 from slack_sdk import WebClient 369 except ModuleNotFoundError as e: 370 raise MissingDependencyError( 371 "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them." 372 ) from e 373 374 self._client = WebClient(token=self.token) 375 return self._client 376 377 def _send_slack_message(self, composed: slack.TSlackMessage) -> None: 378 if not self.channel: 379 raise ConfigError("Missing Slack channel for notification") 380 381 self.client.chat_postMessage( 382 channel=self.channel, 383 blocks=composed["blocks"], 384 attachments=composed["attachments"], # type: ignore 385 ) 386 387 @property 388 def is_configured(self) -> bool: 389 return all((self.token, self.channel))
Base notification target model. Provides a command for sending notifications that is currently only used by the built-in scheduler. Other schedulers like Airflow use the configuration of the target itself to create the notification constructs appropriate for the scheduler.
102 def wrapped_model_post_init(self: BaseModel, __context: Any) -> None: 103 """We need to both initialize private attributes and call the user-defined model_post_init 104 method. 105 """ 106 init_private_attributes(self, __context) 107 original_model_post_init(self, __context)
Override this method to perform additional initialization after __init__
and model_construct
.
This is useful if you want to do some validation that requires the entire model to be initialized.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
392class BasicSMTPNotificationTarget(BaseTextBasedNotificationTarget): 393 host: t.Optional[str] = None 394 port: int = 465 395 user: t.Optional[str] = None 396 password: t.Optional[SecretStr] = None 397 sender: t.Optional[str] = None 398 recipients: t.Optional[t.FrozenSet[str]] = None 399 subject: t.Optional[str] = "SQLMesh Notification" 400 type_: Literal["smtp"] = Field(alias="type", default="smtp") 401 402 def send_text_message( 403 self, 404 notification_status: NotificationStatus, 405 msg: str, 406 ) -> None: 407 if not self.host: 408 raise ConfigError("Missing SMTP host for notification") 409 410 email = EmailMessage() 411 email["Subject"] = self.subject 412 email["To"] = ",".join(self.recipients or []) 413 email["From"] = self.sender 414 email.set_content(msg) 415 with smtplib.SMTP_SSL(host=self.host, port=self.port) as smtp: 416 if self.user and self.password: 417 smtp.login(user=self.user, password=self.password.get_secret_value()) 418 smtp.send_message(email) 419 420 @property 421 def is_configured(self) -> bool: 422 return all((self.host, self.user, self.password, self.sender))
A base class for unstructured notification targets (e.g.: console, email, etc.)
402 def send_text_message( 403 self, 404 notification_status: NotificationStatus, 405 msg: str, 406 ) -> None: 407 if not self.host: 408 raise ConfigError("Missing SMTP host for notification") 409 410 email = EmailMessage() 411 email["Subject"] = self.subject 412 email["To"] = ",".join(self.recipients or []) 413 email["From"] = self.sender 414 email.set_content(msg) 415 with smtplib.SMTP_SSL(host=self.host, port=self.port) as smtp: 416 if self.user and self.password: 417 smtp.login(user=self.user, password=self.password.get_secret_value()) 418 smtp.send_message(email)
Send the notification message as text.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
436class NotificationTargetManager: 437 """Wrapper around a list of notification targets. 438 439 Calling a notification target's "notify_" method on this object will call it 440 on all registered notification targets. 441 """ 442 443 def __init__( 444 self, 445 notification_targets: t.Dict[NotificationEvent, t.Set[NotificationTarget]] | None = None, 446 user_notification_targets: t.Dict[str, t.Set[NotificationTarget]] | None = None, 447 username: str | None = None, 448 ) -> None: 449 self.notification_targets = notification_targets or {} 450 self.user_notification_targets = user_notification_targets or {} 451 self.username = username 452 453 def notify(self, event: NotificationEvent, *args: t.Any, **kwargs: t.Any) -> None: 454 """Call the 'notify_`event`' function of all notification targets that care about the event.""" 455 if self.username: 456 self.notify_user(event, self.username, *args, **kwargs) 457 else: 458 for notification_target in self.notification_targets.get(event, set()): 459 notify_func = self._get_notification_function(notification_target, event) 460 notify_func(*args, **kwargs) 461 462 def notify_user( 463 self, event: NotificationEvent, username: str, *args: t.Any, **kwargs: t.Any 464 ) -> None: 465 """Call the 'notify_`event`' function of the user's notification targets that care about the event.""" 466 notification_targets = self.user_notification_targets.get(username, set()) 467 for notification_target in notification_targets: 468 if event in notification_target.notify_on: 469 notify_func = self._get_notification_function(notification_target, event) 470 notify_func(*args, **kwargs) 471 472 def _get_notification_function( 473 self, notification_target: NotificationTarget, event: NotificationEvent 474 ) -> t.Callable: 475 """Lookup the registered function for a notification event""" 476 func_name = NOTIFICATION_FUNCTIONS[event] 477 return getattr(notification_target, func_name)
Wrapper around a list of notification targets.
Calling a notification target's "notify_" method on this object will call it on all registered notification targets.
443 def __init__( 444 self, 445 notification_targets: t.Dict[NotificationEvent, t.Set[NotificationTarget]] | None = None, 446 user_notification_targets: t.Dict[str, t.Set[NotificationTarget]] | None = None, 447 username: str | None = None, 448 ) -> None: 449 self.notification_targets = notification_targets or {} 450 self.user_notification_targets = user_notification_targets or {} 451 self.username = username
453 def notify(self, event: NotificationEvent, *args: t.Any, **kwargs: t.Any) -> None: 454 """Call the 'notify_`event`' function of all notification targets that care about the event.""" 455 if self.username: 456 self.notify_user(event, self.username, *args, **kwargs) 457 else: 458 for notification_target in self.notification_targets.get(event, set()): 459 notify_func = self._get_notification_function(notification_target, event) 460 notify_func(*args, **kwargs)
Call the 'notify_event
' function of all notification targets that care about the event.
462 def notify_user( 463 self, event: NotificationEvent, username: str, *args: t.Any, **kwargs: t.Any 464 ) -> None: 465 """Call the 'notify_`event`' function of the user's notification targets that care about the event.""" 466 notification_targets = self.user_notification_targets.get(username, set()) 467 for notification_target in notification_targets: 468 if event in notification_target.notify_on: 469 notify_func = self._get_notification_function(notification_target, event) 470 notify_func(*args, **kwargs)
Call the 'notify_event
' function of the user's notification targets that care about the event.