sqlmesh.core.macros
1from __future__ import annotations 2 3import inspect 4import logging 5import sys 6import types 7import typing as t 8from enum import Enum 9from functools import reduce 10from itertools import chain 11from pathlib import Path 12from string import Template 13 14import sqlglot 15from jinja2 import Environment 16from sqlglot import Generator, exp, parse_one 17from sqlglot.executor.env import ENV 18from sqlglot.executor.python import Python 19from sqlglot.helper import csv, ensure_collection 20from sqlglot.schema import MappingSchema 21 22from sqlmesh.core import constants as c 23from sqlmesh.core.dialect import ( 24 SQLMESH_MACRO_PREFIX, 25 Dialect, 26 MacroDef, 27 MacroFunc, 28 MacroSQL, 29 MacroStrReplace, 30 MacroVar, 31 StagedFilePath, 32 normalize_model_name, 33) 34from sqlmesh.utils import ( 35 DECORATOR_RETURN_TYPE, 36 UniqueKeyDict, 37 columns_to_types_all_known, 38 registry_decorator, 39) 40from sqlmesh.utils.errors import MacroEvalError, SQLMeshError 41from sqlmesh.utils.jinja import JinjaMacroRegistry, has_jinja 42from sqlmesh.utils.metaprogramming import Executable, prepare_env, print_exception 43 44if t.TYPE_CHECKING: 45 from sqlmesh.core._typing import TableName 46 from sqlmesh.core.engine_adapter import EngineAdapter 47 from sqlmesh.core.snapshot import Snapshot 48 49 50if sys.version_info >= (3, 10): 51 UNION_TYPES = (t.Union, types.UnionType) 52else: 53 UNION_TYPES = (t.Union,) 54 55 56logger = logging.getLogger(__name__) 57 58 59class RuntimeStage(Enum): 60 LOADING = "loading" 61 CREATING = "creating" 62 EVALUATING = "evaluating" 63 TESTING = "testing" 64 65 66class MacroStrTemplate(Template): 67 delimiter = SQLMESH_MACRO_PREFIX 68 69 70EXPRESSIONS_NAME_MAP = {} 71SQL = t.NewType("SQL", str) 72 73for klass in sqlglot.Parser.EXPRESSION_PARSERS: 74 name = klass if isinstance(klass, str) else klass.__name__ # type: ignore 75 EXPRESSIONS_NAME_MAP[name.lower()] = name 76 77 78def _macro_sql(sql: str, into: t.Optional[str] = None) -> str: 79 args = [_macro_str_replace(sql)] 80 if into in EXPRESSIONS_NAME_MAP: 81 args.append(f"into=exp.{EXPRESSIONS_NAME_MAP[into]}") 82 return f"self.parse_one({', '.join(args)})" 83 84 85def _macro_func_sql(self: Generator, e: exp.Expression) -> str: 86 func = e.this 87 88 if isinstance(func, exp.Anonymous): 89 return f"""self.send({csv("'" + func.name + "'", self.expressions(func))})""" 90 return self.sql(func) 91 92 93def _macro_str_replace(text: str) -> str: 94 """Stringifies python code for variable replacement 95 Args: 96 text: text string 97 Returns: 98 Stringified python code to execute variable replacement 99 """ 100 return f"self.template({text}, locals())" 101 102 103class MacroDialect(Python): 104 class Generator(Python.Generator): 105 TRANSFORMS = { 106 **Python.Generator.TRANSFORMS, # type: ignore 107 exp.Column: lambda self, e: f"exp.to_column('{self.sql(e, 'this')}')", 108 exp.Lambda: lambda self, e: f"lambda {self.expressions(e)}: {self.sql(e, 'this')}", 109 MacroFunc: _macro_func_sql, 110 MacroSQL: lambda self, e: _macro_sql(self.sql(e, "this"), e.args.get("into")), 111 MacroStrReplace: lambda self, e: _macro_str_replace(self.sql(e, "this")), 112 } 113 114 115class MacroEvaluator: 116 """The class responsible for evaluating SQLMesh Macros/SQL. 117 118 SQLMesh supports special preprocessed SQL prefixed with `@`. Although it provides similar power to 119 traditional methods like string templating, there is semantic understanding of SQL which prevents 120 common errors like leading/trailing commas, syntax errors, etc. 121 122 SQLMesh SQL allows for macro variables and macro functions. Macro variables take the form of @variable. These are used for variable substitution. 123 124 SELECT * FROM foo WHERE ds BETWEEN @start_date AND @end_date 125 126 Macro variables can be defined with a special macro function. 127 128 @DEF(start_date, '2021-01-01') 129 130 Args: 131 dialect: Dialect of the SQL to evaluate. 132 python_env: Serialized Python environment. 133 """ 134 135 def __init__( 136 self, 137 dialect: str = "", 138 python_env: t.Optional[t.Dict[str, Executable]] = None, 139 jinja_env: t.Optional[Environment] = None, 140 schema: t.Optional[MappingSchema] = None, 141 runtime_stage: RuntimeStage = RuntimeStage.LOADING, 142 resolve_tables: t.Optional[t.Callable[[exp.Expression], exp.Expression]] = None, 143 snapshots: t.Optional[t.Dict[str, Snapshot]] = None, 144 default_catalog: t.Optional[str] = None, 145 path: Path = Path(), 146 ): 147 self.dialect = dialect 148 self.generator = MacroDialect().generator() 149 self.locals: t.Dict[str, t.Any] = { 150 "runtime_stage": runtime_stage.value, 151 "default_catalog": default_catalog, 152 } 153 self.env = { 154 **ENV, 155 "self": self, 156 "SQL": SQL, 157 "MacroEvaluator": MacroEvaluator, 158 } 159 self.python_env = python_env or {} 160 self._jinja_env: t.Optional[Environment] = jinja_env 161 self.macros = {normalize_macro_name(k): v.func for k, v in macro.get_registry().items()} 162 self._schema = schema 163 self._resolve_tables = resolve_tables 164 self.columns_to_types_called = False 165 self._snapshots = snapshots if snapshots is not None else {} 166 self.default_catalog = default_catalog 167 self._path = path 168 169 prepare_env(self.python_env, self.env) 170 for k, v in self.python_env.items(): 171 if v.is_definition: 172 self.macros[normalize_macro_name(k)] = self.env[v.name or k] 173 elif v.is_import and getattr(self.env.get(k), c.SQLMESH_MACRO, None): 174 self.macros[normalize_macro_name(k)] = self.env[k] 175 elif v.is_value: 176 self.locals[k] = self.env[k] 177 178 def send( 179 self, name: str, *args: t.Any, **kwargs: t.Any 180 ) -> t.Union[None, exp.Expression, t.List[exp.Expression]]: 181 func = self.macros.get(normalize_macro_name(name)) 182 183 if not callable(func): 184 raise SQLMeshError(f"Macro '{name}' does not exist.") 185 186 try: 187 annotations = t.get_type_hints(func) 188 except NameError: # forward references aren't handled 189 annotations = {} 190 191 if annotations: 192 spec = inspect.getfullargspec(func) 193 callargs = inspect.getcallargs(func, self, *args, **kwargs) 194 new_args: t.List[t.Any] = [] 195 196 for arg, value in callargs.items(): 197 typ = annotations.get(arg) 198 199 if value is self: 200 continue 201 if arg == spec.varargs: 202 new_args.extend(self._coerce(v, typ) for v in value) 203 elif arg == spec.varkw: 204 for k, v in value.items(): 205 kwargs[k] = self._coerce(v, typ) 206 elif arg in kwargs: 207 kwargs[arg] = self._coerce(value, typ) 208 else: 209 new_args.append(self._coerce(value, typ)) 210 211 args = new_args # type: ignore 212 213 try: 214 return func(self, *args, **kwargs) 215 except Exception as e: 216 print_exception(e, self.python_env) 217 raise MacroEvalError("Error trying to eval macro.") from e 218 219 def transform( 220 self, expression: exp.Expression 221 ) -> exp.Expression | t.List[exp.Expression] | None: 222 changed = False 223 224 def evaluate_macros( 225 node: exp.Expression, 226 ) -> exp.Expression | t.List[exp.Expression] | None: 227 nonlocal changed 228 229 if isinstance(node, MacroVar): 230 changed = True 231 variables = self.locals.get(c.SQLMESH_VARS, {}) 232 if node.name not in self.locals and node.name.lower() not in variables: 233 if not isinstance(node.parent, StagedFilePath): 234 raise SQLMeshError(f"Macro variable '{node.name}' is undefined.") 235 236 return node 237 238 value = self.locals.get(node.name, variables.get(node.name.lower())) 239 if isinstance(value, list): 240 return exp.convert( 241 tuple( 242 self.transform(v) if isinstance(v, exp.Expression) else v for v in value 243 ) 244 ) 245 return exp.convert( 246 self.transform(value) if isinstance(value, exp.Expression) else value 247 ) 248 if isinstance(node, exp.Identifier) and "@" in node.this: 249 text = self.template(node.this, self.locals) 250 if node.this != text: 251 changed = True 252 node.args["this"] = text 253 return node 254 if node.is_string: 255 text = node.this 256 if has_jinja(text): 257 changed = True 258 node.set("this", self.jinja_env.from_string(node.this).render()) 259 return node 260 if isinstance(node, MacroFunc): 261 changed = True 262 return self.evaluate(node) 263 return node 264 265 transformed = exp.replace_tree( 266 expression.copy(), evaluate_macros, prune=lambda n: isinstance(n, exp.Lambda) 267 ) 268 269 if changed: 270 # the transformations could have corrupted the ast, turning this into sql and reparsing ensures 271 # that the ast is correct 272 if isinstance(transformed, list): 273 return [ 274 self.parse_one(node.sql(dialect=self.dialect, copy=False)) 275 for node in transformed 276 ] 277 elif isinstance(transformed, exp.Expression): 278 return self.parse_one(transformed.sql(dialect=self.dialect, copy=False)) 279 280 return transformed 281 282 def template(self, text: t.Any, local_variables: t.Dict[str, t.Any]) -> str: 283 """Substitute @vars with locals. 284 285 Args: 286 text: The string to do substitition on. 287 local_variables: Local variables in the context so that lambdas can be used. 288 289 Returns: 290 The rendered string. 291 """ 292 mapping = {} 293 294 variables = self.locals.get(c.SQLMESH_VARS, {}) 295 296 for k, v in chain(variables.items(), self.locals.items(), local_variables.items()): 297 # try to convert all variables into sqlglot expressions 298 # because they're going to be converted into strings in sql 299 # we use bare Exception instead of ValueError because there's 300 # a recursive error with MagicMock. 301 # we don't convert strings because that would result in adding quotes 302 if not isinstance(v, str): 303 try: 304 v = exp.convert(v) 305 except Exception: 306 pass 307 308 if isinstance(v, exp.Expression): 309 v = v.sql(dialect=self.dialect) 310 mapping[k] = v 311 312 return MacroStrTemplate(str(text)).safe_substitute(mapping) 313 314 def evaluate(self, node: MacroFunc) -> exp.Expression | t.List[exp.Expression] | None: 315 if isinstance(node, MacroDef): 316 if isinstance(node.expression, exp.Lambda): 317 _, fn = _norm_var_arg_lambda(self, node.expression) 318 self.macros[normalize_macro_name(node.name)] = lambda _, *args: fn( 319 args[0] if len(args) == 1 else exp.Tuple(expressions=list(args)) 320 ) 321 else: 322 self.locals[node.name] = self.transform(node.expression) 323 return node 324 325 if isinstance(node, (MacroSQL, MacroStrReplace)): 326 result: t.Optional[exp.Expression | t.List[exp.Expression]] = exp.convert( 327 self.eval_expression(node) 328 ) 329 else: 330 func = t.cast(exp.Anonymous, node.this) 331 332 args = [] 333 kwargs = {} 334 for e in func.expressions: 335 if isinstance(e, exp.PropertyEQ): 336 kwargs[e.this.name] = e.expression 337 else: 338 if kwargs: 339 raise MacroEvalError( 340 f"Positional argument cannot follow keyword argument.\n {func.sql(dialect=self.dialect)} at '{self._path}'" 341 ) 342 343 args.append(e) 344 345 result = self.send(func.name, *args, **kwargs) 346 347 if result is None: 348 return None 349 350 if isinstance(result, (tuple, list)): 351 return [self.parse_one(item) for item in result if item is not None] 352 return self.parse_one(result) 353 354 def eval_expression(self, node: t.Any) -> t.Any: 355 """Converts a SQLGlot expression into executable Python code and evals it. 356 357 If the node is not an expression, it will simply be returned. 358 359 Args: 360 node: expression 361 Returns: 362 The return value of the evaled Python Code. 363 """ 364 if not isinstance(node, exp.Expression): 365 return node 366 code = node.sql() 367 try: 368 code = self.generator.generate(node) 369 return eval(code, self.env, self.locals) 370 except Exception as e: 371 print_exception(e, self.python_env) 372 raise MacroEvalError( 373 f"Error trying to eval macro.\n\nGenerated code: {code}\n\nOriginal sql: {node}" 374 ) from e 375 376 def parse_one( 377 self, sql: str | exp.Expression, into: t.Optional[exp.IntoType] = None, **opts: t.Any 378 ) -> exp.Expression: 379 """Parses the given SQL string and returns a syntax tree for the first 380 parsed SQL statement. 381 382 Args: 383 sql: the SQL code or expression to parse. 384 into: the Expression to parse into 385 **opts: other options 386 387 Returns: 388 Expression: the syntax tree for the first parsed statement 389 """ 390 return sqlglot.maybe_parse(sql, dialect=self.dialect, into=into, **opts) 391 392 @property 393 def jinja_env(self) -> Environment: 394 if not self._jinja_env: 395 jinja_env_methods = {**self.locals, **self.env} 396 del jinja_env_methods["self"] 397 self._jinja_env = JinjaMacroRegistry().build_environment(**jinja_env_methods) 398 return self._jinja_env 399 400 def columns_to_types(self, model_name: TableName | exp.Column) -> t.Dict[str, exp.DataType]: 401 """Returns the columns-to-types mapping corresponding to the specified model.""" 402 if self._schema is None or self._schema.empty: 403 self.columns_to_types_called = True 404 return {"__schema_unavailable_at_load__": exp.DataType.build("unknown")} 405 406 normalized_model_name = normalize_model_name( 407 model_name, 408 default_catalog=self.default_catalog, 409 dialect=self.dialect, 410 ) 411 columns_to_types = self._schema.find(exp.to_table(normalized_model_name)) 412 if columns_to_types is None: 413 raise SQLMeshError(f"Schema for model '{model_name}' can't be statically determined.") 414 415 return columns_to_types 416 417 def get_snapshot(self, model_name: TableName | exp.Column) -> t.Optional[Snapshot]: 418 """Returns the snapshot that corresponds to the given model name.""" 419 return self._snapshots.get( 420 normalize_model_name( 421 model_name, 422 default_catalog=self.default_catalog, 423 dialect=self.dialect, 424 ) 425 ) 426 427 def resolve_tables(self, query: exp.Expression) -> exp.Expression: 428 """Resolves queries with references to SQLMesh model names to their physical tables.""" 429 if not self._resolve_tables: 430 raise SQLMeshError( 431 "Macro evaluator not properly initialized with resolve_tables lambda." 432 ) 433 return self._resolve_tables(query) 434 435 @property 436 def runtime_stage(self) -> RuntimeStage: 437 """Returns the current runtime stage of the macro evaluation.""" 438 return self.locals["runtime_stage"] 439 440 @property 441 def engine_adapter(self) -> EngineAdapter: 442 engine_adapter = self.locals.get("engine_adapter") 443 if not engine_adapter: 444 raise SQLMeshError( 445 "The engine adapter is not available while models are loading." 446 " You can gate these calls by checking in Python: evaluator.runtime_stage != 'loading' or SQL: @runtime_stage <> 'loading'." 447 ) 448 return self.locals["engine_adapter"] 449 450 @property 451 def gateway(self) -> t.Optional[str]: 452 """Returns the gateway name.""" 453 return self.var(c.GATEWAY) 454 455 def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]: 456 """Returns the value of the specified variable, or the default value if it doesn't exist.""" 457 return (self.locals.get(c.SQLMESH_VARS) or {}).get(var_name.lower(), default) 458 459 def _coerce(self, expr: exp.Expression, typ: t.Any, strict: bool = False) -> t.Any: 460 """Coerces the given expression to the specified type on a best-effort basis.""" 461 base_err_msg = f"Failed to coerce expression '{expr}' to type '{typ}'." 462 try: 463 if typ is None or typ is t.Any: 464 return expr 465 base = t.get_origin(typ) or typ 466 467 # We need to handle Union and TypeVars first since we cannot use isinstance with it 468 if base in UNION_TYPES: 469 for branch in t.get_args(typ): 470 try: 471 return self._coerce(expr, branch, True) 472 except Exception: 473 pass 474 raise SQLMeshError(base_err_msg) 475 if base is SQL and isinstance(expr, exp.Expression): 476 return expr.sql(self.dialect) 477 478 if isinstance(expr, base): 479 return expr 480 if issubclass(base, exp.Expression): 481 d = Dialect.get_or_raise(self.dialect) 482 into = base if base in d.parser().EXPRESSION_PARSERS else None 483 if into is None: 484 if isinstance(expr, exp.Literal): 485 coerced = parse_one(expr.this) 486 else: 487 raise SQLMeshError( 488 f"{base_err_msg} Coercion to {base} requires a literal expression." 489 ) 490 else: 491 coerced = parse_one( 492 expr.this if isinstance(expr, exp.Literal) else expr.sql(), into=into 493 ) 494 if isinstance(coerced, base): 495 return coerced 496 raise SQLMeshError(base_err_msg) 497 498 if base in (int, float, str) and isinstance(expr, exp.Literal): 499 return base(expr.this) 500 if base is str and isinstance(expr, exp.Column) and not expr.table: 501 return expr.name 502 if base is bool and isinstance(expr, exp.Boolean): 503 return expr.this 504 # if base is str and isinstance(expr, exp.Expression): 505 # return expr.sql(self.dialect) 506 if base is tuple and isinstance(expr, (exp.Tuple, exp.Array)): 507 generic = t.get_args(typ) 508 if not generic: 509 return tuple(expr.expressions) 510 if generic[-1] is ...: 511 return tuple(self._coerce(expr, generic[0]) for expr in expr.expressions) 512 elif len(generic) == len(expr.expressions): 513 return tuple( 514 self._coerce(expr, generic[i]) for i, expr in enumerate(expr.expressions) 515 ) 516 raise SQLMeshError(f"{base_err_msg} Expected {len(generic)} items.") 517 if base is list and isinstance(expr, (exp.Array, exp.Tuple)): 518 generic = t.get_args(typ) 519 if not generic: 520 return expr.expressions 521 return [self._coerce(expr, generic[0]) for expr in expr.expressions] 522 raise SQLMeshError(base_err_msg) 523 except Exception: 524 if strict: 525 raise 526 logger.error( 527 "Coercion of expression '%s' to type '%s' failed. Using non coerced expression at '%s'", 528 expr, 529 typ, 530 self._path, 531 ) 532 return expr 533 534 535class macro(registry_decorator): 536 """Specifies a function is a macro and registers it the global MACROS registry. 537 538 Registered macros can be referenced in SQL statements to make queries more dynamic/cleaner. 539 540 Example: 541 from sqlglot import exp 542 from sqlmesh.core.macros import MacroEvaluator, macro 543 544 @macro() 545 def add_one(evaluator: MacroEvaluator, column: exp.Literal) -> exp.Add: 546 return evaluator.parse_one(f"{column} + 1") 547 548 Args: 549 name: A custom name for the macro, the default is the name of the function. 550 """ 551 552 registry_name = "macros" 553 554 def __call__( 555 self, func: t.Callable[..., DECORATOR_RETURN_TYPE] 556 ) -> t.Callable[..., DECORATOR_RETURN_TYPE]: 557 wrapper = super().__call__(func) 558 559 # This is used to identify macros at runtime to unwrap during serialization. 560 setattr(wrapper, c.SQLMESH_MACRO, True) 561 return wrapper 562 563 564ExecutableOrMacro = t.Union[Executable, macro] 565MacroRegistry = UniqueKeyDict[str, ExecutableOrMacro] 566 567 568def _norm_var_arg_lambda( 569 evaluator: MacroEvaluator, func: exp.Lambda, *items: t.Any 570) -> t.Tuple[t.Iterable, t.Callable]: 571 """ 572 Converts sql literal array and lambda into actual python iterable + callable. 573 574 In order to support expressions like @EACH([a, b, c], x -> @SQL('@x')), the lambda var x 575 needs be passed to the local state. 576 577 Args: 578 evaluator: MacroEvaluator that invoked the macro 579 func: Lambda SQLGlot expression. 580 items: Array or items of SQLGlot expressions. 581 """ 582 583 def substitute( 584 node: exp.Expression, args: t.Dict[str, exp.Expression] 585 ) -> exp.Expression | t.List[exp.Expression] | None: 586 if isinstance(node, (exp.Identifier, exp.Var)): 587 if not isinstance(node.parent, exp.Column): 588 name = node.name 589 if name in args: 590 return args[name].copy() 591 if name in evaluator.locals: 592 return exp.convert(evaluator.locals[name]) 593 if SQLMESH_MACRO_PREFIX in node.name: 594 return node.__class__( 595 this=evaluator.template(node.name, {k: v.name for k, v in args.items()}) 596 ) 597 elif isinstance(node, MacroFunc): 598 local_copy = evaluator.locals.copy() 599 evaluator.locals.update(args) 600 result = evaluator.transform(node) 601 evaluator.locals = local_copy 602 return result 603 return node 604 605 if len(items) == 1: 606 item = items[0] 607 expressions = item.expressions if isinstance(item, (exp.Array, exp.Tuple)) else item 608 else: 609 expressions = items 610 611 if not callable(func): 612 return expressions, lambda args: func.this.transform( 613 substitute, 614 { 615 expression.name: arg 616 for expression, arg in zip( 617 func.expressions, args.expressions if isinstance(args, exp.Tuple) else [args] 618 ) 619 }, 620 ) 621 622 return expressions, func 623 624 625@macro() 626def each( 627 evaluator: MacroEvaluator, 628 *args: t.Any, 629) -> t.List[t.Any]: 630 """Iterates through items calling func on each. 631 632 If a func call on item returns None, it will be excluded from the list. 633 634 Args: 635 evaluator: MacroEvaluator that invoked the macro 636 args: The last argument should be a lambda of the form x -> x +1. The first argument can be 637 an Array or var args can be used. 638 639 Returns: 640 A list of items that is the result of func 641 """ 642 *items, func = args 643 items, func = _norm_var_arg_lambda(evaluator, func, *items) # type: ignore 644 return [item for item in map(func, ensure_collection(items)) if item is not None] 645 646 647@macro("IF") 648def if_( 649 evaluator: MacroEvaluator, 650 condition: t.Any, 651 true: t.Any, 652 false: t.Any = None, 653) -> t.Any: 654 """Evaluates a given condition and returns the second argument if true or else the third argument. 655 656 If false is not passed in, the default return value will be None. 657 658 Example: 659 >>> from sqlglot import parse_one 660 >>> from sqlmesh.core.macros import MacroEvaluator 661 >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a, b)")).sql() 662 'b' 663 664 >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a)")) 665 """ 666 667 if evaluator.eval_expression(condition): 668 return true 669 return false 670 671 672@macro("REDUCE") 673def reduce_(evaluator: MacroEvaluator, *args: t.Any) -> t.Any: 674 """Iterates through items applying provided function that takes two arguments 675 cumulatively to the items of iterable items, from left to right, so as to reduce 676 the iterable to a single item. 677 678 Example: 679 >>> from sqlglot import parse_one 680 >>> from sqlmesh.core.macros import MacroEvaluator 681 >>> sql = "@SQL(@REDUCE([100, 200, 300, 400], (x, y) -> x + y))" 682 >>> MacroEvaluator().transform(parse_one(sql)).sql() 683 '1000' 684 685 Args: 686 evaluator: MacroEvaluator that invoked the macro 687 args: The last argument should be a lambda of the form (x, y) -> x + y. The first argument can be 688 an Array or var args can be used. 689 Returns: 690 A single item that is the result of applying func cumulatively to items 691 """ 692 *items, func = args 693 items, func = _norm_var_arg_lambda(evaluator, func, *items) # type: ignore 694 return reduce(lambda a, b: func(exp.Tuple(expressions=[a, b])), ensure_collection(items)) 695 696 697@macro("FILTER") 698def filter_(evaluator: MacroEvaluator, *args: t.Any) -> t.List[t.Any]: 699 """Iterates through items, applying provided function to each item and removing 700 all items where the function returns False 701 702 Example: 703 >>> from sqlglot import parse_one 704 >>> from sqlmesh.core.macros import MacroEvaluator 705 >>> sql = "@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y)" 706 >>> MacroEvaluator().transform(parse_one(sql)).sql() 707 '2 + 3' 708 709 >>> sql = "@EVAL(@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y))" 710 >>> MacroEvaluator().transform(parse_one(sql)).sql() 711 '5' 712 713 Args: 714 evaluator: MacroEvaluator that invoked the macro 715 args: The last argument should be a lambda of the form x -> x > 1. The first argument can be 716 an Array or var args can be used. 717 Returns: 718 The items for which the func returned True 719 """ 720 *items, func = args 721 items, func = _norm_var_arg_lambda(evaluator, func, *items) # type: ignore 722 return list(filter(lambda arg: evaluator.eval_expression(func(arg)), items)) 723 724 725def _optional_expression( 726 evaluator: MacroEvaluator, 727 condition: exp.Condition, 728 expression: exp.Expression, 729) -> t.Optional[exp.Expression]: 730 """Inserts expression when the condition is True 731 732 The following examples express the usage of this function in the context of the macros which wrap it. 733 734 Examples: 735 >>> from sqlglot import parse_one 736 >>> from sqlmesh.core.macros import MacroEvaluator 737 >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" 738 >>> MacroEvaluator().transform(parse_one(sql)).sql() 739 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' 740 >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" 741 >>> MacroEvaluator().transform(parse_one(sql)).sql() 742 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' 743 >>> sql = "select * from city @GROUP_BY(True) country, population" 744 >>> MacroEvaluator().transform(parse_one(sql)).sql() 745 'SELECT * FROM city GROUP BY country, population' 746 >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" 747 >>> MacroEvaluator().transform(parse_one(sql)).sql() 748 "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'" 749 750 Args: 751 evaluator: MacroEvaluator that invoked the macro 752 condition: Condition expression 753 expression: SQL expression 754 Returns: 755 Expression if the conditional is True; otherwise None 756 """ 757 return expression if evaluator.eval_expression(condition) else None 758 759 760with_ = macro("WITH")(_optional_expression) 761join = macro("JOIN")(_optional_expression) 762where = macro("WHERE")(_optional_expression) 763group_by = macro("GROUP_BY")(_optional_expression) 764having = macro("HAVING")(_optional_expression) 765order_by = macro("ORDER_BY")(_optional_expression) 766limit = macro("LIMIT")(_optional_expression) 767 768 769@macro("eval") 770def eval_(evaluator: MacroEvaluator, condition: exp.Condition) -> t.Any: 771 """Evaluate the given condition in a Python/SQL interpretor. 772 773 Example: 774 >>> from sqlglot import parse_one 775 >>> from sqlmesh.core.macros import MacroEvaluator 776 >>> sql = "@EVAL(1 + 1)" 777 >>> MacroEvaluator().transform(parse_one(sql)).sql() 778 '2' 779 """ 780 return evaluator.eval_expression(condition) 781 782 783# macros with union types need to use t.Union since | isn't available until 3.9 784@macro() 785def star( 786 evaluator: MacroEvaluator, 787 relation: exp.Table, 788 alias: exp.Column = t.cast(exp.Column, exp.column("")), 789 except_: t.Union[exp.Array, exp.Tuple] = exp.Tuple(this=[]), 790 prefix: exp.Literal = exp.Literal.string(""), 791 suffix: exp.Literal = exp.Literal.string(""), 792 quote_identifiers: exp.Boolean = exp.true(), 793) -> t.List[exp.Alias]: 794 """Returns a list of projections for the given relation. 795 796 Args: 797 evaluator: MacroEvaluator that invoked the macro 798 relation: The relation to select star from 799 alias: The alias of the relation 800 except_: Columns to exclude 801 prefix: A prefix to use for all selections 802 suffix: A suffix to use for all selections 803 quote_identifiers: Whether or not quote the resulting aliases, defaults to true 804 805 Returns: 806 An array of columns. 807 808 Example: 809 >>> from sqlglot import parse_one, exp 810 >>> from sqlglot.schema import MappingSchema 811 >>> from sqlmesh.core.macros import MacroEvaluator 812 >>> sql = "SELECT @STAR(foo, bar, [c], 'baz_') FROM foo AS bar" 813 >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": exp.DataType.build("string"), "b": exp.DataType.build("string"), "c": exp.DataType.build("string"), "d": exp.DataType.build("int")}})).transform(parse_one(sql)).sql() 814 'SELECT CAST("bar"."a" AS TEXT) AS "baz_a", CAST("bar"."b" AS TEXT) AS "baz_b", CAST("bar"."d" AS INT) AS "baz_d" FROM foo AS bar' 815 """ 816 if alias and not isinstance(alias, (exp.Identifier, exp.Column)): 817 raise SQLMeshError(f"Invalid alias '{alias}'. Expected an identifier.") 818 if except_ and not isinstance(except_, (exp.Array, exp.Tuple)): 819 raise SQLMeshError(f"Invalid except '{except_}'. Expected an array.") 820 if prefix and not isinstance(prefix, exp.Literal): 821 raise SQLMeshError(f"Invalid prefix '{prefix}'. Expected a literal.") 822 if suffix and not isinstance(suffix, exp.Literal): 823 raise SQLMeshError(f"Invalid suffix '{suffix}'. Expected a literal.") 824 if not isinstance(quote_identifiers, exp.Boolean): 825 raise SQLMeshError(f"Invalid quote_identifiers '{quote_identifiers}'. Expected a boolean.") 826 827 exclude = {e.name for e in except_.expressions} 828 quoted = quote_identifiers.this 829 table_identifier = alias.name or relation.name 830 831 columns_to_types = { 832 k: v for k, v in evaluator.columns_to_types(relation).items() if k not in exclude 833 } 834 if columns_to_types_all_known(columns_to_types): 835 return [ 836 exp.cast( 837 exp.column(column, table=table_identifier, quoted=quoted), 838 dtype, 839 dialect=evaluator.dialect, 840 ).as_(f"{prefix.this}{column}{suffix.this}", quoted=quoted) 841 for column, dtype in columns_to_types.items() 842 ] 843 return [ 844 exp.column(column, table=table_identifier, quoted=quoted).as_( 845 f"{prefix.this}{column}{suffix.this}", quoted=quoted 846 ) 847 for column, type_ in evaluator.columns_to_types(relation).items() 848 ] 849 850 851@macro() 852def generate_surrogate_key(_: MacroEvaluator, *fields: exp.Expression) -> exp.Func: 853 """Generates a surrogate key for the given fields. 854 855 Example: 856 >>> from sqlglot import parse_one 857 >>> from sqlmesh.core.macros import MacroEvaluator 858 >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c) FROM foo" 859 >>> MacroEvaluator().transform(parse_one(sql)).sql() 860 "SELECT MD5(CONCAT(COALESCE(CAST(a AS TEXT), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS TEXT), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS TEXT), '_sqlmesh_surrogate_key_null_'))) FROM foo" 861 """ 862 string_fields: t.List[exp.Expression] = [] 863 for i, field in enumerate(fields): 864 if i > 0: 865 string_fields.append(exp.Literal.string("|")) 866 string_fields.append( 867 exp.func( 868 "COALESCE", 869 exp.cast(field, exp.DataType.build("text")), 870 exp.Literal.string("_sqlmesh_surrogate_key_null_"), 871 ) 872 ) 873 return exp.func("MD5", exp.func("CONCAT", *string_fields)) 874 875 876@macro() 877def safe_add(_: MacroEvaluator, *fields: exp.Expression) -> exp.Case: 878 """Adds numbers together, substitutes nulls for 0s and only returns null if all fields are null. 879 880 Example: 881 >>> from sqlglot import parse_one 882 >>> from sqlmesh.core.macros import MacroEvaluator 883 >>> sql = "SELECT @SAFE_ADD(a, b) FROM foo" 884 >>> MacroEvaluator().transform(parse_one(sql)).sql() 885 'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) + COALESCE(b, 0) END FROM foo' 886 """ 887 return ( 888 exp.Case() 889 .when(exp.and_(*(field.is_(exp.null()) for field in fields)), exp.null()) 890 .else_(reduce(lambda a, b: a + b, [exp.func("COALESCE", field, 0) for field in fields])) # type: ignore 891 ) 892 893 894@macro() 895def safe_sub(_: MacroEvaluator, *fields: exp.Expression) -> exp.Case: 896 """Subtract numbers, substitutes nulls for 0s and only returns null if all fields are null. 897 898 Example: 899 >>> from sqlglot import parse_one 900 >>> from sqlmesh.core.macros import MacroEvaluator 901 >>> sql = "SELECT @SAFE_SUB(a, b) FROM foo" 902 >>> MacroEvaluator().transform(parse_one(sql)).sql() 903 'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) - COALESCE(b, 0) END FROM foo' 904 """ 905 return ( 906 exp.Case() 907 .when(exp.and_(*(field.is_(exp.null()) for field in fields)), exp.null()) 908 .else_(reduce(lambda a, b: a - b, [exp.func("COALESCE", field, 0) for field in fields])) # type: ignore 909 ) 910 911 912@macro() 913def safe_div(_: MacroEvaluator, numerator: exp.Expression, denominator: exp.Expression) -> exp.Div: 914 """Divides numbers, returns null if the denominator is 0. 915 916 Example: 917 >>> from sqlglot import parse_one 918 >>> from sqlmesh.core.macros import MacroEvaluator 919 >>> sql = "SELECT @SAFE_DIV(a, b) FROM foo" 920 >>> MacroEvaluator().transform(parse_one(sql)).sql() 921 'SELECT a / NULLIF(b, 0) FROM foo' 922 """ 923 return numerator / exp.func("NULLIF", denominator, 0) 924 925 926@macro() 927def union( 928 evaluator: MacroEvaluator, 929 type_: exp.Literal = exp.Literal.string("ALL"), 930 *tables: exp.Table, 931) -> exp.Query: 932 """Returns a UNION of the given tables. Only choosing columns that have the same name and type. 933 934 Example: 935 >>> from sqlglot import parse_one 936 >>> from sqlglot.schema import MappingSchema 937 >>> from sqlmesh.core.macros import MacroEvaluator 938 >>> sql = "@UNION('distinct', foo, bar)" 939 >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql() 940 'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar' 941 """ 942 kind = type_.name.upper() 943 if kind not in ("ALL", "DISTINCT"): 944 raise SQLMeshError(f"Invalid type '{type_}'. Expected 'ALL' or 'DISTINCT'.") 945 946 columns = { 947 column 948 for column, _ in reduce( 949 lambda a, b: a & b, # type: ignore 950 (evaluator.columns_to_types(table).items() for table in tables), 951 ) 952 } 953 954 projections = [ 955 exp.cast(column, type_, dialect=evaluator.dialect).as_(column) 956 for column, type_ in evaluator.columns_to_types(tables[0]).items() 957 if column in columns 958 ] 959 960 return reduce( 961 lambda a, b: a.union(b, distinct=kind == "DISTINCT"), # type: ignore 962 [exp.select(*projections).from_(t) for t in tables], 963 ) 964 965 966@macro() 967def haversine_distance( 968 _: MacroEvaluator, 969 lat1: exp.Expression, 970 lon1: exp.Expression, 971 lat2: exp.Expression, 972 lon2: exp.Expression, 973 unit: exp.Literal = exp.Literal.string("mi"), 974) -> exp.Mul: 975 """Returns the haversine distance between two points. 976 977 Example: 978 >>> from sqlglot import parse_one 979 >>> from sqlmesh.core.macros import MacroEvaluator 980 >>> sql = "SELECT @HAVERSINE_DISTANCE(driver_y, driver_x, passenger_y, passenger_x, 'mi') FROM rides" 981 >>> MacroEvaluator().transform(parse_one(sql)).sql() 982 'SELECT 7922 * ASIN(SQRT((POWER(SIN(RADIANS((passenger_y - driver_y) / 2)), 2)) + (COS(RADIANS(driver_y)) * COS(RADIANS(passenger_y)) * POWER(SIN(RADIANS((passenger_x - driver_x) / 2)), 2)))) * 1.0 FROM rides' 983 """ 984 if unit.this == "mi": 985 conversion_rate = 1.0 986 elif unit.this == "km": 987 conversion_rate = 1.60934 988 else: 989 raise SQLMeshError(f"Invalid unit '{unit}'. Expected 'mi' or 'km'.") 990 991 return ( 992 2 993 * 3961 994 * exp.func( 995 "ASIN", 996 exp.func( 997 "SQRT", 998 exp.func("POWER", exp.func("SIN", exp.func("RADIANS", (lat2 - lat1) / 2)), 2) 999 + exp.func("COS", exp.func("RADIANS", lat1)) 1000 * exp.func("COS", exp.func("RADIANS", lat2)) 1001 * exp.func("POWER", exp.func("SIN", exp.func("RADIANS", (lon2 - lon1) / 2)), 2), 1002 ), 1003 ) 1004 * conversion_rate 1005 ) 1006 1007 1008@macro() 1009def pivot( 1010 evaluator: MacroEvaluator, 1011 column: exp.Column, 1012 values: t.Union[exp.Array, exp.Tuple], 1013 alias: exp.Boolean = exp.true(), 1014 agg: exp.Literal = exp.Literal.string("SUM"), 1015 cmp: exp.Literal = exp.Literal.string("="), 1016 prefix: exp.Literal = exp.Literal.string(""), 1017 suffix: exp.Literal = exp.Literal.string(""), 1018 then_value: exp.Literal = exp.Literal.number(1), 1019 else_value: exp.Literal = exp.Literal.number(0), 1020 quote: exp.Boolean = exp.true(), 1021 distinct: exp.Boolean = exp.false(), 1022) -> t.List[exp.Expression]: 1023 """Returns a list of projections as a result of pivoting the given column on the given values. 1024 1025 Example: 1026 >>> from sqlglot import parse_one 1027 >>> from sqlmesh.core.macros import MacroEvaluator 1028 >>> sql = "SELECT date_day, @PIVOT(status, ['cancelled', 'completed']) FROM rides GROUP BY 1" 1029 >>> MacroEvaluator().transform(parse_one(sql)).sql() 1030 'SELECT date_day, SUM(CASE WHEN status = \\'cancelled\\' THEN 1 ELSE 0 END) AS "\\'cancelled\\'", SUM(CASE WHEN status = \\'completed\\' THEN 1 ELSE 0 END) AS "\\'completed\\'" FROM rides GROUP BY 1' 1031 """ 1032 aggregates: t.List[exp.Expression] = [] 1033 for value in values.expressions: 1034 proj = f"{agg.this}(" 1035 if distinct.this: 1036 proj += "DISTINCT " 1037 proj += f"CASE WHEN {column} {cmp.this} {value} THEN {then_value} ELSE {else_value} END) " 1038 node = evaluator.parse_one(proj) 1039 if alias.this: 1040 node = node.as_(f"{prefix.this}{value}{suffix.this}", quoted=quote.this, copy=False) 1041 aggregates.append(node) 1042 return aggregates 1043 1044 1045@macro("AND") 1046def and_(evaluator: MacroEvaluator, *expressions: t.Optional[exp.Expression]) -> exp.Condition: 1047 """Returns an AND statement filtering out any NULL expressions.""" 1048 conditions = [e for e in expressions if not isinstance(e, exp.Null)] 1049 1050 if not conditions: 1051 return exp.true() 1052 1053 return exp.and_(*conditions, dialect=evaluator.dialect) 1054 1055 1056@macro("OR") 1057def or_(evaluator: MacroEvaluator, *expressions: t.Optional[exp.Expression]) -> exp.Condition: 1058 """Returns an OR statement filtering out any NULL expressions.""" 1059 conditions = [e for e in expressions if not isinstance(e, exp.Null)] 1060 1061 if not conditions: 1062 return exp.true() 1063 1064 return exp.or_(*conditions, dialect=evaluator.dialect) 1065 1066 1067@macro("VAR") 1068def var( 1069 evaluator: MacroEvaluator, var_name: exp.Expression, default: t.Optional[exp.Expression] = None 1070) -> exp.Expression: 1071 """Returns the value of a variable or the default value if the variable is not set.""" 1072 if not var_name.is_string: 1073 raise SQLMeshError(f"Invalid variable name '{var_name.sql()}'. Expected a string literal.") 1074 1075 return exp.convert(evaluator.var(var_name.this, default)) 1076 1077 1078def normalize_macro_name(name: str) -> str: 1079 """Prefix macro name with @ and upcase""" 1080 return f"@{name.upper()}" 1081 1082 1083for m in macro.get_registry().values(): 1084 setattr(m, c.SQLMESH_BUILTIN, True)
60class RuntimeStage(Enum): 61 LOADING = "loading" 62 CREATING = "creating" 63 EVALUATING = "evaluating" 64 TESTING = "testing"
An enumeration.
Inherited Members
- enum.Enum
- name
- value
A string class for supporting $-substitutions.
Inherited Members
- string.Template
- Template
- flags
- substitute
- safe_substitute
104class MacroDialect(Python): 105 class Generator(Python.Generator): 106 TRANSFORMS = { 107 **Python.Generator.TRANSFORMS, # type: ignore 108 exp.Column: lambda self, e: f"exp.to_column('{self.sql(e, 'this')}')", 109 exp.Lambda: lambda self, e: f"lambda {self.expressions(e)}: {self.sql(e, 'this')}", 110 MacroFunc: _macro_func_sql, 111 MacroSQL: lambda self, e: _macro_sql(self.sql(e, "this"), e.args.get("into")), 112 MacroStrReplace: lambda self, e: _macro_str_replace(self.sql(e, "this")), 113 }
Mapping of an escaped sequence (\n
) to its unescaped version (
).
Inherited Members
- sqlglot.dialects.dialect.Dialect
- Dialect
- INDEX_OFFSET
- WEEK_OFFSET
- UNNEST_COLUMN_ONLY
- ALIAS_POST_TABLESAMPLE
- TABLESAMPLE_SIZE_IS_PERCENT
- NORMALIZATION_STRATEGY
- IDENTIFIERS_CAN_START_WITH_DIGIT
- DPIPE_IS_STRING_CONCAT
- STRICT_STRING_CONCAT
- SUPPORTS_USER_DEFINED_TYPES
- SUPPORTS_SEMI_ANTI_JOIN
- NORMALIZE_FUNCTIONS
- LOG_BASE_FIRST
- NULL_ORDERING
- TYPED_DIVISION
- SAFE_DIVISION
- CONCAT_COALESCE
- TIME_MAPPING
- FORMAT_MAPPING
- PSEUDOCOLUMNS
- PREFER_CTE_ALIAS_COLUMN
- get_or_raise
- format_time
- normalize_identifier
- case_sensitive
- can_identify
- quote_identifier
- to_json_path
- parse
- parse_into
- generate
- transpile
- tokenize
- parser
- generator
- sqlglot.executor.python.Python
- Tokenizer
105 class Generator(Python.Generator): 106 TRANSFORMS = { 107 **Python.Generator.TRANSFORMS, # type: ignore 108 exp.Column: lambda self, e: f"exp.to_column('{self.sql(e, 'this')}')", 109 exp.Lambda: lambda self, e: f"lambda {self.expressions(e)}: {self.sql(e, 'this')}", 110 MacroFunc: _macro_func_sql, 111 MacroSQL: lambda self, e: _macro_sql(self.sql(e, "this"), e.args.get("into")), 112 MacroStrReplace: lambda self, e: _macro_str_replace(self.sql(e, "this")), 113 }
Generator converts a given syntax tree to the corresponding SQL string.
Arguments:
- pretty: Whether to format the produced SQL string. Default: False.
- identify: Determines when an identifier should be quoted. Possible values are: False (default): Never quote, except in cases where it's mandatory by the dialect. True or 'always': Always quote. 'safe': Only quote identifiers that are case insensitive.
- normalize: Whether to normalize identifiers to lowercase. Default: False.
- pad: The pad size in a formatted string. For example, this affects the indentation of a projection in a query, relative to its nesting level. Default: 2.
- indent: The indentation size in a formatted string. For example, this affects the
indentation of subqueries and filters under a
WHERE
clause. Default: 2. - normalize_functions: How to normalize function names. Possible values are: "upper" or True (default): Convert names to uppercase. "lower": Convert names to lowercase. False: Disables function name normalization.
- unsupported_level: Determines the generator's behavior when it encounters unsupported expressions. Default ErrorLevel.WARN.
- max_unsupported: Maximum number of unsupported messages to include in a raised UnsupportedError. This is only relevant if unsupported_level is ErrorLevel.RAISE. Default: 3
- leading_comma: Whether the comma is leading or trailing in select expressions. This is only relevant when generating in pretty mode. Default: False
- max_text_width: The max number of characters in a segment before creating new lines in pretty mode. The default is on the smaller end because the length only represents a segment and not the true line length. Default: 80
- comments: Whether to preserve comments in the output SQL code. Default: True
Inherited Members
- sqlglot.generator.Generator
- Generator
- generate
- preprocess
- unsupported
- sep
- seg
- pad_comment
- maybe_comment
- wrap
- no_identify
- normalize_func
- indent
- sql
- uncache_sql
- cache_sql
- characterset_sql
- column_parts
- column_sql
- columnposition_sql
- columndef_sql
- columnconstraint_sql
- computedcolumnconstraint_sql
- autoincrementcolumnconstraint_sql
- compresscolumnconstraint_sql
- generatedasidentitycolumnconstraint_sql
- generatedasrowcolumnconstraint_sql
- periodforsystemtimeconstraint_sql
- notnullcolumnconstraint_sql
- transformcolumnconstraint_sql
- primarykeycolumnconstraint_sql
- uniquecolumnconstraint_sql
- createable_sql
- create_sql
- sequenceproperties_sql
- clone_sql
- describe_sql
- heredoc_sql
- prepend_ctes
- with_sql
- cte_sql
- tablealias_sql
- bitstring_sql
- hexstring_sql
- bytestring_sql
- unicodestring_sql
- rawstring_sql
- datatypeparam_sql
- datatype_sql
- directory_sql
- delete_sql
- drop_sql
- except_sql
- except_op
- fetch_sql
- filter_sql
- hint_sql
- indexparameters_sql
- index_sql
- identifier_sql
- inputoutputformat_sql
- national_sql
- partition_sql
- properties_sql
- root_properties
- properties
- with_properties
- locate_properties
- property_name
- property_sql
- likeproperty_sql
- fallbackproperty_sql
- journalproperty_sql
- freespaceproperty_sql
- checksumproperty_sql
- mergeblockratioproperty_sql
- datablocksizeproperty_sql
- blockcompressionproperty_sql
- isolatedloadingproperty_sql
- partitionboundspec_sql
- partitionedofproperty_sql
- lockingproperty_sql
- withdataproperty_sql
- withsystemversioningproperty_sql
- insert_sql
- intersect_sql
- intersect_op
- introducer_sql
- kill_sql
- pseudotype_sql
- objectidentifier_sql
- onconflict_sql
- returning_sql
- rowformatdelimitedproperty_sql
- withtablehint_sql
- indextablehint_sql
- historicaldata_sql
- table_parts
- table_sql
- tablesample_sql
- pivot_sql
- version_sql
- tuple_sql
- update_sql
- values_sql
- var_sql
- into_sql
- from_sql
- group_sql
- having_sql
- connect_sql
- prior_sql
- join_sql
- lambda_sql
- lateral_op
- lateral_sql
- limit_sql
- offset_sql
- setitem_sql
- set_sql
- pragma_sql
- lock_sql
- literal_sql
- escape_str
- loaddata_sql
- null_sql
- boolean_sql
- order_sql
- withfill_sql
- cluster_sql
- distribute_sql
- sort_sql
- ordered_sql
- matchrecognizemeasure_sql
- matchrecognize_sql
- query_modifiers
- queryoption_sql
- offset_limit_modifiers
- after_limit_modifiers
- select_sql
- schema_sql
- schema_columns_sql
- star_sql
- parameter_sql
- sessionparameter_sql
- placeholder_sql
- subquery_sql
- qualify_sql
- set_operations
- union_sql
- union_op
- unnest_sql
- prewhere_sql
- where_sql
- window_sql
- partition_by_sql
- windowspec_sql
- withingroup_sql
- between_sql
- bracket_offset_expressions
- bracket_sql
- all_sql
- any_sql
- exists_sql
- case_sql
- constraint_sql
- nextvaluefor_sql
- extract_sql
- trim_sql
- convert_concat_args
- concat_sql
- concatws_sql
- check_sql
- foreignkey_sql
- primarykey_sql
- if_sql
- matchagainst_sql
- jsonkeyvalue_sql
- jsonpath_sql
- json_path_part
- formatjson_sql
- jsonobject_sql
- jsonobjectagg_sql
- jsonarray_sql
- jsonarrayagg_sql
- jsoncolumndef_sql
- jsonschema_sql
- jsontable_sql
- openjsoncolumndef_sql
- openjson_sql
- in_sql
- in_unnest_op
- interval_sql
- return_sql
- reference_sql
- anonymous_sql
- paren_sql
- neg_sql
- not_sql
- alias_sql
- pivotalias_sql
- aliases_sql
- atindex_sql
- attimezone_sql
- fromtimezone_sql
- add_sql
- and_sql
- or_sql
- xor_sql
- connector_sql
- bitwiseand_sql
- bitwiseleftshift_sql
- bitwisenot_sql
- bitwiseor_sql
- bitwiserightshift_sql
- bitwisexor_sql
- cast_sql
- currentdate_sql
- currenttimestamp_sql
- collate_sql
- command_sql
- comment_sql
- mergetreettlaction_sql
- mergetreettl_sql
- transaction_sql
- commit_sql
- rollback_sql
- altercolumn_sql
- alterdiststyle_sql
- altersortkey_sql
- renametable_sql
- renamecolumn_sql
- altertable_sql
- add_column_sql
- droppartition_sql
- addconstraint_sql
- distinct_sql
- ignorenulls_sql
- respectnulls_sql
- havingmax_sql
- intdiv_sql
- dpipe_sql
- div_sql
- overlaps_sql
- distance_sql
- dot_sql
- eq_sql
- propertyeq_sql
- escape_sql
- glob_sql
- gt_sql
- gte_sql
- ilike_sql
- ilikeany_sql
- is_sql
- like_sql
- likeany_sql
- similarto_sql
- lt_sql
- lte_sql
- mod_sql
- mul_sql
- neq_sql
- nullsafeeq_sql
- nullsafeneq_sql
- slice_sql
- sub_sql
- trycast_sql
- try_sql
- log_sql
- use_sql
- binary
- function_fallback_sql
- func
- format_args
- too_wide
- format_time
- expressions
- op_expressions
- naked_property
- tag_sql
- token_sql
- userdefinedfunction_sql
- joinhint_sql
- kwarg_sql
- when_sql
- merge_sql
- tochar_sql
- tonumber_sql
- dictproperty_sql
- dictrange_sql
- dictsubproperty_sql
- oncluster_sql
- clusteredbyproperty_sql
- anyvalue_sql
- querytransform_sql
- indexconstraintoption_sql
- checkcolumnconstraint_sql
- indexcolumnconstraint_sql
- nvl2_sql
- comprehension_sql
- columnprefix_sql
- opclass_sql
- predict_sql
- forin_sql
- refresh_sql
- operator_sql
- toarray_sql
- tsordstotime_sql
- tsordstotimestamp_sql
- tsordstodate_sql
- unixdate_sql
- lastday_sql
- dateadd_sql
- arrayany_sql
- generateseries_sql
- struct_sql
- partitionrange_sql
- truncatetable_sql
- convert_sql
- copyparameter_sql
- credentials_sql
- copy_sql
- semicolon_sql
116class MacroEvaluator: 117 """The class responsible for evaluating SQLMesh Macros/SQL. 118 119 SQLMesh supports special preprocessed SQL prefixed with `@`. Although it provides similar power to 120 traditional methods like string templating, there is semantic understanding of SQL which prevents 121 common errors like leading/trailing commas, syntax errors, etc. 122 123 SQLMesh SQL allows for macro variables and macro functions. Macro variables take the form of @variable. These are used for variable substitution. 124 125 SELECT * FROM foo WHERE ds BETWEEN @start_date AND @end_date 126 127 Macro variables can be defined with a special macro function. 128 129 @DEF(start_date, '2021-01-01') 130 131 Args: 132 dialect: Dialect of the SQL to evaluate. 133 python_env: Serialized Python environment. 134 """ 135 136 def __init__( 137 self, 138 dialect: str = "", 139 python_env: t.Optional[t.Dict[str, Executable]] = None, 140 jinja_env: t.Optional[Environment] = None, 141 schema: t.Optional[MappingSchema] = None, 142 runtime_stage: RuntimeStage = RuntimeStage.LOADING, 143 resolve_tables: t.Optional[t.Callable[[exp.Expression], exp.Expression]] = None, 144 snapshots: t.Optional[t.Dict[str, Snapshot]] = None, 145 default_catalog: t.Optional[str] = None, 146 path: Path = Path(), 147 ): 148 self.dialect = dialect 149 self.generator = MacroDialect().generator() 150 self.locals: t.Dict[str, t.Any] = { 151 "runtime_stage": runtime_stage.value, 152 "default_catalog": default_catalog, 153 } 154 self.env = { 155 **ENV, 156 "self": self, 157 "SQL": SQL, 158 "MacroEvaluator": MacroEvaluator, 159 } 160 self.python_env = python_env or {} 161 self._jinja_env: t.Optional[Environment] = jinja_env 162 self.macros = {normalize_macro_name(k): v.func for k, v in macro.get_registry().items()} 163 self._schema = schema 164 self._resolve_tables = resolve_tables 165 self.columns_to_types_called = False 166 self._snapshots = snapshots if snapshots is not None else {} 167 self.default_catalog = default_catalog 168 self._path = path 169 170 prepare_env(self.python_env, self.env) 171 for k, v in self.python_env.items(): 172 if v.is_definition: 173 self.macros[normalize_macro_name(k)] = self.env[v.name or k] 174 elif v.is_import and getattr(self.env.get(k), c.SQLMESH_MACRO, None): 175 self.macros[normalize_macro_name(k)] = self.env[k] 176 elif v.is_value: 177 self.locals[k] = self.env[k] 178 179 def send( 180 self, name: str, *args: t.Any, **kwargs: t.Any 181 ) -> t.Union[None, exp.Expression, t.List[exp.Expression]]: 182 func = self.macros.get(normalize_macro_name(name)) 183 184 if not callable(func): 185 raise SQLMeshError(f"Macro '{name}' does not exist.") 186 187 try: 188 annotations = t.get_type_hints(func) 189 except NameError: # forward references aren't handled 190 annotations = {} 191 192 if annotations: 193 spec = inspect.getfullargspec(func) 194 callargs = inspect.getcallargs(func, self, *args, **kwargs) 195 new_args: t.List[t.Any] = [] 196 197 for arg, value in callargs.items(): 198 typ = annotations.get(arg) 199 200 if value is self: 201 continue 202 if arg == spec.varargs: 203 new_args.extend(self._coerce(v, typ) for v in value) 204 elif arg == spec.varkw: 205 for k, v in value.items(): 206 kwargs[k] = self._coerce(v, typ) 207 elif arg in kwargs: 208 kwargs[arg] = self._coerce(value, typ) 209 else: 210 new_args.append(self._coerce(value, typ)) 211 212 args = new_args # type: ignore 213 214 try: 215 return func(self, *args, **kwargs) 216 except Exception as e: 217 print_exception(e, self.python_env) 218 raise MacroEvalError("Error trying to eval macro.") from e 219 220 def transform( 221 self, expression: exp.Expression 222 ) -> exp.Expression | t.List[exp.Expression] | None: 223 changed = False 224 225 def evaluate_macros( 226 node: exp.Expression, 227 ) -> exp.Expression | t.List[exp.Expression] | None: 228 nonlocal changed 229 230 if isinstance(node, MacroVar): 231 changed = True 232 variables = self.locals.get(c.SQLMESH_VARS, {}) 233 if node.name not in self.locals and node.name.lower() not in variables: 234 if not isinstance(node.parent, StagedFilePath): 235 raise SQLMeshError(f"Macro variable '{node.name}' is undefined.") 236 237 return node 238 239 value = self.locals.get(node.name, variables.get(node.name.lower())) 240 if isinstance(value, list): 241 return exp.convert( 242 tuple( 243 self.transform(v) if isinstance(v, exp.Expression) else v for v in value 244 ) 245 ) 246 return exp.convert( 247 self.transform(value) if isinstance(value, exp.Expression) else value 248 ) 249 if isinstance(node, exp.Identifier) and "@" in node.this: 250 text = self.template(node.this, self.locals) 251 if node.this != text: 252 changed = True 253 node.args["this"] = text 254 return node 255 if node.is_string: 256 text = node.this 257 if has_jinja(text): 258 changed = True 259 node.set("this", self.jinja_env.from_string(node.this).render()) 260 return node 261 if isinstance(node, MacroFunc): 262 changed = True 263 return self.evaluate(node) 264 return node 265 266 transformed = exp.replace_tree( 267 expression.copy(), evaluate_macros, prune=lambda n: isinstance(n, exp.Lambda) 268 ) 269 270 if changed: 271 # the transformations could have corrupted the ast, turning this into sql and reparsing ensures 272 # that the ast is correct 273 if isinstance(transformed, list): 274 return [ 275 self.parse_one(node.sql(dialect=self.dialect, copy=False)) 276 for node in transformed 277 ] 278 elif isinstance(transformed, exp.Expression): 279 return self.parse_one(transformed.sql(dialect=self.dialect, copy=False)) 280 281 return transformed 282 283 def template(self, text: t.Any, local_variables: t.Dict[str, t.Any]) -> str: 284 """Substitute @vars with locals. 285 286 Args: 287 text: The string to do substitition on. 288 local_variables: Local variables in the context so that lambdas can be used. 289 290 Returns: 291 The rendered string. 292 """ 293 mapping = {} 294 295 variables = self.locals.get(c.SQLMESH_VARS, {}) 296 297 for k, v in chain(variables.items(), self.locals.items(), local_variables.items()): 298 # try to convert all variables into sqlglot expressions 299 # because they're going to be converted into strings in sql 300 # we use bare Exception instead of ValueError because there's 301 # a recursive error with MagicMock. 302 # we don't convert strings because that would result in adding quotes 303 if not isinstance(v, str): 304 try: 305 v = exp.convert(v) 306 except Exception: 307 pass 308 309 if isinstance(v, exp.Expression): 310 v = v.sql(dialect=self.dialect) 311 mapping[k] = v 312 313 return MacroStrTemplate(str(text)).safe_substitute(mapping) 314 315 def evaluate(self, node: MacroFunc) -> exp.Expression | t.List[exp.Expression] | None: 316 if isinstance(node, MacroDef): 317 if isinstance(node.expression, exp.Lambda): 318 _, fn = _norm_var_arg_lambda(self, node.expression) 319 self.macros[normalize_macro_name(node.name)] = lambda _, *args: fn( 320 args[0] if len(args) == 1 else exp.Tuple(expressions=list(args)) 321 ) 322 else: 323 self.locals[node.name] = self.transform(node.expression) 324 return node 325 326 if isinstance(node, (MacroSQL, MacroStrReplace)): 327 result: t.Optional[exp.Expression | t.List[exp.Expression]] = exp.convert( 328 self.eval_expression(node) 329 ) 330 else: 331 func = t.cast(exp.Anonymous, node.this) 332 333 args = [] 334 kwargs = {} 335 for e in func.expressions: 336 if isinstance(e, exp.PropertyEQ): 337 kwargs[e.this.name] = e.expression 338 else: 339 if kwargs: 340 raise MacroEvalError( 341 f"Positional argument cannot follow keyword argument.\n {func.sql(dialect=self.dialect)} at '{self._path}'" 342 ) 343 344 args.append(e) 345 346 result = self.send(func.name, *args, **kwargs) 347 348 if result is None: 349 return None 350 351 if isinstance(result, (tuple, list)): 352 return [self.parse_one(item) for item in result if item is not None] 353 return self.parse_one(result) 354 355 def eval_expression(self, node: t.Any) -> t.Any: 356 """Converts a SQLGlot expression into executable Python code and evals it. 357 358 If the node is not an expression, it will simply be returned. 359 360 Args: 361 node: expression 362 Returns: 363 The return value of the evaled Python Code. 364 """ 365 if not isinstance(node, exp.Expression): 366 return node 367 code = node.sql() 368 try: 369 code = self.generator.generate(node) 370 return eval(code, self.env, self.locals) 371 except Exception as e: 372 print_exception(e, self.python_env) 373 raise MacroEvalError( 374 f"Error trying to eval macro.\n\nGenerated code: {code}\n\nOriginal sql: {node}" 375 ) from e 376 377 def parse_one( 378 self, sql: str | exp.Expression, into: t.Optional[exp.IntoType] = None, **opts: t.Any 379 ) -> exp.Expression: 380 """Parses the given SQL string and returns a syntax tree for the first 381 parsed SQL statement. 382 383 Args: 384 sql: the SQL code or expression to parse. 385 into: the Expression to parse into 386 **opts: other options 387 388 Returns: 389 Expression: the syntax tree for the first parsed statement 390 """ 391 return sqlglot.maybe_parse(sql, dialect=self.dialect, into=into, **opts) 392 393 @property 394 def jinja_env(self) -> Environment: 395 if not self._jinja_env: 396 jinja_env_methods = {**self.locals, **self.env} 397 del jinja_env_methods["self"] 398 self._jinja_env = JinjaMacroRegistry().build_environment(**jinja_env_methods) 399 return self._jinja_env 400 401 def columns_to_types(self, model_name: TableName | exp.Column) -> t.Dict[str, exp.DataType]: 402 """Returns the columns-to-types mapping corresponding to the specified model.""" 403 if self._schema is None or self._schema.empty: 404 self.columns_to_types_called = True 405 return {"__schema_unavailable_at_load__": exp.DataType.build("unknown")} 406 407 normalized_model_name = normalize_model_name( 408 model_name, 409 default_catalog=self.default_catalog, 410 dialect=self.dialect, 411 ) 412 columns_to_types = self._schema.find(exp.to_table(normalized_model_name)) 413 if columns_to_types is None: 414 raise SQLMeshError(f"Schema for model '{model_name}' can't be statically determined.") 415 416 return columns_to_types 417 418 def get_snapshot(self, model_name: TableName | exp.Column) -> t.Optional[Snapshot]: 419 """Returns the snapshot that corresponds to the given model name.""" 420 return self._snapshots.get( 421 normalize_model_name( 422 model_name, 423 default_catalog=self.default_catalog, 424 dialect=self.dialect, 425 ) 426 ) 427 428 def resolve_tables(self, query: exp.Expression) -> exp.Expression: 429 """Resolves queries with references to SQLMesh model names to their physical tables.""" 430 if not self._resolve_tables: 431 raise SQLMeshError( 432 "Macro evaluator not properly initialized with resolve_tables lambda." 433 ) 434 return self._resolve_tables(query) 435 436 @property 437 def runtime_stage(self) -> RuntimeStage: 438 """Returns the current runtime stage of the macro evaluation.""" 439 return self.locals["runtime_stage"] 440 441 @property 442 def engine_adapter(self) -> EngineAdapter: 443 engine_adapter = self.locals.get("engine_adapter") 444 if not engine_adapter: 445 raise SQLMeshError( 446 "The engine adapter is not available while models are loading." 447 " You can gate these calls by checking in Python: evaluator.runtime_stage != 'loading' or SQL: @runtime_stage <> 'loading'." 448 ) 449 return self.locals["engine_adapter"] 450 451 @property 452 def gateway(self) -> t.Optional[str]: 453 """Returns the gateway name.""" 454 return self.var(c.GATEWAY) 455 456 def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]: 457 """Returns the value of the specified variable, or the default value if it doesn't exist.""" 458 return (self.locals.get(c.SQLMESH_VARS) or {}).get(var_name.lower(), default) 459 460 def _coerce(self, expr: exp.Expression, typ: t.Any, strict: bool = False) -> t.Any: 461 """Coerces the given expression to the specified type on a best-effort basis.""" 462 base_err_msg = f"Failed to coerce expression '{expr}' to type '{typ}'." 463 try: 464 if typ is None or typ is t.Any: 465 return expr 466 base = t.get_origin(typ) or typ 467 468 # We need to handle Union and TypeVars first since we cannot use isinstance with it 469 if base in UNION_TYPES: 470 for branch in t.get_args(typ): 471 try: 472 return self._coerce(expr, branch, True) 473 except Exception: 474 pass 475 raise SQLMeshError(base_err_msg) 476 if base is SQL and isinstance(expr, exp.Expression): 477 return expr.sql(self.dialect) 478 479 if isinstance(expr, base): 480 return expr 481 if issubclass(base, exp.Expression): 482 d = Dialect.get_or_raise(self.dialect) 483 into = base if base in d.parser().EXPRESSION_PARSERS else None 484 if into is None: 485 if isinstance(expr, exp.Literal): 486 coerced = parse_one(expr.this) 487 else: 488 raise SQLMeshError( 489 f"{base_err_msg} Coercion to {base} requires a literal expression." 490 ) 491 else: 492 coerced = parse_one( 493 expr.this if isinstance(expr, exp.Literal) else expr.sql(), into=into 494 ) 495 if isinstance(coerced, base): 496 return coerced 497 raise SQLMeshError(base_err_msg) 498 499 if base in (int, float, str) and isinstance(expr, exp.Literal): 500 return base(expr.this) 501 if base is str and isinstance(expr, exp.Column) and not expr.table: 502 return expr.name 503 if base is bool and isinstance(expr, exp.Boolean): 504 return expr.this 505 # if base is str and isinstance(expr, exp.Expression): 506 # return expr.sql(self.dialect) 507 if base is tuple and isinstance(expr, (exp.Tuple, exp.Array)): 508 generic = t.get_args(typ) 509 if not generic: 510 return tuple(expr.expressions) 511 if generic[-1] is ...: 512 return tuple(self._coerce(expr, generic[0]) for expr in expr.expressions) 513 elif len(generic) == len(expr.expressions): 514 return tuple( 515 self._coerce(expr, generic[i]) for i, expr in enumerate(expr.expressions) 516 ) 517 raise SQLMeshError(f"{base_err_msg} Expected {len(generic)} items.") 518 if base is list and isinstance(expr, (exp.Array, exp.Tuple)): 519 generic = t.get_args(typ) 520 if not generic: 521 return expr.expressions 522 return [self._coerce(expr, generic[0]) for expr in expr.expressions] 523 raise SQLMeshError(base_err_msg) 524 except Exception: 525 if strict: 526 raise 527 logger.error( 528 "Coercion of expression '%s' to type '%s' failed. Using non coerced expression at '%s'", 529 expr, 530 typ, 531 self._path, 532 ) 533 return expr
The class responsible for evaluating SQLMesh Macros/SQL.
SQLMesh supports special preprocessed SQL prefixed with @
. Although it provides similar power to
traditional methods like string templating, there is semantic understanding of SQL which prevents
common errors like leading/trailing commas, syntax errors, etc.
SQLMesh SQL allows for macro variables and macro functions. Macro variables take the form of @variable. These are used for variable substitution.
SELECT * FROM foo WHERE ds BETWEEN @start_date AND @end_date
Macro variables can be defined with a special macro function.
@DEF(start_date, '2021-01-01')
Arguments:
- dialect: Dialect of the SQL to evaluate.
- python_env: Serialized Python environment.
136 def __init__( 137 self, 138 dialect: str = "", 139 python_env: t.Optional[t.Dict[str, Executable]] = None, 140 jinja_env: t.Optional[Environment] = None, 141 schema: t.Optional[MappingSchema] = None, 142 runtime_stage: RuntimeStage = RuntimeStage.LOADING, 143 resolve_tables: t.Optional[t.Callable[[exp.Expression], exp.Expression]] = None, 144 snapshots: t.Optional[t.Dict[str, Snapshot]] = None, 145 default_catalog: t.Optional[str] = None, 146 path: Path = Path(), 147 ): 148 self.dialect = dialect 149 self.generator = MacroDialect().generator() 150 self.locals: t.Dict[str, t.Any] = { 151 "runtime_stage": runtime_stage.value, 152 "default_catalog": default_catalog, 153 } 154 self.env = { 155 **ENV, 156 "self": self, 157 "SQL": SQL, 158 "MacroEvaluator": MacroEvaluator, 159 } 160 self.python_env = python_env or {} 161 self._jinja_env: t.Optional[Environment] = jinja_env 162 self.macros = {normalize_macro_name(k): v.func for k, v in macro.get_registry().items()} 163 self._schema = schema 164 self._resolve_tables = resolve_tables 165 self.columns_to_types_called = False 166 self._snapshots = snapshots if snapshots is not None else {} 167 self.default_catalog = default_catalog 168 self._path = path 169 170 prepare_env(self.python_env, self.env) 171 for k, v in self.python_env.items(): 172 if v.is_definition: 173 self.macros[normalize_macro_name(k)] = self.env[v.name or k] 174 elif v.is_import and getattr(self.env.get(k), c.SQLMESH_MACRO, None): 175 self.macros[normalize_macro_name(k)] = self.env[k] 176 elif v.is_value: 177 self.locals[k] = self.env[k]
179 def send( 180 self, name: str, *args: t.Any, **kwargs: t.Any 181 ) -> t.Union[None, exp.Expression, t.List[exp.Expression]]: 182 func = self.macros.get(normalize_macro_name(name)) 183 184 if not callable(func): 185 raise SQLMeshError(f"Macro '{name}' does not exist.") 186 187 try: 188 annotations = t.get_type_hints(func) 189 except NameError: # forward references aren't handled 190 annotations = {} 191 192 if annotations: 193 spec = inspect.getfullargspec(func) 194 callargs = inspect.getcallargs(func, self, *args, **kwargs) 195 new_args: t.List[t.Any] = [] 196 197 for arg, value in callargs.items(): 198 typ = annotations.get(arg) 199 200 if value is self: 201 continue 202 if arg == spec.varargs: 203 new_args.extend(self._coerce(v, typ) for v in value) 204 elif arg == spec.varkw: 205 for k, v in value.items(): 206 kwargs[k] = self._coerce(v, typ) 207 elif arg in kwargs: 208 kwargs[arg] = self._coerce(value, typ) 209 else: 210 new_args.append(self._coerce(value, typ)) 211 212 args = new_args # type: ignore 213 214 try: 215 return func(self, *args, **kwargs) 216 except Exception as e: 217 print_exception(e, self.python_env) 218 raise MacroEvalError("Error trying to eval macro.") from e
220 def transform( 221 self, expression: exp.Expression 222 ) -> exp.Expression | t.List[exp.Expression] | None: 223 changed = False 224 225 def evaluate_macros( 226 node: exp.Expression, 227 ) -> exp.Expression | t.List[exp.Expression] | None: 228 nonlocal changed 229 230 if isinstance(node, MacroVar): 231 changed = True 232 variables = self.locals.get(c.SQLMESH_VARS, {}) 233 if node.name not in self.locals and node.name.lower() not in variables: 234 if not isinstance(node.parent, StagedFilePath): 235 raise SQLMeshError(f"Macro variable '{node.name}' is undefined.") 236 237 return node 238 239 value = self.locals.get(node.name, variables.get(node.name.lower())) 240 if isinstance(value, list): 241 return exp.convert( 242 tuple( 243 self.transform(v) if isinstance(v, exp.Expression) else v for v in value 244 ) 245 ) 246 return exp.convert( 247 self.transform(value) if isinstance(value, exp.Expression) else value 248 ) 249 if isinstance(node, exp.Identifier) and "@" in node.this: 250 text = self.template(node.this, self.locals) 251 if node.this != text: 252 changed = True 253 node.args["this"] = text 254 return node 255 if node.is_string: 256 text = node.this 257 if has_jinja(text): 258 changed = True 259 node.set("this", self.jinja_env.from_string(node.this).render()) 260 return node 261 if isinstance(node, MacroFunc): 262 changed = True 263 return self.evaluate(node) 264 return node 265 266 transformed = exp.replace_tree( 267 expression.copy(), evaluate_macros, prune=lambda n: isinstance(n, exp.Lambda) 268 ) 269 270 if changed: 271 # the transformations could have corrupted the ast, turning this into sql and reparsing ensures 272 # that the ast is correct 273 if isinstance(transformed, list): 274 return [ 275 self.parse_one(node.sql(dialect=self.dialect, copy=False)) 276 for node in transformed 277 ] 278 elif isinstance(transformed, exp.Expression): 279 return self.parse_one(transformed.sql(dialect=self.dialect, copy=False)) 280 281 return transformed
283 def template(self, text: t.Any, local_variables: t.Dict[str, t.Any]) -> str: 284 """Substitute @vars with locals. 285 286 Args: 287 text: The string to do substitition on. 288 local_variables: Local variables in the context so that lambdas can be used. 289 290 Returns: 291 The rendered string. 292 """ 293 mapping = {} 294 295 variables = self.locals.get(c.SQLMESH_VARS, {}) 296 297 for k, v in chain(variables.items(), self.locals.items(), local_variables.items()): 298 # try to convert all variables into sqlglot expressions 299 # because they're going to be converted into strings in sql 300 # we use bare Exception instead of ValueError because there's 301 # a recursive error with MagicMock. 302 # we don't convert strings because that would result in adding quotes 303 if not isinstance(v, str): 304 try: 305 v = exp.convert(v) 306 except Exception: 307 pass 308 309 if isinstance(v, exp.Expression): 310 v = v.sql(dialect=self.dialect) 311 mapping[k] = v 312 313 return MacroStrTemplate(str(text)).safe_substitute(mapping)
Substitute @vars with locals.
Arguments:
- text: The string to do substitition on.
- local_variables: Local variables in the context so that lambdas can be used.
Returns:
The rendered string.
315 def evaluate(self, node: MacroFunc) -> exp.Expression | t.List[exp.Expression] | None: 316 if isinstance(node, MacroDef): 317 if isinstance(node.expression, exp.Lambda): 318 _, fn = _norm_var_arg_lambda(self, node.expression) 319 self.macros[normalize_macro_name(node.name)] = lambda _, *args: fn( 320 args[0] if len(args) == 1 else exp.Tuple(expressions=list(args)) 321 ) 322 else: 323 self.locals[node.name] = self.transform(node.expression) 324 return node 325 326 if isinstance(node, (MacroSQL, MacroStrReplace)): 327 result: t.Optional[exp.Expression | t.List[exp.Expression]] = exp.convert( 328 self.eval_expression(node) 329 ) 330 else: 331 func = t.cast(exp.Anonymous, node.this) 332 333 args = [] 334 kwargs = {} 335 for e in func.expressions: 336 if isinstance(e, exp.PropertyEQ): 337 kwargs[e.this.name] = e.expression 338 else: 339 if kwargs: 340 raise MacroEvalError( 341 f"Positional argument cannot follow keyword argument.\n {func.sql(dialect=self.dialect)} at '{self._path}'" 342 ) 343 344 args.append(e) 345 346 result = self.send(func.name, *args, **kwargs) 347 348 if result is None: 349 return None 350 351 if isinstance(result, (tuple, list)): 352 return [self.parse_one(item) for item in result if item is not None] 353 return self.parse_one(result)
355 def eval_expression(self, node: t.Any) -> t.Any: 356 """Converts a SQLGlot expression into executable Python code and evals it. 357 358 If the node is not an expression, it will simply be returned. 359 360 Args: 361 node: expression 362 Returns: 363 The return value of the evaled Python Code. 364 """ 365 if not isinstance(node, exp.Expression): 366 return node 367 code = node.sql() 368 try: 369 code = self.generator.generate(node) 370 return eval(code, self.env, self.locals) 371 except Exception as e: 372 print_exception(e, self.python_env) 373 raise MacroEvalError( 374 f"Error trying to eval macro.\n\nGenerated code: {code}\n\nOriginal sql: {node}" 375 ) from e
Converts a SQLGlot expression into executable Python code and evals it.
If the node is not an expression, it will simply be returned.
Arguments:
- node: expression
Returns:
The return value of the evaled Python Code.
377 def parse_one( 378 self, sql: str | exp.Expression, into: t.Optional[exp.IntoType] = None, **opts: t.Any 379 ) -> exp.Expression: 380 """Parses the given SQL string and returns a syntax tree for the first 381 parsed SQL statement. 382 383 Args: 384 sql: the SQL code or expression to parse. 385 into: the Expression to parse into 386 **opts: other options 387 388 Returns: 389 Expression: the syntax tree for the first parsed statement 390 """ 391 return sqlglot.maybe_parse(sql, dialect=self.dialect, into=into, **opts)
Parses the given SQL string and returns a syntax tree for the first parsed SQL statement.
Arguments:
- sql: the SQL code or expression to parse.
- into: the Expression to parse into
- **opts: other options
Returns:
Expression: the syntax tree for the first parsed statement
401 def columns_to_types(self, model_name: TableName | exp.Column) -> t.Dict[str, exp.DataType]: 402 """Returns the columns-to-types mapping corresponding to the specified model.""" 403 if self._schema is None or self._schema.empty: 404 self.columns_to_types_called = True 405 return {"__schema_unavailable_at_load__": exp.DataType.build("unknown")} 406 407 normalized_model_name = normalize_model_name( 408 model_name, 409 default_catalog=self.default_catalog, 410 dialect=self.dialect, 411 ) 412 columns_to_types = self._schema.find(exp.to_table(normalized_model_name)) 413 if columns_to_types is None: 414 raise SQLMeshError(f"Schema for model '{model_name}' can't be statically determined.") 415 416 return columns_to_types
Returns the columns-to-types mapping corresponding to the specified model.
418 def get_snapshot(self, model_name: TableName | exp.Column) -> t.Optional[Snapshot]: 419 """Returns the snapshot that corresponds to the given model name.""" 420 return self._snapshots.get( 421 normalize_model_name( 422 model_name, 423 default_catalog=self.default_catalog, 424 dialect=self.dialect, 425 ) 426 )
Returns the snapshot that corresponds to the given model name.
428 def resolve_tables(self, query: exp.Expression) -> exp.Expression: 429 """Resolves queries with references to SQLMesh model names to their physical tables.""" 430 if not self._resolve_tables: 431 raise SQLMeshError( 432 "Macro evaluator not properly initialized with resolve_tables lambda." 433 ) 434 return self._resolve_tables(query)
Resolves queries with references to SQLMesh model names to their physical tables.
Returns the current runtime stage of the macro evaluation.
456 def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]: 457 """Returns the value of the specified variable, or the default value if it doesn't exist.""" 458 return (self.locals.get(c.SQLMESH_VARS) or {}).get(var_name.lower(), default)
Returns the value of the specified variable, or the default value if it doesn't exist.
536class macro(registry_decorator): 537 """Specifies a function is a macro and registers it the global MACROS registry. 538 539 Registered macros can be referenced in SQL statements to make queries more dynamic/cleaner. 540 541 Example: 542 from sqlglot import exp 543 from sqlmesh.core.macros import MacroEvaluator, macro 544 545 @macro() 546 def add_one(evaluator: MacroEvaluator, column: exp.Literal) -> exp.Add: 547 return evaluator.parse_one(f"{column} + 1") 548 549 Args: 550 name: A custom name for the macro, the default is the name of the function. 551 """ 552 553 registry_name = "macros" 554 555 def __call__( 556 self, func: t.Callable[..., DECORATOR_RETURN_TYPE] 557 ) -> t.Callable[..., DECORATOR_RETURN_TYPE]: 558 wrapper = super().__call__(func) 559 560 # This is used to identify macros at runtime to unwrap during serialization. 561 setattr(wrapper, c.SQLMESH_MACRO, True) 562 return wrapper
Specifies a function is a macro and registers it the global MACROS registry.
Registered macros can be referenced in SQL statements to make queries more dynamic/cleaner.
Example:
from sqlglot import exp from sqlmesh.core.macros import MacroEvaluator, macro
@macro() def add_one(evaluator: MacroEvaluator, column: exp.Literal) -> exp.Add: return evaluator.parse_one(f"{column} + 1")
Arguments:
- name: A custom name for the macro, the default is the name of the function.
Inherited Members
626@macro() 627def each( 628 evaluator: MacroEvaluator, 629 *args: t.Any, 630) -> t.List[t.Any]: 631 """Iterates through items calling func on each. 632 633 If a func call on item returns None, it will be excluded from the list. 634 635 Args: 636 evaluator: MacroEvaluator that invoked the macro 637 args: The last argument should be a lambda of the form x -> x +1. The first argument can be 638 an Array or var args can be used. 639 640 Returns: 641 A list of items that is the result of func 642 """ 643 *items, func = args 644 items, func = _norm_var_arg_lambda(evaluator, func, *items) # type: ignore 645 return [item for item in map(func, ensure_collection(items)) if item is not None]
Iterates through items calling func on each.
If a func call on item returns None, it will be excluded from the list.
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- args: The last argument should be a lambda of the form x -> x +1. The first argument can be an Array or var args can be used.
Returns:
A list of items that is the result of func
648@macro("IF") 649def if_( 650 evaluator: MacroEvaluator, 651 condition: t.Any, 652 true: t.Any, 653 false: t.Any = None, 654) -> t.Any: 655 """Evaluates a given condition and returns the second argument if true or else the third argument. 656 657 If false is not passed in, the default return value will be None. 658 659 Example: 660 >>> from sqlglot import parse_one 661 >>> from sqlmesh.core.macros import MacroEvaluator 662 >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a, b)")).sql() 663 'b' 664 665 >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a)")) 666 """ 667 668 if evaluator.eval_expression(condition): 669 return true 670 return false
Evaluates a given condition and returns the second argument if true or else the third argument.
If false is not passed in, the default return value will be None.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a, b)")).sql() 'b'
>>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a)"))
673@macro("REDUCE") 674def reduce_(evaluator: MacroEvaluator, *args: t.Any) -> t.Any: 675 """Iterates through items applying provided function that takes two arguments 676 cumulatively to the items of iterable items, from left to right, so as to reduce 677 the iterable to a single item. 678 679 Example: 680 >>> from sqlglot import parse_one 681 >>> from sqlmesh.core.macros import MacroEvaluator 682 >>> sql = "@SQL(@REDUCE([100, 200, 300, 400], (x, y) -> x + y))" 683 >>> MacroEvaluator().transform(parse_one(sql)).sql() 684 '1000' 685 686 Args: 687 evaluator: MacroEvaluator that invoked the macro 688 args: The last argument should be a lambda of the form (x, y) -> x + y. The first argument can be 689 an Array or var args can be used. 690 Returns: 691 A single item that is the result of applying func cumulatively to items 692 """ 693 *items, func = args 694 items, func = _norm_var_arg_lambda(evaluator, func, *items) # type: ignore 695 return reduce(lambda a, b: func(exp.Tuple(expressions=[a, b])), ensure_collection(items))
Iterates through items applying provided function that takes two arguments cumulatively to the items of iterable items, from left to right, so as to reduce the iterable to a single item.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@SQL(@REDUCE([100, 200, 300, 400], (x, y) -> x + y))" >>> MacroEvaluator().transform(parse_one(sql)).sql() '1000'
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- args: The last argument should be a lambda of the form (x, y) -> x + y. The first argument can be an Array or var args can be used.
Returns:
A single item that is the result of applying func cumulatively to items
698@macro("FILTER") 699def filter_(evaluator: MacroEvaluator, *args: t.Any) -> t.List[t.Any]: 700 """Iterates through items, applying provided function to each item and removing 701 all items where the function returns False 702 703 Example: 704 >>> from sqlglot import parse_one 705 >>> from sqlmesh.core.macros import MacroEvaluator 706 >>> sql = "@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y)" 707 >>> MacroEvaluator().transform(parse_one(sql)).sql() 708 '2 + 3' 709 710 >>> sql = "@EVAL(@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y))" 711 >>> MacroEvaluator().transform(parse_one(sql)).sql() 712 '5' 713 714 Args: 715 evaluator: MacroEvaluator that invoked the macro 716 args: The last argument should be a lambda of the form x -> x > 1. The first argument can be 717 an Array or var args can be used. 718 Returns: 719 The items for which the func returned True 720 """ 721 *items, func = args 722 items, func = _norm_var_arg_lambda(evaluator, func, *items) # type: ignore 723 return list(filter(lambda arg: evaluator.eval_expression(func(arg)), items))
Iterates through items, applying provided function to each item and removing all items where the function returns False
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y)" >>> MacroEvaluator().transform(parse_one(sql)).sql() '2 + 3'
>>> sql = "@EVAL(@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y))" >>> MacroEvaluator().transform(parse_one(sql)).sql() '5'
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- args: The last argument should be a lambda of the form x -> x > 1. The first argument can be an Array or var args can be used.
Returns:
The items for which the func returned True
726def _optional_expression( 727 evaluator: MacroEvaluator, 728 condition: exp.Condition, 729 expression: exp.Expression, 730) -> t.Optional[exp.Expression]: 731 """Inserts expression when the condition is True 732 733 The following examples express the usage of this function in the context of the macros which wrap it. 734 735 Examples: 736 >>> from sqlglot import parse_one 737 >>> from sqlmesh.core.macros import MacroEvaluator 738 >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" 739 >>> MacroEvaluator().transform(parse_one(sql)).sql() 740 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' 741 >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" 742 >>> MacroEvaluator().transform(parse_one(sql)).sql() 743 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' 744 >>> sql = "select * from city @GROUP_BY(True) country, population" 745 >>> MacroEvaluator().transform(parse_one(sql)).sql() 746 'SELECT * FROM city GROUP BY country, population' 747 >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" 748 >>> MacroEvaluator().transform(parse_one(sql)).sql() 749 "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'" 750 751 Args: 752 evaluator: MacroEvaluator that invoked the macro 753 condition: Condition expression 754 expression: SQL expression 755 Returns: 756 Expression if the conditional is True; otherwise None 757 """ 758 return expression if evaluator.eval_expression(condition) else None
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the conditional is True; otherwise None
726def _optional_expression( 727 evaluator: MacroEvaluator, 728 condition: exp.Condition, 729 expression: exp.Expression, 730) -> t.Optional[exp.Expression]: 731 """Inserts expression when the condition is True 732 733 The following examples express the usage of this function in the context of the macros which wrap it. 734 735 Examples: 736 >>> from sqlglot import parse_one 737 >>> from sqlmesh.core.macros import MacroEvaluator 738 >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" 739 >>> MacroEvaluator().transform(parse_one(sql)).sql() 740 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' 741 >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" 742 >>> MacroEvaluator().transform(parse_one(sql)).sql() 743 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' 744 >>> sql = "select * from city @GROUP_BY(True) country, population" 745 >>> MacroEvaluator().transform(parse_one(sql)).sql() 746 'SELECT * FROM city GROUP BY country, population' 747 >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" 748 >>> MacroEvaluator().transform(parse_one(sql)).sql() 749 "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'" 750 751 Args: 752 evaluator: MacroEvaluator that invoked the macro 753 condition: Condition expression 754 expression: SQL expression 755 Returns: 756 Expression if the conditional is True; otherwise None 757 """ 758 return expression if evaluator.eval_expression(condition) else None
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the conditional is True; otherwise None
726def _optional_expression( 727 evaluator: MacroEvaluator, 728 condition: exp.Condition, 729 expression: exp.Expression, 730) -> t.Optional[exp.Expression]: 731 """Inserts expression when the condition is True 732 733 The following examples express the usage of this function in the context of the macros which wrap it. 734 735 Examples: 736 >>> from sqlglot import parse_one 737 >>> from sqlmesh.core.macros import MacroEvaluator 738 >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" 739 >>> MacroEvaluator().transform(parse_one(sql)).sql() 740 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' 741 >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" 742 >>> MacroEvaluator().transform(parse_one(sql)).sql() 743 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' 744 >>> sql = "select * from city @GROUP_BY(True) country, population" 745 >>> MacroEvaluator().transform(parse_one(sql)).sql() 746 'SELECT * FROM city GROUP BY country, population' 747 >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" 748 >>> MacroEvaluator().transform(parse_one(sql)).sql() 749 "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'" 750 751 Args: 752 evaluator: MacroEvaluator that invoked the macro 753 condition: Condition expression 754 expression: SQL expression 755 Returns: 756 Expression if the conditional is True; otherwise None 757 """ 758 return expression if evaluator.eval_expression(condition) else None
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the conditional is True; otherwise None
726def _optional_expression( 727 evaluator: MacroEvaluator, 728 condition: exp.Condition, 729 expression: exp.Expression, 730) -> t.Optional[exp.Expression]: 731 """Inserts expression when the condition is True 732 733 The following examples express the usage of this function in the context of the macros which wrap it. 734 735 Examples: 736 >>> from sqlglot import parse_one 737 >>> from sqlmesh.core.macros import MacroEvaluator 738 >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" 739 >>> MacroEvaluator().transform(parse_one(sql)).sql() 740 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' 741 >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" 742 >>> MacroEvaluator().transform(parse_one(sql)).sql() 743 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' 744 >>> sql = "select * from city @GROUP_BY(True) country, population" 745 >>> MacroEvaluator().transform(parse_one(sql)).sql() 746 'SELECT * FROM city GROUP BY country, population' 747 >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" 748 >>> MacroEvaluator().transform(parse_one(sql)).sql() 749 "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'" 750 751 Args: 752 evaluator: MacroEvaluator that invoked the macro 753 condition: Condition expression 754 expression: SQL expression 755 Returns: 756 Expression if the conditional is True; otherwise None 757 """ 758 return expression if evaluator.eval_expression(condition) else None
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the conditional is True; otherwise None
726def _optional_expression( 727 evaluator: MacroEvaluator, 728 condition: exp.Condition, 729 expression: exp.Expression, 730) -> t.Optional[exp.Expression]: 731 """Inserts expression when the condition is True 732 733 The following examples express the usage of this function in the context of the macros which wrap it. 734 735 Examples: 736 >>> from sqlglot import parse_one 737 >>> from sqlmesh.core.macros import MacroEvaluator 738 >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" 739 >>> MacroEvaluator().transform(parse_one(sql)).sql() 740 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' 741 >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" 742 >>> MacroEvaluator().transform(parse_one(sql)).sql() 743 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' 744 >>> sql = "select * from city @GROUP_BY(True) country, population" 745 >>> MacroEvaluator().transform(parse_one(sql)).sql() 746 'SELECT * FROM city GROUP BY country, population' 747 >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" 748 >>> MacroEvaluator().transform(parse_one(sql)).sql() 749 "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'" 750 751 Args: 752 evaluator: MacroEvaluator that invoked the macro 753 condition: Condition expression 754 expression: SQL expression 755 Returns: 756 Expression if the conditional is True; otherwise None 757 """ 758 return expression if evaluator.eval_expression(condition) else None
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the conditional is True; otherwise None
726def _optional_expression( 727 evaluator: MacroEvaluator, 728 condition: exp.Condition, 729 expression: exp.Expression, 730) -> t.Optional[exp.Expression]: 731 """Inserts expression when the condition is True 732 733 The following examples express the usage of this function in the context of the macros which wrap it. 734 735 Examples: 736 >>> from sqlglot import parse_one 737 >>> from sqlmesh.core.macros import MacroEvaluator 738 >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" 739 >>> MacroEvaluator().transform(parse_one(sql)).sql() 740 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' 741 >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" 742 >>> MacroEvaluator().transform(parse_one(sql)).sql() 743 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' 744 >>> sql = "select * from city @GROUP_BY(True) country, population" 745 >>> MacroEvaluator().transform(parse_one(sql)).sql() 746 'SELECT * FROM city GROUP BY country, population' 747 >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" 748 >>> MacroEvaluator().transform(parse_one(sql)).sql() 749 "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'" 750 751 Args: 752 evaluator: MacroEvaluator that invoked the macro 753 condition: Condition expression 754 expression: SQL expression 755 Returns: 756 Expression if the conditional is True; otherwise None 757 """ 758 return expression if evaluator.eval_expression(condition) else None
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the conditional is True; otherwise None
726def _optional_expression( 727 evaluator: MacroEvaluator, 728 condition: exp.Condition, 729 expression: exp.Expression, 730) -> t.Optional[exp.Expression]: 731 """Inserts expression when the condition is True 732 733 The following examples express the usage of this function in the context of the macros which wrap it. 734 735 Examples: 736 >>> from sqlglot import parse_one 737 >>> from sqlmesh.core.macros import MacroEvaluator 738 >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" 739 >>> MacroEvaluator().transform(parse_one(sql)).sql() 740 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' 741 >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" 742 >>> MacroEvaluator().transform(parse_one(sql)).sql() 743 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' 744 >>> sql = "select * from city @GROUP_BY(True) country, population" 745 >>> MacroEvaluator().transform(parse_one(sql)).sql() 746 'SELECT * FROM city GROUP BY country, population' 747 >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" 748 >>> MacroEvaluator().transform(parse_one(sql)).sql() 749 "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'" 750 751 Args: 752 evaluator: MacroEvaluator that invoked the macro 753 condition: Condition expression 754 expression: SQL expression 755 Returns: 756 Expression if the conditional is True; otherwise None 757 """ 758 return expression if evaluator.eval_expression(condition) else None
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the conditional is True; otherwise None
770@macro("eval") 771def eval_(evaluator: MacroEvaluator, condition: exp.Condition) -> t.Any: 772 """Evaluate the given condition in a Python/SQL interpretor. 773 774 Example: 775 >>> from sqlglot import parse_one 776 >>> from sqlmesh.core.macros import MacroEvaluator 777 >>> sql = "@EVAL(1 + 1)" 778 >>> MacroEvaluator().transform(parse_one(sql)).sql() 779 '2' 780 """ 781 return evaluator.eval_expression(condition)
Evaluate the given condition in a Python/SQL interpretor.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@EVAL(1 + 1)" >>> MacroEvaluator().transform(parse_one(sql)).sql() '2'
785@macro() 786def star( 787 evaluator: MacroEvaluator, 788 relation: exp.Table, 789 alias: exp.Column = t.cast(exp.Column, exp.column("")), 790 except_: t.Union[exp.Array, exp.Tuple] = exp.Tuple(this=[]), 791 prefix: exp.Literal = exp.Literal.string(""), 792 suffix: exp.Literal = exp.Literal.string(""), 793 quote_identifiers: exp.Boolean = exp.true(), 794) -> t.List[exp.Alias]: 795 """Returns a list of projections for the given relation. 796 797 Args: 798 evaluator: MacroEvaluator that invoked the macro 799 relation: The relation to select star from 800 alias: The alias of the relation 801 except_: Columns to exclude 802 prefix: A prefix to use for all selections 803 suffix: A suffix to use for all selections 804 quote_identifiers: Whether or not quote the resulting aliases, defaults to true 805 806 Returns: 807 An array of columns. 808 809 Example: 810 >>> from sqlglot import parse_one, exp 811 >>> from sqlglot.schema import MappingSchema 812 >>> from sqlmesh.core.macros import MacroEvaluator 813 >>> sql = "SELECT @STAR(foo, bar, [c], 'baz_') FROM foo AS bar" 814 >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": exp.DataType.build("string"), "b": exp.DataType.build("string"), "c": exp.DataType.build("string"), "d": exp.DataType.build("int")}})).transform(parse_one(sql)).sql() 815 'SELECT CAST("bar"."a" AS TEXT) AS "baz_a", CAST("bar"."b" AS TEXT) AS "baz_b", CAST("bar"."d" AS INT) AS "baz_d" FROM foo AS bar' 816 """ 817 if alias and not isinstance(alias, (exp.Identifier, exp.Column)): 818 raise SQLMeshError(f"Invalid alias '{alias}'. Expected an identifier.") 819 if except_ and not isinstance(except_, (exp.Array, exp.Tuple)): 820 raise SQLMeshError(f"Invalid except '{except_}'. Expected an array.") 821 if prefix and not isinstance(prefix, exp.Literal): 822 raise SQLMeshError(f"Invalid prefix '{prefix}'. Expected a literal.") 823 if suffix and not isinstance(suffix, exp.Literal): 824 raise SQLMeshError(f"Invalid suffix '{suffix}'. Expected a literal.") 825 if not isinstance(quote_identifiers, exp.Boolean): 826 raise SQLMeshError(f"Invalid quote_identifiers '{quote_identifiers}'. Expected a boolean.") 827 828 exclude = {e.name for e in except_.expressions} 829 quoted = quote_identifiers.this 830 table_identifier = alias.name or relation.name 831 832 columns_to_types = { 833 k: v for k, v in evaluator.columns_to_types(relation).items() if k not in exclude 834 } 835 if columns_to_types_all_known(columns_to_types): 836 return [ 837 exp.cast( 838 exp.column(column, table=table_identifier, quoted=quoted), 839 dtype, 840 dialect=evaluator.dialect, 841 ).as_(f"{prefix.this}{column}{suffix.this}", quoted=quoted) 842 for column, dtype in columns_to_types.items() 843 ] 844 return [ 845 exp.column(column, table=table_identifier, quoted=quoted).as_( 846 f"{prefix.this}{column}{suffix.this}", quoted=quoted 847 ) 848 for column, type_ in evaluator.columns_to_types(relation).items() 849 ]
Returns a list of projections for the given relation.
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- relation: The relation to select star from
- alias: The alias of the relation
- except_: Columns to exclude
- prefix: A prefix to use for all selections
- suffix: A suffix to use for all selections
- quote_identifiers: Whether or not quote the resulting aliases, defaults to true
Returns:
An array of columns.
Example:
>>> from sqlglot import parse_one, exp >>> from sqlglot.schema import MappingSchema >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT @STAR(foo, bar, [c], 'baz_') FROM foo AS bar" >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": exp.DataType.build("string"), "b": exp.DataType.build("string"), "c": exp.DataType.build("string"), "d": exp.DataType.build("int")}})).transform(parse_one(sql)).sql() 'SELECT CAST("bar"."a" AS TEXT) AS "baz_a", CAST("bar"."b" AS TEXT) AS "baz_b", CAST("bar"."d" AS INT) AS "baz_d" FROM foo AS bar'
852@macro() 853def generate_surrogate_key(_: MacroEvaluator, *fields: exp.Expression) -> exp.Func: 854 """Generates a surrogate key for the given fields. 855 856 Example: 857 >>> from sqlglot import parse_one 858 >>> from sqlmesh.core.macros import MacroEvaluator 859 >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c) FROM foo" 860 >>> MacroEvaluator().transform(parse_one(sql)).sql() 861 "SELECT MD5(CONCAT(COALESCE(CAST(a AS TEXT), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS TEXT), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS TEXT), '_sqlmesh_surrogate_key_null_'))) FROM foo" 862 """ 863 string_fields: t.List[exp.Expression] = [] 864 for i, field in enumerate(fields): 865 if i > 0: 866 string_fields.append(exp.Literal.string("|")) 867 string_fields.append( 868 exp.func( 869 "COALESCE", 870 exp.cast(field, exp.DataType.build("text")), 871 exp.Literal.string("_sqlmesh_surrogate_key_null_"), 872 ) 873 ) 874 return exp.func("MD5", exp.func("CONCAT", *string_fields))
Generates a surrogate key for the given fields.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c) FROM foo" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT MD5(CONCAT(COALESCE(CAST(a AS TEXT), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS TEXT), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS TEXT), '_sqlmesh_surrogate_key_null_'))) FROM foo"
877@macro() 878def safe_add(_: MacroEvaluator, *fields: exp.Expression) -> exp.Case: 879 """Adds numbers together, substitutes nulls for 0s and only returns null if all fields are null. 880 881 Example: 882 >>> from sqlglot import parse_one 883 >>> from sqlmesh.core.macros import MacroEvaluator 884 >>> sql = "SELECT @SAFE_ADD(a, b) FROM foo" 885 >>> MacroEvaluator().transform(parse_one(sql)).sql() 886 'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) + COALESCE(b, 0) END FROM foo' 887 """ 888 return ( 889 exp.Case() 890 .when(exp.and_(*(field.is_(exp.null()) for field in fields)), exp.null()) 891 .else_(reduce(lambda a, b: a + b, [exp.func("COALESCE", field, 0) for field in fields])) # type: ignore 892 )
Adds numbers together, substitutes nulls for 0s and only returns null if all fields are null.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT @SAFE_ADD(a, b) FROM foo" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) + COALESCE(b, 0) END FROM foo'
895@macro() 896def safe_sub(_: MacroEvaluator, *fields: exp.Expression) -> exp.Case: 897 """Subtract numbers, substitutes nulls for 0s and only returns null if all fields are null. 898 899 Example: 900 >>> from sqlglot import parse_one 901 >>> from sqlmesh.core.macros import MacroEvaluator 902 >>> sql = "SELECT @SAFE_SUB(a, b) FROM foo" 903 >>> MacroEvaluator().transform(parse_one(sql)).sql() 904 'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) - COALESCE(b, 0) END FROM foo' 905 """ 906 return ( 907 exp.Case() 908 .when(exp.and_(*(field.is_(exp.null()) for field in fields)), exp.null()) 909 .else_(reduce(lambda a, b: a - b, [exp.func("COALESCE", field, 0) for field in fields])) # type: ignore 910 )
Subtract numbers, substitutes nulls for 0s and only returns null if all fields are null.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT @SAFE_SUB(a, b) FROM foo" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) - COALESCE(b, 0) END FROM foo'
913@macro() 914def safe_div(_: MacroEvaluator, numerator: exp.Expression, denominator: exp.Expression) -> exp.Div: 915 """Divides numbers, returns null if the denominator is 0. 916 917 Example: 918 >>> from sqlglot import parse_one 919 >>> from sqlmesh.core.macros import MacroEvaluator 920 >>> sql = "SELECT @SAFE_DIV(a, b) FROM foo" 921 >>> MacroEvaluator().transform(parse_one(sql)).sql() 922 'SELECT a / NULLIF(b, 0) FROM foo' 923 """ 924 return numerator / exp.func("NULLIF", denominator, 0)
Divides numbers, returns null if the denominator is 0.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT @SAFE_DIV(a, b) FROM foo" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT a / NULLIF(b, 0) FROM foo'
927@macro() 928def union( 929 evaluator: MacroEvaluator, 930 type_: exp.Literal = exp.Literal.string("ALL"), 931 *tables: exp.Table, 932) -> exp.Query: 933 """Returns a UNION of the given tables. Only choosing columns that have the same name and type. 934 935 Example: 936 >>> from sqlglot import parse_one 937 >>> from sqlglot.schema import MappingSchema 938 >>> from sqlmesh.core.macros import MacroEvaluator 939 >>> sql = "@UNION('distinct', foo, bar)" 940 >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql() 941 'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar' 942 """ 943 kind = type_.name.upper() 944 if kind not in ("ALL", "DISTINCT"): 945 raise SQLMeshError(f"Invalid type '{type_}'. Expected 'ALL' or 'DISTINCT'.") 946 947 columns = { 948 column 949 for column, _ in reduce( 950 lambda a, b: a & b, # type: ignore 951 (evaluator.columns_to_types(table).items() for table in tables), 952 ) 953 } 954 955 projections = [ 956 exp.cast(column, type_, dialect=evaluator.dialect).as_(column) 957 for column, type_ in evaluator.columns_to_types(tables[0]).items() 958 if column in columns 959 ] 960 961 return reduce( 962 lambda a, b: a.union(b, distinct=kind == "DISTINCT"), # type: ignore 963 [exp.select(*projections).from_(t) for t in tables], 964 )
Returns a UNION of the given tables. Only choosing columns that have the same name and type.
Example:
>>> from sqlglot import parse_one >>> from sqlglot.schema import MappingSchema >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@UNION('distinct', foo, bar)" >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql() 'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar'
967@macro() 968def haversine_distance( 969 _: MacroEvaluator, 970 lat1: exp.Expression, 971 lon1: exp.Expression, 972 lat2: exp.Expression, 973 lon2: exp.Expression, 974 unit: exp.Literal = exp.Literal.string("mi"), 975) -> exp.Mul: 976 """Returns the haversine distance between two points. 977 978 Example: 979 >>> from sqlglot import parse_one 980 >>> from sqlmesh.core.macros import MacroEvaluator 981 >>> sql = "SELECT @HAVERSINE_DISTANCE(driver_y, driver_x, passenger_y, passenger_x, 'mi') FROM rides" 982 >>> MacroEvaluator().transform(parse_one(sql)).sql() 983 'SELECT 7922 * ASIN(SQRT((POWER(SIN(RADIANS((passenger_y - driver_y) / 2)), 2)) + (COS(RADIANS(driver_y)) * COS(RADIANS(passenger_y)) * POWER(SIN(RADIANS((passenger_x - driver_x) / 2)), 2)))) * 1.0 FROM rides' 984 """ 985 if unit.this == "mi": 986 conversion_rate = 1.0 987 elif unit.this == "km": 988 conversion_rate = 1.60934 989 else: 990 raise SQLMeshError(f"Invalid unit '{unit}'. Expected 'mi' or 'km'.") 991 992 return ( 993 2 994 * 3961 995 * exp.func( 996 "ASIN", 997 exp.func( 998 "SQRT", 999 exp.func("POWER", exp.func("SIN", exp.func("RADIANS", (lat2 - lat1) / 2)), 2) 1000 + exp.func("COS", exp.func("RADIANS", lat1)) 1001 * exp.func("COS", exp.func("RADIANS", lat2)) 1002 * exp.func("POWER", exp.func("SIN", exp.func("RADIANS", (lon2 - lon1) / 2)), 2), 1003 ), 1004 ) 1005 * conversion_rate 1006 )
Returns the haversine distance between two points.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT @HAVERSINE_DISTANCE(driver_y, driver_x, passenger_y, passenger_x, 'mi') FROM rides" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT 7922 * ASIN(SQRT((POWER(SIN(RADIANS((passenger_y - driver_y) / 2)), 2)) + (COS(RADIANS(driver_y)) * COS(RADIANS(passenger_y)) * POWER(SIN(RADIANS((passenger_x - driver_x) / 2)), 2)))) * 1.0 FROM rides'
1009@macro() 1010def pivot( 1011 evaluator: MacroEvaluator, 1012 column: exp.Column, 1013 values: t.Union[exp.Array, exp.Tuple], 1014 alias: exp.Boolean = exp.true(), 1015 agg: exp.Literal = exp.Literal.string("SUM"), 1016 cmp: exp.Literal = exp.Literal.string("="), 1017 prefix: exp.Literal = exp.Literal.string(""), 1018 suffix: exp.Literal = exp.Literal.string(""), 1019 then_value: exp.Literal = exp.Literal.number(1), 1020 else_value: exp.Literal = exp.Literal.number(0), 1021 quote: exp.Boolean = exp.true(), 1022 distinct: exp.Boolean = exp.false(), 1023) -> t.List[exp.Expression]: 1024 """Returns a list of projections as a result of pivoting the given column on the given values. 1025 1026 Example: 1027 >>> from sqlglot import parse_one 1028 >>> from sqlmesh.core.macros import MacroEvaluator 1029 >>> sql = "SELECT date_day, @PIVOT(status, ['cancelled', 'completed']) FROM rides GROUP BY 1" 1030 >>> MacroEvaluator().transform(parse_one(sql)).sql() 1031 'SELECT date_day, SUM(CASE WHEN status = \\'cancelled\\' THEN 1 ELSE 0 END) AS "\\'cancelled\\'", SUM(CASE WHEN status = \\'completed\\' THEN 1 ELSE 0 END) AS "\\'completed\\'" FROM rides GROUP BY 1' 1032 """ 1033 aggregates: t.List[exp.Expression] = [] 1034 for value in values.expressions: 1035 proj = f"{agg.this}(" 1036 if distinct.this: 1037 proj += "DISTINCT " 1038 proj += f"CASE WHEN {column} {cmp.this} {value} THEN {then_value} ELSE {else_value} END) " 1039 node = evaluator.parse_one(proj) 1040 if alias.this: 1041 node = node.as_(f"{prefix.this}{value}{suffix.this}", quoted=quote.this, copy=False) 1042 aggregates.append(node) 1043 return aggregates
Returns a list of projections as a result of pivoting the given column on the given values.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT date_day, @PIVOT(status, ['cancelled', 'completed']) FROM rides GROUP BY 1" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT date_day, SUM(CASE WHEN status = \'cancelled\' THEN 1 ELSE 0 END) AS "\'cancelled\'", SUM(CASE WHEN status = \'completed\' THEN 1 ELSE 0 END) AS "\'completed\'" FROM rides GROUP BY 1'
1046@macro("AND") 1047def and_(evaluator: MacroEvaluator, *expressions: t.Optional[exp.Expression]) -> exp.Condition: 1048 """Returns an AND statement filtering out any NULL expressions.""" 1049 conditions = [e for e in expressions if not isinstance(e, exp.Null)] 1050 1051 if not conditions: 1052 return exp.true() 1053 1054 return exp.and_(*conditions, dialect=evaluator.dialect)
Returns an AND statement filtering out any NULL expressions.
1057@macro("OR") 1058def or_(evaluator: MacroEvaluator, *expressions: t.Optional[exp.Expression]) -> exp.Condition: 1059 """Returns an OR statement filtering out any NULL expressions.""" 1060 conditions = [e for e in expressions if not isinstance(e, exp.Null)] 1061 1062 if not conditions: 1063 return exp.true() 1064 1065 return exp.or_(*conditions, dialect=evaluator.dialect)
Returns an OR statement filtering out any NULL expressions.
1068@macro("VAR") 1069def var( 1070 evaluator: MacroEvaluator, var_name: exp.Expression, default: t.Optional[exp.Expression] = None 1071) -> exp.Expression: 1072 """Returns the value of a variable or the default value if the variable is not set.""" 1073 if not var_name.is_string: 1074 raise SQLMeshError(f"Invalid variable name '{var_name.sql()}'. Expected a string literal.") 1075 1076 return exp.convert(evaluator.var(var_name.this, default))
Returns the value of a variable or the default value if the variable is not set.
1079def normalize_macro_name(name: str) -> str: 1080 """Prefix macro name with @ and upcase""" 1081 return f"@{name.upper()}"
Prefix macro name with @ and upcase