Edit on GitHub

sqlmesh.core.notification_target

  1from __future__ import annotations
  2
  3import functools
  4import smtplib
  5import sys
  6import typing as t
  7from email.message import EmailMessage
  8from enum import Enum
  9
 10from pydantic import Field, SecretStr
 11
 12from sqlmesh.core.console import Console, get_console
 13from sqlmesh.integrations import slack
 14from sqlmesh.utils.errors import AuditError, ConfigError, MissingDependencyError
 15from sqlmesh.utils.pydantic import PydanticModel
 16
 17if sys.version_info >= (3, 8):
 18    from typing import Literal
 19else:
 20    from typing_extensions import Literal
 21
 22if sys.version_info >= (3, 9):
 23    from typing import Annotated
 24else:
 25    from typing_extensions import Annotated
 26
 27if t.TYPE_CHECKING:
 28    from slack_sdk import WebClient, WebhookClient
 29
 30
 31def _sqlmesh_version() -> str:
 32    try:
 33        from sqlmesh import __version__
 34
 35        return __version__
 36    except ImportError:
 37        return "0.0.0"
 38
 39
 40NOTIFICATION_FUNCTIONS: t.Dict[NotificationEvent, str] = {}
 41
 42
 43class NotificationStatus(str, Enum):
 44    SUCCESS = "success"
 45    FAILURE = "failure"
 46    WARNING = "warning"
 47    INFO = "info"
 48    PROGRESS = "progress"
 49
 50    @property
 51    def is_success(self) -> bool:
 52        return self == NotificationStatus.SUCCESS
 53
 54    @property
 55    def is_failure(self) -> bool:
 56        return self == NotificationStatus.FAILURE
 57
 58    @property
 59    def is_info(self) -> bool:
 60        return self == NotificationStatus.INFO
 61
 62    @property
 63    def is_warning(self) -> bool:
 64        return self == NotificationStatus.WARNING
 65
 66    @property
 67    def is_progress(self) -> bool:
 68        return self == NotificationStatus.PROGRESS
 69
 70
 71class NotificationEvent(str, Enum):
 72    APPLY_START = "apply_start"
 73    APPLY_END = "apply_end"
 74    RUN_START = "run_start"
 75    RUN_END = "run_end"
 76    MIGRATION_START = "migration_start"
 77    MIGRATION_END = "migration_end"
 78    APPLY_FAILURE = "apply_failure"
 79    RUN_FAILURE = "run_failure"
 80    AUDIT_FAILURE = "audit_failure"
 81    MIGRATION_FAILURE = "migration_failure"
 82
 83
 84def notify(event: NotificationEvent) -> t.Callable:
 85    """Decorator used to register 'notify' methods and the events they correspond to."""
 86
 87    def decorator(f: t.Callable) -> t.Callable:
 88        @functools.wraps(f)
 89        def wrapper(*args: t.List[t.Any], **kwargs: t.Dict[str, t.Any]) -> None:
 90            return f(*args, **kwargs)
 91
 92        NOTIFICATION_FUNCTIONS[event] = f.__name__
 93        return wrapper
 94
 95    return decorator
 96
 97
 98class BaseNotificationTarget(PydanticModel, frozen=True):
 99    """
100    Base notification target model. Provides a command for sending notifications that is currently only used
101    by the built-in scheduler. Other schedulers like Airflow use the configuration of the target itself
102    to create the notification constructs appropriate for the scheduler.
103    """
104
105    type_: str
106    notify_on: t.FrozenSet[NotificationEvent] = frozenset()
107
108    def send(self, notification_status: NotificationStatus, msg: str, **kwargs: t.Any) -> None:
109        """Sends notification with the provided message.
110
111        Args:
112            notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
113            msg: The message to send.
114        """
115
116    @notify(NotificationEvent.APPLY_START)
117    def notify_apply_start(self, environment: str, plan_id: str) -> None:
118        """Notify when an apply starts.
119
120        Args:
121            environment: The target environment of the plan.
122            plan_id: plan_id that is being applied.
123        """
124        self.send(
125            NotificationStatus.INFO,
126            f"Plan {plan_id} apply started for environment `{environment}`.",
127        )
128
129    @notify(NotificationEvent.APPLY_END)
130    def notify_apply_end(self, environment: str, plan_id: str) -> None:
131        """Notify when an apply ends.
132
133        Args:
134            environment: The target environment of the plan.
135            plan_id: plan_id that was applied.
136        """
137        self.send(
138            NotificationStatus.SUCCESS,
139            f"Plan {plan_id} apply finished for environment `{environment}`.",
140        )
141
142    @notify(NotificationEvent.RUN_START)
143    def notify_run_start(self, environment: str) -> None:
144        """Notify when a SQLMesh run starts.
145
146        Args:
147            environment: The target environment of the run.
148        """
149        self.send(NotificationStatus.INFO, f"SQLMesh run started for environment `{environment}`.")
150
151    @notify(NotificationEvent.RUN_END)
152    def notify_run_end(self, environment: str) -> None:
153        """Notify when a SQLMesh run ends.
154
155        Args:
156            environment: The target environment of the run.
157        """
158        self.send(
159            NotificationStatus.SUCCESS, f"SQLMesh run finished for environment `{environment}`."
160        )
161
162    @notify(NotificationEvent.MIGRATION_START)
163    def notify_migration_start(self) -> None:
164        """Notify when a SQLMesh migration starts."""
165        self.send(NotificationStatus.INFO, "SQLMesh migration started.")
166
167    @notify(NotificationEvent.MIGRATION_END)
168    def notify_migration_end(self) -> None:
169        """Notify when a SQLMesh migration ends."""
170        self.send(NotificationStatus.SUCCESS, "SQLMesh migration finished.")
171
172    @notify(NotificationEvent.APPLY_FAILURE)
173    def notify_apply_failure(self, environment: str, plan_id: str, exc: str) -> None:
174        """Notify in the case of an apply failure.
175
176        Args:
177            environment: The target environment of the run.
178            plan_id: The plan id of the failed apply
179            exc: The exception stack trace.
180        """
181        self.send(
182            NotificationStatus.FAILURE,
183            f"Plan {plan_id} in environment `{environment}` apply failed.",
184            exc=exc,
185        )
186
187    @notify(NotificationEvent.RUN_FAILURE)
188    def notify_run_failure(self, exc: str) -> None:
189        """Notify in the case of a run failure.
190
191        Args:
192            exc: The exception stack trace.
193        """
194        self.send(NotificationStatus.FAILURE, "SQLMesh run failed.", exc=exc)
195
196    @notify(NotificationEvent.AUDIT_FAILURE)
197    def notify_audit_failure(self, audit_error: AuditError) -> None:
198        """Notify in the case of an audit failure.
199
200        Args:
201            audit_error: The AuditError object.
202        """
203        self.send(NotificationStatus.FAILURE, "Audit failure.", audit_error=audit_error)
204
205    @notify(NotificationEvent.MIGRATION_FAILURE)
206    def notify_migration_failure(self, exc: str) -> None:
207        """Notify in the case of a migration failure.
208
209        Args:
210            exc: The exception stack trace.
211        """
212        self.send(NotificationStatus.FAILURE, "SQLMesh migration failed.", exc=exc)
213
214    @property
215    def is_configured(self) -> bool:
216        return True
217
218
219class BaseTextBasedNotificationTarget(BaseNotificationTarget):
220    """
221    A base class for unstructured notification targets (e.g.: console, email, etc.)
222    """
223
224    def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None:
225        """Send the notification message as text."""
226
227    def send(
228        self,
229        notification_status: NotificationStatus,
230        msg: str,
231        audit_error: t.Optional[AuditError] = None,
232        exc: t.Optional[str] = None,
233        **kwargs: t.Any,
234    ) -> None:
235        error = None
236        if audit_error:
237            error = str(audit_error)
238        elif exc:
239            error = exc
240
241        self.send_text_message(notification_status, msg if error is None else f"{msg}\n{error}")
242
243
244class ConsoleNotificationTarget(BaseTextBasedNotificationTarget):
245    """
246    Example console notification target. Keeping this around for testing purposes.
247    """
248
249    type_: Literal["console"] = Field(alias="type", default="console")
250    _console: t.Optional[Console] = None
251
252    @property
253    def console(self) -> Console:
254        if not self._console:
255            self._console = get_console()
256        return self._console
257
258    def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None:
259        if notification_status.is_success:
260            self.console.log_success(msg)
261        elif notification_status.is_failure:
262            self.console.log_error(msg)
263        else:
264            self.console.log_status_update(msg)
265
266
267class BaseSlackNotificationTarget(BaseNotificationTarget):
268    def send(
269        self,
270        notification_status: NotificationStatus,
271        msg: str,
272        audit_error: t.Optional[AuditError] = None,
273        exc: t.Optional[str] = None,
274        **kwargs: t.Any,
275    ) -> None:
276
277        status_emoji = {
278            NotificationStatus.PROGRESS: slack.SlackAlertIcon.START,
279            NotificationStatus.SUCCESS: slack.SlackAlertIcon.SUCCESS,
280            NotificationStatus.FAILURE: slack.SlackAlertIcon.FAILURE,
281            NotificationStatus.WARNING: slack.SlackAlertIcon.WARNING,
282            NotificationStatus.INFO: slack.SlackAlertIcon.INFO,
283        }
284
285        composed = slack.message().add_primary_blocks(
286            slack.header_block(f"{status_emoji[notification_status]} SQLMesh Notification"),
287            slack.context_block(f"*Status:* `{notification_status.value}`"),
288            slack.divider_block(),
289            slack.text_section_block(f"*Message*: {msg}"),
290        )
291
292        details = []
293        if audit_error:
294            details = [
295                slack.fields_section_block(
296                    f"*Audit*: `{audit_error.audit_name}`",
297                    f"*Model*: `{audit_error.model_name}`",
298                    f"*Count*: `{audit_error.count}`",
299                ),
300                slack.preformatted_rich_text_block(audit_error.sql(pretty=True)),
301            ]
302        elif exc:
303            details = [slack.preformatted_rich_text_block(exc)]
304
305        composed.add_primary_blocks(
306            *details,
307            slack.divider_block(),
308            slack.context_block(
309                f"*SQLMesh Version:* {_sqlmesh_version()}", f"*Python Version:* {sys.version}"
310            ),
311        )
312
313        self._send_slack_message(
314            composed=composed.slack_message,
315        )
316
317    def _send_slack_message(self, composed: slack.TSlackMessage) -> None:
318        """Send a composed message Slack.
319
320        Args:
321            composed: the formatted message to send to Slack
322        """
323
324
325class SlackWebhookNotificationTarget(BaseSlackNotificationTarget):
326    url: t.Optional[str] = None
327    type_: Literal["slack_webhook"] = Field(alias="type", default="slack_webhook")
328    _client: t.Optional[WebhookClient] = None
329
330    @property
331    def client(self) -> WebhookClient:
332        if not self._client:
333            try:
334                from slack_sdk import WebhookClient
335            except ModuleNotFoundError as e:
336                raise MissingDependencyError(
337                    "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them."
338                ) from e
339
340            if not self.url:
341                raise ConfigError("Missing Slack webhook URL")
342
343            self._client = WebhookClient(url=self.url)
344        return self._client
345
346    def _send_slack_message(self, composed: slack.TSlackMessage) -> None:
347        self.client.send(
348            blocks=composed["blocks"],
349            attachments=composed["attachments"],  # type: ignore
350        )
351
352    @property
353    def is_configured(self) -> bool:
354        return bool(self.url)
355
356
357class SlackApiNotificationTarget(BaseSlackNotificationTarget):
358    token: t.Optional[str] = None
359    channel: t.Optional[str] = None
360    type_: Literal["slack_api"] = Field(alias="type", default="slack_api")
361    _client: t.Optional[WebClient] = None
362
363    @property
364    def client(self) -> WebClient:
365        if not self._client:
366            try:
367                from slack_sdk import WebClient
368            except ModuleNotFoundError as e:
369                raise MissingDependencyError(
370                    "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them."
371                ) from e
372
373            self._client = WebClient(token=self.token)
374        return self._client
375
376    def _send_slack_message(self, composed: slack.TSlackMessage) -> None:
377        if not self.channel:
378            raise ConfigError("Missing Slack channel for notification")
379
380        self.client.chat_postMessage(
381            channel=self.channel,
382            blocks=composed["blocks"],
383            attachments=composed["attachments"],  # type: ignore
384        )
385
386    @property
387    def is_configured(self) -> bool:
388        return all((self.token, self.channel))
389
390
391class BasicSMTPNotificationTarget(BaseTextBasedNotificationTarget):
392    host: t.Optional[str] = None
393    port: int = 465
394    user: t.Optional[str] = None
395    password: t.Optional[SecretStr] = None
396    sender: t.Optional[str] = None
397    recipients: t.Optional[t.FrozenSet[str]] = None
398    subject: t.Optional[str] = "SQLMesh Notification"
399    type_: Literal["smtp"] = Field(alias="type", default="smtp")
400
401    def send_text_message(
402        self,
403        notification_status: NotificationStatus,
404        msg: str,
405    ) -> None:
406        if not self.host:
407            raise ConfigError("Missing SMTP host for notification")
408
409        email = EmailMessage()
410        email["Subject"] = self.subject
411        email["To"] = ",".join(self.recipients or [])
412        email["From"] = self.sender
413        email.set_content(msg)
414        with smtplib.SMTP_SSL(host=self.host, port=self.port) as smtp:
415            if self.user and self.password:
416                smtp.login(user=self.user, password=self.password.get_secret_value())
417            smtp.send_message(email)
418
419    @property
420    def is_configured(self) -> bool:
421        return all((self.host, self.user, self.password, self.sender))
422
423
424NotificationTarget = Annotated[
425    t.Union[
426        BasicSMTPNotificationTarget,
427        ConsoleNotificationTarget,
428        SlackApiNotificationTarget,
429        SlackWebhookNotificationTarget,
430    ],
431    Field(discriminator="type_"),
432]
433
434
435class NotificationTargetManager:
436    """Wrapper around a list of notification targets.
437
438    Calling a notification target's "notify_" method on this object will call it
439    on all registered notification targets.
440    """
441
442    def __init__(
443        self,
444        notification_targets: t.Dict[NotificationEvent, t.Set[NotificationTarget]] | None = None,
445        user_notification_targets: t.Dict[str, t.Set[NotificationTarget]] | None = None,
446        username: str | None = None,
447    ) -> None:
448        self.notification_targets = notification_targets or {}
449        self.user_notification_targets = user_notification_targets or {}
450        self.username = username
451
452    def notify(self, event: NotificationEvent, *args: t.Any, **kwargs: t.Any) -> None:
453        """Call the 'notify_`event`' function of all notification targets that care about the event."""
454        if self.username:
455            self.notify_user(event, self.username, *args, **kwargs)
456        else:
457            for notification_target in self.notification_targets.get(event, set()):
458                notify_func = self._get_notification_function(notification_target, event)
459                notify_func(*args, **kwargs)
460
461    def notify_user(
462        self, event: NotificationEvent, username: str, *args: t.Any, **kwargs: t.Any
463    ) -> None:
464        """Call the 'notify_`event`' function of the user's notification targets that care about the event."""
465        notification_targets = self.user_notification_targets.get(username, set())
466        for notification_target in notification_targets:
467            if event in notification_target.notify_on:
468                notify_func = self._get_notification_function(notification_target, event)
469                notify_func(*args, **kwargs)
470
471    def _get_notification_function(
472        self, notification_target: NotificationTarget, event: NotificationEvent
473    ) -> t.Callable:
474        """Lookup the registered function for a notification event"""
475        func_name = NOTIFICATION_FUNCTIONS[event]
476        return getattr(notification_target, func_name)
class NotificationStatus(builtins.str, enum.Enum):
44class NotificationStatus(str, Enum):
45    SUCCESS = "success"
46    FAILURE = "failure"
47    WARNING = "warning"
48    INFO = "info"
49    PROGRESS = "progress"
50
51    @property
52    def is_success(self) -> bool:
53        return self == NotificationStatus.SUCCESS
54
55    @property
56    def is_failure(self) -> bool:
57        return self == NotificationStatus.FAILURE
58
59    @property
60    def is_info(self) -> bool:
61        return self == NotificationStatus.INFO
62
63    @property
64    def is_warning(self) -> bool:
65        return self == NotificationStatus.WARNING
66
67    @property
68    def is_progress(self) -> bool:
69        return self == NotificationStatus.PROGRESS

An enumeration.

SUCCESS = <NotificationStatus.SUCCESS: 'success'>
FAILURE = <NotificationStatus.FAILURE: 'failure'>
WARNING = <NotificationStatus.WARNING: 'warning'>
INFO = <NotificationStatus.INFO: 'info'>
PROGRESS = <NotificationStatus.PROGRESS: 'progress'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class NotificationEvent(builtins.str, enum.Enum):
72class NotificationEvent(str, Enum):
73    APPLY_START = "apply_start"
74    APPLY_END = "apply_end"
75    RUN_START = "run_start"
76    RUN_END = "run_end"
77    MIGRATION_START = "migration_start"
78    MIGRATION_END = "migration_end"
79    APPLY_FAILURE = "apply_failure"
80    RUN_FAILURE = "run_failure"
81    AUDIT_FAILURE = "audit_failure"
82    MIGRATION_FAILURE = "migration_failure"

An enumeration.

APPLY_START = <NotificationEvent.APPLY_START: 'apply_start'>
APPLY_END = <NotificationEvent.APPLY_END: 'apply_end'>
RUN_START = <NotificationEvent.RUN_START: 'run_start'>
RUN_END = <NotificationEvent.RUN_END: 'run_end'>
MIGRATION_START = <NotificationEvent.MIGRATION_START: 'migration_start'>
MIGRATION_END = <NotificationEvent.MIGRATION_END: 'migration_end'>
APPLY_FAILURE = <NotificationEvent.APPLY_FAILURE: 'apply_failure'>
RUN_FAILURE = <NotificationEvent.RUN_FAILURE: 'run_failure'>
AUDIT_FAILURE = <NotificationEvent.AUDIT_FAILURE: 'audit_failure'>
MIGRATION_FAILURE = <NotificationEvent.MIGRATION_FAILURE: 'migration_failure'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
def notify(event: sqlmesh.core.notification_target.NotificationEvent) -> Callable:
85def notify(event: NotificationEvent) -> t.Callable:
86    """Decorator used to register 'notify' methods and the events they correspond to."""
87
88    def decorator(f: t.Callable) -> t.Callable:
89        @functools.wraps(f)
90        def wrapper(*args: t.List[t.Any], **kwargs: t.Dict[str, t.Any]) -> None:
91            return f(*args, **kwargs)
92
93        NOTIFICATION_FUNCTIONS[event] = f.__name__
94        return wrapper
95
96    return decorator

Decorator used to register 'notify' methods and the events they correspond to.

class BaseNotificationTarget(sqlmesh.utils.pydantic.PydanticModel):
 99class BaseNotificationTarget(PydanticModel, frozen=True):
100    """
101    Base notification target model. Provides a command for sending notifications that is currently only used
102    by the built-in scheduler. Other schedulers like Airflow use the configuration of the target itself
103    to create the notification constructs appropriate for the scheduler.
104    """
105
106    type_: str
107    notify_on: t.FrozenSet[NotificationEvent] = frozenset()
108
109    def send(self, notification_status: NotificationStatus, msg: str, **kwargs: t.Any) -> None:
110        """Sends notification with the provided message.
111
112        Args:
113            notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
114            msg: The message to send.
115        """
116
117    @notify(NotificationEvent.APPLY_START)
118    def notify_apply_start(self, environment: str, plan_id: str) -> None:
119        """Notify when an apply starts.
120
121        Args:
122            environment: The target environment of the plan.
123            plan_id: plan_id that is being applied.
124        """
125        self.send(
126            NotificationStatus.INFO,
127            f"Plan {plan_id} apply started for environment `{environment}`.",
128        )
129
130    @notify(NotificationEvent.APPLY_END)
131    def notify_apply_end(self, environment: str, plan_id: str) -> None:
132        """Notify when an apply ends.
133
134        Args:
135            environment: The target environment of the plan.
136            plan_id: plan_id that was applied.
137        """
138        self.send(
139            NotificationStatus.SUCCESS,
140            f"Plan {plan_id} apply finished for environment `{environment}`.",
141        )
142
143    @notify(NotificationEvent.RUN_START)
144    def notify_run_start(self, environment: str) -> None:
145        """Notify when a SQLMesh run starts.
146
147        Args:
148            environment: The target environment of the run.
149        """
150        self.send(NotificationStatus.INFO, f"SQLMesh run started for environment `{environment}`.")
151
152    @notify(NotificationEvent.RUN_END)
153    def notify_run_end(self, environment: str) -> None:
154        """Notify when a SQLMesh run ends.
155
156        Args:
157            environment: The target environment of the run.
158        """
159        self.send(
160            NotificationStatus.SUCCESS, f"SQLMesh run finished for environment `{environment}`."
161        )
162
163    @notify(NotificationEvent.MIGRATION_START)
164    def notify_migration_start(self) -> None:
165        """Notify when a SQLMesh migration starts."""
166        self.send(NotificationStatus.INFO, "SQLMesh migration started.")
167
168    @notify(NotificationEvent.MIGRATION_END)
169    def notify_migration_end(self) -> None:
170        """Notify when a SQLMesh migration ends."""
171        self.send(NotificationStatus.SUCCESS, "SQLMesh migration finished.")
172
173    @notify(NotificationEvent.APPLY_FAILURE)
174    def notify_apply_failure(self, environment: str, plan_id: str, exc: str) -> None:
175        """Notify in the case of an apply failure.
176
177        Args:
178            environment: The target environment of the run.
179            plan_id: The plan id of the failed apply
180            exc: The exception stack trace.
181        """
182        self.send(
183            NotificationStatus.FAILURE,
184            f"Plan {plan_id} in environment `{environment}` apply failed.",
185            exc=exc,
186        )
187
188    @notify(NotificationEvent.RUN_FAILURE)
189    def notify_run_failure(self, exc: str) -> None:
190        """Notify in the case of a run failure.
191
192        Args:
193            exc: The exception stack trace.
194        """
195        self.send(NotificationStatus.FAILURE, "SQLMesh run failed.", exc=exc)
196
197    @notify(NotificationEvent.AUDIT_FAILURE)
198    def notify_audit_failure(self, audit_error: AuditError) -> None:
199        """Notify in the case of an audit failure.
200
201        Args:
202            audit_error: The AuditError object.
203        """
204        self.send(NotificationStatus.FAILURE, "Audit failure.", audit_error=audit_error)
205
206    @notify(NotificationEvent.MIGRATION_FAILURE)
207    def notify_migration_failure(self, exc: str) -> None:
208        """Notify in the case of a migration failure.
209
210        Args:
211            exc: The exception stack trace.
212        """
213        self.send(NotificationStatus.FAILURE, "SQLMesh migration failed.", exc=exc)
214
215    @property
216    def is_configured(self) -> bool:
217        return True

Base notification target model. Provides a command for sending notifications that is currently only used by the built-in scheduler. Other schedulers like Airflow use the configuration of the target itself to create the notification constructs appropriate for the scheduler.

def send( self, notification_status: sqlmesh.core.notification_target.NotificationStatus, msg: str, **kwargs: Any) -> None:
109    def send(self, notification_status: NotificationStatus, msg: str, **kwargs: t.Any) -> None:
110        """Sends notification with the provided message.
111
112        Args:
113            notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
114            msg: The message to send.
115        """

Sends notification with the provided message.

Arguments:
  • notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
  • msg: The message to send.
@notify(NotificationEvent.APPLY_START)
def notify_apply_start(self, environment: str, plan_id: str) -> None:
117    @notify(NotificationEvent.APPLY_START)
118    def notify_apply_start(self, environment: str, plan_id: str) -> None:
119        """Notify when an apply starts.
120
121        Args:
122            environment: The target environment of the plan.
123            plan_id: plan_id that is being applied.
124        """
125        self.send(
126            NotificationStatus.INFO,
127            f"Plan {plan_id} apply started for environment `{environment}`.",
128        )

Notify when an apply starts.

Arguments:
  • environment: The target environment of the plan.
  • plan_id: plan_id that is being applied.
@notify(NotificationEvent.APPLY_END)
def notify_apply_end(self, environment: str, plan_id: str) -> None:
130    @notify(NotificationEvent.APPLY_END)
131    def notify_apply_end(self, environment: str, plan_id: str) -> None:
132        """Notify when an apply ends.
133
134        Args:
135            environment: The target environment of the plan.
136            plan_id: plan_id that was applied.
137        """
138        self.send(
139            NotificationStatus.SUCCESS,
140            f"Plan {plan_id} apply finished for environment `{environment}`.",
141        )

Notify when an apply ends.

Arguments:
  • environment: The target environment of the plan.
  • plan_id: plan_id that was applied.
@notify(NotificationEvent.RUN_START)
def notify_run_start(self, environment: str) -> None:
143    @notify(NotificationEvent.RUN_START)
144    def notify_run_start(self, environment: str) -> None:
145        """Notify when a SQLMesh run starts.
146
147        Args:
148            environment: The target environment of the run.
149        """
150        self.send(NotificationStatus.INFO, f"SQLMesh run started for environment `{environment}`.")

Notify when a SQLMesh run starts.

Arguments:
  • environment: The target environment of the run.
@notify(NotificationEvent.RUN_END)
def notify_run_end(self, environment: str) -> None:
152    @notify(NotificationEvent.RUN_END)
153    def notify_run_end(self, environment: str) -> None:
154        """Notify when a SQLMesh run ends.
155
156        Args:
157            environment: The target environment of the run.
158        """
159        self.send(
160            NotificationStatus.SUCCESS, f"SQLMesh run finished for environment `{environment}`."
161        )

Notify when a SQLMesh run ends.

Arguments:
  • environment: The target environment of the run.
@notify(NotificationEvent.MIGRATION_START)
def notify_migration_start(self) -> None:
163    @notify(NotificationEvent.MIGRATION_START)
164    def notify_migration_start(self) -> None:
165        """Notify when a SQLMesh migration starts."""
166        self.send(NotificationStatus.INFO, "SQLMesh migration started.")

Notify when a SQLMesh migration starts.

@notify(NotificationEvent.MIGRATION_END)
def notify_migration_end(self) -> None:
168    @notify(NotificationEvent.MIGRATION_END)
169    def notify_migration_end(self) -> None:
170        """Notify when a SQLMesh migration ends."""
171        self.send(NotificationStatus.SUCCESS, "SQLMesh migration finished.")

Notify when a SQLMesh migration ends.

@notify(NotificationEvent.APPLY_FAILURE)
def notify_apply_failure(self, environment: str, plan_id: str, exc: str) -> None:
173    @notify(NotificationEvent.APPLY_FAILURE)
174    def notify_apply_failure(self, environment: str, plan_id: str, exc: str) -> None:
175        """Notify in the case of an apply failure.
176
177        Args:
178            environment: The target environment of the run.
179            plan_id: The plan id of the failed apply
180            exc: The exception stack trace.
181        """
182        self.send(
183            NotificationStatus.FAILURE,
184            f"Plan {plan_id} in environment `{environment}` apply failed.",
185            exc=exc,
186        )

Notify in the case of an apply failure.

Arguments:
  • environment: The target environment of the run.
  • plan_id: The plan id of the failed apply
  • exc: The exception stack trace.
@notify(NotificationEvent.RUN_FAILURE)
def notify_run_failure(self, exc: str) -> None:
188    @notify(NotificationEvent.RUN_FAILURE)
189    def notify_run_failure(self, exc: str) -> None:
190        """Notify in the case of a run failure.
191
192        Args:
193            exc: The exception stack trace.
194        """
195        self.send(NotificationStatus.FAILURE, "SQLMesh run failed.", exc=exc)

Notify in the case of a run failure.

Arguments:
  • exc: The exception stack trace.
@notify(NotificationEvent.AUDIT_FAILURE)
def notify_audit_failure(self, audit_error: sqlmesh.utils.errors.AuditError) -> None:
197    @notify(NotificationEvent.AUDIT_FAILURE)
198    def notify_audit_failure(self, audit_error: AuditError) -> None:
199        """Notify in the case of an audit failure.
200
201        Args:
202            audit_error: The AuditError object.
203        """
204        self.send(NotificationStatus.FAILURE, "Audit failure.", audit_error=audit_error)

Notify in the case of an audit failure.

Arguments:
  • audit_error: The AuditError object.
@notify(NotificationEvent.MIGRATION_FAILURE)
def notify_migration_failure(self, exc: str) -> None:
206    @notify(NotificationEvent.MIGRATION_FAILURE)
207    def notify_migration_failure(self, exc: str) -> None:
208        """Notify in the case of a migration failure.
209
210        Args:
211            exc: The exception stack trace.
212        """
213        self.send(NotificationStatus.FAILURE, "SQLMesh migration failed.", exc=exc)

Notify in the case of a migration failure.

Arguments:
  • exc: The exception stack trace.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
model_post_init
class BaseTextBasedNotificationTarget(BaseNotificationTarget):
220class BaseTextBasedNotificationTarget(BaseNotificationTarget):
221    """
222    A base class for unstructured notification targets (e.g.: console, email, etc.)
223    """
224
225    def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None:
226        """Send the notification message as text."""
227
228    def send(
229        self,
230        notification_status: NotificationStatus,
231        msg: str,
232        audit_error: t.Optional[AuditError] = None,
233        exc: t.Optional[str] = None,
234        **kwargs: t.Any,
235    ) -> None:
236        error = None
237        if audit_error:
238            error = str(audit_error)
239        elif exc:
240            error = exc
241
242        self.send_text_message(notification_status, msg if error is None else f"{msg}\n{error}")

A base class for unstructured notification targets (e.g.: console, email, etc.)

def send_text_message( self, notification_status: sqlmesh.core.notification_target.NotificationStatus, msg: str) -> None:
225    def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None:
226        """Send the notification message as text."""

Send the notification message as text.

def send( self, notification_status: sqlmesh.core.notification_target.NotificationStatus, msg: str, audit_error: Union[sqlmesh.utils.errors.AuditError, NoneType] = None, exc: Union[str, NoneType] = None, **kwargs: Any) -> None:
228    def send(
229        self,
230        notification_status: NotificationStatus,
231        msg: str,
232        audit_error: t.Optional[AuditError] = None,
233        exc: t.Optional[str] = None,
234        **kwargs: t.Any,
235    ) -> None:
236        error = None
237        if audit_error:
238            error = str(audit_error)
239        elif exc:
240            error = exc
241
242        self.send_text_message(notification_status, msg if error is None else f"{msg}\n{error}")

Sends notification with the provided message.

Arguments:
  • notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
  • msg: The message to send.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
BaseNotificationTarget
notify_apply_start
notify_apply_end
notify_run_start
notify_run_end
notify_migration_start
notify_migration_end
notify_apply_failure
notify_run_failure
notify_audit_failure
notify_migration_failure
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
model_post_init
class ConsoleNotificationTarget(BaseTextBasedNotificationTarget):
245class ConsoleNotificationTarget(BaseTextBasedNotificationTarget):
246    """
247    Example console notification target. Keeping this around for testing purposes.
248    """
249
250    type_: Literal["console"] = Field(alias="type", default="console")
251    _console: t.Optional[Console] = None
252
253    @property
254    def console(self) -> Console:
255        if not self._console:
256            self._console = get_console()
257        return self._console
258
259    def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None:
260        if notification_status.is_success:
261            self.console.log_success(msg)
262        elif notification_status.is_failure:
263            self.console.log_error(msg)
264        else:
265            self.console.log_status_update(msg)

Example console notification target. Keeping this around for testing purposes.

def send_text_message( self, notification_status: sqlmesh.core.notification_target.NotificationStatus, msg: str) -> None:
259    def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None:
260        if notification_status.is_success:
261            self.console.log_success(msg)
262        elif notification_status.is_failure:
263            self.console.log_error(msg)
264        else:
265            self.console.log_status_update(msg)

Send the notification message as text.

def model_post_init(self: pydantic.main.BaseModel, _ModelMetaclass__context: Any) -> None:
102                    def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
103                        """We need to both initialize private attributes and call the user-defined model_post_init
104                        method.
105                        """
106                        init_private_attributes(self, __context)
107                        original_model_post_init(self, __context)

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
BaseTextBasedNotificationTarget
send
BaseNotificationTarget
notify_apply_start
notify_apply_end
notify_run_start
notify_run_end
notify_migration_start
notify_migration_end
notify_apply_failure
notify_run_failure
notify_audit_failure
notify_migration_failure
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class BaseSlackNotificationTarget(BaseNotificationTarget):
268class BaseSlackNotificationTarget(BaseNotificationTarget):
269    def send(
270        self,
271        notification_status: NotificationStatus,
272        msg: str,
273        audit_error: t.Optional[AuditError] = None,
274        exc: t.Optional[str] = None,
275        **kwargs: t.Any,
276    ) -> None:
277
278        status_emoji = {
279            NotificationStatus.PROGRESS: slack.SlackAlertIcon.START,
280            NotificationStatus.SUCCESS: slack.SlackAlertIcon.SUCCESS,
281            NotificationStatus.FAILURE: slack.SlackAlertIcon.FAILURE,
282            NotificationStatus.WARNING: slack.SlackAlertIcon.WARNING,
283            NotificationStatus.INFO: slack.SlackAlertIcon.INFO,
284        }
285
286        composed = slack.message().add_primary_blocks(
287            slack.header_block(f"{status_emoji[notification_status]} SQLMesh Notification"),
288            slack.context_block(f"*Status:* `{notification_status.value}`"),
289            slack.divider_block(),
290            slack.text_section_block(f"*Message*: {msg}"),
291        )
292
293        details = []
294        if audit_error:
295            details = [
296                slack.fields_section_block(
297                    f"*Audit*: `{audit_error.audit_name}`",
298                    f"*Model*: `{audit_error.model_name}`",
299                    f"*Count*: `{audit_error.count}`",
300                ),
301                slack.preformatted_rich_text_block(audit_error.sql(pretty=True)),
302            ]
303        elif exc:
304            details = [slack.preformatted_rich_text_block(exc)]
305
306        composed.add_primary_blocks(
307            *details,
308            slack.divider_block(),
309            slack.context_block(
310                f"*SQLMesh Version:* {_sqlmesh_version()}", f"*Python Version:* {sys.version}"
311            ),
312        )
313
314        self._send_slack_message(
315            composed=composed.slack_message,
316        )
317
318    def _send_slack_message(self, composed: slack.TSlackMessage) -> None:
319        """Send a composed message Slack.
320
321        Args:
322            composed: the formatted message to send to Slack
323        """

Base notification target model. Provides a command for sending notifications that is currently only used by the built-in scheduler. Other schedulers like Airflow use the configuration of the target itself to create the notification constructs appropriate for the scheduler.

def send( self, notification_status: sqlmesh.core.notification_target.NotificationStatus, msg: str, audit_error: Union[sqlmesh.utils.errors.AuditError, NoneType] = None, exc: Union[str, NoneType] = None, **kwargs: Any) -> None:
269    def send(
270        self,
271        notification_status: NotificationStatus,
272        msg: str,
273        audit_error: t.Optional[AuditError] = None,
274        exc: t.Optional[str] = None,
275        **kwargs: t.Any,
276    ) -> None:
277
278        status_emoji = {
279            NotificationStatus.PROGRESS: slack.SlackAlertIcon.START,
280            NotificationStatus.SUCCESS: slack.SlackAlertIcon.SUCCESS,
281            NotificationStatus.FAILURE: slack.SlackAlertIcon.FAILURE,
282            NotificationStatus.WARNING: slack.SlackAlertIcon.WARNING,
283            NotificationStatus.INFO: slack.SlackAlertIcon.INFO,
284        }
285
286        composed = slack.message().add_primary_blocks(
287            slack.header_block(f"{status_emoji[notification_status]} SQLMesh Notification"),
288            slack.context_block(f"*Status:* `{notification_status.value}`"),
289            slack.divider_block(),
290            slack.text_section_block(f"*Message*: {msg}"),
291        )
292
293        details = []
294        if audit_error:
295            details = [
296                slack.fields_section_block(
297                    f"*Audit*: `{audit_error.audit_name}`",
298                    f"*Model*: `{audit_error.model_name}`",
299                    f"*Count*: `{audit_error.count}`",
300                ),
301                slack.preformatted_rich_text_block(audit_error.sql(pretty=True)),
302            ]
303        elif exc:
304            details = [slack.preformatted_rich_text_block(exc)]
305
306        composed.add_primary_blocks(
307            *details,
308            slack.divider_block(),
309            slack.context_block(
310                f"*SQLMesh Version:* {_sqlmesh_version()}", f"*Python Version:* {sys.version}"
311            ),
312        )
313
314        self._send_slack_message(
315            composed=composed.slack_message,
316        )

Sends notification with the provided message.

Arguments:
  • notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
  • msg: The message to send.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
BaseNotificationTarget
notify_apply_start
notify_apply_end
notify_run_start
notify_run_end
notify_migration_start
notify_migration_end
notify_apply_failure
notify_run_failure
notify_audit_failure
notify_migration_failure
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
model_post_init
class SlackWebhookNotificationTarget(BaseSlackNotificationTarget):
326class SlackWebhookNotificationTarget(BaseSlackNotificationTarget):
327    url: t.Optional[str] = None
328    type_: Literal["slack_webhook"] = Field(alias="type", default="slack_webhook")
329    _client: t.Optional[WebhookClient] = None
330
331    @property
332    def client(self) -> WebhookClient:
333        if not self._client:
334            try:
335                from slack_sdk import WebhookClient
336            except ModuleNotFoundError as e:
337                raise MissingDependencyError(
338                    "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them."
339                ) from e
340
341            if not self.url:
342                raise ConfigError("Missing Slack webhook URL")
343
344            self._client = WebhookClient(url=self.url)
345        return self._client
346
347    def _send_slack_message(self, composed: slack.TSlackMessage) -> None:
348        self.client.send(
349            blocks=composed["blocks"],
350            attachments=composed["attachments"],  # type: ignore
351        )
352
353    @property
354    def is_configured(self) -> bool:
355        return bool(self.url)

Base notification target model. Provides a command for sending notifications that is currently only used by the built-in scheduler. Other schedulers like Airflow use the configuration of the target itself to create the notification constructs appropriate for the scheduler.

def model_post_init(self: pydantic.main.BaseModel, _ModelMetaclass__context: Any) -> None:
102                    def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
103                        """We need to both initialize private attributes and call the user-defined model_post_init
104                        method.
105                        """
106                        init_private_attributes(self, __context)
107                        original_model_post_init(self, __context)

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
BaseSlackNotificationTarget
send
BaseNotificationTarget
notify_apply_start
notify_apply_end
notify_run_start
notify_run_end
notify_migration_start
notify_migration_end
notify_apply_failure
notify_run_failure
notify_audit_failure
notify_migration_failure
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class SlackApiNotificationTarget(BaseSlackNotificationTarget):
358class SlackApiNotificationTarget(BaseSlackNotificationTarget):
359    token: t.Optional[str] = None
360    channel: t.Optional[str] = None
361    type_: Literal["slack_api"] = Field(alias="type", default="slack_api")
362    _client: t.Optional[WebClient] = None
363
364    @property
365    def client(self) -> WebClient:
366        if not self._client:
367            try:
368                from slack_sdk import WebClient
369            except ModuleNotFoundError as e:
370                raise MissingDependencyError(
371                    "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them."
372                ) from e
373
374            self._client = WebClient(token=self.token)
375        return self._client
376
377    def _send_slack_message(self, composed: slack.TSlackMessage) -> None:
378        if not self.channel:
379            raise ConfigError("Missing Slack channel for notification")
380
381        self.client.chat_postMessage(
382            channel=self.channel,
383            blocks=composed["blocks"],
384            attachments=composed["attachments"],  # type: ignore
385        )
386
387    @property
388    def is_configured(self) -> bool:
389        return all((self.token, self.channel))

Base notification target model. Provides a command for sending notifications that is currently only used by the built-in scheduler. Other schedulers like Airflow use the configuration of the target itself to create the notification constructs appropriate for the scheduler.

def model_post_init(self: pydantic.main.BaseModel, _ModelMetaclass__context: Any) -> None:
102                    def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
103                        """We need to both initialize private attributes and call the user-defined model_post_init
104                        method.
105                        """
106                        init_private_attributes(self, __context)
107                        original_model_post_init(self, __context)

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
BaseSlackNotificationTarget
send
BaseNotificationTarget
notify_apply_start
notify_apply_end
notify_run_start
notify_run_end
notify_migration_start
notify_migration_end
notify_apply_failure
notify_run_failure
notify_audit_failure
notify_migration_failure
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class BasicSMTPNotificationTarget(BaseTextBasedNotificationTarget):
392class BasicSMTPNotificationTarget(BaseTextBasedNotificationTarget):
393    host: t.Optional[str] = None
394    port: int = 465
395    user: t.Optional[str] = None
396    password: t.Optional[SecretStr] = None
397    sender: t.Optional[str] = None
398    recipients: t.Optional[t.FrozenSet[str]] = None
399    subject: t.Optional[str] = "SQLMesh Notification"
400    type_: Literal["smtp"] = Field(alias="type", default="smtp")
401
402    def send_text_message(
403        self,
404        notification_status: NotificationStatus,
405        msg: str,
406    ) -> None:
407        if not self.host:
408            raise ConfigError("Missing SMTP host for notification")
409
410        email = EmailMessage()
411        email["Subject"] = self.subject
412        email["To"] = ",".join(self.recipients or [])
413        email["From"] = self.sender
414        email.set_content(msg)
415        with smtplib.SMTP_SSL(host=self.host, port=self.port) as smtp:
416            if self.user and self.password:
417                smtp.login(user=self.user, password=self.password.get_secret_value())
418            smtp.send_message(email)
419
420    @property
421    def is_configured(self) -> bool:
422        return all((self.host, self.user, self.password, self.sender))

A base class for unstructured notification targets (e.g.: console, email, etc.)

def send_text_message( self, notification_status: sqlmesh.core.notification_target.NotificationStatus, msg: str) -> None:
402    def send_text_message(
403        self,
404        notification_status: NotificationStatus,
405        msg: str,
406    ) -> None:
407        if not self.host:
408            raise ConfigError("Missing SMTP host for notification")
409
410        email = EmailMessage()
411        email["Subject"] = self.subject
412        email["To"] = ",".join(self.recipients or [])
413        email["From"] = self.sender
414        email.set_content(msg)
415        with smtplib.SMTP_SSL(host=self.host, port=self.port) as smtp:
416            if self.user and self.password:
417                smtp.login(user=self.user, password=self.password.get_secret_value())
418            smtp.send_message(email)

Send the notification message as text.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
BaseTextBasedNotificationTarget
send
BaseNotificationTarget
notify_apply_start
notify_apply_end
notify_run_start
notify_run_end
notify_migration_start
notify_migration_end
notify_apply_failure
notify_run_failure
notify_audit_failure
notify_migration_failure
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
model_post_init
class NotificationTargetManager:
436class NotificationTargetManager:
437    """Wrapper around a list of notification targets.
438
439    Calling a notification target's "notify_" method on this object will call it
440    on all registered notification targets.
441    """
442
443    def __init__(
444        self,
445        notification_targets: t.Dict[NotificationEvent, t.Set[NotificationTarget]] | None = None,
446        user_notification_targets: t.Dict[str, t.Set[NotificationTarget]] | None = None,
447        username: str | None = None,
448    ) -> None:
449        self.notification_targets = notification_targets or {}
450        self.user_notification_targets = user_notification_targets or {}
451        self.username = username
452
453    def notify(self, event: NotificationEvent, *args: t.Any, **kwargs: t.Any) -> None:
454        """Call the 'notify_`event`' function of all notification targets that care about the event."""
455        if self.username:
456            self.notify_user(event, self.username, *args, **kwargs)
457        else:
458            for notification_target in self.notification_targets.get(event, set()):
459                notify_func = self._get_notification_function(notification_target, event)
460                notify_func(*args, **kwargs)
461
462    def notify_user(
463        self, event: NotificationEvent, username: str, *args: t.Any, **kwargs: t.Any
464    ) -> None:
465        """Call the 'notify_`event`' function of the user's notification targets that care about the event."""
466        notification_targets = self.user_notification_targets.get(username, set())
467        for notification_target in notification_targets:
468            if event in notification_target.notify_on:
469                notify_func = self._get_notification_function(notification_target, event)
470                notify_func(*args, **kwargs)
471
472    def _get_notification_function(
473        self, notification_target: NotificationTarget, event: NotificationEvent
474    ) -> t.Callable:
475        """Lookup the registered function for a notification event"""
476        func_name = NOTIFICATION_FUNCTIONS[event]
477        return getattr(notification_target, func_name)

Wrapper around a list of notification targets.

Calling a notification target's "notify_" method on this object will call it on all registered notification targets.

NotificationTargetManager( notification_targets: 't.Dict[NotificationEvent, t.Set[NotificationTarget]] | None' = None, user_notification_targets: 't.Dict[str, t.Set[NotificationTarget]] | None' = None, username: 'str | None' = None)
443    def __init__(
444        self,
445        notification_targets: t.Dict[NotificationEvent, t.Set[NotificationTarget]] | None = None,
446        user_notification_targets: t.Dict[str, t.Set[NotificationTarget]] | None = None,
447        username: str | None = None,
448    ) -> None:
449        self.notification_targets = notification_targets or {}
450        self.user_notification_targets = user_notification_targets or {}
451        self.username = username
def notify( self, event: sqlmesh.core.notification_target.NotificationEvent, *args: Any, **kwargs: Any) -> None:
453    def notify(self, event: NotificationEvent, *args: t.Any, **kwargs: t.Any) -> None:
454        """Call the 'notify_`event`' function of all notification targets that care about the event."""
455        if self.username:
456            self.notify_user(event, self.username, *args, **kwargs)
457        else:
458            for notification_target in self.notification_targets.get(event, set()):
459                notify_func = self._get_notification_function(notification_target, event)
460                notify_func(*args, **kwargs)

Call the 'notify_event' function of all notification targets that care about the event.

def notify_user( self, event: sqlmesh.core.notification_target.NotificationEvent, username: str, *args: Any, **kwargs: Any) -> None:
462    def notify_user(
463        self, event: NotificationEvent, username: str, *args: t.Any, **kwargs: t.Any
464    ) -> None:
465        """Call the 'notify_`event`' function of the user's notification targets that care about the event."""
466        notification_targets = self.user_notification_targets.get(username, set())
467        for notification_target in notification_targets:
468            if event in notification_target.notify_on:
469                notify_func = self._get_notification_function(notification_target, event)
470                notify_func(*args, **kwargs)

Call the 'notify_event' function of the user's notification targets that care about the event.