Edit on GitHub

sqlmesh.core.model.common

  1from __future__ import annotations
  2
  3import ast
  4import typing as t
  5from pathlib import Path
  6
  7from astor import to_source
  8from difflib import get_close_matches
  9from sqlglot import exp
 10from sqlglot.helper import ensure_list
 11
 12from sqlmesh.core import constants as c
 13from sqlmesh.core import dialect as d
 14from sqlmesh.core.macros import MacroRegistry, MacroStrTemplate
 15from sqlmesh.utils import str_to_bool
 16from sqlmesh.utils.errors import ConfigError, SQLMeshError, raise_config_error
 17from sqlmesh.utils.metaprogramming import (
 18    Executable,
 19    SqlValue,
 20    build_env,
 21    prepare_env,
 22    serialize_env,
 23)
 24from sqlmesh.utils.pydantic import PydanticModel, ValidationInfo, field_validator, get_dialect
 25
 26if t.TYPE_CHECKING:
 27    from sqlglot.dialects.dialect import DialectType
 28    from sqlmesh.utils import registry_decorator
 29    from sqlmesh.utils.jinja import MacroReference
 30
 31    MacroCallable = t.Union[Executable, registry_decorator]
 32
 33
def make_python_env(
    expressions: t.Union[
        exp.Expr,
        t.List[t.Union[exp.Expr, t.Tuple[exp.Expr, bool]]],
    ],
    jinja_macro_references: t.Optional[t.Set[MacroReference]],
    module_path: Path,
    macros: MacroRegistry,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    referenced_variables: t.Optional[t.Set[str]] = None,
    path: t.Optional[Path] = None,
    python_env: t.Optional[t.Dict[str, Executable]] = None,
    strict_resolution: bool = True,
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
    dialect: DialectType = None,
) -> t.Dict[str, Executable]:
    """Collect the macros and variables referenced by the expressions into a python env.

    The expressions are scanned for macro function calls, macro variables and identifiers
    containing `@` templates. Every referenced macro found in `macros` and every referenced
    variable is recorded together with a flag telling whether all of its references seen so
    far are metadata-only. Referenced macros are then added to the python env and the
    variables are attached through `_add_variables_to_python_env`.

    Args:
        expressions: A single expression, or a list whose items are either an expression or
            an `(expression, is_metadata)` tuple. Items given without the flag are treated
            as non-metadata. `d.Jinja` expressions are skipped.
        jinja_macro_references: Jinja macro references. Those without a package whose name
            is present in `macros` are recorded as non-metadata macro usages.
        module_path: Passed as `path` to `build_env` and `serialize_env`.
        macros: The registry of available macros.
        variables: The available variables.
        referenced_variables: Variable names that are recorded upfront as used and
            non-metadata.
        path: Passed to `raise_config_error` when a VAR / BLUEPRINT_VAR call is malformed.
        python_env: An existing python env to extend in place. A new one is created if None.
        strict_resolution: Forwarded to `_add_variables_to_python_env`.
        blueprint_variables: The available blueprint variables.
        dialect: Forwarded to `_add_variables_to_python_env`.

    Returns:
        The python env returned by `_add_variables_to_python_env`.
    """
    python_env = {} if python_env is None else python_env
    env: t.Dict[str, t.Tuple[t.Any, t.Optional[bool]]] = {}

    variables = variables or {}
    blueprint_variables = blueprint_variables or {}

    # macro name -> (macro, whether every usage seen so far is metadata-only)
    used_macros: t.Dict[str, t.Tuple[MacroCallable, bool]] = {}

    # var -> True: var is metadata-only
    # var -> False: var is not metadata-only
    # var -> None: cannot determine whether var is metadata-only yet, need to walk macros first
    used_variables: t.Dict[str, t.Optional[bool]] = dict.fromkeys(
        referenced_variables or set(), False
    )

    # id(expr) -> true: expr appears under the AST of a metadata-only macro function
    # id(expr) -> false: expr appears under the AST of a macro function whose metadata status we don't yet know
    expr_under_metadata_macro_func: t.Dict[int, bool] = {}

    # For @m1(@m2(@x), @y), we'd get x -> m1 and y -> m1
    outermost_macro_func_ancestor_by_var: t.Dict[str, str] = {}
    visited_macro_funcs: t.Set[int] = set()

    def _is_metadata_var(
        name: str, expression: exp.Expr, appears_in_metadata_expression: bool
    ) -> t.Optional[bool]:
        # A variable that hasn't been seen yet starts out as metadata-only (True)
        is_metadata_so_far = used_variables.get(name, True)
        if is_metadata_so_far is False:
            # We've concluded this variable is definitely not metadata-only
            return False

        appears_under_metadata_macro_func = expr_under_metadata_macro_func.get(id(expression))
        if is_metadata_so_far and (
            appears_in_metadata_expression or appears_under_metadata_macro_func
        ):
            # The variable appears in a metadata expression, e.g., audits (...),
            # or in the AST of metadata-only macro call, e.g., @FOO(@x)
            return True

        # The variable appears in the AST of a macro call, but we don't know if it's metadata-only
        if appears_under_metadata_macro_func is False:
            return None

        # The variable appears elsewhere, e.g., in the model's query: SELECT @x
        return False

    def _is_metadata_macro(name: str, appears_in_metadata_expression: bool) -> bool:
        # A macro stays metadata-only only while every usage seen so far is metadata-only
        if name in used_macros:
            is_metadata_so_far = used_macros[name][1]
            return is_metadata_so_far and appears_in_metadata_expression

        return appears_in_metadata_expression

    expressions = ensure_list(expressions)
    for expression_metadata in expressions:
        if isinstance(expression_metadata, tuple):
            expression, is_metadata = expression_metadata
        else:
            expression, is_metadata = expression_metadata, False

        if isinstance(expression, d.Jinja):
            continue

        for macro_func_or_var in expression.find_all(d.MacroFunc, d.MacroVar, exp.Identifier):
            # Exact class checks: anything that is not precisely a MacroFunc or a MacroVar
            # falls through to the last branch, which handles `@` templates.
            if macro_func_or_var.__class__ is d.MacroFunc:
                name = macro_func_or_var.this.name.lower()
                if name not in macros:
                    continue

                used_macros[name] = (macros[name], _is_metadata_macro(name, is_metadata))

                if name in (c.VAR, c.BLUEPRINT_VAR):
                    args = macro_func_or_var.this.expressions
                    if len(args) < 1:
                        raise_config_error(
                            f"Macro {name.upper()} requires at least one argument", path
                        )

                    if not args[0].is_string:
                        raise_config_error(
                            f"The variable name must be a string literal, '{args[0].sql()}' was given instead",
                            path,
                        )

                    var_name = args[0].this.lower()
                    used_variables[var_name] = _is_metadata_var(
                        var_name, macro_func_or_var, is_metadata
                    )
                elif id(macro_func_or_var) not in visited_macro_funcs:
                    # We only care about the top-level macro function calls to determine the metadata
                    # status of the variables referenced in their ASTs. For example, in @m1(@m2(@x)),
                    # if m1 is metadata-only but m2 is not, we can still determine that @x only affects
                    # the metadata hash, since m2's result feeds into a metadata-only macro function.
                    #
                    # Generally, if the top-level call is known to be metadata-only or appear in a
                    # metadata expression, then we can avoid traversing nested macro function calls.

                    var_refs, _expr_under_metadata_macro_func, _visited_macro_funcs = (
                        _extract_macro_func_variable_references(macro_func_or_var, is_metadata)
                    )
                    expr_under_metadata_macro_func.update(_expr_under_metadata_macro_func)
                    visited_macro_funcs.update(_visited_macro_funcs)
                    outermost_macro_func_ancestor_by_var |= {var_ref: name for var_ref in var_refs}
            elif macro_func_or_var.__class__ is d.MacroVar:
                var_name = macro_func_or_var.name.lower()
                # A macro variable may name a macro, in which case it counts as a macro usage
                if var_name in macros:
                    used_macros[var_name] = (
                        macros[var_name],
                        _is_metadata_macro(var_name, is_metadata),
                    )
                elif var_name in variables or var_name in blueprint_variables:
                    used_variables[var_name] = _is_metadata_var(
                        var_name, macro_func_or_var, is_metadata
                    )
            elif (
                isinstance(macro_func_or_var, (exp.Identifier, d.MacroStrReplace, d.MacroSQL))
            ) and "@" in macro_func_or_var.name:
                # The name may embed one or more `@var` / `@{var}` templates
                for _, identifier, braced_identifier, _ in MacroStrTemplate.pattern.findall(
                    macro_func_or_var.name
                ):
                    # NOTE(review): unlike the MacroVar branch above, the name is not lower-cased
                    # before the membership checks below -- confirm this is intended.
                    var_name = braced_identifier or identifier
                    if var_name in variables or var_name in blueprint_variables:
                        used_variables[var_name] = _is_metadata_var(
                            var_name, macro_func_or_var, is_metadata
                        )

    # Macros referenced from Jinja are always recorded as non-metadata
    for macro_ref in jinja_macro_references or set():
        if macro_ref.package is None and macro_ref.name in macros:
            used_macros[macro_ref.name] = (macros[macro_ref.name], False)

    for name, (used_macro, is_metadata) in used_macros.items():
        if isinstance(used_macro, Executable):
            # Already an Executable, so it can be stored in the python env as is
            python_env[name] = used_macro
        elif not hasattr(used_macro, c.SQLMESH_BUILTIN) and name not in python_env:
            build_env(
                used_macro.func,
                env=env,
                name=name,
                path=module_path,
                is_metadata_obj=is_metadata,
            )

    python_env.update(serialize_env(env, path=module_path))
    return _add_variables_to_python_env(
        python_env,
        used_variables,
        variables,
        blueprint_variables=blueprint_variables,
        dialect=dialect,
        strict_resolution=strict_resolution,
        outermost_macro_func_ancestor_by_var=outermost_macro_func_ancestor_by_var,
    )
202
203
204def _extract_macro_func_variable_references(
205    macro_func: exp.Expr,
206    is_metadata: bool,
207) -> t.Tuple[t.Set[str], t.Dict[int, bool], t.Set[int]]:
208    var_references = set()
209    visited_macro_funcs = set()
210    expr_under_metadata_macro_func = {}
211
212    for n in macro_func.walk():
213        if type(n) is d.MacroFunc:
214            visited_macro_funcs.add(id(n))
215
216            this = n.this
217            args = this.expressions
218
219            if this.name.lower() in (c.VAR, c.BLUEPRINT_VAR) and args and args[0].is_string:
220                var_references.add(args[0].this.lower())
221                expr_under_metadata_macro_func[id(n)] = is_metadata
222        elif isinstance(n, d.MacroVar):
223            var_references.add(n.name.lower())
224            expr_under_metadata_macro_func[id(n)] = is_metadata
225        elif isinstance(n, (exp.Identifier, d.MacroStrReplace, d.MacroSQL)) and "@" in n.name:
226            var_references.update(
227                (braced_identifier or identifier).lower()
228                for _, identifier, braced_identifier, _ in MacroStrTemplate.pattern.findall(n.name)
229            )
230            expr_under_metadata_macro_func[id(n)] = is_metadata
231
232    return (var_references, expr_under_metadata_macro_func, visited_macro_funcs)
233
234
def _add_variables_to_python_env(
    python_env: t.Dict[str, Executable],
    used_variables: t.Dict[str, t.Optional[bool]],
    variables: t.Optional[t.Dict[str, t.Any]],
    strict_resolution: bool = True,
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
    dialect: DialectType = None,
    outermost_macro_func_ancestor_by_var: t.Optional[t.Dict[str, str]] = None,
) -> t.Dict[str, Executable]:
    """Attach the used variables to the python env, split by metadata status.

    Args:
        python_env: The python env to extend in place.
        used_variables: Maps each used variable to True (metadata-only), False (not
            metadata-only) or None (undetermined). Updated in place with the variables
            referenced by the definitions in `python_env`.
        variables: The available variables.
        strict_resolution: Forwarded to `parse_dependencies`.
        blueprint_variables: The available blueprint variables. Expression values are
            stored as `SqlValue`s rendered with `dialect`.
        dialect: The dialect used to render blueprint variable expressions.
        outermost_macro_func_ancestor_by_var: Maps a variable referenced within a macro
            call's AST to the name of the outermost macro function of that call.

    Returns:
        The same `python_env` object, with the variable entries added.

    Raises:
        ConfigError: If a variable ends up classified as both metadata and non-metadata.
    """
    _, python_used_variables = parse_dependencies(
        python_env,
        None,
        strict_resolution=strict_resolution,
        variables=variables,
        blueprint_variables=blueprint_variables,
    )
    for var_name, is_metadata in python_used_variables.items():
        used_variables[var_name] = is_metadata and used_variables.get(var_name, True)

    # Variables are treated as metadata-only when all of their references either:
    # - appear in metadata-only expressions, such as `audits (...)`, virtual statements, etc
    # - appear in the ASTs or definitions of metadata-only macros
    #
    # See also: https://github.com/SQLMesh/sqlmesh/pull/4936#issuecomment-3136339936,
    # specifically the "Terminology" and "Observations" section.
    metadata_used_variables = {
        var_name for var_name, is_metadata in used_variables.items() if is_metadata
    }
    for used_var, outermost_macro_func in (outermost_macro_func_ancestor_by_var or {}).items():
        used_var_is_metadata = used_variables.get(used_var)
        if used_var_is_metadata is False:
            continue

        # At this point we can decide whether a variable reference in a macro call's AST is
        # metadata-only, because we've annotated the corresponding macro call in the python env.
        if outermost_macro_func in python_env and python_env[outermost_macro_func].is_metadata:
            metadata_used_variables.add(used_var)

    non_metadata_used_variables = set(used_variables) - metadata_used_variables

    # The two sets are disjoint by construction, so this is a purely defensive check.
    if overlapping_variables := (non_metadata_used_variables & metadata_used_variables):
        raise ConfigError(
            f"Variables {', '.join(overlapping_variables)} are both metadata and non-metadata, "
            "which is unexpected. Please file an issue at https://github.com/SQLMesh/sqlmesh/issues/new."
        )

    metadata_variables = {
        k: v for k, v in (variables or {}).items() if k in metadata_used_variables
    }
    variables = {k: v for k, v in (variables or {}).items() if k in non_metadata_used_variables}

    if variables:
        python_env[c.SQLMESH_VARS] = Executable.value(variables, sort_root_dict=True)
    if metadata_variables:
        python_env[c.SQLMESH_VARS_METADATA] = Executable.value(
            metadata_variables, sort_root_dict=True, is_metadata=True
        )

    if blueprint_variables:
        # Keys are lower-cased in both mappings so that they're stored consistently,
        # regardless of whether a blueprint variable is metadata-only or not.
        metadata_blueprint_variables = {
            k.lower(): SqlValue(sql=v.sql(dialect=dialect)) if isinstance(v, exp.Expr) else v
            for k, v in blueprint_variables.items()
            if k in metadata_used_variables
        }
        blueprint_variables = {
            k.lower(): SqlValue(sql=v.sql(dialect=dialect)) if isinstance(v, exp.Expr) else v
            for k, v in blueprint_variables.items()
            if k in non_metadata_used_variables
        }
        if blueprint_variables:
            python_env[c.SQLMESH_BLUEPRINT_VARS] = Executable.value(
                blueprint_variables, sort_root_dict=True
            )
        if metadata_blueprint_variables:
            python_env[c.SQLMESH_BLUEPRINT_VARS_METADATA] = Executable.value(
                metadata_blueprint_variables, sort_root_dict=True, is_metadata=True
            )

    return python_env
314
315
def parse_dependencies(
    python_env: t.Dict[str, Executable],
    entrypoint: t.Optional[str],
    strict_resolution: bool = True,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
) -> t.Tuple[t.Set[str], t.Dict[str, bool]]:
    """
    Parses the source of a model function and finds upstream table dependencies
    and referenced variables based on calls to context / evaluator.

    Args:
        python_env: A dictionary of Python definitions.
        entrypoint: The name of the function.
        strict_resolution: If true, the arguments of `table` and `resolve_table` calls must
            be resolvable at parse time, otherwise an exception will be raised. If false,
            arguments that can't be resolved are skipped.
        variables: The variables available to the python environment.
        blueprint_variables: The blueprint variables available to the python environment.

    Returns:
        A tuple containing the set of upstream table dependencies and a mapping of
        the referenced variables associated with their metadata status.

    Raises:
        ConfigError: If `strict_resolution` is true and the first argument of a relevant
            call is missing or can't be evaluated at parse time.
    """

    class VariableResolutionContext:
        """This enables calls like `resolve_table` to reference `var()` and `blueprint_var()`."""

        @staticmethod
        def var(var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
            return (variables or {}).get(var_name.lower(), default)

        @staticmethod
        def blueprint_var(var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
            return (blueprint_variables or {}).get(var_name.lower(), default)

    env = prepare_env(python_env)
    local_env = dict.fromkeys(("context", "evaluator"), VariableResolutionContext)

    depends_on: t.Set[str] = set()
    used_variables: t.Dict[str, bool] = {}

    def resolve_first_arg(call: ast.Call, keyword_arg_name: str, source_path: t.Any) -> t.Any:
        """Evaluates the first positional argument of the call, or else the given keyword.

        Returns None if the argument can't be resolved and `strict_resolution` is false.
        """
        if call.args:
            first_arg: t.Optional[ast.expr] = call.args[0]
        else:
            first_arg = next(
                (keyword.value for keyword in call.keywords if keyword.arg == keyword_arg_name),
                None,
            )

        # Bound upfront so that the error message below can always be rendered, even if
        # the argument is missing or its source can't be generated.
        expression = keyword_arg_name
        try:
            if first_arg is None:
                raise ValueError(f"Missing argument '{keyword_arg_name}'")

            expression = to_source(first_arg)
            # NOTE(review): this evaluates source code taken from the python env --
            # presumably the project's own model / macro code; confirm it can't come
            # from an untrusted origin.
            return eval(expression, env, local_env)
        except Exception as ex:
            if strict_resolution:
                raise ConfigError(
                    f"Error resolving dependencies for '{source_path}'. "
                    f"Argument '{expression.strip()}' must be resolvable at parse time."
                ) from ex
            return None

    for executable in python_env.values():
        if not executable.is_definition:
            continue

        is_metadata = executable.is_metadata
        for node in ast.walk(ast.parse(executable.payload)):
            next_variables = set()

            if isinstance(node, ast.Call):
                func = node.func
                # Only calls of the form `<name>.<attr>(...)` are of interest
                if not isinstance(func, ast.Attribute) or not isinstance(func.value, ast.Name):
                    continue

                if func.value.id == "context" and func.attr in ("table", "resolve_table"):
                    model_name = resolve_first_arg(node, "model_name", executable.path)
                    # Unresolved names are skipped instead of polluting the set with None
                    if model_name is not None:
                        depends_on.add(model_name)
                elif func.value.id in ("context", "evaluator") and func.attr in (
                    c.VAR,
                    c.BLUEPRINT_VAR,
                ):
                    var_name = resolve_first_arg(node, "var_name", executable.path)
                    if var_name is not None:
                        next_variables.add(var_name.lower())
            elif (
                isinstance(node, ast.Attribute)
                and isinstance(node.value, ast.Name)
                and node.value.id in ("context", "evaluator")
                and node.attr == c.GATEWAY
            ):
                # Check whether the gateway attribute is referenced.
                next_variables.add(c.GATEWAY)
            elif isinstance(node, ast.FunctionDef) and node.name == entrypoint:
                # Every entrypoint argument other than `context` counts as a referenced variable
                next_variables.update(
                    [
                        arg.arg
                        for arg in [*node.args.args, *node.args.kwonlyargs]
                        if arg.arg != "context"
                    ]
                )

            # A variable is metadata-only only if all of its references are in metadata definitions
            for var_name in next_variables:
                used_variables[var_name] = used_variables.get(var_name, True) and bool(is_metadata)

    return depends_on, used_variables
421
422
def validate_extra_and_required_fields(
    klass: t.Type[PydanticModel],
    provided_fields: t.Set[str],
    entity_name: str,
    path: t.Optional[Path] = None,
) -> None:
    """Report required fields that are missing and provided fields that are unknown.

    Args:
        klass: The model class, queried for its missing required, extra and known fields.
        provided_fields: The names of the fields that were provided.
        entity_name: How the validated entity is referred to in the error messages.
        path: Passed to `raise_config_error` along with the message.
    """

    def _quote_all(names: t.Iterable[str]) -> str:
        return "'" + "', '".join(names) + "'"

    missing = klass.missing_required_fields(provided_fields)
    if missing:
        raise_config_error(
            f"Please add required field{'s' if len(missing) > 1 else ''} {_quote_all(missing)} to the {entity_name}.",
            path,
        )

    unknown = klass.extra_fields(provided_fields)
    if not unknown:
        return

    unknown_names = _quote_all(unknown)
    known_fields = klass.all_fields()

    # Best suggestion, if any, for each unknown field
    suggestions = {
        field: matches[0]
        for field in unknown
        if (matches := get_close_matches(field, known_fields, n=1))
    }

    if len(suggestions) == 1:
        # A single suggestion is appended inline
        hint = ". Did you mean " + _quote_all(suggestions.values()) + "?"
    elif suggestions:
        # Several suggestions are listed one per line
        hint = "\n\n  " + "\n  ".join(
            f"- {field}: Did you mean '{match}'?" for field, match in suggestions.items()
        )
    else:
        hint = ""

    raise_config_error(
        f"Invalid field name{'s' if len(unknown) > 1 else ''} present in the {entity_name}: {unknown_names}{hint}",
        path,
    )
460
461
def single_value_or_tuple(values: t.Sequence) -> exp.Identifier | exp.Tuple:
    """Return a lone identifier for a single value, otherwise a tuple of identifiers."""
    if len(values) == 1:
        return exp.to_identifier(values[0])

    identifiers = [exp.to_identifier(value) for value in values]
    return exp.Tuple(expressions=identifiers)
468
469
def parse_expression(
    cls: t.Type,
    v: t.Union[t.List[str], t.List[exp.Expr], str, exp.Expr, t.Callable, None],
    info: t.Optional[ValidationInfo],
) -> t.List[exp.Expr] | exp.Expr | t.Callable | None:
    """Helper method to deserialize SQLGlot expressions in Pydantic Models.

    None and callables are returned untouched, strings are parsed using the dialect found
    in the validation data, and lists are processed element-wise with semicolons dropped.
    """
    # Nothing to parse for None or callables
    if v is None or callable(v):
        return v

    dialect = info.data.get("dialect") if info else ""

    if isinstance(v, str):
        return d.parse_one(v, dialect=dialect)

    if isinstance(v, list):
        parsed = []
        for item in v:
            if isinstance(item, exp.Semicolon):
                continue
            if isinstance(item, exp.Expr):
                parsed.append(item)
            else:
                parsed.append(d.parse_one(item, dialect=dialect))  # type: ignore[misc]
        return parsed

    if not v:
        raise ConfigError(f"Could not parse {v}")

    return v
498
499
def parse_bool(v: t.Any) -> bool:
    """Coerce a plain value or a SQLGlot expression into a bool."""
    if not isinstance(v, exp.Expr):
        # Falsy values are converted from the empty string
        return str_to_bool(str(v or ""))

    if not isinstance(v, exp.Boolean):
        from sqlglot.optimizer.simplify import simplify

        # Try to reduce expressions like (1 = 1) (see: T-SQL boolean generation)
        v = simplify(v)

    return v.this if isinstance(v, exp.Boolean) else str_to_bool(v.name)
514
515
def parse_properties(
    cls: t.Type, v: t.Any, info: t.Optional[ValidationInfo]
) -> t.Optional[exp.Tuple]:
    """Normalize properties into a tuple of `<key> = <value>` expressions.

    Strings are parsed first. Arrays, parenthesized expressions and tuples must only
    contain equality expressions, while dicts are converted into one equality per item.
    The dialect is stored in the resulting tuple's meta.

    Raises:
        ConfigError: If one of the properties isn't a key-value pair.
        SQLMeshError: If the value is of an unsupported type.
    """
    if v is None:
        return v

    dialect = info.data.get("dialect") if info else ""

    if isinstance(v, str):
        v = d.parse_one(v, dialect=dialect)

    if isinstance(v, dict):
        pairs = [exp.Literal.string(key).eq(value) for key, value in v.items()]
        properties = exp.Tuple(expressions=pairs)
    elif isinstance(v, (exp.Array, exp.Paren, exp.Tuple)):
        eq_expressions: t.List[exp.Expr]
        if isinstance(v, exp.Paren):
            # A parenthesized expression holds a single property
            eq_expressions = [v.unnest()]
        else:
            eq_expressions = v.expressions

        for candidate in eq_expressions:
            if isinstance(candidate, exp.EQ):
                continue
            raise ConfigError(
                f"Invalid property '{candidate.sql(dialect=dialect)}'. "
                "Properties must be specified as key-value pairs <key> = <value>. "
            )

        # Tuples are kept as is, anything else is rewrapped into a tuple
        if isinstance(v, (exp.Paren, exp.Array)):
            properties = exp.Tuple(expressions=eq_expressions)
        else:
            properties = v
    else:
        raise SQLMeshError(f"Unexpected properties '{v}'")

    properties.meta["dialect"] = dialect
    return properties
550
551
def default_catalog(cls: t.Type, v: t.Any) -> t.Optional[str]:
    """Return the catalog as a string, or None if it's not set."""
    # If v is an expression then we will return expression as sql without a dialect
    return None if v is None else str(v)
557
558
def depends_on(cls: t.Type, v: t.Any, info: ValidationInfo) -> t.Optional[t.Set[str]]:
    """Normalize the given dependencies into a set of model names.

    Arrays, tuples, single tables / columns and non-string iterables are supported.
    Any other value is returned as is.
    """
    dialect = info.data.get("dialect")
    default_catalog = info.data.get("default_catalog")

    def _normalize(name: t.Any) -> str:
        return d.normalize_model_name(name, default_catalog=default_catalog, dialect=dialect)

    if isinstance(v, exp.Paren):
        v = v.unnest()

    if isinstance(v, (exp.Array, exp.Tuple)):
        # String literals contribute their raw text, other expressions are passed as is
        return {
            _normalize(element.name if element.is_string else element)
            for element in v.expressions
        }

    if isinstance(v, (exp.Table, exp.Column)):
        return {_normalize(v)}

    if not isinstance(v, str) and hasattr(v, "__iter__"):
        return {_normalize(name) for name in v}

    return v
584
585
def sort_python_env(python_env: t.Dict[str, Executable]) -> t.List[t.Tuple[str, Executable]]:
    """Returns the python env items ordered by executable kind and then by name."""

    def _kind_then_name(item: t.Tuple[str, Executable]) -> t.Tuple[t.Any, str]:
        name, executable = item
        return (executable.kind, name)

    return sorted(python_env.items(), key=_kind_then_name)
589
590
def sorted_python_env_payloads(python_env: t.Dict[str, Executable]) -> t.List[str]:
    """Returns the payloads of the sorted python env."""
    payloads = []
    for name, executable in sort_python_env(python_env):
        # Executables with a known path are preceded by a comment containing it
        header = "" if executable.path is None else f"# {executable.path}\n"

        if executable.is_import or executable.is_definition:
            body = executable.payload
        else:
            # Anything else is rendered as an assignment to its name
            body = f"{name} = {executable.payload}"

        payloads.append(header + body)

    return payloads
603
604
def parse_strings_with_macro_refs(value: t.Any, dialect: DialectType) -> t.Any:
    """Recursively parse the strings containing `@` that are found in the value.

    Dicts are updated in place, whereas lists are rebuilt. Any other value, including
    strings without `@`, is returned unchanged.
    """
    if isinstance(value, str):
        return exp.maybe_parse(value, dialect=dialect) if "@" in value else value

    if isinstance(value, dict):
        # Iterate over a snapshot, since the dict is reassigned while looping
        for key, item in list(value.items()):
            value[key] = parse_strings_with_macro_refs(item, dialect)
        return value

    if isinstance(value, list):
        return [parse_strings_with_macro_refs(item, dialect) for item in value]

    return value
616
617
# Reusable "before"-mode field validators, each binding one of the helpers above to a
# fixed set of field names.
# NOTE(review): check_fields=False presumably allows attaching these to models that don't
# declare all of the listed fields -- confirm against sqlmesh.utils.pydantic.field_validator.

# Runs `parse_expression` on the `unique_key` field.
expression_validator: t.Callable = field_validator(
    "unique_key",
    mode="before",
    check_fields=False,
)(parse_expression)


# Runs `parse_bool` on the boolean flag fields listed below.
bool_validator: t.Callable = field_validator(
    "skip",
    "blocking",
    "forward_only",
    "disable_restatement",
    "insert_overwrite",
    "allow_partials",
    "enabled",
    "optimize_query",
    "formatting",
    mode="before",
    check_fields=False,
)(parse_bool)


# Runs `parse_properties` on the property and grant fields listed below.
properties_validator: t.Callable = field_validator(
    "physical_properties_",
    "virtual_properties_",
    "materialization_properties_",
    "grants_",
    mode="before",
    check_fields=False,
)(parse_properties)


# Runs `default_catalog` on the `default_catalog` field.
default_catalog_validator: t.Callable = field_validator(
    "default_catalog",
    mode="before",
    check_fields=False,
)(default_catalog)


# Runs `depends_on` on the `depends_on_` field.
depends_on_validator: t.Callable = field_validator(
    "depends_on_",
    mode="before",
    check_fields=False,
)(depends_on)
662
663
class ParsableSql(PydanticModel):
    # The SQL text, plus an optional transaction flag.
    sql: str
    transaction: t.Optional[bool] = None

    # The most recently parsed expression and the dialect it was parsed with.
    _parsed: t.Optional[exp.Expr] = None
    _parsed_dialect: t.Optional[str] = None

    def parse(self, dialect: str) -> exp.Expr:
        """Return the parsed SQL, reparsing only if nothing is cached for this dialect."""
        needs_parsing = self._parsed is None or self._parsed_dialect != dialect
        if needs_parsing:
            self._parsed = d.parse_one(self.sql, dialect=dialect)
            self._parsed_dialect = dialect
        return self._parsed  # type: ignore[return-value]

    @classmethod
    def from_parsed_expression(
        cls, parsed_expression: exp.Expr, dialect: str, use_meta_sql: bool = False
    ) -> ParsableSql:
        """Create an instance from a parsed expression and remember that expression.

        If `use_meta_sql` is set, the SQL stored under the expression's "sql" meta key
        is preferred over the SQL generated for the dialect.
        """
        sql = parsed_expression.meta.get("sql") if use_meta_sql else None
        if not sql:
            sql = parsed_expression.sql(dialect=dialect)

        instance = cls(sql=sql)
        instance._parsed = parsed_expression
        instance._parsed_dialect = dialect
        return instance

    @classmethod
    def validator(cls) -> classmethod:
        """Build the "before"-mode validator that coerces values into `ParsableSql`."""

        def _validate_parsable_sql(
            v: t.Any, info: ValidationInfo
        ) -> t.Optional[t.Union[ParsableSql, t.List[ParsableSql]]]:
            if v is None:
                return v
            if isinstance(v, str):
                return ParsableSql(sql=v)
            if isinstance(v, exp.Expr):
                return ParsableSql.from_parsed_expression(
                    v, get_dialect(info.data), use_meta_sql=False
                )
            if not isinstance(v, list):
                return ParsableSql.parse_obj(v)

            # Lists are coerced element-wise, using a single dialect for all items
            dialect = get_dialect(info.data)
            coerced: t.List[ParsableSql] = []
            for item in v:
                if isinstance(item, str):
                    coerced.append(ParsableSql(sql=item))
                elif isinstance(item, exp.Expr):
                    coerced.append(
                        ParsableSql.from_parsed_expression(item, dialect, use_meta_sql=False)
                    )
                else:
                    coerced.append(ParsableSql.parse_obj(item))
            return coerced

        return field_validator(
            "query_",
            "expressions_",
            "pre_statements_",
            "post_statements_",
            "on_virtual_update_",
            mode="before",
            check_fields=False,
        )(_validate_parsable_sql)
def make_python_env( expressions: Union[sqlglot.expressions.core.Expr, List[Union[sqlglot.expressions.core.Expr, Tuple[sqlglot.expressions.core.Expr, bool]]]], jinja_macro_references: Optional[Set[sqlmesh.utils.jinja.MacroReference]], module_path: pathlib.Path, macros: sqlmesh.utils.UniqueKeyDict[str, typing.Union[sqlmesh.utils.metaprogramming.Executable, sqlmesh.core.macros.macro]], variables: Optional[Dict[str, Any]] = None, referenced_variables: Optional[Set[str]] = None, path: Optional[pathlib.Path] = None, python_env: Optional[Dict[str, sqlmesh.utils.metaprogramming.Executable]] = None, strict_resolution: bool = True, blueprint_variables: Optional[Dict[str, Any]] = None, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> Dict[str, sqlmesh.utils.metaprogramming.Executable]:
def make_python_env(
    expressions: t.Union[
        exp.Expr,
        t.List[t.Union[exp.Expr, t.Tuple[exp.Expr, bool]]],
    ],
    jinja_macro_references: t.Optional[t.Set[MacroReference]],
    module_path: Path,
    macros: MacroRegistry,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    referenced_variables: t.Optional[t.Set[str]] = None,
    path: t.Optional[Path] = None,
    python_env: t.Optional[t.Dict[str, Executable]] = None,
    strict_resolution: bool = True,
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
    dialect: DialectType = None,
) -> t.Dict[str, Executable]:
    """Collects the macros and variables referenced by the given expressions into a python env.

    Args:
        expressions: A single expression, or a list whose items are either expressions or
            (expression, is_metadata) tuples. Bare expressions are treated as non-metadata.
        jinja_macro_references: Jinja macro references. Those without a package whose name
            is present in `macros` are recorded as used, non-metadata macros.
        module_path: Forwarded as `path` to `build_env` and `serialize_env`.
        macros: The macro registry, looked up by lowercased macro name.
        variables: The available variables.
        referenced_variables: Variable names that are seeded as used and non-metadata.
        path: Forwarded to `raise_config_error` when a variable macro call is malformed.
        python_env: An existing env to extend. When provided it is updated in place.
        strict_resolution: Forwarded to `_add_variables_to_python_env`.
        blueprint_variables: The available blueprint variables.
        dialect: Forwarded to `_add_variables_to_python_env`.

    Returns:
        The result of `_add_variables_to_python_env` applied to the populated env.
    """
    python_env = {} if python_env is None else python_env
    env: t.Dict[str, t.Tuple[t.Any, t.Optional[bool]]] = {}

    variables = variables or {}
    blueprint_variables = blueprint_variables or {}

    # macro name -> (macro, whether every usage seen so far was in a metadata expression)
    used_macros: t.Dict[str, t.Tuple[MacroCallable, bool]] = {}

    # var -> True: var is metadata-only
    # var -> False: var is not metadata-only
    # var -> None: cannot determine whether var is metadata-only yet, need to walk macros first
    used_variables: t.Dict[str, t.Optional[bool]] = dict.fromkeys(
        referenced_variables or set(), False
    )

    # id(expr) -> true: expr appears under the AST of a metadata-only macro function
    # id(expr) -> false: expr appears under the AST of a macro function whose metadata status we don't yet know
    expr_under_metadata_macro_func: t.Dict[int, bool] = {}

    # For @m1(@m2(@x), @y), we'd get x -> m1 and y -> m1
    outermost_macro_func_ancestor_by_var: t.Dict[str, str] = {}
    visited_macro_funcs: t.Set[int] = set()

    def _is_metadata_var(
        name: str, expression: exp.Expr, appears_in_metadata_expression: bool
    ) -> t.Optional[bool]:
        # A variable that has not been seen before defaults to "metadata so far".
        is_metadata_so_far = used_variables.get(name, True)
        if is_metadata_so_far is False:
            # We've concluded this variable is definitely not metadata-only
            return False

        appears_under_metadata_macro_func = expr_under_metadata_macro_func.get(id(expression))
        if is_metadata_so_far and (
            appears_in_metadata_expression or appears_under_metadata_macro_func
        ):
            # The variable appears in a metadata expression, e.g., audits (...),
            # or in the AST of metadata-only macro call, e.g., @FOO(@x)
            return True

        # The variable appears in the AST of a macro call, but we don't know if it's metadata-only
        if appears_under_metadata_macro_func is False:
            return None

        # The variable appears elsewhere, e.g., in the model's query: SELECT @x
        return False

    def _is_metadata_macro(name: str, appears_in_metadata_expression: bool) -> bool:
        # A macro stays metadata-only only while all of its usages are in metadata expressions.
        if name in used_macros:
            is_metadata_so_far = used_macros[name][1]
            return is_metadata_so_far and appears_in_metadata_expression

        return appears_in_metadata_expression

    expressions = ensure_list(expressions)
    for expression_metadata in expressions:
        if isinstance(expression_metadata, tuple):
            expression, is_metadata = expression_metadata
        else:
            expression, is_metadata = expression_metadata, False

        # Jinja expressions are not walked here; their macros come from `jinja_macro_references`.
        if isinstance(expression, d.Jinja):
            continue

        for macro_func_or_var in expression.find_all(d.MacroFunc, d.MacroVar, exp.Identifier):
            if macro_func_or_var.__class__ is d.MacroFunc:
                name = macro_func_or_var.this.name.lower()
                if name not in macros:
                    continue

                used_macros[name] = (macros[name], _is_metadata_macro(name, is_metadata))

                if name in (c.VAR, c.BLUEPRINT_VAR):
                    # Explicit variable lookups: the first argument names the variable.
                    args = macro_func_or_var.this.expressions
                    if len(args) < 1:
                        raise_config_error(
                            f"Macro {name.upper()} requires at least one argument", path
                        )

                    if not args[0].is_string:
                        raise_config_error(
                            f"The variable name must be a string literal, '{args[0].sql()}' was given instead",
                            path,
                        )

                    var_name = args[0].this.lower()
                    used_variables[var_name] = _is_metadata_var(
                        var_name, macro_func_or_var, is_metadata
                    )
                elif id(macro_func_or_var) not in visited_macro_funcs:
                    # We only care about the top-level macro function calls to determine the metadata
                    # status of the variables referenced in their ASTs. For example, in @m1(@m2(@x)),
                    # if m1 is metadata-only but m2 is not, we can still determine that @x only affects
                    # the metadata hash, since m2's result feeds into a metadata-only macro function.
                    #
                    # Generally, if the top-level call is known to be metadata-only or appear in a
                    # metadata expression, then we can avoid traversing nested macro function calls.

                    var_refs, _expr_under_metadata_macro_func, _visited_macro_funcs = (
                        _extract_macro_func_variable_references(macro_func_or_var, is_metadata)
                    )
                    expr_under_metadata_macro_func.update(_expr_under_metadata_macro_func)
                    visited_macro_funcs.update(_visited_macro_funcs)
                    outermost_macro_func_ancestor_by_var |= {var_ref: name for var_ref in var_refs}
            elif macro_func_or_var.__class__ is d.MacroVar:
                # A bare @name may refer to either a macro or a variable; macros take precedence.
                var_name = macro_func_or_var.name.lower()
                if var_name in macros:
                    used_macros[var_name] = (
                        macros[var_name],
                        _is_metadata_macro(var_name, is_metadata),
                    )
                elif var_name in variables or var_name in blueprint_variables:
                    used_variables[var_name] = _is_metadata_var(
                        var_name, macro_func_or_var, is_metadata
                    )
            elif (
                isinstance(macro_func_or_var, (exp.Identifier, d.MacroStrReplace, d.MacroSQL))
            ) and "@" in macro_func_or_var.name:
                # Template-style references embedded in a name, matched with MacroStrTemplate's pattern.
                for _, identifier, braced_identifier, _ in MacroStrTemplate.pattern.findall(
                    macro_func_or_var.name
                ):
                    # NOTE(review): unlike the branches above, var_name is not lowercased
                    # before the lookup here — confirm this is intended.
                    var_name = braced_identifier or identifier
                    if var_name in variables or var_name in blueprint_variables:
                        used_variables[var_name] = _is_metadata_var(
                            var_name, macro_func_or_var, is_metadata
                        )

    # Macros referenced from Jinja are always recorded as non-metadata.
    for macro_ref in jinja_macro_references or set():
        if macro_ref.package is None and macro_ref.name in macros:
            used_macros[macro_ref.name] = (macros[macro_ref.name], False)

    for name, (used_macro, is_metadata) in used_macros.items():
        if isinstance(used_macro, Executable):
            # Already serialized; store as-is.
            python_env[name] = used_macro
        elif not hasattr(used_macro, c.SQLMESH_BUILTIN) and name not in python_env:
            build_env(
                used_macro.func,
                env=env,
                name=name,
                path=module_path,
                is_metadata_obj=is_metadata,
            )

    python_env.update(serialize_env(env, path=module_path))
    return _add_variables_to_python_env(
        python_env,
        used_variables,
        variables,
        blueprint_variables=blueprint_variables,
        dialect=dialect,
        strict_resolution=strict_resolution,
        outermost_macro_func_ancestor_by_var=outermost_macro_func_ancestor_by_var,
    )
def parse_dependencies( python_env: Dict[str, sqlmesh.utils.metaprogramming.Executable], entrypoint: Optional[str], strict_resolution: bool = True, variables: Optional[Dict[str, Any]] = None, blueprint_variables: Optional[Dict[str, Any]] = None) -> Tuple[Set[str], Dict[str, bool]]:
def parse_dependencies(
    python_env: t.Dict[str, Executable],
    entrypoint: t.Optional[str],
    strict_resolution: bool = True,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
) -> t.Tuple[t.Set[str], t.Dict[str, bool]]:
    """
    Parses the source of a model function and finds upstream table dependencies
    and referenced variables based on calls to context / evaluator.

    Args:
        python_env: A dictionary of Python definitions.
        entrypoint: The name of the function.
        strict_resolution: If true, the arguments of `table` and `resolve_table` calls must
            be resolvable at parse time, otherwise an exception will be raised. If false,
            arguments that cannot be resolved are skipped.
        variables: The variables available to the python environment.
        blueprint_variables: The blueprint variables available to the python environment.

    Returns:
        A tuple containing the set of upstream table dependencies and a mapping of
        the referenced variables associated with their metadata status.

    Raises:
        ConfigError: If `strict_resolution` is true and the first argument of a relevant
            call cannot be evaluated at parse time.
    """

    class VariableResolutionContext:
        """This enables calls like `resolve_table` to reference `var()` and `blueprint_var()`."""

        @staticmethod
        def var(var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
            return (variables or {}).get(var_name.lower(), default)

        @staticmethod
        def blueprint_var(var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
            return (blueprint_variables or {}).get(var_name.lower(), default)

    env = prepare_env(python_env)
    local_env = dict.fromkeys(("context", "evaluator"), VariableResolutionContext)

    depends_on = set()
    used_variables: t.Dict[str, bool] = {}

    for executable in python_env.values():
        if not executable.is_definition:
            continue

        is_metadata = executable.is_metadata
        for node in ast.walk(ast.parse(executable.payload)):
            next_variables = set()

            if isinstance(node, ast.Call):
                func = node.func
                if not isinstance(func, ast.Attribute) or not isinstance(func.value, ast.Name):
                    continue

                def get_first_arg(keyword_arg_name: str) -> t.Any:
                    """Evaluates the call's first positional argument, or the named keyword argument.

                    Returns None when the argument cannot be resolved and resolution is not strict.
                    """
                    if node.args:
                        first_arg: t.Optional[ast.expr] = node.args[0]
                    else:
                        first_arg = next(
                            (
                                keyword.value
                                for keyword in node.keywords
                                if keyword.arg == keyword_arg_name
                            ),
                            None,
                        )

                    # Bound before the `try` so that the error message below can always be
                    # built, even when `to_source` itself fails (e.g. the argument is missing).
                    expression = f"<{keyword_arg_name}>"
                    try:
                        expression = to_source(first_arg)
                        # NOTE: this evaluates source text taken from the python env, so it is
                        # only as trustworthy as the definitions stored in that env.
                        return eval(expression, env, local_env)
                    except Exception:
                        if strict_resolution:
                            raise ConfigError(
                                f"Error resolving dependencies for '{executable.path}'. "
                                f"Argument '{expression.strip()}' must be resolvable at parse time."
                            )
                    return None

                if func.value.id == "context" and func.attr in ("table", "resolve_table"):
                    model_name = get_first_arg("model_name")
                    # Unresolvable names (non-strict mode) must not end up in the result set.
                    if model_name is not None:
                        depends_on.add(model_name)
                elif func.value.id in ("context", "evaluator") and func.attr in (
                    c.VAR,
                    c.BLUEPRINT_VAR,
                ):
                    referenced_var = get_first_arg("var_name")
                    # Unresolvable names (non-strict mode) are skipped instead of crashing on `.lower()`.
                    if referenced_var is not None:
                        next_variables.add(referenced_var.lower())
            elif (
                isinstance(node, ast.Attribute)
                and isinstance(node.value, ast.Name)
                and node.value.id in ("context", "evaluator")
                and node.attr == c.GATEWAY
            ):
                # Check whether the gateway attribute is referenced.
                next_variables.add(c.GATEWAY)
            elif isinstance(node, ast.FunctionDef) and node.name == entrypoint:
                # Every entrypoint parameter other than `context` counts as a referenced variable.
                next_variables.update(
                    [
                        arg.arg
                        for arg in [*node.args.args, *node.args.kwonlyargs]
                        if arg.arg != "context"
                    ]
                )

            # A variable is metadata-only only if all of its references are in metadata executables.
            for var_name in next_variables:
                used_variables[var_name] = used_variables.get(var_name, True) and bool(is_metadata)

    return depends_on, used_variables

Parses the source of a model function and finds upstream table dependencies and referenced variables based on calls to context / evaluator.

Arguments:
  • python_env: A dictionary of Python definitions.
  • entrypoint: The name of the function.
  • strict_resolution: If true, the arguments of table and resolve_table calls must be resolvable at parse time, otherwise an exception will be raised.
  • variables: The variables available to the python environment.
  • blueprint_variables: The blueprint variables available to the python environment.
Returns:

A tuple containing the set of upstream table dependencies and a mapping of the referenced variables associated with their metadata status.

def validate_extra_and_required_fields( klass: Type[sqlmesh.utils.pydantic.PydanticModel], provided_fields: Set[str], entity_name: str, path: Optional[pathlib.Path] = None) -> None:
def validate_extra_and_required_fields(
    klass: t.Type[PydanticModel],
    provided_fields: t.Set[str],
    entity_name: str,
    path: t.Optional[Path] = None,
) -> None:
    """Reports missing required fields and unknown fields for the given entity.

    Args:
        klass: The model class whose field definitions are checked against.
        provided_fields: The field names that were provided.
        entity_name: The entity's name, used in the error messages.
        path: Forwarded to `raise_config_error`.
    """

    def _quoted(names: t.Iterable[str]) -> str:
        return "'" + "', '".join(names) + "'"

    missing = klass.missing_required_fields(provided_fields)
    if missing:
        plural = "s" if len(missing) > 1 else ""
        raise_config_error(
            f"Please add required field{plural} {_quoted(missing)} to the {entity_name}.",
            path,
        )

    unknown = klass.extra_fields(provided_fields)
    if not unknown:
        return

    unknown_names = _quoted(unknown)

    # Suggest the closest known field for each unknown one, when there is any.
    candidates = klass.all_fields()
    suggestions = {
        name: nearest[0]
        for name in unknown
        if (nearest := get_close_matches(name, candidates, n=1))
    }

    if len(suggestions) == 1:
        hint = ". Did you mean " + _quoted(suggestions.values()) + "?"
    elif suggestions:
        hint = "\n\n  " + "\n  ".join(
            f"- {name}: Did you mean '{guess}'?" for name, guess in suggestions.items()
        )
    else:
        hint = ""

    plural = "s" if len(unknown) > 1 else ""
    raise_config_error(
        f"Invalid field name{plural} present in the {entity_name}: {unknown_names}{hint}",
        path,
    )
def single_value_or_tuple( values: Sequence) -> sqlglot.expressions.core.Identifier | sqlglot.expressions.query.Tuple:
def single_value_or_tuple(values: t.Sequence) -> exp.Identifier | exp.Tuple:
    """Returns a lone identifier for a one-element sequence, otherwise a tuple of identifiers."""
    if len(values) == 1:
        return exp.to_identifier(values[0])

    identifiers = [exp.to_identifier(value) for value in values]
    return exp.Tuple(expressions=identifiers)
def parse_expression( cls: Type, v: Union[List[str], List[sqlglot.expressions.core.Expr], str, sqlglot.expressions.core.Expr, Callable, NoneType], info: Optional[pydantic_core.core_schema.ValidationInfo]) -> Union[List[sqlglot.expressions.core.Expr], sqlglot.expressions.core.Expr, Callable, NoneType]:
def parse_expression(
    cls: t.Type,
    v: t.Union[t.List[str], t.List[exp.Expr], str, exp.Expr, t.Callable, None],
    info: t.Optional[ValidationInfo],
) -> t.List[exp.Expr] | exp.Expr | t.Callable | None:
    """Helper method to deserialize SQLGlot expressions in Pydantic Models."""
    # None and callables pass through untouched.
    if v is None or callable(v):
        return v

    dialect = ""
    if info:
        dialect = info.data.get("dialect")

    if isinstance(v, str):
        return d.parse_one(v, dialect=dialect)

    if isinstance(v, list):
        parsed = []
        for item in v:
            # Semicolon expressions are dropped from the result.
            if isinstance(item, exp.Semicolon):
                continue
            parsed.append(
                item if isinstance(item, exp.Expr) else d.parse_one(item, dialect=dialect)  # type: ignore[misc]
            )
        return parsed

    if not v:
        raise ConfigError(f"Could not parse {v}")

    return v

Helper method to deserialize SQLGlot expressions in Pydantic Models.

def parse_bool(v: Any) -> bool:
def parse_bool(v: t.Any) -> bool:
    """Converts a raw value or a SQLGlot expression into a bool."""
    if not isinstance(v, exp.Expr):
        return str_to_bool(str(v or ""))

    if not isinstance(v, exp.Boolean):
        from sqlglot.optimizer.simplify import simplify

        # Try to reduce expressions like (1 = 1) (see: T-SQL boolean generation)
        v = simplify(v)

    return v.this if isinstance(v, exp.Boolean) else str_to_bool(v.name)
def parse_properties( cls: Type, v: Any, info: Optional[pydantic_core.core_schema.ValidationInfo]) -> Optional[sqlglot.expressions.query.Tuple]:
def parse_properties(
    cls: t.Type, v: t.Any, info: t.Optional[ValidationInfo]
) -> t.Optional[exp.Tuple]:
    """Normalizes properties given as a string, an expression or a dict into a tuple of `key = value` pairs.

    Raises:
        ConfigError: If an item of a parsed array / paren / tuple is not an equality.
        SQLMeshError: If the value has an unsupported type.
    """
    if v is None:
        return v

    dialect = ""
    if info:
        dialect = info.data.get("dialect")

    if isinstance(v, str):
        v = d.parse_one(v, dialect=dialect)

    if isinstance(v, dict):
        pairs = [exp.Literal.string(key).eq(value) for key, value in v.items()]
        properties = exp.Tuple(expressions=pairs)
    elif isinstance(v, (exp.Array, exp.Paren, exp.Tuple)):
        if isinstance(v, exp.Paren):
            eq_expressions: t.List[exp.Expr] = [v.unnest()]
        else:
            eq_expressions = v.expressions

        invalid = next((item for item in eq_expressions if not isinstance(item, exp.EQ)), None)
        if invalid is not None:
            raise ConfigError(
                f"Invalid property '{invalid.sql(dialect=dialect)}'. "
                "Properties must be specified as key-value pairs <key> = <value>. "
            )

        # Parens and arrays are rewrapped as a tuple; a tuple is used as-is.
        if isinstance(v, (exp.Paren, exp.Array)):
            properties = exp.Tuple(expressions=eq_expressions)
        else:
            properties = v
    else:
        raise SQLMeshError(f"Unexpected properties '{v}'")

    properties.meta["dialect"] = dialect
    return properties
def default_catalog(cls: Type, v: Any) -> Optional[str]:
def default_catalog(cls: t.Type, v: t.Any) -> t.Optional[str]:
    """Returns the value as a string, leaving None untouched."""
    # If v is an expression, str() gives its SQL without a dialect.
    return None if v is None else str(v)
def depends_on( cls: Type, v: Any, info: pydantic_core.core_schema.ValidationInfo) -> Optional[Set[str]]:
def depends_on(cls: t.Type, v: t.Any, info: ValidationInfo) -> t.Optional[t.Set[str]]:
    """Normalizes the given dependencies into a set of model names.

    Values that are neither a supported expression nor a non-string iterable are returned unchanged.
    """
    dialect = info.data.get("dialect")
    catalog = info.data.get("default_catalog")

    def _normalize(reference: t.Any) -> str:
        return d.normalize_model_name(reference, default_catalog=catalog, dialect=dialect)

    if isinstance(v, exp.Paren):
        v = v.unnest()

    if isinstance(v, (exp.Array, exp.Tuple)):
        # String literals are normalized by their text, other expressions as-is.
        return {_normalize(item.name if item.is_string else item) for item in v.expressions}

    if isinstance(v, (exp.Table, exp.Column)):
        return {_normalize(v)}

    if hasattr(v, "__iter__") and not isinstance(v, str):
        return {_normalize(name) for name in v}

    return v
def sort_python_env( python_env: Dict[str, sqlmesh.utils.metaprogramming.Executable]) -> List[Tuple[str, sqlmesh.utils.metaprogramming.Executable]]:
def sort_python_env(python_env: t.Dict[str, Executable]) -> t.List[t.Tuple[str, Executable]]:
    """Returns the env's (name, executable) pairs ordered by executable kind, then by name."""

    def _kind_then_name(entry: t.Tuple[str, Executable]) -> t.Tuple[t.Any, str]:
        name, executable = entry
        return executable.kind, name

    return sorted(python_env.items(), key=_kind_then_name)

Returns the python env sorted.

def sorted_python_env_payloads( python_env: Dict[str, sqlmesh.utils.metaprogramming.Executable]) -> List[str]:
def sorted_python_env_payloads(python_env: t.Dict[str, Executable]) -> t.List[str]:
    """Returns the payloads of the sorted python env."""
    payloads = []
    for name, executable in sort_python_env(python_env):
        # Prefix with the source path, when known.
        header = "" if executable.path is None else f"# {executable.path}\n"
        if executable.is_import or executable.is_definition:
            body = executable.payload
        else:
            # Anything else is rendered as an assignment to its name.
            body = f"{name} = {executable.payload}"
        payloads.append(header + body)
    return payloads

Returns the payloads of the sorted python env.

def parse_strings_with_macro_refs( value: Any, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType]) -> Any:
def parse_strings_with_macro_refs(value: t.Any, dialect: DialectType) -> t.Any:
    """Recursively parses strings containing "@" within the given value.

    Dicts are updated in place, whereas lists are rebuilt.
    """
    if isinstance(value, str):
        return exp.maybe_parse(value, dialect=dialect) if "@" in value else value

    if isinstance(value, dict):
        # Iterate over a snapshot because the dict is modified while walking it.
        for key, item in dict(value).items():
            value[key] = parse_strings_with_macro_refs(item, dialect)
        return value

    if isinstance(value, list):
        return [parse_strings_with_macro_refs(item, dialect) for item in value]

    return value
def expression_validator( cls: Type, v: Union[List[str], List[sqlglot.expressions.core.Expr], str, sqlglot.expressions.core.Expr, Callable, NoneType], info: Optional[pydantic_core.core_schema.ValidationInfo]) -> Union[List[sqlglot.expressions.core.Expr], sqlglot.expressions.core.Expr, Callable, NoneType]:
def parse_expression(
    cls: t.Type,
    v: t.Union[t.List[str], t.List[exp.Expr], str, exp.Expr, t.Callable, None],
    info: t.Optional[ValidationInfo],
) -> t.List[exp.Expr] | exp.Expr | t.Callable | None:
    """Helper method to deserialize SQLGlot expressions in Pydantic Models."""
    # None and callables pass through untouched.
    if v is None or callable(v):
        return v

    dialect = ""
    if info:
        dialect = info.data.get("dialect")

    if isinstance(v, str):
        return d.parse_one(v, dialect=dialect)

    if isinstance(v, list):
        parsed = []
        for item in v:
            # Semicolon expressions are dropped from the result.
            if isinstance(item, exp.Semicolon):
                continue
            parsed.append(
                item if isinstance(item, exp.Expr) else d.parse_one(item, dialect=dialect)  # type: ignore[misc]
            )
        return parsed

    if not v:
        raise ConfigError(f"Could not parse {v}")

    return v

Helper method to deserialize SQLGlot expressions in Pydantic Models.

def bool_validator(unknown):

Wrap a classmethod, staticmethod, property or unbound function and act as a descriptor that allows us to detect decorated items from the class' attributes.

This class' __get__ returns the wrapped item's __get__ result, which makes it transparent for classmethods and staticmethods.

Attributes:
  • wrapped: The decorator that has to be wrapped.
  • decorator_info: The decorator info.
  • shim: A wrapper function to wrap V1 style function.
def properties_validator( cls: Type, v: Any, info: Optional[pydantic_core.core_schema.ValidationInfo]) -> Optional[sqlglot.expressions.query.Tuple]:
def parse_properties(
    cls: t.Type, v: t.Any, info: t.Optional[ValidationInfo]
) -> t.Optional[exp.Tuple]:
    """Normalizes properties given as a string, an expression or a dict into a tuple of `key = value` pairs.

    Raises:
        ConfigError: If an item of a parsed array / paren / tuple is not an equality.
        SQLMeshError: If the value has an unsupported type.
    """
    if v is None:
        return v

    dialect = ""
    if info:
        dialect = info.data.get("dialect")

    if isinstance(v, str):
        v = d.parse_one(v, dialect=dialect)

    if isinstance(v, dict):
        pairs = [exp.Literal.string(key).eq(value) for key, value in v.items()]
        properties = exp.Tuple(expressions=pairs)
    elif isinstance(v, (exp.Array, exp.Paren, exp.Tuple)):
        if isinstance(v, exp.Paren):
            eq_expressions: t.List[exp.Expr] = [v.unnest()]
        else:
            eq_expressions = v.expressions

        invalid = next((item for item in eq_expressions if not isinstance(item, exp.EQ)), None)
        if invalid is not None:
            raise ConfigError(
                f"Invalid property '{invalid.sql(dialect=dialect)}'. "
                "Properties must be specified as key-value pairs <key> = <value>. "
            )

        # Parens and arrays are rewrapped as a tuple; a tuple is used as-is.
        if isinstance(v, (exp.Paren, exp.Array)):
            properties = exp.Tuple(expressions=eq_expressions)
        else:
            properties = v
    else:
        raise SQLMeshError(f"Unexpected properties '{v}'")

    properties.meta["dialect"] = dialect
    return properties

Wrap a classmethod, staticmethod, property or unbound function and act as a descriptor that allows us to detect decorated items from the class' attributes.

This class' __get__ returns the wrapped item's __get__ result, which makes it transparent for classmethods and staticmethods.

Attributes:
  • wrapped: The decorator that has to be wrapped.
  • decorator_info: The decorator info.
  • shim: A wrapper function to wrap V1 style function.
def default_catalog_validator(cls: Type, v: Any) -> Optional[str]:
def default_catalog(cls: t.Type, v: t.Any) -> t.Optional[str]:
    """Returns the value as a string, leaving None untouched."""
    # If v is an expression, str() gives its SQL without a dialect.
    return None if v is None else str(v)

Wrap a classmethod, staticmethod, property or unbound function and act as a descriptor that allows us to detect decorated items from the class' attributes.

This class' __get__ returns the wrapped item's __get__ result, which makes it transparent for classmethods and staticmethods.

Attributes:
  • wrapped: The decorator that has to be wrapped.
  • decorator_info: The decorator info.
  • shim: A wrapper function to wrap V1 style function.
def depends_on_validator( cls: Type, v: Any, info: pydantic_core.core_schema.ValidationInfo) -> Optional[Set[str]]:
def depends_on(cls: t.Type, v: t.Any, info: ValidationInfo) -> t.Optional[t.Set[str]]:
    """Normalizes the given dependencies into a set of model names.

    Values that are neither a supported expression nor a non-string iterable are returned unchanged.
    """
    dialect = info.data.get("dialect")
    catalog = info.data.get("default_catalog")

    def _normalize(reference: t.Any) -> str:
        return d.normalize_model_name(reference, default_catalog=catalog, dialect=dialect)

    if isinstance(v, exp.Paren):
        v = v.unnest()

    if isinstance(v, (exp.Array, exp.Tuple)):
        # String literals are normalized by their text, other expressions as-is.
        return {_normalize(item.name if item.is_string else item) for item in v.expressions}

    if isinstance(v, (exp.Table, exp.Column)):
        return {_normalize(v)}

    if hasattr(v, "__iter__") and not isinstance(v, str):
        return {_normalize(name) for name in v}

    return v

Wrap a classmethod, staticmethod, property or unbound function and act as a descriptor that allows us to detect decorated items from the class' attributes.

This class' __get__ returns the wrapped item's __get__ result, which makes it transparent for classmethods and staticmethods.

Attributes:
  • wrapped: The decorator that has to be wrapped.
  • decorator_info: The decorator info.
  • shim: A wrapper function to wrap V1 style function.
class ParsableSql(sqlmesh.utils.pydantic.PydanticModel):
class ParsableSql(PydanticModel):
    """SQL text paired with a lazily computed, per-dialect cached parse of it."""

    sql: str
    transaction: t.Optional[bool] = None

    # Cached parse of `sql` and the dialect it was produced with.
    _parsed: t.Optional[exp.Expr] = None
    _parsed_dialect: t.Optional[str] = None

    def parse(self, dialect: str) -> exp.Expr:
        """Returns the parsed expression, re-parsing only when nothing is cached or the dialect differs."""
        cache_is_stale = self._parsed is None or self._parsed_dialect != dialect
        if cache_is_stale:
            self._parsed = d.parse_one(self.sql, dialect=dialect)
            self._parsed_dialect = dialect
        return self._parsed  # type: ignore[return-value]

    @classmethod
    def from_parsed_expression(
        cls, parsed_expression: exp.Expr, dialect: str, use_meta_sql: bool = False
    ) -> ParsableSql:
        """Wraps an already-parsed expression, priming the instance's parse cache.

        Args:
            parsed_expression: The expression to wrap.
            dialect: The dialect used to render the SQL text; also recorded as the
                dialect of the cached parse.
            use_meta_sql: If true, prefer the text stored under the "sql" key of the
                expression's meta, rendering the expression only when that entry is
                missing or empty.

        Returns:
            A new instance whose cached parse is `parsed_expression`.
        """
        sql_text = parsed_expression.meta.get("sql") if use_meta_sql else None
        if not sql_text:
            sql_text = parsed_expression.sql(dialect=dialect)

        instance = cls(sql=sql_text)
        instance._parsed = parsed_expression
        instance._parsed_dialect = dialect
        return instance

    @classmethod
    def validator(cls) -> classmethod:
        """Creates the "before" field validator that coerces raw values into ParsableSql objects."""

        def _validate_parsable_sql(
            v: t.Any, info: ValidationInfo
        ) -> t.Optional[t.Union[ParsableSql, t.List[ParsableSql]]]:
            if v is None:
                return v
            if isinstance(v, str):
                return ParsableSql(sql=v)
            if isinstance(v, exp.Expr):
                return ParsableSql.from_parsed_expression(
                    v, get_dialect(info.data), use_meta_sql=False
                )
            if not isinstance(v, list):
                # Anything else (e.g. a serialized mapping) goes through regular parsing.
                return ParsableSql.parse_obj(v)

            # The dialect is resolved once for the whole list.
            dialect = get_dialect(info.data)
            coerced: t.List[ParsableSql] = []
            for item in v:
                if isinstance(item, str):
                    coerced.append(ParsableSql(sql=item))
                elif isinstance(item, exp.Expr):
                    coerced.append(
                        ParsableSql.from_parsed_expression(item, dialect, use_meta_sql=False)
                    )
                else:
                    coerced.append(ParsableSql.parse_obj(item))
            return coerced

        target_fields = (
            "query_",
            "expressions_",
            "pre_statements_",
            "post_statements_",
            "on_virtual_update_",
        )
        decorate = field_validator(*target_fields, mode="before", check_fields=False)
        return decorate(_validate_parsable_sql)

Usage documentation: Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
sql: str
transaction: Optional[bool]
def parse(self, dialect: str) -> sqlglot.expressions.core.Expr:
672    def parse(self, dialect: str) -> exp.Expr:
673        if self._parsed is None or self._parsed_dialect != dialect:
674            self._parsed = d.parse_one(self.sql, dialect=dialect)
675            self._parsed_dialect = dialect
676        return self._parsed  # type: ignore[return-value]
@classmethod
def from_parsed_expression( cls, parsed_expression: sqlglot.expressions.core.Expr, dialect: str, use_meta_sql: bool = False) -> ParsableSql:
678    @classmethod
679    def from_parsed_expression(
680        cls, parsed_expression: exp.Expr, dialect: str, use_meta_sql: bool = False
681    ) -> ParsableSql:
682        sql = (
683            parsed_expression.meta.get("sql") or parsed_expression.sql(dialect=dialect)
684            if use_meta_sql
685            else parsed_expression.sql(dialect=dialect)
686        )
687        result = cls(sql=sql)
688        result._parsed = parsed_expression
689        result._parsed_dialect = dialect
690        return result
@classmethod
def validator(cls) -> classmethod:
692    @classmethod
693    def validator(cls) -> classmethod:
694        def _validate_parsable_sql(
695            v: t.Any, info: ValidationInfo
696        ) -> t.Optional[t.Union[ParsableSql, t.List[ParsableSql]]]:
697            if v is None:
698                return v
699            if isinstance(v, str):
700                return ParsableSql(sql=v)
701            if isinstance(v, exp.Expr):
702                return ParsableSql.from_parsed_expression(
703                    v, get_dialect(info.data), use_meta_sql=False
704                )
705            if isinstance(v, list):
706                dialect = get_dialect(info.data)
707                return [
708                    ParsableSql(sql=s)
709                    if isinstance(s, str)
710                    else ParsableSql.from_parsed_expression(s, dialect, use_meta_sql=False)
711                    if isinstance(s, exp.Expr)
712                    else ParsableSql.parse_obj(s)
713                    for s in v
714                ]
715            return ParsableSql.parse_obj(v)
716
717        return field_validator(
718            "query_",
719            "expressions_",
720            "pre_statements_",
721            "post_statements_",
722            "on_virtual_update_",
723            mode="before",
724            check_fields=False,
725        )(_validate_parsable_sql)
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise the private attributes of a model instance.

    This function is written to behave like a BaseModel method. It accepts
    a ``context`` argument because that is what pydantic-core passes when
    calling it; the value itself is not used here.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    # Nothing to do when the private-attribute storage already exists.
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    resolved_defaults = (
        (name, private_attr.get_default())
        for name, private_attr in self.__private_attributes__.items()
    )
    # Private attributes without a default are left out of the mapping.
    pydantic_private = {
        name: default
        for name, default in resolved_defaults
        if default is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields