sqlmesh.core.node
from __future__ import annotations

import typing as t
import zoneinfo
from datetime import datetime
from enum import Enum
from pathlib import Path

from pydantic import Field
from sqlglot import exp

from sqlmesh.utils.cron import CroniterCache
from sqlmesh.utils.date import TimeLike, to_datetime, validate_date_range
from sqlmesh.utils.errors import ConfigError
from sqlmesh.utils.pydantic import (
    PydanticModel,
    SQLGlotCron,
    field_validator,
    model_validator,
    PRIVATE_FIELDS,
)

if t.TYPE_CHECKING:
    from sqlmesh.core._typing import Self
    from sqlmesh.core.snapshot import Node


class IntervalUnit(str, Enum):
    """IntervalUnit is the inferred granularity of an incremental node.

    IntervalUnit can be one of 7 types: YEAR, MONTH, DAY, HOUR, HALF_HOUR, QUARTER_HOUR and
    FIVE_MINUTE. The unit is inferred from the cron schedule of a node: the minimum time delta
    between a sample set of run dates determines which unit the node's schedule maps to.

    It's designed to align with common partitioning schemes, which is why there is no WEEK unit:
    tables are generally not partitioned by week.
    """

    YEAR = "year"
    MONTH = "month"
    DAY = "day"
    HOUR = "hour"
    HALF_HOUR = "half_hour"
    QUARTER_HOUR = "quarter_hour"
    FIVE_MINUTE = "five_minute"

    @classmethod
    def from_cron(klass, cron: str) -> IntervalUnit:
        """Infers the interval unit of a cron expression.

        Args:
            cron: The cron expression to inspect.

        Returns:
            The first unit in INTERVAL_SECONDS (sorted from longest to shortest) whose length
            does not exceed the shortest gap between consecutive runs of the cron.

        Raises:
            ConfigError: If the cron runs more often than every 5 minutes.
        """
        croniter = CroniterCache(cron)
        interval_seconds = croniter.interval_seconds

        if not interval_seconds:
            # No fixed interval is reported, so measure the smallest gap between a few upcoming runs.
            samples = [croniter.get_next() for _ in range(5)]
            interval_seconds = int(min(b - a for a, b in zip(samples, samples[1:])).total_seconds())

        for unit, seconds in INTERVAL_SECONDS.items():
            if seconds <= interval_seconds:
                return unit
        raise ConfigError(f"Invalid cron '{cron}': must run at a frequency of 5 minutes or slower.")

    @property
    def is_date_granularity(self) -> bool:
        """True for YEAR, MONTH and DAY."""
        return self in (IntervalUnit.YEAR, IntervalUnit.MONTH, IntervalUnit.DAY)

    @property
    def is_year(self) -> bool:
        return self == IntervalUnit.YEAR

    @property
    def is_month(self) -> bool:
        return self == IntervalUnit.MONTH

    @property
    def is_day(self) -> bool:
        return self == IntervalUnit.DAY

    @property
    def is_hour(self) -> bool:
        return self == IntervalUnit.HOUR

    @property
    def is_minute(self) -> bool:
        """True for the sub-hour units (FIVE_MINUTE, QUARTER_HOUR, HALF_HOUR)."""
        return self in (IntervalUnit.FIVE_MINUTE, IntervalUnit.QUARTER_HOUR, IntervalUnit.HALF_HOUR)

    @property
    def cron_expr(self) -> str:
        """The cron expression that fires once at the start of every interval of this unit."""
        if self == IntervalUnit.FIVE_MINUTE:
            return "*/5 * * * *"
        if self == IntervalUnit.QUARTER_HOUR:
            return "*/15 * * * *"
        if self == IntervalUnit.HALF_HOUR:
            return "*/30 * * * *"
        if self == IntervalUnit.HOUR:
            return "0 * * * *"
        if self == IntervalUnit.DAY:
            return "0 0 * * *"
        if self == IntervalUnit.MONTH:
            return "0 0 1 * *"
        if self == IntervalUnit.YEAR:
            return "0 0 1 1 *"
        return ""

    def croniter(self, value: TimeLike) -> CroniterCache:
        """Creates a new CroniterCache for this unit's cron expression starting at `value`."""
        return CroniterCache(self.cron_expr, value)

    def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the next timestamp given a time-like value for this interval unit.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp for the next run.
        """
        return self.croniter(value).get_next(estimate=estimate)

    def cron_prev(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the previous timestamp given a time-like value for this interval unit.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp for the previous run.
        """
        return self.croniter(value).get_prev(estimate=estimate)

    def cron_floor(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the floor timestamp given a time-like value for this interval unit.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp floor.
        """
        croniter = self.croniter(value)
        # Advance to the next boundary, then step back once to land on the floor.
        croniter.get_next(estimate=estimate)
        return croniter.get_prev(estimate=True)

    @property
    def seconds(self) -> int:
        """Nominal length of one interval of this unit, in seconds (see INTERVAL_SECONDS)."""
        return INTERVAL_SECONDS[self]

    @property
    def milliseconds(self) -> int:
        return self.seconds * 1000


class DbtNodeInfo(PydanticModel):
    """
    Represents dbt-specific model information set by the dbt loader and intended to be made available at the Snapshot level
    (as opposed to hidden within the individual model jinja macro registries).

    This allows for things like injecting implementations of variables / functions into the Jinja context that are compatible with
    their dbt equivalents but are backed by the sqlmesh snapshots in any given plan / environment
    """

    unique_id: str
    """This is the node/resource name/unique_id that's used as the node key in the dbt manifest.
    It's prefixed by the resource type and is exposed in context variables like {{ selected_resources }}.

    Examples:
        - test.jaffle_shop.unique_stg_orders_order_id.e3b841c71a
        - seed.jaffle_shop.raw_payments
        - model.jaffle_shop.stg_orders
    """

    name: str
    """Name of this object in the dbt global namespace, used by things like {{ ref() }} calls.

    Examples:
        - unique_stg_orders_order_id
        - raw_payments
        - stg_orders
    """

    fqn: str
    """Used for selectors in --select/--exclude.
    Takes the filesystem into account so may be structured differently to :unique_id.

    Examples:
        - jaffle_shop.staging.unique_stg_orders_order_id
        - jaffle_shop.raw_payments
        - jaffle_shop.staging.stg_orders
    """

    alias: t.Optional[str] = None
    """This is dbt's way of overriding the _physical table_ a model is written to.

    It's used in the following situation:
        - Say you have two models, "stg_customers" and "customers"
        - You want "stg_customers" to be written to the "staging" schema as eg "staging.customers" - NOT "staging.stg_customers"
        - But you can't rename the file to "customers" because it will conflict with your other model file "customers"
        - Even if you put it in a different folder, eg "staging/customers.sql" - dbt still has a global namespace so it will conflict
          when you try to do something like "{{ ref('customers') }}"
        - So dbt's solution to this problem is to keep calling it "stg_customers" at the dbt project/model level,
          but allow overriding the physical table to "customers" via something like "{{ config(alias='customers', schema='staging') }}"

    Note that if :alias is set, it does *not* replace :name at the model level and cannot be used interchangeably with :name.
    It also does not affect the :fqn or :unique_id. It's just used to override :name when it comes time to generate the physical table name.
    """

    @model_validator(mode="after")
    def post_init(self) -> Self:
        # by default, dbt sets alias to the same as :name
        # however, we only want to include :alias if it is actually different / actually providing an override
        if self.alias == self.name:
            self.alias = None
        return self

    def to_expression(self) -> exp.Expr:
        """Produce a SQLGlot expression representing this object, for use in things like the model/audit definition renderers"""
        return exp.tuple_(
            *(
                exp.PropertyEQ(this=exp.var(k), expression=exp.Literal.string(v))
                for k, v in sorted(self.model_dump(exclude_none=True).items())
            )
        )


class DbtInfoMixin:
    """This mixin encapsulates properties that only exist for dbt compatibility and are otherwise not required
    for native projects"""

    @property
    def dbt_node_info(self) -> t.Optional[DbtNodeInfo]:
        # Must be overridden by concrete classes.
        raise NotImplementedError()

    @property
    def dbt_unique_id(self) -> t.Optional[str]:
        """Used for compatibility with jinja context variables such as {{ selected_resources }}"""
        if self.dbt_node_info:
            return self.dbt_node_info.unique_id
        return None

    @property
    def dbt_fqn(self) -> t.Optional[str]:
        """Used in the selector engine for compatibility with selectors that select models by dbt fqn"""
        if self.dbt_node_info:
            return self.dbt_node_info.fqn
        return None


# this must be sorted in descending order
INTERVAL_SECONDS = {
    IntervalUnit.YEAR: 60 * 60 * 24 * 365,
    IntervalUnit.MONTH: 60 * 60 * 24 * 28,
    IntervalUnit.DAY: 60 * 60 * 24,
    IntervalUnit.HOUR: 60 * 60,
    IntervalUnit.HALF_HOUR: 60 * 30,
    IntervalUnit.QUARTER_HOUR: 60 * 15,
    IntervalUnit.FIVE_MINUTE: 60 * 5,
}


class _Node(DbtInfoMixin, PydanticModel):
    """
    Node is the core abstraction for an entity that can be executed within the scheduler.

    Args:
        name: The name of the node.
        project: The name of the project this node belongs to, used in multi-repo deployments.
        description: The optional node description.
        owner: The owner of the node.
        start: The earliest date that the node will be executed for. If this is None,
            then the date is inferred by taking the most recent start date of its ancestors.
            The start date can be a static datetime or a relative datetime like "1 year ago"
        end: The latest date that the model will be executed for. If this is None,
            the date from the scheduler will be used
        cron: A cron string specifying how often the node should be run, leveraging the
            [croniter](https://github.com/kiorky/croniter) library.
        cron_tz: Time zone for the cron, defaults to utc, [IANA time zones](https://docs.python.org/3/library/zoneinfo.html).
        interval_unit: The duration of an interval for the node. By default, it is computed from the cron expression.
        tags: A list of tags that can be used to filter nodes.
        stamp: An optional arbitrary string sequence used to create new node versions without making
            changes to any of the functional components of the definition.
    """

    name: str
    project: str = ""
    description: t.Optional[str] = None
    owner: t.Optional[str] = None
    start: t.Optional[TimeLike] = None
    end: t.Optional[TimeLike] = None
    cron: SQLGlotCron = "@daily"
    cron_tz: t.Optional[zoneinfo.ZoneInfo] = None
    interval_unit_: t.Optional[IntervalUnit] = Field(alias="interval_unit", default=None)
    tags: t.List[str] = []
    stamp: t.Optional[str] = None
    dbt_node_info_: t.Optional[DbtNodeInfo] = Field(alias="dbt_node_info", default=None)
    _path: t.Optional[Path] = None
    _data_hash: t.Optional[str] = None
    _metadata_hash: t.Optional[str] = None

    # Lazily-populated caches derived from `cron`; see croniter() and _inferred_interval_unit().
    _croniter: t.Optional[CroniterCache] = None
    __inferred_interval_unit: t.Optional[IntervalUnit] = None

    def __str__(self) -> str:
        path = f": {self._path.name}" if self._path else ""
        return f"{self.__class__.__name__}<{self.name}{path}>"

    def __getstate__(self) -> t.Dict[t.Any, t.Any]:
        state = super().__getstate__()
        private = state[PRIVATE_FIELDS]
        # Cached hashes are not pickled; they are recomputed on demand.
        private["_data_hash"] = None
        private["_metadata_hash"] = None
        return state

    def copy(self, **kwargs: t.Any) -> Self:
        node = super().copy(**kwargs)
        # The copy may carry updated fields (e.g. a different `cron`), so every cache derived
        # from the original's fields is reset. Resetting `_croniter` also stops the copy from
        # sharing (and mutating) the original's CroniterCache instance.
        node._data_hash = None
        node._metadata_hash = None
        node._croniter = None
        node.__inferred_interval_unit = None
        return node

    @field_validator("name", mode="before")
    @classmethod
    def _name_validator(cls, v: t.Any) -> t.Optional[str]:
        if v is None:
            return None
        if isinstance(v, exp.Expr):
            return v.meta["sql"]
        return str(v)

    @field_validator("cron_tz", mode="before")
    @classmethod
    def _cron_tz_validator(cls, v: t.Any) -> t.Optional[zoneinfo.ZoneInfo]:
        # An empty value or the literal string "UTC" means "no explicit time zone" (UTC).
        if not v or v == "UTC":
            return None

        v = str_or_exp_to_str(v)

        try:
            return zoneinfo.ZoneInfo(v)
        except Exception as e:
            available_timezones = zoneinfo.available_timezones()

            if available_timezones:
                raise ConfigError(f"{e}. {v} must be in {available_timezones}.") from e
            raise ConfigError(
                f"{e}. IANA time zone data is not available on your system. `pip install tzdata` to leverage cron time zones or remove this field which will default to UTC."
            ) from e

    @field_validator("start", "end", mode="before")
    @classmethod
    def _date_validator(cls, v: t.Any) -> t.Optional[TimeLike]:
        if isinstance(v, exp.Expr):
            v = v.name
        if v and not to_datetime(v):
            raise ConfigError(f"'{v}' needs to be time-like: https://pypi.org/project/dateparser")
        return v

    @field_validator("owner", "description", "stamp", mode="before")
    @classmethod
    def _string_expr_validator(cls, v: t.Any) -> t.Optional[str]:
        return str_or_exp_to_str(v)

    @field_validator("interval_unit_", mode="before")
    @classmethod
    def _interval_unit_validator(cls, v: t.Any) -> t.Optional[t.Union[IntervalUnit, str]]:
        if isinstance(v, IntervalUnit):
            return v
        v = str_or_exp_to_str(v)
        if v:
            # Enum values are lowercase, so normalize user input before enum coercion.
            v = v.lower()
        return v

    @model_validator(mode="after")
    def _node_root_validator(self) -> Self:
        interval_unit = self.interval_unit_
        # `allow_partials` is not declared on _Node; subclasses that define it can opt out of this check.
        if interval_unit and not getattr(self, "allow_partials", None):
            cron = self.cron
            max_interval_unit = IntervalUnit.from_cron(cron)
            if interval_unit.seconds > max_interval_unit.seconds:
                raise ConfigError(
                    f"Cron '{cron}' cannot be more frequent than interval unit '{interval_unit.value}'. "
                    "If this is intentional, set allow_partials to True."
                )

        start = self.start
        end = self.end

        if end is not None and start is None:
            raise ConfigError("Must define a start date if an end date is defined.")
        validate_date_range(start, end)
        return self

    @property
    def batch_size(self) -> t.Optional[int]:
        """The maximal number of units in a single task for a backfill."""
        return None

    @property
    def batch_concurrency(self) -> t.Optional[int]:
        """The maximal number of batches that can run concurrently for a backfill."""
        return None

    @property
    def interval_unit(self) -> IntervalUnit:
        """Returns the interval unit using which data intervals are computed for this node."""
        if self.interval_unit_ is not None:
            return self.interval_unit_
        return self._inferred_interval_unit()

    @property
    def depends_on(self) -> t.Set[str]:
        return set()

    @property
    def fqn(self) -> str:
        return self.name

    @property
    def data_hash(self) -> str:
        """
        Computes the data hash for the node.

        Returns:
            The data hash for the node.
        """
        raise NotImplementedError

    @property
    def metadata_hash(self) -> str:
        """
        Computes the metadata hash for the node.

        Returns:
            The metadata hash for the node.
        """
        raise NotImplementedError

    def is_metadata_only_change(self, previous: _Node) -> bool:
        """Determines if this node is a metadata only change in relation to the `previous` node.

        Args:
            previous: The previous node to compare against.

        Returns:
            True if this node is a metadata only change, False otherwise.
        """
        return self.data_hash == previous.data_hash and self.metadata_hash != previous.metadata_hash

    def is_data_change(self, previous: _Node) -> bool:
        """Determines if this node is a data change in relation to the `previous` node.

        Args:
            previous: The previous node to compare against.

        Returns:
            True if this node is a data change, False otherwise.
        """
        return (
            self.data_hash != previous.data_hash or self.metadata_hash != previous.metadata_hash
        ) and not self.is_metadata_only_change(previous)

    def croniter(self, value: TimeLike) -> CroniterCache:
        """Returns this node's cached CroniterCache, positioned at `value` in the cron time zone."""
        if self._croniter is None:
            self._croniter = CroniterCache(self.cron, value, tz=self.cron_tz)
        else:
            # Reuse the cached instance; only move its cursor.
            self._croniter.curr = to_datetime(value, tz=self.cron_tz)
        return self._croniter

    def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the next timestamp given a time-like value and the node's cron.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp for the next run.
        """
        return self.croniter(value).get_next(estimate=estimate)

    def cron_prev(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the previous timestamp given a time-like value and the node's cron.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp for the previous run.
        """
        return self.croniter(value).get_prev(estimate=estimate)

    def cron_floor(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the floor timestamp given a time-like value and the node's cron.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp floor.
        """
        return self.croniter(self.cron_next(value, estimate=estimate)).get_prev(estimate=True)

    def text_diff(self, other: Node, rendered: bool = False) -> str:
        """Produce a text diff against another node.

        Args:
            other: The node to diff against. Must be of the same type.
            rendered: Flag for subclass implementations; presumably selects a diff of the
                rendered definitions (TODO confirm against subclasses).

        Returns:
            A unified text diff showing additions and deletions.
        """
        raise NotImplementedError

    def _inferred_interval_unit(self) -> IntervalUnit:
        """Infers the interval unit from the cron expression.

        The interval unit is used to determine the lag applied to start_date and end_date for node rendering and intervals.

        Returns:
            The IntervalUnit enum.
        """
        if not self.__inferred_interval_unit:
            self.__inferred_interval_unit = IntervalUnit.from_cron(self.cron)
        return self.__inferred_interval_unit

    @property
    def is_model(self) -> bool:
        """Return True if this is a model node"""
        return False

    @property
    def is_audit(self) -> bool:
        """Return True if this is an audit node"""
        return False

    @property
    def dbt_node_info(self) -> t.Optional[DbtNodeInfo]:
        return self.dbt_node_info_


class NodeType(str, Enum):
    MODEL = "model"
    AUDIT = "audit"

    def __str__(self) -> str:
        return self.name


def str_or_exp_to_str(v: t.Any) -> t.Optional[str]:
    """Returns a SQLGlot expression's `name`, `str(v)` for any other value, and None for None."""
    if isinstance(v, exp.Expr):
        return v.name
    return str(v) if v is not None else None
class IntervalUnit(str, Enum):
    """IntervalUnit is the inferred granularity of an incremental node.

    IntervalUnit can be one of 7 types: YEAR, MONTH, DAY, HOUR, HALF_HOUR, QUARTER_HOUR and
    FIVE_MINUTE. The unit is inferred from the cron schedule of a node: the minimum time delta
    between a sample set of run dates determines which unit the node's schedule maps to.

    It's designed to align with common partitioning schemes, which is why there is no WEEK unit:
    tables are generally not partitioned by week.
    """

    YEAR = "year"
    MONTH = "month"
    DAY = "day"
    HOUR = "hour"
    HALF_HOUR = "half_hour"
    QUARTER_HOUR = "quarter_hour"
    FIVE_MINUTE = "five_minute"

    @classmethod
    def from_cron(klass, cron: str) -> IntervalUnit:
        """Infers the interval unit of a cron expression.

        Args:
            cron: The cron expression to inspect.

        Returns:
            The first unit in INTERVAL_SECONDS whose length does not exceed the shortest gap
            between consecutive runs of the cron.

        Raises:
            ConfigError: If the cron runs more often than every 5 minutes.
        """
        croniter = CroniterCache(cron)
        interval_seconds = croniter.interval_seconds

        if not interval_seconds:
            # No fixed interval is reported, so measure the smallest gap between a few upcoming runs.
            samples = [croniter.get_next() for _ in range(5)]
            interval_seconds = int(min(b - a for a, b in zip(samples, samples[1:])).total_seconds())

        for unit, seconds in INTERVAL_SECONDS.items():
            if seconds <= interval_seconds:
                return unit
        raise ConfigError(f"Invalid cron '{cron}': must run at a frequency of 5 minutes or slower.")

    @property
    def is_date_granularity(self) -> bool:
        """True for YEAR, MONTH and DAY."""
        return self in (IntervalUnit.YEAR, IntervalUnit.MONTH, IntervalUnit.DAY)

    @property
    def is_year(self) -> bool:
        return self == IntervalUnit.YEAR

    @property
    def is_month(self) -> bool:
        return self == IntervalUnit.MONTH

    @property
    def is_day(self) -> bool:
        return self == IntervalUnit.DAY

    @property
    def is_hour(self) -> bool:
        return self == IntervalUnit.HOUR

    @property
    def is_minute(self) -> bool:
        """True for the sub-hour units (FIVE_MINUTE, QUARTER_HOUR, HALF_HOUR)."""
        return self in (IntervalUnit.FIVE_MINUTE, IntervalUnit.QUARTER_HOUR, IntervalUnit.HALF_HOUR)

    @property
    def cron_expr(self) -> str:
        """The cron expression that fires once at the start of every interval of this unit."""
        if self == IntervalUnit.FIVE_MINUTE:
            return "*/5 * * * *"
        if self == IntervalUnit.QUARTER_HOUR:
            return "*/15 * * * *"
        if self == IntervalUnit.HALF_HOUR:
            return "*/30 * * * *"
        if self == IntervalUnit.HOUR:
            return "0 * * * *"
        if self == IntervalUnit.DAY:
            return "0 0 * * *"
        if self == IntervalUnit.MONTH:
            return "0 0 1 * *"
        if self == IntervalUnit.YEAR:
            return "0 0 1 1 *"
        return ""

    def croniter(self, value: TimeLike) -> CroniterCache:
        """Creates a new CroniterCache for this unit's cron expression starting at `value`."""
        return CroniterCache(self.cron_expr, value)

    def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the next timestamp given a time-like value for this interval unit.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp for the next run.
        """
        return self.croniter(value).get_next(estimate=estimate)

    def cron_prev(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the previous timestamp given a time-like value for this interval unit.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp for the previous run.
        """
        return self.croniter(value).get_prev(estimate=estimate)

    def cron_floor(self, value: TimeLike, estimate: bool = False) -> datetime:
        """
        Get the floor timestamp given a time-like value for this interval unit.

        Args:
            value: A variety of date formats.
            estimate: Whether or not to estimate, only use this if the value is floored.

        Returns:
            The timestamp floor.
        """
        croniter = self.croniter(value)
        # Advance to the next boundary, then step back once to land on the floor.
        croniter.get_next(estimate=estimate)
        return croniter.get_prev(estimate=True)

    @property
    def seconds(self) -> int:
        """Nominal length of one interval of this unit, in seconds (looked up in INTERVAL_SECONDS)."""
        return INTERVAL_SECONDS[self]

    @property
    def milliseconds(self) -> int:
        return self.seconds * 1000
IntervalUnit is the inferred granularity of an incremental node.
IntervalUnit can be one of 7 types: YEAR, MONTH, DAY, HOUR, HALF_HOUR, QUARTER_HOUR and FIVE_MINUTE. The unit is inferred from the cron schedule of a node: the minimum time delta between a sample set of run dates determines which unit the node's schedule maps to.
It's designed to align with common partitioning schemes, which is why there is no WEEK unit: tables are generally not partitioned by week.
@classmethod
def from_cron(klass, cron: str) -> IntervalUnit:
    """Infer the interval unit that matches the schedule of a cron expression.

    The shortest gap between consecutive runs is taken from the cron cache's
    ``interval_seconds``. When that is falsy, the gap is measured from five sampled
    upcoming run times. The first unit in ``INTERVAL_SECONDS`` whose length fits
    within that gap is returned.

    Args:
        cron: The cron expression to inspect.

    Returns:
        The matching IntervalUnit.

    Raises:
        ConfigError: If no unit fits, i.e. the cron runs more often than every 5 minutes.
    """
    cron_cache = CroniterCache(cron)
    gap_seconds = cron_cache.interval_seconds

    if not gap_seconds:
        run_times = [cron_cache.get_next() for _ in range(5)]
        deltas = (later - earlier for earlier, later in zip(run_times, run_times[1:]))
        gap_seconds = int(min(deltas).total_seconds())

    unit = next(
        (candidate for candidate, length in INTERVAL_SECONDS.items() if length <= gap_seconds),
        None,
    )
    if unit is None:
        raise ConfigError(f"Invalid cron '{cron}': must run at a frequency of 5 minutes or slower.")
    return unit
@property
def cron_expr(self) -> str:
    """The cron expression that fires once at the start of every interval of this unit.

    Falls back to an empty string for a unit without a known expression.
    """
    expressions = {
        IntervalUnit.FIVE_MINUTE: "*/5 * * * *",
        IntervalUnit.QUARTER_HOUR: "*/15 * * * *",
        IntervalUnit.HALF_HOUR: "*/30 * * * *",
        IntervalUnit.HOUR: "0 * * * *",
        IntervalUnit.DAY: "0 0 * * *",
        IntervalUnit.MONTH: "0 0 1 * *",
        IntervalUnit.YEAR: "0 0 1 1 *",
    }
    return expressions.get(self, "")
def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
    """Return the run time that follows ``value`` on this unit's schedule.

    Args:
        value: A time-like value in any supported date format.
        estimate: Whether to estimate; only use this if the value is floored.

    Returns:
        The timestamp of the next run.
    """
    schedule = self.croniter(value)
    next_run = schedule.get_next(estimate=estimate)
    return next_run
Get the next timestamp given a time-like value for this interval unit.
Arguments:
- value: A variety of date formats.
- estimate: Whether or not to estimate, only use this if the value is floored.
Returns:
The timestamp for the next run.
def cron_prev(self, value: TimeLike, estimate: bool = False) -> datetime:
    """Return the run time that precedes ``value`` on this unit's schedule.

    Args:
        value: A time-like value in any supported date format.
        estimate: Whether to estimate; only use this if the value is floored.

    Returns:
        The timestamp of the previous run.
    """
    schedule = self.croniter(value)
    previous_run = schedule.get_prev(estimate=estimate)
    return previous_run
Get the previous timestamp given a time-like value for this interval unit.
Arguments:
- value: A variety of date formats.
- estimate: Whether or not to estimate, only use this if the value is floored.
Returns:
The timestamp for the previous run.
def cron_floor(self, value: TimeLike, estimate: bool = False) -> datetime:
    """Return the floor of ``value`` on this unit's schedule.

    Args:
        value: A time-like value in any supported date format.
        estimate: Whether to estimate the forward step; only use this if the value is floored.

    Returns:
        The floored timestamp.
    """
    schedule = self.croniter(value)
    # Step forward to the next boundary, then back once (always estimated) to reach the floor.
    schedule.get_next(estimate=estimate)
    floored = schedule.get_prev(estimate=True)
    return floored
Get the floor timestamp given a time-like value for this interval unit.
Arguments:
- value: A variety of date formats.
- estimate: Whether or not to estimate, only use this if the value is floored.
Returns:
The timestamp floor.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class DbtNodeInfo(PydanticModel):
    """
    Represents dbt-specific model information set by the dbt loader and intended to be made available at the Snapshot level
    (as opposed to hidden within the individual model jinja macro registries).

    This allows for things like injecting implementations of variables / functions into the Jinja context that are compatible with
    their dbt equivalents but are backed by the sqlmesh snapshots in any given plan / environment
    """

    unique_id: str
    """This is the node/resource name/unique_id that's used as the node key in the dbt manifest.
    It's prefixed by the resource type and is exposed in context variables like {{ selected_resources }}.

    Examples:
        - test.jaffle_shop.unique_stg_orders_order_id.e3b841c71a
        - seed.jaffle_shop.raw_payments
        - model.jaffle_shop.stg_orders
    """

    name: str
    """Name of this object in the dbt global namespace, used by things like {{ ref() }} calls.

    Examples:
        - unique_stg_orders_order_id
        - raw_payments
        - stg_orders
    """

    fqn: str
    """Used for selectors in --select/--exclude.
    Takes the filesystem into account so may be structured differently to :unique_id.

    Examples:
        - jaffle_shop.staging.unique_stg_orders_order_id
        - jaffle_shop.raw_payments
        - jaffle_shop.staging.stg_orders
    """

    alias: t.Optional[str] = None
    """This is dbt's way of overriding the _physical table_ a model is written to.

    It's used in the following situation:
        - Say you have two models, "stg_customers" and "customers"
        - You want "stg_customers" to be written to the "staging" schema as eg "staging.customers" - NOT "staging.stg_customers"
        - But you can't rename the file to "customers" because it will conflict with your other model file "customers"
        - Even if you put it in a different folder, eg "staging/customers.sql" - dbt still has a global namespace so it will conflict
          when you try to do something like "{{ ref('customers') }}"
        - So dbt's solution to this problem is to keep calling it "stg_customers" at the dbt project/model level,
          but allow overriding the physical table to "customers" via something like "{{ config(alias='customers', schema='staging') }}"

    Note that if :alias is set, it does *not* replace :name at the model level and cannot be used interchangeably with :name.
    It also does not affect the :fqn or :unique_id. It's just used to override :name when it comes time to generate the physical table name.
    """

    @model_validator(mode="after")
    def post_init(self) -> Self:
        # by default, dbt sets alias to the same as :name
        # however, we only want to include :alias if it is actually different / actually providing an override
        if self.alias == self.name:
            self.alias = None
        return self

    def to_expression(self) -> exp.Expr:
        """Produce a SQLGlot expression representing this object, for use in things like the model/audit definition renderers"""
        return exp.tuple_(
            *(
                exp.PropertyEQ(this=exp.var(k), expression=exp.Literal.string(v))
                for k, v in sorted(self.model_dump(exclude_none=True).items())
            )
        )
Represents dbt-specific model information set by the dbt loader and intended to be made available at the Snapshot level (as opposed to hidden within the individual model jinja macro registries).
This allows for things like injecting implementations of variables / functions into the Jinja context that are compatible with their dbt equivalents but are backed by the sqlmesh snapshots in any given plan / environment
This is the node/resource name/unique_id that's used as the node key in the dbt manifest. It's prefixed by the resource type and is exposed in context variables like {{ selected_resources }}.
Examples:
- test.jaffle_shop.unique_stg_orders_order_id.e3b841c71a
- seed.jaffle_shop.raw_payments
- model.jaffle_shop.stg_orders
Name of this object in the dbt global namespace, used by things like {{ ref() }} calls.
Examples:
- unique_stg_orders_order_id
- raw_payments
- stg_orders
Used for selectors in --select/--exclude. Takes the filesystem into account so may be structured differently to :unique_id.
Examples:
- jaffle_shop.staging.unique_stg_orders_order_id
- jaffle_shop.raw_payments
- jaffle_shop.staging.stg_orders
This is dbt's way of overriding the _physical table_ a model is written to.
It's used in the following situation:
- Say you have two models, "stg_customers" and "customers"
- You want "stg_customers" to be written to the "staging" schema as eg "staging.customers" - NOT "staging.stg_customers"
- But you can't rename the file to "customers" because it will conflict with your other model file "customers"
- Even if you put it in a different folder, eg "staging/customers.sql" - dbt still has a global namespace so it will conflict when you try to do something like "{{ ref('customers') }}"
- So dbt's solution to this problem is to keep calling it "stg_customers" at the dbt project/model level, but allow overriding the physical table to "customers" via something like "{{ config(alias='customers', schema='staging') }}"
Note that if :alias is set, it does not replace :name at the model level and cannot be used interchangeably with :name. It also does not affect the :fqn or :unique_id. It's just used to override :name when it comes time to generate the physical table name.
def to_expression(self) -> exp.Expr:
    """Build a SQLGlot tuple describing this object.

    Each non-None field becomes a ``PropertyEQ`` of the field name and its value as a
    string literal, ordered by field name. Intended for the model/audit definition renderers.
    """
    dumped = self.model_dump(exclude_none=True)
    properties = [
        exp.PropertyEQ(this=exp.var(key), expression=exp.Literal.string(value))
        for key, value in sorted(dumped.items())
    ]
    return exp.tuple_(*properties)
Produce a SQLGlot expression representing this object, for use in things like the model/audit definition renderers
Configuration for the model; should be a dictionary conforming to pydantic's `ConfigDict` (`pydantic.config.ConfigDict`).
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class DbtInfoMixin:
    """Properties that exist purely for dbt compatibility; native projects do not need them.

    Concrete classes must override ``dbt_node_info``; the other properties derive from it.
    """

    @property
    def dbt_node_info(self) -> t.Optional[DbtNodeInfo]:
        raise NotImplementedError()

    @property
    def dbt_unique_id(self) -> t.Optional[str]:
        """The dbt unique_id, used for jinja context variables such as {{ selected_resources }}.

        None when no dbt info is attached.
        """
        return self.dbt_node_info.unique_id if self.dbt_node_info else None

    @property
    def dbt_fqn(self) -> t.Optional[str]:
        """The dbt fqn, used by the selector engine for selecting models by dbt fqn.

        None when no dbt info is attached.
        """
        return self.dbt_node_info.fqn if self.dbt_node_info else None
This mixin encapsulates properties that only exist for dbt compatibility and are otherwise not required for native projects
@property
def dbt_unique_id(self) -> t.Optional[str]:
    """The dbt unique_id of this object, or None when no dbt info is attached.

    Used for compatibility with jinja context variables such as {{ selected_resources }}.
    """
    return self.dbt_node_info.unique_id if self.dbt_node_info else None
Used for compatibility with jinja context variables such as {{ selected_resources }}
@property
def dbt_fqn(self) -> t.Optional[str]:
    """The dbt fqn of this object, or None when no dbt info is attached.

    Used by the selector engine for compatibility with selectors that select models by dbt fqn.
    """
    return self.dbt_node_info.fqn if self.dbt_node_info else None
Used in the selector engine for compatibility with selectors that select models by dbt fqn
class NodeType(str, Enum):
    """The type of a node: a model or an audit."""

    MODEL = "model"
    AUDIT = "audit"

    def __str__(self) -> str:
        # Render the member name (e.g. "MODEL") rather than its lowercase value.
        member_name = self.name
        return member_name
The type of a node: either a model (MODEL = "model") or an audit (AUDIT = "audit").
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans