sqlmesh.core.model.meta
1from __future__ import annotations 2 3import typing as t 4from enum import Enum 5from functools import cached_property 6from typing_extensions import Self 7 8from pydantic import Field 9from sqlglot import Dialect, exp, parse_one 10from sqlglot.helper import ensure_collection, ensure_list 11from sqlglot.optimizer.normalize_identifiers import normalize_identifiers 12 13from sqlmesh.core import dialect as d 14from sqlmesh.core.config.common import VirtualEnvironmentMode 15from sqlmesh.core.config.linter import LinterConfig 16from sqlmesh.core.dialect import normalize_model_name 17from sqlmesh.utils import classproperty 18from sqlmesh.core.model.common import ( 19 bool_validator, 20 default_catalog_validator, 21 depends_on_validator, 22 properties_validator, 23 parse_properties, 24) 25from sqlmesh.core.model.kind import ( 26 CustomKind, 27 IncrementalByUniqueKeyKind, 28 ModelKind, 29 OnDestructiveChange, 30 SCDType2ByColumnKind, 31 SCDType2ByTimeKind, 32 TimeColumn, 33 ViewKind, 34 model_kind_validator, 35 OnAdditiveChange, 36) 37from sqlmesh.core.node import _Node, str_or_exp_to_str 38from sqlmesh.core.reference import Reference 39from sqlmesh.utils.date import TimeLike 40from sqlmesh.utils.errors import ConfigError 41from sqlmesh.utils.pydantic import ( 42 ValidationInfo, 43 field_validator, 44 list_of_fields_validator, 45 model_validator, 46 get_dialect, 47) 48 49if t.TYPE_CHECKING: 50 from sqlmesh.core._typing import CustomMaterializationProperties, SessionProperties 51 from sqlmesh.core.engine_adapter._typing import GrantsConfig 52 53FunctionCall = t.Tuple[str, t.Dict[str, exp.Expr]] 54 55 56class GrantsTargetLayer(str, Enum): 57 """Target layer(s) where grants should be applied.""" 58 59 ALL = "all" 60 PHYSICAL = "physical" 61 VIRTUAL = "virtual" 62 63 @classproperty 64 def default(cls) -> "GrantsTargetLayer": 65 return GrantsTargetLayer.VIRTUAL 66 67 @property 68 def is_all(self) -> bool: 69 return self == GrantsTargetLayer.ALL 70 71 @property 72 def is_physical(self) -> bool: 73 return self == GrantsTargetLayer.PHYSICAL 74 75 @property 76 def is_virtual(self) -> bool: 77 return self == GrantsTargetLayer.VIRTUAL 78 79 def __str__(self) -> str: 80 return self.name 81 82 def __repr__(self) -> str: 83 return str(self) 84 85 86class ModelMeta(_Node): 87 """Metadata for models which can be defined in SQL.""" 88 89 dialect: str = "" 90 name: str 91 kind: ModelKind = ViewKind() 92 retention: t.Optional[int] = None # not implemented yet 93 table_format: t.Optional[str] = None 94 storage_format: t.Optional[str] = None 95 partitioned_by_: t.List[exp.Expr] = Field(default=[], alias="partitioned_by") 96 clustered_by: t.List[exp.Expr] = [] 97 default_catalog: t.Optional[str] = None 98 depends_on_: t.Optional[t.Set[str]] = Field(default=None, alias="depends_on") 99 columns_to_types_: t.Optional[t.Dict[str, exp.DataType]] = Field(default=None, alias="columns") 100 column_descriptions_: t.Optional[t.Dict[str, str]] = Field( 101 default=None, alias="column_descriptions" 102 ) 103 audits: t.List[FunctionCall] = [] 104 grains: t.List[exp.Expr] = [] 105 references: t.List[exp.Expr] = [] 106 physical_schema_override: t.Optional[str] = None 107 physical_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="physical_properties") 108 virtual_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="virtual_properties") 109 session_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="session_properties") 110 allow_partials: bool = False 111 signals: t.List[FunctionCall] = [] 112 enabled: bool = True 113 physical_version: t.Optional[str] = None 114 gateway: t.Optional[str] = None 115 optimize_query: t.Optional[bool] = None 116 ignored_rules_: t.Optional[t.Set[str]] = Field( 117 default=None, exclude=True, alias="ignored_rules" 118 ) 119 formatting: t.Optional[bool] = Field(default=None, exclude=True) 120 virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default 121 grants_: t.Optional[exp.Tuple] = Field(default=None, alias="grants") 122 grants_target_layer: GrantsTargetLayer = GrantsTargetLayer.default 123 124 _bool_validator = bool_validator 125 _model_kind_validator = model_kind_validator 126 _properties_validator = properties_validator 127 _default_catalog_validator = default_catalog_validator 128 _depends_on_validator = depends_on_validator 129 130 @field_validator("audits", "signals", mode="before") 131 def _func_call_validator(cls, v: t.Any, field: t.Any) -> t.Any: 132 is_signal = getattr(field, "name" if hasattr(field, "name") else "field_name") == "signals" 133 134 return d.extract_function_calls(v, allow_tuples=is_signal) 135 136 @field_validator("tags", mode="before") 137 def _value_or_tuple_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any: 138 return ensure_list(cls._validate_value_or_tuple(v, info.data)) 139 140 @classmethod 141 def _validate_value_or_tuple( 142 cls, v: t.Dict[str, t.Any], data: t.Dict[str, t.Any], normalize: bool = False 143 ) -> t.Any: 144 dialect = data.get("dialect") 145 146 def _normalize(value: t.Any) -> t.Any: 147 return normalize_identifiers(value, dialect=dialect) if normalize else value 148 149 if isinstance(v, exp.Paren): 150 v = [v.unnest()] 151 152 if isinstance(v, (exp.Tuple, exp.Array)): 153 return [_normalize(e).name for e in v.expressions] 154 if isinstance(v, exp.Expr): 155 return _normalize(v).name 156 if isinstance(v, str): 157 value = _normalize(v) 158 return value.name if isinstance(value, exp.Expr) else value 159 if isinstance(v, (list, tuple)): 160 return [cls._validate_value_or_tuple(elm, data, normalize=normalize) for elm in v] 161 162 return v 163 164 @field_validator("table_format", "storage_format", mode="before") 165 def _format_validator(cls, v: t.Any, info: ValidationInfo) -> t.Optional[str]: 166 if isinstance(v, exp.Expr) and not (isinstance(v, (exp.Literal, exp.Identifier))): 167 return v.sql(info.data.get("dialect")) 168 return str_or_exp_to_str(v) 169 170 @field_validator("dialect", mode="before") 171 def _dialect_validator(cls, v: t.Any) -> t.Optional[str]: 172 # dialects are parsed as identifiers and may get normalized as uppercase, 173 # so this ensures they'll be stored as lowercase 174 dialect = str_or_exp_to_str(v) 175 return dialect and dialect.lower() 176 177 @field_validator("physical_version", mode="before") 178 def _physical_version_validator(cls, v: t.Any) -> t.Optional[str]: 179 if v is None: 180 return v 181 return str_or_exp_to_str(v) 182 183 @field_validator("gateway", mode="before") 184 def _gateway_validator(cls, v: t.Any) -> t.Optional[str]: 185 if v is None: 186 return None 187 gateway = str_or_exp_to_str(v) 188 return gateway and gateway.lower() 189 190 @field_validator("partitioned_by_", "clustered_by", mode="before") 191 def _partition_and_cluster_validator(cls, v: t.Any, info: ValidationInfo) -> t.List[exp.Expr]: 192 if ( 193 isinstance(v, list) 194 and all(isinstance(i, str) for i in v) 195 and info.field_name == "partitioned_by_" 196 ): 197 # this branch gets hit when we are deserializing from json because `partitioned_by` is stored as a List[str] 198 # however, we should only invoke this if the list contains strings because this validator is also 199 # called by Python models which might pass a List[exp.Expression] 200 string_to_parse = ( 201 f"({','.join(v)})" # recreate the (a, b, c) part of "partitioned_by (a, b, c)" 202 ) 203 parsed = parse_one( 204 string_to_parse, into=exp.PartitionedByProperty, dialect=get_dialect(info) 205 ) 206 v = parsed.this.expressions if isinstance(parsed.this, exp.Schema) else v 207 208 expressions = list_of_fields_validator(v, info.data) 209 210 for expression in expressions: 211 num_cols = len(list(expression.find_all(exp.Column))) 212 213 error_msg: t.Optional[str] = None 214 if num_cols == 0: 215 error_msg = "does not contain a column" 216 elif num_cols > 1: 217 error_msg = "contains multiple columns" 218 219 if error_msg: 220 raise ConfigError(f"Field '{expression}' {error_msg}") 221 222 return expressions 223 224 @field_validator( 225 "columns_to_types_", "derived_columns_to_types", mode="before", check_fields=False 226 ) 227 def _columns_validator( 228 cls, v: t.Any, info: ValidationInfo 229 ) -> t.Optional[t.Dict[str, exp.DataType]]: 230 columns_to_types = {} 231 dialect = info.data.get("dialect") 232 233 if isinstance(v, exp.Schema): 234 for column in v.expressions: 235 expr = column.args.get("kind") 236 if not isinstance(expr, exp.DataType): 237 raise ConfigError(f"Missing data type for column '{column.name}'.") 238 239 expr.meta["dialect"] = dialect 240 columns_to_types[normalize_identifiers(column, dialect=dialect).name] = expr 241 242 return columns_to_types 243 244 if isinstance(v, dict): 245 dialect_obj = Dialect.get_or_raise(dialect) 246 udt = dialect_obj.SUPPORTS_USER_DEFINED_TYPES 247 for k, data_type in v.items(): 248 is_string_type = isinstance(data_type, str) 249 expr = exp.DataType.build(data_type, dialect=dialect, udt=udt) 250 # When deserializing from a string (e.g. JSON roundtrip), normalize the type 251 # through the dialect's type system so that aliases (e.g. INT in BigQuery, 252 # which is an alias for INT64/BIGINT) are resolved to their canonical form. 253 # This ensures stable data hash computation across serialization/deserialization 254 # roundtrips. We skip this for DataType objects passed directly (Python API) 255 # since those should be used as-is. 256 if ( 257 is_string_type 258 and dialect 259 and expr.this 260 not in ( 261 exp.DataType.Type.USERDEFINED, 262 exp.DataType.Type.UNKNOWN, 263 ) 264 ): 265 sql_repr = expr.sql(dialect=dialect) 266 try: 267 normalized = parse_one(sql_repr, read=dialect, into=exp.DataType) 268 if normalized is not None: 269 expr = normalized 270 except Exception: 271 pass 272 expr.meta["dialect"] = dialect 273 columns_to_types[normalize_identifiers(k, dialect=dialect).name] = expr 274 275 return columns_to_types 276 277 return v 278 279 @field_validator("column_descriptions_", mode="before") 280 def _column_descriptions_validator( 281 cls, vs: t.Any, info: ValidationInfo 282 ) -> t.Optional[t.Dict[str, str]]: 283 dialect = info.data.get("dialect") 284 285 if vs is None: 286 return None 287 288 if isinstance(vs, exp.Paren): 289 vs = vs.flatten() 290 291 if isinstance(vs, (exp.Tuple, exp.Array)): 292 vs = vs.expressions 293 294 raw_col_descriptions = ( 295 vs 296 if isinstance(vs, dict) 297 else {".".join([part.this for part in v.this.parts]): v.expression.name for v in vs} 298 ) 299 300 col_descriptions = { 301 normalize_identifiers(k, dialect=dialect).name: v 302 for k, v in raw_col_descriptions.items() 303 } 304 305 columns_to_types = info.data.get("columns_to_types_") 306 if columns_to_types: 307 from sqlmesh.core.console import get_console 308 309 console = get_console() 310 for column_name in list(col_descriptions): 311 if column_name not in columns_to_types: 312 console.log_warning( 313 f"In model '{info.data['name']}', a description is provided for column '{column_name}' but it is not a column in the model." 314 ) 315 del col_descriptions[column_name] 316 317 return col_descriptions 318 319 @field_validator("grains", "references", mode="before") 320 def _refs_validator(cls, vs: t.Any, info: ValidationInfo) -> t.List[exp.Expr]: 321 dialect = info.data.get("dialect") 322 323 if isinstance(vs, exp.Paren): 324 vs = vs.unnest() 325 326 if isinstance(vs, (exp.Tuple, exp.Array)): 327 vs = vs.expressions 328 else: 329 vs = [ 330 d.parse_one(v, dialect=dialect) if isinstance(v, str) else v 331 for v in ensure_collection(vs) 332 ] 333 334 refs = [] 335 336 for v in vs: 337 v = exp.column(v) if isinstance(v, exp.Identifier) else v 338 v.meta["dialect"] = dialect 339 refs.append(v) 340 341 return refs 342 343 @field_validator("ignored_rules_", mode="before") 344 def ignored_rules_validator(cls, vs: t.Any) -> t.Any: 345 return LinterConfig._validate_rules(vs) 346 347 @field_validator("grants_target_layer", mode="before") 348 def _grants_target_layer_validator(cls, v: t.Any) -> t.Any: 349 if isinstance(v, exp.Identifier): 350 return v.this 351 if isinstance(v, exp.Literal) and v.is_string: 352 return v.this 353 return v 354 355 @field_validator("session_properties_", mode="before") 356 def session_properties_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any: 357 # use the generic properties validator to parse the session properties 358 parsed_session_properties = parse_properties(type(cls), v, info) 359 if not parsed_session_properties: 360 return parsed_session_properties 361 362 for eq in parsed_session_properties: 363 prop_name = eq.left.name 364 365 if prop_name == "query_label": 366 query_label = eq.right 367 if not isinstance( 368 query_label, (exp.Array, exp.Tuple, exp.Paren, d.MacroFunc, d.MacroVar) 369 ): 370 raise ConfigError( 371 "Invalid value for `session_properties.query_label`. Must be an array or tuple." 372 ) 373 374 label_tuples: t.List[exp.Expr] = ( 375 [query_label.unnest()] 376 if isinstance(query_label, exp.Paren) 377 else query_label.expressions 378 ) 379 380 for label_tuple in label_tuples: 381 if not ( 382 isinstance(label_tuple, exp.Tuple) 383 and len(label_tuple.expressions) == 2 384 and all(isinstance(label, exp.Literal) for label in label_tuple.expressions) 385 ): 386 raise ConfigError( 387 "Invalid entry in `session_properties.query_label`. Must be tuples of string literals with length 2." 388 ) 389 elif prop_name == "authorization": 390 authorization = eq.right 391 if not ( 392 isinstance(authorization, exp.Literal) and authorization.is_string 393 ) and not isinstance(authorization, (d.MacroFunc, d.MacroVar)): 394 raise ConfigError( 395 "Invalid value for `session_properties.authorization`. Must be a string literal." 396 ) 397 398 return parsed_session_properties 399 400 @model_validator(mode="before") 401 def _pre_root_validator(cls, data: t.Any) -> t.Any: 402 if not isinstance(data, dict): 403 return data 404 405 grain = data.pop("grain", None) 406 if grain: 407 grains = data.get("grains") 408 if grains: 409 raise ConfigError( 410 f"Cannot use argument 'grain' ({grain}) with 'grains' ({grains}), use only grains" 411 ) 412 data["grains"] = ensure_list(grain) 413 414 table_properties = data.pop("table_properties", None) 415 if table_properties: 416 if not isinstance(table_properties, str): 417 # Do not warn when deserializing from the state. 418 model_name = data["name"] 419 from sqlmesh.core.console import get_console 420 421 get_console().log_warning( 422 f"Model '{model_name}' is using the `table_properties` attribute which is deprecated. Please use `physical_properties` instead." 423 ) 424 physical_properties = data.get("physical_properties") 425 if physical_properties: 426 raise ConfigError( 427 f"Cannot use argument 'table_properties' ({table_properties}) with 'physical_properties' ({physical_properties}), use only physical_properties." 428 ) 429 430 data["physical_properties"] = table_properties 431 432 return data 433 434 @model_validator(mode="after") 435 def _root_validator(self) -> Self: 436 kind: t.Any = self.kind 437 438 for field in ("partitioned_by_", "clustered_by"): 439 if ( 440 getattr(self, field, None) 441 and not kind.is_materialized 442 and not (kind.is_view and kind.materialized) 443 ): 444 name = field[:-1] if field.endswith("_") else field 445 raise ValueError(f"{name} field cannot be set for {kind.name} models") 446 if kind.is_incremental_by_partition and not getattr(self, "partitioned_by_", None): 447 raise ValueError(f"partitioned_by field is required for {kind.name} models") 448 449 # needs to be in a mode=after model validator so that the field validators have run to convert from Expression -> str 450 if (storage_format := self.storage_format) and storage_format.lower() in { 451 "iceberg", 452 "hive", 453 "hudi", 454 "delta", 455 }: 456 from sqlmesh.core.console import get_console 457 458 get_console().log_warning( 459 f"Model {self.name} has `storage_format` set to a table format '{storage_format}' which is deprecated. Please use the `table_format` property instead." 460 ) 461 462 # Validate grants configuration for model kind support 463 if self.grants is not None and not kind.supports_grants: 464 raise ValueError(f"grants cannot be set for {kind.name} models") 465 466 return self 467 468 @property 469 def time_column(self) -> t.Optional[TimeColumn]: 470 """The time column for incremental models.""" 471 return getattr(self.kind, "time_column", None) 472 473 @property 474 def unique_key(self) -> t.List[exp.Expr]: 475 if isinstance( 476 self.kind, (SCDType2ByTimeKind, SCDType2ByColumnKind, IncrementalByUniqueKeyKind) 477 ): 478 return self.kind.unique_key 479 return [] 480 481 @property 482 def column_descriptions(self) -> t.Dict[str, str]: 483 """A dictionary of column names to annotation comments.""" 484 return self.column_descriptions_ or {} 485 486 @property 487 def lookback(self) -> int: 488 """The incremental lookback window.""" 489 return getattr(self.kind, "lookback", 0) or 0 490 491 def lookback_start(self, start: TimeLike) -> TimeLike: 492 if self.lookback == 0: 493 return start 494 495 for _ in range(self.lookback): 496 start = self.interval_unit.cron_prev(start) 497 return start 498 499 @property 500 def batch_size(self) -> t.Optional[int]: 501 """The maximal number of units in a single task for a backfill.""" 502 return getattr(self.kind, "batch_size", None) 503 504 @property 505 def batch_concurrency(self) -> t.Optional[int]: 506 """The maximal number of batches that can run concurrently for a backfill.""" 507 return getattr(self.kind, "batch_concurrency", None) 508 509 @cached_property 510 def physical_properties(self) -> t.Dict[str, exp.Expr]: 511 """A dictionary of properties that will be applied to the physical layer. It replaces table_properties which is deprecated.""" 512 if self.physical_properties_: 513 return {e.this.name: e.expression for e in self.physical_properties_.expressions} 514 return {} 515 516 @cached_property 517 def virtual_properties(self) -> t.Dict[str, exp.Expr]: 518 """A dictionary of properties that will be applied to the virtual layer.""" 519 if self.virtual_properties_: 520 return {e.this.name: e.expression for e in self.virtual_properties_.expressions} 521 return {} 522 523 @property 524 def session_properties(self) -> SessionProperties: 525 """A dictionary of session properties.""" 526 if not self.session_properties_: 527 return {} 528 529 return d.interpret_key_value_pairs(self.session_properties_) 530 531 @property 532 def custom_materialization_properties(self) -> CustomMaterializationProperties: 533 if isinstance(self.kind, CustomKind): 534 return self.kind.materialization_properties 535 return {} 536 537 @cached_property 538 def grants(self) -> t.Optional[GrantsConfig]: 539 """A dictionary of grants mapping permission names to lists of grantees.""" 540 541 if self.grants_ is None: 542 return None 543 544 if not self.grants_.expressions: 545 return {} 546 547 grants_dict = {} 548 for eq_expr in self.grants_.expressions: 549 try: 550 permission_name = self._validate_config_expression(eq_expr.left) 551 grantee_list = self._validate_nested_config_values(eq_expr.expression) 552 grants_dict[permission_name] = grantee_list 553 except ConfigError as e: 554 permission_name = ( 555 eq_expr.left.name if hasattr(eq_expr.left, "name") else str(eq_expr.left) 556 ) 557 raise ConfigError(f"Invalid grants configuration for '{permission_name}': {e}") 558 559 return grants_dict if grants_dict else None 560 561 @property 562 def all_references(self) -> t.List[Reference]: 563 """All references including grains.""" 564 return [Reference(model_name=self.name, expression=e, unique=True) for e in self.grains] + [ 565 Reference(model_name=self.name, expression=e, unique=True) for e in self.references 566 ] 567 568 @property 569 def on(self) -> t.List[str]: 570 """The grains to be used as join condition in table_diff.""" 571 572 on: t.List[str] = [] 573 for expr in [ref.expression for ref in self.all_references if ref.unique]: 574 if isinstance(expr, exp.Tuple): 575 on.extend([key.this.sql(dialect=self.dialect) for key in expr.expressions]) 576 else: 577 # Handle a single Column or Paren expression 578 on.append(expr.this.sql(dialect=self.dialect)) 579 580 return on 581 582 @property 583 def managed_columns(self) -> t.Dict[str, exp.DataType]: 584 return getattr(self.kind, "managed_columns", {}) 585 586 @property 587 def when_matched(self) -> t.Optional[exp.Whens]: 588 if isinstance(self.kind, IncrementalByUniqueKeyKind): 589 return self.kind.when_matched 590 return None 591 592 @property 593 def merge_filter(self) -> t.Optional[exp.Expr]: 594 if isinstance(self.kind, IncrementalByUniqueKeyKind): 595 return self.kind.merge_filter 596 return None 597 598 @property 599 def catalog(self) -> t.Optional[str]: 600 """Returns the catalog of a model.""" 601 return self.fully_qualified_table.catalog 602 603 @cached_property 604 def fully_qualified_table(self) -> exp.Table: 605 return exp.to_table(self.fqn) 606 607 @cached_property 608 def fqn(self) -> str: 609 return normalize_model_name( 610 self.name, default_catalog=self.default_catalog, dialect=self.dialect 611 ) 612 613 @property 614 def on_destructive_change(self) -> OnDestructiveChange: 615 return getattr(self.kind, "on_destructive_change", OnDestructiveChange.ALLOW) 616 617 @property 618 def on_additive_change(self) -> OnAdditiveChange: 619 """Return the model's additive change setting if it has one.""" 620 return getattr(self.kind, "on_additive_change", OnAdditiveChange.ALLOW) 621 622 @property 623 def ignored_rules(self) -> t.Set[str]: 624 return self.ignored_rules_ or set() 625 626 def _validate_config_expression(self, expr: exp.Expr) -> str: 627 if isinstance(expr, (d.MacroFunc, d.MacroVar)): 628 raise ConfigError(f"Unresolved macro: {expr.sql(dialect=self.dialect)}") 629 630 if isinstance(expr, exp.Null): 631 raise ConfigError("NULL value") 632 633 if isinstance(expr, exp.Literal): 634 return str(expr.this).strip() 635 if isinstance(expr, (exp.Column, exp.Identifier)): 636 return expr.name 637 return expr.sql(dialect=self.dialect).strip() 638 639 def _validate_nested_config_values(self, value_expr: exp.Expr) -> t.List[str]: 640 result = [] 641 642 def flatten_expr(expr: exp.Expr) -> None: 643 if isinstance(expr, exp.Array): 644 for elem in expr.expressions: 645 flatten_expr(elem) 646 elif isinstance(expr, (exp.Tuple, exp.Paren)): 647 expressions = [expr.unnest()] if isinstance(expr, exp.Paren) else expr.expressions 648 for elem in expressions: 649 flatten_expr(elem) 650 else: 651 result.append(self._validate_config_expression(expr)) 652 653 flatten_expr(value_expr) 654 return result
57class GrantsTargetLayer(str, Enum): 58 """Target layer(s) where grants should be applied.""" 59 60 ALL = "all" 61 PHYSICAL = "physical" 62 VIRTUAL = "virtual" 63 64 @classproperty 65 def default(cls) -> "GrantsTargetLayer": 66 return GrantsTargetLayer.VIRTUAL 67 68 @property 69 def is_all(self) -> bool: 70 return self == GrantsTargetLayer.ALL 71 72 @property 73 def is_physical(self) -> bool: 74 return self == GrantsTargetLayer.PHYSICAL 75 76 @property 77 def is_virtual(self) -> bool: 78 return self == GrantsTargetLayer.VIRTUAL 79 80 def __str__(self) -> str: 81 return self.name 82 83 def __repr__(self) -> str: 84 return str(self)
Target layer(s) where grants should be applied.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
87class ModelMeta(_Node): 88 """Metadata for models which can be defined in SQL.""" 89 90 dialect: str = "" 91 name: str 92 kind: ModelKind = ViewKind() 93 retention: t.Optional[int] = None # not implemented yet 94 table_format: t.Optional[str] = None 95 storage_format: t.Optional[str] = None 96 partitioned_by_: t.List[exp.Expr] = Field(default=[], alias="partitioned_by") 97 clustered_by: t.List[exp.Expr] = [] 98 default_catalog: t.Optional[str] = None 99 depends_on_: t.Optional[t.Set[str]] = Field(default=None, alias="depends_on") 100 columns_to_types_: t.Optional[t.Dict[str, exp.DataType]] = Field(default=None, alias="columns") 101 column_descriptions_: t.Optional[t.Dict[str, str]] = Field( 102 default=None, alias="column_descriptions" 103 ) 104 audits: t.List[FunctionCall] = [] 105 grains: t.List[exp.Expr] = [] 106 references: t.List[exp.Expr] = [] 107 physical_schema_override: t.Optional[str] = None 108 physical_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="physical_properties") 109 virtual_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="virtual_properties") 110 session_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="session_properties") 111 allow_partials: bool = False 112 signals: t.List[FunctionCall] = [] 113 enabled: bool = True 114 physical_version: t.Optional[str] = None 115 gateway: t.Optional[str] = None 116 optimize_query: t.Optional[bool] = None 117 ignored_rules_: t.Optional[t.Set[str]] = Field( 118 default=None, exclude=True, alias="ignored_rules" 119 ) 120 formatting: t.Optional[bool] = Field(default=None, exclude=True) 121 virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default 122 grants_: t.Optional[exp.Tuple] = Field(default=None, alias="grants") 123 grants_target_layer: GrantsTargetLayer = GrantsTargetLayer.default 124 125 _bool_validator = bool_validator 126 _model_kind_validator = model_kind_validator 127 _properties_validator = properties_validator 128 _default_catalog_validator = default_catalog_validator 129 _depends_on_validator = depends_on_validator 130 131 @field_validator("audits", "signals", mode="before") 132 def _func_call_validator(cls, v: t.Any, field: t.Any) -> t.Any: 133 is_signal = getattr(field, "name" if hasattr(field, "name") else "field_name") == "signals" 134 135 return d.extract_function_calls(v, allow_tuples=is_signal) 136 137 @field_validator("tags", mode="before") 138 def _value_or_tuple_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any: 139 return ensure_list(cls._validate_value_or_tuple(v, info.data)) 140 141 @classmethod 142 def _validate_value_or_tuple( 143 cls, v: t.Dict[str, t.Any], data: t.Dict[str, t.Any], normalize: bool = False 144 ) -> t.Any: 145 dialect = data.get("dialect") 146 147 def _normalize(value: t.Any) -> t.Any: 148 return normalize_identifiers(value, dialect=dialect) if normalize else value 149 150 if isinstance(v, exp.Paren): 151 v = [v.unnest()] 152 153 if isinstance(v, (exp.Tuple, exp.Array)): 154 return [_normalize(e).name for e in v.expressions] 155 if isinstance(v, exp.Expr): 156 return _normalize(v).name 157 if isinstance(v, str): 158 value = _normalize(v) 159 return value.name if isinstance(value, exp.Expr) else value 160 if isinstance(v, (list, tuple)): 161 return [cls._validate_value_or_tuple(elm, data, normalize=normalize) for elm in v] 162 163 return v 164 165 @field_validator("table_format", "storage_format", mode="before") 166 def _format_validator(cls, v: t.Any, info: ValidationInfo) -> t.Optional[str]: 167 if isinstance(v, exp.Expr) and not (isinstance(v, (exp.Literal, exp.Identifier))): 168 return v.sql(info.data.get("dialect")) 169 return str_or_exp_to_str(v) 170 171 @field_validator("dialect", mode="before") 172 def _dialect_validator(cls, v: t.Any) -> t.Optional[str]: 173 # dialects are parsed as identifiers and may get normalized as uppercase, 174 # so this ensures they'll be stored as lowercase 175 dialect = str_or_exp_to_str(v) 176 return dialect and dialect.lower() 177 178 @field_validator("physical_version", mode="before") 179 def _physical_version_validator(cls, v: t.Any) -> t.Optional[str]: 180 if v is None: 181 return v 182 return str_or_exp_to_str(v) 183 184 @field_validator("gateway", mode="before") 185 def _gateway_validator(cls, v: t.Any) -> t.Optional[str]: 186 if v is None: 187 return None 188 gateway = str_or_exp_to_str(v) 189 return gateway and gateway.lower() 190 191 @field_validator("partitioned_by_", "clustered_by", mode="before") 192 def _partition_and_cluster_validator(cls, v: t.Any, info: ValidationInfo) -> t.List[exp.Expr]: 193 if ( 194 isinstance(v, list) 195 and all(isinstance(i, str) for i in v) 196 and info.field_name == "partitioned_by_" 197 ): 198 # this branch gets hit when we are deserializing from json because `partitioned_by` is stored as a List[str] 199 # however, we should only invoke this if the list contains strings because this validator is also 200 # called by Python models which might pass a List[exp.Expression] 201 string_to_parse = ( 202 f"({','.join(v)})" # recreate the (a, b, c) part of "partitioned_by (a, b, c)" 203 ) 204 parsed = parse_one( 205 string_to_parse, into=exp.PartitionedByProperty, dialect=get_dialect(info) 206 ) 207 v = parsed.this.expressions if isinstance(parsed.this, exp.Schema) else v 208 209 expressions = list_of_fields_validator(v, info.data) 210 211 for expression in expressions: 212 num_cols = len(list(expression.find_all(exp.Column))) 213 214 error_msg: t.Optional[str] = None 215 if num_cols == 0: 216 error_msg = "does not contain a column" 217 elif num_cols > 1: 218 error_msg = "contains multiple columns" 219 220 if error_msg: 221 raise ConfigError(f"Field '{expression}' {error_msg}") 222 223 return expressions 224 225 @field_validator( 226 "columns_to_types_", "derived_columns_to_types", mode="before", check_fields=False 227 ) 228 def _columns_validator( 229 cls, v: t.Any, info: ValidationInfo 230 ) -> t.Optional[t.Dict[str, exp.DataType]]: 231 columns_to_types = {} 232 dialect = info.data.get("dialect") 233 234 if isinstance(v, exp.Schema): 235 for column in v.expressions: 236 expr = column.args.get("kind") 237 if not isinstance(expr, exp.DataType): 238 raise ConfigError(f"Missing data type for column '{column.name}'.") 239 240 expr.meta["dialect"] = dialect 241 columns_to_types[normalize_identifiers(column, dialect=dialect).name] = expr 242 243 return columns_to_types 244 245 if isinstance(v, dict): 246 dialect_obj = Dialect.get_or_raise(dialect) 247 udt = dialect_obj.SUPPORTS_USER_DEFINED_TYPES 248 for k, data_type in v.items(): 249 is_string_type = isinstance(data_type, str) 250 expr = exp.DataType.build(data_type, dialect=dialect, udt=udt) 251 # When deserializing from a string (e.g. JSON roundtrip), normalize the type 252 # through the dialect's type system so that aliases (e.g. INT in BigQuery, 253 # which is an alias for INT64/BIGINT) are resolved to their canonical form. 254 # This ensures stable data hash computation across serialization/deserialization 255 # roundtrips. We skip this for DataType objects passed directly (Python API) 256 # since those should be used as-is. 257 if ( 258 is_string_type 259 and dialect 260 and expr.this 261 not in ( 262 exp.DataType.Type.USERDEFINED, 263 exp.DataType.Type.UNKNOWN, 264 ) 265 ): 266 sql_repr = expr.sql(dialect=dialect) 267 try: 268 normalized = parse_one(sql_repr, read=dialect, into=exp.DataType) 269 if normalized is not None: 270 expr = normalized 271 except Exception: 272 pass 273 expr.meta["dialect"] = dialect 274 columns_to_types[normalize_identifiers(k, dialect=dialect).name] = expr 275 276 return columns_to_types 277 278 return v 279 280 @field_validator("column_descriptions_", mode="before") 281 def _column_descriptions_validator( 282 cls, vs: t.Any, info: ValidationInfo 283 ) -> t.Optional[t.Dict[str, str]]: 284 dialect = info.data.get("dialect") 285 286 if vs is None: 287 return None 288 289 if isinstance(vs, exp.Paren): 290 vs = vs.flatten() 291 292 if isinstance(vs, (exp.Tuple, exp.Array)): 293 vs = vs.expressions 294 295 raw_col_descriptions = ( 296 vs 297 if isinstance(vs, dict) 298 else {".".join([part.this for part in v.this.parts]): v.expression.name for v in vs} 299 ) 300 301 col_descriptions = { 302 normalize_identifiers(k, dialect=dialect).name: v 303 for k, v in raw_col_descriptions.items() 304 } 305 306 columns_to_types = info.data.get("columns_to_types_") 307 if columns_to_types: 308 from sqlmesh.core.console import get_console 309 310 console = get_console() 311 for column_name in list(col_descriptions): 312 if column_name not in columns_to_types: 313 console.log_warning( 314 f"In model '{info.data['name']}', a description is provided for column '{column_name}' but it is not a column in the model." 315 ) 316 del col_descriptions[column_name] 317 318 return col_descriptions 319 320 @field_validator("grains", "references", mode="before") 321 def _refs_validator(cls, vs: t.Any, info: ValidationInfo) -> t.List[exp.Expr]: 322 dialect = info.data.get("dialect") 323 324 if isinstance(vs, exp.Paren): 325 vs = vs.unnest() 326 327 if isinstance(vs, (exp.Tuple, exp.Array)): 328 vs = vs.expressions 329 else: 330 vs = [ 331 d.parse_one(v, dialect=dialect) if isinstance(v, str) else v 332 for v in ensure_collection(vs) 333 ] 334 335 refs = [] 336 337 for v in vs: 338 v = exp.column(v) if isinstance(v, exp.Identifier) else v 339 v.meta["dialect"] = dialect 340 refs.append(v) 341 342 return refs 343 344 @field_validator("ignored_rules_", mode="before") 345 def ignored_rules_validator(cls, vs: t.Any) -> t.Any: 346 return LinterConfig._validate_rules(vs) 347 348 @field_validator("grants_target_layer", mode="before") 349 def _grants_target_layer_validator(cls, v: t.Any) -> t.Any: 350 if isinstance(v, exp.Identifier): 351 return v.this 352 if isinstance(v, exp.Literal) and v.is_string: 353 return v.this 354 return v 355 356 @field_validator("session_properties_", mode="before") 357 def session_properties_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any: 358 # use the generic properties validator to parse the session properties 359 parsed_session_properties = parse_properties(type(cls), v, info) 360 if not parsed_session_properties: 361 return parsed_session_properties 362 363 for eq in parsed_session_properties: 364 prop_name = eq.left.name 365 366 if prop_name == "query_label": 367 query_label = eq.right 368 if not isinstance( 369 query_label, (exp.Array, exp.Tuple, exp.Paren, d.MacroFunc, d.MacroVar) 370 ): 371 raise ConfigError( 372 "Invalid value for `session_properties.query_label`. Must be an array or tuple." 373 ) 374 375 label_tuples: t.List[exp.Expr] = ( 376 [query_label.unnest()] 377 if isinstance(query_label, exp.Paren) 378 else query_label.expressions 379 ) 380 381 for label_tuple in label_tuples: 382 if not ( 383 isinstance(label_tuple, exp.Tuple) 384 and len(label_tuple.expressions) == 2 385 and all(isinstance(label, exp.Literal) for label in label_tuple.expressions) 386 ): 387 raise ConfigError( 388 "Invalid entry in `session_properties.query_label`. Must be tuples of string literals with length 2." 389 ) 390 elif prop_name == "authorization": 391 authorization = eq.right 392 if not ( 393 isinstance(authorization, exp.Literal) and authorization.is_string 394 ) and not isinstance(authorization, (d.MacroFunc, d.MacroVar)): 395 raise ConfigError( 396 "Invalid value for `session_properties.authorization`. Must be a string literal." 397 ) 398 399 return parsed_session_properties 400 401 @model_validator(mode="before") 402 def _pre_root_validator(cls, data: t.Any) -> t.Any: 403 if not isinstance(data, dict): 404 return data 405 406 grain = data.pop("grain", None) 407 if grain: 408 grains = data.get("grains") 409 if grains: 410 raise ConfigError( 411 f"Cannot use argument 'grain' ({grain}) with 'grains' ({grains}), use only grains" 412 ) 413 data["grains"] = ensure_list(grain) 414 415 table_properties = data.pop("table_properties", None) 416 if table_properties: 417 if not isinstance(table_properties, str): 418 # Do not warn when deserializing from the state. 419 model_name = data["name"] 420 from sqlmesh.core.console import get_console 421 422 get_console().log_warning( 423 f"Model '{model_name}' is using the `table_properties` attribute which is deprecated. Please use `physical_properties` instead." 424 ) 425 physical_properties = data.get("physical_properties") 426 if physical_properties: 427 raise ConfigError( 428 f"Cannot use argument 'table_properties' ({table_properties}) with 'physical_properties' ({physical_properties}), use only physical_properties." 429 ) 430 431 data["physical_properties"] = table_properties 432 433 return data 434 435 @model_validator(mode="after") 436 def _root_validator(self) -> Self: 437 kind: t.Any = self.kind 438 439 for field in ("partitioned_by_", "clustered_by"): 440 if ( 441 getattr(self, field, None) 442 and not kind.is_materialized 443 and not (kind.is_view and kind.materialized) 444 ): 445 name = field[:-1] if field.endswith("_") else field 446 raise ValueError(f"{name} field cannot be set for {kind.name} models") 447 if kind.is_incremental_by_partition and not getattr(self, "partitioned_by_", None): 448 raise ValueError(f"partitioned_by field is required for {kind.name} models") 449 450 # needs to be in a mode=after model validator so that the field validators have run to convert from Expression -> str 451 if (storage_format := self.storage_format) and storage_format.lower() in { 452 "iceberg", 453 "hive", 454 "hudi", 455 "delta", 456 }: 457 from sqlmesh.core.console import get_console 458 459 get_console().log_warning( 460 f"Model {self.name} has `storage_format` set to a table format '{storage_format}' which is deprecated. Please use the `table_format` property instead." 461 ) 462 463 # Validate grants configuration for model kind support 464 if self.grants is not None and not kind.supports_grants: 465 raise ValueError(f"grants cannot be set for {kind.name} models") 466 467 return self 468 469 @property 470 def time_column(self) -> t.Optional[TimeColumn]: 471 """The time column for incremental models.""" 472 return getattr(self.kind, "time_column", None) 473 474 @property 475 def unique_key(self) -> t.List[exp.Expr]: 476 if isinstance( 477 self.kind, (SCDType2ByTimeKind, SCDType2ByColumnKind, IncrementalByUniqueKeyKind) 478 ): 479 return self.kind.unique_key 480 return [] 481 482 @property 483 def column_descriptions(self) -> t.Dict[str, str]: 484 """A dictionary of column names to annotation comments.""" 485 return self.column_descriptions_ or {} 486 487 @property 488 def lookback(self) -> int: 489 """The incremental lookback window.""" 490 return getattr(self.kind, "lookback", 0) or 0 491 492 def lookback_start(self, start: TimeLike) -> TimeLike: 493 if self.lookback == 0: 494 return start 495 496 for _ in range(self.lookback): 497 start = self.interval_unit.cron_prev(start) 498 return start 499 500 @property 501 def batch_size(self) -> t.Optional[int]: 502 """The maximal number of units in a single task for a backfill.""" 503 return getattr(self.kind, "batch_size", None) 504 505 @property 506 def batch_concurrency(self) -> t.Optional[int]: 507 """The maximal number of batches that can run concurrently for a backfill.""" 508 return getattr(self.kind, "batch_concurrency", None) 509 510 @cached_property 511 def physical_properties(self) -> t.Dict[str, exp.Expr]: 512 """A dictionary of properties that will be applied to the physical layer. It replaces table_properties which is deprecated.""" 513 if self.physical_properties_: 514 return {e.this.name: e.expression for e in self.physical_properties_.expressions} 515 return {} 516 517 @cached_property 518 def virtual_properties(self) -> t.Dict[str, exp.Expr]: 519 """A dictionary of properties that will be applied to the virtual layer.""" 520 if self.virtual_properties_: 521 return {e.this.name: e.expression for e in self.virtual_properties_.expressions} 522 return {} 523 524 @property 525 def session_properties(self) -> SessionProperties: 526 """A dictionary of session properties.""" 527 if not self.session_properties_: 528 return {} 529 530 return d.interpret_key_value_pairs(self.session_properties_) 531 532 @property 533 def custom_materialization_properties(self) -> CustomMaterializationProperties: 534 if isinstance(self.kind, CustomKind): 535 return self.kind.materialization_properties 536 return {} 537 538 @cached_property 539 def grants(self) -> t.Optional[GrantsConfig]: 540 """A dictionary of grants mapping permission names to lists of grantees.""" 541 542 if self.grants_ is None: 543 return None 544 545 if not self.grants_.expressions: 546 return {} 547 548 grants_dict = {} 549 for eq_expr in self.grants_.expressions: 550 try: 551 permission_name = self._validate_config_expression(eq_expr.left) 552 grantee_list = self._validate_nested_config_values(eq_expr.expression) 553 grants_dict[permission_name] = grantee_list 554 except ConfigError as e: 555 permission_name = ( 556 eq_expr.left.name if hasattr(eq_expr.left, "name") else str(eq_expr.left) 557 ) 558 raise ConfigError(f"Invalid grants configuration for '{permission_name}': {e}") 559 560 return grants_dict if grants_dict else None 561 562 @property 563 def all_references(self) -> t.List[Reference]: 564 """All references including grains.""" 565 return [Reference(model_name=self.name, expression=e, unique=True) for e in self.grains] + [ 566 Reference(model_name=self.name, expression=e, unique=True) for e in self.references 567 ] 568 569 @property 570 def on(self) -> t.List[str]: 571 """The grains to be used as join condition in table_diff.""" 572 573 on: t.List[str] = [] 574 for expr in [ref.expression for ref in self.all_references if ref.unique]: 575 if isinstance(expr, exp.Tuple): 576 on.extend([key.this.sql(dialect=self.dialect) for key in expr.expressions]) 577 else: 578 # Handle a single Column or Paren expression 579 on.append(expr.this.sql(dialect=self.dialect)) 580 581 return on 582 583 @property 584 def managed_columns(self) -> t.Dict[str, exp.DataType]: 585 return getattr(self.kind, "managed_columns", {}) 586 587 @property 588 def when_matched(self) -> t.Optional[exp.Whens]: 589 if isinstance(self.kind, IncrementalByUniqueKeyKind): 590 return self.kind.when_matched 591 return None 592 593 @property 594 def merge_filter(self) -> t.Optional[exp.Expr]: 595 if isinstance(self.kind, IncrementalByUniqueKeyKind): 596 return self.kind.merge_filter 597 return None 598 599 @property 600 def catalog(self) -> t.Optional[str]: 601 """Returns the catalog of a model.""" 602 return self.fully_qualified_table.catalog 603 604 @cached_property 605 def fully_qualified_table(self) -> exp.Table: 606 return exp.to_table(self.fqn) 607 608 @cached_property 609 def fqn(self) -> str: 610 return normalize_model_name( 611 self.name, default_catalog=self.default_catalog, dialect=self.dialect 612 ) 613 614 @property 615 def on_destructive_change(self) -> OnDestructiveChange: 616 return getattr(self.kind, "on_destructive_change", OnDestructiveChange.ALLOW) 617 618 @property 619 def on_additive_change(self) -> OnAdditiveChange: 620 """Return the model's additive change setting if it has one.""" 621 return getattr(self.kind, "on_additive_change", OnAdditiveChange.ALLOW) 622 623 @property 624 def ignored_rules(self) -> t.Set[str]: 625 return self.ignored_rules_ or set() 626 627 def _validate_config_expression(self, expr: exp.Expr) -> str: 628 if isinstance(expr, (d.MacroFunc, d.MacroVar)): 629 raise ConfigError(f"Unresolved macro: {expr.sql(dialect=self.dialect)}") 630 631 if isinstance(expr, exp.Null): 632 raise ConfigError("NULL value") 633 634 if isinstance(expr, exp.Literal): 635 return str(expr.this).strip() 636 if isinstance(expr, (exp.Column, exp.Identifier)): 637 return expr.name 638 return expr.sql(dialect=self.dialect).strip() 639 640 def _validate_nested_config_values(self, value_expr: exp.Expr) -> t.List[str]: 641 result = [] 642 643 def flatten_expr(expr: exp.Expr) -> None: 644 if isinstance(expr, exp.Array): 645 for elem in expr.expressions: 646 flatten_expr(elem) 647 elif isinstance(expr, (exp.Tuple, exp.Paren)): 648 expressions = [expr.unnest()] if isinstance(expr, exp.Paren) else expr.expressions 649 for elem in expressions: 650 flatten_expr(elem) 651 else: 652 result.append(self._validate_config_expression(expr)) 653 654 flatten_expr(value_expr) 655 return result
Metadata for models which can be defined in SQL.
356 @field_validator("session_properties_", mode="before") 357 def session_properties_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any: 358 # use the generic properties validator to parse the session properties 359 parsed_session_properties = parse_properties(type(cls), v, info) 360 if not parsed_session_properties: 361 return parsed_session_properties 362 363 for eq in parsed_session_properties: 364 prop_name = eq.left.name 365 366 if prop_name == "query_label": 367 query_label = eq.right 368 if not isinstance( 369 query_label, (exp.Array, exp.Tuple, exp.Paren, d.MacroFunc, d.MacroVar) 370 ): 371 raise ConfigError( 372 "Invalid value for `session_properties.query_label`. Must be an array or tuple." 373 ) 374 375 label_tuples: t.List[exp.Expr] = ( 376 [query_label.unnest()] 377 if isinstance(query_label, exp.Paren) 378 else query_label.expressions 379 ) 380 381 for label_tuple in label_tuples: 382 if not ( 383 isinstance(label_tuple, exp.Tuple) 384 and len(label_tuple.expressions) == 2 385 and all(isinstance(label, exp.Literal) for label in label_tuple.expressions) 386 ): 387 raise ConfigError( 388 "Invalid entry in `session_properties.query_label`. Must be tuples of string literals with length 2." 389 ) 390 elif prop_name == "authorization": 391 authorization = eq.right 392 if not ( 393 isinstance(authorization, exp.Literal) and authorization.is_string 394 ) and not isinstance(authorization, (d.MacroFunc, d.MacroVar)): 395 raise ConfigError( 396 "Invalid value for `session_properties.authorization`. Must be a string literal." 397 ) 398 399 return parsed_session_properties
469 @property 470 def time_column(self) -> t.Optional[TimeColumn]: 471 """The time column for incremental models.""" 472 return getattr(self.kind, "time_column", None)
The time column for incremental models.
482 @property 483 def column_descriptions(self) -> t.Dict[str, str]: 484 """A dictionary of column names to annotation comments.""" 485 return self.column_descriptions_ or {}
A dictionary of column names to annotation comments.
487 @property 488 def lookback(self) -> int: 489 """The incremental lookback window.""" 490 return getattr(self.kind, "lookback", 0) or 0
The incremental lookback window.
500 @property 501 def batch_size(self) -> t.Optional[int]: 502 """The maximal number of units in a single task for a backfill.""" 503 return getattr(self.kind, "batch_size", None)
The maximal number of units in a single task for a backfill.
505 @property 506 def batch_concurrency(self) -> t.Optional[int]: 507 """The maximal number of batches that can run concurrently for a backfill.""" 508 return getattr(self.kind, "batch_concurrency", None)
The maximal number of batches that can run concurrently for a backfill.
510 @cached_property 511 def physical_properties(self) -> t.Dict[str, exp.Expr]: 512 """A dictionary of properties that will be applied to the physical layer. It replaces table_properties which is deprecated.""" 513 if self.physical_properties_: 514 return {e.this.name: e.expression for e in self.physical_properties_.expressions} 515 return {}
A dictionary of properties that will be applied to the physical layer. It replaces table_properties which is deprecated.
517 @cached_property 518 def virtual_properties(self) -> t.Dict[str, exp.Expr]: 519 """A dictionary of properties that will be applied to the virtual layer.""" 520 if self.virtual_properties_: 521 return {e.this.name: e.expression for e in self.virtual_properties_.expressions} 522 return {}
A dictionary of properties that will be applied to the virtual layer.
524 @property 525 def session_properties(self) -> SessionProperties: 526 """A dictionary of session properties.""" 527 if not self.session_properties_: 528 return {} 529 530 return d.interpret_key_value_pairs(self.session_properties_)
A dictionary of session properties.
538 @cached_property 539 def grants(self) -> t.Optional[GrantsConfig]: 540 """A dictionary of grants mapping permission names to lists of grantees.""" 541 542 if self.grants_ is None: 543 return None 544 545 if not self.grants_.expressions: 546 return {} 547 548 grants_dict = {} 549 for eq_expr in self.grants_.expressions: 550 try: 551 permission_name = self._validate_config_expression(eq_expr.left) 552 grantee_list = self._validate_nested_config_values(eq_expr.expression) 553 grants_dict[permission_name] = grantee_list 554 except ConfigError as e: 555 permission_name = ( 556 eq_expr.left.name if hasattr(eq_expr.left, "name") else str(eq_expr.left) 557 ) 558 raise ConfigError(f"Invalid grants configuration for '{permission_name}': {e}") 559 560 return grants_dict if grants_dict else None
A dictionary of grants mapping permission names to lists of grantees.
562 @property 563 def all_references(self) -> t.List[Reference]: 564 """All references including grains.""" 565 return [Reference(model_name=self.name, expression=e, unique=True) for e in self.grains] + [ 566 Reference(model_name=self.name, expression=e, unique=True) for e in self.references 567 ]
All references including grains.
569 @property 570 def on(self) -> t.List[str]: 571 """The grains to be used as join condition in table_diff.""" 572 573 on: t.List[str] = [] 574 for expr in [ref.expression for ref in self.all_references if ref.unique]: 575 if isinstance(expr, exp.Tuple): 576 on.extend([key.this.sql(dialect=self.dialect) for key in expr.expressions]) 577 else: 578 # Handle a single Column or Paren expression 579 on.append(expr.this.sql(dialect=self.dialect)) 580 581 return on
The grains to be used as join condition in table_diff.
599 @property 600 def catalog(self) -> t.Optional[str]: 601 """Returns the catalog of a model.""" 602 return self.fully_qualified_table.catalog
Returns the catalog of a model.
618 @property 619 def on_additive_change(self) -> OnAdditiveChange: 620 """Return the model's additive change setting if it has one.""" 621 return getattr(self.kind, "on_additive_change", OnAdditiveChange.ALLOW)
Return the model's additive change setting if it has one.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
358def init_private_attributes(self: BaseModel, context: Any, /) -> None: 359 """This function is meant to behave like a BaseModel method to initialise private attributes. 360 361 It takes context as an argument since that's what pydantic-core passes when calling it. 362 363 Args: 364 self: The BaseModel instance. 365 context: The context. 366 """ 367 if getattr(self, '__pydantic_private__', None) is None: 368 pydantic_private = {} 369 for name, private_attr in self.__private_attributes__.items(): 370 default = private_attr.get_default() 371 if default is not PydanticUndefined: 372 pydantic_private[name] = default 373 object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs