sqlmesh.core.model.meta
1from __future__ import annotations 2 3import typing as t 4from enum import Enum 5from functools import cached_property 6from typing_extensions import Self 7 8from pydantic import Field 9from sqlglot import Dialect, exp, parse_one 10from sqlglot.helper import ensure_collection, ensure_list 11from sqlglot.optimizer.normalize_identifiers import normalize_identifiers 12 13from sqlmesh.core import dialect as d 14from sqlmesh.core.config.common import VirtualEnvironmentMode 15from sqlmesh.core.config.linter import LinterConfig 16from sqlmesh.core.dialect import normalize_model_name 17from sqlmesh.utils import classproperty 18from sqlmesh.core.model.common import ( 19 bool_validator, 20 default_catalog_validator, 21 depends_on_validator, 22 properties_validator, 23 parse_properties, 24) 25from sqlmesh.core.model.kind import ( 26 CustomKind, 27 IncrementalByUniqueKeyKind, 28 ModelKind, 29 OnDestructiveChange, 30 SCDType2ByColumnKind, 31 SCDType2ByTimeKind, 32 TimeColumn, 33 ViewKind, 34 model_kind_validator, 35 OnAdditiveChange, 36) 37from sqlmesh.core.node import _Node, str_or_exp_to_str 38from sqlmesh.core.reference import Reference 39from sqlmesh.utils.date import TimeLike 40from sqlmesh.utils.errors import ConfigError 41from sqlmesh.utils.pydantic import ( 42 ValidationInfo, 43 field_validator, 44 list_of_fields_validator, 45 model_validator, 46 get_dialect, 47) 48 49if t.TYPE_CHECKING: 50 from sqlmesh.core._typing import CustomMaterializationProperties, SessionProperties 51 from sqlmesh.core.engine_adapter._typing import GrantsConfig 52 53FunctionCall = t.Tuple[str, t.Dict[str, exp.Expression]] 54 55 56class GrantsTargetLayer(str, Enum): 57 """Target layer(s) where grants should be applied.""" 58 59 ALL = "all" 60 PHYSICAL = "physical" 61 VIRTUAL = "virtual" 62 63 @classproperty 64 def default(cls) -> "GrantsTargetLayer": 65 return GrantsTargetLayer.VIRTUAL 66 67 @property 68 def is_all(self) -> bool: 69 return self == GrantsTargetLayer.ALL 70 71 @property 72 def is_physical(self) -> bool: 73 return self == GrantsTargetLayer.PHYSICAL 74 75 @property 76 def is_virtual(self) -> bool: 77 return self == GrantsTargetLayer.VIRTUAL 78 79 def __str__(self) -> str: 80 return self.name 81 82 def __repr__(self) -> str: 83 return str(self) 84 85 86class ModelMeta(_Node): 87 """Metadata for models which can be defined in SQL.""" 88 89 dialect: str = "" 90 name: str 91 kind: ModelKind = ViewKind() 92 retention: t.Optional[int] = None # not implemented yet 93 table_format: t.Optional[str] = None 94 storage_format: t.Optional[str] = None 95 partitioned_by_: t.List[exp.Expression] = Field(default=[], alias="partitioned_by") 96 clustered_by: t.List[exp.Expression] = [] 97 default_catalog: t.Optional[str] = None 98 depends_on_: t.Optional[t.Set[str]] = Field(default=None, alias="depends_on") 99 columns_to_types_: t.Optional[t.Dict[str, exp.DataType]] = Field(default=None, alias="columns") 100 column_descriptions_: t.Optional[t.Dict[str, str]] = Field( 101 default=None, alias="column_descriptions" 102 ) 103 audits: t.List[FunctionCall] = [] 104 grains: t.List[exp.Expression] = [] 105 references: t.List[exp.Expression] = [] 106 physical_schema_override: t.Optional[str] = None 107 physical_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="physical_properties") 108 virtual_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="virtual_properties") 109 session_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="session_properties") 110 allow_partials: bool = False 111 signals: t.List[FunctionCall] = [] 112 enabled: bool = True 113 physical_version: t.Optional[str] = None 114 gateway: t.Optional[str] = None 115 optimize_query: t.Optional[bool] = None 116 ignored_rules_: t.Optional[t.Set[str]] = Field( 117 default=None, exclude=True, alias="ignored_rules" 118 ) 119 formatting: t.Optional[bool] = Field(default=None, exclude=True) 120 virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default 121 grants_: t.Optional[exp.Tuple] = Field(default=None, alias="grants") 122 grants_target_layer: GrantsTargetLayer = GrantsTargetLayer.default 123 124 _bool_validator = bool_validator 125 _model_kind_validator = model_kind_validator 126 _properties_validator = properties_validator 127 _default_catalog_validator = default_catalog_validator 128 _depends_on_validator = depends_on_validator 129 130 @field_validator("audits", "signals", mode="before") 131 def _func_call_validator(cls, v: t.Any, field: t.Any) -> t.Any: 132 is_signal = getattr(field, "name" if hasattr(field, "name") else "field_name") == "signals" 133 134 return d.extract_function_calls(v, allow_tuples=is_signal) 135 136 @field_validator("tags", mode="before") 137 def _value_or_tuple_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any: 138 return ensure_list(cls._validate_value_or_tuple(v, info.data)) 139 140 @classmethod 141 def _validate_value_or_tuple( 142 cls, v: t.Dict[str, t.Any], data: t.Dict[str, t.Any], normalize: bool = False 143 ) -> t.Any: 144 dialect = data.get("dialect") 145 146 def _normalize(value: t.Any) -> t.Any: 147 return normalize_identifiers(value, dialect=dialect) if normalize else value 148 149 if isinstance(v, exp.Paren): 150 v = [v.unnest()] 151 152 if isinstance(v, (exp.Tuple, exp.Array)): 153 return [_normalize(e).name for e in v.expressions] 154 if isinstance(v, exp.Expression): 155 return _normalize(v).name 156 if isinstance(v, str): 157 value = _normalize(v) 158 return value.name if isinstance(value, exp.Expression) else value 159 if isinstance(v, (list, tuple)): 160 return [cls._validate_value_or_tuple(elm, data, normalize=normalize) for elm in v] 161 162 return v 163 164 @field_validator("table_format", "storage_format", mode="before") 165 def _format_validator(cls, v: t.Any, info: ValidationInfo) -> t.Optional[str]: 166 if isinstance(v, exp.Expression) and not (isinstance(v, (exp.Literal, exp.Identifier))): 167 return v.sql(info.data.get("dialect")) 168 return str_or_exp_to_str(v) 169 170 @field_validator("dialect", mode="before") 171 def _dialect_validator(cls, v: t.Any) -> t.Optional[str]: 172 # dialects are parsed as identifiers and may get normalized as uppercase, 173 # so this ensures they'll be stored as lowercase 174 dialect = str_or_exp_to_str(v) 175 return dialect and dialect.lower() 176 177 @field_validator("physical_version", mode="before") 178 def _physical_version_validator(cls, v: t.Any) -> t.Optional[str]: 179 if v is None: 180 return v 181 return str_or_exp_to_str(v) 182 183 @field_validator("gateway", mode="before") 184 def _gateway_validator(cls, v: t.Any) -> t.Optional[str]: 185 if v is None: 186 return None 187 gateway = str_or_exp_to_str(v) 188 return gateway and gateway.lower() 189 190 @field_validator("partitioned_by_", "clustered_by", mode="before") 191 def _partition_and_cluster_validator( 192 cls, v: t.Any, info: ValidationInfo 193 ) -> t.List[exp.Expression]: 194 if ( 195 isinstance(v, list) 196 and all(isinstance(i, str) for i in v) 197 and info.field_name == "partitioned_by_" 198 ): 199 # this branch gets hit when we are deserializing from json because `partitioned_by` is stored as a List[str] 200 # however, we should only invoke this if the list contains strings because this validator is also 201 # called by Python models which might pass a List[exp.Expression] 202 string_to_parse = ( 203 f"({','.join(v)})" # recreate the (a, b, c) part of "partitioned_by (a, b, c)" 204 ) 205 parsed = parse_one( 206 string_to_parse, into=exp.PartitionedByProperty, dialect=get_dialect(info) 207 ) 208 v = parsed.this.expressions if isinstance(parsed.this, exp.Schema) else v 209 210 expressions = list_of_fields_validator(v, info.data) 211 212 for expression in expressions: 213 num_cols = len(list(expression.find_all(exp.Column))) 214 215 error_msg: t.Optional[str] = None 216 if num_cols == 0: 217 error_msg = "does not contain a column" 218 elif num_cols > 1: 219 error_msg = "contains multiple columns" 220 221 if error_msg: 222 raise ConfigError(f"Field '{expression}' {error_msg}") 223 224 return expressions 225 226 @field_validator( 227 "columns_to_types_", "derived_columns_to_types", mode="before", check_fields=False 228 ) 229 def _columns_validator( 230 cls, v: t.Any, info: ValidationInfo 231 ) -> t.Optional[t.Dict[str, exp.DataType]]: 232 columns_to_types = {} 233 dialect = info.data.get("dialect") 234 235 if isinstance(v, exp.Schema): 236 for column in v.expressions: 237 expr = column.args.get("kind") 238 if not isinstance(expr, exp.DataType): 239 raise ConfigError(f"Missing data type for column '{column.name}'.") 240 241 expr.meta["dialect"] = dialect 242 columns_to_types[normalize_identifiers(column, dialect=dialect).name] = expr 243 244 return columns_to_types 245 246 if isinstance(v, dict): 247 udt = Dialect.get_or_raise(dialect).SUPPORTS_USER_DEFINED_TYPES 248 for k, data_type in v.items(): 249 expr = exp.DataType.build(data_type, dialect=dialect, udt=udt) 250 expr.meta["dialect"] = dialect 251 columns_to_types[normalize_identifiers(k, dialect=dialect).name] = expr 252 253 return columns_to_types 254 255 return v 256 257 @field_validator("column_descriptions_", mode="before") 258 def _column_descriptions_validator( 259 cls, vs: t.Any, info: ValidationInfo 260 ) -> t.Optional[t.Dict[str, str]]: 261 dialect = info.data.get("dialect") 262 263 if vs is None: 264 return None 265 266 if isinstance(vs, exp.Paren): 267 vs = vs.flatten() 268 269 if isinstance(vs, (exp.Tuple, exp.Array)): 270 vs = vs.expressions 271 272 raw_col_descriptions = ( 273 vs 274 if isinstance(vs, dict) 275 else {".".join([part.this for part in v.this.parts]): v.expression.name for v in vs} 276 ) 277 278 col_descriptions = { 279 normalize_identifiers(k, dialect=dialect).name: v 280 for k, v in raw_col_descriptions.items() 281 } 282 283 columns_to_types = info.data.get("columns_to_types_") 284 if columns_to_types: 285 from sqlmesh.core.console import get_console 286 287 console = get_console() 288 for column_name in list(col_descriptions): 289 if column_name not in columns_to_types: 290 console.log_warning( 291 f"In model '{info.data['name']}', a description is provided for column '{column_name}' but it is not a column in the model." 292 ) 293 del col_descriptions[column_name] 294 295 return col_descriptions 296 297 @field_validator("grains", "references", mode="before") 298 def _refs_validator(cls, vs: t.Any, info: ValidationInfo) -> t.List[exp.Expression]: 299 dialect = info.data.get("dialect") 300 301 if isinstance(vs, exp.Paren): 302 vs = vs.unnest() 303 304 if isinstance(vs, (exp.Tuple, exp.Array)): 305 vs = vs.expressions 306 else: 307 vs = [ 308 d.parse_one(v, dialect=dialect) if isinstance(v, str) else v 309 for v in ensure_collection(vs) 310 ] 311 312 refs = [] 313 314 for v in vs: 315 v = exp.column(v) if isinstance(v, exp.Identifier) else v 316 v.meta["dialect"] = dialect 317 refs.append(v) 318 319 return refs 320 321 @field_validator("ignored_rules_", mode="before") 322 def ignored_rules_validator(cls, vs: t.Any) -> t.Any: 323 return LinterConfig._validate_rules(vs) 324 325 @field_validator("grants_target_layer", mode="before") 326 def _grants_target_layer_validator(cls, v: t.Any) -> t.Any: 327 if isinstance(v, exp.Identifier): 328 return v.this 329 if isinstance(v, exp.Literal) and v.is_string: 330 return v.this 331 return v 332 333 @field_validator("session_properties_", mode="before") 334 def session_properties_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any: 335 # use the generic properties validator to parse the session properties 336 parsed_session_properties = parse_properties(type(cls), v, info) 337 if not parsed_session_properties: 338 return parsed_session_properties 339 340 for eq in parsed_session_properties: 341 prop_name = eq.left.name 342 343 if prop_name == "query_label": 344 query_label = eq.right 345 if not isinstance( 346 query_label, (exp.Array, exp.Tuple, exp.Paren, d.MacroFunc, d.MacroVar) 347 ): 348 raise ConfigError( 349 "Invalid value for `session_properties.query_label`. Must be an array or tuple." 350 ) 351 352 label_tuples: t.List[exp.Expression] = ( 353 [query_label.unnest()] 354 if isinstance(query_label, exp.Paren) 355 else query_label.expressions 356 ) 357 358 for label_tuple in label_tuples: 359 if not ( 360 isinstance(label_tuple, exp.Tuple) 361 and len(label_tuple.expressions) == 2 362 and all(isinstance(label, exp.Literal) for label in label_tuple.expressions) 363 ): 364 raise ConfigError( 365 "Invalid entry in `session_properties.query_label`. Must be tuples of string literals with length 2." 366 ) 367 elif prop_name == "authorization": 368 authorization = eq.right 369 if not ( 370 isinstance(authorization, exp.Literal) and authorization.is_string 371 ) and not isinstance(authorization, (d.MacroFunc, d.MacroVar)): 372 raise ConfigError( 373 "Invalid value for `session_properties.authorization`. Must be a string literal." 374 ) 375 376 return parsed_session_properties 377 378 @model_validator(mode="before") 379 def _pre_root_validator(cls, data: t.Any) -> t.Any: 380 if not isinstance(data, dict): 381 return data 382 383 grain = data.pop("grain", None) 384 if grain: 385 grains = data.get("grains") 386 if grains: 387 raise ConfigError( 388 f"Cannot use argument 'grain' ({grain}) with 'grains' ({grains}), use only grains" 389 ) 390 data["grains"] = ensure_list(grain) 391 392 table_properties = data.pop("table_properties", None) 393 if table_properties: 394 if not isinstance(table_properties, str): 395 # Do not warn when deserializing from the state. 396 model_name = data["name"] 397 from sqlmesh.core.console import get_console 398 399 get_console().log_warning( 400 f"Model '{model_name}' is using the `table_properties` attribute which is deprecated. Please use `physical_properties` instead." 401 ) 402 physical_properties = data.get("physical_properties") 403 if physical_properties: 404 raise ConfigError( 405 f"Cannot use argument 'table_properties' ({table_properties}) with 'physical_properties' ({physical_properties}), use only physical_properties." 406 ) 407 408 data["physical_properties"] = table_properties 409 410 return data 411 412 @model_validator(mode="after") 413 def _root_validator(self) -> Self: 414 kind: t.Any = self.kind 415 416 for field in ("partitioned_by_", "clustered_by"): 417 if ( 418 getattr(self, field, None) 419 and not kind.is_materialized 420 and not (kind.is_view and kind.materialized) 421 ): 422 name = field[:-1] if field.endswith("_") else field 423 raise ValueError(f"{name} field cannot be set for {kind.name} models") 424 if kind.is_incremental_by_partition and not getattr(self, "partitioned_by_", None): 425 raise ValueError(f"partitioned_by field is required for {kind.name} models") 426 427 # needs to be in a mode=after model validator so that the field validators have run to convert from Expression -> str 428 if (storage_format := self.storage_format) and storage_format.lower() in { 429 "iceberg", 430 "hive", 431 "hudi", 432 "delta", 433 }: 434 from sqlmesh.core.console import get_console 435 436 get_console().log_warning( 437 f"Model {self.name} has `storage_format` set to a table format '{storage_format}' which is deprecated. Please use the `table_format` property instead." 438 ) 439 440 # Validate grants configuration for model kind support 441 if self.grants is not None and not kind.supports_grants: 442 raise ValueError(f"grants cannot be set for {kind.name} models") 443 444 return self 445 446 @property 447 def time_column(self) -> t.Optional[TimeColumn]: 448 """The time column for incremental models.""" 449 return getattr(self.kind, "time_column", None) 450 451 @property 452 def unique_key(self) -> t.List[exp.Expression]: 453 if isinstance( 454 self.kind, (SCDType2ByTimeKind, SCDType2ByColumnKind, IncrementalByUniqueKeyKind) 455 ): 456 return self.kind.unique_key 457 return [] 458 459 @property 460 def column_descriptions(self) -> t.Dict[str, str]: 461 """A dictionary of column names to annotation comments.""" 462 return self.column_descriptions_ or {} 463 464 @property 465 def lookback(self) -> int: 466 """The incremental lookback window.""" 467 return getattr(self.kind, "lookback", 0) or 0 468 469 def lookback_start(self, start: TimeLike) -> TimeLike: 470 if self.lookback == 0: 471 return start 472 473 for _ in range(self.lookback): 474 start = self.interval_unit.cron_prev(start) 475 return start 476 477 @property 478 def batch_size(self) -> t.Optional[int]: 479 """The maximal number of units in a single task for a backfill.""" 480 return getattr(self.kind, "batch_size", None) 481 482 @property 483 def batch_concurrency(self) -> t.Optional[int]: 484 """The maximal number of batches that can run concurrently for a backfill.""" 485 return getattr(self.kind, "batch_concurrency", None) 486 487 @cached_property 488 def physical_properties(self) -> t.Dict[str, exp.Expression]: 489 """A dictionary of properties that will be applied to the physical layer. It replaces table_properties which is deprecated.""" 490 if self.physical_properties_: 491 return {e.this.name: e.expression for e in self.physical_properties_.expressions} 492 return {} 493 494 @cached_property 495 def virtual_properties(self) -> t.Dict[str, exp.Expression]: 496 """A dictionary of properties that will be applied to the virtual layer.""" 497 if self.virtual_properties_: 498 return {e.this.name: e.expression for e in self.virtual_properties_.expressions} 499 return {} 500 501 @property 502 def session_properties(self) -> SessionProperties: 503 """A dictionary of session properties.""" 504 if not self.session_properties_: 505 return {} 506 507 return d.interpret_key_value_pairs(self.session_properties_) 508 509 @property 510 def custom_materialization_properties(self) -> CustomMaterializationProperties: 511 if isinstance(self.kind, CustomKind): 512 return self.kind.materialization_properties 513 return {} 514 515 @cached_property 516 def grants(self) -> t.Optional[GrantsConfig]: 517 """A dictionary of grants mapping permission names to lists of grantees.""" 518 519 if self.grants_ is None: 520 return None 521 522 if not self.grants_.expressions: 523 return {} 524 525 grants_dict = {} 526 for eq_expr in self.grants_.expressions: 527 try: 528 permission_name = self._validate_config_expression(eq_expr.left) 529 grantee_list = self._validate_nested_config_values(eq_expr.expression) 530 grants_dict[permission_name] = grantee_list 531 except ConfigError as e: 532 permission_name = ( 533 eq_expr.left.name if hasattr(eq_expr.left, "name") else str(eq_expr.left) 534 ) 535 raise ConfigError(f"Invalid grants configuration for '{permission_name}': {e}") 536 537 return grants_dict if grants_dict else None 538 539 @property 540 def all_references(self) -> t.List[Reference]: 541 """All references including grains.""" 542 return [Reference(model_name=self.name, expression=e, unique=True) for e in self.grains] + [ 543 Reference(model_name=self.name, expression=e, unique=True) for e in self.references 544 ] 545 546 @property 547 def on(self) -> t.List[str]: 548 """The grains to be used as join condition in table_diff.""" 549 550 on: t.List[str] = [] 551 for expr in [ref.expression for ref in self.all_references if ref.unique]: 552 if isinstance(expr, exp.Tuple): 553 on.extend([key.this.sql(dialect=self.dialect) for key in expr.expressions]) 554 else: 555 # Handle a single Column or Paren expression 556 on.append(expr.this.sql(dialect=self.dialect)) 557 558 return on 559 560 @property 561 def managed_columns(self) -> t.Dict[str, exp.DataType]: 562 return getattr(self.kind, "managed_columns", {}) 563 564 @property 565 def when_matched(self) -> t.Optional[exp.Whens]: 566 if isinstance(self.kind, IncrementalByUniqueKeyKind): 567 return self.kind.when_matched 568 return None 569 570 @property 571 def merge_filter(self) -> t.Optional[exp.Expression]: 572 if isinstance(self.kind, IncrementalByUniqueKeyKind): 573 return self.kind.merge_filter 574 return None 575 576 @property 577 def catalog(self) -> t.Optional[str]: 578 """Returns the catalog of a model.""" 579 return self.fully_qualified_table.catalog 580 581 @cached_property 582 def fully_qualified_table(self) -> exp.Table: 583 return exp.to_table(self.fqn) 584 585 @cached_property 586 def fqn(self) -> str: 587 return normalize_model_name( 588 self.name, default_catalog=self.default_catalog, dialect=self.dialect 589 ) 590 591 @property 592 def on_destructive_change(self) -> OnDestructiveChange: 593 return getattr(self.kind, "on_destructive_change", OnDestructiveChange.ALLOW) 594 595 @property 596 def on_additive_change(self) -> OnAdditiveChange: 597 """Return the model's additive change setting if it has one.""" 598 return getattr(self.kind, "on_additive_change", OnAdditiveChange.ALLOW) 599 600 @property 601 def ignored_rules(self) -> t.Set[str]: 602 return self.ignored_rules_ or set() 603 604 def _validate_config_expression(self, expr: exp.Expression) -> str: 605 if isinstance(expr, (d.MacroFunc, d.MacroVar)): 606 raise ConfigError(f"Unresolved macro: {expr.sql(dialect=self.dialect)}") 607 608 if isinstance(expr, exp.Null): 609 raise ConfigError("NULL value") 610 611 if isinstance(expr, exp.Literal): 612 return str(expr.this).strip() 613 if isinstance(expr, (exp.Column, exp.Identifier)): 614 return expr.name 615 return expr.sql(dialect=self.dialect).strip() 616 617 def _validate_nested_config_values(self, value_expr: exp.Expression) -> t.List[str]: 618 result = [] 619 620 def flatten_expr(expr: exp.Expression) -> None: 621 if isinstance(expr, exp.Array): 622 for elem in expr.expressions: 623 flatten_expr(elem) 624 elif isinstance(expr, (exp.Tuple, exp.Paren)): 625 expressions = [expr.unnest()] if isinstance(expr, exp.Paren) else expr.expressions 626 for elem in expressions: 627 flatten_expr(elem) 628 else: 629 result.append(self._validate_config_expression(expr)) 630 631 flatten_expr(value_expr) 632 return result
57class GrantsTargetLayer(str, Enum): 58 """Target layer(s) where grants should be applied.""" 59 60 ALL = "all" 61 PHYSICAL = "physical" 62 VIRTUAL = "virtual" 63 64 @classproperty 65 def default(cls) -> "GrantsTargetLayer": 66 return GrantsTargetLayer.VIRTUAL 67 68 @property 69 def is_all(self) -> bool: 70 return self == GrantsTargetLayer.ALL 71 72 @property 73 def is_physical(self) -> bool: 74 return self == GrantsTargetLayer.PHYSICAL 75 76 @property 77 def is_virtual(self) -> bool: 78 return self == GrantsTargetLayer.VIRTUAL 79 80 def __str__(self) -> str: 81 return self.name 82 83 def __repr__(self) -> str: 84 return str(self)
Target layer(s) where grants should be applied.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
87class ModelMeta(_Node): 88 """Metadata for models which can be defined in SQL.""" 89 90 dialect: str = "" 91 name: str 92 kind: ModelKind = ViewKind() 93 retention: t.Optional[int] = None # not implemented yet 94 table_format: t.Optional[str] = None 95 storage_format: t.Optional[str] = None 96 partitioned_by_: t.List[exp.Expression] = Field(default=[], alias="partitioned_by") 97 clustered_by: t.List[exp.Expression] = [] 98 default_catalog: t.Optional[str] = None 99 depends_on_: t.Optional[t.Set[str]] = Field(default=None, alias="depends_on") 100 columns_to_types_: t.Optional[t.Dict[str, exp.DataType]] = Field(default=None, alias="columns") 101 column_descriptions_: t.Optional[t.Dict[str, str]] = Field( 102 default=None, alias="column_descriptions" 103 ) 104 audits: t.List[FunctionCall] = [] 105 grains: t.List[exp.Expression] = [] 106 references: t.List[exp.Expression] = [] 107 physical_schema_override: t.Optional[str] = None 108 physical_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="physical_properties") 109 virtual_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="virtual_properties") 110 session_properties_: t.Optional[exp.Tuple] = Field(default=None, alias="session_properties") 111 allow_partials: bool = False 112 signals: t.List[FunctionCall] = [] 113 enabled: bool = True 114 physical_version: t.Optional[str] = None 115 gateway: t.Optional[str] = None 116 optimize_query: t.Optional[bool] = None 117 ignored_rules_: t.Optional[t.Set[str]] = Field( 118 default=None, exclude=True, alias="ignored_rules" 119 ) 120 formatting: t.Optional[bool] = Field(default=None, exclude=True) 121 virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default 122 grants_: t.Optional[exp.Tuple] = Field(default=None, alias="grants") 123 grants_target_layer: GrantsTargetLayer = GrantsTargetLayer.default 124 125 _bool_validator = bool_validator 126 _model_kind_validator = model_kind_validator 127 _properties_validator = properties_validator 128 _default_catalog_validator = default_catalog_validator 129 _depends_on_validator = depends_on_validator 130 131 @field_validator("audits", "signals", mode="before") 132 def _func_call_validator(cls, v: t.Any, field: t.Any) -> t.Any: 133 is_signal = getattr(field, "name" if hasattr(field, "name") else "field_name") == "signals" 134 135 return d.extract_function_calls(v, allow_tuples=is_signal) 136 137 @field_validator("tags", mode="before") 138 def _value_or_tuple_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any: 139 return ensure_list(cls._validate_value_or_tuple(v, info.data)) 140 141 @classmethod 142 def _validate_value_or_tuple( 143 cls, v: t.Dict[str, t.Any], data: t.Dict[str, t.Any], normalize: bool = False 144 ) -> t.Any: 145 dialect = data.get("dialect") 146 147 def _normalize(value: t.Any) -> t.Any: 148 return normalize_identifiers(value, dialect=dialect) if normalize else value 149 150 if isinstance(v, exp.Paren): 151 v = [v.unnest()] 152 153 if isinstance(v, (exp.Tuple, exp.Array)): 154 return [_normalize(e).name for e in v.expressions] 155 if isinstance(v, exp.Expression): 156 return _normalize(v).name 157 if isinstance(v, str): 158 value = _normalize(v) 159 return value.name if isinstance(value, exp.Expression) else value 160 if isinstance(v, (list, tuple)): 161 return [cls._validate_value_or_tuple(elm, data, normalize=normalize) for elm in v] 162 163 return v 164 165 @field_validator("table_format", "storage_format", mode="before") 166 def _format_validator(cls, v: t.Any, info: ValidationInfo) -> t.Optional[str]: 167 if isinstance(v, exp.Expression) and not (isinstance(v, (exp.Literal, exp.Identifier))): 168 return v.sql(info.data.get("dialect")) 169 return str_or_exp_to_str(v) 170 171 @field_validator("dialect", mode="before") 172 def _dialect_validator(cls, v: t.Any) -> t.Optional[str]: 173 # dialects are parsed as identifiers and may get normalized as uppercase, 174 # so this ensures they'll be stored as lowercase 175 dialect = str_or_exp_to_str(v) 176 return dialect and dialect.lower() 177 178 @field_validator("physical_version", mode="before") 179 def _physical_version_validator(cls, v: t.Any) -> t.Optional[str]: 180 if v is None: 181 return v 182 return str_or_exp_to_str(v) 183 184 @field_validator("gateway", mode="before") 185 def _gateway_validator(cls, v: t.Any) -> t.Optional[str]: 186 if v is None: 187 return None 188 gateway = str_or_exp_to_str(v) 189 return gateway and gateway.lower() 190 191 @field_validator("partitioned_by_", "clustered_by", mode="before") 192 def _partition_and_cluster_validator( 193 cls, v: t.Any, info: ValidationInfo 194 ) -> t.List[exp.Expression]: 195 if ( 196 isinstance(v, list) 197 and all(isinstance(i, str) for i in v) 198 and info.field_name == "partitioned_by_" 199 ): 200 # this branch gets hit when we are deserializing from json because `partitioned_by` is stored as a List[str] 201 # however, we should only invoke this if the list contains strings because this validator is also 202 # called by Python models which might pass a List[exp.Expression] 203 string_to_parse = ( 204 f"({','.join(v)})" # recreate the (a, b, c) part of "partitioned_by (a, b, c)" 205 ) 206 parsed = parse_one( 207 string_to_parse, into=exp.PartitionedByProperty, dialect=get_dialect(info) 208 ) 209 v = parsed.this.expressions if isinstance(parsed.this, exp.Schema) else v 210 211 expressions = list_of_fields_validator(v, info.data) 212 213 for expression in expressions: 214 num_cols = len(list(expression.find_all(exp.Column))) 215 216 error_msg: t.Optional[str] = None 217 if num_cols == 0: 218 error_msg = "does not contain a column" 219 elif num_cols > 1: 220 error_msg = "contains multiple columns" 221 222 if error_msg: 223 raise ConfigError(f"Field '{expression}' {error_msg}") 224 225 return expressions 226 227 @field_validator( 228 "columns_to_types_", "derived_columns_to_types", mode="before", check_fields=False 229 ) 230 def _columns_validator( 231 cls, v: t.Any, info: ValidationInfo 232 ) -> t.Optional[t.Dict[str, exp.DataType]]: 233 columns_to_types = {} 234 dialect = info.data.get("dialect") 235 236 if isinstance(v, exp.Schema): 237 for column in v.expressions: 238 expr = column.args.get("kind") 239 if not isinstance(expr, exp.DataType): 240 raise ConfigError(f"Missing data type for column '{column.name}'.") 241 242 expr.meta["dialect"] = dialect 243 columns_to_types[normalize_identifiers(column, dialect=dialect).name] = expr 244 245 return columns_to_types 246 247 if isinstance(v, dict): 248 udt = Dialect.get_or_raise(dialect).SUPPORTS_USER_DEFINED_TYPES 249 for k, data_type in v.items(): 250 expr = exp.DataType.build(data_type, dialect=dialect, udt=udt) 251 expr.meta["dialect"] = dialect 252 columns_to_types[normalize_identifiers(k, dialect=dialect).name] = expr 253 254 return columns_to_types 255 256 return v 257 258 @field_validator("column_descriptions_", mode="before") 259 def _column_descriptions_validator( 260 cls, vs: t.Any, info: ValidationInfo 261 ) -> t.Optional[t.Dict[str, str]]: 262 dialect = info.data.get("dialect") 263 264 if vs is None: 265 return None 266 267 if isinstance(vs, exp.Paren): 268 vs = vs.flatten() 269 270 if isinstance(vs, (exp.Tuple, exp.Array)): 271 vs = vs.expressions 272 273 raw_col_descriptions = ( 274 vs 275 if isinstance(vs, dict) 276 else {".".join([part.this for part in v.this.parts]): v.expression.name for v in vs} 277 ) 278 279 col_descriptions = { 280 normalize_identifiers(k, dialect=dialect).name: v 281 for k, v in raw_col_descriptions.items() 282 } 283 284 columns_to_types = info.data.get("columns_to_types_") 285 if columns_to_types: 286 from sqlmesh.core.console import get_console 287 288 console = get_console() 289 for column_name in list(col_descriptions): 290 if column_name not in columns_to_types: 291 console.log_warning( 292 f"In model '{info.data['name']}', a description is provided for column '{column_name}' but it is not a column in the model." 293 ) 294 del col_descriptions[column_name] 295 296 return col_descriptions 297 298 @field_validator("grains", "references", mode="before") 299 def _refs_validator(cls, vs: t.Any, info: ValidationInfo) -> t.List[exp.Expression]: 300 dialect = info.data.get("dialect") 301 302 if isinstance(vs, exp.Paren): 303 vs = vs.unnest() 304 305 if isinstance(vs, (exp.Tuple, exp.Array)): 306 vs = vs.expressions 307 else: 308 vs = [ 309 d.parse_one(v, dialect=dialect) if isinstance(v, str) else v 310 for v in ensure_collection(vs) 311 ] 312 313 refs = [] 314 315 for v in vs: 316 v = exp.column(v) if isinstance(v, exp.Identifier) else v 317 v.meta["dialect"] = dialect 318 refs.append(v) 319 320 return refs 321 322 @field_validator("ignored_rules_", mode="before") 323 def ignored_rules_validator(cls, vs: t.Any) -> t.Any: 324 return LinterConfig._validate_rules(vs) 325 326 @field_validator("grants_target_layer", mode="before") 327 def _grants_target_layer_validator(cls, v: t.Any) -> t.Any: 328 if isinstance(v, exp.Identifier): 329 return v.this 330 if isinstance(v, exp.Literal) and v.is_string: 331 return v.this 332 return v 333 334 @field_validator("session_properties_", mode="before") 335 def session_properties_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any: 336 # use the generic properties validator to parse the session properties 337 parsed_session_properties = parse_properties(type(cls), v, info) 338 if not parsed_session_properties: 339 return parsed_session_properties 340 341 for eq in parsed_session_properties: 342 prop_name = eq.left.name 343 344 if prop_name == "query_label": 345 query_label = eq.right 346 if not isinstance( 347 query_label, (exp.Array, exp.Tuple, exp.Paren, d.MacroFunc, d.MacroVar) 348 ): 349 raise ConfigError( 350 "Invalid value for `session_properties.query_label`. Must be an array or tuple." 351 ) 352 353 label_tuples: t.List[exp.Expression] = ( 354 [query_label.unnest()] 355 if isinstance(query_label, exp.Paren) 356 else query_label.expressions 357 ) 358 359 for label_tuple in label_tuples: 360 if not ( 361 isinstance(label_tuple, exp.Tuple) 362 and len(label_tuple.expressions) == 2 363 and all(isinstance(label, exp.Literal) for label in label_tuple.expressions) 364 ): 365 raise ConfigError( 366 "Invalid entry in `session_properties.query_label`. Must be tuples of string literals with length 2." 367 ) 368 elif prop_name == "authorization": 369 authorization = eq.right 370 if not ( 371 isinstance(authorization, exp.Literal) and authorization.is_string 372 ) and not isinstance(authorization, (d.MacroFunc, d.MacroVar)): 373 raise ConfigError( 374 "Invalid value for `session_properties.authorization`. Must be a string literal." 375 ) 376 377 return parsed_session_properties 378 379 @model_validator(mode="before") 380 def _pre_root_validator(cls, data: t.Any) -> t.Any: 381 if not isinstance(data, dict): 382 return data 383 384 grain = data.pop("grain", None) 385 if grain: 386 grains = data.get("grains") 387 if grains: 388 raise ConfigError( 389 f"Cannot use argument 'grain' ({grain}) with 'grains' ({grains}), use only grains" 390 ) 391 data["grains"] = ensure_list(grain) 392 393 table_properties = data.pop("table_properties", None) 394 if table_properties: 395 if not isinstance(table_properties, str): 396 # Do not warn when deserializing from the state. 397 model_name = data["name"] 398 from sqlmesh.core.console import get_console 399 400 get_console().log_warning( 401 f"Model '{model_name}' is using the `table_properties` attribute which is deprecated. Please use `physical_properties` instead." 402 ) 403 physical_properties = data.get("physical_properties") 404 if physical_properties: 405 raise ConfigError( 406 f"Cannot use argument 'table_properties' ({table_properties}) with 'physical_properties' ({physical_properties}), use only physical_properties." 407 ) 408 409 data["physical_properties"] = table_properties 410 411 return data 412 413 @model_validator(mode="after") 414 def _root_validator(self) -> Self: 415 kind: t.Any = self.kind 416 417 for field in ("partitioned_by_", "clustered_by"): 418 if ( 419 getattr(self, field, None) 420 and not kind.is_materialized 421 and not (kind.is_view and kind.materialized) 422 ): 423 name = field[:-1] if field.endswith("_") else field 424 raise ValueError(f"{name} field cannot be set for {kind.name} models") 425 if kind.is_incremental_by_partition and not getattr(self, "partitioned_by_", None): 426 raise ValueError(f"partitioned_by field is required for {kind.name} models") 427 428 # needs to be in a mode=after model validator so that the field validators have run to convert from Expression -> str 429 if (storage_format := self.storage_format) and storage_format.lower() in { 430 "iceberg", 431 "hive", 432 "hudi", 433 "delta", 434 }: 435 from sqlmesh.core.console import get_console 436 437 get_console().log_warning( 438 f"Model {self.name} has `storage_format` set to a table format '{storage_format}' which is deprecated. Please use the `table_format` property instead." 439 ) 440 441 # Validate grants configuration for model kind support 442 if self.grants is not None and not kind.supports_grants: 443 raise ValueError(f"grants cannot be set for {kind.name} models") 444 445 return self 446 447 @property 448 def time_column(self) -> t.Optional[TimeColumn]: 449 """The time column for incremental models.""" 450 return getattr(self.kind, "time_column", None) 451 452 @property 453 def unique_key(self) -> t.List[exp.Expression]: 454 if isinstance( 455 self.kind, (SCDType2ByTimeKind, SCDType2ByColumnKind, IncrementalByUniqueKeyKind) 456 ): 457 return self.kind.unique_key 458 return [] 459 460 @property 461 def column_descriptions(self) -> t.Dict[str, str]: 462 """A dictionary of column names to annotation comments.""" 463 return self.column_descriptions_ or {} 464 465 @property 466 def lookback(self) -> int: 467 """The incremental lookback window.""" 468 return getattr(self.kind, "lookback", 0) or 0 469 470 def lookback_start(self, start: TimeLike) -> TimeLike: 471 if self.lookback == 0: 472 return start 473 474 for _ in range(self.lookback): 475 start = self.interval_unit.cron_prev(start) 476 return start 477 478 @property 479 def batch_size(self) -> t.Optional[int]: 480 """The maximal number of units in a single task for a backfill.""" 481 return getattr(self.kind, "batch_size", None) 482 483 @property 484 def batch_concurrency(self) -> t.Optional[int]: 485 """The maximal number of batches that can run concurrently for a backfill.""" 486 return getattr(self.kind, "batch_concurrency", None) 487 488 @cached_property 489 def physical_properties(self) -> t.Dict[str, exp.Expression]: 490 """A dictionary of properties that will be applied to the physical layer. It replaces table_properties which is deprecated.""" 491 if self.physical_properties_: 492 return {e.this.name: e.expression for e in self.physical_properties_.expressions} 493 return {} 494 495 @cached_property 496 def virtual_properties(self) -> t.Dict[str, exp.Expression]: 497 """A dictionary of properties that will be applied to the virtual layer.""" 498 if self.virtual_properties_: 499 return {e.this.name: e.expression for e in self.virtual_properties_.expressions} 500 return {} 501 502 @property 503 def session_properties(self) -> SessionProperties: 504 """A dictionary of session properties.""" 505 if not self.session_properties_: 506 return {} 507 508 return d.interpret_key_value_pairs(self.session_properties_) 509 510 @property 511 def custom_materialization_properties(self) -> CustomMaterializationProperties: 512 if isinstance(self.kind, CustomKind): 513 return self.kind.materialization_properties 514 return {} 515 516 @cached_property 517 def grants(self) -> t.Optional[GrantsConfig]: 518 """A dictionary of grants mapping permission names to lists of grantees.""" 519 520 if self.grants_ is None: 521 return None 522 523 if not self.grants_.expressions: 524 return {} 525 526 grants_dict = {} 527 for eq_expr in self.grants_.expressions: 528 try: 529 permission_name = self._validate_config_expression(eq_expr.left) 530 grantee_list = self._validate_nested_config_values(eq_expr.expression) 531 grants_dict[permission_name] = grantee_list 532 except ConfigError as e: 533 permission_name = ( 534 eq_expr.left.name if hasattr(eq_expr.left, "name") else str(eq_expr.left) 535 ) 536 raise ConfigError(f"Invalid grants configuration for '{permission_name}': {e}") 537 538 return grants_dict if grants_dict else None 539 540 @property 541 def all_references(self) -> t.List[Reference]: 542 """All references including grains.""" 543 return [Reference(model_name=self.name, expression=e, unique=True) for e in self.grains] + [ 544 Reference(model_name=self.name, expression=e, unique=True) for e in self.references 545 ] 546 547 @property 548 def on(self) -> t.List[str]: 549 """The grains to be used as join condition in table_diff.""" 550 551 on: t.List[str] = [] 552 for expr in [ref.expression for ref in self.all_references if ref.unique]: 553 if isinstance(expr, exp.Tuple): 554 on.extend([key.this.sql(dialect=self.dialect) for key in expr.expressions]) 555 else: 556 # Handle a single Column or Paren expression 557 on.append(expr.this.sql(dialect=self.dialect)) 558 559 return on 560 561 @property 562 def managed_columns(self) -> t.Dict[str, exp.DataType]: 563 return getattr(self.kind, "managed_columns", {}) 564 565 @property 566 def when_matched(self) -> t.Optional[exp.Whens]: 567 if isinstance(self.kind, IncrementalByUniqueKeyKind): 568 return self.kind.when_matched 569 return None 570 571 @property 572 def merge_filter(self) -> t.Optional[exp.Expression]: 573 if isinstance(self.kind, IncrementalByUniqueKeyKind): 574 return self.kind.merge_filter 575 return None 576 577 @property 578 def catalog(self) -> t.Optional[str]: 579 """Returns the catalog of a model.""" 580 return self.fully_qualified_table.catalog 581 582 @cached_property 583 def fully_qualified_table(self) -> exp.Table: 584 return exp.to_table(self.fqn) 585 586 @cached_property 587 def fqn(self) -> str: 588 return normalize_model_name( 589 self.name, default_catalog=self.default_catalog, dialect=self.dialect 590 ) 591 592 @property 593 def on_destructive_change(self) -> OnDestructiveChange: 594 return getattr(self.kind, "on_destructive_change", OnDestructiveChange.ALLOW) 595 596 @property 597 def on_additive_change(self) -> OnAdditiveChange: 598 """Return the model's additive change setting if it has one.""" 599 return getattr(self.kind, "on_additive_change", OnAdditiveChange.ALLOW) 600 601 @property 602 def ignored_rules(self) -> t.Set[str]: 603 return self.ignored_rules_ or set() 604 605 def _validate_config_expression(self, expr: exp.Expression) -> str: 606 if isinstance(expr, (d.MacroFunc, d.MacroVar)): 607 raise ConfigError(f"Unresolved macro: {expr.sql(dialect=self.dialect)}") 608 609 if isinstance(expr, exp.Null): 610 raise ConfigError("NULL value") 611 612 if isinstance(expr, exp.Literal): 613 return str(expr.this).strip() 614 if isinstance(expr, (exp.Column, exp.Identifier)): 615 return expr.name 616 return expr.sql(dialect=self.dialect).strip() 617 618 def _validate_nested_config_values(self, value_expr: exp.Expression) -> t.List[str]: 619 result = [] 620 621 def flatten_expr(expr: exp.Expression) -> None: 622 if isinstance(expr, exp.Array): 623 for elem in expr.expressions: 624 flatten_expr(elem) 625 elif isinstance(expr, (exp.Tuple, exp.Paren)): 626 expressions = [expr.unnest()] if isinstance(expr, exp.Paren) else expr.expressions 627 for elem in expressions: 628 flatten_expr(elem) 629 else: 630 result.append(self._validate_config_expression(expr)) 631 632 flatten_expr(value_expr) 633 return result
Metadata for models which can be defined in SQL.
334 @field_validator("session_properties_", mode="before") 335 def session_properties_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any: 336 # use the generic properties validator to parse the session properties 337 parsed_session_properties = parse_properties(type(cls), v, info) 338 if not parsed_session_properties: 339 return parsed_session_properties 340 341 for eq in parsed_session_properties: 342 prop_name = eq.left.name 343 344 if prop_name == "query_label": 345 query_label = eq.right 346 if not isinstance( 347 query_label, (exp.Array, exp.Tuple, exp.Paren, d.MacroFunc, d.MacroVar) 348 ): 349 raise ConfigError( 350 "Invalid value for `session_properties.query_label`. Must be an array or tuple." 351 ) 352 353 label_tuples: t.List[exp.Expression] = ( 354 [query_label.unnest()] 355 if isinstance(query_label, exp.Paren) 356 else query_label.expressions 357 ) 358 359 for label_tuple in label_tuples: 360 if not ( 361 isinstance(label_tuple, exp.Tuple) 362 and len(label_tuple.expressions) == 2 363 and all(isinstance(label, exp.Literal) for label in label_tuple.expressions) 364 ): 365 raise ConfigError( 366 "Invalid entry in `session_properties.query_label`. Must be tuples of string literals with length 2." 367 ) 368 elif prop_name == "authorization": 369 authorization = eq.right 370 if not ( 371 isinstance(authorization, exp.Literal) and authorization.is_string 372 ) and not isinstance(authorization, (d.MacroFunc, d.MacroVar)): 373 raise ConfigError( 374 "Invalid value for `session_properties.authorization`. Must be a string literal." 375 ) 376 377 return parsed_session_properties
447 @property 448 def time_column(self) -> t.Optional[TimeColumn]: 449 """The time column for incremental models.""" 450 return getattr(self.kind, "time_column", None)
The time column for incremental models.
460 @property 461 def column_descriptions(self) -> t.Dict[str, str]: 462 """A dictionary of column names to annotation comments.""" 463 return self.column_descriptions_ or {}
A dictionary of column names to annotation comments.
465 @property 466 def lookback(self) -> int: 467 """The incremental lookback window.""" 468 return getattr(self.kind, "lookback", 0) or 0
The incremental lookback window.
478 @property 479 def batch_size(self) -> t.Optional[int]: 480 """The maximal number of units in a single task for a backfill.""" 481 return getattr(self.kind, "batch_size", None)
The maximal number of units in a single task for a backfill.
483 @property 484 def batch_concurrency(self) -> t.Optional[int]: 485 """The maximal number of batches that can run concurrently for a backfill.""" 486 return getattr(self.kind, "batch_concurrency", None)
The maximal number of batches that can run concurrently for a backfill.
488 @cached_property 489 def physical_properties(self) -> t.Dict[str, exp.Expression]: 490 """A dictionary of properties that will be applied to the physical layer. It replaces table_properties which is deprecated.""" 491 if self.physical_properties_: 492 return {e.this.name: e.expression for e in self.physical_properties_.expressions} 493 return {}
A dictionary of properties that will be applied to the physical layer. It replaces table_properties which is deprecated.
495 @cached_property 496 def virtual_properties(self) -> t.Dict[str, exp.Expression]: 497 """A dictionary of properties that will be applied to the virtual layer.""" 498 if self.virtual_properties_: 499 return {e.this.name: e.expression for e in self.virtual_properties_.expressions} 500 return {}
A dictionary of properties that will be applied to the virtual layer.
502 @property 503 def session_properties(self) -> SessionProperties: 504 """A dictionary of session properties.""" 505 if not self.session_properties_: 506 return {} 507 508 return d.interpret_key_value_pairs(self.session_properties_)
A dictionary of session properties.
516 @cached_property 517 def grants(self) -> t.Optional[GrantsConfig]: 518 """A dictionary of grants mapping permission names to lists of grantees.""" 519 520 if self.grants_ is None: 521 return None 522 523 if not self.grants_.expressions: 524 return {} 525 526 grants_dict = {} 527 for eq_expr in self.grants_.expressions: 528 try: 529 permission_name = self._validate_config_expression(eq_expr.left) 530 grantee_list = self._validate_nested_config_values(eq_expr.expression) 531 grants_dict[permission_name] = grantee_list 532 except ConfigError as e: 533 permission_name = ( 534 eq_expr.left.name if hasattr(eq_expr.left, "name") else str(eq_expr.left) 535 ) 536 raise ConfigError(f"Invalid grants configuration for '{permission_name}': {e}") 537 538 return grants_dict if grants_dict else None
A dictionary of grants mapping permission names to lists of grantees.
540 @property 541 def all_references(self) -> t.List[Reference]: 542 """All references including grains.""" 543 return [Reference(model_name=self.name, expression=e, unique=True) for e in self.grains] + [ 544 Reference(model_name=self.name, expression=e, unique=True) for e in self.references 545 ]
All references including grains.
547 @property 548 def on(self) -> t.List[str]: 549 """The grains to be used as join condition in table_diff.""" 550 551 on: t.List[str] = [] 552 for expr in [ref.expression for ref in self.all_references if ref.unique]: 553 if isinstance(expr, exp.Tuple): 554 on.extend([key.this.sql(dialect=self.dialect) for key in expr.expressions]) 555 else: 556 # Handle a single Column or Paren expression 557 on.append(expr.this.sql(dialect=self.dialect)) 558 559 return on
The grains to be used as join condition in table_diff.
577 @property 578 def catalog(self) -> t.Optional[str]: 579 """Returns the catalog of a model.""" 580 return self.fully_qualified_table.catalog
Returns the catalog of a model.
596 @property 597 def on_additive_change(self) -> OnAdditiveChange: 598 """Return the model's additive change setting if it has one.""" 599 return getattr(self.kind, "on_additive_change", OnAdditiveChange.ALLOW)
Return the model's additive change setting if it has one.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
358def init_private_attributes(self: BaseModel, context: Any, /) -> None: 359 """This function is meant to behave like a BaseModel method to initialise private attributes. 360 361 It takes context as an argument since that's what pydantic-core passes when calling it. 362 363 Args: 364 self: The BaseModel instance. 365 context: The context. 366 """ 367 if getattr(self, '__pydantic_private__', None) is None: 368 pydantic_private = {} 369 for name, private_attr in self.__private_attributes__.items(): 370 default = private_attr.get_default() 371 if default is not PydanticUndefined: 372 pydantic_private[name] = default 373 object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs