Edit on GitHub

sqlmesh.core.node

  1from __future__ import annotations
  2
  3import typing as t
  4from datetime import datetime
  5from enum import Enum
  6from pathlib import Path
  7
  8from pydantic import Field
  9from sqlglot import exp
 10
 11from sqlmesh.utils.cron import CroniterCache
 12from sqlmesh.utils.date import TimeLike, to_datetime, validate_date_range
 13from sqlmesh.utils.errors import ConfigError
 14from sqlmesh.utils.pydantic import (
 15    PydanticModel,
 16    field_validator,
 17    model_validator,
 18    model_validator_v1_args,
 19)
 20
 21if t.TYPE_CHECKING:
 22    from sqlmesh.core.audit import ModelAudit
 23    from sqlmesh.core.snapshot import Node
 24
 25
 26class IntervalUnit(str, Enum):
 27    """IntervalUnit is the inferred granularity of an incremental node.
 28
 29    IntervalUnit can be one of 5 types, YEAR, MONTH, DAY, HOUR, MINUTE. The unit is inferred
 30    based on the cron schedule of a node. The minimum time delta between a sample set of dates
 31    is used to determine which unit a node's schedule is.
 32    """
 33
 34    YEAR = "year"
 35    MONTH = "month"
 36    DAY = "day"
 37    HOUR = "hour"
 38    HALF_HOUR = "half_hour"
 39    QUARTER_HOUR = "quarter_hour"
 40    FIVE_MINUTE = "five_minute"
 41
 42    @classmethod
 43    def from_cron(klass, cron: str) -> IntervalUnit:
 44        croniter = CroniterCache(cron)
 45        interval_seconds = croniter.interval_seconds
 46
 47        if not interval_seconds:
 48            samples = [croniter.get_next() for _ in range(5)]
 49            interval_seconds = int(min(b - a for a, b in zip(samples, samples[1:])).total_seconds())
 50
 51        for unit, seconds in INTERVAL_SECONDS.items():
 52            if seconds <= interval_seconds:
 53                return unit
 54        raise ConfigError(f"Invalid cron '{cron}': must have a cadence of 5 minutes or more.")
 55
 56    @property
 57    def is_date_granularity(self) -> bool:
 58        return self in (IntervalUnit.YEAR, IntervalUnit.MONTH, IntervalUnit.DAY)
 59
 60    @property
 61    def is_year(self) -> bool:
 62        return self == IntervalUnit.YEAR
 63
 64    @property
 65    def is_month(self) -> bool:
 66        return self == IntervalUnit.MONTH
 67
 68    @property
 69    def is_day(self) -> bool:
 70        return self == IntervalUnit.DAY
 71
 72    @property
 73    def is_hour(self) -> bool:
 74        return self == IntervalUnit.HOUR
 75
 76    @property
 77    def is_minute(self) -> bool:
 78        return self in (IntervalUnit.FIVE_MINUTE, IntervalUnit.QUARTER_HOUR, IntervalUnit.HALF_HOUR)
 79
 80    @property
 81    def cron_expr(self) -> str:
 82        if self == IntervalUnit.FIVE_MINUTE:
 83            return "*/5 * * * *"
 84        if self == IntervalUnit.QUARTER_HOUR:
 85            return "*/15 * * * *"
 86        if self == IntervalUnit.HALF_HOUR:
 87            return "*/30 * * * *"
 88        if self == IntervalUnit.HOUR:
 89            return "0 * * * *"
 90        if self == IntervalUnit.DAY:
 91            return "0 0 * * *"
 92        if self == IntervalUnit.MONTH:
 93            return "0 0 1 * *"
 94        if self == IntervalUnit.YEAR:
 95            return "0 0 1 1 *"
 96        return ""
 97
 98    def croniter(self, value: TimeLike) -> CroniterCache:
 99        return CroniterCache(self.cron_expr, value)
100
101    def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
102        """
103        Get the next timestamp given a time-like value for this interval unit.
104
105        Args:
106            value: A variety of date formats.
107            estimate: Whether or not to estimate, only use this if the value is floored.
108
109        Returns:
110            The timestamp for the next run.
111        """
112        return self.croniter(value).get_next(estimate=estimate)
113
114    def cron_prev(self, value: TimeLike, estimate: bool = False) -> datetime:
115        """
116        Get the previous timestamp given a time-like value for this interval unit.
117
118        Args:
119            value: A variety of date formats.
120            estimate: Whether or not to estimate, only use this if the value is floored.
121
122        Returns:
123            The timestamp for the previous run.
124        """
125        return self.croniter(value).get_prev(estimate=estimate)
126
127    def cron_floor(self, value: TimeLike, estimate: bool = False) -> datetime:
128        """
129        Get the floor timestamp given a time-like value for this interval unit.
130
131        Args:
132            value: A variety of date formats.
133            estimate: Whether or not to estimate, only use this if the value is floored.
134
135        Returns:
136            The timestamp floor.
137        """
138        croniter = self.croniter(value)
139        croniter.get_next(estimate=estimate)
140        return croniter.get_prev(estimate=True)
141
142    @property
143    def seconds(self) -> int:
144        return INTERVAL_SECONDS[self]
145
146    @property
147    def milliseconds(self) -> int:
148        return self.seconds * 1000
149
150
# Length of each interval unit in seconds.
# The entries must stay ordered from the largest unit to the smallest: IntervalUnit.from_cron
# walks this mapping in order and returns the first unit that fits the cron cadence.
INTERVAL_SECONDS = {
    IntervalUnit.YEAR: 365 * 24 * 60 * 60,  # 365 days
    IntervalUnit.MONTH: 28 * 24 * 60 * 60,  # 28 days
    IntervalUnit.DAY: 24 * 60 * 60,
    IntervalUnit.HOUR: 60 * 60,
    IntervalUnit.HALF_HOUR: 30 * 60,
    IntervalUnit.QUARTER_HOUR: 15 * 60,
    IntervalUnit.FIVE_MINUTE: 5 * 60,
}
161
162
class _Node(PydanticModel):
    """
    Node is the core abstraction for entity that can be executed within the scheduler.

    Args:
        name: The name of the node.
        project: The name of the project this node belongs to, used in multi-repo deployments.
        description: The optional node description.
        owner: The owner of the node.
        start: The earliest date that the node will be executed for. If this is None,
            then the date is inferred by taking the most recent start date of its ancestors.
            The start date can be a static datetime or a relative datetime like "1 year ago"
        end: The latest date that the node will be executed for. If this is None,
            the date from the scheduler will be used
        cron: A cron string specifying how often the node should be run, leveraging the
            [croniter](https://github.com/kiorky/croniter) library.
        interval_unit: The duration of an interval for the node. By default, it is computed from the cron expression.
        tags: A list of tags that can be used to filter nodes.
        stamp: An optional arbitrary string sequence used to create new node versions without making
            changes to any of the functional components of the definition.
    """

    name: str
    project: str = ""
    description: t.Optional[str] = None
    owner: t.Optional[str] = None
    start: t.Optional[TimeLike] = None
    end: t.Optional[TimeLike] = None
    cron: str = "@daily"
    interval_unit_: t.Optional[IntervalUnit] = Field(alias="interval_unit", default=None)
    tags: t.List[str] = []
    stamp: t.Optional[str] = None
    # Only the final path component is used here, by __str__.
    _path: Path = Path()

    # Created on the first croniter() call and reused afterwards.
    _croniter: t.Optional[CroniterCache] = None
    # Memoized result of _inferred_interval_unit().
    __inferred_interval_unit: t.Optional[IntervalUnit] = None

    def __str__(self) -> str:
        """Render the node as `ClassName<name>` or `ClassName<name: file>` when a path is set."""
        # A Path object is always truthy, so the final path component has to be checked
        # itself; otherwise the default `Path()` would render a dangling ": " separator.
        file_name = self._path.name if self._path else ""
        path = f": {file_name}" if file_name else ""
        return f"{self.__class__.__name__}<{self.name}{path}>"

    @field_validator("name", mode="before")
    @classmethod
    def _name_validator(cls, v: t.Any) -> t.Optional[str]:
        """Normalize the name: expressions use the "sql" entry of their meta, other values are stringified."""
        if v is None:
            return None
        if isinstance(v, exp.Expression):
            return v.meta["sql"]
        return str(v)

    @field_validator("start", "end", mode="before")
    @classmethod
    def _date_validator(cls, v: t.Any) -> t.Optional[TimeLike]:
        """Unwrap expressions to their name and reject truthy values that `to_datetime` cannot convert.

        Raises:
            ConfigError: If the value is set but is not time-like.
        """
        if isinstance(v, exp.Expression):
            v = v.name
        if v and not to_datetime(v):
            raise ConfigError(f"'{v}' needs to be time-like: https://pypi.org/project/dateparser")
        return v

    @field_validator("cron", mode="before")
    @classmethod
    def _cron_validator(cls, v: t.Any) -> t.Optional[str]:
        """Convert the cron value to a string and check that croniter accepts it.

        Raises:
            ConfigError: If croniter rejects the expression.
        """
        cron = str_or_exp_to_str(v)
        if cron:
            from croniter import CroniterBadCronError, croniter

            try:
                croniter(cron)
            except CroniterBadCronError as e:
                # Chain explicitly so the parser's own message stays attached as the cause.
                raise ConfigError(f"Invalid cron expression '{cron}'") from e
        return cron

    @field_validator("owner", "description", "stamp", mode="before")
    @classmethod
    def _string_expr_validator(cls, v: t.Any) -> t.Optional[str]:
        """Convert an expression or arbitrary value to a string, keeping None as None."""
        return str_or_exp_to_str(v)

    @field_validator("interval_unit_", mode="before")
    @classmethod
    def _interval_unit_validator(cls, v: t.Any) -> t.Optional[t.Union[IntervalUnit, str]]:
        """Pass IntervalUnit members through; lowercase any other non-empty value after stringifying it."""
        if isinstance(v, IntervalUnit):
            return v
        v = str_or_exp_to_str(v)
        if v:
            v = v.lower()
        return v

    @model_validator(mode="after")
    @model_validator_v1_args
    def _node_root_validator(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
        """Validate constraints that span several fields.

        Raises:
            ConfigError: If the explicit interval unit is longer than the unit inferred from
                the cron, or if an end date is defined without a start date.
        """
        interval_unit = values.get("interval_unit_")
        if interval_unit:
            cron = values["cron"]
            max_interval_unit = IntervalUnit.from_cron(cron)
            if interval_unit.seconds > max_interval_unit.seconds:
                raise ConfigError(
                    f"Interval unit of '{interval_unit}' is larger than cron period of '{cron}'"
                )
        start = values.get("start")
        end = values.get("end")
        if end is not None and start is None:
            raise ConfigError("Must define a start date if an end date is defined.")
        validate_date_range(start, end)
        return values

    @property
    def batch_size(self) -> t.Optional[int]:
        """The maximal number of units in a single task for a backfill."""
        return None

    @property
    def batch_concurrency(self) -> t.Optional[int]:
        """The maximal number of batches that can run concurrently for a backfill."""
        return None

    @property
    def data_hash(self) -> str:
        """
        Computes the data hash for the node.

        Returns:
            The data hash for the node.
        """
        raise NotImplementedError

    @property
    def interval_unit(self) -> IntervalUnit:
        """Returns the interval unit using which data intervals are computed for this node."""
        if self.interval_unit_ is not None:
            return self.interval_unit_
        return self._inferred_interval_unit()

    @property
    def depends_on(self) -> t.Set[str]:
        """The names this node depends on; the base implementation returns an empty set."""
        return set()

    @property
    def fqn(self) -> str:
        """The fully qualified name; the base implementation returns `name` unchanged."""
        return self.name

    def metadata_hash(self, audits: t.Dict[str, ModelAudit]) -> str:
        """
        Computes the metadata hash for the node.

        Args:
            audits: Available audits by name.

        Returns:
            The metadata hash for the node.
        """
        raise NotImplementedError

    def croniter(self, value: TimeLike) -> CroniterCache:
        """
        Get the node's cron iterator positioned at the given value.

        The iterator is created on first use and reused afterwards, with its current
        position reset to the given value on every later call.

        Args:
            value: A variety of date formats.

        Returns:
            The shared CroniterCache for this node's cron.
        """
        if self._croniter is None:
            self._croniter = CroniterCache(self.cron, value)
        else:
            self._croniter.curr = to_datetime(value)
        return self._croniter

    def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the next timestamp given a time-like value and the node's cron.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp for the next run.
        """
        return self.croniter(value).get_next(estimate=estimate)

    def cron_prev(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the previous timestamp given a time-like value and the node's cron.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp for the previous run.
        """
        return self.croniter(value).get_prev(estimate=estimate)

    def cron_floor(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the floor timestamp given a time-like value and the node's cron.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp floor.
        """
        # Move to the next run after `value`, then take the run before that one.
        return self.croniter(self.cron_next(value, estimate=estimate)).get_prev(estimate=True)

    def text_diff(self, other: Node) -> str:
        """Produce a text diff against another node.

        Args:
            other: The node to diff against. Must be of the same type.

        Returns:
            A unified text diff showing additions and deletions.
        """
        raise NotImplementedError

    def _inferred_interval_unit(self) -> IntervalUnit:
        """Infers the interval unit from the cron expression.

        The interval unit is used to determine the lag applied to start_date and end_date for node rendering and intervals.

        Returns:
            The IntervalUnit enum.
        """
        if not self.__inferred_interval_unit:
            self.__inferred_interval_unit = IntervalUnit.from_cron(self.cron)
        return self.__inferred_interval_unit

    @property
    def is_model(self) -> bool:
        """Return True if this is a model node"""
        return False

    @property
    def is_audit(self) -> bool:
        """Return True if this is an audit node"""
        return False
393
394
class NodeType(str, Enum):
    """The kinds of node: models and audits."""

    MODEL = "model"
    AUDIT = "audit"

    def __str__(self) -> str:
        """Render the member by its name (e.g. "MODEL") instead of its value."""
        return self._name_
401
402
def str_or_exp_to_str(v: t.Any) -> t.Optional[str]:
    """Convert a value that may be a sqlglot expression into a plain string.

    Args:
        v: An expression, any other value, or None.

    Returns:
        None when `v` is None, the expression's `name` when `v` is an expression,
        and `str(v)` for everything else.
    """
    if v is None:
        return None
    if isinstance(v, exp.Expression):
        return v.name
    return str(v)
class IntervalUnit(builtins.str, enum.Enum):
 27class IntervalUnit(str, Enum):
 28    """IntervalUnit is the inferred granularity of an incremental node.
 29
 30    IntervalUnit can be one of 5 types, YEAR, MONTH, DAY, HOUR, MINUTE. The unit is inferred
 31    based on the cron schedule of a node. The minimum time delta between a sample set of dates
 32    is used to determine which unit a node's schedule is.
 33    """
 34
 35    YEAR = "year"
 36    MONTH = "month"
 37    DAY = "day"
 38    HOUR = "hour"
 39    HALF_HOUR = "half_hour"
 40    QUARTER_HOUR = "quarter_hour"
 41    FIVE_MINUTE = "five_minute"
 42
 43    @classmethod
 44    def from_cron(klass, cron: str) -> IntervalUnit:
 45        croniter = CroniterCache(cron)
 46        interval_seconds = croniter.interval_seconds
 47
 48        if not interval_seconds:
 49            samples = [croniter.get_next() for _ in range(5)]
 50            interval_seconds = int(min(b - a for a, b in zip(samples, samples[1:])).total_seconds())
 51
 52        for unit, seconds in INTERVAL_SECONDS.items():
 53            if seconds <= interval_seconds:
 54                return unit
 55        raise ConfigError(f"Invalid cron '{cron}': must have a cadence of 5 minutes or more.")
 56
 57    @property
 58    def is_date_granularity(self) -> bool:
 59        return self in (IntervalUnit.YEAR, IntervalUnit.MONTH, IntervalUnit.DAY)
 60
 61    @property
 62    def is_year(self) -> bool:
 63        return self == IntervalUnit.YEAR
 64
 65    @property
 66    def is_month(self) -> bool:
 67        return self == IntervalUnit.MONTH
 68
 69    @property
 70    def is_day(self) -> bool:
 71        return self == IntervalUnit.DAY
 72
 73    @property
 74    def is_hour(self) -> bool:
 75        return self == IntervalUnit.HOUR
 76
 77    @property
 78    def is_minute(self) -> bool:
 79        return self in (IntervalUnit.FIVE_MINUTE, IntervalUnit.QUARTER_HOUR, IntervalUnit.HALF_HOUR)
 80
 81    @property
 82    def cron_expr(self) -> str:
 83        if self == IntervalUnit.FIVE_MINUTE:
 84            return "*/5 * * * *"
 85        if self == IntervalUnit.QUARTER_HOUR:
 86            return "*/15 * * * *"
 87        if self == IntervalUnit.HALF_HOUR:
 88            return "*/30 * * * *"
 89        if self == IntervalUnit.HOUR:
 90            return "0 * * * *"
 91        if self == IntervalUnit.DAY:
 92            return "0 0 * * *"
 93        if self == IntervalUnit.MONTH:
 94            return "0 0 1 * *"
 95        if self == IntervalUnit.YEAR:
 96            return "0 0 1 1 *"
 97        return ""
 98
 99    def croniter(self, value: TimeLike) -> CroniterCache:
100        return CroniterCache(self.cron_expr, value)
101
102    def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
103        """
104        Get the next timestamp given a time-like value for this interval unit.
105
106        Args:
107            value: A variety of date formats.
108            estimate: Whether or not to estimate, only use this if the value is floored.
109
110        Returns:
111            The timestamp for the next run.
112        """
113        return self.croniter(value).get_next(estimate=estimate)
114
115    def cron_prev(self, value: TimeLike, estimate: bool = False) -> datetime:
116        """
117        Get the previous timestamp given a time-like value for this interval unit.
118
119        Args:
120            value: A variety of date formats.
121            estimate: Whether or not to estimate, only use this if the value is floored.
122
123        Returns:
124            The timestamp for the previous run.
125        """
126        return self.croniter(value).get_prev(estimate=estimate)
127
128    def cron_floor(self, value: TimeLike, estimate: bool = False) -> datetime:
129        """
130        Get the floor timestamp given a time-like value for this interval unit.
131
132        Args:
133            value: A variety of date formats.
134            estimate: Whether or not to estimate, only use this if the value is floored.
135
136        Returns:
137            The timestamp floor.
138        """
139        croniter = self.croniter(value)
140        croniter.get_next(estimate=estimate)
141        return croniter.get_prev(estimate=True)
142
143    @property
144    def seconds(self) -> int:
145        return INTERVAL_SECONDS[self]
146
147    @property
148    def milliseconds(self) -> int:
149        return self.seconds * 1000

IntervalUnit is the inferred granularity of an incremental node.

IntervalUnit can be one of 7 types: YEAR, MONTH, DAY, HOUR, HALF_HOUR, QUARTER_HOUR and FIVE_MINUTE. The unit is inferred based on the cron schedule of a node. The minimum time delta between a sample set of dates is used to determine which unit a node's schedule is.

YEAR = <IntervalUnit.YEAR: 'year'>
MONTH = <IntervalUnit.MONTH: 'month'>
DAY = <IntervalUnit.DAY: 'day'>
HOUR = <IntervalUnit.HOUR: 'hour'>
HALF_HOUR = <IntervalUnit.HALF_HOUR: 'half_hour'>
QUARTER_HOUR = <IntervalUnit.QUARTER_HOUR: 'quarter_hour'>
FIVE_MINUTE = <IntervalUnit.FIVE_MINUTE: 'five_minute'>
@classmethod
def from_cron(klass, cron: str) -> sqlmesh.core.node.IntervalUnit:
43    @classmethod
44    def from_cron(klass, cron: str) -> IntervalUnit:
45        croniter = CroniterCache(cron)
46        interval_seconds = croniter.interval_seconds
47
48        if not interval_seconds:
49            samples = [croniter.get_next() for _ in range(5)]
50            interval_seconds = int(min(b - a for a, b in zip(samples, samples[1:])).total_seconds())
51
52        for unit, seconds in INTERVAL_SECONDS.items():
53            if seconds <= interval_seconds:
54                return unit
55        raise ConfigError(f"Invalid cron '{cron}': must have a cadence of 5 minutes or more.")
def croniter( self, value: Union[datetime.date, datetime.datetime, str, int, float]) -> sqlmesh.utils.cron.CroniterCache:
 99    def croniter(self, value: TimeLike) -> CroniterCache:
100        return CroniterCache(self.cron_expr, value)
def cron_next( self, value: Union[datetime.date, datetime.datetime, str, int, float], estimate: bool = False) -> datetime.datetime:
102    def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
103        """
104        Get the next timestamp given a time-like value for this interval unit.
105
106        Args:
107            value: A variety of date formats.
108            estimate: Whether or not to estimate, only use this if the value is floored.
109
110        Returns:
111            The timestamp for the next run.
112        """
113        return self.croniter(value).get_next(estimate=estimate)

Get the next timestamp given a time-like value for this interval unit.

Arguments:
  • value: A variety of date formats.
  • estimate: Whether or not to estimate, only use this if the value is floored.
Returns:

The timestamp for the next run.

def cron_prev( self, value: Union[datetime.date, datetime.datetime, str, int, float], estimate: bool = False) -> datetime.datetime:
115    def cron_prev(self, value: TimeLike, estimate: bool = False) -> datetime:
116        """
117        Get the previous timestamp given a time-like value for this interval unit.
118
119        Args:
120            value: A variety of date formats.
121            estimate: Whether or not to estimate, only use this if the value is floored.
122
123        Returns:
124            The timestamp for the previous run.
125        """
126        return self.croniter(value).get_prev(estimate=estimate)

Get the previous timestamp given a time-like value for this interval unit.

Arguments:
  • value: A variety of date formats.
  • estimate: Whether or not to estimate, only use this if the value is floored.
Returns:

The timestamp for the previous run.

def cron_floor( self, value: Union[datetime.date, datetime.datetime, str, int, float], estimate: bool = False) -> datetime.datetime:
128    def cron_floor(self, value: TimeLike, estimate: bool = False) -> datetime:
129        """
130        Get the floor timestamp given a time-like value for this interval unit.
131
132        Args:
133            value: A variety of date formats.
134            estimate: Whether or not to estimate, only use this if the value is floored.
135
136        Returns:
137            The timestamp floor.
138        """
139        croniter = self.croniter(value)
140        croniter.get_next(estimate=estimate)
141        return croniter.get_prev(estimate=True)

Get the floor timestamp given a time-like value for this interval unit.

Arguments:
  • value: A variety of date formats.
  • estimate: Whether or not to estimate, only use this if the value is floored.
Returns:

The timestamp floor.

Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class NodeType(builtins.str, enum.Enum):
396class NodeType(str, Enum):
397    MODEL = "model"
398    AUDIT = "audit"
399
400    def __str__(self) -> str:
401        return self.name

An enumeration.

MODEL = <NodeType.MODEL: 'model'>
AUDIT = <NodeType.AUDIT: 'audit'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
def str_or_exp_to_str(v: Any) -> Union[str, NoneType]:
404def str_or_exp_to_str(v: t.Any) -> t.Optional[str]:
405    if isinstance(v, exp.Expression):
406        return v.name
407    return str(v) if v is not None else None