Edit on GitHub

sqlmesh.core.node

  1from __future__ import annotations
  2
  3import typing as t
  4import zoneinfo
  5from datetime import datetime
  6from enum import Enum
  7from pathlib import Path
  8
  9from pydantic import Field
 10from sqlglot import exp
 11
 12from sqlmesh.utils.cron import CroniterCache
 13from sqlmesh.utils.date import TimeLike, to_datetime, validate_date_range
 14from sqlmesh.utils.errors import ConfigError
 15from sqlmesh.utils.pydantic import (
 16    PydanticModel,
 17    SQLGlotCron,
 18    field_validator,
 19    model_validator,
 20    PRIVATE_FIELDS,
 21)
 22
 23if t.TYPE_CHECKING:
 24    from sqlmesh.core._typing import Self
 25    from sqlmesh.core.snapshot import Node
 26
 27
 28class IntervalUnit(str, Enum):
 29    """IntervalUnit is the inferred granularity of an incremental node.
 30
 31    IntervalUnit can be one of 5 types, YEAR, MONTH, DAY, HOUR, MINUTE. The unit is inferred
 32    based on the cron schedule of a node. The minimum time delta between a sample set of dates
 33    is used to determine which unit a node's schedule is.
 34
 35    It's designed to align with common partitioning schemes, hence why there is no WEEK unit
 36    because generally tables are not partitioned by week
 37    """
 38
 39    YEAR = "year"
 40    MONTH = "month"
 41    DAY = "day"
 42    HOUR = "hour"
 43    HALF_HOUR = "half_hour"
 44    QUARTER_HOUR = "quarter_hour"
 45    FIVE_MINUTE = "five_minute"
 46
 47    @classmethod
 48    def from_cron(klass, cron: str) -> IntervalUnit:
 49        croniter = CroniterCache(cron)
 50        interval_seconds = croniter.interval_seconds
 51
 52        if not interval_seconds:
 53            samples = [croniter.get_next() for _ in range(5)]
 54            interval_seconds = int(min(b - a for a, b in zip(samples, samples[1:])).total_seconds())
 55
 56        for unit, seconds in INTERVAL_SECONDS.items():
 57            if seconds <= interval_seconds:
 58                return unit
 59        raise ConfigError(f"Invalid cron '{cron}': must run at a frequency of 5 minutes or slower.")
 60
 61    @property
 62    def is_date_granularity(self) -> bool:
 63        return self in (IntervalUnit.YEAR, IntervalUnit.MONTH, IntervalUnit.DAY)
 64
 65    @property
 66    def is_year(self) -> bool:
 67        return self == IntervalUnit.YEAR
 68
 69    @property
 70    def is_month(self) -> bool:
 71        return self == IntervalUnit.MONTH
 72
 73    @property
 74    def is_day(self) -> bool:
 75        return self == IntervalUnit.DAY
 76
 77    @property
 78    def is_hour(self) -> bool:
 79        return self == IntervalUnit.HOUR
 80
 81    @property
 82    def is_minute(self) -> bool:
 83        return self in (IntervalUnit.FIVE_MINUTE, IntervalUnit.QUARTER_HOUR, IntervalUnit.HALF_HOUR)
 84
 85    @property
 86    def cron_expr(self) -> str:
 87        if self == IntervalUnit.FIVE_MINUTE:
 88            return "*/5 * * * *"
 89        if self == IntervalUnit.QUARTER_HOUR:
 90            return "*/15 * * * *"
 91        if self == IntervalUnit.HALF_HOUR:
 92            return "*/30 * * * *"
 93        if self == IntervalUnit.HOUR:
 94            return "0 * * * *"
 95        if self == IntervalUnit.DAY:
 96            return "0 0 * * *"
 97        if self == IntervalUnit.MONTH:
 98            return "0 0 1 * *"
 99        if self == IntervalUnit.YEAR:
100            return "0 0 1 1 *"
101        return ""
102
103    def croniter(self, value: TimeLike) -> CroniterCache:
104        return CroniterCache(self.cron_expr, value)
105
106    def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
107        """
108        Get the next timestamp given a time-like value for this interval unit.
109
110        Args:
111            value: A variety of date formats.
112            estimate: Whether or not to estimate, only use this if the value is floored.
113
114        Returns:
115            The timestamp for the next run.
116        """
117        return self.croniter(value).get_next(estimate=estimate)
118
119    def cron_prev(self, value: TimeLike, estimate: bool = False) -> datetime:
120        """
121        Get the previous timestamp given a time-like value for this interval unit.
122
123        Args:
124            value: A variety of date formats.
125            estimate: Whether or not to estimate, only use this if the value is floored.
126
127        Returns:
128            The timestamp for the previous run.
129        """
130        return self.croniter(value).get_prev(estimate=estimate)
131
132    def cron_floor(self, value: TimeLike, estimate: bool = False) -> datetime:
133        """
134        Get the floor timestamp given a time-like value for this interval unit.
135
136        Args:
137            value: A variety of date formats.
138            estimate: Whether or not to estimate, only use this if the value is floored.
139
140        Returns:
141            The timestamp floor.
142        """
143        croniter = self.croniter(value)
144        croniter.get_next(estimate=estimate)
145        return croniter.get_prev(estimate=True)
146
147    @property
148    def seconds(self) -> int:
149        return INTERVAL_SECONDS[self]
150
151    @property
152    def milliseconds(self) -> int:
153        return self.seconds * 1000
154
155
class DbtNodeInfo(PydanticModel):
    """
    Represents dbt-specific model information set by the dbt loader and intended to be made available at the Snapshot level
    (as opposed to hidden within the individual model jinja macro registries).

    This allows for things like injecting implementations of variables / functions into the Jinja context that are compatible with
    their dbt equivalents but are backed by the sqlmesh snapshots in any given plan / environment.
    """

    unique_id: str
    """This is the node/resource name/unique_id that's used as the node key in the dbt manifest.
    It's prefixed by the resource type and is exposed in context variables like {{ selected_resources }}.

    Examples:
        - test.jaffle_shop.unique_stg_orders_order_id.e3b841c71a
        - seed.jaffle_shop.raw_payments
        - model.jaffle_shop.stg_orders
    """

    name: str
    """Name of this object in the dbt global namespace, used by things like {{ ref() }} calls.

    Examples:
        - unique_stg_orders_order_id
        - raw_payments
        - stg_orders
    """

    fqn: str
    """Used for selectors in --select/--exclude.
    Takes the filesystem into account so may be structured differently to :unique_id.

    Examples:
        - jaffle_shop.staging.unique_stg_orders_order_id
        - jaffle_shop.raw_payments
        - jaffle_shop.staging.stg_orders
    """

    alias: t.Optional[str] = None
    """This is dbt's way of overriding the _physical table_ a model is written to.

    It's used in the following situation:
     - Say you have two models, "stg_customers" and "customers"
     - You want "stg_customers" to be written to the "staging" schema as e.g. "staging.customers" - NOT "staging.stg_customers"
     - But you can't rename the file to "customers" because it will conflict with your other model file "customers"
     - Even if you put it in a different folder, e.g. "staging/customers.sql" - dbt still has a global namespace so it will conflict
        when you try to do something like "{{ ref('customers') }}"
     - So dbt's solution to this problem is to keep calling it "stg_customers" at the dbt project/model level,
        but allow overriding the physical table to "customers" via something like "{{ config(alias='customers', schema='staging') }}"

    Note that if :alias is set, it does *not* replace :name at the model level and cannot be used interchangeably with :name.
    It also does not affect the :fqn or :unique_id. It's just used to override :name when it comes time to generate the physical table name.
    """

    @model_validator(mode="after")
    def post_init(self) -> Self:
        """Keep ``alias`` only when it differs from ``name``; otherwise reset it to None."""
        # by default, dbt sets alias to the same as :name
        # however, we only want to include :alias if it is actually different / actually providing an override
        if self.alias == self.name:
            self.alias = None
        return self

    def to_expression(self) -> exp.Expr:
        """Produce a SQLGlot expression representing this object, for use in things like the model/audit definition renderers.

        The result is a tuple with one ``PropertyEQ`` (field name as a var, value as a string literal)
        per non-None field. Fields are sorted by name so the output does not depend on declaration order.
        """
        return exp.tuple_(
            *(
                exp.PropertyEQ(this=exp.var(k), expression=exp.Literal.string(v))
                for k, v in sorted(self.model_dump(exclude_none=True).items())
            )
        )
226
227
class DbtInfoMixin:
    """Mixin that exposes dbt-compatibility accessors.

    These properties exist only for dbt compatibility and are otherwise not required for native
    projects. Concrete classes must override ``dbt_node_info``.
    """

    @property
    def dbt_node_info(self) -> t.Optional[DbtNodeInfo]:
        # The mixin has no storage of its own; subclasses supply the node info.
        raise NotImplementedError()

    @property
    def dbt_unique_id(self) -> t.Optional[str]:
        """dbt unique_id, for compatibility with Jinja context variables such as {{ selected_resources }}."""
        info = self.dbt_node_info
        return info.unique_id if info else None

    @property
    def dbt_fqn(self) -> t.Optional[str]:
        """dbt fqn, used by the selector engine to match models selected by their dbt fqn."""
        info = self.dbt_node_info
        return info.fqn if info else None
249
250
# Unit lengths in seconds, listed from longest to shortest. IntervalUnit.from_cron returns the
# first entry that fits, so this mapping must stay sorted in descending order.
INTERVAL_SECONDS = {
    IntervalUnit.YEAR: 365 * 24 * 60 * 60,
    # 28 days: the shortest calendar month, so the smallest gap of a monthly cron still qualifies.
    IntervalUnit.MONTH: 28 * 24 * 60 * 60,
    IntervalUnit.DAY: 24 * 60 * 60,
    IntervalUnit.HOUR: 60 * 60,
    IntervalUnit.HALF_HOUR: 30 * 60,
    IntervalUnit.QUARTER_HOUR: 15 * 60,
    IntervalUnit.FIVE_MINUTE: 5 * 60,
}
261
262
class _Node(DbtInfoMixin, PydanticModel):
    """
    Node is the core abstraction for entity that can be executed within the scheduler.

    Args:
        name: The name of the node.
        project: The name of the project this node belongs to, used in multi-repo deployments.
        description: The optional node description.
        owner: The owner of the node.
        start: The earliest date that the node will be executed for. If this is None,
            then the date is inferred by taking the most recent start date of its ancestors.
            The start date can be a static datetime or a relative datetime like "1 year ago"
        end: The latest date that the model will be executed for. If this is None,
            the date from the scheduler will be used
        cron: A cron string specifying how often the node should be run, leveraging the
            [croniter](https://github.com/kiorky/croniter) library.
        cron_tz: Time zone for the cron, defaults to utc, [IANA time zones](https://docs.python.org/3/library/zoneinfo.html).
            "UTC", whether given as a string or a SQL expression, is normalized to None.
        interval_unit: The duration of an interval for the node. By default, it is computed from the cron expression.
        tags: A list of tags that can be used to filter nodes.
        stamp: An optional arbitrary string sequence used to create new node versions without making
            changes to any of the functional components of the definition.
        dbt_node_info: dbt-specific node information set by the dbt loader; None by default.
    """

    name: str
    project: str = ""
    description: t.Optional[str] = None
    owner: t.Optional[str] = None
    start: t.Optional[TimeLike] = None
    end: t.Optional[TimeLike] = None
    cron: SQLGlotCron = "@daily"
    cron_tz: t.Optional[zoneinfo.ZoneInfo] = None
    interval_unit_: t.Optional[IntervalUnit] = Field(alias="interval_unit", default=None)
    tags: t.List[str] = []
    stamp: t.Optional[str] = None
    dbt_node_info_: t.Optional[DbtNodeInfo] = Field(alias="dbt_node_info", default=None)
    _path: t.Optional[Path] = None
    # Cached hashes; cleared by copy() and dropped from pickled state in __getstate__.
    _data_hash: t.Optional[str] = None
    _metadata_hash: t.Optional[str] = None

    # Lazily created iterator reused by croniter().
    _croniter: t.Optional[CroniterCache] = None
    # Memoized result of _inferred_interval_unit().
    __inferred_interval_unit: t.Optional[IntervalUnit] = None

    def __str__(self) -> str:
        path = f": {self._path.name}" if self._path else ""
        return f"{self.__class__.__name__}<{self.name}{path}>"

    def __getstate__(self) -> t.Dict[t.Any, t.Any]:
        """Return the pickle state with the cached hash values cleared."""
        state = super().__getstate__()
        private = state[PRIVATE_FIELDS]
        private["_data_hash"] = None
        private["_metadata_hash"] = None
        return state

    def copy(self, **kwargs: t.Any) -> Self:
        """Copy the node and clear the cached hash values on the copy."""
        node = super().copy(**kwargs)
        node._data_hash = None
        node._metadata_hash = None
        return node

    @field_validator("name", mode="before")
    @classmethod
    def _name_validator(cls, v: t.Any) -> t.Optional[str]:
        """Coerce the name to a string; expressions use the SQL text stored in their meta."""
        if v is None:
            return None
        if isinstance(v, exp.Expr):
            return v.meta["sql"]
        return str(v)

    @field_validator("cron_tz", mode="before")
    @classmethod
    def _cron_tz_validator(cls, v: t.Any) -> t.Optional[zoneinfo.ZoneInfo]:
        """Parse ``cron_tz`` into a ZoneInfo, normalizing empty values and UTC to None.

        Raises:
            ConfigError: If the value is not a known IANA time zone, or time zone data is unavailable.
        """
        if not v:
            return None

        # Normalize expressions (and any other object) to a plain string *before* the UTC check,
        # so a SQL-declared `cron_tz 'UTC'` is treated the same as the Python string "UTC".
        tz_name = str_or_exp_to_str(v)
        if not tz_name or tz_name == "UTC":
            return None

        try:
            return zoneinfo.ZoneInfo(tz_name)
        except Exception as e:
            available_timezones = zoneinfo.available_timezones()

            if available_timezones:
                raise ConfigError(f"{e}. {tz_name} must be in {available_timezones}.") from e
            raise ConfigError(
                f"{e}. IANA time zone data is not available on your system. `pip install tzdata` to leverage cron time zones or remove this field which will default to UTC."
            ) from e

    @field_validator("start", "end", mode="before")
    @classmethod
    def _date_validator(cls, v: t.Any) -> t.Optional[TimeLike]:
        """Unwrap expressions to their name and require non-empty values to parse as datetimes."""
        if isinstance(v, exp.Expr):
            v = v.name
        if v and not to_datetime(v):
            raise ConfigError(f"'{v}' needs to be time-like: https://pypi.org/project/dateparser")
        return v

    @field_validator("owner", "description", "stamp", mode="before")
    @classmethod
    def _string_expr_validator(cls, v: t.Any) -> t.Optional[str]:
        return str_or_exp_to_str(v)

    @field_validator("interval_unit_", mode="before")
    @classmethod
    def _interval_unit_validator(cls, v: t.Any) -> t.Optional[t.Union[IntervalUnit, str]]:
        """Pass IntervalUnit members through; lowercase string/expression input for enum lookup."""
        if isinstance(v, IntervalUnit):
            return v
        v = str_or_exp_to_str(v)
        if v:
            v = v.lower()
        return v

    @model_validator(mode="after")
    def _node_root_validator(self) -> Self:
        """Cross-field checks: interval unit vs. cron frequency, and start/end consistency."""
        interval_unit = self.interval_unit_
        # `allow_partials` is not declared on _Node; subclasses that define it can opt out of this check.
        if interval_unit and not getattr(self, "allow_partials", None):
            cron = self.cron
            max_interval_unit = IntervalUnit.from_cron(cron)
            if interval_unit.seconds > max_interval_unit.seconds:
                raise ConfigError(
                    f"Cron '{cron}' cannot be more frequent than interval unit '{interval_unit.value}'. "
                    "If this is intentional, set allow_partials to True."
                )

        start = self.start
        end = self.end

        if end is not None and start is None:
            raise ConfigError("Must define a start date if an end date is defined.")
        validate_date_range(start, end)
        return self

    @property
    def batch_size(self) -> t.Optional[int]:
        """The maximal number of units in a single task for a backfill."""
        return None

    @property
    def batch_concurrency(self) -> t.Optional[int]:
        """The maximal number of batches that can run concurrently for a backfill."""
        return None

    @property
    def interval_unit(self) -> IntervalUnit:
        """Returns the interval unit using which data intervals are computed for this node."""
        if self.interval_unit_ is not None:
            return self.interval_unit_
        return self._inferred_interval_unit()

    @property
    def depends_on(self) -> t.Set[str]:
        return set()

    @property
    def fqn(self) -> str:
        return self.name

    @property
    def data_hash(self) -> str:
        """
        Computes the data hash for the node.

        Returns:
            The data hash for the node.
        """
        raise NotImplementedError

    @property
    def metadata_hash(self) -> str:
        """
        Computes the metadata hash for the node.

        Returns:
            The metadata hash for the node.
        """
        raise NotImplementedError

    def is_metadata_only_change(self, previous: _Node) -> bool:
        """Determines if this node is a metadata only change in relation to the `previous` node.

        Args:
            previous: The previous node to compare against.

        Returns:
            True if this node is a metadata only change, False otherwise.
        """
        return self.data_hash == previous.data_hash and self.metadata_hash != previous.metadata_hash

    def is_data_change(self, previous: _Node) -> bool:
        """Determines if this node is a data change in relation to the `previous` node.

        Args:
            previous: The previous node to compare against.

        Returns:
            True if this node is a data change, False otherwise.
        """
        # NOTE: with this base is_metadata_only_change, the expression reduces to a data-hash mismatch;
        # it is kept in this form so subclass overrides of is_metadata_only_change are respected.
        return (
            self.data_hash != previous.data_hash or self.metadata_hash != previous.metadata_hash
        ) and not self.is_metadata_only_change(previous)

    def croniter(self, value: TimeLike) -> CroniterCache:
        """Return this node's cron iterator positioned at ``value``.

        The iterator is created on first use (with ``cron_tz``) and then reused; later calls only
        reset its current time to ``value``.
        """
        # NOTE(review): one iterator is shared by all callers on this node, so each call moves its position.
        if self._croniter is None:
            self._croniter = CroniterCache(self.cron, value, tz=self.cron_tz)
        else:
            self._croniter.curr = to_datetime(value, tz=self.cron_tz)
        return self._croniter

    def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the next timestamp given a time-like value and the node's cron.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp for the next run.
        """
        return self.croniter(value).get_next(estimate=estimate)

    def cron_prev(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the previous timestamp given a time-like value and the node's cron.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp for the previous run.
        """
        return self.croniter(value).get_prev(estimate=estimate)

    def cron_floor(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the floor timestamp given a time-like value and the node's cron.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp floor.
        """
        # Advance to the next run, then step back one run from there.
        return self.croniter(self.cron_next(value, estimate=estimate)).get_prev(estimate=True)

    def text_diff(self, other: Node, rendered: bool = False) -> str:
        """Produce a text diff against another node.

        Args:
            other: The node to diff against. Must be of the same type.
            rendered: Presumably whether to diff the rendered definitions rather than the raw ones;
                the meaning is defined by subclasses (this base class does not implement diffing).

        Returns:
            A unified text diff showing additions and deletions.
        """
        raise NotImplementedError

    def _inferred_interval_unit(self) -> IntervalUnit:
        """Infers the interval unit from the cron expression.

        The interval unit is used to determine the lag applied to start_date and end_date for node rendering and intervals.
        The result is memoized on the instance.

        Returns:
            The IntervalUnit enum.
        """
        if self.__inferred_interval_unit is None:
            self.__inferred_interval_unit = IntervalUnit.from_cron(self.cron)
        return self.__inferred_interval_unit

    @property
    def is_model(self) -> bool:
        """Return True if this is a model node"""
        return False

    @property
    def is_audit(self) -> bool:
        """Return True if this is an audit node"""
        return False

    @property
    def dbt_node_info(self) -> t.Optional[DbtNodeInfo]:
        return self.dbt_node_info_
547
548
class NodeType(str, Enum):
    """Enumerates the node kinds: models and audits."""

    MODEL = "model"
    AUDIT = "audit"

    def __str__(self) -> str:
        # Render the member name (e.g. "MODEL") rather than its lowercase value.
        member_name = self.name
        return member_name
555
556
def str_or_exp_to_str(v: t.Any) -> t.Optional[str]:
    """Convert a value to a string, using ``.name`` for SQLGlot expressions; None stays None."""
    if v is None:
        return None
    return v.name if isinstance(v, exp.Expr) else str(v)
class IntervalUnit(builtins.str, enum.Enum):
 29class IntervalUnit(str, Enum):
 30    """IntervalUnit is the inferred granularity of an incremental node.
 31
 32    IntervalUnit can be one of 5 types, YEAR, MONTH, DAY, HOUR, MINUTE. The unit is inferred
 33    based on the cron schedule of a node. The minimum time delta between a sample set of dates
 34    is used to determine which unit a node's schedule is.
 35
 36    It's designed to align with common partitioning schemes, hence why there is no WEEK unit
 37    because generally tables are not partitioned by week
 38    """
 39
 40    YEAR = "year"
 41    MONTH = "month"
 42    DAY = "day"
 43    HOUR = "hour"
 44    HALF_HOUR = "half_hour"
 45    QUARTER_HOUR = "quarter_hour"
 46    FIVE_MINUTE = "five_minute"
 47
 48    @classmethod
 49    def from_cron(klass, cron: str) -> IntervalUnit:
 50        croniter = CroniterCache(cron)
 51        interval_seconds = croniter.interval_seconds
 52
 53        if not interval_seconds:
 54            samples = [croniter.get_next() for _ in range(5)]
 55            interval_seconds = int(min(b - a for a, b in zip(samples, samples[1:])).total_seconds())
 56
 57        for unit, seconds in INTERVAL_SECONDS.items():
 58            if seconds <= interval_seconds:
 59                return unit
 60        raise ConfigError(f"Invalid cron '{cron}': must run at a frequency of 5 minutes or slower.")
 61
 62    @property
 63    def is_date_granularity(self) -> bool:
 64        return self in (IntervalUnit.YEAR, IntervalUnit.MONTH, IntervalUnit.DAY)
 65
 66    @property
 67    def is_year(self) -> bool:
 68        return self == IntervalUnit.YEAR
 69
 70    @property
 71    def is_month(self) -> bool:
 72        return self == IntervalUnit.MONTH
 73
 74    @property
 75    def is_day(self) -> bool:
 76        return self == IntervalUnit.DAY
 77
 78    @property
 79    def is_hour(self) -> bool:
 80        return self == IntervalUnit.HOUR
 81
 82    @property
 83    def is_minute(self) -> bool:
 84        return self in (IntervalUnit.FIVE_MINUTE, IntervalUnit.QUARTER_HOUR, IntervalUnit.HALF_HOUR)
 85
 86    @property
 87    def cron_expr(self) -> str:
 88        if self == IntervalUnit.FIVE_MINUTE:
 89            return "*/5 * * * *"
 90        if self == IntervalUnit.QUARTER_HOUR:
 91            return "*/15 * * * *"
 92        if self == IntervalUnit.HALF_HOUR:
 93            return "*/30 * * * *"
 94        if self == IntervalUnit.HOUR:
 95            return "0 * * * *"
 96        if self == IntervalUnit.DAY:
 97            return "0 0 * * *"
 98        if self == IntervalUnit.MONTH:
 99            return "0 0 1 * *"
100        if self == IntervalUnit.YEAR:
101            return "0 0 1 1 *"
102        return ""
103
104    def croniter(self, value: TimeLike) -> CroniterCache:
105        return CroniterCache(self.cron_expr, value)
106
107    def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
108        """
109        Get the next timestamp given a time-like value for this interval unit.
110
111        Args:
112            value: A variety of date formats.
113            estimate: Whether or not to estimate, only use this if the value is floored.
114
115        Returns:
116            The timestamp for the next run.
117        """
118        return self.croniter(value).get_next(estimate=estimate)
119
120    def cron_prev(self, value: TimeLike, estimate: bool = False) -> datetime:
121        """
122        Get the previous timestamp given a time-like value for this interval unit.
123
124        Args:
125            value: A variety of date formats.
126            estimate: Whether or not to estimate, only use this if the value is floored.
127
128        Returns:
129            The timestamp for the previous run.
130        """
131        return self.croniter(value).get_prev(estimate=estimate)
132
133    def cron_floor(self, value: TimeLike, estimate: bool = False) -> datetime:
134        """
135        Get the floor timestamp given a time-like value for this interval unit.
136
137        Args:
138            value: A variety of date formats.
139            estimate: Whether or not to estimate, only use this if the value is floored.
140
141        Returns:
142            The timestamp floor.
143        """
144        croniter = self.croniter(value)
145        croniter.get_next(estimate=estimate)
146        return croniter.get_prev(estimate=True)
147
148    @property
149    def seconds(self) -> int:
150        return INTERVAL_SECONDS[self]
151
152    @property
153    def milliseconds(self) -> int:
154        return self.seconds * 1000

IntervalUnit is the inferred granularity of an incremental node.

IntervalUnit can be one of 7 types: YEAR, MONTH, DAY, HOUR, HALF_HOUR, QUARTER_HOUR and FIVE_MINUTE. The unit is inferred from the cron schedule of a node: the minimum time delta between a sample set of run dates determines which unit the node's schedule falls into.

It's designed to align with common partitioning schemes; there is no WEEK unit because tables are generally not partitioned by week.

YEAR = <IntervalUnit.YEAR: 'year'>
MONTH = <IntervalUnit.MONTH: 'month'>
DAY = <IntervalUnit.DAY: 'day'>
HOUR = <IntervalUnit.HOUR: 'hour'>
HALF_HOUR = <IntervalUnit.HALF_HOUR: 'half_hour'>
QUARTER_HOUR = <IntervalUnit.QUARTER_HOUR: 'quarter_hour'>
FIVE_MINUTE = <IntervalUnit.FIVE_MINUTE: 'five_minute'>
@classmethod
def from_cron(klass, cron: str) -> IntervalUnit:
48    @classmethod
49    def from_cron(klass, cron: str) -> IntervalUnit:
50        croniter = CroniterCache(cron)
51        interval_seconds = croniter.interval_seconds
52
53        if not interval_seconds:
54            samples = [croniter.get_next() for _ in range(5)]
55            interval_seconds = int(min(b - a for a, b in zip(samples, samples[1:])).total_seconds())
56
57        for unit, seconds in INTERVAL_SECONDS.items():
58            if seconds <= interval_seconds:
59                return unit
60        raise ConfigError(f"Invalid cron '{cron}': must run at a frequency of 5 minutes or slower.")
is_date_granularity: bool
62    @property
63    def is_date_granularity(self) -> bool:
64        return self in (IntervalUnit.YEAR, IntervalUnit.MONTH, IntervalUnit.DAY)
is_year: bool
66    @property
67    def is_year(self) -> bool:
68        return self == IntervalUnit.YEAR
is_month: bool
70    @property
71    def is_month(self) -> bool:
72        return self == IntervalUnit.MONTH
is_day: bool
74    @property
75    def is_day(self) -> bool:
76        return self == IntervalUnit.DAY
is_hour: bool
78    @property
79    def is_hour(self) -> bool:
80        return self == IntervalUnit.HOUR
is_minute: bool
82    @property
83    def is_minute(self) -> bool:
84        return self in (IntervalUnit.FIVE_MINUTE, IntervalUnit.QUARTER_HOUR, IntervalUnit.HALF_HOUR)
cron_expr: str
 86    @property
 87    def cron_expr(self) -> str:
 88        if self == IntervalUnit.FIVE_MINUTE:
 89            return "*/5 * * * *"
 90        if self == IntervalUnit.QUARTER_HOUR:
 91            return "*/15 * * * *"
 92        if self == IntervalUnit.HALF_HOUR:
 93            return "*/30 * * * *"
 94        if self == IntervalUnit.HOUR:
 95            return "0 * * * *"
 96        if self == IntervalUnit.DAY:
 97            return "0 0 * * *"
 98        if self == IntervalUnit.MONTH:
 99            return "0 0 1 * *"
100        if self == IntervalUnit.YEAR:
101            return "0 0 1 1 *"
102        return ""
def croniter( self, value: Union[datetime.date, datetime.datetime, str, int, float]) -> sqlmesh.utils.cron.CroniterCache:
104    def croniter(self, value: TimeLike) -> CroniterCache:
105        return CroniterCache(self.cron_expr, value)
def cron_next( self, value: Union[datetime.date, datetime.datetime, str, int, float], estimate: bool = False) -> datetime.datetime:
107    def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
108        """
109        Get the next timestamp given a time-like value for this interval unit.
110
111        Args:
112            value: A variety of date formats.
113            estimate: Whether or not to estimate, only use this if the value is floored.
114
115        Returns:
116            The timestamp for the next run.
117        """
118        return self.croniter(value).get_next(estimate=estimate)

Get the next timestamp given a time-like value for this interval unit.

Arguments:
  • value: A variety of date formats.
  • estimate: Whether or not to estimate, only use this if the value is floored.
Returns:

The timestamp for the next run.

def cron_prev( self, value: Union[datetime.date, datetime.datetime, str, int, float], estimate: bool = False) -> datetime.datetime:
120    def cron_prev(self, value: TimeLike, estimate: bool = False) -> datetime:
121        """
122        Get the previous timestamp given a time-like value for this interval unit.
123
124        Args:
125            value: A variety of date formats.
126            estimate: Whether or not to estimate, only use this if the value is floored.
127
128        Returns:
129            The timestamp for the previous run.
130        """
131        return self.croniter(value).get_prev(estimate=estimate)

Get the previous timestamp given a time-like value for this interval unit.

Arguments:
  • value: A variety of date formats.
  • estimate: Whether or not to estimate, only use this if the value is floored.
Returns:

The timestamp for the previous run.

def cron_floor(self, value: TimeLike, estimate: bool = False) -> datetime:
    """
    Get the floor timestamp given a time-like value for this interval unit.

    Args:
        value: A variety of date formats.
        estimate: Whether or not to estimate, only use this if the value is floored.

    Returns:
        The timestamp floor.
    """
    cursor = self.croniter(value)
    # Step forward one tick, then back one tick; the value reached going back is the floor.
    # NOTE(review): the backward step always passes estimate=True, presumably because the
    # cursor now sits exactly on a cron tick — confirm against CroniterCache.
    cursor.get_next(estimate=estimate)
    floor = cursor.get_prev(estimate=True)
    return floor

Get the floor timestamp given a time-like value for this interval unit.

Arguments:
  • value: A variety of date formats.
  • estimate: Whether or not to estimate, only use this if the value is floored.
Returns:

The timestamp floor.

@property
def seconds(self) -> int:
    """Length of this interval unit in seconds, looked up in ``INTERVAL_SECONDS``."""
    lookup = INTERVAL_SECONDS
    return lookup[self]
@property
def milliseconds(self) -> int:
    """Length of this interval unit in milliseconds (``seconds`` scaled by 1000)."""
    return 1000 * self.seconds
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class DbtNodeInfo(PydanticModel):
    """
    Represents dbt-specific model information set by the dbt loader and intended to be made available at the Snapshot level
    (as opposed to hidden within the individual model jinja macro registries).

    This allows for things like injecting implementations of variables / functions into the Jinja context that are compatible with
    their dbt equivalents but are backed by the sqlmesh snapshots in any given plan / environment.
    """

    unique_id: str
    """This is the node/resource name/unique_id that's used as the node key in the dbt manifest.
    It's prefixed by the resource type and is exposed in context variables like {{ selected_resources }}.

    Examples:
        - test.jaffle_shop.unique_stg_orders_order_id.e3b841c71a
        - seed.jaffle_shop.raw_payments
        - model.jaffle_shop.stg_orders
    """

    name: str
    """Name of this object in the dbt global namespace, used by things like {{ ref() }} calls.

    Examples:
        - unique_stg_orders_order_id
        - raw_payments
        - stg_orders
    """

    fqn: str
    """Used for selectors in --select/--exclude.
    Takes the filesystem into account so may be structured differently to :unique_id.

    Examples:
        - jaffle_shop.staging.unique_stg_orders_order_id
        - jaffle_shop.raw_payments
        - jaffle_shop.staging.stg_orders
    """

    alias: t.Optional[str] = None
    """This is dbt's way of overriding the _physical table_ a model is written to.

    It's used in the following situation:
     - Say you have two models, "stg_customers" and "customers"
     - You want "stg_customers" to be written to the "staging" schema as e.g. "staging.customers" - NOT "staging.stg_customers"
     - But you can't rename the file to "customers" because it will conflict with your other model file "customers"
     - Even if you put it in a different folder, e.g. "staging/customers.sql" - dbt still has a global namespace so it will conflict
        when you try to do something like "{{ ref('customers') }}"
     - So dbt's solution to this problem is to keep calling it "stg_customers" at the dbt project/model level,
        but allow overriding the physical table to "customers" via something like "{{ config(alias='customers', schema='staging') }}"

    Note that if :alias is set, it does *not* replace :name at the model level and cannot be used interchangeably with :name.
    It also does not affect the :fqn or :unique_id. It's just used to override :name when it comes time to generate the physical table name.

    During validation an alias equal to :name is reset to None, so after validation a non-None alias is always a genuine override.
    """

    @model_validator(mode="after")
    def post_init(self) -> Self:
        # by default, dbt sets alias to the same as :name
        # however, we only want to include :alias if it is actually different / actually providing an override
        if self.alias == self.name:
            self.alias = None
        return self

    def to_expression(self) -> exp.Expr:
        """Produce a SQLGlot expression representing this object, for use in things like the model/audit definition renderers.

        Fields whose value is None are omitted; every remaining field becomes one ``PropertyEQ``
        (field name -> string literal), ordered by field name so the output is deterministic.
        """
        return exp.tuple_(
            *(
                exp.PropertyEQ(this=exp.var(k), expression=exp.Literal.string(v))
                for k, v in sorted(self.model_dump(exclude_none=True).items())
            )
        )

Represents dbt-specific model information set by the dbt loader and intended to be made available at the Snapshot level (as opposed to hidden within the individual model jinja macro registries).

This allows for things like injecting implementations of variables / functions into the Jinja context that are compatible with their dbt equivalents but are backed by the sqlmesh snapshots in any given plan / environment.

unique_id: str

This is the node/resource name/unique_id that's used as the node key in the dbt manifest. It's prefixed by the resource type and is exposed in context variables like {{ selected_resources }}.

Examples:
  • test.jaffle_shop.unique_stg_orders_order_id.e3b841c71a
  • seed.jaffle_shop.raw_payments
  • model.jaffle_shop.stg_orders
name: str

Name of this object in the dbt global namespace, used by things like {{ ref() }} calls.

Examples:
  • unique_stg_orders_order_id
  • raw_payments
  • stg_orders
fqn: str

Used for selectors in --select/--exclude. Takes the filesystem into account so may be structured differently to :unique_id.

Examples:
  • jaffle_shop.staging.unique_stg_orders_order_id
  • jaffle_shop.raw_payments
  • jaffle_shop.staging.stg_orders
alias: Optional[str]

This is dbt's way of overriding the _physical table_ a model is written to.

It's used in the following situation:

  • Say you have two models, "stg_customers" and "customers"
  • You want "stg_customers" to be written to the "staging" schema as e.g. "staging.customers" - NOT "staging.stg_customers"
  • But you can't rename the file to "customers" because it will conflict with your other model file "customers"
  • Even if you put it in a different folder, e.g. "staging/customers.sql" - dbt still has a global namespace so it will conflict when you try to do something like "{{ ref('customers') }}"
  • So dbt's solution to this problem is to keep calling it "stg_customers" at the dbt project/model level, but allow overriding the physical table to "customers" via something like "{{ config(alias='customers', schema='staging') }}"

Note that if :alias is set, it does not replace :name at the model level and cannot be used interchangeably with :name. It also does not affect the :fqn or :unique_id. It's just used to override :name when it comes time to generate the physical table name.

@model_validator(mode="after")
def post_init(self) -> Self:
    """Drop ``alias`` when it merely repeats ``name``.

    By default dbt sets alias to the same value as name; only an alias that actually
    differs is an override worth keeping.
    """
    redundant = self.alias == self.name
    if redundant:
        self.alias = None
    return self
def to_expression(self) -> exp.Expr:
    """Produce a SQLGlot expression representing this object, for use in things like the model/audit definition renderers.

    Fields whose value is None are skipped; each remaining field becomes one ``PropertyEQ``,
    ordered by field name.
    """
    pairs = sorted(self.model_dump(exclude_none=True).items())
    properties = [
        exp.PropertyEQ(this=exp.var(key), expression=exp.Literal.string(val))
        for key, val in pairs
    ]
    return exp.tuple_(*properties)

Produce a SQLGlot expression representing this object, for use in things like the model/audit definition renderers

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class DbtInfoMixin:
    """This mixin encapsulates properties that only exist for dbt compatibility and are otherwise not required
    for native projects"""

    @property
    def dbt_node_info(self) -> t.Optional[DbtNodeInfo]:
        # The mixin holds no data of its own; classes using it must override this property.
        raise NotImplementedError()

    @property
    def dbt_unique_id(self) -> t.Optional[str]:
        """Used for compatibility with jinja context variables such as {{ selected_resources }}"""
        # Read the overridable property once so a computed implementation is not evaluated
        # twice, and the truthiness check and attribute access see the same object.
        info = self.dbt_node_info
        return info.unique_id if info else None

    @property
    def dbt_fqn(self) -> t.Optional[str]:
        """Used in the selector engine for compatibility with selectors that select models by dbt fqn"""
        # Same single-read pattern as dbt_unique_id.
        info = self.dbt_node_info
        return info.fqn if info else None

This mixin encapsulates properties that only exist for dbt compatibility and are otherwise not required for native projects

@property
def dbt_node_info(self) -> t.Optional[DbtNodeInfo]:
    """dbt node metadata for this object.

    The base implementation only raises NotImplementedError; concrete classes supply the value.
    """
    raise NotImplementedError()
@property
def dbt_unique_id(self) -> t.Optional[str]:
    """Used for compatibility with jinja context variables such as {{ selected_resources }}"""
    # Read the overridable dbt_node_info property once instead of twice, so a computed
    # implementation is not re-evaluated and the check and the access see the same object.
    info = self.dbt_node_info
    return info.unique_id if info else None

Used for compatibility with jinja context variables such as {{ selected_resources }}

@property
def dbt_fqn(self) -> t.Optional[str]:
    """Used in the selector engine for compatibility with selectors that select models by dbt fqn"""
    # Read the overridable dbt_node_info property once instead of twice, so a computed
    # implementation is not re-evaluated and the check and the access see the same object.
    info = self.dbt_node_info
    return info.fqn if info else None

Used in the selector engine for compatibility with selectors that select models by dbt fqn

INTERVAL_SECONDS = {<IntervalUnit.YEAR: 'year'>: 31536000, <IntervalUnit.MONTH: 'month'>: 2419200, <IntervalUnit.DAY: 'day'>: 86400, <IntervalUnit.HOUR: 'hour'>: 3600, <IntervalUnit.HALF_HOUR: 'half_hour'>: 1800, <IntervalUnit.QUARTER_HOUR: 'quarter_hour'>: 900, <IntervalUnit.FIVE_MINUTE: 'five_minute'>: 300}
class NodeType(str, Enum):
    """The type of a node: either a model or an audit."""

    MODEL = "model"
    AUDIT = "audit"

    def __str__(self) -> str:
        # Render as the upper-case member name (e.g. "MODEL"), not the lower-case value.
        member_name: str = self.name
        return member_name

An enumeration.

MODEL = <NodeType.MODEL: 'model'>
AUDIT = <NodeType.AUDIT: 'audit'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
def str_or_exp_to_str(v: t.Any) -> t.Optional[str]:
    """Coerce ``v`` to an optional string.

    SQLGlot expressions yield their ``.name``; None stays None; anything else goes through ``str()``.
    """
    if isinstance(v, exp.Expr):
        return v.name
    if v is None:
        return None
    return str(v)