Edit on GitHub

sqlmesh.core.macros

   1from __future__ import annotations
   2
   3import inspect
   4import logging
   5import sys
   6import types
   7import typing as t
   8from enum import Enum
   9from functools import reduce
  10from itertools import chain
  11from pathlib import Path
  12from string import Template
  13
  14import sqlglot
  15from jinja2 import Environment
  16from sqlglot import Generator, exp, parse_one
  17from sqlglot.executor.env import ENV
  18from sqlglot.executor.python import Python
  19from sqlglot.helper import csv, ensure_collection
  20from sqlglot.schema import MappingSchema
  21
  22from sqlmesh.core import constants as c
  23from sqlmesh.core.dialect import (
  24    SQLMESH_MACRO_PREFIX,
  25    Dialect,
  26    MacroDef,
  27    MacroFunc,
  28    MacroSQL,
  29    MacroStrReplace,
  30    MacroVar,
  31    StagedFilePath,
  32    normalize_model_name,
  33)
  34from sqlmesh.utils import (
  35    DECORATOR_RETURN_TYPE,
  36    UniqueKeyDict,
  37    columns_to_types_all_known,
  38    registry_decorator,
  39)
  40from sqlmesh.utils.errors import MacroEvalError, SQLMeshError
  41from sqlmesh.utils.jinja import JinjaMacroRegistry, has_jinja
  42from sqlmesh.utils.metaprogramming import Executable, prepare_env, print_exception
  43
  44if t.TYPE_CHECKING:
  45    from sqlmesh.core._typing import TableName
  46    from sqlmesh.core.engine_adapter import EngineAdapter
  47    from sqlmesh.core.snapshot import Snapshot
  48
  49
  50if sys.version_info >= (3, 10):
  51    UNION_TYPES = (t.Union, types.UnionType)
  52else:
  53    UNION_TYPES = (t.Union,)
  54
  55
  56logger = logging.getLogger(__name__)
  57
  58
  59class RuntimeStage(Enum):
  60    LOADING = "loading"
  61    CREATING = "creating"
  62    EVALUATING = "evaluating"
  63    TESTING = "testing"
  64
  65
  66class MacroStrTemplate(Template):
  67    delimiter = SQLMESH_MACRO_PREFIX
  68
  69
  70EXPRESSIONS_NAME_MAP = {}
  71SQL = t.NewType("SQL", str)
  72
  73for klass in sqlglot.Parser.EXPRESSION_PARSERS:
  74    name = klass if isinstance(klass, str) else klass.__name__  # type: ignore
  75    EXPRESSIONS_NAME_MAP[name.lower()] = name
  76
  77
  78def _macro_sql(sql: str, into: t.Optional[str] = None) -> str:
  79    args = [_macro_str_replace(sql)]
  80    if into in EXPRESSIONS_NAME_MAP:
  81        args.append(f"into=exp.{EXPRESSIONS_NAME_MAP[into]}")
  82    return f"self.parse_one({', '.join(args)})"
  83
  84
  85def _macro_func_sql(self: Generator, e: exp.Expression) -> str:
  86    func = e.this
  87
  88    if isinstance(func, exp.Anonymous):
  89        return f"""self.send({csv("'" + func.name + "'", self.expressions(func))})"""
  90    return self.sql(func)
  91
  92
  93def _macro_str_replace(text: str) -> str:
  94    """Stringifies python code for variable replacement
  95    Args:
  96        text: text string
  97    Returns:
  98        Stringified python code to execute variable replacement
  99    """
 100    return f"self.template({text}, locals())"
 101
 102
 103class MacroDialect(Python):
 104    class Generator(Python.Generator):
 105        TRANSFORMS = {
 106            **Python.Generator.TRANSFORMS,  # type: ignore
 107            exp.Column: lambda self, e: f"exp.to_column('{self.sql(e, 'this')}')",
 108            exp.Lambda: lambda self, e: f"lambda {self.expressions(e)}: {self.sql(e, 'this')}",
 109            MacroFunc: _macro_func_sql,
 110            MacroSQL: lambda self, e: _macro_sql(self.sql(e, "this"), e.args.get("into")),
 111            MacroStrReplace: lambda self, e: _macro_str_replace(self.sql(e, "this")),
 112        }
 113
 114
 115class MacroEvaluator:
 116    """The class responsible for evaluating SQLMesh Macros/SQL.
 117
 118    SQLMesh supports special preprocessed SQL prefixed with `@`. Although it provides similar power to
 119    traditional methods like string templating, there is semantic understanding of SQL which prevents
 120    common errors like leading/trailing commas, syntax errors, etc.
 121
 122    SQLMesh SQL allows for macro variables and macro functions. Macro variables take the form of @variable. These are used for variable substitution.
 123
 124    SELECT * FROM foo WHERE ds BETWEEN @start_date AND @end_date
 125
 126    Macro variables can be defined with a special macro function.
 127
 128    @DEF(start_date, '2021-01-01')
 129
 130    Args:
 131        dialect: Dialect of the SQL to evaluate.
 132        python_env: Serialized Python environment.
 133    """
 134
 135    def __init__(
 136        self,
 137        dialect: str = "",
 138        python_env: t.Optional[t.Dict[str, Executable]] = None,
 139        jinja_env: t.Optional[Environment] = None,
 140        schema: t.Optional[MappingSchema] = None,
 141        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
 142        resolve_tables: t.Optional[t.Callable[[exp.Expression], exp.Expression]] = None,
 143        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
 144        default_catalog: t.Optional[str] = None,
 145        path: Path = Path(),
 146    ):
 147        self.dialect = dialect
 148        self.generator = MacroDialect().generator()
 149        self.locals: t.Dict[str, t.Any] = {
 150            "runtime_stage": runtime_stage.value,
 151            "default_catalog": default_catalog,
 152        }
 153        self.env = {
 154            **ENV,
 155            "self": self,
 156            "SQL": SQL,
 157            "MacroEvaluator": MacroEvaluator,
 158        }
 159        self.python_env = python_env or {}
 160        self._jinja_env: t.Optional[Environment] = jinja_env
 161        self.macros = {normalize_macro_name(k): v.func for k, v in macro.get_registry().items()}
 162        self._schema = schema
 163        self._resolve_tables = resolve_tables
 164        self.columns_to_types_called = False
 165        self._snapshots = snapshots if snapshots is not None else {}
 166        self.default_catalog = default_catalog
 167        self._path = path
 168
 169        prepare_env(self.python_env, self.env)
 170        for k, v in self.python_env.items():
 171            if v.is_definition:
 172                self.macros[normalize_macro_name(k)] = self.env[v.name or k]
 173            elif v.is_import and getattr(self.env.get(k), c.SQLMESH_MACRO, None):
 174                self.macros[normalize_macro_name(k)] = self.env[k]
 175            elif v.is_value:
 176                self.locals[k] = self.env[k]
 177
 178    def send(
 179        self, name: str, *args: t.Any, **kwargs: t.Any
 180    ) -> t.Union[None, exp.Expression, t.List[exp.Expression]]:
 181        func = self.macros.get(normalize_macro_name(name))
 182
 183        if not callable(func):
 184            raise SQLMeshError(f"Macro '{name}' does not exist.")
 185
 186        try:
 187            annotations = t.get_type_hints(func)
 188        except NameError:  # forward references aren't handled
 189            annotations = {}
 190
 191        if annotations:
 192            spec = inspect.getfullargspec(func)
 193            callargs = inspect.getcallargs(func, self, *args, **kwargs)
 194            new_args: t.List[t.Any] = []
 195
 196            for arg, value in callargs.items():
 197                typ = annotations.get(arg)
 198
 199                if value is self:
 200                    continue
 201                if arg == spec.varargs:
 202                    new_args.extend(self._coerce(v, typ) for v in value)
 203                elif arg == spec.varkw:
 204                    for k, v in value.items():
 205                        kwargs[k] = self._coerce(v, typ)
 206                elif arg in kwargs:
 207                    kwargs[arg] = self._coerce(value, typ)
 208                else:
 209                    new_args.append(self._coerce(value, typ))
 210
 211            args = new_args  # type: ignore
 212
 213        try:
 214            return func(self, *args, **kwargs)
 215        except Exception as e:
 216            print_exception(e, self.python_env)
 217            raise MacroEvalError("Error trying to eval macro.") from e
 218
 219    def transform(
 220        self, expression: exp.Expression
 221    ) -> exp.Expression | t.List[exp.Expression] | None:
 222        changed = False
 223
 224        def evaluate_macros(
 225            node: exp.Expression,
 226        ) -> exp.Expression | t.List[exp.Expression] | None:
 227            nonlocal changed
 228
 229            if isinstance(node, MacroVar):
 230                changed = True
 231                variables = self.locals.get(c.SQLMESH_VARS, {})
 232                if node.name not in self.locals and node.name.lower() not in variables:
 233                    if not isinstance(node.parent, StagedFilePath):
 234                        raise SQLMeshError(f"Macro variable '{node.name}' is undefined.")
 235
 236                    return node
 237
 238                value = self.locals.get(node.name, variables.get(node.name.lower()))
 239                if isinstance(value, list):
 240                    return exp.convert(
 241                        tuple(
 242                            self.transform(v) if isinstance(v, exp.Expression) else v for v in value
 243                        )
 244                    )
 245                return exp.convert(
 246                    self.transform(value) if isinstance(value, exp.Expression) else value
 247                )
 248            if isinstance(node, exp.Identifier) and "@" in node.this:
 249                text = self.template(node.this, self.locals)
 250                if node.this != text:
 251                    changed = True
 252                    node.args["this"] = text
 253                    return node
 254            if node.is_string:
 255                text = node.this
 256                if has_jinja(text):
 257                    changed = True
 258                    node.set("this", self.jinja_env.from_string(node.this).render())
 259                return node
 260            if isinstance(node, MacroFunc):
 261                changed = True
 262                return self.evaluate(node)
 263            return node
 264
 265        transformed = exp.replace_tree(
 266            expression.copy(), evaluate_macros, prune=lambda n: isinstance(n, exp.Lambda)
 267        )
 268
 269        if changed:
 270            # the transformations could have corrupted the ast, turning this into sql and reparsing ensures
 271            # that the ast is correct
 272            if isinstance(transformed, list):
 273                return [
 274                    self.parse_one(node.sql(dialect=self.dialect, copy=False))
 275                    for node in transformed
 276                ]
 277            elif isinstance(transformed, exp.Expression):
 278                return self.parse_one(transformed.sql(dialect=self.dialect, copy=False))
 279
 280        return transformed
 281
 282    def template(self, text: t.Any, local_variables: t.Dict[str, t.Any]) -> str:
 283        """Substitute @vars with locals.
 284
 285        Args:
 286            text: The string to do substitition on.
 287            local_variables: Local variables in the context so that lambdas can be used.
 288
 289        Returns:
 290           The rendered string.
 291        """
 292        mapping = {}
 293
 294        variables = self.locals.get(c.SQLMESH_VARS, {})
 295
 296        for k, v in chain(variables.items(), self.locals.items(), local_variables.items()):
 297            # try to convert all variables into sqlglot expressions
 298            # because they're going to be converted into strings in sql
 299            # we use bare Exception instead of ValueError because there's
 300            # a recursive error with MagicMock.
 301            # we don't convert strings because that would result in adding quotes
 302            if not isinstance(v, str):
 303                try:
 304                    v = exp.convert(v)
 305                except Exception:
 306                    pass
 307
 308            if isinstance(v, exp.Expression):
 309                v = v.sql(dialect=self.dialect)
 310            mapping[k] = v
 311
 312        return MacroStrTemplate(str(text)).safe_substitute(mapping)
 313
 314    def evaluate(self, node: MacroFunc) -> exp.Expression | t.List[exp.Expression] | None:
 315        if isinstance(node, MacroDef):
 316            if isinstance(node.expression, exp.Lambda):
 317                _, fn = _norm_var_arg_lambda(self, node.expression)
 318                self.macros[normalize_macro_name(node.name)] = lambda _, *args: fn(
 319                    args[0] if len(args) == 1 else exp.Tuple(expressions=list(args))
 320                )
 321            else:
 322                self.locals[node.name] = self.transform(node.expression)
 323            return node
 324
 325        if isinstance(node, (MacroSQL, MacroStrReplace)):
 326            result: t.Optional[exp.Expression | t.List[exp.Expression]] = exp.convert(
 327                self.eval_expression(node)
 328            )
 329        else:
 330            func = t.cast(exp.Anonymous, node.this)
 331
 332            args = []
 333            kwargs = {}
 334            for e in func.expressions:
 335                if isinstance(e, exp.PropertyEQ):
 336                    kwargs[e.this.name] = e.expression
 337                else:
 338                    if kwargs:
 339                        raise MacroEvalError(
 340                            f"Positional argument cannot follow keyword argument.\n  {func.sql(dialect=self.dialect)} at '{self._path}'"
 341                        )
 342
 343                    args.append(e)
 344
 345            result = self.send(func.name, *args, **kwargs)
 346
 347        if result is None:
 348            return None
 349
 350        if isinstance(result, (tuple, list)):
 351            return [self.parse_one(item) for item in result if item is not None]
 352        return self.parse_one(result)
 353
 354    def eval_expression(self, node: t.Any) -> t.Any:
 355        """Converts a SQLGlot expression into executable Python code and evals it.
 356
 357        If the node is not an expression, it will simply be returned.
 358
 359        Args:
 360            node: expression
 361        Returns:
 362            The return value of the evaled Python Code.
 363        """
 364        if not isinstance(node, exp.Expression):
 365            return node
 366        code = node.sql()
 367        try:
 368            code = self.generator.generate(node)
 369            return eval(code, self.env, self.locals)
 370        except Exception as e:
 371            print_exception(e, self.python_env)
 372            raise MacroEvalError(
 373                f"Error trying to eval macro.\n\nGenerated code: {code}\n\nOriginal sql: {node}"
 374            ) from e
 375
 376    def parse_one(
 377        self, sql: str | exp.Expression, into: t.Optional[exp.IntoType] = None, **opts: t.Any
 378    ) -> exp.Expression:
 379        """Parses the given SQL string and returns a syntax tree for the first
 380        parsed SQL statement.
 381
 382        Args:
 383            sql: the SQL code or expression to parse.
 384            into: the Expression to parse into
 385            **opts: other options
 386
 387        Returns:
 388            Expression: the syntax tree for the first parsed statement
 389        """
 390        return sqlglot.maybe_parse(sql, dialect=self.dialect, into=into, **opts)
 391
 392    @property
 393    def jinja_env(self) -> Environment:
 394        if not self._jinja_env:
 395            jinja_env_methods = {**self.locals, **self.env}
 396            del jinja_env_methods["self"]
 397            self._jinja_env = JinjaMacroRegistry().build_environment(**jinja_env_methods)
 398        return self._jinja_env
 399
 400    def columns_to_types(self, model_name: TableName | exp.Column) -> t.Dict[str, exp.DataType]:
 401        """Returns the columns-to-types mapping corresponding to the specified model."""
 402        if self._schema is None or self._schema.empty:
 403            self.columns_to_types_called = True
 404            return {"__schema_unavailable_at_load__": exp.DataType.build("unknown")}
 405
 406        normalized_model_name = normalize_model_name(
 407            model_name,
 408            default_catalog=self.default_catalog,
 409            dialect=self.dialect,
 410        )
 411        columns_to_types = self._schema.find(exp.to_table(normalized_model_name))
 412        if columns_to_types is None:
 413            raise SQLMeshError(f"Schema for model '{model_name}' can't be statically determined.")
 414
 415        return columns_to_types
 416
 417    def get_snapshot(self, model_name: TableName | exp.Column) -> t.Optional[Snapshot]:
 418        """Returns the snapshot that corresponds to the given model name."""
 419        return self._snapshots.get(
 420            normalize_model_name(
 421                model_name,
 422                default_catalog=self.default_catalog,
 423                dialect=self.dialect,
 424            )
 425        )
 426
 427    def resolve_tables(self, query: exp.Expression) -> exp.Expression:
 428        """Resolves queries with references to SQLMesh model names to their physical tables."""
 429        if not self._resolve_tables:
 430            raise SQLMeshError(
 431                "Macro evaluator not properly initialized with resolve_tables lambda."
 432            )
 433        return self._resolve_tables(query)
 434
 435    @property
 436    def runtime_stage(self) -> RuntimeStage:
 437        """Returns the current runtime stage of the macro evaluation."""
 438        return self.locals["runtime_stage"]
 439
 440    @property
 441    def engine_adapter(self) -> EngineAdapter:
 442        engine_adapter = self.locals.get("engine_adapter")
 443        if not engine_adapter:
 444            raise SQLMeshError(
 445                "The engine adapter is not available while models are loading."
 446                " You can gate these calls by checking in Python: evaluator.runtime_stage != 'loading' or SQL: @runtime_stage <> 'loading'."
 447            )
 448        return self.locals["engine_adapter"]
 449
 450    @property
 451    def gateway(self) -> t.Optional[str]:
 452        """Returns the gateway name."""
 453        return self.var(c.GATEWAY)
 454
 455    def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
 456        """Returns the value of the specified variable, or the default value if it doesn't exist."""
 457        return (self.locals.get(c.SQLMESH_VARS) or {}).get(var_name.lower(), default)
 458
 459    def _coerce(self, expr: exp.Expression, typ: t.Any, strict: bool = False) -> t.Any:
 460        """Coerces the given expression to the specified type on a best-effort basis."""
 461        base_err_msg = f"Failed to coerce expression '{expr}' to type '{typ}'."
 462        try:
 463            if typ is None or typ is t.Any:
 464                return expr
 465            base = t.get_origin(typ) or typ
 466
 467            # We need to handle Union and TypeVars first since we cannot use isinstance with it
 468            if base in UNION_TYPES:
 469                for branch in t.get_args(typ):
 470                    try:
 471                        return self._coerce(expr, branch, True)
 472                    except Exception:
 473                        pass
 474                raise SQLMeshError(base_err_msg)
 475            if base is SQL and isinstance(expr, exp.Expression):
 476                return expr.sql(self.dialect)
 477
 478            if isinstance(expr, base):
 479                return expr
 480            if issubclass(base, exp.Expression):
 481                d = Dialect.get_or_raise(self.dialect)
 482                into = base if base in d.parser().EXPRESSION_PARSERS else None
 483                if into is None:
 484                    if isinstance(expr, exp.Literal):
 485                        coerced = parse_one(expr.this)
 486                    else:
 487                        raise SQLMeshError(
 488                            f"{base_err_msg} Coercion to {base} requires a literal expression."
 489                        )
 490                else:
 491                    coerced = parse_one(
 492                        expr.this if isinstance(expr, exp.Literal) else expr.sql(), into=into
 493                    )
 494                if isinstance(coerced, base):
 495                    return coerced
 496                raise SQLMeshError(base_err_msg)
 497
 498            if base in (int, float, str) and isinstance(expr, exp.Literal):
 499                return base(expr.this)
 500            if base is str and isinstance(expr, exp.Column) and not expr.table:
 501                return expr.name
 502            if base is bool and isinstance(expr, exp.Boolean):
 503                return expr.this
 504            # if base is str and isinstance(expr, exp.Expression):
 505            #    return expr.sql(self.dialect)
 506            if base is tuple and isinstance(expr, (exp.Tuple, exp.Array)):
 507                generic = t.get_args(typ)
 508                if not generic:
 509                    return tuple(expr.expressions)
 510                if generic[-1] is ...:
 511                    return tuple(self._coerce(expr, generic[0]) for expr in expr.expressions)
 512                elif len(generic) == len(expr.expressions):
 513                    return tuple(
 514                        self._coerce(expr, generic[i]) for i, expr in enumerate(expr.expressions)
 515                    )
 516                raise SQLMeshError(f"{base_err_msg} Expected {len(generic)} items.")
 517            if base is list and isinstance(expr, (exp.Array, exp.Tuple)):
 518                generic = t.get_args(typ)
 519                if not generic:
 520                    return expr.expressions
 521                return [self._coerce(expr, generic[0]) for expr in expr.expressions]
 522            raise SQLMeshError(base_err_msg)
 523        except Exception:
 524            if strict:
 525                raise
 526            logger.error(
 527                "Coercion of expression '%s' to type '%s' failed. Using non coerced expression at '%s'",
 528                expr,
 529                typ,
 530                self._path,
 531            )
 532            return expr
 533
 534
 535class macro(registry_decorator):
 536    """Specifies a function is a macro and registers it the global MACROS registry.
 537
 538    Registered macros can be referenced in SQL statements to make queries more dynamic/cleaner.
 539
 540    Example:
 541        from sqlglot import exp
 542        from sqlmesh.core.macros import MacroEvaluator, macro
 543
 544        @macro()
 545        def add_one(evaluator: MacroEvaluator, column: exp.Literal) -> exp.Add:
 546            return evaluator.parse_one(f"{column} + 1")
 547
 548    Args:
 549        name: A custom name for the macro, the default is the name of the function.
 550    """
 551
 552    registry_name = "macros"
 553
 554    def __call__(
 555        self, func: t.Callable[..., DECORATOR_RETURN_TYPE]
 556    ) -> t.Callable[..., DECORATOR_RETURN_TYPE]:
 557        wrapper = super().__call__(func)
 558
 559        # This is used to identify macros at runtime to unwrap during serialization.
 560        setattr(wrapper, c.SQLMESH_MACRO, True)
 561        return wrapper
 562
 563
 564ExecutableOrMacro = t.Union[Executable, macro]
 565MacroRegistry = UniqueKeyDict[str, ExecutableOrMacro]
 566
 567
 568def _norm_var_arg_lambda(
 569    evaluator: MacroEvaluator, func: exp.Lambda, *items: t.Any
 570) -> t.Tuple[t.Iterable, t.Callable]:
 571    """
 572    Converts sql literal array and lambda into actual python iterable + callable.
 573
 574    In order to support expressions like @EACH([a, b, c], x -> @SQL('@x')), the lambda var x
 575    needs be passed to the local state.
 576
 577    Args:
 578        evaluator: MacroEvaluator that invoked the macro
 579        func: Lambda SQLGlot expression.
 580        items: Array or items of SQLGlot expressions.
 581    """
 582
 583    def substitute(
 584        node: exp.Expression, args: t.Dict[str, exp.Expression]
 585    ) -> exp.Expression | t.List[exp.Expression] | None:
 586        if isinstance(node, (exp.Identifier, exp.Var)):
 587            if not isinstance(node.parent, exp.Column):
 588                name = node.name
 589                if name in args:
 590                    return args[name].copy()
 591                if name in evaluator.locals:
 592                    return exp.convert(evaluator.locals[name])
 593            if SQLMESH_MACRO_PREFIX in node.name:
 594                return node.__class__(
 595                    this=evaluator.template(node.name, {k: v.name for k, v in args.items()})
 596                )
 597        elif isinstance(node, MacroFunc):
 598            local_copy = evaluator.locals.copy()
 599            evaluator.locals.update(args)
 600            result = evaluator.transform(node)
 601            evaluator.locals = local_copy
 602            return result
 603        return node
 604
 605    if len(items) == 1:
 606        item = items[0]
 607        expressions = item.expressions if isinstance(item, (exp.Array, exp.Tuple)) else item
 608    else:
 609        expressions = items
 610
 611    if not callable(func):
 612        return expressions, lambda args: func.this.transform(
 613            substitute,
 614            {
 615                expression.name: arg
 616                for expression, arg in zip(
 617                    func.expressions, args.expressions if isinstance(args, exp.Tuple) else [args]
 618                )
 619            },
 620        )
 621
 622    return expressions, func
 623
 624
 625@macro()
 626def each(
 627    evaluator: MacroEvaluator,
 628    *args: t.Any,
 629) -> t.List[t.Any]:
 630    """Iterates through items calling func on each.
 631
 632    If a func call on item returns None, it will be excluded from the list.
 633
 634    Args:
 635        evaluator: MacroEvaluator that invoked the macro
 636        args: The last argument should be a lambda of the form x -> x +1. The first argument can be
 637            an Array or var args can be used.
 638
 639    Returns:
 640        A list of items that is the result of func
 641    """
 642    *items, func = args
 643    items, func = _norm_var_arg_lambda(evaluator, func, *items)  # type: ignore
 644    return [item for item in map(func, ensure_collection(items)) if item is not None]
 645
 646
 647@macro("IF")
 648def if_(
 649    evaluator: MacroEvaluator,
 650    condition: t.Any,
 651    true: t.Any,
 652    false: t.Any = None,
 653) -> t.Any:
 654    """Evaluates a given condition and returns the second argument if true or else the third argument.
 655
 656    If false is not passed in, the default return value will be None.
 657
 658    Example:
 659        >>> from sqlglot import parse_one
 660        >>> from sqlmesh.core.macros import MacroEvaluator
 661        >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a, b)")).sql()
 662        'b'
 663
 664        >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a)"))
 665    """
 666
 667    if evaluator.eval_expression(condition):
 668        return true
 669    return false
 670
 671
 672@macro("REDUCE")
 673def reduce_(evaluator: MacroEvaluator, *args: t.Any) -> t.Any:
 674    """Iterates through items applying provided function that takes two arguments
 675    cumulatively to the items of iterable items, from left to right, so as to reduce
 676    the iterable to a single item.
 677
 678    Example:
 679        >>> from sqlglot import parse_one
 680        >>> from sqlmesh.core.macros import MacroEvaluator
 681        >>> sql = "@SQL(@REDUCE([100, 200, 300, 400], (x, y) -> x + y))"
 682        >>> MacroEvaluator().transform(parse_one(sql)).sql()
 683        '1000'
 684
 685    Args:
 686        evaluator: MacroEvaluator that invoked the macro
 687        args: The last argument should be a lambda of the form (x, y) -> x + y. The first argument can be
 688            an Array or var args can be used.
 689    Returns:
 690        A single item that is the result of applying func cumulatively to items
 691    """
 692    *items, func = args
 693    items, func = _norm_var_arg_lambda(evaluator, func, *items)  # type: ignore
 694    return reduce(lambda a, b: func(exp.Tuple(expressions=[a, b])), ensure_collection(items))
 695
 696
 697@macro("FILTER")
 698def filter_(evaluator: MacroEvaluator, *args: t.Any) -> t.List[t.Any]:
 699    """Iterates through items, applying provided function to each item and removing
 700    all items where the function returns False
 701
 702    Example:
 703        >>> from sqlglot import parse_one
 704        >>> from sqlmesh.core.macros import MacroEvaluator
 705        >>> sql = "@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y)"
 706        >>> MacroEvaluator().transform(parse_one(sql)).sql()
 707        '2 + 3'
 708
 709        >>> sql = "@EVAL(@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y))"
 710        >>> MacroEvaluator().transform(parse_one(sql)).sql()
 711        '5'
 712
 713    Args:
 714        evaluator: MacroEvaluator that invoked the macro
 715        args: The last argument should be a lambda of the form x -> x > 1. The first argument can be
 716            an Array or var args can be used.
 717    Returns:
 718        The items for which the func returned True
 719    """
 720    *items, func = args
 721    items, func = _norm_var_arg_lambda(evaluator, func, *items)  # type: ignore
 722    return list(filter(lambda arg: evaluator.eval_expression(func(arg)), items))
 723
 724
 725def _optional_expression(
 726    evaluator: MacroEvaluator,
 727    condition: exp.Condition,
 728    expression: exp.Expression,
 729) -> t.Optional[exp.Expression]:
 730    """Inserts expression when the condition is True
 731
 732    The following examples express the usage of this function in the context of the macros which wrap it.
 733
 734    Examples:
 735        >>> from sqlglot import parse_one
 736        >>> from sqlmesh.core.macros import MacroEvaluator
 737        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
 738        >>> MacroEvaluator().transform(parse_one(sql)).sql()
 739        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
 740        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
 741        >>> MacroEvaluator().transform(parse_one(sql)).sql()
 742        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
 743        >>> sql = "select * from city @GROUP_BY(True) country, population"
 744        >>> MacroEvaluator().transform(parse_one(sql)).sql()
 745        'SELECT * FROM city GROUP BY country, population'
 746        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
 747        >>> MacroEvaluator().transform(parse_one(sql)).sql()
 748        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
 749
 750    Args:
 751        evaluator: MacroEvaluator that invoked the macro
 752        condition: Condition expression
 753        expression: SQL expression
 754    Returns:
 755        Expression if the conditional is True; otherwise None
 756    """
 757    return expression if evaluator.eval_expression(condition) else None
 758
 759
 760with_ = macro("WITH")(_optional_expression)
 761join = macro("JOIN")(_optional_expression)
 762where = macro("WHERE")(_optional_expression)
 763group_by = macro("GROUP_BY")(_optional_expression)
 764having = macro("HAVING")(_optional_expression)
 765order_by = macro("ORDER_BY")(_optional_expression)
 766limit = macro("LIMIT")(_optional_expression)
 767
 768
 769@macro("eval")
 770def eval_(evaluator: MacroEvaluator, condition: exp.Condition) -> t.Any:
 771    """Evaluate the given condition in a Python/SQL interpretor.
 772
 773    Example:
 774        >>> from sqlglot import parse_one
 775        >>> from sqlmesh.core.macros import MacroEvaluator
 776        >>> sql = "@EVAL(1 + 1)"
 777        >>> MacroEvaluator().transform(parse_one(sql)).sql()
 778        '2'
 779    """
 780    return evaluator.eval_expression(condition)
 781
 782
 783# macros with union types need to use t.Union since | isn't available until 3.9
 784@macro()
 785def star(
 786    evaluator: MacroEvaluator,
 787    relation: exp.Table,
 788    alias: exp.Column = t.cast(exp.Column, exp.column("")),
 789    except_: t.Union[exp.Array, exp.Tuple] = exp.Tuple(this=[]),
 790    prefix: exp.Literal = exp.Literal.string(""),
 791    suffix: exp.Literal = exp.Literal.string(""),
 792    quote_identifiers: exp.Boolean = exp.true(),
 793) -> t.List[exp.Alias]:
 794    """Returns a list of projections for the given relation.
 795
 796    Args:
 797        evaluator: MacroEvaluator that invoked the macro
 798        relation: The relation to select star from
 799        alias: The alias of the relation
 800        except_: Columns to exclude
 801        prefix: A prefix to use for all selections
 802        suffix: A suffix to use for all selections
 803        quote_identifiers: Whether or not quote the resulting aliases, defaults to true
 804
 805    Returns:
 806        An array of columns.
 807
 808    Example:
 809        >>> from sqlglot import parse_one, exp
 810        >>> from sqlglot.schema import MappingSchema
 811        >>> from sqlmesh.core.macros import MacroEvaluator
 812        >>> sql = "SELECT @STAR(foo, bar, [c], 'baz_') FROM foo AS bar"
 813        >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": exp.DataType.build("string"), "b": exp.DataType.build("string"), "c": exp.DataType.build("string"), "d": exp.DataType.build("int")}})).transform(parse_one(sql)).sql()
 814        'SELECT CAST("bar"."a" AS TEXT) AS "baz_a", CAST("bar"."b" AS TEXT) AS "baz_b", CAST("bar"."d" AS INT) AS "baz_d" FROM foo AS bar'
 815    """
 816    if alias and not isinstance(alias, (exp.Identifier, exp.Column)):
 817        raise SQLMeshError(f"Invalid alias '{alias}'. Expected an identifier.")
 818    if except_ and not isinstance(except_, (exp.Array, exp.Tuple)):
 819        raise SQLMeshError(f"Invalid except '{except_}'. Expected an array.")
 820    if prefix and not isinstance(prefix, exp.Literal):
 821        raise SQLMeshError(f"Invalid prefix '{prefix}'. Expected a literal.")
 822    if suffix and not isinstance(suffix, exp.Literal):
 823        raise SQLMeshError(f"Invalid suffix '{suffix}'. Expected a literal.")
 824    if not isinstance(quote_identifiers, exp.Boolean):
 825        raise SQLMeshError(f"Invalid quote_identifiers '{quote_identifiers}'. Expected a boolean.")
 826
 827    exclude = {e.name for e in except_.expressions}
 828    quoted = quote_identifiers.this
 829    table_identifier = alias.name or relation.name
 830
 831    columns_to_types = {
 832        k: v for k, v in evaluator.columns_to_types(relation).items() if k not in exclude
 833    }
 834    if columns_to_types_all_known(columns_to_types):
 835        return [
 836            exp.cast(
 837                exp.column(column, table=table_identifier, quoted=quoted),
 838                dtype,
 839                dialect=evaluator.dialect,
 840            ).as_(f"{prefix.this}{column}{suffix.this}", quoted=quoted)
 841            for column, dtype in columns_to_types.items()
 842        ]
 843    return [
 844        exp.column(column, table=table_identifier, quoted=quoted).as_(
 845            f"{prefix.this}{column}{suffix.this}", quoted=quoted
 846        )
 847        for column, type_ in evaluator.columns_to_types(relation).items()
 848    ]
 849
 850
 851@macro()
 852def generate_surrogate_key(_: MacroEvaluator, *fields: exp.Expression) -> exp.Func:
 853    """Generates a surrogate key for the given fields.
 854
 855    Example:
 856        >>> from sqlglot import parse_one
 857        >>> from sqlmesh.core.macros import MacroEvaluator
 858        >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c) FROM foo"
 859        >>> MacroEvaluator().transform(parse_one(sql)).sql()
 860        "SELECT MD5(CONCAT(COALESCE(CAST(a AS TEXT), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS TEXT), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS TEXT), '_sqlmesh_surrogate_key_null_'))) FROM foo"
 861    """
 862    string_fields: t.List[exp.Expression] = []
 863    for i, field in enumerate(fields):
 864        if i > 0:
 865            string_fields.append(exp.Literal.string("|"))
 866        string_fields.append(
 867            exp.func(
 868                "COALESCE",
 869                exp.cast(field, exp.DataType.build("text")),
 870                exp.Literal.string("_sqlmesh_surrogate_key_null_"),
 871            )
 872        )
 873    return exp.func("MD5", exp.func("CONCAT", *string_fields))
 874
 875
 876@macro()
 877def safe_add(_: MacroEvaluator, *fields: exp.Expression) -> exp.Case:
 878    """Adds numbers together, substitutes nulls for 0s and only returns null if all fields are null.
 879
 880    Example:
 881        >>> from sqlglot import parse_one
 882        >>> from sqlmesh.core.macros import MacroEvaluator
 883        >>> sql = "SELECT @SAFE_ADD(a, b) FROM foo"
 884        >>> MacroEvaluator().transform(parse_one(sql)).sql()
 885        'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) + COALESCE(b, 0) END FROM foo'
 886    """
 887    return (
 888        exp.Case()
 889        .when(exp.and_(*(field.is_(exp.null()) for field in fields)), exp.null())
 890        .else_(reduce(lambda a, b: a + b, [exp.func("COALESCE", field, 0) for field in fields]))  # type: ignore
 891    )
 892
 893
 894@macro()
 895def safe_sub(_: MacroEvaluator, *fields: exp.Expression) -> exp.Case:
 896    """Subtract numbers, substitutes nulls for 0s and only returns null if all fields are null.
 897
 898    Example:
 899        >>> from sqlglot import parse_one
 900        >>> from sqlmesh.core.macros import MacroEvaluator
 901        >>> sql = "SELECT @SAFE_SUB(a, b) FROM foo"
 902        >>> MacroEvaluator().transform(parse_one(sql)).sql()
 903        'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) - COALESCE(b, 0) END FROM foo'
 904    """
 905    return (
 906        exp.Case()
 907        .when(exp.and_(*(field.is_(exp.null()) for field in fields)), exp.null())
 908        .else_(reduce(lambda a, b: a - b, [exp.func("COALESCE", field, 0) for field in fields]))  # type: ignore
 909    )
 910
 911
 912@macro()
 913def safe_div(_: MacroEvaluator, numerator: exp.Expression, denominator: exp.Expression) -> exp.Div:
 914    """Divides numbers, returns null if the denominator is 0.
 915
 916    Example:
 917        >>> from sqlglot import parse_one
 918        >>> from sqlmesh.core.macros import MacroEvaluator
 919        >>> sql = "SELECT @SAFE_DIV(a, b) FROM foo"
 920        >>> MacroEvaluator().transform(parse_one(sql)).sql()
 921        'SELECT a / NULLIF(b, 0) FROM foo'
 922    """
 923    return numerator / exp.func("NULLIF", denominator, 0)
 924
 925
 926@macro()
 927def union(
 928    evaluator: MacroEvaluator,
 929    type_: exp.Literal = exp.Literal.string("ALL"),
 930    *tables: exp.Table,
 931) -> exp.Query:
 932    """Returns a UNION of the given tables. Only choosing columns that have the same name and type.
 933
 934    Example:
 935        >>> from sqlglot import parse_one
 936        >>> from sqlglot.schema import MappingSchema
 937        >>> from sqlmesh.core.macros import MacroEvaluator
 938        >>> sql = "@UNION('distinct', foo, bar)"
 939        >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql()
 940        'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar'
 941    """
 942    kind = type_.name.upper()
 943    if kind not in ("ALL", "DISTINCT"):
 944        raise SQLMeshError(f"Invalid type '{type_}'. Expected 'ALL' or 'DISTINCT'.")
 945
 946    columns = {
 947        column
 948        for column, _ in reduce(
 949            lambda a, b: a & b,  # type: ignore
 950            (evaluator.columns_to_types(table).items() for table in tables),
 951        )
 952    }
 953
 954    projections = [
 955        exp.cast(column, type_, dialect=evaluator.dialect).as_(column)
 956        for column, type_ in evaluator.columns_to_types(tables[0]).items()
 957        if column in columns
 958    ]
 959
 960    return reduce(
 961        lambda a, b: a.union(b, distinct=kind == "DISTINCT"),  # type: ignore
 962        [exp.select(*projections).from_(t) for t in tables],
 963    )
 964
 965
 966@macro()
 967def haversine_distance(
 968    _: MacroEvaluator,
 969    lat1: exp.Expression,
 970    lon1: exp.Expression,
 971    lat2: exp.Expression,
 972    lon2: exp.Expression,
 973    unit: exp.Literal = exp.Literal.string("mi"),
 974) -> exp.Mul:
 975    """Returns the haversine distance between two points.
 976
 977    Example:
 978        >>> from sqlglot import parse_one
 979        >>> from sqlmesh.core.macros import MacroEvaluator
 980        >>> sql = "SELECT @HAVERSINE_DISTANCE(driver_y, driver_x, passenger_y, passenger_x, 'mi') FROM rides"
 981        >>> MacroEvaluator().transform(parse_one(sql)).sql()
 982        'SELECT 7922 * ASIN(SQRT((POWER(SIN(RADIANS((passenger_y - driver_y) / 2)), 2)) + (COS(RADIANS(driver_y)) * COS(RADIANS(passenger_y)) * POWER(SIN(RADIANS((passenger_x - driver_x) / 2)), 2)))) * 1.0 FROM rides'
 983    """
 984    if unit.this == "mi":
 985        conversion_rate = 1.0
 986    elif unit.this == "km":
 987        conversion_rate = 1.60934
 988    else:
 989        raise SQLMeshError(f"Invalid unit '{unit}'. Expected 'mi' or 'km'.")
 990
 991    return (
 992        2
 993        * 3961
 994        * exp.func(
 995            "ASIN",
 996            exp.func(
 997                "SQRT",
 998                exp.func("POWER", exp.func("SIN", exp.func("RADIANS", (lat2 - lat1) / 2)), 2)
 999                + exp.func("COS", exp.func("RADIANS", lat1))
1000                * exp.func("COS", exp.func("RADIANS", lat2))
1001                * exp.func("POWER", exp.func("SIN", exp.func("RADIANS", (lon2 - lon1) / 2)), 2),
1002            ),
1003        )
1004        * conversion_rate
1005    )
1006
1007
1008@macro()
1009def pivot(
1010    evaluator: MacroEvaluator,
1011    column: exp.Column,
1012    values: t.Union[exp.Array, exp.Tuple],
1013    alias: exp.Boolean = exp.true(),
1014    agg: exp.Literal = exp.Literal.string("SUM"),
1015    cmp: exp.Literal = exp.Literal.string("="),
1016    prefix: exp.Literal = exp.Literal.string(""),
1017    suffix: exp.Literal = exp.Literal.string(""),
1018    then_value: exp.Literal = exp.Literal.number(1),
1019    else_value: exp.Literal = exp.Literal.number(0),
1020    quote: exp.Boolean = exp.true(),
1021    distinct: exp.Boolean = exp.false(),
1022) -> t.List[exp.Expression]:
1023    """Returns a list of projections as a result of pivoting the given column on the given values.
1024
1025    Example:
1026        >>> from sqlglot import parse_one
1027        >>> from sqlmesh.core.macros import MacroEvaluator
1028        >>> sql = "SELECT date_day, @PIVOT(status, ['cancelled', 'completed']) FROM rides GROUP BY 1"
1029        >>> MacroEvaluator().transform(parse_one(sql)).sql()
1030        'SELECT date_day, SUM(CASE WHEN status = \\'cancelled\\' THEN 1 ELSE 0 END) AS "\\'cancelled\\'", SUM(CASE WHEN status = \\'completed\\' THEN 1 ELSE 0 END) AS "\\'completed\\'" FROM rides GROUP BY 1'
1031    """
1032    aggregates: t.List[exp.Expression] = []
1033    for value in values.expressions:
1034        proj = f"{agg.this}("
1035        if distinct.this:
1036            proj += "DISTINCT "
1037        proj += f"CASE WHEN {column} {cmp.this} {value} THEN {then_value} ELSE {else_value} END) "
1038        node = evaluator.parse_one(proj)
1039        if alias.this:
1040            node = node.as_(f"{prefix.this}{value}{suffix.this}", quoted=quote.this, copy=False)
1041        aggregates.append(node)
1042    return aggregates
1043
1044
1045@macro("AND")
1046def and_(evaluator: MacroEvaluator, *expressions: t.Optional[exp.Expression]) -> exp.Condition:
1047    """Returns an AND statement filtering out any NULL expressions."""
1048    conditions = [e for e in expressions if not isinstance(e, exp.Null)]
1049
1050    if not conditions:
1051        return exp.true()
1052
1053    return exp.and_(*conditions, dialect=evaluator.dialect)
1054
1055
1056@macro("OR")
1057def or_(evaluator: MacroEvaluator, *expressions: t.Optional[exp.Expression]) -> exp.Condition:
1058    """Returns an OR statement filtering out any NULL expressions."""
1059    conditions = [e for e in expressions if not isinstance(e, exp.Null)]
1060
1061    if not conditions:
1062        return exp.true()
1063
1064    return exp.or_(*conditions, dialect=evaluator.dialect)
1065
1066
1067@macro("VAR")
1068def var(
1069    evaluator: MacroEvaluator, var_name: exp.Expression, default: t.Optional[exp.Expression] = None
1070) -> exp.Expression:
1071    """Returns the value of a variable or the default value if the variable is not set."""
1072    if not var_name.is_string:
1073        raise SQLMeshError(f"Invalid variable name '{var_name.sql()}'. Expected a string literal.")
1074
1075    return exp.convert(evaluator.var(var_name.this, default))
1076
1077
1078def normalize_macro_name(name: str) -> str:
1079    """Prefix macro name with @ and upcase"""
1080    return f"@{name.upper()}"
1081
1082
1083for m in macro.get_registry().values():
1084    setattr(m, c.SQLMESH_BUILTIN, True)
class RuntimeStage(enum.Enum):
60class RuntimeStage(Enum):
61    LOADING = "loading"
62    CREATING = "creating"
63    EVALUATING = "evaluating"
64    TESTING = "testing"

An enumeration.

LOADING = <RuntimeStage.LOADING: 'loading'>
CREATING = <RuntimeStage.CREATING: 'creating'>
EVALUATING = <RuntimeStage.EVALUATING: 'evaluating'>
TESTING = <RuntimeStage.TESTING: 'testing'>
Inherited Members
enum.Enum
name
value
class MacroStrTemplate(string.Template):
67class MacroStrTemplate(Template):
68    delimiter = SQLMESH_MACRO_PREFIX

A string class for supporting $-substitutions.

Inherited Members
string.Template
Template
flags
substitute
safe_substitute
def SQL(x):
1823    def new_type(x):
1824        return x
class MacroDialect(sqlglot.executor.python.Python):
104class MacroDialect(Python):
105    class Generator(Python.Generator):
106        TRANSFORMS = {
107            **Python.Generator.TRANSFORMS,  # type: ignore
108            exp.Column: lambda self, e: f"exp.to_column('{self.sql(e, 'this')}')",
109            exp.Lambda: lambda self, e: f"lambda {self.expressions(e)}: {self.sql(e, 'this')}",
110            MacroFunc: _macro_func_sql,
111            MacroSQL: lambda self, e: _macro_sql(self.sql(e, "this"), e.args.get("into")),
112            MacroStrReplace: lambda self, e: _macro_str_replace(self.sql(e, "this")),
113        }
UNESCAPED_SEQUENCES: Dict[str, str] = {'\\a': '\x07', '\\b': '\x08', '\\f': '\x0c', '\\n': '\n', '\\r': '\r', '\\t': '\t', '\\v': '\x0b', '\\\\': '\\'}

Mapping of an escaped sequence (\n) to its unescaped version ( ).

Inherited Members
sqlglot.dialects.dialect.Dialect
Dialect
INDEX_OFFSET
WEEK_OFFSET
UNNEST_COLUMN_ONLY
ALIAS_POST_TABLESAMPLE
TABLESAMPLE_SIZE_IS_PERCENT
NORMALIZATION_STRATEGY
IDENTIFIERS_CAN_START_WITH_DIGIT
DPIPE_IS_STRING_CONCAT
STRICT_STRING_CONCAT
SUPPORTS_USER_DEFINED_TYPES
SUPPORTS_SEMI_ANTI_JOIN
NORMALIZE_FUNCTIONS
LOG_BASE_FIRST
NULL_ORDERING
TYPED_DIVISION
SAFE_DIVISION
CONCAT_COALESCE
TIME_MAPPING
FORMAT_MAPPING
PSEUDOCOLUMNS
PREFER_CTE_ALIAS_COLUMN
get_or_raise
format_time
normalize_identifier
case_sensitive
can_identify
quote_identifier
to_json_path
parse
parse_into
generate
transpile
tokenize
parser
generator
sqlglot.executor.python.Python
Tokenizer
class MacroDialect.Generator(sqlglot.executor.python.Python.Generator):
105    class Generator(Python.Generator):
106        TRANSFORMS = {
107            **Python.Generator.TRANSFORMS,  # type: ignore
108            exp.Column: lambda self, e: f"exp.to_column('{self.sql(e, 'this')}')",
109            exp.Lambda: lambda self, e: f"lambda {self.expressions(e)}: {self.sql(e, 'this')}",
110            MacroFunc: _macro_func_sql,
111            MacroSQL: lambda self, e: _macro_sql(self.sql(e, "this"), e.args.get("into")),
112            MacroStrReplace: lambda self, e: _macro_str_replace(self.sql(e, "this")),
113        }

Generator converts a given syntax tree to the corresponding SQL string.

Arguments:
  • pretty: Whether to format the produced SQL string. Default: False.
  • identify: Determines when an identifier should be quoted. Possible values are: False (default): Never quote, except in cases where it's mandatory by the dialect. True or 'always': Always quote. 'safe': Only quote identifiers that are case insensitive.
  • normalize: Whether to normalize identifiers to lowercase. Default: False.
  • pad: The pad size in a formatted string. For example, this affects the indentation of a projection in a query, relative to its nesting level. Default: 2.
  • indent: The indentation size in a formatted string. For example, this affects the indentation of subqueries and filters under a WHERE clause. Default: 2.
  • normalize_functions: How to normalize function names. Possible values are: "upper" or True (default): Convert names to uppercase. "lower": Convert names to lowercase. False: Disables function name normalization.
  • unsupported_level: Determines the generator's behavior when it encounters unsupported expressions. Default ErrorLevel.WARN.
  • max_unsupported: Maximum number of unsupported messages to include in a raised UnsupportedError. This is only relevant if unsupported_level is ErrorLevel.RAISE. Default: 3
  • leading_comma: Whether the comma is leading or trailing in select expressions. This is only relevant when generating in pretty mode. Default: False
  • max_text_width: The max number of characters in a segment before creating new lines in pretty mode. The default is on the smaller end because the length only represents a segment and not the true line length. Default: 80
  • comments: Whether to preserve comments in the output SQL code. Default: True
Inherited Members
sqlglot.generator.Generator
Generator
generate
preprocess
unsupported
sep
seg
pad_comment
maybe_comment
wrap
no_identify
normalize_func
indent
sql
uncache_sql
cache_sql
characterset_sql
column_parts
column_sql
columnposition_sql
columndef_sql
columnconstraint_sql
computedcolumnconstraint_sql
autoincrementcolumnconstraint_sql
compresscolumnconstraint_sql
generatedasidentitycolumnconstraint_sql
generatedasrowcolumnconstraint_sql
periodforsystemtimeconstraint_sql
notnullcolumnconstraint_sql
transformcolumnconstraint_sql
primarykeycolumnconstraint_sql
uniquecolumnconstraint_sql
createable_sql
create_sql
sequenceproperties_sql
clone_sql
describe_sql
heredoc_sql
prepend_ctes
with_sql
cte_sql
tablealias_sql
bitstring_sql
hexstring_sql
bytestring_sql
unicodestring_sql
rawstring_sql
datatypeparam_sql
datatype_sql
directory_sql
delete_sql
drop_sql
except_sql
except_op
fetch_sql
filter_sql
hint_sql
indexparameters_sql
index_sql
identifier_sql
inputoutputformat_sql
national_sql
partition_sql
properties_sql
root_properties
properties
with_properties
locate_properties
property_name
property_sql
likeproperty_sql
fallbackproperty_sql
journalproperty_sql
freespaceproperty_sql
checksumproperty_sql
mergeblockratioproperty_sql
datablocksizeproperty_sql
blockcompressionproperty_sql
isolatedloadingproperty_sql
partitionboundspec_sql
partitionedofproperty_sql
lockingproperty_sql
withdataproperty_sql
withsystemversioningproperty_sql
insert_sql
intersect_sql
intersect_op
introducer_sql
kill_sql
pseudotype_sql
objectidentifier_sql
onconflict_sql
returning_sql
rowformatdelimitedproperty_sql
withtablehint_sql
indextablehint_sql
historicaldata_sql
table_parts
table_sql
tablesample_sql
pivot_sql
version_sql
tuple_sql
update_sql
values_sql
var_sql
into_sql
from_sql
group_sql
having_sql
connect_sql
prior_sql
join_sql
lambda_sql
lateral_op
lateral_sql
limit_sql
offset_sql
setitem_sql
set_sql
pragma_sql
lock_sql
literal_sql
escape_str
loaddata_sql
null_sql
boolean_sql
order_sql
withfill_sql
cluster_sql
distribute_sql
sort_sql
ordered_sql
matchrecognizemeasure_sql
matchrecognize_sql
query_modifiers
queryoption_sql
offset_limit_modifiers
after_limit_modifiers
select_sql
schema_sql
schema_columns_sql
star_sql
parameter_sql
sessionparameter_sql
placeholder_sql
subquery_sql
qualify_sql
set_operations
union_sql
union_op
unnest_sql
prewhere_sql
where_sql
window_sql
partition_by_sql
windowspec_sql
withingroup_sql
between_sql
bracket_offset_expressions
bracket_sql
all_sql
any_sql
exists_sql
case_sql
constraint_sql
nextvaluefor_sql
extract_sql
trim_sql
convert_concat_args
concat_sql
concatws_sql
check_sql
foreignkey_sql
primarykey_sql
if_sql
matchagainst_sql
jsonkeyvalue_sql
jsonpath_sql
json_path_part
formatjson_sql
jsonobject_sql
jsonobjectagg_sql
jsonarray_sql
jsonarrayagg_sql
jsoncolumndef_sql
jsonschema_sql
jsontable_sql
openjsoncolumndef_sql
openjson_sql
in_sql
in_unnest_op
interval_sql
return_sql
reference_sql
anonymous_sql
paren_sql
neg_sql
not_sql
alias_sql
pivotalias_sql
aliases_sql
atindex_sql
attimezone_sql
fromtimezone_sql
add_sql
and_sql
or_sql
xor_sql
connector_sql
bitwiseand_sql
bitwiseleftshift_sql
bitwisenot_sql
bitwiseor_sql
bitwiserightshift_sql
bitwisexor_sql
cast_sql
currentdate_sql
currenttimestamp_sql
collate_sql
command_sql
comment_sql
mergetreettlaction_sql
mergetreettl_sql
transaction_sql
commit_sql
rollback_sql
altercolumn_sql
alterdiststyle_sql
altersortkey_sql
renametable_sql
renamecolumn_sql
altertable_sql
add_column_sql
droppartition_sql
addconstraint_sql
distinct_sql
ignorenulls_sql
respectnulls_sql
havingmax_sql
intdiv_sql
dpipe_sql
div_sql
overlaps_sql
distance_sql
dot_sql
eq_sql
propertyeq_sql
escape_sql
glob_sql
gt_sql
gte_sql
ilike_sql
ilikeany_sql
is_sql
like_sql
likeany_sql
similarto_sql
lt_sql
lte_sql
mod_sql
mul_sql
neq_sql
nullsafeeq_sql
nullsafeneq_sql
slice_sql
sub_sql
trycast_sql
try_sql
log_sql
use_sql
binary
function_fallback_sql
func
format_args
too_wide
format_time
expressions
op_expressions
naked_property
tag_sql
token_sql
userdefinedfunction_sql
joinhint_sql
kwarg_sql
when_sql
merge_sql
tochar_sql
tonumber_sql
dictproperty_sql
dictrange_sql
dictsubproperty_sql
oncluster_sql
clusteredbyproperty_sql
anyvalue_sql
querytransform_sql
indexconstraintoption_sql
checkcolumnconstraint_sql
indexcolumnconstraint_sql
nvl2_sql
comprehension_sql
columnprefix_sql
opclass_sql
predict_sql
forin_sql
refresh_sql
operator_sql
toarray_sql
tsordstotime_sql
tsordstotimestamp_sql
tsordstodate_sql
unixdate_sql
lastday_sql
dateadd_sql
arrayany_sql
generateseries_sql
struct_sql
partitionrange_sql
truncatetable_sql
convert_sql
copyparameter_sql
credentials_sql
copy_sql
semicolon_sql
class MacroEvaluator:
116class MacroEvaluator:
117    """The class responsible for evaluating SQLMesh Macros/SQL.
118
119    SQLMesh supports special preprocessed SQL prefixed with `@`. Although it provides similar power to
120    traditional methods like string templating, there is semantic understanding of SQL which prevents
121    common errors like leading/trailing commas, syntax errors, etc.
122
123    SQLMesh SQL allows for macro variables and macro functions. Macro variables take the form of @variable. These are used for variable substitution.
124
125    SELECT * FROM foo WHERE ds BETWEEN @start_date AND @end_date
126
127    Macro variables can be defined with a special macro function.
128
129    @DEF(start_date, '2021-01-01')
130
131    Args:
132        dialect: Dialect of the SQL to evaluate.
133        python_env: Serialized Python environment.
134    """
135
136    def __init__(
137        self,
138        dialect: str = "",
139        python_env: t.Optional[t.Dict[str, Executable]] = None,
140        jinja_env: t.Optional[Environment] = None,
141        schema: t.Optional[MappingSchema] = None,
142        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
143        resolve_tables: t.Optional[t.Callable[[exp.Expression], exp.Expression]] = None,
144        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
145        default_catalog: t.Optional[str] = None,
146        path: Path = Path(),
147    ):
148        self.dialect = dialect
149        self.generator = MacroDialect().generator()
150        self.locals: t.Dict[str, t.Any] = {
151            "runtime_stage": runtime_stage.value,
152            "default_catalog": default_catalog,
153        }
154        self.env = {
155            **ENV,
156            "self": self,
157            "SQL": SQL,
158            "MacroEvaluator": MacroEvaluator,
159        }
160        self.python_env = python_env or {}
161        self._jinja_env: t.Optional[Environment] = jinja_env
162        self.macros = {normalize_macro_name(k): v.func for k, v in macro.get_registry().items()}
163        self._schema = schema
164        self._resolve_tables = resolve_tables
165        self.columns_to_types_called = False
166        self._snapshots = snapshots if snapshots is not None else {}
167        self.default_catalog = default_catalog
168        self._path = path
169
170        prepare_env(self.python_env, self.env)
171        for k, v in self.python_env.items():
172            if v.is_definition:
173                self.macros[normalize_macro_name(k)] = self.env[v.name or k]
174            elif v.is_import and getattr(self.env.get(k), c.SQLMESH_MACRO, None):
175                self.macros[normalize_macro_name(k)] = self.env[k]
176            elif v.is_value:
177                self.locals[k] = self.env[k]
178
179    def send(
180        self, name: str, *args: t.Any, **kwargs: t.Any
181    ) -> t.Union[None, exp.Expression, t.List[exp.Expression]]:
182        func = self.macros.get(normalize_macro_name(name))
183
184        if not callable(func):
185            raise SQLMeshError(f"Macro '{name}' does not exist.")
186
187        try:
188            annotations = t.get_type_hints(func)
189        except NameError:  # forward references aren't handled
190            annotations = {}
191
192        if annotations:
193            spec = inspect.getfullargspec(func)
194            callargs = inspect.getcallargs(func, self, *args, **kwargs)
195            new_args: t.List[t.Any] = []
196
197            for arg, value in callargs.items():
198                typ = annotations.get(arg)
199
200                if value is self:
201                    continue
202                if arg == spec.varargs:
203                    new_args.extend(self._coerce(v, typ) for v in value)
204                elif arg == spec.varkw:
205                    for k, v in value.items():
206                        kwargs[k] = self._coerce(v, typ)
207                elif arg in kwargs:
208                    kwargs[arg] = self._coerce(value, typ)
209                else:
210                    new_args.append(self._coerce(value, typ))
211
212            args = new_args  # type: ignore
213
214        try:
215            return func(self, *args, **kwargs)
216        except Exception as e:
217            print_exception(e, self.python_env)
218            raise MacroEvalError("Error trying to eval macro.") from e
219
220    def transform(
221        self, expression: exp.Expression
222    ) -> exp.Expression | t.List[exp.Expression] | None:
223        changed = False
224
225        def evaluate_macros(
226            node: exp.Expression,
227        ) -> exp.Expression | t.List[exp.Expression] | None:
228            nonlocal changed
229
230            if isinstance(node, MacroVar):
231                changed = True
232                variables = self.locals.get(c.SQLMESH_VARS, {})
233                if node.name not in self.locals and node.name.lower() not in variables:
234                    if not isinstance(node.parent, StagedFilePath):
235                        raise SQLMeshError(f"Macro variable '{node.name}' is undefined.")
236
237                    return node
238
239                value = self.locals.get(node.name, variables.get(node.name.lower()))
240                if isinstance(value, list):
241                    return exp.convert(
242                        tuple(
243                            self.transform(v) if isinstance(v, exp.Expression) else v for v in value
244                        )
245                    )
246                return exp.convert(
247                    self.transform(value) if isinstance(value, exp.Expression) else value
248                )
249            if isinstance(node, exp.Identifier) and "@" in node.this:
250                text = self.template(node.this, self.locals)
251                if node.this != text:
252                    changed = True
253                    node.args["this"] = text
254                    return node
255            if node.is_string:
256                text = node.this
257                if has_jinja(text):
258                    changed = True
259                    node.set("this", self.jinja_env.from_string(node.this).render())
260                return node
261            if isinstance(node, MacroFunc):
262                changed = True
263                return self.evaluate(node)
264            return node
265
266        transformed = exp.replace_tree(
267            expression.copy(), evaluate_macros, prune=lambda n: isinstance(n, exp.Lambda)
268        )
269
270        if changed:
271            # the transformations could have corrupted the ast, turning this into sql and reparsing ensures
272            # that the ast is correct
273            if isinstance(transformed, list):
274                return [
275                    self.parse_one(node.sql(dialect=self.dialect, copy=False))
276                    for node in transformed
277                ]
278            elif isinstance(transformed, exp.Expression):
279                return self.parse_one(transformed.sql(dialect=self.dialect, copy=False))
280
281        return transformed
282
283    def template(self, text: t.Any, local_variables: t.Dict[str, t.Any]) -> str:
284        """Substitute @vars with locals.
285
286        Args:
287            text: The string to do substitition on.
288            local_variables: Local variables in the context so that lambdas can be used.
289
290        Returns:
291           The rendered string.
292        """
293        mapping = {}
294
295        variables = self.locals.get(c.SQLMESH_VARS, {})
296
297        for k, v in chain(variables.items(), self.locals.items(), local_variables.items()):
298            # try to convert all variables into sqlglot expressions
299            # because they're going to be converted into strings in sql
300            # we use bare Exception instead of ValueError because there's
301            # a recursive error with MagicMock.
302            # we don't convert strings because that would result in adding quotes
303            if not isinstance(v, str):
304                try:
305                    v = exp.convert(v)
306                except Exception:
307                    pass
308
309            if isinstance(v, exp.Expression):
310                v = v.sql(dialect=self.dialect)
311            mapping[k] = v
312
313        return MacroStrTemplate(str(text)).safe_substitute(mapping)
314
315    def evaluate(self, node: MacroFunc) -> exp.Expression | t.List[exp.Expression] | None:
316        if isinstance(node, MacroDef):
317            if isinstance(node.expression, exp.Lambda):
318                _, fn = _norm_var_arg_lambda(self, node.expression)
319                self.macros[normalize_macro_name(node.name)] = lambda _, *args: fn(
320                    args[0] if len(args) == 1 else exp.Tuple(expressions=list(args))
321                )
322            else:
323                self.locals[node.name] = self.transform(node.expression)
324            return node
325
326        if isinstance(node, (MacroSQL, MacroStrReplace)):
327            result: t.Optional[exp.Expression | t.List[exp.Expression]] = exp.convert(
328                self.eval_expression(node)
329            )
330        else:
331            func = t.cast(exp.Anonymous, node.this)
332
333            args = []
334            kwargs = {}
335            for e in func.expressions:
336                if isinstance(e, exp.PropertyEQ):
337                    kwargs[e.this.name] = e.expression
338                else:
339                    if kwargs:
340                        raise MacroEvalError(
341                            f"Positional argument cannot follow keyword argument.\n  {func.sql(dialect=self.dialect)} at '{self._path}'"
342                        )
343
344                    args.append(e)
345
346            result = self.send(func.name, *args, **kwargs)
347
348        if result is None:
349            return None
350
351        if isinstance(result, (tuple, list)):
352            return [self.parse_one(item) for item in result if item is not None]
353        return self.parse_one(result)
354
355    def eval_expression(self, node: t.Any) -> t.Any:
356        """Converts a SQLGlot expression into executable Python code and evals it.
357
358        If the node is not an expression, it will simply be returned.
359
360        Args:
361            node: expression
362        Returns:
363            The return value of the evaled Python Code.
364        """
365        if not isinstance(node, exp.Expression):
366            return node
367        code = node.sql()
368        try:
369            code = self.generator.generate(node)
370            return eval(code, self.env, self.locals)
371        except Exception as e:
372            print_exception(e, self.python_env)
373            raise MacroEvalError(
374                f"Error trying to eval macro.\n\nGenerated code: {code}\n\nOriginal sql: {node}"
375            ) from e
376
377    def parse_one(
378        self, sql: str | exp.Expression, into: t.Optional[exp.IntoType] = None, **opts: t.Any
379    ) -> exp.Expression:
380        """Parses the given SQL string and returns a syntax tree for the first
381        parsed SQL statement.
382
383        Args:
384            sql: the SQL code or expression to parse.
385            into: the Expression to parse into
386            **opts: other options
387
388        Returns:
389            Expression: the syntax tree for the first parsed statement
390        """
391        return sqlglot.maybe_parse(sql, dialect=self.dialect, into=into, **opts)
392
393    @property
394    def jinja_env(self) -> Environment:
395        if not self._jinja_env:
396            jinja_env_methods = {**self.locals, **self.env}
397            del jinja_env_methods["self"]
398            self._jinja_env = JinjaMacroRegistry().build_environment(**jinja_env_methods)
399        return self._jinja_env
400
401    def columns_to_types(self, model_name: TableName | exp.Column) -> t.Dict[str, exp.DataType]:
402        """Returns the columns-to-types mapping corresponding to the specified model."""
403        if self._schema is None or self._schema.empty:
404            self.columns_to_types_called = True
405            return {"__schema_unavailable_at_load__": exp.DataType.build("unknown")}
406
407        normalized_model_name = normalize_model_name(
408            model_name,
409            default_catalog=self.default_catalog,
410            dialect=self.dialect,
411        )
412        columns_to_types = self._schema.find(exp.to_table(normalized_model_name))
413        if columns_to_types is None:
414            raise SQLMeshError(f"Schema for model '{model_name}' can't be statically determined.")
415
416        return columns_to_types
417
418    def get_snapshot(self, model_name: TableName | exp.Column) -> t.Optional[Snapshot]:
419        """Returns the snapshot that corresponds to the given model name."""
420        return self._snapshots.get(
421            normalize_model_name(
422                model_name,
423                default_catalog=self.default_catalog,
424                dialect=self.dialect,
425            )
426        )
427
428    def resolve_tables(self, query: exp.Expression) -> exp.Expression:
429        """Resolves queries with references to SQLMesh model names to their physical tables."""
430        if not self._resolve_tables:
431            raise SQLMeshError(
432                "Macro evaluator not properly initialized with resolve_tables lambda."
433            )
434        return self._resolve_tables(query)
435
436    @property
437    def runtime_stage(self) -> RuntimeStage:
438        """Returns the current runtime stage of the macro evaluation."""
439        return self.locals["runtime_stage"]
440
441    @property
442    def engine_adapter(self) -> EngineAdapter:
443        engine_adapter = self.locals.get("engine_adapter")
444        if not engine_adapter:
445            raise SQLMeshError(
446                "The engine adapter is not available while models are loading."
447                " You can gate these calls by checking in Python: evaluator.runtime_stage != 'loading' or SQL: @runtime_stage <> 'loading'."
448            )
449        return self.locals["engine_adapter"]
450
451    @property
452    def gateway(self) -> t.Optional[str]:
453        """Returns the gateway name."""
454        return self.var(c.GATEWAY)
455
456    def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
457        """Returns the value of the specified variable, or the default value if it doesn't exist."""
458        return (self.locals.get(c.SQLMESH_VARS) or {}).get(var_name.lower(), default)
459
460    def _coerce(self, expr: exp.Expression, typ: t.Any, strict: bool = False) -> t.Any:
461        """Coerces the given expression to the specified type on a best-effort basis."""
462        base_err_msg = f"Failed to coerce expression '{expr}' to type '{typ}'."
463        try:
464            if typ is None or typ is t.Any:
465                return expr
466            base = t.get_origin(typ) or typ
467
468            # We need to handle Union and TypeVars first since we cannot use isinstance with it
469            if base in UNION_TYPES:
470                for branch in t.get_args(typ):
471                    try:
472                        return self._coerce(expr, branch, True)
473                    except Exception:
474                        pass
475                raise SQLMeshError(base_err_msg)
476            if base is SQL and isinstance(expr, exp.Expression):
477                return expr.sql(self.dialect)
478
479            if isinstance(expr, base):
480                return expr
481            if issubclass(base, exp.Expression):
482                d = Dialect.get_or_raise(self.dialect)
483                into = base if base in d.parser().EXPRESSION_PARSERS else None
484                if into is None:
485                    if isinstance(expr, exp.Literal):
486                        coerced = parse_one(expr.this)
487                    else:
488                        raise SQLMeshError(
489                            f"{base_err_msg} Coercion to {base} requires a literal expression."
490                        )
491                else:
492                    coerced = parse_one(
493                        expr.this if isinstance(expr, exp.Literal) else expr.sql(), into=into
494                    )
495                if isinstance(coerced, base):
496                    return coerced
497                raise SQLMeshError(base_err_msg)
498
499            if base in (int, float, str) and isinstance(expr, exp.Literal):
500                return base(expr.this)
501            if base is str and isinstance(expr, exp.Column) and not expr.table:
502                return expr.name
503            if base is bool and isinstance(expr, exp.Boolean):
504                return expr.this
505            # if base is str and isinstance(expr, exp.Expression):
506            #    return expr.sql(self.dialect)
507            if base is tuple and isinstance(expr, (exp.Tuple, exp.Array)):
508                generic = t.get_args(typ)
509                if not generic:
510                    return tuple(expr.expressions)
511                if generic[-1] is ...:
512                    return tuple(self._coerce(expr, generic[0]) for expr in expr.expressions)
513                elif len(generic) == len(expr.expressions):
514                    return tuple(
515                        self._coerce(expr, generic[i]) for i, expr in enumerate(expr.expressions)
516                    )
517                raise SQLMeshError(f"{base_err_msg} Expected {len(generic)} items.")
518            if base is list and isinstance(expr, (exp.Array, exp.Tuple)):
519                generic = t.get_args(typ)
520                if not generic:
521                    return expr.expressions
522                return [self._coerce(expr, generic[0]) for expr in expr.expressions]
523            raise SQLMeshError(base_err_msg)
524        except Exception:
525            if strict:
526                raise
527            logger.error(
528                "Coercion of expression '%s' to type '%s' failed. Using non coerced expression at '%s'",
529                expr,
530                typ,
531                self._path,
532            )
533            return expr

The class responsible for evaluating SQLMesh Macros/SQL.

SQLMesh supports special preprocessed SQL prefixed with @. Although it provides similar power to traditional methods like string templating, there is semantic understanding of SQL which prevents common errors like leading/trailing commas, syntax errors, etc.

SQLMesh SQL allows for macro variables and macro functions. Macro variables take the form of @variable. These are used for variable substitution.

SELECT * FROM foo WHERE ds BETWEEN @start_date AND @end_date

Macro variables can be defined with a special macro function.

@DEF(start_date, '2021-01-01')

Arguments:
  • dialect: Dialect of the SQL to evaluate.
  • python_env: Serialized Python environment.
MacroEvaluator( dialect: str = '', python_env: Union[Dict[str, sqlmesh.utils.metaprogramming.Executable], NoneType] = None, jinja_env: Union[jinja2.environment.Environment, NoneType] = None, schema: Union[sqlglot.schema.MappingSchema, NoneType] = None, runtime_stage: sqlmesh.core.macros.RuntimeStage = <RuntimeStage.LOADING: 'loading'>, resolve_tables: Union[Callable[[sqlglot.expressions.Expression], sqlglot.expressions.Expression], NoneType] = None, snapshots: Union[Dict[str, <MagicMock id='140338245316416'>], NoneType] = None, default_catalog: Union[str, NoneType] = None, path: pathlib.Path = PosixPath('.'))
136    def __init__(
137        self,
138        dialect: str = "",
139        python_env: t.Optional[t.Dict[str, Executable]] = None,
140        jinja_env: t.Optional[Environment] = None,
141        schema: t.Optional[MappingSchema] = None,
142        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
143        resolve_tables: t.Optional[t.Callable[[exp.Expression], exp.Expression]] = None,
144        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
145        default_catalog: t.Optional[str] = None,
146        path: Path = Path(),
147    ):
148        self.dialect = dialect
149        self.generator = MacroDialect().generator()
150        self.locals: t.Dict[str, t.Any] = {
151            "runtime_stage": runtime_stage.value,
152            "default_catalog": default_catalog,
153        }
154        self.env = {
155            **ENV,
156            "self": self,
157            "SQL": SQL,
158            "MacroEvaluator": MacroEvaluator,
159        }
160        self.python_env = python_env or {}
161        self._jinja_env: t.Optional[Environment] = jinja_env
162        self.macros = {normalize_macro_name(k): v.func for k, v in macro.get_registry().items()}
163        self._schema = schema
164        self._resolve_tables = resolve_tables
165        self.columns_to_types_called = False
166        self._snapshots = snapshots if snapshots is not None else {}
167        self.default_catalog = default_catalog
168        self._path = path
169
170        prepare_env(self.python_env, self.env)
171        for k, v in self.python_env.items():
172            if v.is_definition:
173                self.macros[normalize_macro_name(k)] = self.env[v.name or k]
174            elif v.is_import and getattr(self.env.get(k), c.SQLMESH_MACRO, None):
175                self.macros[normalize_macro_name(k)] = self.env[k]
176            elif v.is_value:
177                self.locals[k] = self.env[k]
def send( self, name: str, *args: Any, **kwargs: Any) -> Union[NoneType, sqlglot.expressions.Expression, List[sqlglot.expressions.Expression]]:
179    def send(
180        self, name: str, *args: t.Any, **kwargs: t.Any
181    ) -> t.Union[None, exp.Expression, t.List[exp.Expression]]:
182        func = self.macros.get(normalize_macro_name(name))
183
184        if not callable(func):
185            raise SQLMeshError(f"Macro '{name}' does not exist.")
186
187        try:
188            annotations = t.get_type_hints(func)
189        except NameError:  # forward references aren't handled
190            annotations = {}
191
192        if annotations:
193            spec = inspect.getfullargspec(func)
194            callargs = inspect.getcallargs(func, self, *args, **kwargs)
195            new_args: t.List[t.Any] = []
196
197            for arg, value in callargs.items():
198                typ = annotations.get(arg)
199
200                if value is self:
201                    continue
202                if arg == spec.varargs:
203                    new_args.extend(self._coerce(v, typ) for v in value)
204                elif arg == spec.varkw:
205                    for k, v in value.items():
206                        kwargs[k] = self._coerce(v, typ)
207                elif arg in kwargs:
208                    kwargs[arg] = self._coerce(value, typ)
209                else:
210                    new_args.append(self._coerce(value, typ))
211
212            args = new_args  # type: ignore
213
214        try:
215            return func(self, *args, **kwargs)
216        except Exception as e:
217            print_exception(e, self.python_env)
218            raise MacroEvalError("Error trying to eval macro.") from e
def transform( self, expression: sqlglot.expressions.Expression) -> 'exp.Expression | t.List[exp.Expression] | None':
220    def transform(
221        self, expression: exp.Expression
222    ) -> exp.Expression | t.List[exp.Expression] | None:
223        changed = False
224
225        def evaluate_macros(
226            node: exp.Expression,
227        ) -> exp.Expression | t.List[exp.Expression] | None:
228            nonlocal changed
229
230            if isinstance(node, MacroVar):
231                changed = True
232                variables = self.locals.get(c.SQLMESH_VARS, {})
233                if node.name not in self.locals and node.name.lower() not in variables:
234                    if not isinstance(node.parent, StagedFilePath):
235                        raise SQLMeshError(f"Macro variable '{node.name}' is undefined.")
236
237                    return node
238
239                value = self.locals.get(node.name, variables.get(node.name.lower()))
240                if isinstance(value, list):
241                    return exp.convert(
242                        tuple(
243                            self.transform(v) if isinstance(v, exp.Expression) else v for v in value
244                        )
245                    )
246                return exp.convert(
247                    self.transform(value) if isinstance(value, exp.Expression) else value
248                )
249            if isinstance(node, exp.Identifier) and "@" in node.this:
250                text = self.template(node.this, self.locals)
251                if node.this != text:
252                    changed = True
253                    node.args["this"] = text
254                    return node
255            if node.is_string:
256                text = node.this
257                if has_jinja(text):
258                    changed = True
259                    node.set("this", self.jinja_env.from_string(node.this).render())
260                return node
261            if isinstance(node, MacroFunc):
262                changed = True
263                return self.evaluate(node)
264            return node
265
266        transformed = exp.replace_tree(
267            expression.copy(), evaluate_macros, prune=lambda n: isinstance(n, exp.Lambda)
268        )
269
270        if changed:
271            # the transformations could have corrupted the ast, turning this into sql and reparsing ensures
272            # that the ast is correct
273            if isinstance(transformed, list):
274                return [
275                    self.parse_one(node.sql(dialect=self.dialect, copy=False))
276                    for node in transformed
277                ]
278            elif isinstance(transformed, exp.Expression):
279                return self.parse_one(transformed.sql(dialect=self.dialect, copy=False))
280
281        return transformed
def template(self, text: Any, local_variables: Dict[str, Any]) -> str:
283    def template(self, text: t.Any, local_variables: t.Dict[str, t.Any]) -> str:
284        """Substitute @vars with locals.
285
286        Args:
287            text: The string to do substitition on.
288            local_variables: Local variables in the context so that lambdas can be used.
289
290        Returns:
291           The rendered string.
292        """
293        mapping = {}
294
295        variables = self.locals.get(c.SQLMESH_VARS, {})
296
297        for k, v in chain(variables.items(), self.locals.items(), local_variables.items()):
298            # try to convert all variables into sqlglot expressions
299            # because they're going to be converted into strings in sql
300            # we use bare Exception instead of ValueError because there's
301            # a recursive error with MagicMock.
302            # we don't convert strings because that would result in adding quotes
303            if not isinstance(v, str):
304                try:
305                    v = exp.convert(v)
306                except Exception:
307                    pass
308
309            if isinstance(v, exp.Expression):
310                v = v.sql(dialect=self.dialect)
311            mapping[k] = v
312
313        return MacroStrTemplate(str(text)).safe_substitute(mapping)

Substitute @vars with locals.

Arguments:
  • text: The string to do substitition on.
  • local_variables: Local variables in the context so that lambdas can be used.
Returns:

The rendered string.

def evaluate( self, node: sqlmesh.core.dialect.MacroFunc) -> 'exp.Expression | t.List[exp.Expression] | None':
315    def evaluate(self, node: MacroFunc) -> exp.Expression | t.List[exp.Expression] | None:
316        if isinstance(node, MacroDef):
317            if isinstance(node.expression, exp.Lambda):
318                _, fn = _norm_var_arg_lambda(self, node.expression)
319                self.macros[normalize_macro_name(node.name)] = lambda _, *args: fn(
320                    args[0] if len(args) == 1 else exp.Tuple(expressions=list(args))
321                )
322            else:
323                self.locals[node.name] = self.transform(node.expression)
324            return node
325
326        if isinstance(node, (MacroSQL, MacroStrReplace)):
327            result: t.Optional[exp.Expression | t.List[exp.Expression]] = exp.convert(
328                self.eval_expression(node)
329            )
330        else:
331            func = t.cast(exp.Anonymous, node.this)
332
333            args = []
334            kwargs = {}
335            for e in func.expressions:
336                if isinstance(e, exp.PropertyEQ):
337                    kwargs[e.this.name] = e.expression
338                else:
339                    if kwargs:
340                        raise MacroEvalError(
341                            f"Positional argument cannot follow keyword argument.\n  {func.sql(dialect=self.dialect)} at '{self._path}'"
342                        )
343
344                    args.append(e)
345
346            result = self.send(func.name, *args, **kwargs)
347
348        if result is None:
349            return None
350
351        if isinstance(result, (tuple, list)):
352            return [self.parse_one(item) for item in result if item is not None]
353        return self.parse_one(result)
def eval_expression(self, node: Any) -> Any:
355    def eval_expression(self, node: t.Any) -> t.Any:
356        """Converts a SQLGlot expression into executable Python code and evals it.
357
358        If the node is not an expression, it will simply be returned.
359
360        Args:
361            node: expression
362        Returns:
363            The return value of the evaled Python Code.
364        """
365        if not isinstance(node, exp.Expression):
366            return node
367        code = node.sql()
368        try:
369            code = self.generator.generate(node)
370            return eval(code, self.env, self.locals)
371        except Exception as e:
372            print_exception(e, self.python_env)
373            raise MacroEvalError(
374                f"Error trying to eval macro.\n\nGenerated code: {code}\n\nOriginal sql: {node}"
375            ) from e

Converts a SQLGlot expression into executable Python code and evals it.

If the node is not an expression, it will simply be returned.

Arguments:
  • node: expression
Returns:

The return value of the evaled Python Code.

def parse_one( self, sql: 'str | exp.Expression', into: Union[str, Type[sqlglot.expressions.Expression], Collection[Union[str, Type[sqlglot.expressions.Expression]]], NoneType] = None, **opts: Any) -> sqlglot.expressions.Expression:
377    def parse_one(
378        self, sql: str | exp.Expression, into: t.Optional[exp.IntoType] = None, **opts: t.Any
379    ) -> exp.Expression:
380        """Parses the given SQL string and returns a syntax tree for the first
381        parsed SQL statement.
382
383        Args:
384            sql: the SQL code or expression to parse.
385            into: the Expression to parse into
386            **opts: other options
387
388        Returns:
389            Expression: the syntax tree for the first parsed statement
390        """
391        return sqlglot.maybe_parse(sql, dialect=self.dialect, into=into, **opts)

Parses the given SQL string and returns a syntax tree for the first parsed SQL statement.

Arguments:
  • sql: the SQL code or expression to parse.
  • into: the Expression to parse into
  • **opts: other options
Returns:

Expression: the syntax tree for the first parsed statement

def columns_to_types( self, model_name: <MagicMock name='mock.__or__()' id='140338238674832'>) -> Dict[str, sqlglot.expressions.DataType]:
401    def columns_to_types(self, model_name: TableName | exp.Column) -> t.Dict[str, exp.DataType]:
402        """Returns the columns-to-types mapping corresponding to the specified model."""
403        if self._schema is None or self._schema.empty:
404            self.columns_to_types_called = True
405            return {"__schema_unavailable_at_load__": exp.DataType.build("unknown")}
406
407        normalized_model_name = normalize_model_name(
408            model_name,
409            default_catalog=self.default_catalog,
410            dialect=self.dialect,
411        )
412        columns_to_types = self._schema.find(exp.to_table(normalized_model_name))
413        if columns_to_types is None:
414            raise SQLMeshError(f"Schema for model '{model_name}' can't be statically determined.")
415
416        return columns_to_types

Returns the columns-to-types mapping corresponding to the specified model.

def get_snapshot( self, model_name: <MagicMock name='mock.__or__()' id='140338238774720'>) -> Union[<MagicMock id='140338238697968'>, NoneType]:
418    def get_snapshot(self, model_name: TableName | exp.Column) -> t.Optional[Snapshot]:
419        """Returns the snapshot that corresponds to the given model name."""
420        return self._snapshots.get(
421            normalize_model_name(
422                model_name,
423                default_catalog=self.default_catalog,
424                dialect=self.dialect,
425            )
426        )

Returns the snapshot that corresponds to the given model name.

def resolve_tables( self, query: sqlglot.expressions.Expression) -> sqlglot.expressions.Expression:
428    def resolve_tables(self, query: exp.Expression) -> exp.Expression:
429        """Resolves queries with references to SQLMesh model names to their physical tables."""
430        if not self._resolve_tables:
431            raise SQLMeshError(
432                "Macro evaluator not properly initialized with resolve_tables lambda."
433            )
434        return self._resolve_tables(query)

Resolves queries with references to SQLMesh model names to their physical tables.

Returns the current runtime stage of the macro evaluation.

gateway: Union[str, NoneType]

Returns the gateway name.

def var( self, var_name: str, default: Union[Any, NoneType] = None) -> Union[Any, NoneType]:
456    def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
457        """Returns the value of the specified variable, or the default value if it doesn't exist."""
458        return (self.locals.get(c.SQLMESH_VARS) or {}).get(var_name.lower(), default)

Returns the value of the specified variable, or the default value if it doesn't exist.

class macro(sqlmesh.utils.registry_decorator):
536class macro(registry_decorator):
537    """Specifies a function is a macro and registers it the global MACROS registry.
538
539    Registered macros can be referenced in SQL statements to make queries more dynamic/cleaner.
540
541    Example:
542        from sqlglot import exp
543        from sqlmesh.core.macros import MacroEvaluator, macro
544
545        @macro()
546        def add_one(evaluator: MacroEvaluator, column: exp.Literal) -> exp.Add:
547            return evaluator.parse_one(f"{column} + 1")
548
549    Args:
550        name: A custom name for the macro, the default is the name of the function.
551    """
552
553    registry_name = "macros"
554
555    def __call__(
556        self, func: t.Callable[..., DECORATOR_RETURN_TYPE]
557    ) -> t.Callable[..., DECORATOR_RETURN_TYPE]:
558        wrapper = super().__call__(func)
559
560        # This is used to identify macros at runtime to unwrap during serialization.
561        setattr(wrapper, c.SQLMESH_MACRO, True)
562        return wrapper

Specifies a function is a macro and registers it the global MACROS registry.

Registered macros can be referenced in SQL statements to make queries more dynamic/cleaner.

Example:

from sqlglot import exp from sqlmesh.core.macros import MacroEvaluator, macro

@macro() def add_one(evaluator: MacroEvaluator, column: exp.Literal) -> exp.Add: return evaluator.parse_one(f"{column} + 1")

Arguments:
  • name: A custom name for the macro, the default is the name of the function.
@macro()
def each(evaluator: sqlmesh.core.macros.MacroEvaluator, *args: Any) -> List[Any]:
626@macro()
627def each(
628    evaluator: MacroEvaluator,
629    *args: t.Any,
630) -> t.List[t.Any]:
631    """Iterates through items calling func on each.
632
633    If a func call on item returns None, it will be excluded from the list.
634
635    Args:
636        evaluator: MacroEvaluator that invoked the macro
637        args: The last argument should be a lambda of the form x -> x +1. The first argument can be
638            an Array or var args can be used.
639
640    Returns:
641        A list of items that is the result of func
642    """
643    *items, func = args
644    items, func = _norm_var_arg_lambda(evaluator, func, *items)  # type: ignore
645    return [item for item in map(func, ensure_collection(items)) if item is not None]

Iterates through items calling func on each.

If a func call on item returns None, it will be excluded from the list.

Arguments:
  • evaluator: MacroEvaluator that invoked the macro
  • args: The last argument should be a lambda of the form x -> x +1. The first argument can be an Array or var args can be used.
Returns:

A list of items that is the result of func

@macro('IF')
def if_( evaluator: sqlmesh.core.macros.MacroEvaluator, condition: Any, true: Any, false: Any = None) -> Any:
648@macro("IF")
649def if_(
650    evaluator: MacroEvaluator,
651    condition: t.Any,
652    true: t.Any,
653    false: t.Any = None,
654) -> t.Any:
655    """Evaluates a given condition and returns the second argument if true or else the third argument.
656
657    If false is not passed in, the default return value will be None.
658
659    Example:
660        >>> from sqlglot import parse_one
661        >>> from sqlmesh.core.macros import MacroEvaluator
662        >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a, b)")).sql()
663        'b'
664
665        >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a)"))
666    """
667
668    if evaluator.eval_expression(condition):
669        return true
670    return false

Evaluates a given condition and returns the second argument if true or else the third argument.

If false is not passed in, the default return value will be None.

Example:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a, b)")).sql()
'b'
>>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a)"))
@macro('REDUCE')
def reduce_(evaluator: sqlmesh.core.macros.MacroEvaluator, *args: Any) -> Any:
673@macro("REDUCE")
674def reduce_(evaluator: MacroEvaluator, *args: t.Any) -> t.Any:
675    """Iterates through items applying provided function that takes two arguments
676    cumulatively to the items of iterable items, from left to right, so as to reduce
677    the iterable to a single item.
678
679    Example:
680        >>> from sqlglot import parse_one
681        >>> from sqlmesh.core.macros import MacroEvaluator
682        >>> sql = "@SQL(@REDUCE([100, 200, 300, 400], (x, y) -> x + y))"
683        >>> MacroEvaluator().transform(parse_one(sql)).sql()
684        '1000'
685
686    Args:
687        evaluator: MacroEvaluator that invoked the macro
688        args: The last argument should be a lambda of the form (x, y) -> x + y. The first argument can be
689            an Array or var args can be used.
690    Returns:
691        A single item that is the result of applying func cumulatively to items
692    """
693    *items, func = args
694    items, func = _norm_var_arg_lambda(evaluator, func, *items)  # type: ignore
695    return reduce(lambda a, b: func(exp.Tuple(expressions=[a, b])), ensure_collection(items))

Iterates through items applying provided function that takes two arguments cumulatively to the items of iterable items, from left to right, so as to reduce the iterable to a single item.

Example:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "@SQL(@REDUCE([100, 200, 300, 400], (x, y) -> x + y))"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'1000'
Arguments:
  • evaluator: MacroEvaluator that invoked the macro
  • args: The last argument should be a lambda of the form (x, y) -> x + y. The first argument can be an Array or var args can be used.
Returns:

A single item that is the result of applying func cumulatively to items

@macro('FILTER')
def filter_(evaluator: sqlmesh.core.macros.MacroEvaluator, *args: Any) -> List[Any]:
698@macro("FILTER")
699def filter_(evaluator: MacroEvaluator, *args: t.Any) -> t.List[t.Any]:
700    """Iterates through items, applying provided function to each item and removing
701    all items where the function returns False
702
703    Example:
704        >>> from sqlglot import parse_one
705        >>> from sqlmesh.core.macros import MacroEvaluator
706        >>> sql = "@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y)"
707        >>> MacroEvaluator().transform(parse_one(sql)).sql()
708        '2 + 3'
709
710        >>> sql = "@EVAL(@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y))"
711        >>> MacroEvaluator().transform(parse_one(sql)).sql()
712        '5'
713
714    Args:
715        evaluator: MacroEvaluator that invoked the macro
716        args: The last argument should be a lambda of the form x -> x > 1. The first argument can be
717            an Array or var args can be used.
718    Returns:
719        The items for which the func returned True
720    """
721    *items, func = args
722    items, func = _norm_var_arg_lambda(evaluator, func, *items)  # type: ignore
723    return list(filter(lambda arg: evaluator.eval_expression(func(arg)), items))

Iterates through items, applying provided function to each item and removing all items where the function returns False

Example:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y)"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'2 + 3'
>>> sql = "@EVAL(@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y))"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'5'
Arguments:
  • evaluator: MacroEvaluator that invoked the macro
  • args: The last argument should be a lambda of the form x -> x > 1. The first argument can be an Array or var args can be used.
Returns:

The items for which the func returned True

def with_( evaluator: sqlmesh.core.macros.MacroEvaluator, condition: sqlglot.expressions.Condition, expression: sqlglot.expressions.Expression) -> Union[sqlglot.expressions.Expression, NoneType]:
726def _optional_expression(
727    evaluator: MacroEvaluator,
728    condition: exp.Condition,
729    expression: exp.Expression,
730) -> t.Optional[exp.Expression]:
731    """Inserts expression when the condition is True
732
733    The following examples express the usage of this function in the context of the macros which wrap it.
734
735    Examples:
736        >>> from sqlglot import parse_one
737        >>> from sqlmesh.core.macros import MacroEvaluator
738        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
739        >>> MacroEvaluator().transform(parse_one(sql)).sql()
740        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
741        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
742        >>> MacroEvaluator().transform(parse_one(sql)).sql()
743        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
744        >>> sql = "select * from city @GROUP_BY(True) country, population"
745        >>> MacroEvaluator().transform(parse_one(sql)).sql()
746        'SELECT * FROM city GROUP BY country, population'
747        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
748        >>> MacroEvaluator().transform(parse_one(sql)).sql()
749        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
750
751    Args:
752        evaluator: MacroEvaluator that invoked the macro
753        condition: Condition expression
754        expression: SQL expression
755    Returns:
756        Expression if the conditional is True; otherwise None
757    """
758    return expression if evaluator.eval_expression(condition) else None

Inserts expression when the condition is True

The following examples express the usage of this function in the context of the macros which wrap it.

Examples:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
>>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
>>> sql = "select * from city @GROUP_BY(True) country, population"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT * FROM city GROUP BY country, population'
>>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
"SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
  • evaluator: MacroEvaluator that invoked the macro
  • condition: Condition expression
  • expression: SQL expression
Returns:

Expression if the conditional is True; otherwise None

def join( evaluator: sqlmesh.core.macros.MacroEvaluator, condition: sqlglot.expressions.Condition, expression: sqlglot.expressions.Expression) -> Union[sqlglot.expressions.Expression, NoneType]:
726def _optional_expression(
727    evaluator: MacroEvaluator,
728    condition: exp.Condition,
729    expression: exp.Expression,
730) -> t.Optional[exp.Expression]:
731    """Inserts expression when the condition is True
732
733    The following examples express the usage of this function in the context of the macros which wrap it.
734
735    Examples:
736        >>> from sqlglot import parse_one
737        >>> from sqlmesh.core.macros import MacroEvaluator
738        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
739        >>> MacroEvaluator().transform(parse_one(sql)).sql()
740        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
741        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
742        >>> MacroEvaluator().transform(parse_one(sql)).sql()
743        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
744        >>> sql = "select * from city @GROUP_BY(True) country, population"
745        >>> MacroEvaluator().transform(parse_one(sql)).sql()
746        'SELECT * FROM city GROUP BY country, population'
747        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
748        >>> MacroEvaluator().transform(parse_one(sql)).sql()
749        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
750
751    Args:
752        evaluator: MacroEvaluator that invoked the macro
753        condition: Condition expression
754        expression: SQL expression
755    Returns:
756        Expression if the conditional is True; otherwise None
757    """
758    return expression if evaluator.eval_expression(condition) else None

Inserts expression when the condition is True

The following examples express the usage of this function in the context of the macros which wrap it.

Examples:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
>>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
>>> sql = "select * from city @GROUP_BY(True) country, population"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT * FROM city GROUP BY country, population'
>>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
"SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
  • evaluator: MacroEvaluator that invoked the macro
  • condition: Condition expression
  • expression: SQL expression
Returns:

Expression if the conditional is True; otherwise None

def where( evaluator: sqlmesh.core.macros.MacroEvaluator, condition: sqlglot.expressions.Condition, expression: sqlglot.expressions.Expression) -> Union[sqlglot.expressions.Expression, NoneType]:
726def _optional_expression(
727    evaluator: MacroEvaluator,
728    condition: exp.Condition,
729    expression: exp.Expression,
730) -> t.Optional[exp.Expression]:
731    """Inserts expression when the condition is True
732
733    The following examples express the usage of this function in the context of the macros which wrap it.
734
735    Examples:
736        >>> from sqlglot import parse_one
737        >>> from sqlmesh.core.macros import MacroEvaluator
738        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
739        >>> MacroEvaluator().transform(parse_one(sql)).sql()
740        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
741        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
742        >>> MacroEvaluator().transform(parse_one(sql)).sql()
743        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
744        >>> sql = "select * from city @GROUP_BY(True) country, population"
745        >>> MacroEvaluator().transform(parse_one(sql)).sql()
746        'SELECT * FROM city GROUP BY country, population'
747        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
748        >>> MacroEvaluator().transform(parse_one(sql)).sql()
749        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
750
751    Args:
752        evaluator: MacroEvaluator that invoked the macro
753        condition: Condition expression
754        expression: SQL expression
755    Returns:
756        Expression if the conditional is True; otherwise None
757    """
758    return expression if evaluator.eval_expression(condition) else None

Inserts expression when the condition is True

The following examples express the usage of this function in the context of the macros which wrap it.

Examples:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
>>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
>>> sql = "select * from city @GROUP_BY(True) country, population"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT * FROM city GROUP BY country, population'
>>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
"SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
  • evaluator: MacroEvaluator that invoked the macro
  • condition: Condition expression
  • expression: SQL expression
Returns:

Expression if the conditional is True; otherwise None

def group_by( evaluator: sqlmesh.core.macros.MacroEvaluator, condition: sqlglot.expressions.Condition, expression: sqlglot.expressions.Expression) -> Union[sqlglot.expressions.Expression, NoneType]:
726def _optional_expression(
727    evaluator: MacroEvaluator,
728    condition: exp.Condition,
729    expression: exp.Expression,
730) -> t.Optional[exp.Expression]:
731    """Inserts expression when the condition is True
732
733    The following examples express the usage of this function in the context of the macros which wrap it.
734
735    Examples:
736        >>> from sqlglot import parse_one
737        >>> from sqlmesh.core.macros import MacroEvaluator
738        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
739        >>> MacroEvaluator().transform(parse_one(sql)).sql()
740        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
741        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
742        >>> MacroEvaluator().transform(parse_one(sql)).sql()
743        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
744        >>> sql = "select * from city @GROUP_BY(True) country, population"
745        >>> MacroEvaluator().transform(parse_one(sql)).sql()
746        'SELECT * FROM city GROUP BY country, population'
747        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
748        >>> MacroEvaluator().transform(parse_one(sql)).sql()
749        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
750
751    Args:
752        evaluator: MacroEvaluator that invoked the macro
753        condition: Condition expression
754        expression: SQL expression
755    Returns:
756        Expression if the conditional is True; otherwise None
757    """
758    return expression if evaluator.eval_expression(condition) else None

Inserts expression when the condition is True

The following examples express the usage of this function in the context of the macros which wrap it.

Examples:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
>>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
>>> sql = "select * from city @GROUP_BY(True) country, population"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT * FROM city GROUP BY country, population'
>>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
"SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
  • evaluator: MacroEvaluator that invoked the macro
  • condition: Condition expression
  • expression: SQL expression
Returns:

Expression if the conditional is True; otherwise None

def having( evaluator: sqlmesh.core.macros.MacroEvaluator, condition: sqlglot.expressions.Condition, expression: sqlglot.expressions.Expression) -> Union[sqlglot.expressions.Expression, NoneType]:
726def _optional_expression(
727    evaluator: MacroEvaluator,
728    condition: exp.Condition,
729    expression: exp.Expression,
730) -> t.Optional[exp.Expression]:
731    """Inserts expression when the condition is True
732
733    The following examples express the usage of this function in the context of the macros which wrap it.
734
735    Examples:
736        >>> from sqlglot import parse_one
737        >>> from sqlmesh.core.macros import MacroEvaluator
738        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
739        >>> MacroEvaluator().transform(parse_one(sql)).sql()
740        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
741        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
742        >>> MacroEvaluator().transform(parse_one(sql)).sql()
743        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
744        >>> sql = "select * from city @GROUP_BY(True) country, population"
745        >>> MacroEvaluator().transform(parse_one(sql)).sql()
746        'SELECT * FROM city GROUP BY country, population'
747        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
748        >>> MacroEvaluator().transform(parse_one(sql)).sql()
749        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
750
751    Args:
752        evaluator: MacroEvaluator that invoked the macro
753        condition: Condition expression
754        expression: SQL expression
755    Returns:
756        Expression if the conditional is True; otherwise None
757    """
758    return expression if evaluator.eval_expression(condition) else None

Inserts expression when the condition is True

The following examples express the usage of this function in the context of the macros which wrap it.

Examples:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
>>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
>>> sql = "select * from city @GROUP_BY(True) country, population"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT * FROM city GROUP BY country, population'
>>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
"SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
  • evaluator: MacroEvaluator that invoked the macro
  • condition: Condition expression
  • expression: SQL expression
Returns:

Expression if the conditional is True; otherwise None

def order_by( evaluator: sqlmesh.core.macros.MacroEvaluator, condition: sqlglot.expressions.Condition, expression: sqlglot.expressions.Expression) -> Union[sqlglot.expressions.Expression, NoneType]:
726def _optional_expression(
727    evaluator: MacroEvaluator,
728    condition: exp.Condition,
729    expression: exp.Expression,
730) -> t.Optional[exp.Expression]:
731    """Inserts expression when the condition is True
732
733    The following examples express the usage of this function in the context of the macros which wrap it.
734
735    Examples:
736        >>> from sqlglot import parse_one
737        >>> from sqlmesh.core.macros import MacroEvaluator
738        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
739        >>> MacroEvaluator().transform(parse_one(sql)).sql()
740        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
741        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
742        >>> MacroEvaluator().transform(parse_one(sql)).sql()
743        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
744        >>> sql = "select * from city @GROUP_BY(True) country, population"
745        >>> MacroEvaluator().transform(parse_one(sql)).sql()
746        'SELECT * FROM city GROUP BY country, population'
747        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
748        >>> MacroEvaluator().transform(parse_one(sql)).sql()
749        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
750
751    Args:
752        evaluator: MacroEvaluator that invoked the macro
753        condition: Condition expression
754        expression: SQL expression
755    Returns:
756        Expression if the conditional is True; otherwise None
757    """
758    return expression if evaluator.eval_expression(condition) else None

Inserts expression when the condition is True

The following examples express the usage of this function in the context of the macros which wrap it.

Examples:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
>>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
>>> sql = "select * from city @GROUP_BY(True) country, population"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT * FROM city GROUP BY country, population'
>>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
"SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
  • evaluator: MacroEvaluator that invoked the macro
  • condition: Condition expression
  • expression: SQL expression
Returns:

Expression if the conditional is True; otherwise None

def limit( evaluator: sqlmesh.core.macros.MacroEvaluator, condition: sqlglot.expressions.Condition, expression: sqlglot.expressions.Expression) -> Union[sqlglot.expressions.Expression, NoneType]:
726def _optional_expression(
727    evaluator: MacroEvaluator,
728    condition: exp.Condition,
729    expression: exp.Expression,
730) -> t.Optional[exp.Expression]:
731    """Inserts expression when the condition is True
732
733    The following examples express the usage of this function in the context of the macros which wrap it.
734
735    Examples:
736        >>> from sqlglot import parse_one
737        >>> from sqlmesh.core.macros import MacroEvaluator
738        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
739        >>> MacroEvaluator().transform(parse_one(sql)).sql()
740        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
741        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
742        >>> MacroEvaluator().transform(parse_one(sql)).sql()
743        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
744        >>> sql = "select * from city @GROUP_BY(True) country, population"
745        >>> MacroEvaluator().transform(parse_one(sql)).sql()
746        'SELECT * FROM city GROUP BY country, population'
747        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
748        >>> MacroEvaluator().transform(parse_one(sql)).sql()
749        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
750
751    Args:
752        evaluator: MacroEvaluator that invoked the macro
753        condition: Condition expression
754        expression: SQL expression
755    Returns:
756        Expression if the conditional is True; otherwise None
757    """
758    return expression if evaluator.eval_expression(condition) else None

Inserts expression when the condition is True

The following examples express the usage of this function in the context of the macros which wrap it.

Examples:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
>>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
>>> sql = "select * from city @GROUP_BY(True) country, population"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT * FROM city GROUP BY country, population'
>>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
"SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
  • evaluator: MacroEvaluator that invoked the macro
  • condition: Condition expression
  • expression: SQL expression
Returns:

Expression if the conditional is True; otherwise None

@macro('eval')
def eval_( evaluator: sqlmesh.core.macros.MacroEvaluator, condition: sqlglot.expressions.Condition) -> Any:
770@macro("eval")
771def eval_(evaluator: MacroEvaluator, condition: exp.Condition) -> t.Any:
772    """Evaluate the given condition in a Python/SQL interpretor.
773
774    Example:
775        >>> from sqlglot import parse_one
776        >>> from sqlmesh.core.macros import MacroEvaluator
777        >>> sql = "@EVAL(1 + 1)"
778        >>> MacroEvaluator().transform(parse_one(sql)).sql()
779        '2'
780    """
781    return evaluator.eval_expression(condition)

Evaluate the given condition in a Python/SQL interpretor.

Example:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "@EVAL(1 + 1)"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'2'
@macro()
def star( evaluator: sqlmesh.core.macros.MacroEvaluator, relation: sqlglot.expressions.Table, alias: sqlglot.expressions.Column = Column( this=Identifier(this=, quoted=True)), except_: Union[sqlglot.expressions.Array, sqlglot.expressions.Tuple] = Tuple( ), prefix: sqlglot.expressions.Literal = Literal(this=, is_string=True), suffix: sqlglot.expressions.Literal = Literal(this=, is_string=True), quote_identifiers: sqlglot.expressions.Boolean = Boolean(this=True)) -> List[sqlglot.expressions.Alias]:
785@macro()
786def star(
787    evaluator: MacroEvaluator,
788    relation: exp.Table,
789    alias: exp.Column = t.cast(exp.Column, exp.column("")),
790    except_: t.Union[exp.Array, exp.Tuple] = exp.Tuple(this=[]),
791    prefix: exp.Literal = exp.Literal.string(""),
792    suffix: exp.Literal = exp.Literal.string(""),
793    quote_identifiers: exp.Boolean = exp.true(),
794) -> t.List[exp.Alias]:
795    """Returns a list of projections for the given relation.
796
797    Args:
798        evaluator: MacroEvaluator that invoked the macro
799        relation: The relation to select star from
800        alias: The alias of the relation
801        except_: Columns to exclude
802        prefix: A prefix to use for all selections
803        suffix: A suffix to use for all selections
804        quote_identifiers: Whether or not quote the resulting aliases, defaults to true
805
806    Returns:
807        An array of columns.
808
809    Example:
810        >>> from sqlglot import parse_one, exp
811        >>> from sqlglot.schema import MappingSchema
812        >>> from sqlmesh.core.macros import MacroEvaluator
813        >>> sql = "SELECT @STAR(foo, bar, [c], 'baz_') FROM foo AS bar"
814        >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": exp.DataType.build("string"), "b": exp.DataType.build("string"), "c": exp.DataType.build("string"), "d": exp.DataType.build("int")}})).transform(parse_one(sql)).sql()
815        'SELECT CAST("bar"."a" AS TEXT) AS "baz_a", CAST("bar"."b" AS TEXT) AS "baz_b", CAST("bar"."d" AS INT) AS "baz_d" FROM foo AS bar'
816    """
817    if alias and not isinstance(alias, (exp.Identifier, exp.Column)):
818        raise SQLMeshError(f"Invalid alias '{alias}'. Expected an identifier.")
819    if except_ and not isinstance(except_, (exp.Array, exp.Tuple)):
820        raise SQLMeshError(f"Invalid except '{except_}'. Expected an array.")
821    if prefix and not isinstance(prefix, exp.Literal):
822        raise SQLMeshError(f"Invalid prefix '{prefix}'. Expected a literal.")
823    if suffix and not isinstance(suffix, exp.Literal):
824        raise SQLMeshError(f"Invalid suffix '{suffix}'. Expected a literal.")
825    if not isinstance(quote_identifiers, exp.Boolean):
826        raise SQLMeshError(f"Invalid quote_identifiers '{quote_identifiers}'. Expected a boolean.")
827
828    exclude = {e.name for e in except_.expressions}
829    quoted = quote_identifiers.this
830    table_identifier = alias.name or relation.name
831
832    columns_to_types = {
833        k: v for k, v in evaluator.columns_to_types(relation).items() if k not in exclude
834    }
835    if columns_to_types_all_known(columns_to_types):
836        return [
837            exp.cast(
838                exp.column(column, table=table_identifier, quoted=quoted),
839                dtype,
840                dialect=evaluator.dialect,
841            ).as_(f"{prefix.this}{column}{suffix.this}", quoted=quoted)
842            for column, dtype in columns_to_types.items()
843        ]
844    return [
845        exp.column(column, table=table_identifier, quoted=quoted).as_(
846            f"{prefix.this}{column}{suffix.this}", quoted=quoted
847        )
848        for column, type_ in evaluator.columns_to_types(relation).items()
849    ]

Returns a list of projections for the given relation.

Arguments:
  • evaluator: MacroEvaluator that invoked the macro
  • relation: The relation to select star from
  • alias: The alias of the relation
  • except_: Columns to exclude
  • prefix: A prefix to use for all selections
  • suffix: A suffix to use for all selections
  • quote_identifiers: Whether or not quote the resulting aliases, defaults to true
Returns:

An array of columns.

Example:
>>> from sqlglot import parse_one, exp
>>> from sqlglot.schema import MappingSchema
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "SELECT @STAR(foo, bar, [c], 'baz_') FROM foo AS bar"
>>> MacroEvaluator(schema=MappingSchema({"foo": {"a": exp.DataType.build("string"), "b": exp.DataType.build("string"), "c": exp.DataType.build("string"), "d": exp.DataType.build("int")}})).transform(parse_one(sql)).sql()
'SELECT CAST("bar"."a" AS TEXT) AS "baz_a", CAST("bar"."b" AS TEXT) AS "baz_b", CAST("bar"."d" AS INT) AS "baz_d" FROM foo AS bar'
@macro()
def generate_surrogate_key( _: sqlmesh.core.macros.MacroEvaluator, *fields: sqlglot.expressions.Expression) -> sqlglot.expressions.Func:
852@macro()
853def generate_surrogate_key(_: MacroEvaluator, *fields: exp.Expression) -> exp.Func:
854    """Generates a surrogate key for the given fields.
855
856    Example:
857        >>> from sqlglot import parse_one
858        >>> from sqlmesh.core.macros import MacroEvaluator
859        >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c) FROM foo"
860        >>> MacroEvaluator().transform(parse_one(sql)).sql()
861        "SELECT MD5(CONCAT(COALESCE(CAST(a AS TEXT), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS TEXT), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS TEXT), '_sqlmesh_surrogate_key_null_'))) FROM foo"
862    """
863    string_fields: t.List[exp.Expression] = []
864    for i, field in enumerate(fields):
865        if i > 0:
866            string_fields.append(exp.Literal.string("|"))
867        string_fields.append(
868            exp.func(
869                "COALESCE",
870                exp.cast(field, exp.DataType.build("text")),
871                exp.Literal.string("_sqlmesh_surrogate_key_null_"),
872            )
873        )
874    return exp.func("MD5", exp.func("CONCAT", *string_fields))

Generates a surrogate key for the given fields.

Example:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c) FROM foo"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
"SELECT MD5(CONCAT(COALESCE(CAST(a AS TEXT), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS TEXT), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS TEXT), '_sqlmesh_surrogate_key_null_'))) FROM foo"
@macro()
def safe_add( _: sqlmesh.core.macros.MacroEvaluator, *fields: sqlglot.expressions.Expression) -> sqlglot.expressions.Case:
877@macro()
878def safe_add(_: MacroEvaluator, *fields: exp.Expression) -> exp.Case:
879    """Adds numbers together, substitutes nulls for 0s and only returns null if all fields are null.
880
881    Example:
882        >>> from sqlglot import parse_one
883        >>> from sqlmesh.core.macros import MacroEvaluator
884        >>> sql = "SELECT @SAFE_ADD(a, b) FROM foo"
885        >>> MacroEvaluator().transform(parse_one(sql)).sql()
886        'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) + COALESCE(b, 0) END FROM foo'
887    """
888    return (
889        exp.Case()
890        .when(exp.and_(*(field.is_(exp.null()) for field in fields)), exp.null())
891        .else_(reduce(lambda a, b: a + b, [exp.func("COALESCE", field, 0) for field in fields]))  # type: ignore
892    )

Adds numbers together, substitutes nulls for 0s and only returns null if all fields are null.

Example:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "SELECT @SAFE_ADD(a, b) FROM foo"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) + COALESCE(b, 0) END FROM foo'
@macro()
def safe_sub( _: sqlmesh.core.macros.MacroEvaluator, *fields: sqlglot.expressions.Expression) -> sqlglot.expressions.Case:
895@macro()
896def safe_sub(_: MacroEvaluator, *fields: exp.Expression) -> exp.Case:
897    """Subtract numbers, substitutes nulls for 0s and only returns null if all fields are null.
898
899    Example:
900        >>> from sqlglot import parse_one
901        >>> from sqlmesh.core.macros import MacroEvaluator
902        >>> sql = "SELECT @SAFE_SUB(a, b) FROM foo"
903        >>> MacroEvaluator().transform(parse_one(sql)).sql()
904        'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) - COALESCE(b, 0) END FROM foo'
905    """
906    return (
907        exp.Case()
908        .when(exp.and_(*(field.is_(exp.null()) for field in fields)), exp.null())
909        .else_(reduce(lambda a, b: a - b, [exp.func("COALESCE", field, 0) for field in fields]))  # type: ignore
910    )

Subtract numbers, substitutes nulls for 0s and only returns null if all fields are null.

Example:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "SELECT @SAFE_SUB(a, b) FROM foo"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) - COALESCE(b, 0) END FROM foo'
@macro()
def safe_div( _: sqlmesh.core.macros.MacroEvaluator, numerator: sqlglot.expressions.Expression, denominator: sqlglot.expressions.Expression) -> sqlglot.expressions.Div:
913@macro()
914def safe_div(_: MacroEvaluator, numerator: exp.Expression, denominator: exp.Expression) -> exp.Div:
915    """Divides numbers, returns null if the denominator is 0.
916
917    Example:
918        >>> from sqlglot import parse_one
919        >>> from sqlmesh.core.macros import MacroEvaluator
920        >>> sql = "SELECT @SAFE_DIV(a, b) FROM foo"
921        >>> MacroEvaluator().transform(parse_one(sql)).sql()
922        'SELECT a / NULLIF(b, 0) FROM foo'
923    """
924    return numerator / exp.func("NULLIF", denominator, 0)

Divides numbers, returns null if the denominator is 0.

Example:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "SELECT @SAFE_DIV(a, b) FROM foo"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT a / NULLIF(b, 0) FROM foo'
@macro()
def union( evaluator: sqlmesh.core.macros.MacroEvaluator, type_: sqlglot.expressions.Literal = Literal(this=ALL, is_string=True), *tables: sqlglot.expressions.Table) -> sqlglot.expressions.Query:
927@macro()
928def union(
929    evaluator: MacroEvaluator,
930    type_: exp.Literal = exp.Literal.string("ALL"),
931    *tables: exp.Table,
932) -> exp.Query:
933    """Returns a UNION of the given tables. Only choosing columns that have the same name and type.
934
935    Example:
936        >>> from sqlglot import parse_one
937        >>> from sqlglot.schema import MappingSchema
938        >>> from sqlmesh.core.macros import MacroEvaluator
939        >>> sql = "@UNION('distinct', foo, bar)"
940        >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql()
941        'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar'
942    """
943    kind = type_.name.upper()
944    if kind not in ("ALL", "DISTINCT"):
945        raise SQLMeshError(f"Invalid type '{type_}'. Expected 'ALL' or 'DISTINCT'.")
946
947    columns = {
948        column
949        for column, _ in reduce(
950            lambda a, b: a & b,  # type: ignore
951            (evaluator.columns_to_types(table).items() for table in tables),
952        )
953    }
954
955    projections = [
956        exp.cast(column, type_, dialect=evaluator.dialect).as_(column)
957        for column, type_ in evaluator.columns_to_types(tables[0]).items()
958        if column in columns
959    ]
960
961    return reduce(
962        lambda a, b: a.union(b, distinct=kind == "DISTINCT"),  # type: ignore
963        [exp.select(*projections).from_(t) for t in tables],
964    )

Returns a UNION of the given tables. Only choosing columns that have the same name and type.

Example:
>>> from sqlglot import parse_one
>>> from sqlglot.schema import MappingSchema
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "@UNION('distinct', foo, bar)"
>>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql()
'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar'
@macro()
def haversine_distance( _: sqlmesh.core.macros.MacroEvaluator, lat1: sqlglot.expressions.Expression, lon1: sqlglot.expressions.Expression, lat2: sqlglot.expressions.Expression, lon2: sqlglot.expressions.Expression, unit: sqlglot.expressions.Literal = Literal(this=mi, is_string=True)) -> sqlglot.expressions.Mul:
 967@macro()
 968def haversine_distance(
 969    _: MacroEvaluator,
 970    lat1: exp.Expression,
 971    lon1: exp.Expression,
 972    lat2: exp.Expression,
 973    lon2: exp.Expression,
 974    unit: exp.Literal = exp.Literal.string("mi"),
 975) -> exp.Mul:
 976    """Returns the haversine distance between two points.
 977
 978    Example:
 979        >>> from sqlglot import parse_one
 980        >>> from sqlmesh.core.macros import MacroEvaluator
 981        >>> sql = "SELECT @HAVERSINE_DISTANCE(driver_y, driver_x, passenger_y, passenger_x, 'mi') FROM rides"
 982        >>> MacroEvaluator().transform(parse_one(sql)).sql()
 983        'SELECT 7922 * ASIN(SQRT((POWER(SIN(RADIANS((passenger_y - driver_y) / 2)), 2)) + (COS(RADIANS(driver_y)) * COS(RADIANS(passenger_y)) * POWER(SIN(RADIANS((passenger_x - driver_x) / 2)), 2)))) * 1.0 FROM rides'
 984    """
 985    if unit.this == "mi":
 986        conversion_rate = 1.0
 987    elif unit.this == "km":
 988        conversion_rate = 1.60934
 989    else:
 990        raise SQLMeshError(f"Invalid unit '{unit}'. Expected 'mi' or 'km'.")
 991
 992    return (
 993        2
 994        * 3961
 995        * exp.func(
 996            "ASIN",
 997            exp.func(
 998                "SQRT",
 999                exp.func("POWER", exp.func("SIN", exp.func("RADIANS", (lat2 - lat1) / 2)), 2)
1000                + exp.func("COS", exp.func("RADIANS", lat1))
1001                * exp.func("COS", exp.func("RADIANS", lat2))
1002                * exp.func("POWER", exp.func("SIN", exp.func("RADIANS", (lon2 - lon1) / 2)), 2),
1003            ),
1004        )
1005        * conversion_rate
1006    )

Returns the haversine distance between two points.

Example:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "SELECT @HAVERSINE_DISTANCE(driver_y, driver_x, passenger_y, passenger_x, 'mi') FROM rides"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT 7922 * ASIN(SQRT((POWER(SIN(RADIANS((passenger_y - driver_y) / 2)), 2)) + (COS(RADIANS(driver_y)) * COS(RADIANS(passenger_y)) * POWER(SIN(RADIANS((passenger_x - driver_x) / 2)), 2)))) * 1.0 FROM rides'
@macro()
def pivot( evaluator: sqlmesh.core.macros.MacroEvaluator, column: sqlglot.expressions.Column, values: Union[sqlglot.expressions.Array, sqlglot.expressions.Tuple], alias: sqlglot.expressions.Boolean = Boolean(this=True), agg: sqlglot.expressions.Literal = Literal(this=SUM, is_string=True), cmp: sqlglot.expressions.Literal = Literal(this==, is_string=True), prefix: sqlglot.expressions.Literal = Literal(this=, is_string=True), suffix: sqlglot.expressions.Literal = Literal(this=, is_string=True), then_value: sqlglot.expressions.Literal = Literal(this=1, is_string=False), else_value: sqlglot.expressions.Literal = Literal(this=0, is_string=False), quote: sqlglot.expressions.Boolean = Boolean(this=True), distinct: sqlglot.expressions.Boolean = Boolean(this=False)) -> List[sqlglot.expressions.Expression]:
1009@macro()
1010def pivot(
1011    evaluator: MacroEvaluator,
1012    column: exp.Column,
1013    values: t.Union[exp.Array, exp.Tuple],
1014    alias: exp.Boolean = exp.true(),
1015    agg: exp.Literal = exp.Literal.string("SUM"),
1016    cmp: exp.Literal = exp.Literal.string("="),
1017    prefix: exp.Literal = exp.Literal.string(""),
1018    suffix: exp.Literal = exp.Literal.string(""),
1019    then_value: exp.Literal = exp.Literal.number(1),
1020    else_value: exp.Literal = exp.Literal.number(0),
1021    quote: exp.Boolean = exp.true(),
1022    distinct: exp.Boolean = exp.false(),
1023) -> t.List[exp.Expression]:
1024    """Returns a list of projections as a result of pivoting the given column on the given values.
1025
1026    Example:
1027        >>> from sqlglot import parse_one
1028        >>> from sqlmesh.core.macros import MacroEvaluator
1029        >>> sql = "SELECT date_day, @PIVOT(status, ['cancelled', 'completed']) FROM rides GROUP BY 1"
1030        >>> MacroEvaluator().transform(parse_one(sql)).sql()
1031        'SELECT date_day, SUM(CASE WHEN status = \\'cancelled\\' THEN 1 ELSE 0 END) AS "\\'cancelled\\'", SUM(CASE WHEN status = \\'completed\\' THEN 1 ELSE 0 END) AS "\\'completed\\'" FROM rides GROUP BY 1'
1032    """
1033    aggregates: t.List[exp.Expression] = []
1034    for value in values.expressions:
1035        proj = f"{agg.this}("
1036        if distinct.this:
1037            proj += "DISTINCT "
1038        proj += f"CASE WHEN {column} {cmp.this} {value} THEN {then_value} ELSE {else_value} END) "
1039        node = evaluator.parse_one(proj)
1040        if alias.this:
1041            node = node.as_(f"{prefix.this}{value}{suffix.this}", quoted=quote.this, copy=False)
1042        aggregates.append(node)
1043    return aggregates

Returns a list of projections as a result of pivoting the given column on the given values.

Example:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "SELECT date_day, @PIVOT(status, ['cancelled', 'completed']) FROM rides GROUP BY 1"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'SELECT date_day, SUM(CASE WHEN status = \'cancelled\' THEN 1 ELSE 0 END) AS "\'cancelled\'", SUM(CASE WHEN status = \'completed\' THEN 1 ELSE 0 END) AS "\'completed\'" FROM rides GROUP BY 1'
@macro('AND')
def and_( evaluator: sqlmesh.core.macros.MacroEvaluator, *expressions: Union[sqlglot.expressions.Expression, NoneType]) -> sqlglot.expressions.Condition:
1046@macro("AND")
1047def and_(evaluator: MacroEvaluator, *expressions: t.Optional[exp.Expression]) -> exp.Condition:
1048    """Returns an AND statement filtering out any NULL expressions."""
1049    conditions = [e for e in expressions if not isinstance(e, exp.Null)]
1050
1051    if not conditions:
1052        return exp.true()
1053
1054    return exp.and_(*conditions, dialect=evaluator.dialect)

Returns an AND statement filtering out any NULL expressions.

@macro('OR')
def or_( evaluator: sqlmesh.core.macros.MacroEvaluator, *expressions: Union[sqlglot.expressions.Expression, NoneType]) -> sqlglot.expressions.Condition:
1057@macro("OR")
1058def or_(evaluator: MacroEvaluator, *expressions: t.Optional[exp.Expression]) -> exp.Condition:
1059    """Returns an OR statement filtering out any NULL expressions."""
1060    conditions = [e for e in expressions if not isinstance(e, exp.Null)]
1061
1062    if not conditions:
1063        return exp.true()
1064
1065    return exp.or_(*conditions, dialect=evaluator.dialect)

Returns an OR statement filtering out any NULL expressions.

@macro('VAR')
def var( evaluator: sqlmesh.core.macros.MacroEvaluator, var_name: sqlglot.expressions.Expression, default: Union[sqlglot.expressions.Expression, NoneType] = None) -> sqlglot.expressions.Expression:
1068@macro("VAR")
1069def var(
1070    evaluator: MacroEvaluator, var_name: exp.Expression, default: t.Optional[exp.Expression] = None
1071) -> exp.Expression:
1072    """Returns the value of a variable or the default value if the variable is not set."""
1073    if not var_name.is_string:
1074        raise SQLMeshError(f"Invalid variable name '{var_name.sql()}'. Expected a string literal.")
1075
1076    return exp.convert(evaluator.var(var_name.this, default))

Returns the value of a variable or the default value if the variable is not set.

def normalize_macro_name(name: str) -> str:
1079def normalize_macro_name(name: str) -> str:
1080    """Prefix macro name with @ and upcase"""
1081    return f"@{name.upper()}"

Prefix macro name with @ and upcase