sqlmesh.core.node
from __future__ import annotations

import typing as t
from datetime import datetime
from enum import Enum
from pathlib import Path

from pydantic import Field
from sqlglot import exp

from sqlmesh.utils.cron import CroniterCache
from sqlmesh.utils.date import TimeLike, to_datetime, validate_date_range
from sqlmesh.utils.errors import ConfigError
from sqlmesh.utils.pydantic import (
    PydanticModel,
    field_validator,
    model_validator,
    model_validator_v1_args,
)

if t.TYPE_CHECKING:
    from sqlmesh.core.audit import ModelAudit
    from sqlmesh.core.snapshot import Node


class IntervalUnit(str, Enum):
    """IntervalUnit is the inferred granularity of an incremental node.

    IntervalUnit can be one of 7 types: YEAR, MONTH, DAY, HOUR, HALF_HOUR, QUARTER_HOUR
    and FIVE_MINUTE. The unit is inferred based on the cron schedule of a node. When the
    cron cache does not report an interval, the minimum time delta between a sample set
    of dates is used to determine which unit a node's schedule is.
    """

    YEAR = "year"
    MONTH = "month"
    DAY = "day"
    HOUR = "hour"
    HALF_HOUR = "half_hour"
    QUARTER_HOUR = "quarter_hour"
    FIVE_MINUTE = "five_minute"

    @classmethod
    def from_cron(klass, cron: str) -> IntervalUnit:
        """Infer the interval unit from a cron expression.

        Args:
            cron: The cron expression to inspect.

        Returns:
            The first unit in INTERVAL_SECONDS whose length in seconds does not exceed
            the cadence of the cron.

        Raises:
            ConfigError: If the cadence is shorter than every unit in INTERVAL_SECONDS.
        """
        croniter = CroniterCache(cron)
        interval_seconds = croniter.interval_seconds

        if not interval_seconds:
            # No usable interval was reported: sample five consecutive run times and
            # take the smallest gap between neighbours as the cadence.
            samples = [croniter.get_next() for _ in range(5)]
            interval_seconds = int(min(b - a for a, b in zip(samples, samples[1:])).total_seconds())

        # Relies on INTERVAL_SECONDS being ordered from the largest unit to the smallest.
        for unit, seconds in INTERVAL_SECONDS.items():
            if seconds <= interval_seconds:
                return unit
        raise ConfigError(f"Invalid cron '{cron}': must have a cadence of 5 minutes or more.")

    @property
    def is_date_granularity(self) -> bool:
        """Whether this unit is YEAR, MONTH or DAY."""
        return self in (IntervalUnit.YEAR, IntervalUnit.MONTH, IntervalUnit.DAY)

    @property
    def is_year(self) -> bool:
        """Whether this unit is YEAR."""
        return self == IntervalUnit.YEAR

    @property
    def is_month(self) -> bool:
        """Whether this unit is MONTH."""
        return self == IntervalUnit.MONTH

    @property
    def is_day(self) -> bool:
        """Whether this unit is DAY."""
        return self == IntervalUnit.DAY

    @property
    def is_hour(self) -> bool:
        """Whether this unit is HOUR."""
        return self == IntervalUnit.HOUR

    @property
    def is_minute(self) -> bool:
        """Whether this unit is one of the sub-hour units (FIVE_MINUTE, QUARTER_HOUR, HALF_HOUR)."""
        return self in (IntervalUnit.FIVE_MINUTE, IntervalUnit.QUARTER_HOUR, IntervalUnit.HALF_HOUR)

    @property
    def cron_expr(self) -> str:
        """The cron expression for this unit, or an empty string if no branch matches."""
        if self == IntervalUnit.FIVE_MINUTE:
            return "*/5 * * * *"
        if self == IntervalUnit.QUARTER_HOUR:
            return "*/15 * * * *"
        if self == IntervalUnit.HALF_HOUR:
            return "*/30 * * * *"
        if self == IntervalUnit.HOUR:
            return "0 * * * *"
        if self == IntervalUnit.DAY:
            return "0 0 * * *"
        if self == IntervalUnit.MONTH:
            return "0 0 1 * *"
        if self == IntervalUnit.YEAR:
            return "0 0 1 1 *"
        return ""

    def croniter(self, value: TimeLike) -> CroniterCache:
        """Build a new CroniterCache for this unit's cron expression and the given value."""
        return CroniterCache(self.cron_expr, value)

    def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the next timestamp given a time-like value for this interval unit.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp for the next run.
        """
        return self.croniter(value).get_next(estimate=estimate)

    def cron_prev(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the previous timestamp given a time-like value for this interval unit.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp for the previous run.
        """
        return self.croniter(value).get_prev(estimate=estimate)

    def cron_floor(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the floor timestamp given a time-like value for this interval unit.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp floor.
        """
        croniter = self.croniter(value)
        # Step forward once, then back once; the backward step always uses estimate=True.
        croniter.get_next(estimate=estimate)
        return croniter.get_prev(estimate=True)

    @property
    def seconds(self) -> int:
        """The length of this unit in seconds, as listed in INTERVAL_SECONDS."""
        return INTERVAL_SECONDS[self]

    @property
    def milliseconds(self) -> int:
        """The length of this unit in milliseconds."""
        return self.seconds * 1000


# this must be sorted in descending order: IntervalUnit.from_cron iterates this mapping
# and returns the first unit whose length fits within the cron cadence.
INTERVAL_SECONDS = {
    IntervalUnit.YEAR: 60 * 60 * 24 * 365,  # 365 days
    IntervalUnit.MONTH: 60 * 60 * 24 * 28,  # 28 days
    IntervalUnit.DAY: 60 * 60 * 24,
    IntervalUnit.HOUR: 60 * 60,
    IntervalUnit.HALF_HOUR: 60 * 30,
    IntervalUnit.QUARTER_HOUR: 60 * 15,
    IntervalUnit.FIVE_MINUTE: 60 * 5,
}


class _Node(PydanticModel):
    """
    Node is the core abstraction for entity that can be executed within the scheduler.

    Args:
        name: The name of the node.
        project: The name of the project this node belongs to, used in multi-repo deployments.
        description: The optional node description.
        owner: The owner of the node.
        start: The earliest date that the node will be executed for. If this is None,
            then the date is inferred by taking the most recent start date of its ancestors.
            The start date can be a static datetime or a relative datetime like "1 year ago"
        end: The latest date that the model will be executed for. If this is None,
            the date from the scheduler will be used
        cron: A cron string specifying how often the node should be run, leveraging the
            [croniter](https://github.com/kiorky/croniter) library.
        interval_unit: The duration of an interval for the node. By default, it is computed from the cron expression.
        tags: A list of tags that can be used to filter nodes.
        stamp: An optional arbitrary string sequence used to create new node versions without making
            changes to any of the functional components of the definition.
    """

    name: str
    project: str = ""
    description: t.Optional[str] = None
    owner: t.Optional[str] = None
    start: t.Optional[TimeLike] = None
    end: t.Optional[TimeLike] = None
    cron: str = "@daily"
    # Declared with the alias "interval_unit"; the `interval_unit` property below
    # falls back to the unit inferred from the cron when this is None.
    interval_unit_: t.Optional[IntervalUnit] = Field(alias="interval_unit", default=None)
    tags: t.List[str] = []
    stamp: t.Optional[str] = None
    _path: Path = Path()

    # Created on first use by `croniter()` and repositioned on later calls.
    _croniter: t.Optional[CroniterCache] = None
    # Cache for `_inferred_interval_unit()`.
    __inferred_interval_unit: t.Optional[IntervalUnit] = None

    def __str__(self) -> str:
        path = f": {self._path.name}" if self._path else ""
        return f"{self.__class__.__name__}<{self.name}{path}>"

    @field_validator("name", mode="before")
    @classmethod
    def _name_validator(cls, v: t.Any) -> t.Optional[str]:
        """Coerce the name to a string; sqlglot expressions use the text stored under meta["sql"]."""
        if v is None:
            return None
        if isinstance(v, exp.Expression):
            return v.meta["sql"]
        return str(v)

    @field_validator("start", "end", mode="before")
    @classmethod
    def _date_validator(cls, v: t.Any) -> t.Optional[TimeLike]:
        """Unwrap sqlglot expressions and reject truthy values that `to_datetime` cannot convert."""
        if isinstance(v, exp.Expression):
            v = v.name
        if v and not to_datetime(v):
            raise ConfigError(f"'{v}' needs to be time-like: https://pypi.org/project/dateparser")
        return v

    @field_validator("cron", mode="before")
    @classmethod
    def _cron_validator(cls, v: t.Any) -> t.Optional[str]:
        """Convert the cron to a string and check that croniter accepts it.

        Raises:
            ConfigError: If croniter rejects the expression with CroniterBadCronError.
        """
        cron = str_or_exp_to_str(v)
        if cron:
            from croniter import CroniterBadCronError, croniter

            try:
                croniter(cron)
            except CroniterBadCronError:
                raise ConfigError(f"Invalid cron expression '{cron}'")
        return cron

    @field_validator("owner", "description", "stamp", mode="before")
    @classmethod
    def _string_expr_validator(cls, v: t.Any) -> t.Optional[str]:
        """Convert expression or plain values to strings, keeping None as None."""
        return str_or_exp_to_str(v)

    @field_validator("interval_unit_", mode="before")
    @classmethod
    def _interval_unit_validator(cls, v: t.Any) -> t.Optional[t.Union[IntervalUnit, str]]:
        """Pass IntervalUnit values through; otherwise convert to a lower-cased string."""
        if isinstance(v, IntervalUnit):
            return v
        v = str_or_exp_to_str(v)
        if v:
            v = v.lower()
        return v

    @model_validator(mode="after")
    @model_validator_v1_args
    def _node_root_validator(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
        """Validate fields against each other.

        Raises:
            ConfigError: If an explicit interval unit is longer than the unit inferred
                from the cron, or if an end date is given without a start date.
        """
        interval_unit = values.get("interval_unit_")
        if interval_unit:
            cron = values["cron"]
            max_interval_unit = IntervalUnit.from_cron(cron)
            if interval_unit.seconds > max_interval_unit.seconds:
                raise ConfigError(
                    f"Interval unit of '{interval_unit}' is larger than cron period of '{cron}'"
                )
        start = values.get("start")
        end = values.get("end")
        if end is not None and start is None:
            raise ConfigError("Must define a start date if an end date is defined.")
        validate_date_range(start, end)
        return values

    @property
    def batch_size(self) -> t.Optional[int]:
        """The maximal number of units in a single task for a backfill."""
        return None

    @property
    def batch_concurrency(self) -> t.Optional[int]:
        """The maximal number of batches that can run concurrently for a backfill."""
        return None

    @property
    def data_hash(self) -> str:
        """
        Computes the data hash for the node.

        Returns:
            The data hash for the node.
        """
        raise NotImplementedError

    @property
    def interval_unit(self) -> IntervalUnit:
        """Returns the interval unit using which data intervals are computed for this node."""
        if self.interval_unit_ is not None:
            return self.interval_unit_
        return self._inferred_interval_unit()

    @property
    def depends_on(self) -> t.Set[str]:
        """The names this node depends on; the base implementation returns an empty set."""
        return set()

    @property
    def fqn(self) -> str:
        """The fully qualified name; the base implementation returns `name`."""
        return self.name

    def metadata_hash(self, audits: t.Dict[str, ModelAudit]) -> str:
        """
        Computes the metadata hash for the node.

        Args:
            audits: Available audits by name.

        Returns:
            The metadata hash for the node.
        """
        raise NotImplementedError

    def croniter(self, value: TimeLike) -> CroniterCache:
        """Return the node's CroniterCache positioned at the given value.

        The cache is created from the node's cron on first use; later calls reuse it
        and set its `curr` attribute to `to_datetime(value)`.
        """
        if self._croniter is None:
            self._croniter = CroniterCache(self.cron, value)
        else:
            self._croniter.curr = to_datetime(value)
        return self._croniter

    def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the next timestamp given a time-like value and the node's cron.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp for the next run.
        """
        return self.croniter(value).get_next(estimate=estimate)

    def cron_prev(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the previous timestamp given a time-like value and the node's cron.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp for the previous run.
        """
        return self.croniter(value).get_prev(estimate=estimate)

    def cron_floor(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the floor timestamp given a time-like value and the node's cron.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp floor.
        """
        # Step forward to the next run, then back once from there (always with estimate=True).
        return self.croniter(self.cron_next(value, estimate=estimate)).get_prev(estimate=True)

    def text_diff(self, other: Node) -> str:
        """Produce a text diff against another node.

        Args:
            other: The node to diff against. Must be of the same type.

        Returns:
            A unified text diff showing additions and deletions.
        """
        raise NotImplementedError

    def _inferred_interval_unit(self) -> IntervalUnit:
        """Infers the interval unit from the cron expression.

        The interval unit is used to determine the lag applied to start_date and end_date for node rendering and intervals.
        The result is computed once and cached on the instance.

        Returns:
            The IntervalUnit enum.
        """
        if not self.__inferred_interval_unit:
            self.__inferred_interval_unit = IntervalUnit.from_cron(self.cron)
        return self.__inferred_interval_unit

    @property
    def is_model(self) -> bool:
        """Return True if this is a model node"""
        return False

    @property
    def is_audit(self) -> bool:
        """Return True if this is an audit node"""
        return False


class NodeType(str, Enum):
    """The kinds of node: MODEL and AUDIT."""

    MODEL = "model"
    AUDIT = "audit"

    def __str__(self) -> str:
        # Renders the member name (e.g. "MODEL"), not the value.
        return self.name


def str_or_exp_to_str(v: t.Any) -> t.Optional[str]:
    """Return `v.name` for sqlglot expressions, `str(v)` for other non-None values, and None for None."""
    if isinstance(v, exp.Expression):
        return v.name
    return str(v) if v is not None else None
class
IntervalUnit(builtins.str, enum.Enum):
class IntervalUnit(str, Enum):
    """The granularity of an incremental node's intervals.

    There are seven units: YEAR, MONTH, DAY, HOUR, HALF_HOUR, QUARTER_HOUR and
    FIVE_MINUTE. A unit can be derived from a node's cron schedule with
    `from_cron`, which falls back to the smallest gap between a handful of
    sampled run times when the cron cache does not report an interval.
    """

    YEAR = "year"
    MONTH = "month"
    DAY = "day"
    HOUR = "hour"
    HALF_HOUR = "half_hour"
    QUARTER_HOUR = "quarter_hour"
    FIVE_MINUTE = "five_minute"

    @classmethod
    def from_cron(cls, cron: str) -> IntervalUnit:
        """Pick the largest unit that fits within the cadence of `cron`.

        Args:
            cron: The cron expression to inspect.

        Returns:
            The first unit in INTERVAL_SECONDS that is no longer than the cadence.

        Raises:
            ConfigError: If no unit in INTERVAL_SECONDS fits the cadence.
        """
        schedule = CroniterCache(cron)
        cadence = schedule.interval_seconds

        if not cadence:
            # Sample five consecutive runs and use the tightest spacing between them.
            ticks = [schedule.get_next() for _ in range(5)]
            gaps = [later - earlier for earlier, later in zip(ticks, ticks[1:])]
            cadence = int(min(gaps).total_seconds())

        # INTERVAL_SECONDS is ordered from the largest unit to the smallest.
        for unit, unit_seconds in INTERVAL_SECONDS.items():
            if cadence >= unit_seconds:
                return unit
        raise ConfigError(f"Invalid cron '{cron}': must have a cadence of 5 minutes or more.")

    @property
    def is_date_granularity(self) -> bool:
        """True for YEAR, MONTH and DAY."""
        return self.is_year or self.is_month or self.is_day

    @property
    def is_year(self) -> bool:
        """True for YEAR."""
        return IntervalUnit.YEAR == self

    @property
    def is_month(self) -> bool:
        """True for MONTH."""
        return IntervalUnit.MONTH == self

    @property
    def is_day(self) -> bool:
        """True for DAY."""
        return IntervalUnit.DAY == self

    @property
    def is_hour(self) -> bool:
        """True for HOUR."""
        return IntervalUnit.HOUR == self

    @property
    def is_minute(self) -> bool:
        """True for the sub-hour units: FIVE_MINUTE, QUARTER_HOUR and HALF_HOUR."""
        sub_hour_units = (
            IntervalUnit.FIVE_MINUTE,
            IntervalUnit.QUARTER_HOUR,
            IntervalUnit.HALF_HOUR,
        )
        return any(self == unit for unit in sub_hour_units)

    @property
    def cron_expr(self) -> str:
        """The cron expression for this unit, or an empty string if none is defined."""
        expressions = {
            IntervalUnit.FIVE_MINUTE: "*/5 * * * *",
            IntervalUnit.QUARTER_HOUR: "*/15 * * * *",
            IntervalUnit.HALF_HOUR: "*/30 * * * *",
            IntervalUnit.HOUR: "0 * * * *",
            IntervalUnit.DAY: "0 0 * * *",
            IntervalUnit.MONTH: "0 0 1 * *",
            IntervalUnit.YEAR: "0 0 1 1 *",
        }
        return expressions.get(self, "")

    def croniter(self, value: TimeLike) -> CroniterCache:
        """Create a CroniterCache for this unit's cron expression and the given value."""
        expression = self.cron_expr
        return CroniterCache(expression, value)

    def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
        """Return the next run timestamp of this unit for a time-like value.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp for the next run.
        """
        schedule = self.croniter(value)
        return schedule.get_next(estimate=estimate)

    def cron_prev(self, value: TimeLike, estimate: bool = False) -> datetime:
        """Return the previous run timestamp of this unit for a time-like value.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp for the previous run.
        """
        schedule = self.croniter(value)
        return schedule.get_prev(estimate=estimate)

    def cron_floor(self, value: TimeLike, estimate: bool = False) -> datetime:
        """Return the floor timestamp of this unit for a time-like value.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp floor.
        """
        schedule = self.croniter(value)
        # Step forward once, then back once; the backward step always estimates.
        schedule.get_next(estimate=estimate)
        floored = schedule.get_prev(estimate=True)
        return floored

    @property
    def seconds(self) -> int:
        """The length of this unit in seconds, looked up in INTERVAL_SECONDS."""
        return INTERVAL_SECONDS[self]

    @property
    def milliseconds(self) -> int:
        """The length of this unit in milliseconds."""
        return 1000 * self.seconds
IntervalUnit is the inferred granularity of an incremental node.
IntervalUnit can be one of 7 types: YEAR, MONTH, DAY, HOUR, HALF_HOUR, QUARTER_HOUR and FIVE_MINUTE. The unit is inferred based on the cron schedule of a node. The minimum time delta between a sample set of dates is used to determine which unit a node's schedule is.
YEAR =
<IntervalUnit.YEAR: 'year'>
MONTH =
<IntervalUnit.MONTH: 'month'>
DAY =
<IntervalUnit.DAY: 'day'>
HOUR =
<IntervalUnit.HOUR: 'hour'>
HALF_HOUR =
<IntervalUnit.HALF_HOUR: 'half_hour'>
QUARTER_HOUR =
<IntervalUnit.QUARTER_HOUR: 'quarter_hour'>
FIVE_MINUTE =
<IntervalUnit.FIVE_MINUTE: 'five_minute'>
@classmethod
def from_cron(cls, cron: str) -> IntervalUnit:
    """Pick the largest interval unit that fits within the cadence of `cron`.

    Args:
        cron: The cron expression to inspect.

    Returns:
        The first unit in INTERVAL_SECONDS that is no longer than the cadence.

    Raises:
        ConfigError: If no unit in INTERVAL_SECONDS fits the cadence.
    """
    schedule = CroniterCache(cron)
    cadence = schedule.interval_seconds

    if not cadence:
        # Sample five consecutive runs and use the tightest spacing between them.
        ticks = [schedule.get_next() for _ in range(5)]
        gaps = [later - earlier for earlier, later in zip(ticks, ticks[1:])]
        cadence = int(min(gaps).total_seconds())

    # INTERVAL_SECONDS is iterated in order; the first unit that fits wins.
    for unit, unit_seconds in INTERVAL_SECONDS.items():
        if cadence >= unit_seconds:
            return unit
    raise ConfigError(f"Invalid cron '{cron}': must have a cadence of 5 minutes or more.")
def
croniter( self, value: Union[datetime.date, datetime.datetime, str, int, float]) -> sqlmesh.utils.cron.CroniterCache:
def
cron_next( self, value: Union[datetime.date, datetime.datetime, str, int, float], estimate: bool = False) -> datetime.datetime:
def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
    """Return the next run timestamp of this interval unit for a time-like value.

    Args:
        value: A variety of date formats.
        estimate: Whether or not to estimate, only use this if the value is floored.

    Returns:
        The timestamp for the next run.
    """
    schedule = self.croniter(value)
    return schedule.get_next(estimate=estimate)
Get the next timestamp given a time-like value for this interval unit.
Arguments:
- value: A variety of date formats.
- estimate: Whether or not to estimate, only use this if the value is floored.
Returns:
The timestamp for the next run.
def
cron_prev( self, value: Union[datetime.date, datetime.datetime, str, int, float], estimate: bool = False) -> datetime.datetime:
def cron_prev(self, value: TimeLike, estimate: bool = False) -> datetime:
    """Return the previous run timestamp of this interval unit for a time-like value.

    Args:
        value: A variety of date formats.
        estimate: Whether or not to estimate, only use this if the value is floored.

    Returns:
        The timestamp for the previous run.
    """
    schedule = self.croniter(value)
    return schedule.get_prev(estimate=estimate)
Get the previous timestamp given a time-like value for this interval unit.
Arguments:
- value: A variety of date formats.
- estimate: Whether or not to estimate, only use this if the value is floored.
Returns:
The timestamp for the previous run.
def
cron_floor( self, value: Union[datetime.date, datetime.datetime, str, int, float], estimate: bool = False) -> datetime.datetime:
def cron_floor(self, value: TimeLike, estimate: bool = False) -> datetime:
    """Return the floor timestamp of this interval unit for a time-like value.

    Args:
        value: A variety of date formats.
        estimate: Whether or not to estimate, only use this if the value is floored.

    Returns:
        The timestamp floor.
    """
    schedule = self.croniter(value)
    # Step forward once, then back once on the same iterator; the backward
    # step always runs with estimate=True regardless of the argument.
    schedule.get_next(estimate=estimate)
    floored = schedule.get_prev(estimate=True)
    return floored
Get the floor timestamp given a time-like value for this interval unit.
Arguments:
- value: A variety of date formats.
- estimate: Whether or not to estimate, only use this if the value is floored.
Returns:
The timestamp floor.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class
NodeType(builtins.str, enum.Enum):
class NodeType(str, Enum):
    """The kinds of node: MODEL and AUDIT."""

    MODEL = "model"
    AUDIT = "audit"

    def __str__(self) -> str:
        # Render the member name (e.g. "MODEL") rather than its value.
        label = self.name
        return label
An enumeration.
MODEL =
<NodeType.MODEL: 'model'>
AUDIT =
<NodeType.AUDIT: 'audit'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
def
str_or_exp_to_str(v: Any) -> Union[str, NoneType]: