sqlmesh.core.notification_target
1from __future__ import annotations 2 3import smtplib 4import sys 5import typing as t 6from email.message import EmailMessage 7from enum import Enum 8 9from pydantic import Field, SecretStr 10 11from sqlmesh.core.console import Console, get_console 12from sqlmesh.integrations import slack 13from sqlmesh.utils.errors import AuditError, ConfigError, MissingDependencyError 14from sqlmesh.utils.pydantic import PydanticModel 15 16if t.TYPE_CHECKING: 17 from slack_sdk import WebClient, WebhookClient 18 19 20def _sqlmesh_version() -> str: 21 try: 22 from sqlmesh import __version__ 23 24 return __version__ 25 except ImportError: 26 return "0.0.0" 27 28 29class NotificationStatus(str, Enum): 30 SUCCESS = "success" 31 FAILURE = "failure" 32 WARNING = "warning" 33 INFO = "info" 34 PROGRESS = "progress" 35 36 @property 37 def is_success(self) -> bool: 38 return self == NotificationStatus.SUCCESS 39 40 @property 41 def is_failure(self) -> bool: 42 return self == NotificationStatus.FAILURE 43 44 @property 45 def is_info(self) -> bool: 46 return self == NotificationStatus.INFO 47 48 @property 49 def is_warning(self) -> bool: 50 return self == NotificationStatus.WARNING 51 52 @property 53 def is_progress(self) -> bool: 54 return self == NotificationStatus.PROGRESS 55 56 57class NotificationEvent(str, Enum): 58 APPLY_START = "apply_start" 59 APPLY_END = "apply_end" 60 RUN_START = "run_start" 61 RUN_END = "run_end" 62 MIGRATION_START = "migration_start" 63 MIGRATION_END = "migration_end" 64 APPLY_FAILURE = "apply_failure" 65 RUN_FAILURE = "run_failure" 66 AUDIT_FAILURE = "audit_failure" 67 MIGRATION_FAILURE = "migration_failure" 68 69 70class BaseNotificationTarget(PydanticModel, frozen=True): 71 """ 72 Base notification target model. Provides a command for sending notifications that is currently only used 73 by the built-in scheduler. 74 75 Notification functions follow a naming convention of `notify_` + NotificationEvent value. 
76 """ 77 78 type_: str 79 notify_on: t.FrozenSet[NotificationEvent] = frozenset() 80 81 def send(self, notification_status: NotificationStatus, msg: str, **kwargs: t.Any) -> None: 82 """Sends notification with the provided message. 83 84 Args: 85 notification_status: The status of the notification. One of: success, failure, warning, info, or progress. 86 msg: The message to send. 87 """ 88 89 def notify_apply_start( 90 self, environment: str, plan_id: str, *args: t.Any, **kwargs: t.Any 91 ) -> None: 92 """Notify when an apply starts. 93 94 Args: 95 environment: The target environment of the plan. 96 plan_id: plan_id that is being applied. 97 """ 98 self.send( 99 NotificationStatus.INFO, 100 f"Plan `{plan_id}` apply started for environment `{environment}`.", 101 ) 102 103 def notify_apply_end( 104 self, environment: str, plan_id: str, *args: t.Any, **kwargs: t.Any 105 ) -> None: 106 """Notify when an apply ends. 107 108 Args: 109 environment: The target environment of the plan. 110 plan_id: plan_id that was applied. 111 """ 112 self.send( 113 NotificationStatus.SUCCESS, 114 f"Plan `{plan_id}` apply finished for environment `{environment}`.", 115 ) 116 117 def notify_run_start(self, environment: str, *args: t.Any, **kwargs: t.Any) -> None: 118 """Notify when a SQLMesh run starts. 119 120 Args: 121 environment: The target environment of the run. 122 """ 123 self.send(NotificationStatus.INFO, f"SQLMesh run started for environment `{environment}`.") 124 125 def notify_run_end(self, environment: str, *args: t.Any, **kwargs: t.Any) -> None: 126 """Notify when a SQLMesh run ends. 127 128 Args: 129 environment: The target environment of the run. 130 """ 131 self.send( 132 NotificationStatus.SUCCESS, f"SQLMesh run finished for environment `{environment}`." 
133 ) 134 135 def notify_migration_start(self, *args: t.Any, **kwargs: t.Any) -> None: 136 """Notify when a SQLMesh migration starts.""" 137 self.send(NotificationStatus.INFO, "SQLMesh migration started.") 138 139 def notify_migration_end(self, *args: t.Any, **kwargs: t.Any) -> None: 140 """Notify when a SQLMesh migration ends.""" 141 self.send(NotificationStatus.SUCCESS, "SQLMesh migration finished.") 142 143 def notify_apply_failure( 144 self, environment: str, plan_id: str, exc: str, *args: t.Any, **kwargs: t.Any 145 ) -> None: 146 """Notify in the case of an apply failure. 147 148 Args: 149 environment: The target environment of the run. 150 plan_id: The plan id of the failed apply 151 exc: The exception stack trace. 152 """ 153 self.send( 154 NotificationStatus.FAILURE, 155 f"Plan `{plan_id}` in environment `{environment}` apply failed.", 156 exc=exc, 157 ) 158 159 def notify_run_failure(self, exc: str, *args: t.Any, **kwargs: t.Any) -> None: 160 """Notify in the case of a run failure. 161 162 Args: 163 exc: The exception stack trace. 164 """ 165 self.send(NotificationStatus.FAILURE, "SQLMesh run failed.", exc=exc) 166 167 def notify_audit_failure(self, audit_error: AuditError, *args: t.Any, **kwargs: t.Any) -> None: 168 """Notify in the case of an audit failure. 169 170 Args: 171 audit_error: The AuditError object. 172 """ 173 self.send(NotificationStatus.FAILURE, "Audit failure.", audit_error=audit_error) 174 175 def notify_migration_failure(self, exc: str, *args: t.Any, **kwargs: t.Any) -> None: 176 """Notify in the case of a migration failure. 177 178 Args: 179 exc: The exception stack trace. 180 """ 181 self.send(NotificationStatus.FAILURE, "SQLMesh migration failed.", exc=exc) 182 183 @property 184 def is_configured(self) -> bool: 185 return True 186 187 188class BaseTextBasedNotificationTarget(BaseNotificationTarget): 189 """ 190 A base class for unstructured notification targets (e.g.: console, email, etc.) 
191 """ 192 193 def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None: 194 """Send the notification message as text.""" 195 196 def send( 197 self, 198 notification_status: NotificationStatus, 199 msg: str, 200 audit_error: t.Optional[AuditError] = None, 201 exc: t.Optional[str] = None, 202 **kwargs: t.Any, 203 ) -> None: 204 error = None 205 if audit_error: 206 error = str(audit_error) 207 elif exc: 208 error = exc 209 210 self.send_text_message(notification_status, msg if error is None else f"{msg}\n{error}") 211 212 213class ConsoleNotificationTarget(BaseTextBasedNotificationTarget): 214 """ 215 Example console notification target. Keeping this around for testing purposes. 216 """ 217 218 type_: t.Literal["console"] = Field(alias="type", default="console") 219 _console: t.Optional[Console] = None 220 221 @property 222 def console(self) -> Console: 223 if not self._console: 224 self._console = get_console() 225 return self._console 226 227 def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None: 228 if notification_status.is_success: 229 self.console.log_success(msg) 230 elif notification_status.is_failure: 231 self.console.log_error(msg) 232 else: 233 self.console.log_status_update(msg) 234 235 236class BaseSlackNotificationTarget(BaseNotificationTarget): 237 def send( 238 self, 239 notification_status: NotificationStatus, 240 msg: str, 241 audit_error: t.Optional[AuditError] = None, 242 exc: t.Optional[str] = None, 243 **kwargs: t.Any, 244 ) -> None: 245 status_emoji = { 246 NotificationStatus.PROGRESS: slack.SlackAlertIcon.START, 247 NotificationStatus.SUCCESS: slack.SlackAlertIcon.SUCCESS, 248 NotificationStatus.FAILURE: slack.SlackAlertIcon.FAILURE, 249 NotificationStatus.WARNING: slack.SlackAlertIcon.WARNING, 250 NotificationStatus.INFO: slack.SlackAlertIcon.INFO, 251 } 252 253 composed = slack.message().add_primary_blocks( 254 slack.header_block(f"{status_emoji[notification_status]} SQLMesh 
Notification"), 255 slack.context_block(f"*Status:* `{notification_status.value}`"), 256 slack.divider_block(), 257 slack.text_section_block(f"*Message*: {msg}"), 258 ) 259 260 details = [] 261 if audit_error: 262 details = [ 263 slack.fields_section_block( 264 f"*Audit*: `{audit_error.audit_name}`", 265 f"*Model*: `{audit_error.model_name}`", 266 f"*Count*: `{audit_error.count}`", 267 ), 268 slack.preformatted_rich_text_block(audit_error.sql(pretty=True)), 269 ] 270 elif exc: 271 details = [slack.preformatted_rich_text_block(exc)] 272 273 composed.add_primary_blocks( 274 *details, 275 slack.divider_block(), 276 slack.context_block( 277 f"*SQLMesh Version:* {_sqlmesh_version()}", f"*Python Version:* {sys.version}" 278 ), 279 ) 280 281 composed.add_text(msg) 282 283 self._send_slack_message( 284 composed=composed.slack_message, 285 ) 286 287 def _send_slack_message(self, composed: slack.TSlackMessage) -> None: 288 """Send a composed message Slack. 289 290 Args: 291 composed: the formatted message to send to Slack 292 """ 293 294 295class SlackWebhookNotificationTarget(BaseSlackNotificationTarget): 296 url: t.Optional[str] = None 297 type_: t.Literal["slack_webhook"] = Field(alias="type", default="slack_webhook") 298 _client: t.Optional[WebhookClient] = None 299 300 @property 301 def client(self) -> WebhookClient: 302 if not self._client: 303 try: 304 from slack_sdk import WebhookClient 305 except ModuleNotFoundError as e: 306 raise MissingDependencyError( 307 "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them." 
308 ) from e 309 310 if not self.url: 311 raise ConfigError("Missing Slack webhook URL") 312 313 self._client = WebhookClient(url=self.url) 314 return self._client 315 316 def _send_slack_message(self, composed: slack.TSlackMessage) -> None: 317 self.client.send( 318 text=composed["text"], 319 blocks=composed["blocks"], 320 attachments=composed["attachments"], # type: ignore 321 ) 322 323 @property 324 def is_configured(self) -> bool: 325 return bool(self.url) 326 327 328class SlackApiNotificationTarget(BaseSlackNotificationTarget): 329 token: t.Optional[str] = None 330 channel: t.Optional[str] = None 331 type_: t.Literal["slack_api"] = Field(alias="type", default="slack_api") 332 _client: t.Optional[WebClient] = None 333 334 @property 335 def client(self) -> WebClient: 336 if not self._client: 337 try: 338 from slack_sdk import WebClient 339 except ModuleNotFoundError as e: 340 raise MissingDependencyError( 341 "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them." 
342 ) from e 343 344 self._client = WebClient(token=self.token) 345 return self._client 346 347 def _send_slack_message(self, composed: slack.TSlackMessage) -> None: 348 if not self.channel: 349 raise ConfigError("Missing Slack channel for notification") 350 351 self.client.chat_postMessage( 352 channel=self.channel, 353 text=composed["text"], 354 blocks=composed["blocks"], 355 attachments=composed["attachments"], # type: ignore 356 ) 357 358 @property 359 def is_configured(self) -> bool: 360 return all((self.token, self.channel)) 361 362 363class BasicSMTPNotificationTarget(BaseTextBasedNotificationTarget): 364 host: t.Optional[str] = None 365 port: int = 465 366 user: t.Optional[str] = None 367 password: t.Optional[SecretStr] = None 368 sender: t.Optional[str] = None 369 recipients: t.Optional[t.FrozenSet[str]] = None 370 subject: t.Optional[str] = "SQLMesh Notification" 371 type_: t.Literal["smtp"] = Field(alias="type", default="smtp") 372 373 def send_text_message( 374 self, 375 notification_status: NotificationStatus, 376 msg: str, 377 ) -> None: 378 if not self.host: 379 raise ConfigError("Missing SMTP host for notification") 380 381 email = EmailMessage() 382 email["Subject"] = self.subject 383 email["To"] = ",".join(self.recipients or []) 384 email["From"] = self.sender 385 email.set_content(msg) 386 with smtplib.SMTP_SSL(host=self.host, port=self.port) as smtp: 387 if self.user and self.password: 388 smtp.login(user=self.user, password=self.password.get_secret_value()) 389 smtp.send_message(email) 390 391 @property 392 def is_configured(self) -> bool: 393 return all((self.host, self.user, self.password, self.sender)) 394 395 396class GenericNotificationTarget(BaseNotificationTarget): 397 """A generic notification target that can be used to create custom notification targets. 398 399 This target is not meant to be used directly, but rather as a base class for custom notification targets. 
400 401 The `send` method should be overridden to provide the actual notification functionality. 402 403 Example: 404 ```python 405 class MyCustomNotificationTarget(GenericNotificationTarget): 406 def send(self, notification_status: NotificationStatus, msg: str, audit_error: t.Optional[AuditError] = None, exc: t.Optional[str] = None, **kwargs: t.Any) -> None: 407 error = None 408 if audit_error: 409 error = str(audit_error) 410 elif exc: 411 error = exc 412 413 if error: 414 msg = f"{error} - {msg}" 415 print(f"Sending notification: {msg}") 416 ``` 417 """ 418 419 type_: t.Literal["generic"] = Field(alias="type", default="generic") 420 421 422NotificationTarget = t.Annotated[ 423 t.Union[ 424 BasicSMTPNotificationTarget, 425 GenericNotificationTarget, 426 ConsoleNotificationTarget, 427 SlackApiNotificationTarget, 428 SlackWebhookNotificationTarget, 429 ], 430 Field(discriminator="type_"), 431] 432 433 434class NotificationTargetManager: 435 """Wrapper around a list of notification targets. 436 437 Calling a notification target's "notify_" method on this object will call it 438 on all registered notification targets. 
439 """ 440 441 def __init__( 442 self, 443 notification_targets: t.Dict[NotificationEvent, t.Set[NotificationTarget]] | None = None, 444 user_notification_targets: t.Dict[str, t.Set[NotificationTarget]] | None = None, 445 username: str | None = None, 446 ) -> None: 447 self.notification_targets = notification_targets or {} 448 self.user_notification_targets = user_notification_targets or {} 449 self.username = username 450 451 def notify(self, event: NotificationEvent, *args: t.Any, **kwargs: t.Any) -> None: 452 """Call the 'notify_`event`' function of all notification targets that care about the event.""" 453 if self.username: 454 self.notify_user(event, self.username, *args, **kwargs) 455 else: 456 for notification_target in self.notification_targets.get(event, set()): 457 notify_func = self._get_notification_function(notification_target, event) 458 notify_func(*args, **kwargs) 459 460 def notify_user( 461 self, event: NotificationEvent, username: str, *args: t.Any, **kwargs: t.Any 462 ) -> None: 463 """Call the 'notify_`event`' function of the user's notification targets that care about the event.""" 464 notification_targets = self.user_notification_targets.get(username, set()) 465 for notification_target in notification_targets: 466 if event in notification_target.notify_on: 467 notify_func = self._get_notification_function(notification_target, event) 468 notify_func(*args, **kwargs) 469 470 def _get_notification_function( 471 self, notification_target: NotificationTarget, event: NotificationEvent 472 ) -> t.Callable: 473 """Fetch the function for a notification event""" 474 return getattr(notification_target, f"notify_{event.value}")
class NotificationStatus(str, Enum):
    """Status category attached to a notification.

    Members are also plain strings, so they compare equal to their values.
    """

    SUCCESS = "success"
    FAILURE = "failure"
    WARNING = "warning"
    INFO = "info"
    PROGRESS = "progress"

    # Enum members are singletons, so identity checks identify the member.

    @property
    def is_success(self) -> bool:
        """True only for SUCCESS."""
        return self is NotificationStatus.SUCCESS

    @property
    def is_failure(self) -> bool:
        """True only for FAILURE."""
        return self is NotificationStatus.FAILURE

    @property
    def is_info(self) -> bool:
        """True only for INFO."""
        return self is NotificationStatus.INFO

    @property
    def is_warning(self) -> bool:
        """True only for WARNING."""
        return self is NotificationStatus.WARNING

    @property
    def is_progress(self) -> bool:
        """True only for PROGRESS."""
        return self is NotificationStatus.PROGRESS
An enumeration.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class NotificationEvent(str, Enum):
    """Events a notification target can subscribe to.

    Members are also plain strings, so they compare equal to their values.
    """

    # Lifecycle events.
    APPLY_START = "apply_start"
    APPLY_END = "apply_end"
    RUN_START = "run_start"
    RUN_END = "run_end"
    MIGRATION_START = "migration_start"
    MIGRATION_END = "migration_end"

    # Failure events.
    APPLY_FAILURE = "apply_failure"
    RUN_FAILURE = "run_failure"
    AUDIT_FAILURE = "audit_failure"
    MIGRATION_FAILURE = "migration_failure"
An enumeration.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class BaseNotificationTarget(PydanticModel, frozen=True):
    """
    Base notification target model. Every `notify_*` method builds a message and hands it to
    `send`, which does nothing here and is meant to be overridden.

    Notification functions follow a naming convention of `notify_` + NotificationEvent value.
    """

    type_: str
    notify_on: t.FrozenSet[NotificationEvent] = frozenset()

    def send(self, notification_status: NotificationStatus, msg: str, **kwargs: t.Any) -> None:
        """Deliver a notification; the base implementation is a no-op.

        Args:
            notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
            msg: The message to send.
        """
        return None

    def notify_apply_start(
        self, environment: str, plan_id: str, *args: t.Any, **kwargs: t.Any
    ) -> None:
        """Send an INFO notification that a plan apply has started.

        Args:
            environment: The target environment of the plan.
            plan_id: plan_id that is being applied.
        """
        message = f"Plan `{plan_id}` apply started for environment `{environment}`."
        self.send(NotificationStatus.INFO, message)

    def notify_apply_end(
        self, environment: str, plan_id: str, *args: t.Any, **kwargs: t.Any
    ) -> None:
        """Send a SUCCESS notification that a plan apply has finished.

        Args:
            environment: The target environment of the plan.
            plan_id: plan_id that was applied.
        """
        message = f"Plan `{plan_id}` apply finished for environment `{environment}`."
        self.send(NotificationStatus.SUCCESS, message)

    def notify_run_start(self, environment: str, *args: t.Any, **kwargs: t.Any) -> None:
        """Send an INFO notification that a SQLMesh run has started.

        Args:
            environment: The target environment of the run.
        """
        message = f"SQLMesh run started for environment `{environment}`."
        self.send(NotificationStatus.INFO, message)

    def notify_run_end(self, environment: str, *args: t.Any, **kwargs: t.Any) -> None:
        """Send a SUCCESS notification that a SQLMesh run has finished.

        Args:
            environment: The target environment of the run.
        """
        message = f"SQLMesh run finished for environment `{environment}`."
        self.send(NotificationStatus.SUCCESS, message)

    def notify_migration_start(self, *args: t.Any, **kwargs: t.Any) -> None:
        """Send an INFO notification that a SQLMesh migration has started."""
        message = "SQLMesh migration started."
        self.send(NotificationStatus.INFO, message)

    def notify_migration_end(self, *args: t.Any, **kwargs: t.Any) -> None:
        """Send a SUCCESS notification that a SQLMesh migration has finished."""
        message = "SQLMesh migration finished."
        self.send(NotificationStatus.SUCCESS, message)

    def notify_apply_failure(
        self, environment: str, plan_id: str, exc: str, *args: t.Any, **kwargs: t.Any
    ) -> None:
        """Send a FAILURE notification for a failed plan apply.

        Args:
            environment: The target environment of the plan.
            plan_id: The plan id of the failed apply.
            exc: The exception stack trace.
        """
        message = f"Plan `{plan_id}` in environment `{environment}` apply failed."
        self.send(NotificationStatus.FAILURE, message, exc=exc)

    def notify_run_failure(self, exc: str, *args: t.Any, **kwargs: t.Any) -> None:
        """Send a FAILURE notification for a failed run.

        Args:
            exc: The exception stack trace.
        """
        message = "SQLMesh run failed."
        self.send(NotificationStatus.FAILURE, message, exc=exc)

    def notify_audit_failure(self, audit_error: AuditError, *args: t.Any, **kwargs: t.Any) -> None:
        """Send a FAILURE notification for a failed audit.

        Args:
            audit_error: The AuditError object.
        """
        message = "Audit failure."
        self.send(NotificationStatus.FAILURE, message, audit_error=audit_error)

    def notify_migration_failure(self, exc: str, *args: t.Any, **kwargs: t.Any) -> None:
        """Send a FAILURE notification for a failed migration.

        Args:
            exc: The exception stack trace.
        """
        message = "SQLMesh migration failed."
        self.send(NotificationStatus.FAILURE, message, exc=exc)

    @property
    def is_configured(self) -> bool:
        """Always True for the base target."""
        return True
Base notification target model. Provides a command for sending notifications that is currently only used by the built-in scheduler.
Notification functions follow a naming convention of notify_ + NotificationEvent value.
def send(self, notification_status: NotificationStatus, msg: str, **kwargs: t.Any) -> None:
    """Deliver a notification; this base implementation is a no-op.

    Args:
        notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
        msg: The message to send.
    """
    return None
Sends notification with the provided message.
Arguments:
- notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
- msg: The message to send.
def notify_apply_start(
    self, environment: str, plan_id: str, *args: t.Any, **kwargs: t.Any
) -> None:
    """Send an INFO notification that a plan apply has started.

    Args:
        environment: The target environment of the plan.
        plan_id: plan_id that is being applied.
    """
    message = f"Plan `{plan_id}` apply started for environment `{environment}`."
    self.send(NotificationStatus.INFO, message)
Notify when an apply starts.
Arguments:
- environment: The target environment of the plan.
- plan_id: plan_id that is being applied.
def notify_apply_end(
    self, environment: str, plan_id: str, *args: t.Any, **kwargs: t.Any
) -> None:
    """Send a SUCCESS notification that a plan apply has finished.

    Args:
        environment: The target environment of the plan.
        plan_id: plan_id that was applied.
    """
    message = f"Plan `{plan_id}` apply finished for environment `{environment}`."
    self.send(NotificationStatus.SUCCESS, message)
Notify when an apply ends.
Arguments:
- environment: The target environment of the plan.
- plan_id: plan_id that was applied.
def notify_run_start(self, environment: str, *args: t.Any, **kwargs: t.Any) -> None:
    """Send an INFO notification that a SQLMesh run has started.

    Args:
        environment: The target environment of the run.
    """
    message = f"SQLMesh run started for environment `{environment}`."
    self.send(NotificationStatus.INFO, message)
Notify when a SQLMesh run starts.
Arguments:
- environment: The target environment of the run.
def notify_run_end(self, environment: str, *args: t.Any, **kwargs: t.Any) -> None:
    """Send a SUCCESS notification that a SQLMesh run has finished.

    Args:
        environment: The target environment of the run.
    """
    message = f"SQLMesh run finished for environment `{environment}`."
    self.send(NotificationStatus.SUCCESS, message)
Notify when a SQLMesh run ends.
Arguments:
- environment: The target environment of the run.
def notify_migration_start(self, *args: t.Any, **kwargs: t.Any) -> None:
    """Send an INFO notification that a SQLMesh migration has started."""
    message = "SQLMesh migration started."
    self.send(NotificationStatus.INFO, message)
Notify when a SQLMesh migration starts.
def notify_migration_end(self, *args: t.Any, **kwargs: t.Any) -> None:
    """Send a SUCCESS notification that a SQLMesh migration has finished."""
    message = "SQLMesh migration finished."
    self.send(NotificationStatus.SUCCESS, message)
Notify when a SQLMesh migration ends.
def notify_apply_failure(
    self, environment: str, plan_id: str, exc: str, *args: t.Any, **kwargs: t.Any
) -> None:
    """Send a FAILURE notification for a failed plan apply.

    Args:
        environment: The target environment of the plan.
        plan_id: The plan id of the failed apply.
        exc: The exception stack trace.
    """
    message = f"Plan `{plan_id}` in environment `{environment}` apply failed."
    self.send(NotificationStatus.FAILURE, message, exc=exc)
Notify in the case of an apply failure.
Arguments:
- environment: The target environment of the plan.
- plan_id: The plan id of the failed apply.
- exc: The exception stack trace.
def notify_run_failure(self, exc: str, *args: t.Any, **kwargs: t.Any) -> None:
    """Send a FAILURE notification for a failed run.

    Args:
        exc: The exception stack trace.
    """
    message = "SQLMesh run failed."
    self.send(NotificationStatus.FAILURE, message, exc=exc)
Notify in the case of a run failure.
Arguments:
- exc: The exception stack trace.
def notify_audit_failure(self, audit_error: AuditError, *args: t.Any, **kwargs: t.Any) -> None:
    """Send a FAILURE notification for a failed audit.

    Args:
        audit_error: The AuditError object.
    """
    message = "Audit failure."
    self.send(NotificationStatus.FAILURE, message, audit_error=audit_error)
Notify in the case of an audit failure.
Arguments:
- audit_error: The AuditError object.
def notify_migration_failure(self, exc: str, *args: t.Any, **kwargs: t.Any) -> None:
    """Send a FAILURE notification for a failed migration.

    Args:
        exc: The exception stack trace.
    """
    message = "SQLMesh migration failed."
    self.send(NotificationStatus.FAILURE, message, exc=exc)
Notify in the case of a migration failure.
Arguments:
- exc: The exception stack trace.
Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class BaseTextBasedNotificationTarget(BaseNotificationTarget):
    """
    A base class for unstructured notification targets (e.g.: console, email, etc.)
    """

    def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None:
        """Send the notification message as text; a no-op in this base class."""
        return None

    def send(
        self,
        notification_status: NotificationStatus,
        msg: str,
        audit_error: t.Optional[AuditError] = None,
        exc: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> None:
        """Append the error detail (if any) to `msg` and forward it to `send_text_message`.

        Args:
            notification_status: The status of the notification.
            msg: The message to send.
            audit_error: Optional audit error; its string form is appended on a new line.
            exc: Optional stack trace; appended on a new line when no audit error is given.
        """
        # The audit error takes precedence over the stack trace.
        if audit_error:
            text = f"{msg}\n{audit_error!s}"
        elif exc:
            text = f"{msg}\n{exc}"
        else:
            text = msg

        self.send_text_message(notification_status, text)
A base class for unstructured notification targets (e.g.: console, email, etc.)
def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None:
    """Send the notification message as text; a no-op in this base class."""
    return None
Send the notification message as text.
def send(
    self,
    notification_status: NotificationStatus,
    msg: str,
    audit_error: t.Optional[AuditError] = None,
    exc: t.Optional[str] = None,
    **kwargs: t.Any,
) -> None:
    """Append the error detail (if any) to `msg` and forward it to `send_text_message`.

    Args:
        notification_status: The status of the notification.
        msg: The message to send.
        audit_error: Optional audit error; its string form is appended on a new line.
        exc: Optional stack trace; appended on a new line when no audit error is given.
    """
    # The audit error takes precedence over the stack trace.
    if audit_error:
        text = f"{msg}\n{audit_error!s}"
    elif exc:
        text = f"{msg}\n{exc}"
    else:
        text = msg

    self.send_text_message(notification_status, text)
Sends notification with the provided message.
Arguments:
- notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
- msg: The message to send.
Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class ConsoleNotificationTarget(BaseTextBasedNotificationTarget):
    """
    Example console notification target. Keeping this around for testing purposes.
    """

    type_: t.Literal["console"] = Field(alias="type", default="console")
    _console: t.Optional[Console] = None

    @property
    def console(self) -> Console:
        """The console to log to, fetched via `get_console()` on first access."""
        cached = self._console
        if not cached:
            cached = self._console = get_console()
        return cached

    def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None:
        """Log `msg` as success, error, or a status update depending on the status."""
        if notification_status.is_success:
            self.console.log_success(msg)
            return
        if notification_status.is_failure:
            self.console.log_error(msg)
            return
        # Any other status is logged as a status update.
        self.console.log_status_update(msg)
Example console notification target. Keeping this around for testing purposes.
def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None:
    """Log `msg` as success, error, or a status update depending on the status."""
    if notification_status.is_success:
        self.console.log_success(msg)
        return
    if notification_status.is_failure:
        self.console.log_error(msg)
        return
    # Any other status is logged as a status update.
    self.console.log_status_update(msg)
Send the notification message as text.
Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.
    Nothing happens when `__pydantic_private__` is already set to a non-None value.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    # Keep only the private attributes that have a defined default.
    pydantic_private = {
        name: default
        for name, private_attr in self.__private_attributes__.items()
        if (default := private_attr.get_default()) is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class BaseSlackNotificationTarget(BaseNotificationTarget):
    """Base class for Slack targets: composes the message and delegates delivery."""

    def send(
        self,
        notification_status: NotificationStatus,
        msg: str,
        audit_error: t.Optional[AuditError] = None,
        exc: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> None:
        """Compose a Slack message for the notification and pass it to `_send_slack_message`.

        Args:
            notification_status: The status of the notification; selects the header icon.
            msg: The message to send.
            audit_error: Optional audit error; adds audit/model/count fields and the audit SQL.
            exc: Optional stack trace; added as a preformatted block when no audit error is given.
        """
        icon_by_status = {
            NotificationStatus.PROGRESS: slack.SlackAlertIcon.START,
            NotificationStatus.SUCCESS: slack.SlackAlertIcon.SUCCESS,
            NotificationStatus.FAILURE: slack.SlackAlertIcon.FAILURE,
            NotificationStatus.WARNING: slack.SlackAlertIcon.WARNING,
            NotificationStatus.INFO: slack.SlackAlertIcon.INFO,
        }

        # Header section: icon, status, and the message itself.
        composed = slack.message().add_primary_blocks(
            slack.header_block(f"{icon_by_status[notification_status]} SQLMesh Notification"),
            slack.context_block(f"*Status:* `{notification_status.value}`"),
            slack.divider_block(),
            slack.text_section_block(f"*Message*: {msg}"),
        )

        # Detail section: the audit error takes precedence over the stack trace.
        if audit_error:
            audit_fields = slack.fields_section_block(
                f"*Audit*: `{audit_error.audit_name}`",
                f"*Model*: `{audit_error.model_name}`",
                f"*Count*: `{audit_error.count}`",
            )
            audit_sql = slack.preformatted_rich_text_block(audit_error.sql(pretty=True))
            details = [audit_fields, audit_sql]
        elif exc:
            details = [slack.preformatted_rich_text_block(exc)]
        else:
            details = []

        # Footer section: version information.
        composed.add_primary_blocks(
            *details,
            slack.divider_block(),
            slack.context_block(
                f"*SQLMesh Version:* {_sqlmesh_version()}", f"*Python Version:* {sys.version}"
            ),
        )

        composed.add_text(msg)

        self._send_slack_message(composed=composed.slack_message)

    def _send_slack_message(self, composed: slack.TSlackMessage) -> None:
        """Send a composed message to Slack; a no-op in this base class.

        Args:
            composed: the formatted message to send to Slack
        """
        return None
Base notification target model. Provides a command for sending notifications that is currently only used by the built-in scheduler.
Notification functions follow a naming convention of notify_ + NotificationEvent value.
def send(
    self,
    notification_status: NotificationStatus,
    msg: str,
    audit_error: t.Optional[AuditError] = None,
    exc: t.Optional[str] = None,
    **kwargs: t.Any,
) -> None:
    """Compose a Slack message for the notification and pass it to `_send_slack_message`.

    Args:
        notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
        msg: The message to send.
        audit_error: Optional audit failure; when set, its name, model, count and SQL are added.
        exc: Optional exception text; only added when no audit_error is given.
    """
    icon_by_status = {
        NotificationStatus.PROGRESS: slack.SlackAlertIcon.START,
        NotificationStatus.SUCCESS: slack.SlackAlertIcon.SUCCESS,
        NotificationStatus.FAILURE: slack.SlackAlertIcon.FAILURE,
        NotificationStatus.WARNING: slack.SlackAlertIcon.WARNING,
        NotificationStatus.INFO: slack.SlackAlertIcon.INFO,
    }

    # Header section: icon + title, status line, divider, then the message text.
    builder = slack.message()
    icon = icon_by_status[notification_status]
    composed = builder.add_primary_blocks(
        slack.header_block(f"{icon} SQLMesh Notification"),
        slack.context_block(f"*Status:* `{notification_status.value}`"),
        slack.divider_block(),
        slack.text_section_block(f"*Message*: {msg}"),
    )

    # Optional detail section: an audit failure takes precedence over a plain exception.
    if audit_error:
        extra_blocks = [
            slack.fields_section_block(
                f"*Audit*: `{audit_error.audit_name}`",
                f"*Model*: `{audit_error.model_name}`",
                f"*Count*: `{audit_error.count}`",
            ),
            slack.preformatted_rich_text_block(audit_error.sql(pretty=True)),
        ]
    elif exc:
        extra_blocks = [slack.preformatted_rich_text_block(exc)]
    else:
        extra_blocks = []

    # Footer section: divider plus version information.
    composed.add_primary_blocks(
        *extra_blocks,
        slack.divider_block(),
        slack.context_block(
            f"*SQLMesh Version:* {_sqlmesh_version()}", f"*Python Version:* {sys.version}"
        ),
    )

    composed.add_text(msg)

    self._send_slack_message(composed=composed.slack_message)
Sends notification with the provided message.
Arguments:
- notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
- msg: The message to send.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class SlackWebhookNotificationTarget(BaseSlackNotificationTarget):
    """Slack notification target that delivers messages through a webhook URL."""

    url: t.Optional[str] = None
    type_: t.Literal["slack_webhook"] = Field(alias="type", default="slack_webhook")
    _client: t.Optional[WebhookClient] = None

    @property
    def client(self) -> WebhookClient:
        """Return the Slack webhook client, creating and caching it on first access.

        Raises:
            MissingDependencyError: If `slack_sdk` cannot be imported.
            ConfigError: If no webhook URL is set.
        """
        if self._client:
            return self._client

        try:
            from slack_sdk import WebhookClient
        except ModuleNotFoundError as e:
            raise MissingDependencyError(
                "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them."
            ) from e

        if not self.url:
            raise ConfigError("Missing Slack webhook URL")

        self._client = WebhookClient(url=self.url)
        return self._client

    def _send_slack_message(self, composed: slack.TSlackMessage) -> None:
        """Post the composed message's text, blocks and attachments via the webhook client.

        Args:
            composed: the formatted message to send to Slack
        """
        webhook = self.client
        webhook.send(
            text=composed["text"],
            blocks=composed["blocks"],
            attachments=composed["attachments"],  # type: ignore
        )

    @property
    def is_configured(self) -> bool:
        """Whether a webhook URL has been provided."""
        return True if self.url else False
Base notification target model. Provides a command for sending notifications that is currently only used by the built-in scheduler.
Notification functions follow a naming convention of notify_ + NotificationEvent value.
@property
def client(self) -> WebhookClient:
    """Return the Slack webhook client, creating and caching it on first access.

    Raises:
        MissingDependencyError: If `slack_sdk` cannot be imported.
        ConfigError: If no webhook URL is set.
    """
    if self._client:
        return self._client

    try:
        from slack_sdk import WebhookClient
    except ModuleNotFoundError as e:
        raise MissingDependencyError(
            "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them."
        ) from e

    if not self.url:
        raise ConfigError("Missing Slack webhook URL")

    self._client = WebhookClient(url=self.url)
    return self._client
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise the private attributes of a model, acting like a BaseModel method.

    The `context` argument is accepted because pydantic-core passes it when calling
    this function; it is not used here.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    # Nothing to do when the private-attribute storage already exists.
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    defaults = {}
    for attr_name, attr in self.__private_attributes__.items():
        value = attr.get_default()
        # Attributes without a default are left out of the storage.
        if value is not PydanticUndefined:
            defaults[attr_name] = value
    object_setattr(self, '__pydantic_private__', defaults)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class SlackApiNotificationTarget(BaseSlackNotificationTarget):
    """Slack notification target that posts messages to a channel through the Slack Web API."""

    token: t.Optional[str] = None
    channel: t.Optional[str] = None
    type_: t.Literal["slack_api"] = Field(alias="type", default="slack_api")
    _client: t.Optional[WebClient] = None

    @property
    def client(self) -> WebClient:
        """Return the Slack Web API client, creating and caching it on first access.

        Raises:
            MissingDependencyError: If `slack_sdk` cannot be imported.
            ConfigError: If no API token is set.
        """
        if not self._client:
            try:
                from slack_sdk import WebClient
            except ModuleNotFoundError as e:
                raise MissingDependencyError(
                    "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them."
                ) from e

            # Fail early with a configuration error, mirroring the webhook target's URL check
            # and this class's own channel check, instead of building a client without a token.
            if not self.token:
                raise ConfigError("Missing Slack API token")

            self._client = WebClient(token=self.token)
        return self._client

    def _send_slack_message(self, composed: slack.TSlackMessage) -> None:
        """Post the composed message's text, blocks and attachments to the configured channel.

        Args:
            composed: the formatted message to send to Slack

        Raises:
            ConfigError: If no channel is set.
        """
        if not self.channel:
            raise ConfigError("Missing Slack channel for notification")

        self.client.chat_postMessage(
            channel=self.channel,
            text=composed["text"],
            blocks=composed["blocks"],
            attachments=composed["attachments"],  # type: ignore
        )

    @property
    def is_configured(self) -> bool:
        """Whether both the token and the channel have been provided."""
        return all((self.token, self.channel))
Base notification target model. Provides a command for sending notifications that is currently only used by the built-in scheduler.
Notification functions follow a naming convention of notify_ + NotificationEvent value.
@property
def client(self) -> WebClient:
    """Return the Slack Web API client, creating and caching it on first access.

    Raises:
        MissingDependencyError: If `slack_sdk` cannot be imported.
        ConfigError: If no API token is set.
    """
    if not self._client:
        try:
            from slack_sdk import WebClient
        except ModuleNotFoundError as e:
            raise MissingDependencyError(
                "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them."
            ) from e

        # Fail early with a configuration error instead of building a client without a token.
        if not self.token:
            raise ConfigError("Missing Slack API token")

        self._client = WebClient(token=self.token)
    return self._client
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise the private attributes of a model, acting like a BaseModel method.

    The `context` argument is accepted because pydantic-core passes it when calling
    this function; it is not used here.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    # Nothing to do when the private-attribute storage already exists.
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    defaults = {}
    for attr_name, attr in self.__private_attributes__.items():
        value = attr.get_default()
        # Attributes without a default are left out of the storage.
        if value is not PydanticUndefined:
            defaults[attr_name] = value
    object_setattr(self, '__pydantic_private__', defaults)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class BasicSMTPNotificationTarget(BaseTextBasedNotificationTarget):
    """Text-based notification target that emails the message over an `smtplib.SMTP_SSL` connection."""

    host: t.Optional[str] = None
    port: int = 465
    user: t.Optional[str] = None
    password: t.Optional[SecretStr] = None
    sender: t.Optional[str] = None
    recipients: t.Optional[t.FrozenSet[str]] = None
    subject: t.Optional[str] = "SQLMesh Notification"
    type_: t.Literal["smtp"] = Field(alias="type", default="smtp")

    def send_text_message(
        self,
        notification_status: NotificationStatus,
        msg: str,
    ) -> None:
        """Send the notification message as a plain-text email.

        Args:
            notification_status: The status of the notification. Not used when building the email.
            msg: The message to send as the email body.

        Raises:
            ConfigError: If the SMTP host, sender or recipients are not configured.
        """
        if not self.host:
            raise ConfigError("Missing SMTP host for notification")
        # Validate the addressing fields up front: without them the message cannot be
        # delivered and the failure would only surface later from the email/SMTP layer.
        if not self.sender:
            raise ConfigError("Missing SMTP sender for notification")
        if not self.recipients:
            raise ConfigError("Missing SMTP recipients for notification")

        message = EmailMessage()
        message["Subject"] = self.subject
        message["To"] = ",".join(self.recipients)
        message["From"] = self.sender
        message.set_content(msg)
        with smtplib.SMTP_SSL(host=self.host, port=self.port) as smtp:
            # Authentication is only attempted when both credentials are present.
            if self.user and self.password:
                smtp.login(user=self.user, password=self.password.get_secret_value())
            smtp.send_message(message)

    @property
    def is_configured(self) -> bool:
        """Whether host, user, password and sender have all been provided."""
        return all((self.host, self.user, self.password, self.sender))
A base class for unstructured notification targets (e.g.: console, email, etc.)
def send_text_message(
    self,
    notification_status: NotificationStatus,
    msg: str,
) -> None:
    """Send the notification message as a plain-text email over an `smtplib.SMTP_SSL` connection.

    Args:
        notification_status: The status of the notification. Not used when building the email.
        msg: The message to send as the email body.

    Raises:
        ConfigError: If the SMTP host, sender or recipients are not configured.
    """
    if not self.host:
        raise ConfigError("Missing SMTP host for notification")
    # Validate the addressing fields up front: without them the message cannot be
    # delivered and the failure would only surface later from the email/SMTP layer.
    if not self.sender:
        raise ConfigError("Missing SMTP sender for notification")
    if not self.recipients:
        raise ConfigError("Missing SMTP recipients for notification")

    message = EmailMessage()
    message["Subject"] = self.subject
    message["To"] = ",".join(self.recipients)
    message["From"] = self.sender
    message.set_content(msg)
    with smtplib.SMTP_SSL(host=self.host, port=self.port) as smtp:
        # Authentication is only attempted when both credentials are present.
        if self.user and self.password:
            smtp.login(user=self.user, password=self.password.get_secret_value())
        smtp.send_message(message)
Send the notification message as text.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class GenericNotificationTarget(BaseNotificationTarget):
    """Base class for building custom notification targets.

    Do not use this target directly; subclass it and override `send` to implement
    the actual delivery of the notification.

    Example:
    ```python
    class MyCustomNotificationTarget(GenericNotificationTarget):
        def send(self, notification_status: NotificationStatus, msg: str, audit_error: t.Optional[AuditError] = None, exc: t.Optional[str] = None, **kwargs: t.Any) -> None:
            error = None
            if audit_error:
                error = str(audit_error)
            elif exc:
                error = exc

            if error:
                msg = f"{error} - {msg}"
            print(f"Sending notification: {msg}")
    ```
    """

    type_: t.Literal["generic"] = Field(default="generic", alias="type")
A generic notification target that can be used to create custom notification targets.
This target is not meant to be used directly, but rather as a base class for custom notification targets.
The send method should be overridden to provide the actual notification functionality.
Example:
class MyCustomNotificationTarget(GenericNotificationTarget):
    def send(self, notification_status: NotificationStatus, msg: str, audit_error: t.Optional[AuditError] = None, exc: t.Optional[str] = None, **kwargs: t.Any) -> None:
        error = None
        if audit_error:
            error = str(audit_error)
        elif exc:
            error = exc
        if error:
            msg = f"{error} - {msg}"
        print(f"Sending notification: {msg}")
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class NotificationTargetManager:
    """Dispatches notification events to the registered notification targets.

    Calling `notify` invokes the matching "notify_" method on every registered
    target for the event, or only on the configured user's targets when a
    username is set.
    """

    def __init__(
        self,
        notification_targets: t.Dict[NotificationEvent, t.Set[NotificationTarget]] | None = None,
        user_notification_targets: t.Dict[str, t.Set[NotificationTarget]] | None = None,
        username: str | None = None,
    ) -> None:
        """Store the target mappings (empty dicts when omitted) and the optional username."""
        self.notification_targets = notification_targets if notification_targets else {}
        self.user_notification_targets = (
            user_notification_targets if user_notification_targets else {}
        )
        self.username = username

    def notify(self, event: NotificationEvent, *args: t.Any, **kwargs: t.Any) -> None:
        """Call the 'notify_`event`' function of all notification targets that care about the event."""
        # With a username set, only that user's targets are notified.
        if self.username:
            self.notify_user(event, self.username, *args, **kwargs)
            return

        for target in self.notification_targets.get(event, set()):
            handler = self._get_notification_function(target, event)
            handler(*args, **kwargs)

    def notify_user(
        self, event: NotificationEvent, username: str, *args: t.Any, **kwargs: t.Any
    ) -> None:
        """Call the 'notify_`event`' function of the user's notification targets that care about the event."""
        for target in self.user_notification_targets.get(username, set()):
            # Skip targets that did not subscribe to this event.
            if event not in target.notify_on:
                continue
            handler = self._get_notification_function(target, event)
            handler(*args, **kwargs)

    def _get_notification_function(
        self, notification_target: NotificationTarget, event: NotificationEvent
    ) -> t.Callable:
        """Look up the target's handler for a notification event."""
        handler_name = f"notify_{event.value}"
        return getattr(notification_target, handler_name)
Wrapper around a list of notification targets.
Calling a notification target's "notify_" method on this object will call it on all registered notification targets.
def __init__(
    self,
    notification_targets: t.Dict[NotificationEvent, t.Set[NotificationTarget]] | None = None,
    user_notification_targets: t.Dict[str, t.Set[NotificationTarget]] | None = None,
    username: str | None = None,
) -> None:
    """Store the target mappings (empty dicts when omitted) and the optional username.

    Args:
        notification_targets: Targets keyed by the event they are registered for.
        user_notification_targets: Targets keyed by username.
        username: When set, `notify` only dispatches to this user's targets.
    """
    self.notification_targets = notification_targets if notification_targets else {}
    self.user_notification_targets = (
        user_notification_targets if user_notification_targets else {}
    )
    self.username = username
def notify(self, event: NotificationEvent, *args: t.Any, **kwargs: t.Any) -> None:
    """Call the 'notify_`event`' function of all notification targets that care about the event."""
    # With a username set, only that user's targets are notified.
    if self.username:
        self.notify_user(event, self.username, *args, **kwargs)
        return

    for target in self.notification_targets.get(event, set()):
        handler = self._get_notification_function(target, event)
        handler(*args, **kwargs)
Call the 'notify_`event`' function of all notification targets that care about the event.
def notify_user(
    self, event: NotificationEvent, username: str, *args: t.Any, **kwargs: t.Any
) -> None:
    """Call the 'notify_`event`' function of the user's notification targets that care about the event."""
    for target in self.user_notification_targets.get(username, set()):
        # Skip targets that did not subscribe to this event.
        if event not in target.notify_on:
            continue
        handler = self._get_notification_function(target, event)
        handler(*args, **kwargs)
Call the 'notify_`event`' function of the user's notification targets that care about the event.