Edit on GitHub

sqlmesh.core.notification_target

  1from __future__ import annotations
  2
  3import smtplib
  4import sys
  5import typing as t
  6from email.message import EmailMessage
  7from enum import Enum
  8
  9from pydantic import Field, SecretStr
 10
 11from sqlmesh.core.console import Console, get_console
 12from sqlmesh.integrations import slack
 13from sqlmesh.utils.errors import AuditError, ConfigError, MissingDependencyError
 14from sqlmesh.utils.pydantic import PydanticModel
 15
 16if t.TYPE_CHECKING:
 17    from slack_sdk import WebClient, WebhookClient
 18
 19
 20def _sqlmesh_version() -> str:
 21    try:
 22        from sqlmesh import __version__
 23
 24        return __version__
 25    except ImportError:
 26        return "0.0.0"
 27
 28
 29class NotificationStatus(str, Enum):
 30    SUCCESS = "success"
 31    FAILURE = "failure"
 32    WARNING = "warning"
 33    INFO = "info"
 34    PROGRESS = "progress"
 35
 36    @property
 37    def is_success(self) -> bool:
 38        return self == NotificationStatus.SUCCESS
 39
 40    @property
 41    def is_failure(self) -> bool:
 42        return self == NotificationStatus.FAILURE
 43
 44    @property
 45    def is_info(self) -> bool:
 46        return self == NotificationStatus.INFO
 47
 48    @property
 49    def is_warning(self) -> bool:
 50        return self == NotificationStatus.WARNING
 51
 52    @property
 53    def is_progress(self) -> bool:
 54        return self == NotificationStatus.PROGRESS
 55
 56
class NotificationEvent(str, Enum):
    """Events a notification target can be notified about.

    Each member's value is the suffix of the matching `notify_<value>` method
    on a notification target.
    """

    # Start / end events.
    APPLY_START = "apply_start"
    APPLY_END = "apply_end"
    RUN_START = "run_start"
    RUN_END = "run_end"
    MIGRATION_START = "migration_start"
    MIGRATION_END = "migration_end"
    # Failure events.
    APPLY_FAILURE = "apply_failure"
    RUN_FAILURE = "run_failure"
    AUDIT_FAILURE = "audit_failure"
    MIGRATION_FAILURE = "migration_failure"
 68
 69
 70class BaseNotificationTarget(PydanticModel, frozen=True):
 71    """
 72    Base notification target model. Provides a command for sending notifications that is currently only used
 73    by the built-in scheduler.
 74
 75    Notification functions follow a naming convention of `notify_` + NotificationEvent value.
 76    """
 77
 78    type_: str
 79    notify_on: t.FrozenSet[NotificationEvent] = frozenset()
 80
 81    def send(self, notification_status: NotificationStatus, msg: str, **kwargs: t.Any) -> None:
 82        """Sends notification with the provided message.
 83
 84        Args:
 85            notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
 86            msg: The message to send.
 87        """
 88
 89    def notify_apply_start(
 90        self, environment: str, plan_id: str, *args: t.Any, **kwargs: t.Any
 91    ) -> None:
 92        """Notify when an apply starts.
 93
 94        Args:
 95            environment: The target environment of the plan.
 96            plan_id: plan_id that is being applied.
 97        """
 98        self.send(
 99            NotificationStatus.INFO,
100            f"Plan `{plan_id}` apply started for environment `{environment}`.",
101        )
102
103    def notify_apply_end(
104        self, environment: str, plan_id: str, *args: t.Any, **kwargs: t.Any
105    ) -> None:
106        """Notify when an apply ends.
107
108        Args:
109            environment: The target environment of the plan.
110            plan_id: plan_id that was applied.
111        """
112        self.send(
113            NotificationStatus.SUCCESS,
114            f"Plan `{plan_id}` apply finished for environment `{environment}`.",
115        )
116
117    def notify_run_start(self, environment: str, *args: t.Any, **kwargs: t.Any) -> None:
118        """Notify when a SQLMesh run starts.
119
120        Args:
121            environment: The target environment of the run.
122        """
123        self.send(NotificationStatus.INFO, f"SQLMesh run started for environment `{environment}`.")
124
125    def notify_run_end(self, environment: str, *args: t.Any, **kwargs: t.Any) -> None:
126        """Notify when a SQLMesh run ends.
127
128        Args:
129            environment: The target environment of the run.
130        """
131        self.send(
132            NotificationStatus.SUCCESS, f"SQLMesh run finished for environment `{environment}`."
133        )
134
135    def notify_migration_start(self, *args: t.Any, **kwargs: t.Any) -> None:
136        """Notify when a SQLMesh migration starts."""
137        self.send(NotificationStatus.INFO, "SQLMesh migration started.")
138
139    def notify_migration_end(self, *args: t.Any, **kwargs: t.Any) -> None:
140        """Notify when a SQLMesh migration ends."""
141        self.send(NotificationStatus.SUCCESS, "SQLMesh migration finished.")
142
143    def notify_apply_failure(
144        self, environment: str, plan_id: str, exc: str, *args: t.Any, **kwargs: t.Any
145    ) -> None:
146        """Notify in the case of an apply failure.
147
148        Args:
149            environment: The target environment of the run.
150            plan_id: The plan id of the failed apply
151            exc: The exception stack trace.
152        """
153        self.send(
154            NotificationStatus.FAILURE,
155            f"Plan `{plan_id}` in environment `{environment}` apply failed.",
156            exc=exc,
157        )
158
159    def notify_run_failure(self, exc: str, *args: t.Any, **kwargs: t.Any) -> None:
160        """Notify in the case of a run failure.
161
162        Args:
163            exc: The exception stack trace.
164        """
165        self.send(NotificationStatus.FAILURE, "SQLMesh run failed.", exc=exc)
166
167    def notify_audit_failure(self, audit_error: AuditError, *args: t.Any, **kwargs: t.Any) -> None:
168        """Notify in the case of an audit failure.
169
170        Args:
171            audit_error: The AuditError object.
172        """
173        self.send(NotificationStatus.FAILURE, "Audit failure.", audit_error=audit_error)
174
175    def notify_migration_failure(self, exc: str, *args: t.Any, **kwargs: t.Any) -> None:
176        """Notify in the case of a migration failure.
177
178        Args:
179            exc: The exception stack trace.
180        """
181        self.send(NotificationStatus.FAILURE, "SQLMesh migration failed.", exc=exc)
182
183    @property
184    def is_configured(self) -> bool:
185        return True
186
187
class BaseTextBasedNotificationTarget(BaseNotificationTarget):
    """
    A base class for unstructured notification targets (e.g.: console, email, etc.)

    `send` flattens the notification into a single string and passes it to `send_text_message`.
    """

    def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None:
        """Send the notification message as text."""

    def send(
        self,
        notification_status: NotificationStatus,
        msg: str,
        audit_error: t.Optional[AuditError] = None,
        exc: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> None:
        """Append error details (if any) to `msg` and send the result as text.

        Args:
            notification_status: The status of the notification.
            msg: The message to send.
            audit_error: Optional audit error; its string form takes precedence over `exc`.
            exc: Optional exception text, used only when there is no audit error.
        """
        if audit_error:
            detail = str(audit_error)
        else:
            # An empty or missing `exc` means there is nothing to append.
            detail = exc or None

        text = f"{msg}\n{detail}" if detail is not None else msg
        self.send_text_message(notification_status, text)
211
212
class ConsoleNotificationTarget(BaseTextBasedNotificationTarget):
    """
    Example console notification target. Keeping this around for testing purposes.
    """

    type_: t.Literal["console"] = Field(alias="type", default="console")
    _console: t.Optional[Console] = None

    @property
    def console(self) -> Console:
        """The console to log to, fetched via `get_console()` on first use and then reused."""
        cached = self._console
        if cached:
            return cached
        self._console = get_console()
        return self._console

    def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None:
        """Log `msg` through the console method that matches the status."""
        if notification_status.is_success:
            emit = self.console.log_success
        elif notification_status.is_failure:
            emit = self.console.log_error
        else:
            # Every other status is reported as a plain status update.
            emit = self.console.log_status_update
        emit(msg)
234
235
class BaseSlackNotificationTarget(BaseNotificationTarget):
    """Base for Slack targets: composes a block-based message and hands it to `_send_slack_message`."""

    def send(
        self,
        notification_status: NotificationStatus,
        msg: str,
        audit_error: t.Optional[AuditError] = None,
        exc: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> None:
        """Compose the Slack message for a notification and send it.

        Args:
            notification_status: The status of the notification; selects the header icon.
            msg: The message to send.
            audit_error: Optional audit error; adds audit/model/count fields and the audit SQL.
            exc: Optional exception text, added as a preformatted block when there is no audit error.
        """
        icon_by_status = {
            NotificationStatus.PROGRESS: slack.SlackAlertIcon.START,
            NotificationStatus.SUCCESS: slack.SlackAlertIcon.SUCCESS,
            NotificationStatus.FAILURE: slack.SlackAlertIcon.FAILURE,
            NotificationStatus.WARNING: slack.SlackAlertIcon.WARNING,
            NotificationStatus.INFO: slack.SlackAlertIcon.INFO,
        }

        builder = slack.message()
        composed = builder.add_primary_blocks(
            slack.header_block(f"{icon_by_status[notification_status]} SQLMesh Notification"),
            slack.context_block(f"*Status:* `{notification_status.value}`"),
            slack.divider_block(),
            slack.text_section_block(f"*Message*: {msg}"),
        )

        if audit_error:
            detail_blocks = [
                slack.fields_section_block(
                    f"*Audit*: `{audit_error.audit_name}`",
                    f"*Model*: `{audit_error.model_name}`",
                    f"*Count*: `{audit_error.count}`",
                ),
                slack.preformatted_rich_text_block(audit_error.sql(pretty=True)),
            ]
        elif exc:
            detail_blocks = [slack.preformatted_rich_text_block(exc)]
        else:
            detail_blocks = []

        version_footer = slack.context_block(
            f"*SQLMesh Version:* {_sqlmesh_version()}", f"*Python Version:* {sys.version}"
        )
        composed.add_primary_blocks(*detail_blocks, slack.divider_block(), version_footer)

        composed.add_text(msg)

        self._send_slack_message(composed=composed.slack_message)

    def _send_slack_message(self, composed: slack.TSlackMessage) -> None:
        """Send a composed message to Slack.

        Args:
            composed: the formatted message to send to Slack
        """
293
294
class SlackWebhookNotificationTarget(BaseSlackNotificationTarget):
    """Slack notification target that sends messages through a `slack_sdk` `WebhookClient`."""

    url: t.Optional[str] = None
    type_: t.Literal["slack_webhook"] = Field(alias="type", default="slack_webhook")
    _client: t.Optional[WebhookClient] = None

    @property
    def client(self) -> WebhookClient:
        """The webhook client, created on first access and reused afterwards.

        Raises:
            MissingDependencyError: If `slack_sdk` cannot be imported.
            ConfigError: If no webhook URL is set.
        """
        if self._client:
            return self._client

        # Imported here so that slack_sdk is only needed when a client is actually built.
        try:
            from slack_sdk import WebhookClient
        except ModuleNotFoundError as e:
            raise MissingDependencyError(
                "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them."
            ) from e

        if not self.url:
            raise ConfigError("Missing Slack webhook URL")

        self._client = WebhookClient(url=self.url)
        return self._client

    def _send_slack_message(self, composed: slack.TSlackMessage) -> None:
        """Send the composed text, blocks and attachments through the webhook client."""
        webhook = self.client
        webhook.send(
            text=composed["text"],
            blocks=composed["blocks"],
            attachments=composed["attachments"],  # type: ignore
        )

    @property
    def is_configured(self) -> bool:
        """True when a non-empty webhook URL is set."""
        return bool(self.url)
326
327
class SlackApiNotificationTarget(BaseSlackNotificationTarget):
    """Slack notification target that posts messages to a channel through a `slack_sdk` `WebClient`."""

    token: t.Optional[str] = None
    channel: t.Optional[str] = None
    type_: t.Literal["slack_api"] = Field(alias="type", default="slack_api")
    _client: t.Optional[WebClient] = None

    @property
    def client(self) -> WebClient:
        """The web client, created on first access and reused afterwards.

        Raises:
            MissingDependencyError: If `slack_sdk` cannot be imported.
        """
        if self._client:
            return self._client

        # Imported here so that slack_sdk is only needed when a client is actually built.
        try:
            from slack_sdk import WebClient
        except ModuleNotFoundError as e:
            raise MissingDependencyError(
                "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them."
            ) from e

        self._client = WebClient(token=self.token)
        return self._client

    def _send_slack_message(self, composed: slack.TSlackMessage) -> None:
        """Post the composed text, blocks and attachments to the configured channel.

        Raises:
            ConfigError: If no channel is set.
        """
        if not self.channel:
            raise ConfigError("Missing Slack channel for notification")

        api = self.client
        api.chat_postMessage(
            channel=self.channel,
            text=composed["text"],
            blocks=composed["blocks"],
            attachments=composed["attachments"],  # type: ignore
        )

    @property
    def is_configured(self) -> bool:
        """True when both a token and a channel are set."""
        return bool(self.token) and bool(self.channel)
361
362
class BasicSMTPNotificationTarget(BaseTextBasedNotificationTarget):
    """Notification target that emails the message using `smtplib.SMTP_SSL`."""

    host: t.Optional[str] = None
    port: int = 465
    user: t.Optional[str] = None
    password: t.Optional[SecretStr] = None
    sender: t.Optional[str] = None
    recipients: t.Optional[t.FrozenSet[str]] = None
    subject: t.Optional[str] = "SQLMesh Notification"
    type_: t.Literal["smtp"] = Field(alias="type", default="smtp")

    def send_text_message(
        self,
        notification_status: NotificationStatus,
        msg: str,
    ) -> None:
        """Email `msg` to the configured recipients.

        Logs in only when both `user` and `password` are set.

        Args:
            notification_status: The status of the notification (not used in the email).
            msg: The text used as the email body.

        Raises:
            ConfigError: If the SMTP host or the recipients are missing.
        """
        if not self.host:
            raise ConfigError("Missing SMTP host for notification")
        # Fail before opening a connection: a message without recipients cannot be delivered.
        if not self.recipients:
            raise ConfigError("Missing SMTP recipients for notification")

        email = EmailMessage()
        email["Subject"] = self.subject
        # Sorted so the header does not depend on frozenset iteration order.
        email["To"] = ",".join(sorted(self.recipients))
        email["From"] = self.sender
        email.set_content(msg)
        with smtplib.SMTP_SSL(host=self.host, port=self.port) as smtp:
            if self.user and self.password:
                smtp.login(user=self.user, password=self.password.get_secret_value())
            smtp.send_message(email)

    @property
    def is_configured(self) -> bool:
        """True when host, user, password and sender are all set."""
        return all((self.host, self.user, self.password, self.sender))
394
395
class GenericNotificationTarget(BaseNotificationTarget):
    """A generic notification target that can be used to create custom notification targets.

    This target is not meant to be used directly, but rather as a base class for custom notification targets.

    The `send` method should be overridden to provide the actual notification functionality.

    Example:
    ```python
    class MyCustomNotificationTarget(GenericNotificationTarget):
        def send(self, notification_status: NotificationStatus, msg: str, audit_error: t.Optional[AuditError] = None, exc: t.Optional[str] = None, **kwargs: t.Any) -> None:
            error = None
            if audit_error:
                error = str(audit_error)
            elif exc:
                error = exc

            if error:
                msg = f"{error} - {msg}"
            print(f"Sending notification: {msg}")
    ```
    """

    # Discriminator value for this target; populated from the "type" alias.
    type_: t.Literal["generic"] = Field(alias="type", default="generic")
420
421
# Union of the concrete notification target models, discriminated by each
# model's `type_` field.
NotificationTarget = t.Annotated[
    t.Union[
        BasicSMTPNotificationTarget,
        GenericNotificationTarget,
        ConsoleNotificationTarget,
        SlackApiNotificationTarget,
        SlackWebhookNotificationTarget,
    ],
    Field(discriminator="type_"),
]
432
433
class NotificationTargetManager:
    """Wrapper around a list of notification targets.

    `notify` dispatches an event to the matching `notify_<event>` method of the
    registered targets: the user's own targets when a username is set,
    otherwise the targets registered for that event.
    """

    def __init__(
        self,
        notification_targets: t.Dict[NotificationEvent, t.Set[NotificationTarget]] | None = None,
        user_notification_targets: t.Dict[str, t.Set[NotificationTarget]] | None = None,
        username: str | None = None,
    ) -> None:
        """Store the targets; a missing mapping is replaced with an empty dict.

        Args:
            notification_targets: Targets to notify, keyed by event.
            user_notification_targets: Targets to notify, keyed by username.
            username: When set, `notify` only notifies this user's targets.
        """
        self.username = username
        self.notification_targets = notification_targets or {}
        self.user_notification_targets = user_notification_targets or {}

    def notify(self, event: NotificationEvent, *args: t.Any, **kwargs: t.Any) -> None:
        """Call the 'notify_`event`' function of all notification targets that care about the event."""
        if self.username:
            self.notify_user(event, self.username, *args, **kwargs)
            return

        for target in self.notification_targets.get(event, set()):
            self._get_notification_function(target, event)(*args, **kwargs)

    def notify_user(
        self, event: NotificationEvent, username: str, *args: t.Any, **kwargs: t.Any
    ) -> None:
        """Call the 'notify_`event`' function of the user's notification targets that care about the event."""
        for target in self.user_notification_targets.get(username, set()):
            if event not in target.notify_on:
                continue
            self._get_notification_function(target, event)(*args, **kwargs)

    def _get_notification_function(
        self, notification_target: NotificationTarget, event: NotificationEvent
    ) -> t.Callable:
        """Look up the target's `notify_<event value>` method."""
        method_name = f"notify_{event.value}"
        return getattr(notification_target, method_name)
class NotificationStatus(builtins.str, enum.Enum):
class NotificationStatus(str, Enum):
    """Status attached to an outgoing notification.

    Members are also `str` instances, so they compare equal to their values.
    """

    SUCCESS = "success"
    FAILURE = "failure"
    WARNING = "warning"
    INFO = "info"
    PROGRESS = "progress"

    @property
    def is_success(self) -> bool:
        """Whether this status is `SUCCESS`."""
        return self is NotificationStatus.SUCCESS

    @property
    def is_failure(self) -> bool:
        """Whether this status is `FAILURE`."""
        return self is NotificationStatus.FAILURE

    @property
    def is_info(self) -> bool:
        """Whether this status is `INFO`."""
        return self is NotificationStatus.INFO

    @property
    def is_warning(self) -> bool:
        """Whether this status is `WARNING`."""
        return self is NotificationStatus.WARNING

    @property
    def is_progress(self) -> bool:
        """Whether this status is `PROGRESS`."""
        return self is NotificationStatus.PROGRESS

An enumeration.

SUCCESS = <NotificationStatus.SUCCESS: 'success'>
FAILURE = <NotificationStatus.FAILURE: 'failure'>
WARNING = <NotificationStatus.WARNING: 'warning'>
INFO = <NotificationStatus.INFO: 'info'>
PROGRESS = <NotificationStatus.PROGRESS: 'progress'>
is_success: bool
37    @property
38    def is_success(self) -> bool:
39        return self == NotificationStatus.SUCCESS
is_failure: bool
41    @property
42    def is_failure(self) -> bool:
43        return self == NotificationStatus.FAILURE
is_info: bool
45    @property
46    def is_info(self) -> bool:
47        return self == NotificationStatus.INFO
is_warning: bool
49    @property
50    def is_warning(self) -> bool:
51        return self == NotificationStatus.WARNING
is_progress: bool
53    @property
54    def is_progress(self) -> bool:
55        return self == NotificationStatus.PROGRESS
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class NotificationEvent(builtins.str, enum.Enum):
class NotificationEvent(str, Enum):
    """Events a notification target can be notified about."""

    # Start / end events.
    APPLY_START = "apply_start"
    APPLY_END = "apply_end"
    RUN_START = "run_start"
    RUN_END = "run_end"
    MIGRATION_START = "migration_start"
    MIGRATION_END = "migration_end"
    # Failure events.
    APPLY_FAILURE = "apply_failure"
    RUN_FAILURE = "run_failure"
    AUDIT_FAILURE = "audit_failure"
    MIGRATION_FAILURE = "migration_failure"

An enumeration.

APPLY_START = <NotificationEvent.APPLY_START: 'apply_start'>
APPLY_END = <NotificationEvent.APPLY_END: 'apply_end'>
RUN_START = <NotificationEvent.RUN_START: 'run_start'>
RUN_END = <NotificationEvent.RUN_END: 'run_end'>
MIGRATION_START = <NotificationEvent.MIGRATION_START: 'migration_start'>
MIGRATION_END = <NotificationEvent.MIGRATION_END: 'migration_end'>
APPLY_FAILURE = <NotificationEvent.APPLY_FAILURE: 'apply_failure'>
RUN_FAILURE = <NotificationEvent.RUN_FAILURE: 'run_failure'>
AUDIT_FAILURE = <NotificationEvent.AUDIT_FAILURE: 'audit_failure'>
MIGRATION_FAILURE = <NotificationEvent.MIGRATION_FAILURE: 'migration_failure'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class BaseNotificationTarget(sqlmesh.utils.pydantic.PydanticModel):
 71class BaseNotificationTarget(PydanticModel, frozen=True):
 72    """
 73    Base notification target model. Provides a command for sending notifications that is currently only used
 74    by the built-in scheduler.
 75
 76    Notification functions follow a naming convention of `notify_` + NotificationEvent value.
 77    """
 78
 79    type_: str
 80    notify_on: t.FrozenSet[NotificationEvent] = frozenset()
 81
 82    def send(self, notification_status: NotificationStatus, msg: str, **kwargs: t.Any) -> None:
 83        """Sends notification with the provided message.
 84
 85        Args:
 86            notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
 87            msg: The message to send.
 88        """
 89
 90    def notify_apply_start(
 91        self, environment: str, plan_id: str, *args: t.Any, **kwargs: t.Any
 92    ) -> None:
 93        """Notify when an apply starts.
 94
 95        Args:
 96            environment: The target environment of the plan.
 97            plan_id: plan_id that is being applied.
 98        """
 99        self.send(
100            NotificationStatus.INFO,
101            f"Plan `{plan_id}` apply started for environment `{environment}`.",
102        )
103
104    def notify_apply_end(
105        self, environment: str, plan_id: str, *args: t.Any, **kwargs: t.Any
106    ) -> None:
107        """Notify when an apply ends.
108
109        Args:
110            environment: The target environment of the plan.
111            plan_id: plan_id that was applied.
112        """
113        self.send(
114            NotificationStatus.SUCCESS,
115            f"Plan `{plan_id}` apply finished for environment `{environment}`.",
116        )
117
118    def notify_run_start(self, environment: str, *args: t.Any, **kwargs: t.Any) -> None:
119        """Notify when a SQLMesh run starts.
120
121        Args:
122            environment: The target environment of the run.
123        """
124        self.send(NotificationStatus.INFO, f"SQLMesh run started for environment `{environment}`.")
125
126    def notify_run_end(self, environment: str, *args: t.Any, **kwargs: t.Any) -> None:
127        """Notify when a SQLMesh run ends.
128
129        Args:
130            environment: The target environment of the run.
131        """
132        self.send(
133            NotificationStatus.SUCCESS, f"SQLMesh run finished for environment `{environment}`."
134        )
135
136    def notify_migration_start(self, *args: t.Any, **kwargs: t.Any) -> None:
137        """Notify when a SQLMesh migration starts."""
138        self.send(NotificationStatus.INFO, "SQLMesh migration started.")
139
140    def notify_migration_end(self, *args: t.Any, **kwargs: t.Any) -> None:
141        """Notify when a SQLMesh migration ends."""
142        self.send(NotificationStatus.SUCCESS, "SQLMesh migration finished.")
143
144    def notify_apply_failure(
145        self, environment: str, plan_id: str, exc: str, *args: t.Any, **kwargs: t.Any
146    ) -> None:
147        """Notify in the case of an apply failure.
148
149        Args:
150            environment: The target environment of the run.
151            plan_id: The plan id of the failed apply
152            exc: The exception stack trace.
153        """
154        self.send(
155            NotificationStatus.FAILURE,
156            f"Plan `{plan_id}` in environment `{environment}` apply failed.",
157            exc=exc,
158        )
159
160    def notify_run_failure(self, exc: str, *args: t.Any, **kwargs: t.Any) -> None:
161        """Notify in the case of a run failure.
162
163        Args:
164            exc: The exception stack trace.
165        """
166        self.send(NotificationStatus.FAILURE, "SQLMesh run failed.", exc=exc)
167
168    def notify_audit_failure(self, audit_error: AuditError, *args: t.Any, **kwargs: t.Any) -> None:
169        """Notify in the case of an audit failure.
170
171        Args:
172            audit_error: The AuditError object.
173        """
174        self.send(NotificationStatus.FAILURE, "Audit failure.", audit_error=audit_error)
175
176    def notify_migration_failure(self, exc: str, *args: t.Any, **kwargs: t.Any) -> None:
177        """Notify in the case of a migration failure.
178
179        Args:
180            exc: The exception stack trace.
181        """
182        self.send(NotificationStatus.FAILURE, "SQLMesh migration failed.", exc=exc)
183
184    @property
185    def is_configured(self) -> bool:
186        return True

Base notification target model. Provides a command for sending notifications that is currently only used by the built-in scheduler.

Notification functions follow a naming convention of notify_ + NotificationEvent value.

type_: str
notify_on: FrozenSet[NotificationEvent]
def send( self, notification_status: NotificationStatus, msg: str, **kwargs: Any) -> None:
    def send(self, notification_status: NotificationStatus, msg: str, **kwargs: t.Any) -> None:
        """Sends notification with the provided message.

        The body here is only this docstring.

        Args:
            notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
            msg: The message to send.
            **kwargs: Extra keyword arguments; callers in this class pass `exc` or `audit_error`.
        """

Sends notification with the provided message.

Arguments:
  • notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
  • msg: The message to send.
def notify_apply_start(self, environment: str, plan_id: str, *args: Any, **kwargs: Any) -> None:
 90    def notify_apply_start(
 91        self, environment: str, plan_id: str, *args: t.Any, **kwargs: t.Any
 92    ) -> None:
 93        """Notify when an apply starts.
 94
 95        Args:
 96            environment: The target environment of the plan.
 97            plan_id: plan_id that is being applied.
 98        """
 99        self.send(
100            NotificationStatus.INFO,
101            f"Plan `{plan_id}` apply started for environment `{environment}`.",
102        )

Notify when an apply starts.

Arguments:
  • environment: The target environment of the plan.
  • plan_id: plan_id that is being applied.
def notify_apply_end(self, environment: str, plan_id: str, *args: Any, **kwargs: Any) -> None:
104    def notify_apply_end(
105        self, environment: str, plan_id: str, *args: t.Any, **kwargs: t.Any
106    ) -> None:
107        """Notify when an apply ends.
108
109        Args:
110            environment: The target environment of the plan.
111            plan_id: plan_id that was applied.
112        """
113        self.send(
114            NotificationStatus.SUCCESS,
115            f"Plan `{plan_id}` apply finished for environment `{environment}`.",
116        )

Notify when an apply ends.

Arguments:
  • environment: The target environment of the plan.
  • plan_id: plan_id that was applied.
def notify_run_start(self, environment: str, *args: Any, **kwargs: Any) -> None:
118    def notify_run_start(self, environment: str, *args: t.Any, **kwargs: t.Any) -> None:
119        """Notify when a SQLMesh run starts.
120
121        Args:
122            environment: The target environment of the run.
123        """
124        self.send(NotificationStatus.INFO, f"SQLMesh run started for environment `{environment}`.")

Notify when a SQLMesh run starts.

Arguments:
  • environment: The target environment of the run.
def notify_run_end(self, environment: str, *args: Any, **kwargs: Any) -> None:
126    def notify_run_end(self, environment: str, *args: t.Any, **kwargs: t.Any) -> None:
127        """Notify when a SQLMesh run ends.
128
129        Args:
130            environment: The target environment of the run.
131        """
132        self.send(
133            NotificationStatus.SUCCESS, f"SQLMesh run finished for environment `{environment}`."
134        )

Notify when a SQLMesh run ends.

Arguments:
  • environment: The target environment of the run.
def notify_migration_start(self, *args: Any, **kwargs: Any) -> None:
136    def notify_migration_start(self, *args: t.Any, **kwargs: t.Any) -> None:
137        """Notify when a SQLMesh migration starts."""
138        self.send(NotificationStatus.INFO, "SQLMesh migration started.")

Notify when a SQLMesh migration starts.

def notify_migration_end(self, *args: Any, **kwargs: Any) -> None:
140    def notify_migration_end(self, *args: t.Any, **kwargs: t.Any) -> None:
141        """Notify when a SQLMesh migration ends."""
142        self.send(NotificationStatus.SUCCESS, "SQLMesh migration finished.")

Notify when a SQLMesh migration ends.

def notify_apply_failure( self, environment: str, plan_id: str, exc: str, *args: Any, **kwargs: Any) -> None:
144    def notify_apply_failure(
145        self, environment: str, plan_id: str, exc: str, *args: t.Any, **kwargs: t.Any
146    ) -> None:
147        """Notify in the case of an apply failure.
148
149        Args:
150            environment: The target environment of the run.
151            plan_id: The plan id of the failed apply
152            exc: The exception stack trace.
153        """
154        self.send(
155            NotificationStatus.FAILURE,
156            f"Plan `{plan_id}` in environment `{environment}` apply failed.",
157            exc=exc,
158        )

Notify in the case of an apply failure.

Arguments:
  • environment: The target environment of the run.
  • plan_id: The plan id of the failed apply
  • exc: The exception stack trace.
def notify_run_failure(self, exc: str, *args: Any, **kwargs: Any) -> None:
160    def notify_run_failure(self, exc: str, *args: t.Any, **kwargs: t.Any) -> None:
161        """Notify in the case of a run failure.
162
163        Args:
164            exc: The exception stack trace.
165        """
166        self.send(NotificationStatus.FAILURE, "SQLMesh run failed.", exc=exc)

Notify in the case of a run failure.

Arguments:
  • exc: The exception stack trace.
def notify_audit_failure( self, audit_error: sqlmesh.utils.errors.AuditError, *args: Any, **kwargs: Any) -> None:
168    def notify_audit_failure(self, audit_error: AuditError, *args: t.Any, **kwargs: t.Any) -> None:
169        """Notify in the case of an audit failure.
170
171        Args:
172            audit_error: The AuditError object.
173        """
174        self.send(NotificationStatus.FAILURE, "Audit failure.", audit_error=audit_error)

Notify in the case of an audit failure.

Arguments:
  • audit_error: The AuditError object.
def notify_migration_failure(self, exc: str, *args: Any, **kwargs: Any) -> None:
176    def notify_migration_failure(self, exc: str, *args: t.Any, **kwargs: t.Any) -> None:
177        """Notify in the case of a migration failure.
178
179        Args:
180            exc: The exception stack trace.
181        """
182        self.send(NotificationStatus.FAILURE, "SQLMesh migration failed.", exc=exc)

Notify in the case of a migration failure.

Arguments:
  • exc: The exception stack trace.
is_configured: bool
184    @property
185    def is_configured(self) -> bool:
186        return True
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model; it should be a dictionary conforming to pydantic.config.ConfigDict.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class BaseTextBasedNotificationTarget(BaseNotificationTarget):
189class BaseTextBasedNotificationTarget(BaseNotificationTarget):
190    """
191    A base class for unstructured notification targets (e.g., console, email).
192    """
193
194    def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None:
195        """Send the notification message as text."""
196
197    def send(
198        self,
199        notification_status: NotificationStatus,
200        msg: str,
201        audit_error: t.Optional[AuditError] = None,
202        exc: t.Optional[str] = None,
203        **kwargs: t.Any,
204    ) -> None:
205        error = None
206        if audit_error:
207            error = str(audit_error)
208        elif exc:
209            error = exc
210
211        self.send_text_message(notification_status, msg if error is None else f"{msg}\n{error}")

A base class for unstructured notification targets (e.g., console, email).

def send_text_message( self, notification_status: NotificationStatus, msg: str) -> None:
194    def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None:
195        """Send the notification message as text."""

Send the notification message as text.

def send( self, notification_status: NotificationStatus, msg: str, audit_error: Optional[sqlmesh.utils.errors.AuditError] = None, exc: Optional[str] = None, **kwargs: Any) -> None:
197    def send(
198        self,
199        notification_status: NotificationStatus,
200        msg: str,
201        audit_error: t.Optional[AuditError] = None,
202        exc: t.Optional[str] = None,
203        **kwargs: t.Any,
204    ) -> None:
205        error = None
206        if audit_error:
207            error = str(audit_error)
208        elif exc:
209            error = exc
210
211        self.send_text_message(notification_status, msg if error is None else f"{msg}\n{error}")

Sends notification with the provided message.

Arguments:
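  • notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
  • msg: The message to send.
  • audit_error: Optional AuditError. When provided, its string form is appended to the message on a new line.
  • exc: Optional exception stack trace. Appended to the message on a new line when no audit_error is provided.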
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model; it should be a dictionary conforming to pydantic.config.ConfigDict.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
BaseNotificationTarget
type_
notify_on
notify_apply_start
notify_apply_end
notify_run_start
notify_run_end
notify_migration_start
notify_migration_end
notify_apply_failure
notify_run_failure
notify_audit_failure
notify_migration_failure
is_configured
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class ConsoleNotificationTarget(BaseTextBasedNotificationTarget):
214class ConsoleNotificationTarget(BaseTextBasedNotificationTarget):
215    """
216    Example console notification target. Keeping this around for testing purposes.
217    """
218
219    type_: t.Literal["console"] = Field(alias="type", default="console")
220    _console: t.Optional[Console] = None
221
222    @property
223    def console(self) -> Console:
224        if not self._console:
225            self._console = get_console()
226        return self._console
227
228    def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None:
229        if notification_status.is_success:
230            self.console.log_success(msg)
231        elif notification_status.is_failure:
232            self.console.log_error(msg)
233        else:
234            self.console.log_status_update(msg)

Example console notification target. Keeping this around for testing purposes.

type_: Literal['console']
console: sqlmesh.core.console.Console
222    @property
223    def console(self) -> Console:
224        if not self._console:
225            self._console = get_console()
226        return self._console
def send_text_message( self, notification_status: NotificationStatus, msg: str) -> None:
228    def send_text_message(self, notification_status: NotificationStatus, msg: str) -> None:
229        if notification_status.is_success:
230            self.console.log_success(msg)
231        elif notification_status.is_failure:
232            self.console.log_error(msg)
233        else:
234            self.console.log_status_update(msg)

Send the notification message as text.

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model; it should be a dictionary conforming to pydantic.config.ConfigDict.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
358def init_private_attributes(self: BaseModel, context: Any, /) -> None:
359    """This function is meant to behave like a BaseModel method to initialise private attributes.
360
361    It takes context as an argument since that's what pydantic-core passes when calling it.
362
363    Args:
364        self: The BaseModel instance.
365        context: The context.
366    """
367    if getattr(self, '__pydantic_private__', None) is None:
368        pydantic_private = {}
369        for name, private_attr in self.__private_attributes__.items():
370            default = private_attr.get_default()
371            if default is not PydanticUndefined:
372                pydantic_private[name] = default
373        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
BaseTextBasedNotificationTarget
send
BaseNotificationTarget
notify_on
notify_apply_start
notify_apply_end
notify_run_start
notify_run_end
notify_migration_start
notify_migration_end
notify_apply_failure
notify_run_failure
notify_audit_failure
notify_migration_failure
is_configured
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class BaseSlackNotificationTarget(BaseNotificationTarget):
237class BaseSlackNotificationTarget(BaseNotificationTarget):
238    def send(
239        self,
240        notification_status: NotificationStatus,
241        msg: str,
242        audit_error: t.Optional[AuditError] = None,
243        exc: t.Optional[str] = None,
244        **kwargs: t.Any,
245    ) -> None:
246        status_emoji = {
247            NotificationStatus.PROGRESS: slack.SlackAlertIcon.START,
248            NotificationStatus.SUCCESS: slack.SlackAlertIcon.SUCCESS,
249            NotificationStatus.FAILURE: slack.SlackAlertIcon.FAILURE,
250            NotificationStatus.WARNING: slack.SlackAlertIcon.WARNING,
251            NotificationStatus.INFO: slack.SlackAlertIcon.INFO,
252        }
253
254        composed = slack.message().add_primary_blocks(
255            slack.header_block(f"{status_emoji[notification_status]} SQLMesh Notification"),
256            slack.context_block(f"*Status:* `{notification_status.value}`"),
257            slack.divider_block(),
258            slack.text_section_block(f"*Message*: {msg}"),
259        )
260
261        details = []
262        if audit_error:
263            details = [
264                slack.fields_section_block(
265                    f"*Audit*: `{audit_error.audit_name}`",
266                    f"*Model*: `{audit_error.model_name}`",
267                    f"*Count*: `{audit_error.count}`",
268                ),
269                slack.preformatted_rich_text_block(audit_error.sql(pretty=True)),
270            ]
271        elif exc:
272            details = [slack.preformatted_rich_text_block(exc)]
273
274        composed.add_primary_blocks(
275            *details,
276            slack.divider_block(),
277            slack.context_block(
278                f"*SQLMesh Version:* {_sqlmesh_version()}", f"*Python Version:* {sys.version}"
279            ),
280        )
281
282        composed.add_text(msg)
283
284        self._send_slack_message(
285            composed=composed.slack_message,
286        )
287
288    def _send_slack_message(self, composed: slack.TSlackMessage) -> None:
289        """Send a composed message to Slack.
290
291        Args:
292            composed: the formatted message to send to Slack
293        """

Base notification target model. Provides a command for sending notifications that is currently only used by the built-in scheduler.

Notification functions follow a naming convention of notify_ + NotificationEvent value.

def send( self, notification_status: NotificationStatus, msg: str, audit_error: Optional[sqlmesh.utils.errors.AuditError] = None, exc: Optional[str] = None, **kwargs: Any) -> None:
238    def send(
239        self,
240        notification_status: NotificationStatus,
241        msg: str,
242        audit_error: t.Optional[AuditError] = None,
243        exc: t.Optional[str] = None,
244        **kwargs: t.Any,
245    ) -> None:
246        status_emoji = {
247            NotificationStatus.PROGRESS: slack.SlackAlertIcon.START,
248            NotificationStatus.SUCCESS: slack.SlackAlertIcon.SUCCESS,
249            NotificationStatus.FAILURE: slack.SlackAlertIcon.FAILURE,
250            NotificationStatus.WARNING: slack.SlackAlertIcon.WARNING,
251            NotificationStatus.INFO: slack.SlackAlertIcon.INFO,
252        }
253
254        composed = slack.message().add_primary_blocks(
255            slack.header_block(f"{status_emoji[notification_status]} SQLMesh Notification"),
256            slack.context_block(f"*Status:* `{notification_status.value}`"),
257            slack.divider_block(),
258            slack.text_section_block(f"*Message*: {msg}"),
259        )
260
261        details = []
262        if audit_error:
263            details = [
264                slack.fields_section_block(
265                    f"*Audit*: `{audit_error.audit_name}`",
266                    f"*Model*: `{audit_error.model_name}`",
267                    f"*Count*: `{audit_error.count}`",
268                ),
269                slack.preformatted_rich_text_block(audit_error.sql(pretty=True)),
270            ]
271        elif exc:
272            details = [slack.preformatted_rich_text_block(exc)]
273
274        composed.add_primary_blocks(
275            *details,
276            slack.divider_block(),
277            slack.context_block(
278                f"*SQLMesh Version:* {_sqlmesh_version()}", f"*Python Version:* {sys.version}"
279            ),
280        )
281
282        composed.add_text(msg)
283
284        self._send_slack_message(
285            composed=composed.slack_message,
286        )

Sends notification with the provided message.

Arguments:
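  • notification_status: The status of the notification. One of: success, failure, warning, info, or progress.
  • msg: The message to send.
  • audit_error: Optional AuditError. When provided, the audit name, model name, count, and formatted SQL of the failed audit are added to the Slack message.
  • exc: Optional exception stack trace. Added to the Slack message as a preformatted block when no audit_error is provided.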
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model; it should be a dictionary conforming to pydantic.config.ConfigDict.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
BaseNotificationTarget
type_
notify_on
notify_apply_start
notify_apply_end
notify_run_start
notify_run_end
notify_migration_start
notify_migration_end
notify_apply_failure
notify_run_failure
notify_audit_failure
notify_migration_failure
is_configured
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class SlackWebhookNotificationTarget(BaseSlackNotificationTarget):
296class SlackWebhookNotificationTarget(BaseSlackNotificationTarget):
297    url: t.Optional[str] = None
298    type_: t.Literal["slack_webhook"] = Field(alias="type", default="slack_webhook")
299    _client: t.Optional[WebhookClient] = None
300
301    @property
302    def client(self) -> WebhookClient:
303        if not self._client:
304            try:
305                from slack_sdk import WebhookClient
306            except ModuleNotFoundError as e:
307                raise MissingDependencyError(
308                    "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them."
309                ) from e
310
311            if not self.url:
312                raise ConfigError("Missing Slack webhook URL")
313
314            self._client = WebhookClient(url=self.url)
315        return self._client
316
317    def _send_slack_message(self, composed: slack.TSlackMessage) -> None:
318        self.client.send(
319            text=composed["text"],
320            blocks=composed["blocks"],
321            attachments=composed["attachments"],  # type: ignore
322        )
323
324    @property
325    def is_configured(self) -> bool:
326        return bool(self.url)

Base notification target model. Provides a command for sending notifications that is currently only used by the built-in scheduler.

Notification functions follow a naming convention of notify_ + NotificationEvent value.

url: Optional[str]
type_: Literal['slack_webhook']
client: slack_sdk.webhook.client.WebhookClient
301    @property
302    def client(self) -> WebhookClient:
303        if not self._client:
304            try:
305                from slack_sdk import WebhookClient
306            except ModuleNotFoundError as e:
307                raise MissingDependencyError(
308                    "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them."
309                ) from e
310
311            if not self.url:
312                raise ConfigError("Missing Slack webhook URL")
313
314            self._client = WebhookClient(url=self.url)
315        return self._client
is_configured: bool
324    @property
325    def is_configured(self) -> bool:
326        return bool(self.url)
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model; it should be a dictionary conforming to pydantic.config.ConfigDict.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
358def init_private_attributes(self: BaseModel, context: Any, /) -> None:
359    """This function is meant to behave like a BaseModel method to initialise private attributes.
360
361    It takes context as an argument since that's what pydantic-core passes when calling it.
362
363    Args:
364        self: The BaseModel instance.
365        context: The context.
366    """
367    if getattr(self, '__pydantic_private__', None) is None:
368        pydantic_private = {}
369        for name, private_attr in self.__private_attributes__.items():
370            default = private_attr.get_default()
371            if default is not PydanticUndefined:
372                pydantic_private[name] = default
373        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
BaseSlackNotificationTarget
send
BaseNotificationTarget
notify_on
notify_apply_start
notify_apply_end
notify_run_start
notify_run_end
notify_migration_start
notify_migration_end
notify_apply_failure
notify_run_failure
notify_audit_failure
notify_migration_failure
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class SlackApiNotificationTarget(BaseSlackNotificationTarget):
329class SlackApiNotificationTarget(BaseSlackNotificationTarget):
330    token: t.Optional[str] = None
331    channel: t.Optional[str] = None
332    type_: t.Literal["slack_api"] = Field(alias="type", default="slack_api")
333    _client: t.Optional[WebClient] = None
334
335    @property
336    def client(self) -> WebClient:
337        if not self._client:
338            try:
339                from slack_sdk import WebClient
340            except ModuleNotFoundError as e:
341                raise MissingDependencyError(
342                    "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them."
343                ) from e
344
345            self._client = WebClient(token=self.token)
346        return self._client
347
348    def _send_slack_message(self, composed: slack.TSlackMessage) -> None:
349        if not self.channel:
350            raise ConfigError("Missing Slack channel for notification")
351
352        self.client.chat_postMessage(
353            channel=self.channel,
354            text=composed["text"],
355            blocks=composed["blocks"],
356            attachments=composed["attachments"],  # type: ignore
357        )
358
359    @property
360    def is_configured(self) -> bool:
361        return all((self.token, self.channel))

Base notification target model. Provides a command for sending notifications that is currently only used by the built-in scheduler.

Notification functions follow a naming convention of notify_ + NotificationEvent value.

token: Optional[str]
channel: Optional[str]
type_: Literal['slack_api']
client: slack_sdk.web.client.WebClient
335    @property
336    def client(self) -> WebClient:
337        if not self._client:
338            try:
339                from slack_sdk import WebClient
340            except ModuleNotFoundError as e:
341                raise MissingDependencyError(
342                    "Missing Slack dependencies. Run `pip install 'sqlmesh[slack]'` to install them."
343                ) from e
344
345            self._client = WebClient(token=self.token)
346        return self._client
is_configured: bool
359    @property
360    def is_configured(self) -> bool:
361        return all((self.token, self.channel))
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model; it should be a dictionary conforming to pydantic.config.ConfigDict.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
358def init_private_attributes(self: BaseModel, context: Any, /) -> None:
359    """This function is meant to behave like a BaseModel method to initialise private attributes.
360
361    It takes context as an argument since that's what pydantic-core passes when calling it.
362
363    Args:
364        self: The BaseModel instance.
365        context: The context.
366    """
367    if getattr(self, '__pydantic_private__', None) is None:
368        pydantic_private = {}
369        for name, private_attr in self.__private_attributes__.items():
370            default = private_attr.get_default()
371            if default is not PydanticUndefined:
372                pydantic_private[name] = default
373        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
BaseSlackNotificationTarget
send
BaseNotificationTarget
notify_on
notify_apply_start
notify_apply_end
notify_run_start
notify_run_end
notify_migration_start
notify_migration_end
notify_apply_failure
notify_run_failure
notify_audit_failure
notify_migration_failure
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class BasicSMTPNotificationTarget(BaseTextBasedNotificationTarget):
364class BasicSMTPNotificationTarget(BaseTextBasedNotificationTarget):
365    host: t.Optional[str] = None
366    port: int = 465
367    user: t.Optional[str] = None
368    password: t.Optional[SecretStr] = None
369    sender: t.Optional[str] = None
370    recipients: t.Optional[t.FrozenSet[str]] = None
371    subject: t.Optional[str] = "SQLMesh Notification"
372    type_: t.Literal["smtp"] = Field(alias="type", default="smtp")
373
374    def send_text_message(
375        self,
376        notification_status: NotificationStatus,
377        msg: str,
378    ) -> None:
379        if not self.host:
380            raise ConfigError("Missing SMTP host for notification")
381
382        email = EmailMessage()
383        email["Subject"] = self.subject
384        email["To"] = ",".join(self.recipients or [])
385        email["From"] = self.sender
386        email.set_content(msg)
387        with smtplib.SMTP_SSL(host=self.host, port=self.port) as smtp:
388            if self.user and self.password:
389                smtp.login(user=self.user, password=self.password.get_secret_value())
390            smtp.send_message(email)
391
392    @property
393    def is_configured(self) -> bool:
394        return all((self.host, self.user, self.password, self.sender))

A base class for unstructured notification targets (e.g., console, email).

host: Optional[str]
port: int
user: Optional[str]
password: Optional[pydantic.types.SecretStr]
sender: Optional[str]
recipients: Optional[FrozenSet[str]]
subject: Optional[str]
type_: Literal['smtp']
def send_text_message( self, notification_status: NotificationStatus, msg: str) -> None:
374    def send_text_message(
375        self,
376        notification_status: NotificationStatus,
377        msg: str,
378    ) -> None:
379        if not self.host:
380            raise ConfigError("Missing SMTP host for notification")
381
382        email = EmailMessage()
383        email["Subject"] = self.subject
384        email["To"] = ",".join(self.recipients or [])
385        email["From"] = self.sender
386        email.set_content(msg)
387        with smtplib.SMTP_SSL(host=self.host, port=self.port) as smtp:
388            if self.user and self.password:
389                smtp.login(user=self.user, password=self.password.get_secret_value())
390            smtp.send_message(email)

Send the notification message as text.

is_configured: bool
392    @property
393    def is_configured(self) -> bool:
394        return all((self.host, self.user, self.password, self.sender))
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model; it should be a dictionary conforming to pydantic.config.ConfigDict.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
BaseTextBasedNotificationTarget
send
BaseNotificationTarget
notify_on
notify_apply_start
notify_apply_end
notify_run_start
notify_run_end
notify_migration_start
notify_migration_end
notify_apply_failure
notify_run_failure
notify_audit_failure
notify_migration_failure
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class GenericNotificationTarget(BaseNotificationTarget):
397class GenericNotificationTarget(BaseNotificationTarget):
398    """A generic notification target that can be used to create custom notification targets.
399
400    This target is not meant to be used directly, but rather as a base class for custom notification targets.
401
402    The `send` method should be overridden to provide the actual notification functionality.
403
404    Example:
405    ```python
406    class MyCustomNotificationTarget(GenericNotificationTarget):
407        def send(self, notification_status: NotificationStatus, msg: str, audit_error: t.Optional[AuditError] = None, exc: t.Optional[str] = None, **kwargs: t.Any) -> None:
408            error = None
409            if audit_error:
410                error = str(audit_error)
411            elif exc:
412                error = exc
413
414            if error:
415                msg = f"{error} - {msg}"
416            print(f"Sending notification: {msg}")
417    ```
418    """
419
420    type_: t.Literal["generic"] = Field(alias="type", default="generic")

A generic notification target that can be used to create custom notification targets.

This target is not meant to be used directly, but rather as a base class for custom notification targets.

The send method should be overridden to provide the actual notification functionality.

Example:

class MyCustomNotificationTarget(GenericNotificationTarget):
    def send(self, notification_status: NotificationStatus, msg: str, audit_error: t.Optional[AuditError] = None, exc: t.Optional[str] = None, **kwargs: t.Any) -> None:
        error = None
        if audit_error:
            error = str(audit_error)
        elif exc:
            error = exc

        if error:
            msg = f"{error} - {msg}"
        print(f"Sending notification: {msg}")
type_: Literal['generic']
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
BaseNotificationTarget
notify_on
send
notify_apply_start
notify_apply_end
notify_run_start
notify_run_end
notify_migration_start
notify_migration_end
notify_apply_failure
notify_run_failure
notify_audit_failure
notify_migration_failure
is_configured
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
NotificationTarget = typing.Annotated[typing.Union[BasicSMTPNotificationTarget, GenericNotificationTarget, ConsoleNotificationTarget, SlackApiNotificationTarget, SlackWebhookNotificationTarget], FieldInfo(annotation=NoneType, required=True, discriminator='type_')]
class NotificationTargetManager:
class NotificationTargetManager:
    """Dispatches notification events to a collection of notification targets.

    `notify` looks up the "notify_<event>" method on each relevant target and calls it
    with the arguments it was given.
    """

    def __init__(
        self,
        notification_targets: t.Dict[NotificationEvent, t.Set[NotificationTarget]] | None = None,
        user_notification_targets: t.Dict[str, t.Set[NotificationTarget]] | None = None,
        username: str | None = None,
    ) -> None:
        """Store the registered targets.

        Args:
            notification_targets: Targets keyed by the event they should be notified about.
            user_notification_targets: Targets keyed by username.
            username: When set, `notify` only notifies this user's targets.
        """
        self.username = username
        # Falsy inputs (None or empty mappings) are replaced with fresh empty dicts.
        self.user_notification_targets = user_notification_targets or {}
        self.notification_targets = notification_targets or {}

    def notify(self, event: NotificationEvent, *args: t.Any, **kwargs: t.Any) -> None:
        """Invoke the "notify_<event>" method of every target registered for the event.

        If a username is configured, dispatch is delegated to `notify_user` and the
        event-keyed targets are not consulted.
        """
        if self.username:
            self.notify_user(event, self.username, *args, **kwargs)
            return

        for target in self.notification_targets.get(event, ()):
            handler = self._get_notification_function(target, event)
            handler(*args, **kwargs)

    def notify_user(
        self, event: NotificationEvent, username: str, *args: t.Any, **kwargs: t.Any
    ) -> None:
        """Invoke the "notify_<event>" method of the user's targets that subscribe to the event."""
        # Lazily filter so each target's `notify_on` is checked right before it is notified.
        subscribed = (
            target
            for target in self.user_notification_targets.get(username, ())
            if event in target.notify_on
        )
        for target in subscribed:
            handler = self._get_notification_function(target, event)
            handler(*args, **kwargs)

    def _get_notification_function(
        self, notification_target: NotificationTarget, event: NotificationEvent
    ) -> t.Callable:
        """Return the target's bound handler for the given event."""
        handler_name = f"notify_{event.value}"
        return getattr(notification_target, handler_name)

Wrapper around a list of notification targets.

Calling a notification target's "notify_" method on this object will call it on all registered notification targets.

NotificationTargetManager( notification_targets: Optional[Dict[NotificationEvent, Set[Annotated[Union[BasicSMTPNotificationTarget, GenericNotificationTarget, ConsoleNotificationTarget, SlackApiNotificationTarget, SlackWebhookNotificationTarget], FieldInfo(annotation=NoneType, required=True, discriminator='type_')]]]] = None, user_notification_targets: Optional[Dict[str, Set[Annotated[Union[BasicSMTPNotificationTarget, GenericNotificationTarget, ConsoleNotificationTarget, SlackApiNotificationTarget, SlackWebhookNotificationTarget], FieldInfo(annotation=NoneType, required=True, discriminator='type_')]]]] = None, username: str | None = None)
442    def __init__(
443        self,
444        notification_targets: t.Dict[NotificationEvent, t.Set[NotificationTarget]] | None = None,
445        user_notification_targets: t.Dict[str, t.Set[NotificationTarget]] | None = None,
446        username: str | None = None,
447    ) -> None:
448        self.notification_targets = notification_targets or {}
449        self.user_notification_targets = user_notification_targets or {}
450        self.username = username
notification_targets
user_notification_targets
username
def notify( self, event: NotificationEvent, *args: Any, **kwargs: Any) -> None:
452    def notify(self, event: NotificationEvent, *args: t.Any, **kwargs: t.Any) -> None:
453        """Call the 'notify_`event`' function of all notification targets that care about the event."""
454        if self.username:
455            self.notify_user(event, self.username, *args, **kwargs)
456        else:
457            for notification_target in self.notification_targets.get(event, set()):
458                notify_func = self._get_notification_function(notification_target, event)
459                notify_func(*args, **kwargs)

Call the 'notify_<event>' function of all notification targets that care about the event.

def notify_user( self, event: NotificationEvent, username: str, *args: Any, **kwargs: Any) -> None:
461    def notify_user(
462        self, event: NotificationEvent, username: str, *args: t.Any, **kwargs: t.Any
463    ) -> None:
464        """Call the 'notify_`event`' function of the user's notification targets that care about the event."""
465        notification_targets = self.user_notification_targets.get(username, set())
466        for notification_target in notification_targets:
467            if event in notification_target.notify_on:
468                notify_func = self._get_notification_function(notification_target, event)
469                notify_func(*args, **kwargs)

Call the 'notify_<event>' function of the user's notification targets that care about the event.