sqlmesh.core.model.common
1from __future__ import annotations 2 3import ast 4import typing as t 5from pathlib import Path 6 7from astor import to_source 8from difflib import get_close_matches 9from sqlglot import exp 10from sqlglot.helper import ensure_list 11 12from sqlmesh.core import constants as c 13from sqlmesh.core import dialect as d 14from sqlmesh.core.macros import MacroRegistry, MacroStrTemplate 15from sqlmesh.utils import str_to_bool 16from sqlmesh.utils.errors import ConfigError, SQLMeshError, raise_config_error 17from sqlmesh.utils.metaprogramming import ( 18 Executable, 19 SqlValue, 20 build_env, 21 prepare_env, 22 serialize_env, 23) 24from sqlmesh.utils.pydantic import PydanticModel, ValidationInfo, field_validator, get_dialect 25 26if t.TYPE_CHECKING: 27 from sqlglot.dialects.dialect import DialectType 28 from sqlmesh.utils import registry_decorator 29 from sqlmesh.utils.jinja import MacroReference 30 31 MacroCallable = t.Union[Executable, registry_decorator] 32 33 34def make_python_env( 35 expressions: t.Union[ 36 exp.Expr, 37 t.List[t.Union[exp.Expr, t.Tuple[exp.Expr, bool]]], 38 ], 39 jinja_macro_references: t.Optional[t.Set[MacroReference]], 40 module_path: Path, 41 macros: MacroRegistry, 42 variables: t.Optional[t.Dict[str, t.Any]] = None, 43 referenced_variables: t.Optional[t.Set[str]] = None, 44 path: t.Optional[Path] = None, 45 python_env: t.Optional[t.Dict[str, Executable]] = None, 46 strict_resolution: bool = True, 47 blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None, 48 dialect: DialectType = None, 49) -> t.Dict[str, Executable]: 50 python_env = {} if python_env is None else python_env 51 env: t.Dict[str, t.Tuple[t.Any, t.Optional[bool]]] = {} 52 53 variables = variables or {} 54 blueprint_variables = blueprint_variables or {} 55 56 used_macros: t.Dict[str, t.Tuple[MacroCallable, bool]] = {} 57 58 # var -> True: var is metadata-only 59 # var -> False: var is not metadata-only 60 # var -> None: cannot determine whether var is metadata-only yet, need to walk macros first 61 used_variables: t.Dict[str, t.Optional[bool]] = dict.fromkeys( 62 referenced_variables or set(), False 63 ) 64 65 # id(expr) -> true: expr appears under the AST of a metadata-only macro function 66 # id(expr) -> false: expr appears under the AST of a macro function whose metadata status we don't yet know 67 expr_under_metadata_macro_func: t.Dict[int, bool] = {} 68 69 # For @m1(@m2(@x), @y), we'd get x -> m1 and y -> m1 70 outermost_macro_func_ancestor_by_var: t.Dict[str, str] = {} 71 visited_macro_funcs: t.Set[int] = set() 72 73 def _is_metadata_var( 74 name: str, expression: exp.Expr, appears_in_metadata_expression: bool 75 ) -> t.Optional[bool]: 76 is_metadata_so_far = used_variables.get(name, True) 77 if is_metadata_so_far is False: 78 # We've concluded this variable is definitely not metadata-only 79 return False 80 81 appears_under_metadata_macro_func = expr_under_metadata_macro_func.get(id(expression)) 82 if is_metadata_so_far and ( 83 appears_in_metadata_expression or appears_under_metadata_macro_func 84 ): 85 # The variable appears in a metadata expression, e.g., audits (...), 86 # or in the AST of metadata-only macro call, e.g., @FOO(@x) 87 return True 88 89 # The variable appears in the AST of a macro call, but we don't know if it's metadata-only 90 if appears_under_metadata_macro_func is False: 91 return None 92 93 # The variable appears elsewhere, e.g., in the model's query: SELECT @x 94 return False 95 96 def _is_metadata_macro(name: str, appears_in_metadata_expression: bool) -> bool: 97 if name in used_macros: 98 is_metadata_so_far = used_macros[name][1] 99 return is_metadata_so_far and appears_in_metadata_expression 100 101 return appears_in_metadata_expression 102 103 expressions = ensure_list(expressions) 104 for expression_metadata in expressions: 105 if isinstance(expression_metadata, tuple): 106 expression, is_metadata = expression_metadata 107 else: 108 expression, is_metadata = expression_metadata, False 109 110 if isinstance(expression, d.Jinja): 111 continue 112 113 for macro_func_or_var in expression.find_all(d.MacroFunc, d.MacroVar, exp.Identifier): 114 if macro_func_or_var.__class__ is d.MacroFunc: 115 name = macro_func_or_var.this.name.lower() 116 if name not in macros: 117 continue 118 119 used_macros[name] = (macros[name], _is_metadata_macro(name, is_metadata)) 120 121 if name in (c.VAR, c.BLUEPRINT_VAR): 122 args = macro_func_or_var.this.expressions 123 if len(args) < 1: 124 raise_config_error( 125 f"Macro {name.upper()} requires at least one argument", path 126 ) 127 128 if not args[0].is_string: 129 raise_config_error( 130 f"The variable name must be a string literal, '{args[0].sql()}' was given instead", 131 path, 132 ) 133 134 var_name = args[0].this.lower() 135 used_variables[var_name] = _is_metadata_var( 136 var_name, macro_func_or_var, is_metadata 137 ) 138 elif id(macro_func_or_var) not in visited_macro_funcs: 139 # We only care about the top-level macro function calls to determine the metadata 140 # status of the variables referenced in their ASTs. For example, in @m1(@m2(@x)), 141 # if m1 is metadata-only but m2 is not, we can still determine that @x only affects 142 # the metadata hash, since m2's result feeds into a metadata-only macro function. 143 # 144 # Generally, if the top-level call is known to be metadata-only or appear in a 145 # metadata expression, then we can avoid traversing nested macro function calls. 146 147 var_refs, _expr_under_metadata_macro_func, _visited_macro_funcs = ( 148 _extract_macro_func_variable_references(macro_func_or_var, is_metadata) 149 ) 150 expr_under_metadata_macro_func.update(_expr_under_metadata_macro_func) 151 visited_macro_funcs.update(_visited_macro_funcs) 152 outermost_macro_func_ancestor_by_var |= {var_ref: name for var_ref in var_refs} 153 elif macro_func_or_var.__class__ is d.MacroVar: 154 var_name = macro_func_or_var.name.lower() 155 if var_name in macros: 156 used_macros[var_name] = ( 157 macros[var_name], 158 _is_metadata_macro(var_name, is_metadata), 159 ) 160 elif var_name in variables or var_name in blueprint_variables: 161 used_variables[var_name] = _is_metadata_var( 162 var_name, macro_func_or_var, is_metadata 163 ) 164 elif ( 165 isinstance(macro_func_or_var, (exp.Identifier, d.MacroStrReplace, d.MacroSQL)) 166 ) and "@" in macro_func_or_var.name: 167 for _, identifier, braced_identifier, _ in MacroStrTemplate.pattern.findall( 168 macro_func_or_var.name 169 ): 170 var_name = braced_identifier or identifier 171 if var_name in variables or var_name in blueprint_variables: 172 used_variables[var_name] = _is_metadata_var( 173 var_name, macro_func_or_var, is_metadata 174 ) 175 176 for macro_ref in jinja_macro_references or set(): 177 if macro_ref.package is None and macro_ref.name in macros: 178 used_macros[macro_ref.name] = (macros[macro_ref.name], False) 179 180 for name, (used_macro, is_metadata) in used_macros.items(): 181 if isinstance(used_macro, Executable): 182 python_env[name] = used_macro 183 elif not hasattr(used_macro, c.SQLMESH_BUILTIN) and name not in python_env: 184 build_env( 185 used_macro.func, 186 env=env, 187 name=name, 188 path=module_path, 189 is_metadata_obj=is_metadata, 190 ) 191 192 python_env.update(serialize_env(env, path=module_path)) 193 return _add_variables_to_python_env( 194 python_env, 195 used_variables, 196 variables, 197 blueprint_variables=blueprint_variables, 198 dialect=dialect, 199 strict_resolution=strict_resolution, 200 outermost_macro_func_ancestor_by_var=outermost_macro_func_ancestor_by_var, 201 ) 202 203 204def _extract_macro_func_variable_references( 205 macro_func: exp.Expr, 206 is_metadata: bool, 207) -> t.Tuple[t.Set[str], t.Dict[int, bool], t.Set[int]]: 208 var_references = set() 209 visited_macro_funcs = set() 210 expr_under_metadata_macro_func = {} 211 212 for n in macro_func.walk(): 213 if type(n) is d.MacroFunc: 214 visited_macro_funcs.add(id(n)) 215 216 this = n.this 217 args = this.expressions 218 219 if this.name.lower() in (c.VAR, c.BLUEPRINT_VAR) and args and args[0].is_string: 220 var_references.add(args[0].this.lower()) 221 expr_under_metadata_macro_func[id(n)] = is_metadata 222 elif isinstance(n, d.MacroVar): 223 var_references.add(n.name.lower()) 224 expr_under_metadata_macro_func[id(n)] = is_metadata 225 elif isinstance(n, (exp.Identifier, d.MacroStrReplace, d.MacroSQL)) and "@" in n.name: 226 var_references.update( 227 (braced_identifier or identifier).lower() 228 for _, identifier, braced_identifier, _ in MacroStrTemplate.pattern.findall(n.name) 229 ) 230 expr_under_metadata_macro_func[id(n)] = is_metadata 231 232 return (var_references, expr_under_metadata_macro_func, visited_macro_funcs) 233 234 235def _add_variables_to_python_env( 236 python_env: t.Dict[str, Executable], 237 used_variables: t.Dict[str, t.Optional[bool]], 238 variables: t.Optional[t.Dict[str, t.Any]], 239 strict_resolution: bool = True, 240 blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None, 241 dialect: DialectType = None, 242 outermost_macro_func_ancestor_by_var: t.Optional[t.Dict[str, str]] = None, 243) -> t.Dict[str, Executable]: 244 _, python_used_variables = parse_dependencies( 245 python_env, 246 None, 247 strict_resolution=strict_resolution, 248 variables=variables, 249 blueprint_variables=blueprint_variables, 250 ) 251 for var_name, is_metadata in python_used_variables.items(): 252 used_variables[var_name] = is_metadata and used_variables.get(var_name, True) 253 254 # Variables are treated as metadata-only when all of their references either: 255 # - appear in metadata-only expressions, such as `audits (...)`, virtual statements, etc 256 # - appear in the ASTs or definitions of metadata-only macros 257 # 258 # See also: https://github.com/SQLMesh/sqlmesh/pull/4936#issuecomment-3136339936, 259 # specifically the "Terminology" and "Observations" section. 260 metadata_used_variables = { 261 var_name for var_name, is_metadata in used_variables.items() if is_metadata 262 } 263 for used_var, outermost_macro_func in (outermost_macro_func_ancestor_by_var or {}).items(): 264 used_var_is_metadata = used_variables.get(used_var) 265 if used_var_is_metadata is False: 266 continue 267 268 # At this point we can decide whether a variable reference in a macro call's AST is 269 # metadata-only, because we've annotated the corresponding macro call in the python env. 270 if outermost_macro_func in python_env and python_env[outermost_macro_func].is_metadata: 271 metadata_used_variables.add(used_var) 272 273 non_metadata_used_variables = set(used_variables) - metadata_used_variables 274 275 if overlapping_variables := (non_metadata_used_variables & metadata_used_variables): 276 raise ConfigError( 277 f"Variables {', '.join(overlapping_variables)} are both metadata and non-metadata, " 278 "which is unexpected. Please file an issue at https://github.com/SQLMesh/sqlmesh/issues/new." 279 ) 280 281 metadata_variables = { 282 k: v for k, v in (variables or {}).items() if k in metadata_used_variables 283 } 284 variables = {k: v for k, v in (variables or {}).items() if k in non_metadata_used_variables} 285 286 if variables: 287 python_env[c.SQLMESH_VARS] = Executable.value(variables, sort_root_dict=True) 288 if metadata_variables: 289 python_env[c.SQLMESH_VARS_METADATA] = Executable.value( 290 metadata_variables, sort_root_dict=True, is_metadata=True 291 ) 292 293 if blueprint_variables: 294 metadata_blueprint_variables = { 295 k: SqlValue(sql=v.sql(dialect=dialect)) if isinstance(v, exp.Expr) else v 296 for k, v in blueprint_variables.items() 297 if k in metadata_used_variables 298 } 299 blueprint_variables = { 300 k.lower(): SqlValue(sql=v.sql(dialect=dialect)) if isinstance(v, exp.Expr) else v 301 for k, v in blueprint_variables.items() 302 if k in non_metadata_used_variables 303 } 304 if blueprint_variables: 305 python_env[c.SQLMESH_BLUEPRINT_VARS] = Executable.value( 306 blueprint_variables, sort_root_dict=True 307 ) 308 if metadata_blueprint_variables: 309 python_env[c.SQLMESH_BLUEPRINT_VARS_METADATA] = Executable.value( 310 metadata_blueprint_variables, sort_root_dict=True, is_metadata=True 311 ) 312 313 return python_env 314 315 316def parse_dependencies( 317 python_env: t.Dict[str, Executable], 318 entrypoint: t.Optional[str], 319 strict_resolution: bool = True, 320 variables: t.Optional[t.Dict[str, t.Any]] = None, 321 blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None, 322) -> t.Tuple[t.Set[str], t.Dict[str, bool]]: 323 """ 324 Parses the source of a model function and finds upstream table dependencies 325 and referenced variables based on calls to context / evaluator. 326 327 Args: 328 python_env: A dictionary of Python definitions. 329 entrypoint: The name of the function. 330 strict_resolution: If true, the arguments of `table` and `resolve_table` calls must 331 be resolvable at parse time, otherwise an exception will be raised. 332 variables: The variables available to the python environment. 333 blueprint_variables: The blueprint variables available to the python environment. 334 335 Returns: 336 A tuple containing the set of upstream table dependencies and a mapping of 337 the referenced variables associated with their metadata status. 338 """ 339 340 class VariableResolutionContext: 341 """This enables calls like `resolve_table` to reference `var()` and `blueprint_var()`.""" 342 343 @staticmethod 344 def var(var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]: 345 return (variables or {}).get(var_name.lower(), default) 346 347 @staticmethod 348 def blueprint_var(var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]: 349 return (blueprint_variables or {}).get(var_name.lower(), default) 350 351 env = prepare_env(python_env) 352 local_env = dict.fromkeys(("context", "evaluator"), VariableResolutionContext) 353 354 depends_on = set() 355 used_variables: t.Dict[str, bool] = {} 356 357 for executable in python_env.values(): 358 if not executable.is_definition: 359 continue 360 361 is_metadata = executable.is_metadata 362 for node in ast.walk(ast.parse(executable.payload)): 363 next_variables = set() 364 365 if isinstance(node, ast.Call): 366 func = node.func 367 if not isinstance(func, ast.Attribute) or not isinstance(func.value, ast.Name): 368 continue 369 370 def get_first_arg(keyword_arg_name: str) -> t.Any: 371 if node.args: 372 first_arg: t.Optional[ast.expr] = node.args[0] 373 else: 374 first_arg = next( 375 ( 376 keyword.value 377 for keyword in node.keywords 378 if keyword.arg == keyword_arg_name 379 ), 380 None, 381 ) 382 383 try: 384 expression = to_source(first_arg) 385 return eval(expression, env, local_env) 386 except Exception: 387 if strict_resolution: 388 raise ConfigError( 389 f"Error resolving dependencies for '{executable.path}'. " 390 f"Argument '{expression.strip()}' must be resolvable at parse time." 391 ) 392 393 if func.value.id == "context" and func.attr in ("table", "resolve_table"): 394 depends_on.add(get_first_arg("model_name")) 395 elif func.value.id in ("context", "evaluator") and func.attr in ( 396 c.VAR, 397 c.BLUEPRINT_VAR, 398 ): 399 next_variables.add(get_first_arg("var_name").lower()) 400 elif ( 401 isinstance(node, ast.Attribute) 402 and isinstance(node.value, ast.Name) 403 and node.value.id in ("context", "evaluator") 404 and node.attr == c.GATEWAY 405 ): 406 # Check whether the gateway attribute is referenced. 407 next_variables.add(c.GATEWAY) 408 elif isinstance(node, ast.FunctionDef) and node.name == entrypoint: 409 next_variables.update( 410 [ 411 arg.arg 412 for arg in [*node.args.args, *node.args.kwonlyargs] 413 if arg.arg != "context" 414 ] 415 ) 416 417 for var_name in next_variables: 418 used_variables[var_name] = used_variables.get(var_name, True) and bool(is_metadata) 419 420 return depends_on, used_variables 421 422 423def validate_extra_and_required_fields( 424 klass: t.Type[PydanticModel], 425 provided_fields: t.Set[str], 426 entity_name: str, 427 path: t.Optional[Path] = None, 428) -> None: 429 missing_required_fields = klass.missing_required_fields(provided_fields) 430 if missing_required_fields: 431 field_names = "'" + "', '".join(missing_required_fields) + "'" 432 raise_config_error( 433 f"Please add required field{'s' if len(missing_required_fields) > 1 else ''} {field_names} to the {entity_name}.", 434 path, 435 ) 436 437 extra_fields = klass.extra_fields(provided_fields) 438 if extra_fields: 439 extra_field_names = "'" + "', '".join(extra_fields) + "'" 440 441 all_fields = klass.all_fields() 442 close_matches = {} 443 for field in extra_fields: 444 matches = get_close_matches(field, all_fields, n=1) 445 if matches: 446 close_matches[field] = matches[0] 447 448 if len(close_matches) == 1: 449 similar_msg = ". Did you mean " + "'" + "', '".join(close_matches.values()) + "'?" 450 else: 451 similar = [ 452 f"- {field}: Did you mean '{match}'?" for field, match in close_matches.items() 453 ] 454 similar_msg = "\n\n " + "\n ".join(similar) if similar else "" 455 456 raise_config_error( 457 f"Invalid field name{'s' if len(extra_fields) > 1 else ''} present in the {entity_name}: {extra_field_names}{similar_msg}", 458 path, 459 ) 460 461 462def single_value_or_tuple(values: t.Sequence) -> exp.Identifier | exp.Tuple: 463 return ( 464 exp.to_identifier(values[0]) 465 if len(values) == 1 466 else exp.Tuple(expressions=[exp.to_identifier(v) for v in values]) 467 ) 468 469 470def parse_expression( 471 cls: t.Type, 472 v: t.Union[t.List[str], t.List[exp.Expr], str, exp.Expr, t.Callable, None], 473 info: t.Optional[ValidationInfo], 474) -> t.List[exp.Expr] | exp.Expr | t.Callable | None: 475 """Helper method to deserialize SQLGlot expressions in Pydantic Models.""" 476 if v is None: 477 return None 478 479 if callable(v): 480 return v 481 482 dialect = info.data.get("dialect") if info else "" 483 484 if isinstance(v, list): 485 return [ 486 e if isinstance(e, exp.Expr) else d.parse_one(e, dialect=dialect) # type: ignore[misc] 487 for e in v 488 if not isinstance(e, exp.Semicolon) 489 ] 490 491 if isinstance(v, str): 492 return d.parse_one(v, dialect=dialect) 493 494 if not v: 495 raise ConfigError(f"Could not parse {v}") 496 497 return v 498 499 500def parse_bool(v: t.Any) -> bool: 501 if isinstance(v, exp.Expr): 502 if not isinstance(v, exp.Boolean): 503 from sqlglot.optimizer.simplify import simplify 504 505 # Try to reduce expressions like (1 = 1) (see: T-SQL boolean generation) 506 v = simplify(v) 507 508 if isinstance(v, exp.Boolean): 509 return v.this 510 511 return str_to_bool(v.name) 512 513 return str_to_bool(str(v or "")) 514 515 516def parse_properties( 517 cls: t.Type, v: t.Any, info: t.Optional[ValidationInfo] 518) -> t.Optional[exp.Tuple]: 519 if v is None: 520 return v 521 522 dialect = info.data.get("dialect") if info else "" 523 524 if isinstance(v, str): 525 v = d.parse_one(v, dialect=dialect) 526 if isinstance(v, (exp.Array, exp.Paren, exp.Tuple)): 527 eq_expressions: t.List[exp.Expr] = ( 528 [v.unnest()] if isinstance(v, exp.Paren) else v.expressions 529 ) 530 531 for eq_expr in eq_expressions: 532 if not isinstance(eq_expr, exp.EQ): 533 raise ConfigError( 534 f"Invalid property '{eq_expr.sql(dialect=dialect)}'. " 535 "Properties must be specified as key-value pairs <key> = <value>. " 536 ) 537 538 properties = ( 539 exp.Tuple(expressions=eq_expressions) if isinstance(v, (exp.Paren, exp.Array)) else v 540 ) 541 elif isinstance(v, dict): 542 properties = exp.Tuple( 543 expressions=[exp.Literal.string(key).eq(value) for key, value in v.items()] 544 ) 545 else: 546 raise SQLMeshError(f"Unexpected properties '{v}'") 547 548 properties.meta["dialect"] = dialect 549 return properties 550 551 552def default_catalog(cls: t.Type, v: t.Any) -> t.Optional[str]: 553 if v is None: 554 return None 555 # If v is an expression then we will return expression as sql without a dialect 556 return str(v) 557 558 559def depends_on(cls: t.Type, v: t.Any, info: ValidationInfo) -> t.Optional[t.Set[str]]: 560 dialect = info.data.get("dialect") 561 default_catalog = info.data.get("default_catalog") 562 563 if isinstance(v, exp.Paren): 564 v = v.unnest() 565 566 if isinstance(v, (exp.Array, exp.Tuple)): 567 return { 568 d.normalize_model_name( 569 table.name if table.is_string else table, 570 default_catalog=default_catalog, 571 dialect=dialect, 572 ) 573 for table in v.expressions 574 } 575 if isinstance(v, (exp.Table, exp.Column)): 576 return {d.normalize_model_name(v, default_catalog=default_catalog, dialect=dialect)} 577 if hasattr(v, "__iter__") and not isinstance(v, str): 578 return { 579 d.normalize_model_name(name, default_catalog=default_catalog, dialect=dialect) 580 for name in v 581 } 582 583 return v 584 585 586def sort_python_env(python_env: t.Dict[str, Executable]) -> t.List[t.Tuple[str, Executable]]: 587 """Returns the python env sorted.""" 588 return sorted(python_env.items(), key=lambda x: (x[1].kind, x[0])) 589 590 591def sorted_python_env_payloads(python_env: t.Dict[str, Executable]) -> t.List[str]: 592 """Returns the payloads of the sorted python env.""" 593 594 def _executable_to_str(k: str, v: Executable) -> str: 595 result = f"# {v.path}\n" if v.path is not None else "" 596 if v.is_import or v.is_definition: 597 result += v.payload 598 else: 599 result += f"{k} = {v.payload}" 600 return result 601 602 return [_executable_to_str(k, v) for k, v in sort_python_env(python_env)] 603 604 605def parse_strings_with_macro_refs(value: t.Any, dialect: DialectType) -> t.Any: 606 if isinstance(value, str) and "@" in value: 607 return exp.maybe_parse(value, dialect=dialect) 608 609 if isinstance(value, dict): 610 for k, v in dict(value).items(): 611 value[k] = parse_strings_with_macro_refs(v, dialect) 612 elif isinstance(value, list): 613 value = [parse_strings_with_macro_refs(v, dialect) for v in value] 614 615 return value 616 617 618expression_validator: t.Callable = field_validator( 619 "unique_key", 620 mode="before", 621 check_fields=False, 622)(parse_expression) 623 624 625bool_validator: t.Callable = field_validator( 626 "skip", 627 "blocking", 628 "forward_only", 629 "disable_restatement", 630 "insert_overwrite", 631 "allow_partials", 632 "enabled", 633 "optimize_query", 634 "formatting", 635 mode="before", 636 check_fields=False, 637)(parse_bool) 638 639 640properties_validator: t.Callable = field_validator( 641 "physical_properties_", 642 "virtual_properties_", 643 "materialization_properties_", 644 "grants_", 645 mode="before", 646 check_fields=False, 647)(parse_properties) 648 649 650default_catalog_validator: t.Callable = field_validator( 651 "default_catalog", 652 mode="before", 653 check_fields=False, 654)(default_catalog) 655 656 657depends_on_validator: t.Callable = field_validator( 658 "depends_on_", 659 mode="before", 660 check_fields=False, 661)(depends_on) 662 663 664class ParsableSql(PydanticModel): 665 sql: str 666 transaction: t.Optional[bool] = None 667 668 _parsed: t.Optional[exp.Expr] = None 669 _parsed_dialect: t.Optional[str] = None 670 671 def parse(self, dialect: str) -> exp.Expr: 672 if self._parsed is None or self._parsed_dialect != dialect: 673 self._parsed = d.parse_one(self.sql, dialect=dialect) 674 self._parsed_dialect = dialect 675 return self._parsed # type: ignore[return-value] 676 677 @classmethod 678 def from_parsed_expression( 679 cls, parsed_expression: exp.Expr, dialect: str, use_meta_sql: bool = False 680 ) -> ParsableSql: 681 sql = ( 682 parsed_expression.meta.get("sql") or parsed_expression.sql(dialect=dialect) 683 if use_meta_sql 684 else parsed_expression.sql(dialect=dialect) 685 ) 686 result = cls(sql=sql) 687 result._parsed = parsed_expression 688 result._parsed_dialect = dialect 689 return result 690 691 @classmethod 692 def validator(cls) -> classmethod: 693 def _validate_parsable_sql( 694 v: t.Any, info: ValidationInfo 695 ) -> t.Optional[t.Union[ParsableSql, t.List[ParsableSql]]]: 696 if v is None: 697 return v 698 if isinstance(v, str): 699 return ParsableSql(sql=v) 700 if isinstance(v, exp.Expr): 701 return ParsableSql.from_parsed_expression( 702 v, get_dialect(info.data), use_meta_sql=False 703 ) 704 if isinstance(v, list): 705 dialect = get_dialect(info.data) 706 return [ 707 ParsableSql(sql=s) 708 if isinstance(s, str) 709 else ParsableSql.from_parsed_expression(s, dialect, use_meta_sql=False) 710 if isinstance(s, exp.Expr) 711 else ParsableSql.parse_obj(s) 712 for s in v 713 ] 714 return ParsableSql.parse_obj(v) 715 716 return field_validator( 717 "query_", 718 "expressions_", 719 "pre_statements_", 720 "post_statements_", 721 "on_virtual_update_", 722 mode="before", 723 check_fields=False, 724 )(_validate_parsable_sql)
35def make_python_env( 36 expressions: t.Union[ 37 exp.Expr, 38 t.List[t.Union[exp.Expr, t.Tuple[exp.Expr, bool]]], 39 ], 40 jinja_macro_references: t.Optional[t.Set[MacroReference]], 41 module_path: Path, 42 macros: MacroRegistry, 43 variables: t.Optional[t.Dict[str, t.Any]] = None, 44 referenced_variables: t.Optional[t.Set[str]] = None, 45 path: t.Optional[Path] = None, 46 python_env: t.Optional[t.Dict[str, Executable]] = None, 47 strict_resolution: bool = True, 48 blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None, 49 dialect: DialectType = None, 50) -> t.Dict[str, Executable]: 51 python_env = {} if python_env is None else python_env 52 env: t.Dict[str, t.Tuple[t.Any, t.Optional[bool]]] = {} 53 54 variables = variables or {} 55 blueprint_variables = blueprint_variables or {} 56 57 used_macros: t.Dict[str, t.Tuple[MacroCallable, bool]] = {} 58 59 # var -> True: var is metadata-only 60 # var -> False: var is not metadata-only 61 # var -> None: cannot determine whether var is metadata-only yet, need to walk macros first 62 used_variables: t.Dict[str, t.Optional[bool]] = dict.fromkeys( 63 referenced_variables or set(), False 64 ) 65 66 # id(expr) -> true: expr appears under the AST of a metadata-only macro function 67 # id(expr) -> false: expr appears under the AST of a macro function whose metadata status we don't yet know 68 expr_under_metadata_macro_func: t.Dict[int, bool] = {} 69 70 # For @m1(@m2(@x), @y), we'd get x -> m1 and y -> m1 71 outermost_macro_func_ancestor_by_var: t.Dict[str, str] = {} 72 visited_macro_funcs: t.Set[int] = set() 73 74 def _is_metadata_var( 75 name: str, expression: exp.Expr, appears_in_metadata_expression: bool 76 ) -> t.Optional[bool]: 77 is_metadata_so_far = used_variables.get(name, True) 78 if is_metadata_so_far is False: 79 # We've concluded this variable is definitely not metadata-only 80 return False 81 82 appears_under_metadata_macro_func = expr_under_metadata_macro_func.get(id(expression)) 83 if is_metadata_so_far and ( 84 appears_in_metadata_expression or appears_under_metadata_macro_func 85 ): 86 # The variable appears in a metadata expression, e.g., audits (...), 87 # or in the AST of metadata-only macro call, e.g., @FOO(@x) 88 return True 89 90 # The variable appears in the AST of a macro call, but we don't know if it's metadata-only 91 if appears_under_metadata_macro_func is False: 92 return None 93 94 # The variable appears elsewhere, e.g., in the model's query: SELECT @x 95 return False 96 97 def _is_metadata_macro(name: str, appears_in_metadata_expression: bool) -> bool: 98 if name in used_macros: 99 is_metadata_so_far = used_macros[name][1] 100 return is_metadata_so_far and appears_in_metadata_expression 101 102 return appears_in_metadata_expression 103 104 expressions = ensure_list(expressions) 105 for expression_metadata in expressions: 106 if isinstance(expression_metadata, tuple): 107 expression, is_metadata = expression_metadata 108 else: 109 expression, is_metadata = expression_metadata, False 110 111 if isinstance(expression, d.Jinja): 112 continue 113 114 for macro_func_or_var in expression.find_all(d.MacroFunc, d.MacroVar, exp.Identifier): 115 if macro_func_or_var.__class__ is d.MacroFunc: 116 name = macro_func_or_var.this.name.lower() 117 if name not in macros: 118 continue 119 120 used_macros[name] = (macros[name], _is_metadata_macro(name, is_metadata)) 121 122 if name in (c.VAR, c.BLUEPRINT_VAR): 123 args = macro_func_or_var.this.expressions 124 if len(args) < 1: 125 raise_config_error( 126 f"Macro {name.upper()} requires at least one argument", path 127 ) 128 129 if not args[0].is_string: 130 raise_config_error( 131 f"The variable name must be a string literal, '{args[0].sql()}' was given instead", 132 path, 133 ) 134 135 var_name = args[0].this.lower() 136 used_variables[var_name] = _is_metadata_var( 137 var_name, macro_func_or_var, is_metadata 138 ) 139 elif id(macro_func_or_var) not in visited_macro_funcs: 140 # We only care about the top-level macro function calls to determine the metadata 141 # status of the variables referenced in their ASTs. For example, in @m1(@m2(@x)), 142 # if m1 is metadata-only but m2 is not, we can still determine that @x only affects 143 # the metadata hash, since m2's result feeds into a metadata-only macro function. 144 # 145 # Generally, if the top-level call is known to be metadata-only or appear in a 146 # metadata expression, then we can avoid traversing nested macro function calls. 147 148 var_refs, _expr_under_metadata_macro_func, _visited_macro_funcs = ( 149 _extract_macro_func_variable_references(macro_func_or_var, is_metadata) 150 ) 151 expr_under_metadata_macro_func.update(_expr_under_metadata_macro_func) 152 visited_macro_funcs.update(_visited_macro_funcs) 153 outermost_macro_func_ancestor_by_var |= {var_ref: name for var_ref in var_refs} 154 elif macro_func_or_var.__class__ is d.MacroVar: 155 var_name = macro_func_or_var.name.lower() 156 if var_name in macros: 157 used_macros[var_name] = ( 158 macros[var_name], 159 _is_metadata_macro(var_name, is_metadata), 160 ) 161 elif var_name in variables or var_name in blueprint_variables: 162 used_variables[var_name] = _is_metadata_var( 163 var_name, macro_func_or_var, is_metadata 164 ) 165 elif ( 166 isinstance(macro_func_or_var, (exp.Identifier, d.MacroStrReplace, d.MacroSQL)) 167 ) and "@" in macro_func_or_var.name: 168 for _, identifier, braced_identifier, _ in MacroStrTemplate.pattern.findall( 169 macro_func_or_var.name 170 ): 171 var_name = braced_identifier or identifier 172 if var_name in variables or var_name in blueprint_variables: 173 used_variables[var_name] = _is_metadata_var( 174 var_name, macro_func_or_var, is_metadata 175 ) 176 177 for macro_ref in jinja_macro_references or set(): 178 if macro_ref.package is None and macro_ref.name in macros: 179 used_macros[macro_ref.name] = (macros[macro_ref.name], False) 180 181 for name, (used_macro, is_metadata) in used_macros.items(): 182 if isinstance(used_macro, Executable): 183 python_env[name] = used_macro 184 elif not hasattr(used_macro, c.SQLMESH_BUILTIN) and name not in python_env: 185 build_env( 186 used_macro.func, 187 env=env, 188 name=name, 189 path=module_path, 190 is_metadata_obj=is_metadata, 191 ) 192 193 python_env.update(serialize_env(env, path=module_path)) 194 return _add_variables_to_python_env( 195 python_env, 196 used_variables, 197 variables, 198 blueprint_variables=blueprint_variables, 199 dialect=dialect, 200 strict_resolution=strict_resolution, 201 outermost_macro_func_ancestor_by_var=outermost_macro_func_ancestor_by_var, 202 )
317def parse_dependencies( 318 python_env: t.Dict[str, Executable], 319 entrypoint: t.Optional[str], 320 strict_resolution: bool = True, 321 variables: t.Optional[t.Dict[str, t.Any]] = None, 322 blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None, 323) -> t.Tuple[t.Set[str], t.Dict[str, bool]]: 324 """ 325 Parses the source of a model function and finds upstream table dependencies 326 and referenced variables based on calls to context / evaluator. 327 328 Args: 329 python_env: A dictionary of Python definitions. 330 entrypoint: The name of the function. 331 strict_resolution: If true, the arguments of `table` and `resolve_table` calls must 332 be resolvable at parse time, otherwise an exception will be raised. 333 variables: The variables available to the python environment. 334 blueprint_variables: The blueprint variables available to the python environment. 335 336 Returns: 337 A tuple containing the set of upstream table dependencies and a mapping of 338 the referenced variables associated with their metadata status. 339 """ 340 341 class VariableResolutionContext: 342 """This enables calls like `resolve_table` to reference `var()` and `blueprint_var()`.""" 343 344 @staticmethod 345 def var(var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]: 346 return (variables or {}).get(var_name.lower(), default) 347 348 @staticmethod 349 def blueprint_var(var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]: 350 return (blueprint_variables or {}).get(var_name.lower(), default) 351 352 env = prepare_env(python_env) 353 local_env = dict.fromkeys(("context", "evaluator"), VariableResolutionContext) 354 355 depends_on = set() 356 used_variables: t.Dict[str, bool] = {} 357 358 for executable in python_env.values(): 359 if not executable.is_definition: 360 continue 361 362 is_metadata = executable.is_metadata 363 for node in ast.walk(ast.parse(executable.payload)): 364 next_variables = set() 365 366 if isinstance(node, ast.Call): 367 func = node.func 368 if not isinstance(func, ast.Attribute) or not isinstance(func.value, ast.Name): 369 continue 370 371 def get_first_arg(keyword_arg_name: str) -> t.Any: 372 if node.args: 373 first_arg: t.Optional[ast.expr] = node.args[0] 374 else: 375 first_arg = next( 376 ( 377 keyword.value 378 for keyword in node.keywords 379 if keyword.arg == keyword_arg_name 380 ), 381 None, 382 ) 383 384 try: 385 expression = to_source(first_arg) 386 return eval(expression, env, local_env) 387 except Exception: 388 if strict_resolution: 389 raise ConfigError( 390 f"Error resolving dependencies for '{executable.path}'. " 391 f"Argument '{expression.strip()}' must be resolvable at parse time." 392 ) 393 394 if func.value.id == "context" and func.attr in ("table", "resolve_table"): 395 depends_on.add(get_first_arg("model_name")) 396 elif func.value.id in ("context", "evaluator") and func.attr in ( 397 c.VAR, 398 c.BLUEPRINT_VAR, 399 ): 400 next_variables.add(get_first_arg("var_name").lower()) 401 elif ( 402 isinstance(node, ast.Attribute) 403 and isinstance(node.value, ast.Name) 404 and node.value.id in ("context", "evaluator") 405 and node.attr == c.GATEWAY 406 ): 407 # Check whether the gateway attribute is referenced. 408 next_variables.add(c.GATEWAY) 409 elif isinstance(node, ast.FunctionDef) and node.name == entrypoint: 410 next_variables.update( 411 [ 412 arg.arg 413 for arg in [*node.args.args, *node.args.kwonlyargs] 414 if arg.arg != "context" 415 ] 416 ) 417 418 for var_name in next_variables: 419 used_variables[var_name] = used_variables.get(var_name, True) and bool(is_metadata) 420 421 return depends_on, used_variables
Parses the source of a model function and finds upstream table dependencies and referenced variables based on calls to context / evaluator.
Arguments:
- python_env: A dictionary of Python definitions.
- entrypoint: The name of the function.
- strict_resolution: If true, the arguments of
tableandresolve_tablecalls must be resolvable at parse time, otherwise an exception will be raised. - variables: The variables available to the python environment.
- blueprint_variables: The blueprint variables available to the python environment.
Returns:
A tuple containing the set of upstream table dependencies and a mapping of the referenced variables associated with their metadata status.
424def validate_extra_and_required_fields( 425 klass: t.Type[PydanticModel], 426 provided_fields: t.Set[str], 427 entity_name: str, 428 path: t.Optional[Path] = None, 429) -> None: 430 missing_required_fields = klass.missing_required_fields(provided_fields) 431 if missing_required_fields: 432 field_names = "'" + "', '".join(missing_required_fields) + "'" 433 raise_config_error( 434 f"Please add required field{'s' if len(missing_required_fields) > 1 else ''} {field_names} to the {entity_name}.", 435 path, 436 ) 437 438 extra_fields = klass.extra_fields(provided_fields) 439 if extra_fields: 440 extra_field_names = "'" + "', '".join(extra_fields) + "'" 441 442 all_fields = klass.all_fields() 443 close_matches = {} 444 for field in extra_fields: 445 matches = get_close_matches(field, all_fields, n=1) 446 if matches: 447 close_matches[field] = matches[0] 448 449 if len(close_matches) == 1: 450 similar_msg = ". Did you mean " + "'" + "', '".join(close_matches.values()) + "'?" 451 else: 452 similar = [ 453 f"- {field}: Did you mean '{match}'?" for field, match in close_matches.items() 454 ] 455 similar_msg = "\n\n " + "\n ".join(similar) if similar else "" 456 457 raise_config_error( 458 f"Invalid field name{'s' if len(extra_fields) > 1 else ''} present in the {entity_name}: {extra_field_names}{similar_msg}", 459 path, 460 )
471def parse_expression( 472 cls: t.Type, 473 v: t.Union[t.List[str], t.List[exp.Expr], str, exp.Expr, t.Callable, None], 474 info: t.Optional[ValidationInfo], 475) -> t.List[exp.Expr] | exp.Expr | t.Callable | None: 476 """Helper method to deserialize SQLGlot expressions in Pydantic Models.""" 477 if v is None: 478 return None 479 480 if callable(v): 481 return v 482 483 dialect = info.data.get("dialect") if info else "" 484 485 if isinstance(v, list): 486 return [ 487 e if isinstance(e, exp.Expr) else d.parse_one(e, dialect=dialect) # type: ignore[misc] 488 for e in v 489 if not isinstance(e, exp.Semicolon) 490 ] 491 492 if isinstance(v, str): 493 return d.parse_one(v, dialect=dialect) 494 495 if not v: 496 raise ConfigError(f"Could not parse {v}") 497 498 return v
Helper method to deserialize SQLGlot expressions in Pydantic Models.
501def parse_bool(v: t.Any) -> bool: 502 if isinstance(v, exp.Expr): 503 if not isinstance(v, exp.Boolean): 504 from sqlglot.optimizer.simplify import simplify 505 506 # Try to reduce expressions like (1 = 1) (see: T-SQL boolean generation) 507 v = simplify(v) 508 509 if isinstance(v, exp.Boolean): 510 return v.this 511 512 return str_to_bool(v.name) 513 514 return str_to_bool(str(v or ""))
517def parse_properties( 518 cls: t.Type, v: t.Any, info: t.Optional[ValidationInfo] 519) -> t.Optional[exp.Tuple]: 520 if v is None: 521 return v 522 523 dialect = info.data.get("dialect") if info else "" 524 525 if isinstance(v, str): 526 v = d.parse_one(v, dialect=dialect) 527 if isinstance(v, (exp.Array, exp.Paren, exp.Tuple)): 528 eq_expressions: t.List[exp.Expr] = ( 529 [v.unnest()] if isinstance(v, exp.Paren) else v.expressions 530 ) 531 532 for eq_expr in eq_expressions: 533 if not isinstance(eq_expr, exp.EQ): 534 raise ConfigError( 535 f"Invalid property '{eq_expr.sql(dialect=dialect)}'. " 536 "Properties must be specified as key-value pairs <key> = <value>. " 537 ) 538 539 properties = ( 540 exp.Tuple(expressions=eq_expressions) if isinstance(v, (exp.Paren, exp.Array)) else v 541 ) 542 elif isinstance(v, dict): 543 properties = exp.Tuple( 544 expressions=[exp.Literal.string(key).eq(value) for key, value in v.items()] 545 ) 546 else: 547 raise SQLMeshError(f"Unexpected properties '{v}'") 548 549 properties.meta["dialect"] = dialect 550 return properties
560def depends_on(cls: t.Type, v: t.Any, info: ValidationInfo) -> t.Optional[t.Set[str]]: 561 dialect = info.data.get("dialect") 562 default_catalog = info.data.get("default_catalog") 563 564 if isinstance(v, exp.Paren): 565 v = v.unnest() 566 567 if isinstance(v, (exp.Array, exp.Tuple)): 568 return { 569 d.normalize_model_name( 570 table.name if table.is_string else table, 571 default_catalog=default_catalog, 572 dialect=dialect, 573 ) 574 for table in v.expressions 575 } 576 if isinstance(v, (exp.Table, exp.Column)): 577 return {d.normalize_model_name(v, default_catalog=default_catalog, dialect=dialect)} 578 if hasattr(v, "__iter__") and not isinstance(v, str): 579 return { 580 d.normalize_model_name(name, default_catalog=default_catalog, dialect=dialect) 581 for name in v 582 } 583 584 return v
587def sort_python_env(python_env: t.Dict[str, Executable]) -> t.List[t.Tuple[str, Executable]]: 588 """Returns the python env sorted.""" 589 return sorted(python_env.items(), key=lambda x: (x[1].kind, x[0]))
Returns the python env sorted.
592def sorted_python_env_payloads(python_env: t.Dict[str, Executable]) -> t.List[str]: 593 """Returns the payloads of the sorted python env.""" 594 595 def _executable_to_str(k: str, v: Executable) -> str: 596 result = f"# {v.path}\n" if v.path is not None else "" 597 if v.is_import or v.is_definition: 598 result += v.payload 599 else: 600 result += f"{k} = {v.payload}" 601 return result 602 603 return [_executable_to_str(k, v) for k, v in sort_python_env(python_env)]
Returns the payloads of the sorted python env.
606def parse_strings_with_macro_refs(value: t.Any, dialect: DialectType) -> t.Any: 607 if isinstance(value, str) and "@" in value: 608 return exp.maybe_parse(value, dialect=dialect) 609 610 if isinstance(value, dict): 611 for k, v in dict(value).items(): 612 value[k] = parse_strings_with_macro_refs(v, dialect) 613 elif isinstance(value, list): 614 value = [parse_strings_with_macro_refs(v, dialect) for v in value] 615 616 return value
471def parse_expression( 472 cls: t.Type, 473 v: t.Union[t.List[str], t.List[exp.Expr], str, exp.Expr, t.Callable, None], 474 info: t.Optional[ValidationInfo], 475) -> t.List[exp.Expr] | exp.Expr | t.Callable | None: 476 """Helper method to deserialize SQLGlot expressions in Pydantic Models.""" 477 if v is None: 478 return None 479 480 if callable(v): 481 return v 482 483 dialect = info.data.get("dialect") if info else "" 484 485 if isinstance(v, list): 486 return [ 487 e if isinstance(e, exp.Expr) else d.parse_one(e, dialect=dialect) # type: ignore[misc] 488 for e in v 489 if not isinstance(e, exp.Semicolon) 490 ] 491 492 if isinstance(v, str): 493 return d.parse_one(v, dialect=dialect) 494 495 if not v: 496 raise ConfigError(f"Could not parse {v}") 497 498 return v
Helper method to deserialize SQLGlot expressions in Pydantic Models.
Wrap a classmethod, staticmethod, property or unbound function and act as a descriptor that allows us to detect decorated items from the class' attributes.
This class' __get__ returns the wrapped item's __get__ result, which makes it transparent for classmethods and staticmethods.
Attributes:
- wrapped: The decorator that has to be wrapped.
- decorator_info: The decorator info.
- shim: A wrapper function to wrap V1 style function.
517def parse_properties( 518 cls: t.Type, v: t.Any, info: t.Optional[ValidationInfo] 519) -> t.Optional[exp.Tuple]: 520 if v is None: 521 return v 522 523 dialect = info.data.get("dialect") if info else "" 524 525 if isinstance(v, str): 526 v = d.parse_one(v, dialect=dialect) 527 if isinstance(v, (exp.Array, exp.Paren, exp.Tuple)): 528 eq_expressions: t.List[exp.Expr] = ( 529 [v.unnest()] if isinstance(v, exp.Paren) else v.expressions 530 ) 531 532 for eq_expr in eq_expressions: 533 if not isinstance(eq_expr, exp.EQ): 534 raise ConfigError( 535 f"Invalid property '{eq_expr.sql(dialect=dialect)}'. " 536 "Properties must be specified as key-value pairs <key> = <value>. " 537 ) 538 539 properties = ( 540 exp.Tuple(expressions=eq_expressions) if isinstance(v, (exp.Paren, exp.Array)) else v 541 ) 542 elif isinstance(v, dict): 543 properties = exp.Tuple( 544 expressions=[exp.Literal.string(key).eq(value) for key, value in v.items()] 545 ) 546 else: 547 raise SQLMeshError(f"Unexpected properties '{v}'") 548 549 properties.meta["dialect"] = dialect 550 return properties
Wrap a classmethod, staticmethod, property or unbound function and act as a descriptor that allows us to detect decorated items from the class' attributes.
This class' __get__ returns the wrapped item's __get__ result, which makes it transparent for classmethods and staticmethods.
Attributes:
- wrapped: The decorator that has to be wrapped.
- decorator_info: The decorator info.
- shim: A wrapper function to wrap V1 style function.
553def default_catalog(cls: t.Type, v: t.Any) -> t.Optional[str]: 554 if v is None: 555 return None 556 # If v is an expression then we will return expression as sql without a dialect 557 return str(v)
Wrap a classmethod, staticmethod, property or unbound function and act as a descriptor that allows us to detect decorated items from the class' attributes.
This class' __get__ returns the wrapped item's __get__ result, which makes it transparent for classmethods and staticmethods.
Attributes:
- wrapped: The decorator that has to be wrapped.
- decorator_info: The decorator info.
- shim: A wrapper function to wrap V1 style function.
560def depends_on(cls: t.Type, v: t.Any, info: ValidationInfo) -> t.Optional[t.Set[str]]: 561 dialect = info.data.get("dialect") 562 default_catalog = info.data.get("default_catalog") 563 564 if isinstance(v, exp.Paren): 565 v = v.unnest() 566 567 if isinstance(v, (exp.Array, exp.Tuple)): 568 return { 569 d.normalize_model_name( 570 table.name if table.is_string else table, 571 default_catalog=default_catalog, 572 dialect=dialect, 573 ) 574 for table in v.expressions 575 } 576 if isinstance(v, (exp.Table, exp.Column)): 577 return {d.normalize_model_name(v, default_catalog=default_catalog, dialect=dialect)} 578 if hasattr(v, "__iter__") and not isinstance(v, str): 579 return { 580 d.normalize_model_name(name, default_catalog=default_catalog, dialect=dialect) 581 for name in v 582 } 583 584 return v
Wrap a classmethod, staticmethod, property or unbound function and act as a descriptor that allows us to detect decorated items from the class' attributes.
This class' __get__ returns the wrapped item's __get__ result, which makes it transparent for classmethods and staticmethods.
Attributes:
- wrapped: The decorator that has to be wrapped.
- decorator_info: The decorator info.
- shim: A wrapper function to wrap V1 style function.
665class ParsableSql(PydanticModel): 666 sql: str 667 transaction: t.Optional[bool] = None 668 669 _parsed: t.Optional[exp.Expr] = None 670 _parsed_dialect: t.Optional[str] = None 671 672 def parse(self, dialect: str) -> exp.Expr: 673 if self._parsed is None or self._parsed_dialect != dialect: 674 self._parsed = d.parse_one(self.sql, dialect=dialect) 675 self._parsed_dialect = dialect 676 return self._parsed # type: ignore[return-value] 677 678 @classmethod 679 def from_parsed_expression( 680 cls, parsed_expression: exp.Expr, dialect: str, use_meta_sql: bool = False 681 ) -> ParsableSql: 682 sql = ( 683 parsed_expression.meta.get("sql") or parsed_expression.sql(dialect=dialect) 684 if use_meta_sql 685 else parsed_expression.sql(dialect=dialect) 686 ) 687 result = cls(sql=sql) 688 result._parsed = parsed_expression 689 result._parsed_dialect = dialect 690 return result 691 692 @classmethod 693 def validator(cls) -> classmethod: 694 def _validate_parsable_sql( 695 v: t.Any, info: ValidationInfo 696 ) -> t.Optional[t.Union[ParsableSql, t.List[ParsableSql]]]: 697 if v is None: 698 return v 699 if isinstance(v, str): 700 return ParsableSql(sql=v) 701 if isinstance(v, exp.Expr): 702 return ParsableSql.from_parsed_expression( 703 v, get_dialect(info.data), use_meta_sql=False 704 ) 705 if isinstance(v, list): 706 dialect = get_dialect(info.data) 707 return [ 708 ParsableSql(sql=s) 709 if isinstance(s, str) 710 else ParsableSql.from_parsed_expression(s, dialect, use_meta_sql=False) 711 if isinstance(s, exp.Expr) 712 else ParsableSql.parse_obj(s) 713 for s in v 714 ] 715 return ParsableSql.parse_obj(v) 716 717 return field_validator( 718 "query_", 719 "expressions_", 720 "pre_statements_", 721 "post_statements_", 722 "on_virtual_update_", 723 mode="before", 724 check_fields=False, 725 )(_validate_parsable_sql)
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized
__init__[Signature][inspect.Signature] of the model. - __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom
__init__function. - __pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces
Model.__validators__andModel.__root_validators__from Pydantic V1. - __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [
RootModel][pydantic.root_model.RootModel]. - __pydantic_serializer__: The
pydantic-coreSchemaSerializerused to dump instances of the model. - __pydantic_validator__: The
pydantic-coreSchemaValidatorused to validate instances of the model. - __pydantic_fields__: A dictionary of field names and their corresponding [
FieldInfo][pydantic.fields.FieldInfo] objects. - __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [
ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects. - __pydantic_extra__: A dictionary containing extra values, if [
extra][pydantic.config.ConfigDict.extra] is set to'allow'. - __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
678 @classmethod 679 def from_parsed_expression( 680 cls, parsed_expression: exp.Expr, dialect: str, use_meta_sql: bool = False 681 ) -> ParsableSql: 682 sql = ( 683 parsed_expression.meta.get("sql") or parsed_expression.sql(dialect=dialect) 684 if use_meta_sql 685 else parsed_expression.sql(dialect=dialect) 686 ) 687 result = cls(sql=sql) 688 result._parsed = parsed_expression 689 result._parsed_dialect = dialect 690 return result
692 @classmethod 693 def validator(cls) -> classmethod: 694 def _validate_parsable_sql( 695 v: t.Any, info: ValidationInfo 696 ) -> t.Optional[t.Union[ParsableSql, t.List[ParsableSql]]]: 697 if v is None: 698 return v 699 if isinstance(v, str): 700 return ParsableSql(sql=v) 701 if isinstance(v, exp.Expr): 702 return ParsableSql.from_parsed_expression( 703 v, get_dialect(info.data), use_meta_sql=False 704 ) 705 if isinstance(v, list): 706 dialect = get_dialect(info.data) 707 return [ 708 ParsableSql(sql=s) 709 if isinstance(s, str) 710 else ParsableSql.from_parsed_expression(s, dialect, use_meta_sql=False) 711 if isinstance(s, exp.Expr) 712 else ParsableSql.parse_obj(s) 713 for s in v 714 ] 715 return ParsableSql.parse_obj(v) 716 717 return field_validator( 718 "query_", 719 "expressions_", 720 "pre_statements_", 721 "post_statements_", 722 "on_virtual_update_", 723 mode="before", 724 check_fields=False, 725 )(_validate_parsable_sql)
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
358def init_private_attributes(self: BaseModel, context: Any, /) -> None: 359 """This function is meant to behave like a BaseModel method to initialise private attributes. 360 361 It takes context as an argument since that's what pydantic-core passes when calling it. 362 363 Args: 364 self: The BaseModel instance. 365 context: The context. 366 """ 367 if getattr(self, '__pydantic_private__', None) is None: 368 pydantic_private = {} 369 for name, private_attr in self.__private_attributes__.items(): 370 default = private_attr.get_default() 371 if default is not PydanticUndefined: 372 pydantic_private[name] = default 373 object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs