# sqlmesh.core.macros
1from __future__ import annotations 2 3import inspect 4import sys 5import types 6import typing as t 7from enum import Enum 8from functools import lru_cache, reduce 9from itertools import chain 10from pathlib import Path 11from string import Template 12from datetime import datetime, date 13 14import sqlglot 15from sqlglot import Generator, exp, parse_one 16from sqlglot.executor.env import ENV 17from sqlglot.executor.python import Python 18from sqlglot.helper import csv, ensure_collection 19from sqlglot.optimizer.normalize_identifiers import normalize_identifiers 20from sqlglot.schema import MappingSchema 21 22from sqlmesh.core import constants as c 23from sqlmesh.core.dialect import ( 24 SQLMESH_MACRO_PREFIX, 25 Dialect, 26 MacroDef, 27 MacroFunc, 28 MacroSQL, 29 MacroStrReplace, 30 MacroVar, 31 StagedFilePath, 32 normalize_model_name, 33) 34from sqlmesh.utils import ( 35 DECORATOR_RETURN_TYPE, 36 UniqueKeyDict, 37 columns_to_types_all_known, 38 registry_decorator, 39) 40from sqlmesh.utils.date import DatetimeRanges, to_datetime, to_date 41from sqlmesh.utils.errors import MacroEvalError, SQLMeshError 42from sqlmesh.utils.metaprogramming import ( 43 Executable, 44 SqlValue, 45 format_evaluated_code_exception, 46 prepare_env, 47) 48 49if t.TYPE_CHECKING: 50 from sqlglot.dialects.dialect import DialectType 51 from sqlmesh.core._typing import TableName 52 from sqlmesh.core.engine_adapter import EngineAdapter 53 from sqlmesh.core.snapshot import Snapshot 54 from sqlmesh.core.environment import EnvironmentNamingInfo 55 56 57if sys.version_info >= (3, 10): 58 UNION_TYPES = (t.Union, types.UnionType) 59else: 60 UNION_TYPES = (t.Union,) 61 62 63class RuntimeStage(Enum): 64 LOADING = "loading" 65 CREATING = "creating" 66 EVALUATING = "evaluating" 67 PROMOTING = "promoting" 68 DEMOTING = "demoting" 69 AUDITING = "auditing" 70 TESTING = "testing" 71 BEFORE_ALL = "before_all" 72 AFTER_ALL = "after_all" 73 74 75class MacroStrTemplate(Template): 76 delimiter = SQLMESH_MACRO_PREFIX 
77 78 79EXPRESSIONS_NAME_MAP = {} 80SQL = t.NewType("SQL", str) 81 82 83@lru_cache() 84def get_supported_types() -> t.Dict[str, t.Any]: 85 from sqlmesh.core.context import ExecutionContext 86 87 return { 88 "t": t, 89 "typing": t, 90 "List": t.List, 91 "Tuple": t.Tuple, 92 "Union": t.Union, 93 "DatetimeRanges": DatetimeRanges, 94 "exp": exp, 95 "SQL": SQL, 96 "MacroEvaluator": MacroEvaluator, 97 "ExecutionContext": ExecutionContext, 98 } 99 100 101for klass in sqlglot.Parser.EXPRESSION_PARSERS: 102 name = klass if isinstance(klass, str) else klass.__name__ # type: ignore 103 EXPRESSIONS_NAME_MAP[name.lower()] = name 104 105 106def _macro_sql(sql: str, into: t.Optional[str] = None) -> str: 107 args = [_macro_str_replace(sql)] 108 if into in EXPRESSIONS_NAME_MAP: 109 args.append(f"into=exp.{EXPRESSIONS_NAME_MAP[into]}") 110 return f"self.parse_one({', '.join(args)})" 111 112 113def _macro_func_sql(self: Generator, e: exp.Expression) -> str: 114 func = e.this 115 116 if isinstance(func, exp.Anonymous): 117 return f"""self.send({csv("'" + func.name + "'", self.expressions(func))})""" 118 return self.sql(func) 119 120 121def _macro_str_replace(text: str) -> str: 122 """Stringifies python code for variable replacement 123 Args: 124 text: text string 125 Returns: 126 Stringified python code to execute variable replacement 127 """ 128 return f"self.template({text}, locals())" 129 130 131class CaseInsensitiveMapping(t.Dict[str, t.Any]): 132 def __init__(self, data: t.Dict[str, t.Any]) -> None: 133 super().__init__(data) 134 135 def __getitem__(self, key: str) -> t.Any: 136 return super().__getitem__(key.lower()) 137 138 def get(self, key: str, default: t.Any = None, /) -> t.Any: 139 return super().get(key.lower(), default) 140 141 142class MacroDialect(Python): 143 class Generator(Python.Generator): 144 TRANSFORMS = { 145 **Python.Generator.TRANSFORMS, # type: ignore 146 exp.Column: lambda self, e: f"exp.to_column('{self.sql(e, 'this')}')", 147 exp.Lambda: lambda self, e: 
f"lambda {self.expressions(e)}: {self.sql(e, 'this')}", 148 MacroFunc: _macro_func_sql, 149 MacroSQL: lambda self, e: _macro_sql(self.sql(e, "this"), e.args.get("into")), 150 MacroStrReplace: lambda self, e: _macro_str_replace(self.sql(e, "this")), 151 } 152 153 154class MacroEvaluator: 155 """The class responsible for evaluating SQLMesh Macros/SQL. 156 157 SQLMesh supports special preprocessed SQL prefixed with `@`. Although it provides similar power to 158 traditional methods like string templating, there is semantic understanding of SQL which prevents 159 common errors like leading/trailing commas, syntax errors, etc. 160 161 SQLMesh SQL allows for macro variables and macro functions. Macro variables take the form of @variable. These are used for variable substitution. 162 163 SELECT * FROM foo WHERE ds BETWEEN @start_date AND @end_date 164 165 Macro variables can be defined with a special macro function. 166 167 @DEF(start_date, '2021-01-01') 168 169 Args: 170 dialect: Dialect of the SQL to evaluate. 171 python_env: Serialized Python environment. 
172 """ 173 174 def __init__( 175 self, 176 dialect: DialectType = "", 177 python_env: t.Optional[t.Dict[str, Executable]] = None, 178 schema: t.Optional[MappingSchema] = None, 179 runtime_stage: RuntimeStage = RuntimeStage.LOADING, 180 resolve_table: t.Optional[t.Callable[[str | exp.Table], str]] = None, 181 resolve_tables: t.Optional[t.Callable[[exp.Expression], exp.Expression]] = None, 182 snapshots: t.Optional[t.Dict[str, Snapshot]] = None, 183 default_catalog: t.Optional[str] = None, 184 path: t.Optional[Path] = None, 185 environment_naming_info: t.Optional[EnvironmentNamingInfo] = None, 186 model_fqn: t.Optional[str] = None, 187 ): 188 self.dialect = dialect 189 self.generator = MacroDialect().generator() 190 self.locals: t.Dict[str, t.Any] = { 191 "runtime_stage": runtime_stage.value, 192 "default_catalog": default_catalog, 193 } 194 self.env = { 195 **ENV, 196 "self": self, 197 "SQL": SQL, 198 "MacroEvaluator": MacroEvaluator, 199 } 200 self.python_env = python_env or {} 201 self.macros = {normalize_macro_name(k): v.func for k, v in macro.get_registry().items()} 202 self.columns_to_types_called = False 203 self.default_catalog = default_catalog 204 205 self._schema = schema 206 self._resolve_table = resolve_table 207 self._resolve_tables = resolve_tables 208 self._snapshots = snapshots if snapshots is not None else {} 209 self._path = path 210 self._environment_naming_info = environment_naming_info 211 self._model_fqn = model_fqn 212 213 prepare_env(self.python_env, self.env) 214 for k, v in self.python_env.items(): 215 if v.is_definition: 216 self.macros[normalize_macro_name(k)] = self.env[v.name or k] 217 elif v.is_import and getattr(self.env.get(k), c.SQLMESH_MACRO, None): 218 self.macros[normalize_macro_name(k)] = self.env[k] 219 elif v.is_value: 220 value = self.env[k] 221 if k in ( 222 c.SQLMESH_VARS, 223 c.SQLMESH_VARS_METADATA, 224 c.SQLMESH_BLUEPRINT_VARS, 225 c.SQLMESH_BLUEPRINT_VARS_METADATA, 226 ): 227 value = { 228 var_name: ( 229 
self.parse_one(var_value.sql) 230 if isinstance(var_value, SqlValue) 231 else var_value 232 ) 233 for var_name, var_value in value.items() 234 } 235 236 self.locals[k] = value 237 238 def send( 239 self, name: str, *args: t.Any, **kwargs: t.Any 240 ) -> t.Union[None, exp.Expression, t.List[exp.Expression]]: 241 func = self.macros.get(normalize_macro_name(name)) 242 243 if not callable(func): 244 raise MacroEvalError(f"Macro '{name}' does not exist.") 245 246 try: 247 return call_macro( 248 func, self.dialect, self._path, provided_args=(self, *args), provided_kwargs=kwargs 249 ) # type: ignore 250 except Exception as e: 251 raise MacroEvalError( 252 f"An error occurred during evaluation of '{name}'\n\n" 253 + format_evaluated_code_exception(e, self.python_env) 254 ) 255 256 def transform( 257 self, expression: exp.Expression 258 ) -> exp.Expression | t.List[exp.Expression] | None: 259 changed = False 260 261 def evaluate_macros( 262 node: exp.Expression, 263 ) -> exp.Expression | t.List[exp.Expression] | None: 264 nonlocal changed 265 266 if isinstance(node, MacroVar): 267 changed = True 268 variables = self.variables 269 270 # This makes all variables case-insensitive, e.g. @X is the same as @x. We do this 271 # for consistency, since `variables` and `blueprint_variables` are normalized. 272 var_name = node.name.lower() 273 274 if var_name not in self.locals and var_name not in variables: 275 if not isinstance(node.parent, StagedFilePath): 276 raise SQLMeshError(f"Macro variable '{node.name}' is undefined.") 277 278 return node 279 280 # Precedence order is locals (e.g. 
@DEF) > blueprint variables > config variables 281 value = self.locals.get(var_name, variables.get(var_name)) 282 if isinstance(value, list): 283 return exp.convert( 284 tuple( 285 self.transform(v) if isinstance(v, exp.Expression) else v for v in value 286 ) 287 ) 288 289 return exp.convert( 290 self.transform(value) if isinstance(value, exp.Expression) else value 291 ) 292 if isinstance(node, exp.Identifier) and "@" in node.this: 293 text = self.template(node.this, {}) 294 if node.this != text: 295 changed = True 296 return exp.to_identifier(text, quoted=node.quoted or None) 297 if isinstance(node, MacroFunc): 298 changed = True 299 return self.evaluate(node) 300 return node 301 302 transformed = exp.replace_tree( 303 expression.copy(), evaluate_macros, prune=lambda n: isinstance(n, exp.Lambda) 304 ) 305 306 if changed: 307 # the transformations could have corrupted the ast, turning this into sql and reparsing ensures 308 # that the ast is correct 309 if isinstance(transformed, list): 310 return [ 311 self.parse_one(node.sql(dialect=self.dialect, copy=False)) 312 for node in transformed 313 ] 314 if isinstance(transformed, exp.Expression): 315 return self.parse_one(transformed.sql(dialect=self.dialect, copy=False)) 316 317 return transformed 318 319 def template(self, text: t.Any, local_variables: t.Dict[str, t.Any]) -> str: 320 """Substitute @vars with locals. 321 322 Args: 323 text: The string to do substitition on. 324 local_variables: Local variables in the context so that lambdas can be used. 325 326 Returns: 327 The rendered string. 
328 """ 329 # We try to convert all variables into sqlglot expressions because they're going to be converted 330 # into strings; in sql we don't convert strings because that would result in adding quotes 331 base_mapping = { 332 k.lower(): convert_sql(v, self.dialect) 333 for k, v in chain(self.variables.items(), self.locals.items(), local_variables.items()) 334 if k.lower() 335 not in ( 336 "engine_adapter", 337 "snapshot", 338 ) 339 } 340 return MacroStrTemplate(str(text)).safe_substitute(CaseInsensitiveMapping(base_mapping)) 341 342 def evaluate(self, node: MacroFunc) -> exp.Expression | t.List[exp.Expression] | None: 343 if isinstance(node, MacroDef): 344 if isinstance(node.expression, exp.Lambda): 345 _, fn = _norm_var_arg_lambda(self, node.expression) 346 self.macros[normalize_macro_name(node.name)] = lambda _, *args: fn( 347 args[0] if len(args) == 1 else exp.Tuple(expressions=list(args)) 348 ) 349 else: 350 # Make variables defined through `@DEF` case-insensitive 351 self.locals[node.name.lower()] = self.transform(node.expression) 352 353 return node 354 355 if isinstance(node, (MacroSQL, MacroStrReplace)): 356 result: t.Optional[exp.Expression | t.List[exp.Expression]] = exp.convert( 357 self.eval_expression(node) 358 ) 359 else: 360 func = t.cast(exp.Anonymous, node.this) 361 362 args = [] 363 kwargs = {} 364 for e in func.expressions: 365 if isinstance(e, exp.PropertyEQ): 366 kwargs[e.this.name] = e.expression 367 else: 368 if kwargs: 369 raise MacroEvalError( 370 "Positional argument cannot follow keyword argument.\n " 371 f"{func.sql(dialect=self.dialect)} at '{self._path}'" 372 ) 373 374 args.append(e) 375 376 result = self.send(func.name, *args, **kwargs) 377 378 if result is None: 379 return None 380 381 if isinstance(result, (tuple, list)): 382 result = [self.parse_one(item) for item in result if item is not None] 383 384 if ( 385 len(result) == 1 386 and isinstance(result[0], (exp.Array, exp.Tuple)) 387 and node.find_ancestor(MacroFunc) 388 ): 389 
""" 390 if: 391 - the output of evaluating this node is being passed as an argument to another macro function 392 - and that output is something that _norm_var_arg_lambda() will unpack into varargs 393 > (a list containing a single item of type exp.Tuple/exp.Array) 394 then we will get inconsistent behaviour depending on if this node emits a list with a single item vs multiple items. 395 396 In the first case, emitting a list containing a single array item will cause that array to get unpacked and its *members* passed to the calling macro 397 In the second case, emitting a list containing multiple array items will cause each item to get passed as-is to the calling macro 398 399 To prevent this inconsistency, we wrap this node output in an exp.Array so that _norm_var_arg_lambda() can "unpack" that into the 400 actual argument we want to pass to the parent macro function 401 402 Note we only do this for evaluation results that get passed as an argument to another macro, because when the final 403 result is given to something like SELECT, we still want that to be unpacked into a list of items like: 404 - SELECT ARRAY(1), ARRAY(2) 405 rather than a single item like: 406 - SELECT ARRAY(ARRAY(1), ARRAY(2)) 407 """ 408 result = [exp.Array(expressions=result)] 409 else: 410 result = self.parse_one(result) 411 412 return result 413 414 def eval_expression(self, node: t.Any) -> t.Any: 415 """Converts a SQLGlot expression into executable Python code and evals it. 416 417 If the node is not an expression, it will simply be returned. 418 419 Args: 420 node: expression 421 Returns: 422 The return value of the evaled Python Code. 
423 """ 424 if not isinstance(node, exp.Expression): 425 return node 426 code = node.sql() 427 try: 428 code = self.generator.generate(node) 429 return eval(code, self.env, self.locals) 430 except Exception as e: 431 raise MacroEvalError( 432 f"Error trying to eval macro.\n\nGenerated code: {code}\n\nOriginal sql: {node}\n\n" 433 + format_evaluated_code_exception(e, self.python_env) 434 ) 435 436 def parse_one( 437 self, sql: str | exp.Expression, into: t.Optional[exp.IntoType] = None, **opts: t.Any 438 ) -> exp.Expression: 439 """Parses the given SQL string and returns a syntax tree for the first 440 parsed SQL statement. 441 442 Args: 443 sql: the SQL code or expression to parse. 444 into: the Expression to parse into 445 **opts: other options 446 447 Returns: 448 Expression: the syntax tree for the first parsed statement 449 """ 450 return sqlglot.maybe_parse(sql, dialect=self.dialect, into=into, **opts) 451 452 def columns_to_types(self, model_name: TableName | exp.Column) -> t.Dict[str, exp.DataType]: 453 """Returns the columns-to-types mapping corresponding to the specified model.""" 454 455 # We only return this dummy schema at load time, because if we don't actually know the 456 # target model's schema at creation/evaluation time, returning a dummy schema could lead 457 # to unintelligible errors when the query is executed 458 if (self._schema is None or self._schema.empty) and self.runtime_stage == "loading": 459 self.columns_to_types_called = True 460 return {"__schema_unavailable_at_load__": exp.DataType.build("unknown")} 461 462 normalized_model_name = normalize_model_name( 463 model_name, 464 default_catalog=self.default_catalog, 465 dialect=self.dialect, 466 ) 467 model_name = exp.to_table(normalized_model_name) 468 469 columns_to_types = ( 470 self._schema.find(model_name, ensure_data_types=True) if self._schema else None 471 ) 472 if columns_to_types is None: 473 snapshot = self.get_snapshot(model_name) 474 if snapshot and snapshot.node.is_model: 
475 columns_to_types = snapshot.node.columns_to_types # type: ignore 476 477 if columns_to_types is None: 478 raise SQLMeshError(f"Schema for model '{model_name}' can't be statically determined.") 479 480 return columns_to_types 481 482 def get_snapshot(self, model_name: TableName | exp.Column) -> t.Optional[Snapshot]: 483 """Returns the snapshot that corresponds to the given model name.""" 484 return self._snapshots.get( 485 normalize_model_name( 486 model_name, 487 default_catalog=self.default_catalog, 488 dialect=self.dialect, 489 ) 490 ) 491 492 def resolve_table(self, table: str | exp.Table) -> str: 493 """Gets the physical table name for a given model.""" 494 if not self._resolve_table: 495 raise SQLMeshError( 496 "Macro evaluator not properly initialized with resolve_table lambda." 497 ) 498 return self._resolve_table(table) 499 500 def resolve_tables(self, query: exp.Expression) -> exp.Expression: 501 """Resolves queries with references to SQLMesh model names to their physical tables.""" 502 if not self._resolve_tables: 503 raise SQLMeshError( 504 "Macro evaluator not properly initialized with resolve_tables lambda." 
505 ) 506 return self._resolve_tables(query) 507 508 @property 509 def runtime_stage(self) -> RuntimeStage: 510 """Returns the current runtime stage of the macro evaluation.""" 511 return self.locals["runtime_stage"] 512 513 @property 514 def this_model(self) -> str: 515 """Returns the resolved name of the surrounding model.""" 516 this_model = self.locals.get("this_model") 517 if not this_model: 518 raise SQLMeshError("Model name is not available in the macro evaluator.") 519 return this_model.sql(dialect=self.dialect, identify=True, comments=False) 520 521 @property 522 def this_model_fqn(self) -> str: 523 if self._model_fqn is None: 524 raise SQLMeshError("Model name is not available in the macro evaluator.") 525 return self._model_fqn 526 527 @property 528 def engine_adapter(self) -> EngineAdapter: 529 engine_adapter = self.locals.get("engine_adapter") 530 if not engine_adapter: 531 raise SQLMeshError( 532 "The engine adapter is not available while models are loading." 533 " You can gate these calls by checking in Python: evaluator.runtime_stage != 'loading' or SQL: @runtime_stage <> 'loading'." 
534 ) 535 return self.locals["engine_adapter"] 536 537 @property 538 def gateway(self) -> t.Optional[str]: 539 """Returns the gateway name.""" 540 return self.var(c.GATEWAY) 541 542 @property 543 def snapshots(self) -> t.Dict[str, Snapshot]: 544 """Returns the snapshots if available.""" 545 return self._snapshots 546 547 @property 548 def this_env(self) -> str: 549 """Returns the name of the current environment in before after all.""" 550 if "this_env" not in self.locals: 551 raise SQLMeshError("Environment name is only available in before_all and after_all") 552 return self.locals["this_env"] 553 554 @property 555 def schemas(self) -> t.List[str]: 556 """Returns the schemas of the current environment in before after all macros.""" 557 if "schemas" not in self.locals: 558 raise SQLMeshError("Schemas are only available in before_all and after_all") 559 return self.locals["schemas"] 560 561 @property 562 def views(self) -> t.List[str]: 563 """Returns the views of the current environment in before after all macros.""" 564 if "views" not in self.locals: 565 raise SQLMeshError("Views are only available in before_all and after_all") 566 return self.locals["views"] 567 568 def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]: 569 """Returns the value of the specified variable, or the default value if it doesn't exist.""" 570 return { 571 **(self.locals.get(c.SQLMESH_VARS) or {}), 572 **(self.locals.get(c.SQLMESH_VARS_METADATA) or {}), 573 }.get(var_name.lower(), default) 574 575 def blueprint_var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]: 576 """Returns the value of the specified blueprint variable, or the default value if it doesn't exist.""" 577 return { 578 **(self.locals.get(c.SQLMESH_BLUEPRINT_VARS) or {}), 579 **(self.locals.get(c.SQLMESH_BLUEPRINT_VARS_METADATA) or {}), 580 }.get(var_name.lower(), default) 581 582 @property 583 def variables(self) -> t.Dict[str, t.Any]: 584 return { 585 
**self.locals.get(c.SQLMESH_VARS, {}), 586 **self.locals.get(c.SQLMESH_VARS_METADATA, {}), 587 **self.locals.get(c.SQLMESH_BLUEPRINT_VARS, {}), 588 **self.locals.get(c.SQLMESH_BLUEPRINT_VARS_METADATA, {}), 589 } 590 591 def _coerce(self, expr: exp.Expression, typ: t.Any, strict: bool = False) -> t.Any: 592 """Coerces the given expression to the specified type on a best-effort basis.""" 593 return _coerce(expr, typ, self.dialect, self._path, strict) 594 595 596class macro(registry_decorator): 597 """Specifies a function is a macro and registers it the global MACROS registry. 598 599 Registered macros can be referenced in SQL statements to make queries more dynamic/cleaner. 600 601 Example: 602 from sqlglot import exp 603 from sqlmesh.core.macros import MacroEvaluator, macro 604 605 @macro() 606 def add_one(evaluator: MacroEvaluator, column: exp.Literal) -> exp.Add: 607 return evaluator.parse_one(f"{column} + 1") 608 609 Args: 610 name: A custom name for the macro, the default is the name of the function. 611 """ 612 613 registry_name = "macros" 614 615 def __init__(self, *args: t.Any, metadata_only: bool = False, **kwargs: t.Any) -> None: 616 super().__init__(*args, **kwargs) 617 self.metadata_only = metadata_only 618 619 def __call__( 620 self, func: t.Callable[..., DECORATOR_RETURN_TYPE] 621 ) -> t.Callable[..., DECORATOR_RETURN_TYPE]: 622 if self.metadata_only: 623 setattr(func, c.SQLMESH_METADATA, self.metadata_only) 624 wrapper = super().__call__(func) 625 626 # This is used to identify macros at runtime to unwrap during serialization. 627 setattr(wrapper, c.SQLMESH_MACRO, True) 628 return wrapper 629 630 631ExecutableOrMacro = t.Union[Executable, macro] 632MacroRegistry = UniqueKeyDict[str, ExecutableOrMacro] 633 634 635def _norm_var_arg_lambda( 636 evaluator: MacroEvaluator, func: exp.Lambda, *items: t.Any 637) -> t.Tuple[t.Iterable, t.Callable]: 638 """ 639 Converts sql literal array and lambda into actual python iterable + callable. 
640 641 In order to support expressions like @EACH([a, b, c], x -> @SQL('@x')), the lambda var x 642 needs be passed to the local state. 643 644 Args: 645 evaluator: MacroEvaluator that invoked the macro 646 func: Lambda SQLGlot expression. 647 items: Array or items of SQLGlot expressions. 648 """ 649 650 def substitute( 651 node: exp.Expression, args: t.Dict[str, exp.Expression] 652 ) -> exp.Expression | t.List[exp.Expression] | None: 653 if isinstance(node, (exp.Identifier, exp.Var)): 654 if not isinstance(node.parent, exp.Column): 655 name = node.name.lower() 656 if name in args: 657 return args[name].copy() 658 if name in evaluator.locals: 659 return exp.convert(evaluator.locals[name]) 660 if SQLMESH_MACRO_PREFIX in node.name: 661 return node.__class__( 662 this=evaluator.template(node.name, {k: v.name for k, v in args.items()}) 663 ) 664 elif isinstance(node, MacroFunc): 665 local_copy = evaluator.locals.copy() 666 evaluator.locals.update(args) 667 result = evaluator.transform(node) 668 evaluator.locals = local_copy 669 return result 670 return node 671 672 if len(items) == 1: 673 item = items[0] 674 expressions = ( 675 item.expressions 676 if isinstance(item, (exp.Array, exp.Tuple)) 677 else [item.this] 678 if isinstance(item, exp.Paren) 679 else item 680 ) 681 else: 682 expressions = items 683 684 if not callable(func): 685 return expressions, lambda args: func.this.transform( 686 substitute, 687 { 688 expression.name.lower(): arg 689 for expression, arg in zip( 690 func.expressions, args.expressions if isinstance(args, exp.Tuple) else [args] 691 ) 692 }, 693 ) 694 695 return expressions, func 696 697 698@macro() 699def each( 700 evaluator: MacroEvaluator, 701 *args: t.Any, 702) -> t.List[t.Any]: 703 """Iterates through items calling func on each. 704 705 If a func call on item returns None, it will be excluded from the list. 
706 707 Args: 708 evaluator: MacroEvaluator that invoked the macro 709 args: The last argument should be a lambda of the form x -> x +1. The first argument can be 710 an Array or var args can be used. 711 712 Returns: 713 A list of items that is the result of func 714 """ 715 *items, func = args 716 items, func = _norm_var_arg_lambda(evaluator, func, *items) # type: ignore 717 return [item for item in map(func, ensure_collection(items)) if item is not None] 718 719 720@macro("IF") 721def if_( 722 evaluator: MacroEvaluator, 723 condition: t.Any, 724 true: t.Any, 725 false: t.Any = None, 726) -> t.Any: 727 """Evaluates a given condition and returns the second argument if true or else the third argument. 728 729 If false is not passed in, the default return value will be None. 730 731 Example: 732 >>> from sqlglot import parse_one 733 >>> from sqlmesh.core.macros import MacroEvaluator 734 >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a, b)")).sql() 735 'b' 736 737 >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a)")) 738 """ 739 740 if evaluator.eval_expression(condition): 741 return true 742 return false 743 744 745@macro("REDUCE") 746def reduce_(evaluator: MacroEvaluator, *args: t.Any) -> t.Any: 747 """Iterates through items applying provided function that takes two arguments 748 cumulatively to the items of iterable items, from left to right, so as to reduce 749 the iterable to a single item. 750 751 Example: 752 >>> from sqlglot import parse_one 753 >>> from sqlmesh.core.macros import MacroEvaluator 754 >>> sql = "@SQL(@REDUCE([100, 200, 300, 400], (x, y) -> x + y))" 755 >>> MacroEvaluator().transform(parse_one(sql)).sql() 756 '1000' 757 758 Args: 759 evaluator: MacroEvaluator that invoked the macro 760 args: The last argument should be a lambda of the form (x, y) -> x + y. The first argument can be 761 an Array or var args can be used. 
762 Returns: 763 A single item that is the result of applying func cumulatively to items 764 """ 765 *items, func = args 766 items, func = _norm_var_arg_lambda(evaluator, func, *items) # type: ignore 767 return reduce(lambda a, b: func(exp.Tuple(expressions=[a, b])), ensure_collection(items)) 768 769 770@macro("FILTER") 771def filter_(evaluator: MacroEvaluator, *args: t.Any) -> t.List[t.Any]: 772 """Iterates through items, applying provided function to each item and removing 773 all items where the function returns False 774 775 Example: 776 >>> from sqlglot import parse_one 777 >>> from sqlmesh.core.macros import MacroEvaluator 778 >>> sql = "@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y)" 779 >>> MacroEvaluator().transform(parse_one(sql)).sql() 780 '2 + 3' 781 782 >>> sql = "@EVAL(@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y))" 783 >>> MacroEvaluator().transform(parse_one(sql)).sql() 784 '5' 785 786 Args: 787 evaluator: MacroEvaluator that invoked the macro 788 args: The last argument should be a lambda of the form x -> x > 1. The first argument can be 789 an Array or var args can be used. 790 Returns: 791 The items for which the func returned True 792 """ 793 *items, func = args 794 items, func = _norm_var_arg_lambda(evaluator, func, *items) # type: ignore 795 return list(filter(lambda arg: evaluator.eval_expression(func(arg)), items)) 796 797 798def _optional_expression( 799 evaluator: MacroEvaluator, 800 condition: exp.Condition, 801 expression: exp.Expression, 802) -> t.Optional[exp.Expression]: 803 """Inserts expression when the condition is True 804 805 The following examples express the usage of this function in the context of the macros which wrap it. 
806 807 Examples: 808 >>> from sqlglot import parse_one 809 >>> from sqlmesh.core.macros import MacroEvaluator 810 >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" 811 >>> MacroEvaluator().transform(parse_one(sql)).sql() 812 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' 813 >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" 814 >>> MacroEvaluator().transform(parse_one(sql)).sql() 815 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' 816 >>> sql = "select * from city @GROUP_BY(True) country, population" 817 >>> MacroEvaluator().transform(parse_one(sql)).sql() 818 'SELECT * FROM city GROUP BY country, population' 819 >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" 820 >>> MacroEvaluator().transform(parse_one(sql)).sql() 821 "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'" 822 823 Args: 824 evaluator: MacroEvaluator that invoked the macro 825 condition: Condition expression 826 expression: SQL expression 827 Returns: 828 Expression if the conditional is True; otherwise None 829 """ 830 return expression if evaluator.eval_expression(condition) else None 831 832 833with_ = macro("WITH")(_optional_expression) 834join = macro("JOIN")(_optional_expression) 835where = macro("WHERE")(_optional_expression) 836group_by = macro("GROUP_BY")(_optional_expression) 837having = macro("HAVING")(_optional_expression) 838order_by = macro("ORDER_BY")(_optional_expression) 839limit = macro("LIMIT")(_optional_expression) 840 841 842@macro("eval") 843def eval_(evaluator: MacroEvaluator, condition: exp.Condition) -> t.Any: 844 """Evaluate the given condition in a Python/SQL interpretor. 
845 846 Example: 847 >>> from sqlglot import parse_one 848 >>> from sqlmesh.core.macros import MacroEvaluator 849 >>> sql = "@EVAL(1 + 1)" 850 >>> MacroEvaluator().transform(parse_one(sql)).sql() 851 '2' 852 """ 853 return evaluator.eval_expression(condition) 854 855 856# macros with union types need to use t.Union since | isn't available until 3.9 857@macro() 858def star( 859 evaluator: MacroEvaluator, 860 relation: exp.Table, 861 alias: exp.Column = t.cast(exp.Column, exp.column("")), 862 exclude: t.Union[exp.Array, exp.Tuple] = exp.Tuple(expressions=[]), 863 prefix: exp.Literal = exp.Literal.string(""), 864 suffix: exp.Literal = exp.Literal.string(""), 865 quote_identifiers: exp.Boolean = exp.true(), 866 except_: t.Union[exp.Array, exp.Tuple] = exp.Tuple(expressions=[]), 867) -> t.List[exp.Alias]: 868 """Returns a list of projections for the given relation. 869 870 Args: 871 evaluator: MacroEvaluator that invoked the macro 872 relation: The relation to select star from 873 alias: The alias of the relation 874 exclude: Columns to exclude 875 prefix: A prefix to use for all selections 876 suffix: A suffix to use for all selections 877 quote_identifiers: Whether or not quote the resulting aliases, defaults to true 878 except_: Alias for exclude (TODO: deprecate this, update docs) 879 880 Returns: 881 An array of columns. 
882 883 Example: 884 >>> from sqlglot import parse_one, exp 885 >>> from sqlglot.schema import MappingSchema 886 >>> from sqlmesh.core.macros import MacroEvaluator 887 >>> sql = "SELECT @STAR(foo, bar, exclude := [c], prefix := 'baz_') FROM foo AS bar" 888 >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": exp.DataType.build("string"), "b": exp.DataType.build("string"), "c": exp.DataType.build("string"), "d": exp.DataType.build("int")}})).transform(parse_one(sql)).sql() 889 'SELECT CAST("bar"."a" AS TEXT) AS "baz_a", CAST("bar"."b" AS TEXT) AS "baz_b", CAST("bar"."d" AS INT) AS "baz_d" FROM foo AS bar' 890 """ 891 if alias and not isinstance(alias, (exp.Identifier, exp.Column)): 892 raise SQLMeshError(f"Invalid alias '{alias}'. Expected an identifier.") 893 if exclude and not isinstance(exclude, (exp.Array, exp.Tuple)): 894 raise SQLMeshError(f"Invalid exclude '{exclude}'. Expected an array.") 895 if except_ != exp.tuple_(): 896 from sqlmesh.core.console import get_console 897 898 get_console().log_warning( 899 "The 'except_' argument in @STAR will soon be deprecated. Use 'exclude' instead." 900 ) 901 if not isinstance(exclude, (exp.Array, exp.Tuple)): 902 raise SQLMeshError(f"Invalid exclude_ '{exclude}'. Expected an array.") 903 if prefix and not isinstance(prefix, exp.Literal): 904 raise SQLMeshError(f"Invalid prefix '{prefix}'. Expected a literal.") 905 if suffix and not isinstance(suffix, exp.Literal): 906 raise SQLMeshError(f"Invalid suffix '{suffix}'. Expected a literal.") 907 if not isinstance(quote_identifiers, exp.Boolean): 908 raise SQLMeshError(f"Invalid quote_identifiers '{quote_identifiers}'. 
Expected a boolean.") 909 910 excluded_names = { 911 normalize_identifiers(excluded, dialect=evaluator.dialect).name 912 for excluded in exclude.expressions or except_.expressions 913 } 914 quoted = quote_identifiers.this 915 table_identifier = normalize_identifiers( 916 alias if alias.name else relation, dialect=evaluator.dialect 917 ).name 918 919 columns_to_types = { 920 k: v for k, v in evaluator.columns_to_types(relation).items() if k not in excluded_names 921 } 922 if columns_to_types_all_known(columns_to_types): 923 return [ 924 exp.cast( 925 exp.column(column, table=table_identifier, quoted=quoted), 926 dtype, 927 dialect=evaluator.dialect, 928 ).as_(f"{prefix.this}{column}{suffix.this}", quoted=quoted) 929 for column, dtype in columns_to_types.items() 930 ] 931 return [ 932 exp.column(column, table=table_identifier, quoted=quoted).as_( 933 f"{prefix.this}{column}{suffix.this}", quoted=quoted 934 ) 935 for column, type_ in columns_to_types.items() 936 ] 937 938 939@macro() 940def generate_surrogate_key( 941 evaluator: MacroEvaluator, 942 *fields: exp.Expression, 943 hash_function: exp.Literal = exp.Literal.string("MD5"), 944) -> exp.Func: 945 """Generates a surrogate key (string) for the given fields. 
946 947 Example: 948 >>> from sqlglot import parse_one 949 >>> from sqlmesh.core.macros import MacroEvaluator 950 >>> 951 >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c) FROM foo" 952 >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql, dialect="bigquery")).sql("bigquery") 953 "SELECT TO_HEX(MD5(CONCAT(COALESCE(CAST(a AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS STRING), '_sqlmesh_surrogate_key_null_')))) FROM foo" 954 >>> 955 >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c, hash_function := 'SHA256') FROM foo" 956 >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql, dialect="bigquery")).sql("bigquery") 957 "SELECT SHA256(CONCAT(COALESCE(CAST(a AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS STRING), '_sqlmesh_surrogate_key_null_'))) FROM foo" 958 """ 959 string_fields: t.List[exp.Expression] = [] 960 for i, field in enumerate(fields): 961 if i > 0: 962 string_fields.append(exp.Literal.string("|")) 963 string_fields.append( 964 exp.func( 965 "COALESCE", 966 exp.cast(field, exp.DataType.build("text")), 967 exp.Literal.string("_sqlmesh_surrogate_key_null_"), 968 ) 969 ) 970 971 func = exp.func( 972 hash_function.name, 973 exp.func("CONCAT", *string_fields), 974 dialect=evaluator.dialect, 975 ) 976 if isinstance(func, exp.MD5Digest): 977 func = exp.MD5(this=func.this) 978 979 return func 980 981 982@macro() 983def safe_add(_: MacroEvaluator, *fields: exp.Expression) -> exp.Case: 984 """Adds numbers together, substitutes nulls for 0s and only returns null if all fields are null. 
985 986 Example: 987 >>> from sqlglot import parse_one 988 >>> from sqlmesh.core.macros import MacroEvaluator 989 >>> sql = "SELECT @SAFE_ADD(a, b) FROM foo" 990 >>> MacroEvaluator().transform(parse_one(sql)).sql() 991 'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) + COALESCE(b, 0) END FROM foo' 992 """ 993 return ( 994 exp.Case() 995 .when(exp.and_(*(field.is_(exp.null()) for field in fields)), exp.null()) 996 .else_(reduce(lambda a, b: a + b, [exp.func("COALESCE", field, 0) for field in fields])) # type: ignore 997 ) 998 999 1000@macro() 1001def safe_sub(_: MacroEvaluator, *fields: exp.Expression) -> exp.Case: 1002 """Subtract numbers, substitutes nulls for 0s and only returns null if all fields are null. 1003 1004 Example: 1005 >>> from sqlglot import parse_one 1006 >>> from sqlmesh.core.macros import MacroEvaluator 1007 >>> sql = "SELECT @SAFE_SUB(a, b) FROM foo" 1008 >>> MacroEvaluator().transform(parse_one(sql)).sql() 1009 'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) - COALESCE(b, 0) END FROM foo' 1010 """ 1011 return ( 1012 exp.Case() 1013 .when(exp.and_(*(field.is_(exp.null()) for field in fields)), exp.null()) 1014 .else_(reduce(lambda a, b: a - b, [exp.func("COALESCE", field, 0) for field in fields])) # type: ignore 1015 ) 1016 1017 1018@macro() 1019def safe_div(_: MacroEvaluator, numerator: exp.Expression, denominator: exp.Expression) -> exp.Div: 1020 """Divides numbers, returns null if the denominator is 0. 1021 1022 Example: 1023 >>> from sqlglot import parse_one 1024 >>> from sqlmesh.core.macros import MacroEvaluator 1025 >>> sql = "SELECT @SAFE_DIV(a, b) FROM foo" 1026 >>> MacroEvaluator().transform(parse_one(sql)).sql() 1027 'SELECT a / NULLIF(b, 0) FROM foo' 1028 """ 1029 return numerator / exp.func("NULLIF", denominator, 0) 1030 1031 1032@macro() 1033def union( 1034 evaluator: MacroEvaluator, 1035 *args: exp.Expression, 1036) -> exp.Query: 1037 """Returns a UNION of the given tables. 
Only choosing columns that have the same name and type. 1038 1039 Args: 1040 evaluator: MacroEvaluator that invoked the macro 1041 args: Variable arguments that can be: 1042 - First argument can be a condition (exp.Condition) 1043 - A union type ('ALL' or 'DISTINCT') as exp.Literal 1044 - Tables (exp.Table) 1045 1046 Example: 1047 >>> from sqlglot import parse_one 1048 >>> from sqlglot.schema import MappingSchema 1049 >>> from sqlmesh.core.macros import MacroEvaluator 1050 >>> sql = "@UNION('distinct', foo, bar)" 1051 >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql() 1052 'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar' 1053 >>> sql = "@UNION(True, 'distinct', foo, bar)" 1054 >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql() 1055 'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar' 1056 """ 1057 1058 if not args: 1059 raise SQLMeshError("At least one table is required for the @UNION macro.") 1060 1061 arg_idx = 0 1062 # Check for condition 1063 condition = evaluator.eval_expression(args[arg_idx]) 1064 if isinstance(condition, bool): 1065 arg_idx += 1 1066 if arg_idx >= len(args): 1067 raise SQLMeshError("Expected more arguments after the condition of the `@UNION` macro.") 1068 1069 # Check for union type 1070 type_ = exp.Literal.string("ALL") 1071 if isinstance(args[arg_idx], exp.Literal): 1072 type_ = args[arg_idx] # type: ignore 1073 arg_idx += 1 1074 kind = type_.name.upper() 1075 if kind not in ("ALL", "DISTINCT"): 1076 raise SQLMeshError(f"Invalid type '{type_}'. 
Expected 'ALL' or 'DISTINCT'.") 1077 1078 # Remaining args should be tables 1079 tables = [ 1080 exp.to_table(e.sql(evaluator.dialect), dialect=evaluator.dialect) for e in args[arg_idx:] 1081 ] 1082 1083 columns = { 1084 column 1085 for column, _ in reduce( 1086 lambda a, b: a & b, # type: ignore 1087 (evaluator.columns_to_types(table).items() for table in tables), 1088 ) 1089 } 1090 1091 projections = [ 1092 exp.cast(column, type_, dialect=evaluator.dialect).as_(column) 1093 for column, type_ in evaluator.columns_to_types(tables[0]).items() 1094 if column in columns 1095 ] 1096 1097 # Skip the union if condition is False 1098 if condition == False: 1099 return exp.select(*projections).from_(tables[0]) 1100 1101 return reduce( 1102 lambda a, b: a.union(b, distinct=kind == "DISTINCT"), # type: ignore 1103 [exp.select(*projections).from_(t) for t in tables], 1104 ) 1105 1106 1107@macro() 1108def haversine_distance( 1109 _: MacroEvaluator, 1110 lat1: exp.Expression, 1111 lon1: exp.Expression, 1112 lat2: exp.Expression, 1113 lon2: exp.Expression, 1114 unit: exp.Literal = exp.Literal.string("mi"), 1115) -> exp.Mul: 1116 """Returns the haversine distance between two points. 1117 1118 Example: 1119 >>> from sqlglot import parse_one 1120 >>> from sqlmesh.core.macros import MacroEvaluator 1121 >>> sql = "SELECT @HAVERSINE_DISTANCE(driver_y, driver_x, passenger_y, passenger_x, 'mi') FROM rides" 1122 >>> MacroEvaluator().transform(parse_one(sql)).sql() 1123 'SELECT 7922 * ASIN(SQRT((POWER(SIN(RADIANS((passenger_y - driver_y) / 2)), 2)) + (COS(RADIANS(driver_y)) * COS(RADIANS(passenger_y)) * POWER(SIN(RADIANS((passenger_x - driver_x) / 2)), 2)))) * 1.0 FROM rides' 1124 """ 1125 if unit.this == "mi": 1126 conversion_rate = 1.0 1127 elif unit.this == "km": 1128 conversion_rate = 1.60934 1129 else: 1130 raise SQLMeshError(f"Invalid unit '{unit}'. 
Expected 'mi' or 'km'.") 1131 1132 return ( 1133 2 1134 * 3961 1135 * exp.func( 1136 "ASIN", 1137 exp.func( 1138 "SQRT", 1139 exp.func("POWER", exp.func("SIN", exp.func("RADIANS", (lat2 - lat1) / 2)), 2) 1140 + exp.func("COS", exp.func("RADIANS", lat1)) 1141 * exp.func("COS", exp.func("RADIANS", lat2)) 1142 * exp.func("POWER", exp.func("SIN", exp.func("RADIANS", (lon2 - lon1) / 2)), 2), 1143 ), 1144 ) 1145 * conversion_rate 1146 ) 1147 1148 1149@macro() 1150def pivot( 1151 evaluator: MacroEvaluator, 1152 column: SQL, 1153 values: t.List[exp.Expression], 1154 alias: bool = True, 1155 agg: exp.Expression = exp.Literal.string("SUM"), 1156 cmp: exp.Expression = exp.Literal.string("="), 1157 prefix: exp.Expression = exp.Literal.string(""), 1158 suffix: exp.Expression = exp.Literal.string(""), 1159 then_value: SQL = SQL("1"), 1160 else_value: SQL = SQL("0"), 1161 quote: bool = True, 1162 distinct: bool = False, 1163) -> t.List[exp.Expression]: 1164 """Returns a list of projections as a result of pivoting the given column on the given values. 
1165 1166 Example: 1167 >>> from sqlglot import parse_one 1168 >>> from sqlmesh.core.macros import MacroEvaluator 1169 >>> sql = "SELECT date_day, @PIVOT(status, ['cancelled', 'completed']) FROM rides GROUP BY 1" 1170 >>> MacroEvaluator().transform(parse_one(sql)).sql() 1171 'SELECT date_day, SUM(CASE WHEN status = \\'cancelled\\' THEN 1 ELSE 0 END) AS "cancelled", SUM(CASE WHEN status = \\'completed\\' THEN 1 ELSE 0 END) AS "completed" FROM rides GROUP BY 1' 1172 >>> sql = "SELECT @PIVOT(a, ['v'], then_value := tv, suffix := '_sfx', quote := FALSE)" 1173 >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql)).sql("bigquery") 1174 "SELECT SUM(CASE WHEN a = 'v' THEN tv ELSE 0 END) AS v_sfx" 1175 """ 1176 aggregates: t.List[exp.Expression] = [] 1177 for value in values: 1178 proj = f"{agg.name}(" 1179 if distinct: 1180 proj += "DISTINCT " 1181 1182 proj += f"CASE WHEN {column} {cmp.name} {value.sql(evaluator.dialect)} THEN {then_value} ELSE {else_value} END) " 1183 node = evaluator.parse_one(proj) 1184 1185 if alias: 1186 node = node.as_( 1187 f"{prefix.name}{value.name}{suffix.name}", 1188 quoted=quote, 1189 copy=False, 1190 dialect=evaluator.dialect, 1191 ) 1192 1193 aggregates.append(node) 1194 1195 return aggregates 1196 1197 1198@macro("AND") 1199def and_(evaluator: MacroEvaluator, *expressions: t.Optional[exp.Expression]) -> exp.Condition: 1200 """Returns an AND statement filtering out any NULL expressions.""" 1201 conditions = [e for e in expressions if not isinstance(e, exp.Null)] 1202 1203 if not conditions: 1204 return exp.true() 1205 1206 return exp.and_(*conditions, dialect=evaluator.dialect) 1207 1208 1209@macro("OR") 1210def or_(evaluator: MacroEvaluator, *expressions: t.Optional[exp.Expression]) -> exp.Condition: 1211 """Returns an OR statement filtering out any NULL expressions.""" 1212 conditions = [e for e in expressions if not isinstance(e, exp.Null)] 1213 1214 if not conditions: 1215 return exp.true() 1216 1217 return exp.or_(*conditions, 
dialect=evaluator.dialect) 1218 1219 1220@macro("VAR") 1221def var( 1222 evaluator: MacroEvaluator, var_name: exp.Expression, default: t.Optional[exp.Expression] = None 1223) -> exp.Expression: 1224 """Returns the value of a variable or the default value if the variable is not set.""" 1225 if not var_name.is_string: 1226 raise SQLMeshError(f"Invalid variable name '{var_name.sql()}'. Expected a string literal.") 1227 1228 return exp.convert(evaluator.var(var_name.this, default)) 1229 1230 1231@macro("BLUEPRINT_VAR") 1232def blueprint_var( 1233 evaluator: MacroEvaluator, var_name: exp.Expression, default: t.Optional[exp.Expression] = None 1234) -> exp.Expression: 1235 """Returns the value of a blueprint variable or the default value if the variable is not set.""" 1236 if not var_name.is_string: 1237 raise SQLMeshError( 1238 f"Invalid blueprint variable name '{var_name.sql()}'. Expected a string literal." 1239 ) 1240 1241 return exp.convert(evaluator.blueprint_var(var_name.this, default)) 1242 1243 1244@macro() 1245def deduplicate( 1246 evaluator: MacroEvaluator, 1247 relation: exp.Expression, 1248 partition_by: t.List[exp.Expression], 1249 order_by: t.List[str], 1250) -> exp.Query: 1251 """Returns a QUERY to deduplicate rows within a table 1252 1253 Args: 1254 relation: table or CTE name to deduplicate 1255 partition_by: column names, or expressions to use to identify a window of rows out of which to select one as the deduplicated row 1256 order_by: A list of strings representing the ORDER BY clause 1257 1258 Example: 1259 >>> from sqlglot import parse_one 1260 >>> from sqlglot.schema import MappingSchema 1261 >>> from sqlmesh.core.macros import MacroEvaluator 1262 >>> sql = "@deduplicate(demo.table, [user_id, cast(timestamp as date)], ['timestamp desc', 'status asc'])" 1263 >>> MacroEvaluator().transform(parse_one(sql)).sql() 1264 'SELECT * FROM demo.table QUALIFY ROW_NUMBER() OVER (PARTITION BY user_id, CAST(timestamp AS DATE) ORDER BY timestamp DESC, status ASC) = 
1' 1265 """ 1266 if not isinstance(partition_by, list): 1267 raise SQLMeshError( 1268 "partition_by must be a list of columns: [<column>, cast(<column> as <type>)]" 1269 ) 1270 1271 if not isinstance(order_by, list): 1272 raise SQLMeshError( 1273 "order_by must be a list of strings, optional - nulls ordering: ['<column> <asc|desc> nulls <first|last>']" 1274 ) 1275 1276 partition_clause = exp.tuple_(*partition_by) 1277 1278 order_expressions = [ 1279 evaluator.transform(parse_one(order_item, into=exp.Ordered, dialect=evaluator.dialect)) 1280 for order_item in order_by 1281 ] 1282 1283 if not order_expressions: 1284 raise SQLMeshError( 1285 "order_by must be a list of strings, optional - nulls ordering: ['<column> <asc|desc> nulls <first|last>']" 1286 ) 1287 1288 order_clause = exp.Order(expressions=order_expressions) 1289 1290 window_function = exp.Window( 1291 this=exp.RowNumber(), partition_by=partition_clause, order=order_clause 1292 ) 1293 1294 first_unique_row = window_function.eq(1) 1295 1296 query = exp.select("*").from_(relation).qualify(first_unique_row) 1297 1298 return query 1299 1300 1301@macro() 1302def date_spine( 1303 evaluator: MacroEvaluator, 1304 datepart: exp.Expression, 1305 start_date: exp.Expression, 1306 end_date: exp.Expression, 1307) -> exp.Select: 1308 """Returns a query that produces a date spine with the given datepart, and range of start_date and end_date. Useful for joining as a date lookup table. 
1309 1310 Args: 1311 datepart: The datepart to use for the date spine - day, week, month, quarter, year 1312 start_date: The start date for the date spine in format YYYY-MM-DD 1313 end_date: The end date for the date spine in format YYYY-MM-DD 1314 1315 Example: 1316 >>> from sqlglot import parse_one 1317 >>> from sqlglot.schema import MappingSchema 1318 >>> from sqlmesh.core.macros import MacroEvaluator 1319 >>> sql = "@date_spine('week', '2022-01-20', '2024-12-16')" 1320 >>> MacroEvaluator().transform(parse_one(sql)).sql() 1321 "SELECT date_week FROM UNNEST(GENERATE_DATE_ARRAY(CAST(\'2022-01-20\' AS DATE), CAST(\'2024-12-16\' AS DATE), INTERVAL \'1\' WEEK)) AS _exploded(date_week)" 1322 """ 1323 datepart_name = datepart.name.lower() 1324 if datepart_name not in ("day", "week", "month", "quarter", "year"): 1325 raise SQLMeshError( 1326 f"Invalid datepart '{datepart_name}'. Expected: 'day', 'week', 'month', 'quarter', or 'year'" 1327 ) 1328 1329 start_date_name = start_date.name 1330 end_date_name = end_date.name 1331 1332 try: 1333 if start_date.is_string and end_date.is_string: 1334 start_date_obj = datetime.strptime(start_date_name, "%Y-%m-%d").date() 1335 end_date_obj = datetime.strptime(end_date_name, "%Y-%m-%d").date() 1336 else: 1337 start_date_obj = None 1338 end_date_obj = None 1339 except Exception as e: 1340 raise SQLMeshError( 1341 f"Invalid date format - start_date and end_date must be in format: YYYY-MM-DD. Error: {e}" 1342 ) 1343 1344 if start_date_obj and end_date_obj: 1345 if start_date_obj > end_date_obj: 1346 raise SQLMeshError( 1347 f"Invalid date range - start_date '{start_date_name}' is after end_date '{end_date_name}'." 
1348 ) 1349 1350 start_date = exp.cast(start_date, "DATE") 1351 end_date = exp.cast(end_date, "DATE") 1352 1353 if datepart_name == "quarter" and evaluator.dialect in ( 1354 "spark", 1355 "spark2", 1356 "databricks", 1357 "postgres", 1358 ): 1359 date_interval = exp.Interval(this=exp.Literal.number(3), unit=exp.var("month")) 1360 else: 1361 date_interval = exp.Interval(this=exp.Literal.number(1), unit=exp.var(datepart_name)) 1362 1363 generate_date_array = exp.func( 1364 "GENERATE_DATE_ARRAY", 1365 start_date, 1366 end_date, 1367 date_interval, 1368 ) 1369 1370 alias_name = f"date_{datepart_name}" 1371 exploded = exp.alias_(exp.func("unnest", generate_date_array), "_exploded", table=[alias_name]) 1372 1373 return exp.select(alias_name).from_(exploded) 1374 1375 1376@macro() 1377def resolve_template( 1378 evaluator: MacroEvaluator, 1379 template: exp.Literal, 1380 mode: str = "literal", 1381) -> t.Union[exp.Literal, exp.Table]: 1382 """ 1383 Generates either a String literal or an exp.Table representing a physical table location, based on rendering the provided template String literal. 1384 1385 Note: It relies on the @this_model variable being available in the evaluation context (@this_model resolves to an exp.Table object 1386 representing the current physical table). 1387 Therefore, the @resolve_template macro must be used at creation or evaluation time and not at load time. 1388 1389 Args: 1390 template: Template string literal. Can contain the following placeholders: 1391 @{catalog_name} -> replaced with the catalog of the exp.Table returned from @this_model 1392 @{schema_name} -> replaced with the schema of the exp.Table returned from @this_model 1393 @{table_name} -> replaced with the name of the exp.Table returned from @this_model 1394 mode: What to return. 
1395 'literal' -> return an exp.Literal string 1396 'table' -> return an exp.Table 1397 1398 Example: 1399 >>> from sqlglot import parse_one, exp 1400 >>> from sqlmesh.core.macros import MacroEvaluator, RuntimeStage 1401 >>> sql = "@resolve_template('s3://data-bucket/prod/@{catalog_name}/@{schema_name}/@{table_name}')" 1402 >>> evaluator = MacroEvaluator(runtime_stage=RuntimeStage.CREATING) 1403 >>> evaluator.locals.update({"this_model": exp.to_table("test_catalog.sqlmesh__test.test__test_model__2517971505")}) 1404 >>> evaluator.transform(parse_one(sql)).sql() 1405 "'s3://data-bucket/prod/test_catalog/sqlmesh__test/test__test_model__2517971505'" 1406 """ 1407 if "this_model" in evaluator.locals: 1408 this_model = exp.to_table(evaluator.locals["this_model"], dialect=evaluator.dialect) 1409 template_str: str = template.this 1410 result = ( 1411 template_str.replace("@{catalog_name}", this_model.catalog) 1412 .replace("@{schema_name}", this_model.db) 1413 .replace("@{table_name}", this_model.name) 1414 ) 1415 1416 if mode.lower() == "table": 1417 return exp.to_table(result, dialect=evaluator.dialect) 1418 return exp.Literal.string(result) 1419 if evaluator.runtime_stage != RuntimeStage.LOADING.value: 1420 # only error if we are CREATING, EVALUATING or TESTING and @this_model is not present; this could indicate a bug 1421 # otherwise, for LOADING, it's a no-op 1422 raise SQLMeshError( 1423 "@this_model must be present in the macro evaluation context in order to use @resolve_template" 1424 ) 1425 1426 return template 1427 1428 1429def normalize_macro_name(name: str) -> str: 1430 """Prefix macro name with @ and upcase""" 1431 return f"@{name.upper()}" 1432 1433 1434for m in macro.get_registry().values(): 1435 setattr(m, c.SQLMESH_BUILTIN, True) 1436 1437 1438def call_macro( 1439 func: t.Callable, 1440 dialect: DialectType, 1441 path: t.Optional[Path], 1442 provided_args: t.Tuple[t.Any, ...], 1443 provided_kwargs: t.Dict[str, t.Any], 1444 **optional_kwargs: t.Any, 1445) 
-> t.Any: 1446 # Bind the macro's actual parameters to its formal parameters 1447 sig = inspect.signature(func) 1448 1449 if optional_kwargs: 1450 provided_kwargs = provided_kwargs.copy() 1451 1452 for k, v in optional_kwargs.items(): 1453 if k in sig.parameters: 1454 provided_kwargs[k] = v 1455 1456 bound = sig.bind(*provided_args, **provided_kwargs) 1457 bound.apply_defaults() 1458 1459 try: 1460 annotations = t.get_type_hints(func, localns=get_supported_types()) 1461 except (NameError, TypeError): # forward references aren't handled 1462 annotations = {} 1463 1464 # If the macro is annotated, we try coerce the actual parameters to the corresponding types 1465 if annotations: 1466 for arg, value in bound.arguments.items(): 1467 typ = annotations.get(arg) 1468 if not typ: 1469 continue 1470 1471 # Changes to bound.arguments will reflect in bound.args and bound.kwargs 1472 # https://docs.python.org/3/library/inspect.html#inspect.BoundArguments.arguments 1473 param = sig.parameters[arg] 1474 if param.kind is inspect.Parameter.VAR_POSITIONAL: 1475 bound.arguments[arg] = tuple(_coerce(v, typ, dialect, path) for v in value) 1476 elif param.kind is inspect.Parameter.VAR_KEYWORD: 1477 bound.arguments[arg] = {k: _coerce(v, typ, dialect, path) for k, v in value.items()} 1478 else: 1479 bound.arguments[arg] = _coerce(value, typ, dialect, path) 1480 1481 return func(*bound.args, **bound.kwargs) 1482 1483 1484def _coerce( 1485 expr: t.Any, 1486 typ: t.Any, 1487 dialect: DialectType, 1488 path: t.Optional[Path] = None, 1489 strict: bool = False, 1490) -> t.Any: 1491 """Coerces the given expression to the specified type on a best-effort basis.""" 1492 base_err_msg = f"Failed to coerce expression '{expr}' to type '{typ}'." 
1493 try: 1494 if typ is None or typ is t.Any or not isinstance(expr, exp.Expression): 1495 return expr 1496 base = t.get_origin(typ) or typ 1497 1498 # We need to handle Union and TypeVars first since we cannot use isinstance with it 1499 if base in UNION_TYPES: 1500 for branch in t.get_args(typ): 1501 try: 1502 return _coerce(expr, branch, dialect, path, strict=True) 1503 except Exception: 1504 pass 1505 raise SQLMeshError(base_err_msg) 1506 if base is SQL and isinstance(expr, exp.Expression): 1507 return expr.sql(dialect) 1508 1509 if base is t.Literal: 1510 if not isinstance(expr, (exp.Literal, exp.Boolean)): 1511 raise SQLMeshError( 1512 f"{base_err_msg} Coercion to {base} requires a literal expression." 1513 ) 1514 literal_type_args = t.get_args(typ) 1515 try: 1516 for literal_type_arg in literal_type_args: 1517 expr_is_bool = isinstance(expr.this, bool) 1518 literal_is_bool = isinstance(literal_type_arg, bool) 1519 if (expr_is_bool and literal_is_bool and literal_type_arg == expr.this) or ( 1520 not expr_is_bool 1521 and not literal_is_bool 1522 and str(literal_type_arg) == str(expr.this) 1523 ): 1524 return type(literal_type_arg)(expr.this) 1525 except Exception: 1526 raise SQLMeshError(base_err_msg) 1527 raise SQLMeshError(base_err_msg) 1528 1529 if isinstance(expr, base): 1530 return expr 1531 if issubclass(base, exp.Expression): 1532 d = Dialect.get_or_raise(dialect) 1533 into = base if base in d.parser_class.EXPRESSION_PARSERS else None 1534 if into is None: 1535 if isinstance(expr, exp.Literal): 1536 coerced = parse_one(expr.this) 1537 else: 1538 raise SQLMeshError( 1539 f"{base_err_msg} Coercion to {base} requires a literal expression." 
1540 ) 1541 else: 1542 coerced = parse_one( 1543 expr.this if isinstance(expr, exp.Literal) else expr.sql(), into=into 1544 ) 1545 if isinstance(coerced, base): 1546 return coerced 1547 raise SQLMeshError(base_err_msg) 1548 1549 if base in (int, float, str) and isinstance(expr, exp.Literal): 1550 return base(expr.this) 1551 if base is str and isinstance(expr, exp.Column) and not expr.table: 1552 return expr.name 1553 if base is bool and isinstance(expr, exp.Boolean): 1554 return expr.this 1555 if base is datetime and isinstance(expr, exp.Literal): 1556 return to_datetime(expr.this) 1557 if base is date and isinstance(expr, exp.Literal): 1558 return to_date(expr.this) 1559 if base is tuple and isinstance(expr, (exp.Tuple, exp.Array)): 1560 generic = t.get_args(typ) 1561 if not generic: 1562 return tuple(expr.expressions) 1563 if generic[-1] is ...: 1564 return tuple(_coerce(expr, generic[0], dialect, path) for expr in expr.expressions) 1565 if len(generic) == len(expr.expressions): 1566 return tuple( 1567 _coerce(expr, generic[i], dialect, path) 1568 for i, expr in enumerate(expr.expressions) 1569 ) 1570 raise SQLMeshError(f"{base_err_msg} Expected {len(generic)} items.") 1571 if base is list and isinstance(expr, (exp.Array, exp.Tuple)): 1572 generic = t.get_args(typ) 1573 if not generic: 1574 return expr.expressions 1575 return [_coerce(expr, generic[0], dialect, path) for expr in expr.expressions] 1576 raise SQLMeshError(base_err_msg) 1577 except Exception: 1578 if strict: 1579 raise 1580 1581 from sqlmesh.core.console import get_console 1582 1583 get_console().log_error( 1584 f"Coercion of expression '{expr}' to type '{typ}' failed. 
Using non coerced expression at '{path}'", 1585 ) 1586 return expr 1587 1588 1589def convert_sql(v: t.Any, dialect: DialectType) -> t.Any: 1590 try: 1591 return _cache_convert_sql(v, dialect, v.__class__) 1592 # dicts aren't hashable but are convertable 1593 except TypeError: 1594 return _convert_sql(v, dialect) 1595 1596 1597def _convert_sql(v: t.Any, dialect: DialectType) -> t.Any: 1598 if not isinstance(v, str): 1599 try: 1600 v = exp.convert(v) 1601 # we use bare Exception instead of ValueError because there's 1602 # a recursive error with MagicMock. 1603 except Exception: 1604 pass 1605 1606 if isinstance(v, exp.Expression): 1607 if (isinstance(v, exp.Column) and not v.table) or ( 1608 isinstance(v, exp.Identifier) or v.is_string 1609 ): 1610 return v.name 1611 v = v.sql(dialect=dialect) 1612 return v 1613 1614 1615@lru_cache(maxsize=16384) 1616def _cache_convert_sql(v: t.Any, dialect: DialectType, t: type) -> t.Any: 1617 return _convert_sql(v, dialect)
class RuntimeStage(Enum):
    """The stage of the SQLMesh runtime in which macros are being evaluated.

    Each member's value is its lowercase name as a string.
    """

    LOADING = "loading"
    CREATING = "creating"
    EVALUATING = "evaluating"
    PROMOTING = "promoting"
    DEMOTING = "demoting"
    AUDITING = "auditing"
    TESTING = "testing"
    BEFORE_ALL = "before_all"
    AFTER_ALL = "after_all"
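An enumeration of the runtime stages during which SQLMesh evaluates macros (loading, creating, evaluating, promoting, demoting, auditing, testing, before_all, after_all).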
Inherited Members
- enum.Enum
- name
- value
A `string.Template` subclass that uses the SQLMesh macro prefix (instead of `$`) as its substitution delimiter.
Inherited Members
- string.Template
- Template
- idpattern
- braceidpattern
- flags
- template
- substitute
- safe_substitute
@lru_cache()
def get_supported_types() -> t.Dict[str, t.Any]:
    """Names available when resolving a macro's type hints.

    The mapping is built once and then served from the cache on every later call.
    """
    from sqlmesh.core.context import ExecutionContext

    # Both common spellings of the typing module resolve to the same object.
    supported: t.Dict[str, t.Any] = {"t": t, "typing": t}
    supported.update(
        List=t.List,
        Tuple=t.Tuple,
        Union=t.Union,
        DatetimeRanges=DatetimeRanges,
        exp=exp,
        SQL=SQL,
        MacroEvaluator=MacroEvaluator,
        ExecutionContext=ExecutionContext,
    )
    return supported
class CaseInsensitiveMapping(t.Dict[str, t.Any]):
    """A ``dict`` whose string lookups ignore case.

    ``m[key]``, ``m.get(key)`` and ``key in m`` all lowercase ``key`` before consulting the
    underlying dict. Keys are stored exactly as given, so they are expected to already be
    lowercase.
    """

    def __init__(self, data: t.Dict[str, t.Any]) -> None:
        super().__init__(data)

    def __getitem__(self, key: str) -> t.Any:
        return super().__getitem__(key.lower())

    def get(self, key: str, default: t.Any = None, /) -> t.Any:
        return super().get(key.lower(), default)

    def __contains__(self, key: object) -> bool:
        # Keep membership tests consistent with __getitem__/get: `"FOO" in m` must hold whenever
        # `m["FOO"]` succeeds. Non-string keys cannot be lowercased and are checked as-is.
        if isinstance(key, str):
            key = key.lower()
        return super().__contains__(key)
def get(self, key: str, default: t.Any = None, /) -> t.Any:
    """Look up ``key`` after lowercasing it, falling back to ``default`` when absent."""
    normalized_key = key.lower()
    return super().get(normalized_key, default)
Return the value for key if key is in the dictionary, else default. The key is lowercased before the lookup.
Inherited Members
- builtins.dict
- setdefault
- pop
- popitem
- keys
- items
- values
- update
- fromkeys
- clear
- copy
class MacroDialect(Python):
    """SQLGlot dialect used to generate Python source from macro expression trees."""

    class Generator(Python.Generator):
        """Python generator extended with SQLMesh macro-specific transforms."""

        # Start from the parent transforms, then override/add the macro-specific ones.
        TRANSFORMS = dict(Python.Generator.TRANSFORMS)  # type: ignore
        TRANSFORMS.update(
            {
                exp.Column: lambda self, node: f"exp.to_column('{self.sql(node, 'this')}')",
                exp.Lambda: lambda self, node: f"lambda {self.expressions(node)}: {self.sql(node, 'this')}",
                MacroFunc: _macro_func_sql,
                MacroSQL: lambda self, node: _macro_sql(
                    self.sql(node, "this"), node.args.get("into")
                ),
                MacroStrReplace: lambda self, node: _macro_str_replace(self.sql(node, "this")),
            }
        )
Mapping of an escaped sequence (`\n`, i.e. a backslash followed by `n`) to its unescaped version (an actual newline character).
Inherited Members
- sqlglot.dialects.dialect.Dialect
- Dialect
- INDEX_OFFSET
- WEEK_OFFSET
- UNNEST_COLUMN_ONLY
- ALIAS_POST_TABLESAMPLE
- TABLESAMPLE_SIZE_IS_PERCENT
- NORMALIZATION_STRATEGY
- IDENTIFIERS_CAN_START_WITH_DIGIT
- DPIPE_IS_STRING_CONCAT
- STRICT_STRING_CONCAT
- SUPPORTS_USER_DEFINED_TYPES
- SUPPORTS_SEMI_ANTI_JOIN
- COPY_PARAMS_ARE_CSV
- NORMALIZE_FUNCTIONS
- PRESERVE_ORIGINAL_NAMES
- LOG_BASE_FIRST
- NULL_ORDERING
- TYPED_DIVISION
- SAFE_DIVISION
- CONCAT_COALESCE
- HEX_LOWERCASE
- DATE_FORMAT
- DATEINT_FORMAT
- TIME_FORMAT
- TIME_MAPPING
- FORMAT_MAPPING
- PSEUDOCOLUMNS
- PREFER_CTE_ALIAS_COLUMN
- FORCE_EARLY_ALIAS_REF_EXPANSION
- EXPAND_ONLY_GROUP_ALIAS_REF
- ANNOTATE_ALL_SCOPES
- DISABLES_ALIAS_REF_EXPANSION
- SUPPORTS_ALIAS_REFS_IN_JOIN_CONDITIONS
- SUPPORTS_ORDER_BY_ALL
- PROJECTION_ALIASES_SHADOW_SOURCE_NAMES
- TABLES_REFERENCEABLE_AS_COLUMNS
- SUPPORTS_STRUCT_STAR_EXPANSION
- EXCLUDES_PSEUDOCOLUMNS_FROM_STAR
- QUERY_RESULTS_ARE_STRUCTS
- REQUIRES_PARENTHESIZED_STRUCT_ACCESS
- SUPPORTS_NULL_TYPE
- COALESCE_COMPARISON_NON_STANDARD
- HAS_DISTINCT_ARRAY_CONSTRUCTORS
- SUPPORTS_FIXED_SIZE_ARRAYS
- STRICT_JSON_PATH_SYNTAX
- ON_CONDITION_EMPTY_BEFORE_ERROR
- ARRAY_AGG_INCLUDES_NULLS
- ARRAY_FUNCS_PROPAGATES_NULLS
- PROMOTE_TO_INFERRED_DATETIME_TYPE
- SUPPORTS_VALUES_DEFAULT
- NUMBERS_CAN_BE_UNDERSCORE_SEPARATED
- HEX_STRING_IS_INTEGER_TYPE
- REGEXP_EXTRACT_DEFAULT_GROUP
- REGEXP_EXTRACT_POSITION_OVERFLOW_RETURNS_NULL
- SET_OP_DISTINCT_BY_DEFAULT
- CREATABLE_KIND_MAPPING
- ALTER_TABLE_SUPPORTS_CASCADE
- ALTER_TABLE_ADD_REQUIRED_FOR_EACH_COLUMN
- TRY_CAST_REQUIRES_STRING
- SAFE_TO_ELIMINATE_DOUBLE_NEGATION
- INITCAP_DEFAULT_DELIMITER_CHARS
- BYTE_STRING_IS_BYTES_TYPE
- UUID_IS_STRING_TYPE
- JSON_EXTRACT_SCALAR_SCALAR_ONLY
- DEFAULT_FUNCTIONS_COLUMN_NAMES
- DEFAULT_NULL_TYPE
- LEAST_GREATEST_IGNORES_NULLS
- PRIORITIZE_NON_LITERAL_TYPES
- DATE_PART_MAPPING
- COERCES_TO
- EXPRESSION_METADATA
- SUPPORTED_SETTINGS
- get_or_raise
- format_time
- version
- settings
- normalize_identifier
- case_sensitive
- can_quote
- quote_identifier
- to_json_path
- parse
- parse_into
- generate
- transpile
- tokenize
- tokenizer
- jsonpath_tokenizer
- parser
- generator
- generate_values_aliases
- sqlglot.executor.python.Python
- Tokenizer
class Generator(Python.Generator):
    """Python generator extended with SQLMesh macro-specific transforms."""

    # Start from the parent transforms, then override/add the macro-specific ones.
    TRANSFORMS = dict(Python.Generator.TRANSFORMS)  # type: ignore
    TRANSFORMS.update(
        {
            exp.Column: lambda self, node: f"exp.to_column('{self.sql(node, 'this')}')",
            exp.Lambda: lambda self, node: f"lambda {self.expressions(node)}: {self.sql(node, 'this')}",
            MacroFunc: _macro_func_sql,
            MacroSQL: lambda self, node: _macro_sql(self.sql(node, "this"), node.args.get("into")),
            MacroStrReplace: lambda self, node: _macro_str_replace(self.sql(node, "this")),
        }
    )
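Generator converts a given syntax tree to the corresponding code string. For this macro dialect, the transforms emit Python source (e.g. `exp.to_column(...)`, `lambda ...: ...`) rather than SQL.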
Arguments:
- pretty: Whether to format the produced SQL string. Default: False.
- identify: Determines when an identifier should be quoted. Possible values are: False (default): Never quote, except in cases where it's mandatory by the dialect. True: Always quote except for specials cases. 'safe': Only quote identifiers that are case insensitive.
- normalize: Whether to normalize identifiers to lowercase. Default: False.
- pad: The pad size in a formatted string. For example, this affects the indentation of a projection in a query, relative to its nesting level. Default: 2.
- indent: The indentation size in a formatted string. For example, this affects the indentation of subqueries and filters under a `WHERE` clause. Default: 2.
- normalize_functions: How to normalize function names. Possible values are: "upper" or True (default): Convert names to uppercase. "lower": Convert names to lowercase. False: Disables function name normalization.
- unsupported_level: Determines the generator's behavior when it encounters unsupported expressions. Default ErrorLevel.WARN.
- max_unsupported: Maximum number of unsupported messages to include in a raised UnsupportedError. This is only relevant if unsupported_level is ErrorLevel.RAISE. Default: 3
- leading_comma: Whether the comma is leading or trailing in select expressions. This is only relevant when generating in pretty mode. Default: False
- max_text_width: The max number of characters in a segment before creating new lines in pretty mode. The default is on the smaller end because the length only represents a segment and not the true line length. Default: 80
- comments: Whether to preserve comments in the output SQL code. Default: True
Inherited Members
- sqlglot.generator.Generator
- Generator
- NULL_ORDERING_SUPPORTED
- IGNORE_NULLS_IN_FUNC
- LOCKING_READS_SUPPORTED
- EXCEPT_INTERSECT_SUPPORT_ALL_CLAUSE
- WRAP_DERIVED_VALUES
- CREATE_FUNCTION_RETURN_AS
- MATCHED_BY_SOURCE
- SINGLE_STRING_INTERVAL
- INTERVAL_ALLOWS_PLURAL_FORM
- LIMIT_FETCH
- LIMIT_ONLY_LITERALS
- RENAME_TABLE_WITH_DB
- GROUPINGS_SEP
- INDEX_ON
- INOUT_SEPARATOR
- JOIN_HINTS
- DIRECTED_JOINS
- TABLE_HINTS
- QUERY_HINTS
- QUERY_HINT_SEP
- IS_BOOL_ALLOWED
- DUPLICATE_KEY_UPDATE_WITH_SET
- LIMIT_IS_TOP
- RETURNING_END
- EXTRACT_ALLOWS_QUOTES
- TZ_TO_WITH_TIME_ZONE
- NVL2_SUPPORTED
- VALUES_AS_TABLE
- ALTER_TABLE_INCLUDE_COLUMN_KEYWORD
- UNNEST_WITH_ORDINALITY
- AGGREGATE_FILTER_SUPPORTED
- SEMI_ANTI_JOIN_WITH_SIDE
- COMPUTED_COLUMN_WITH_TYPE
- SUPPORTS_TABLE_COPY
- TABLESAMPLE_REQUIRES_PARENS
- TABLESAMPLE_SIZE_IS_ROWS
- TABLESAMPLE_KEYWORDS
- TABLESAMPLE_WITH_METHOD
- TABLESAMPLE_SEED_KEYWORD
- COLLATE_IS_FUNC
- DATA_TYPE_SPECIFIERS_ALLOWED
- ENSURE_BOOLS
- CTE_RECURSIVE_KEYWORD_REQUIRED
- SUPPORTS_SINGLE_ARG_CONCAT
- LAST_DAY_SUPPORTS_DATE_PART
- SUPPORTS_TABLE_ALIAS_COLUMNS
- UNPIVOT_ALIASES_ARE_IDENTIFIERS
- JSON_KEY_VALUE_PAIR_SEP
- INSERT_OVERWRITE
- SUPPORTS_SELECT_INTO
- SUPPORTS_UNLOGGED_TABLES
- SUPPORTS_CREATE_TABLE_LIKE
- LIKE_PROPERTY_INSIDE_SCHEMA
- MULTI_ARG_DISTINCT
- JSON_TYPE_REQUIRED_FOR_EXTRACTION
- JSON_PATH_BRACKETED_KEY_SUPPORTED
- JSON_PATH_SINGLE_QUOTE_ESCAPE
- SUPPORTED_JSON_PATH_PARTS
- CAN_IMPLEMENT_ARRAY_ANY
- SUPPORTS_TO_NUMBER
- SUPPORTS_WINDOW_EXCLUDE
- SET_OP_MODIFIERS
- COPY_PARAMS_ARE_WRAPPED
- COPY_PARAMS_EQ_REQUIRED
- COPY_HAS_INTO_KEYWORD
- UNICODE_SUBSTITUTE
- STAR_EXCEPT
- HEX_FUNC
- WITH_PROPERTIES_PREFIX
- QUOTE_JSON_PATH
- PAD_FILL_PATTERN_IS_REQUIRED
- SUPPORTS_EXPLODING_PROJECTIONS
- ARRAY_CONCAT_IS_VAR_LEN
- SUPPORTS_CONVERT_TIMEZONE
- SUPPORTS_MEDIAN
- SUPPORTS_UNIX_SECONDS
- ALTER_SET_WRAPPED
- NORMALIZE_EXTRACT_DATE_PARTS
- PARSE_JSON_NAME
- ARRAY_SIZE_NAME
- ALTER_SET_TYPE
- ARRAY_SIZE_DIM_REQUIRED
- SUPPORTS_BETWEEN_FLAGS
- SUPPORTS_LIKE_QUANTIFIERS
- MATCH_AGAINST_TABLE_PREFIX
- SET_ASSIGNMENT_REQUIRES_VARIABLE_KEYWORD
- UPDATE_STATEMENT_SUPPORTS_FROM
- TYPE_MAPPING
- UNSUPPORTED_TYPES
- TIME_PART_SINGULARS
- TOKEN_MAPPING
- STRUCT_DELIMITER
- PARAMETER_TOKEN
- NAMED_PLACEHOLDER_TOKEN
- EXPRESSION_PRECEDES_PROPERTIES_CREATABLES
- PROPERTIES_LOCATION
- RESERVED_KEYWORDS
- WITH_SEPARATED_COMMENTS
- EXCLUDE_COMMENTS
- UNWRAPPED_INTERVAL_VALUES
- PARAMETERIZABLE_TEXT_TYPES
- EXPRESSIONS_WITHOUT_NESTED_CTES
- RESPECT_IGNORE_NULLS_UNSUPPORTED_EXPRESSIONS
- SAFE_JSON_PATH_KEY_RE
- SENTINEL_LINE_BREAK
- pretty
- identify
- normalize
- pad
- unsupported_level
- max_unsupported
- leading_comma
- max_text_width
- comments
- dialect
- normalize_functions
- unsupported_messages
- generate
- preprocess
- unsupported
- sep
- seg
- sanitize_comment
- maybe_comment
- wrap
- no_identify
- normalize_func
- indent
- sql
- uncache_sql
- cache_sql
- characterset_sql
- column_parts
- column_sql
- pseudocolumn_sql
- columnposition_sql
- columndef_sql
- columnconstraint_sql
- computedcolumnconstraint_sql
- autoincrementcolumnconstraint_sql
- compresscolumnconstraint_sql
- generatedasidentitycolumnconstraint_sql
- generatedasrowcolumnconstraint_sql
- periodforsystemtimeconstraint_sql
- notnullcolumnconstraint_sql
- primarykeycolumnconstraint_sql
- uniquecolumnconstraint_sql
- inoutcolumnconstraint_sql
- createable_sql
- create_sql
- sequenceproperties_sql
- clone_sql
- describe_sql
- heredoc_sql
- prepend_ctes
- with_sql
- cte_sql
- tablealias_sql
- bitstring_sql
- hexstring_sql
- bytestring_sql
- unicodestring_sql
- rawstring_sql
- datatypeparam_sql
- datatype_sql
- directory_sql
- delete_sql
- drop_sql
- set_operation
- set_operations
- fetch_sql
- limitoptions_sql
- filter_sql
- hint_sql
- indexparameters_sql
- index_sql
- identifier_sql
- hex_sql
- lowerhex_sql
- inputoutputformat_sql
- national_sql
- partition_sql
- properties_sql
- root_properties
- properties
- with_properties
- locate_properties
- property_name
- property_sql
- likeproperty_sql
- fallbackproperty_sql
- journalproperty_sql
- freespaceproperty_sql
- checksumproperty_sql
- mergeblockratioproperty_sql
- datablocksizeproperty_sql
- blockcompressionproperty_sql
- isolatedloadingproperty_sql
- partitionboundspec_sql
- partitionedofproperty_sql
- lockingproperty_sql
- withdataproperty_sql
- withsystemversioningproperty_sql
- insert_sql
- introducer_sql
- kill_sql
- pseudotype_sql
- objectidentifier_sql
- onconflict_sql
- returning_sql
- rowformatdelimitedproperty_sql
- withtablehint_sql
- indextablehint_sql
- historicaldata_sql
- table_parts
- table_sql
- tablefromrows_sql
- tablesample_sql
- pivot_sql
- version_sql
- tuple_sql
- update_sql
- values_sql
- var_sql
- into_sql
- from_sql
- groupingsets_sql
- rollup_sql
- rollupindex_sql
- rollupproperty_sql
- cube_sql
- group_sql
- having_sql
- connect_sql
- prior_sql
- join_sql
- lambda_sql
- lateral_op
- lateral_sql
- limit_sql
- offset_sql
- setitem_sql
- set_sql
- queryband_sql
- pragma_sql
- lock_sql
- literal_sql
- escape_str
- loaddata_sql
- null_sql
- boolean_sql
- booland_sql
- boolor_sql
- order_sql
- withfill_sql
- cluster_sql
- distribute_sql
- sort_sql
- ordered_sql
- matchrecognizemeasure_sql
- matchrecognize_sql
- query_modifiers
- options_modifier
- for_modifiers
- queryoption_sql
- offset_limit_modifiers
- after_limit_modifiers
- select_sql
- schema_sql
- schema_columns_sql
- star_sql
- parameter_sql
- sessionparameter_sql
- placeholder_sql
- subquery_sql
- qualify_sql
- unnest_sql
- prewhere_sql
- where_sql
- window_sql
- partition_by_sql
- windowspec_sql
- withingroup_sql
- between_sql
- bracket_offset_expressions
- bracket_sql
- all_sql
- any_sql
- exists_sql
- case_sql
- constraint_sql
- nextvaluefor_sql
- extract_sql
- trim_sql
- convert_concat_args
- concat_sql
- concatws_sql
- check_sql
- foreignkey_sql
- primarykey_sql
- if_sql
- matchagainst_sql
- jsonkeyvalue_sql
- jsonpath_sql
- json_path_part
- formatjson_sql
- formatphrase_sql
- jsonobject_sql
- jsonobjectagg_sql
- jsonarray_sql
- jsonarrayagg_sql
- jsoncolumndef_sql
- jsonschema_sql
- jsontable_sql
- openjsoncolumndef_sql
- openjson_sql
- in_sql
- in_unnest_op
- interval_sql
- return_sql
- reference_sql
- anonymous_sql
- paren_sql
- neg_sql
- not_sql
- alias_sql
- pivotalias_sql
- aliases_sql
- atindex_sql
- attimezone_sql
- fromtimezone_sql
- add_sql
- and_sql
- or_sql
- xor_sql
- connector_sql
- bitwiseand_sql
- bitwiseleftshift_sql
- bitwisenot_sql
- bitwiseor_sql
- bitwiserightshift_sql
- bitwisexor_sql
- cast_sql
- strtotime_sql
- currentdate_sql
- collate_sql
- command_sql
- comment_sql
- mergetreettlaction_sql
- mergetreettl_sql
- transaction_sql
- commit_sql
- rollback_sql
- altercolumn_sql
- alterindex_sql
- alterdiststyle_sql
- altersortkey_sql
- alterrename_sql
- renamecolumn_sql
- alterset_sql
- alter_sql
- altersession_sql
- add_column_sql
- droppartition_sql
- addconstraint_sql
- addpartition_sql
- distinct_sql
- ignorenulls_sql
- respectnulls_sql
- havingmax_sql
- intdiv_sql
- dpipe_sql
- div_sql
- safedivide_sql
- overlaps_sql
- distance_sql
- dot_sql
- eq_sql
- propertyeq_sql
- escape_sql
- glob_sql
- gt_sql
- gte_sql
- is_sql
- like_sql
- ilike_sql
- match_sql
- similarto_sql
- lt_sql
- lte_sql
- mod_sql
- mul_sql
- neq_sql
- nullsafeeq_sql
- nullsafeneq_sql
- sub_sql
- trycast_sql
- jsoncast_sql
- try_sql
- log_sql
- use_sql
- binary
- ceil_floor
- function_fallback_sql
- func
- format_args
- too_wide
- format_time
- expressions
- op_expressions
- naked_property
- tag_sql
- token_sql
- userdefinedfunction_sql
- joinhint_sql
- kwarg_sql
- when_sql
- whens_sql
- merge_sql
- tochar_sql
- tonumber_sql
- dictproperty_sql
- dictrange_sql
- dictsubproperty_sql
- duplicatekeyproperty_sql
- uniquekeyproperty_sql
- distributedbyproperty_sql
- oncluster_sql
- clusteredbyproperty_sql
- anyvalue_sql
- querytransform_sql
- indexconstraintoption_sql
- checkcolumnconstraint_sql
- indexcolumnconstraint_sql
- nvl2_sql
- comprehension_sql
- columnprefix_sql
- opclass_sql
- predict_sql
- generateembedding_sql
- mltranslate_sql
- mlforecast_sql
- featuresattime_sql
- vectorsearch_sql
- forin_sql
- refresh_sql
- toarray_sql
- tsordstotime_sql
- tsordstotimestamp_sql
- tsordstodatetime_sql
- tsordstodate_sql
- unixdate_sql
- lastday_sql
- dateadd_sql
- arrayany_sql
- struct_sql
- partitionrange_sql
- truncatetable_sql
- convert_sql
- copyparameter_sql
- credentials_sql
- copy_sql
- semicolon_sql
- datadeletionproperty_sql
- maskingpolicycolumnconstraint_sql
- gapfill_sql
- scope_resolution
- scoperesolution_sql
- parsejson_sql
- rand_sql
- changes_sql
- pad_sql
- summarize_sql
- explodinggenerateseries_sql
- converttimezone_sql
- json_sql
- jsonvalue_sql
- conditionalinsert_sql
- multitableinserts_sql
- oncondition_sql
- jsonextractquote_sql
- jsonexists_sql
- arrayagg_sql
- slice_sql
- apply_sql
- grant_sql
- revoke_sql
- grantprivilege_sql
- grantprincipal_sql
- columns_sql
- overlay_sql
- todouble_sql
- string_sql
- median_sql
- overflowtruncatebehavior_sql
- unixseconds_sql
- arraysize_sql
- attach_sql
- detach_sql
- attachoption_sql
- watermarkcolumnconstraint_sql
- encodeproperty_sql
- includeproperty_sql
- xmlelement_sql
- xmlkeyvalueoption_sql
- partitionbyrangeproperty_sql
- partitionbyrangepropertydynamic_sql
- unpivotcolumns_sql
- analyzesample_sql
- analyzestatistics_sql
- analyzehistogram_sql
- analyzedelete_sql
- analyzelistchainedrows_sql
- analyzevalidate_sql
- analyze_sql
- xmltable_sql
- xmlnamespace_sql
- export_sql
- declare_sql
- declareitem_sql
- recursivewithsearch_sql
- parameterizedagg_sql
- anonymousaggfunc_sql
- combinedaggfunc_sql
- combinedparameterizedagg_sql
- show_sql
- install_sql
- get_put_sql
- translatecharacters_sql
- decodecase_sql
- semanticview_sql
- getextract_sql
- datefromunixdate_sql
- space_sql
- buildproperty_sql
- refreshtriggerproperty_sql
- modelattribute_sql
- directorystage_sql
- uuid_sql
- initcap_sql
- localtime_sql
- localtimestamp_sql
- weekstart_sql
- chr_sql
class MacroEvaluator:
    """The class responsible for evaluating SQLMesh Macros/SQL.

    SQLMesh supports special preprocessed SQL prefixed with `@`. Although it provides similar power to
    traditional methods like string templating, there is semantic understanding of SQL which prevents
    common errors like leading/trailing commas, syntax errors, etc.

    SQLMesh SQL allows for macro variables and macro functions. Macro variables take the form of
    @variable. These are used for variable substitution.

        SELECT * FROM foo WHERE ds BETWEEN @start_date AND @end_date

    Macro variables can be defined with a special macro function.

        @DEF(start_date, '2021-01-01')

    Args:
        dialect: Dialect of the SQL to evaluate.
        python_env: Serialized Python environment.
        schema: Optional schema consulted by `columns_to_types`.
        runtime_stage: Stage in which evaluation happens; stored in `locals` as its string value.
        resolve_table: Callable backing `resolve_table`.
        resolve_tables: Callable backing `resolve_tables`.
        snapshots: Snapshots keyed by normalized model name, used by `get_snapshot`.
        default_catalog: Catalog used when normalizing model names.
        path: Path of the file being evaluated; used in error messages and passed to `call_macro`.
        environment_naming_info: Stored as `_environment_naming_info` (not read elsewhere in this class).
        model_fqn: Fully-qualified name of the surrounding model, exposed via `this_model_fqn`.
    """

    def __init__(
        self,
        dialect: DialectType = "",
        python_env: t.Optional[t.Dict[str, Executable]] = None,
        schema: t.Optional[MappingSchema] = None,
        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
        resolve_table: t.Optional[t.Callable[[str | exp.Table], str]] = None,
        resolve_tables: t.Optional[t.Callable[[exp.Expression], exp.Expression]] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        default_catalog: t.Optional[str] = None,
        path: t.Optional[Path] = None,
        environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
        model_fqn: t.Optional[str] = None,
    ):
        self.dialect = dialect
        self.generator = MacroDialect().generator()
        # The runtime stage is kept as its string value (e.g. "loading"), which is what both Python
        # macros (`evaluator.runtime_stage != 'loading'`) and SQL (`@runtime_stage`) compare against.
        self.locals: t.Dict[str, t.Any] = {
            "runtime_stage": runtime_stage.value,
            "default_catalog": default_catalog,
        }
        # Globals for `eval_expression`; `self` is exposed presumably so generated code can
        # reference the evaluator.
        self.env = {
            **ENV,
            "self": self,
            "SQL": SQL,
            "MacroEvaluator": MacroEvaluator,
        }
        self.python_env = python_env or {}
        # Start from the globally registered macros; python_env definitions/imports are added below.
        self.macros = {normalize_macro_name(k): v.func for k, v in macro.get_registry().items()}
        self.columns_to_types_called = False
        self.default_catalog = default_catalog

        self._schema = schema
        self._resolve_table = resolve_table
        self._resolve_tables = resolve_tables
        self._snapshots = snapshots if snapshots is not None else {}
        self._path = path
        self._environment_naming_info = environment_naming_info
        self._model_fqn = model_fqn

        prepare_env(self.python_env, self.env)
        for k, v in self.python_env.items():
            if v.is_definition:
                self.macros[normalize_macro_name(k)] = self.env[v.name or k]
            elif v.is_import and getattr(self.env.get(k), c.SQLMESH_MACRO, None):
                # Imports are registered as macros only if they carry a truthy `c.SQLMESH_MACRO` attribute.
                self.macros[normalize_macro_name(k)] = self.env[k]
            elif v.is_value:
                value = self.env[k]
                if k in (
                    c.SQLMESH_VARS,
                    c.SQLMESH_VARS_METADATA,
                    c.SQLMESH_BLUEPRINT_VARS,
                    c.SQLMESH_BLUEPRINT_VARS_METADATA,
                ):
                    # Variables serialized as SQL are parsed back into expressions in this dialect.
                    value = {
                        var_name: (
                            self.parse_one(var_value.sql)
                            if isinstance(var_value, SqlValue)
                            else var_value
                        )
                        for var_name, var_value in value.items()
                    }

                self.locals[k] = value

    def send(
        self, name: str, *args: t.Any, **kwargs: t.Any
    ) -> t.Union[None, exp.Expression, t.List[exp.Expression]]:
        """Invokes the macro registered under `name`, passing this evaluator as the first argument.

        Args:
            name: Macro name; looked up after `normalize_macro_name`.
            *args: Positional arguments forwarded to the macro.
            **kwargs: Keyword arguments forwarded to the macro.

        Returns:
            Whatever the macro returns.

        Raises:
            MacroEvalError: If no callable macro is registered under `name`, or if the macro raises.
        """
        func = self.macros.get(normalize_macro_name(name))

        if not callable(func):
            raise MacroEvalError(f"Macro '{name}' does not exist.")

        try:
            return call_macro(
                func, self.dialect, self._path, provided_args=(self, *args), provided_kwargs=kwargs
            )  # type: ignore
        except Exception as e:
            raise MacroEvalError(
                f"An error occurred during evaluation of '{name}'\n\n"
                + format_evaluated_code_exception(e, self.python_env)
            )

    def transform(
        self, expression: exp.Expression
    ) -> exp.Expression | t.List[exp.Expression] | None:
        """Expands macro variables, templated identifiers and macro calls inside `expression`.

        The input is copied, not mutated. Traversal does not descend into `exp.Lambda` nodes.
        If anything was expanded, the result is rendered to SQL and re-parsed.

        Raises:
            SQLMeshError: If a macro variable is undefined (unless its parent is a StagedFilePath).
        """
        changed = False

        def evaluate_macros(
            node: exp.Expression,
        ) -> exp.Expression | t.List[exp.Expression] | None:
            nonlocal changed

            if isinstance(node, MacroVar):
                changed = True
                variables = self.variables

                # This makes all variables case-insensitive, e.g. @X is the same as @x. We do this
                # for consistency, since `variables` and `blueprint_variables` are normalized.
                var_name = node.name.lower()

                if var_name not in self.locals and var_name not in variables:
                    if not isinstance(node.parent, StagedFilePath):
                        raise SQLMeshError(f"Macro variable '{node.name}' is undefined.")

                    return node

                # Precedence order is locals (e.g. @DEF) > blueprint variables > config variables
                value = self.locals.get(var_name, variables.get(var_name))
                if isinstance(value, list):
                    return exp.convert(
                        tuple(
                            self.transform(v) if isinstance(v, exp.Expression) else v for v in value
                        )
                    )

                return exp.convert(
                    self.transform(value) if isinstance(value, exp.Expression) else value
                )
            if isinstance(node, exp.Identifier) and "@" in node.this:
                text = self.template(node.this, {})
                if node.this != text:
                    changed = True
                return exp.to_identifier(text, quoted=node.quoted or None)
            if isinstance(node, MacroFunc):
                changed = True
                return self.evaluate(node)
            return node

        transformed = exp.replace_tree(
            expression.copy(), evaluate_macros, prune=lambda n: isinstance(n, exp.Lambda)
        )

        if changed:
            # the transformations could have corrupted the ast, turning this into sql and reparsing ensures
            # that the ast is correct
            if isinstance(transformed, list):
                return [
                    self.parse_one(node.sql(dialect=self.dialect, copy=False))
                    for node in transformed
                ]
            if isinstance(transformed, exp.Expression):
                return self.parse_one(transformed.sql(dialect=self.dialect, copy=False))

        return transformed

    def template(self, text: t.Any, local_variables: t.Dict[str, t.Any]) -> str:
        """Substitute @vars with locals.

        Args:
            text: The string to perform substitution on.
            local_variables: Local variables in the context so that lambdas can be used.

        Returns:
            The rendered string.
        """
        # We try to convert all variables into sqlglot expressions because they're going to be converted
        # into strings; in sql we don't convert strings because that would result in adding quotes.
        # Later sources win: variables < locals < local_variables.
        base_mapping = {
            k.lower(): convert_sql(v, self.dialect)
            for k, v in chain(self.variables.items(), self.locals.items(), local_variables.items())
            if k.lower()
            not in (
                "engine_adapter",
                "snapshot",
            )
        }
        return MacroStrTemplate(str(text)).safe_substitute(CaseInsensitiveMapping(base_mapping))

    def evaluate(self, node: MacroFunc) -> exp.Expression | t.List[exp.Expression] | None:
        """Evaluates a single macro function node.

        `@DEF` nodes register either a macro (lambda body) or a case-insensitive local variable
        and are returned unchanged. `MacroSQL`/`MacroStrReplace` nodes are evaluated as Python via
        `eval_expression`; any other macro call is dispatched through `send`.

        Returns:
            None if the macro returned None, a list of parsed expressions for list/tuple results
            (None items dropped), or a single parsed expression otherwise.

        Raises:
            MacroEvalError: If a positional argument follows a keyword argument.
        """
        if isinstance(node, MacroDef):
            if isinstance(node.expression, exp.Lambda):
                _, fn = _norm_var_arg_lambda(self, node.expression)
                self.macros[normalize_macro_name(node.name)] = lambda _, *args: fn(
                    args[0] if len(args) == 1 else exp.Tuple(expressions=list(args))
                )
            else:
                # Make variables defined through `@DEF` case-insensitive
                self.locals[node.name.lower()] = self.transform(node.expression)

            return node

        if isinstance(node, (MacroSQL, MacroStrReplace)):
            result: t.Optional[exp.Expression | t.List[exp.Expression]] = exp.convert(
                self.eval_expression(node)
            )
        else:
            func = t.cast(exp.Anonymous, node.this)

            args = []
            kwargs = {}
            for e in func.expressions:
                if isinstance(e, exp.PropertyEQ):
                    kwargs[e.this.name] = e.expression
                else:
                    if kwargs:
                        raise MacroEvalError(
                            "Positional argument cannot follow keyword argument.\n  "
                            f"{func.sql(dialect=self.dialect)} at '{self._path}'"
                        )

                    args.append(e)

            result = self.send(func.name, *args, **kwargs)

        if result is None:
            return None

        if isinstance(result, (tuple, list)):
            result = [self.parse_one(item) for item in result if item is not None]

            if (
                len(result) == 1
                and isinstance(result[0], (exp.Array, exp.Tuple))
                and node.find_ancestor(MacroFunc)
            ):
                # If:
                #   - the output of evaluating this node is being passed as an argument to another
                #     macro function
                #   - and that output is something that _norm_var_arg_lambda() will unpack into varargs
                #     (a list containing a single item of type exp.Tuple/exp.Array)
                # then we would get inconsistent behaviour depending on whether this node emits a
                # list with a single item or with multiple items:
                #   - a list containing a single array item causes that array to get unpacked and its
                #     *members* passed to the calling macro
                #   - a list containing multiple array items causes each item to be passed as-is
                # To prevent this, wrap the output in an exp.Array so that _norm_var_arg_lambda() can
                # "unpack" it into the actual argument we want to pass to the parent macro function.
                #
                # This only applies to results passed as an argument to another macro; when the final
                # result is given to something like SELECT we still want it unpacked into items like
                #   SELECT ARRAY(1), ARRAY(2)
                # rather than a single item like
                #   SELECT ARRAY(ARRAY(1), ARRAY(2))
                result = [exp.Array(expressions=result)]
        else:
            result = self.parse_one(result)

        return result

    def eval_expression(self, node: t.Any) -> t.Any:
        """Converts a SQLGlot expression into executable Python code and evals it.

        If the node is not an expression, it will simply be returned.

        Args:
            node: expression
        Returns:
            The return value of the evaled Python Code.
        Raises:
            MacroEvalError: If code generation or evaluation fails.
        """
        if not isinstance(node, exp.Expression):
            return node
        # Fallback shown in the error message if Python code generation itself fails.
        code = node.sql()
        try:
            code = self.generator.generate(node)
            # NOTE(review): `eval` runs arbitrary Python; only appropriate for trusted project code.
            return eval(code, self.env, self.locals)
        except Exception as e:
            raise MacroEvalError(
                f"Error trying to eval macro.\n\nGenerated code: {code}\n\nOriginal sql: {node}\n\n"
                + format_evaluated_code_exception(e, self.python_env)
            )

    def parse_one(
        self, sql: str | exp.Expression, into: t.Optional[exp.IntoType] = None, **opts: t.Any
    ) -> exp.Expression:
        """Parses the given SQL string and returns a syntax tree for the first
        parsed SQL statement.

        Args:
            sql: the SQL code or expression to parse.
            into: the Expression to parse into
            **opts: other options

        Returns:
            Expression: the syntax tree for the first parsed statement
        """
        return sqlglot.maybe_parse(sql, dialect=self.dialect, into=into, **opts)

    def columns_to_types(self, model_name: TableName | exp.Column) -> t.Dict[str, exp.DataType]:
        """Returns the columns-to-types mapping corresponding to the specified model.

        At load time without a usable schema, a single placeholder column is returned and
        `columns_to_types_called` is set.

        Raises:
            SQLMeshError: If the schema can't be found in the schema or in the model's snapshot.
        """

        # We only return this dummy schema at load time, because if we don't actually know the
        # target model's schema at creation/evaluation time, returning a dummy schema could lead
        # to unintelligible errors when the query is executed
        if (self._schema is None or self._schema.empty) and self.runtime_stage == "loading":
            self.columns_to_types_called = True
            return {"__schema_unavailable_at_load__": exp.DataType.build("unknown")}

        normalized_model_name = normalize_model_name(
            model_name,
            default_catalog=self.default_catalog,
            dialect=self.dialect,
        )
        model_name = exp.to_table(normalized_model_name)

        columns_to_types = (
            self._schema.find(model_name, ensure_data_types=True) if self._schema else None
        )
        if columns_to_types is None:
            snapshot = self.get_snapshot(model_name)
            if snapshot and snapshot.node.is_model:
                columns_to_types = snapshot.node.columns_to_types  # type: ignore

        if columns_to_types is None:
            raise SQLMeshError(f"Schema for model '{model_name}' can't be statically determined.")

        return columns_to_types

    def get_snapshot(self, model_name: TableName | exp.Column) -> t.Optional[Snapshot]:
        """Returns the snapshot that corresponds to the given model name."""
        return self._snapshots.get(
            normalize_model_name(
                model_name,
                default_catalog=self.default_catalog,
                dialect=self.dialect,
            )
        )

    def resolve_table(self, table: str | exp.Table) -> str:
        """Gets the physical table name for a given model."""
        if not self._resolve_table:
            raise SQLMeshError(
                "Macro evaluator not properly initialized with resolve_table lambda."
            )
        return self._resolve_table(table)

    def resolve_tables(self, query: exp.Expression) -> exp.Expression:
        """Resolves queries with references to SQLMesh model names to their physical tables."""
        if not self._resolve_tables:
            raise SQLMeshError(
                "Macro evaluator not properly initialized with resolve_tables lambda."
            )
        return self._resolve_tables(query)

    @property
    def runtime_stage(self) -> str:
        """Returns the current runtime stage as its string value (e.g. "loading").

        `__init__` stores `RuntimeStage.value`, not the enum member, so compare against strings.
        """
        return self.locals["runtime_stage"]

    @property
    def this_model(self) -> str:
        """Returns the resolved name of the surrounding model."""
        this_model = self.locals.get("this_model")
        if not this_model:
            raise SQLMeshError("Model name is not available in the macro evaluator.")
        return this_model.sql(dialect=self.dialect, identify=True, comments=False)

    @property
    def this_model_fqn(self) -> str:
        """Returns the fully-qualified name of the surrounding model passed as `model_fqn`."""
        if self._model_fqn is None:
            raise SQLMeshError("Model name is not available in the macro evaluator.")
        return self._model_fqn

    @property
    def engine_adapter(self) -> EngineAdapter:
        """Returns the engine adapter stored in `locals`.

        Raises:
            SQLMeshError: If no (truthy) engine adapter is available, e.g. while models are loading.
        """
        engine_adapter = self.locals.get("engine_adapter")
        if not engine_adapter:
            raise SQLMeshError(
                "The engine adapter is not available while models are loading."
                " You can gate these calls by checking in Python: evaluator.runtime_stage != 'loading' or SQL: @runtime_stage <> 'loading'."
            )
        return engine_adapter

    @property
    def gateway(self) -> t.Optional[str]:
        """Returns the gateway name."""
        return self.var(c.GATEWAY)

    @property
    def snapshots(self) -> t.Dict[str, Snapshot]:
        """Returns the snapshots if available."""
        return self._snapshots

    @property
    def this_env(self) -> str:
        """Returns the name of the current environment in before_all / after_all statements."""
        if "this_env" not in self.locals:
            raise SQLMeshError("Environment name is only available in before_all and after_all")
        return self.locals["this_env"]

    @property
    def schemas(self) -> t.List[str]:
        """Returns the schemas of the current environment in before_all / after_all macros."""
        if "schemas" not in self.locals:
            raise SQLMeshError("Schemas are only available in before_all and after_all")
        return self.locals["schemas"]

    @property
    def views(self) -> t.List[str]:
        """Returns the views of the current environment in before_all / after_all macros."""
        if "views" not in self.locals:
            raise SQLMeshError("Views are only available in before_all and after_all")
        return self.locals["views"]

    def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
        """Returns the value of the specified variable, or the default value if it doesn't exist."""
        return {
            **(self.locals.get(c.SQLMESH_VARS) or {}),
            **(self.locals.get(c.SQLMESH_VARS_METADATA) or {}),
        }.get(var_name.lower(), default)

    def blueprint_var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
        """Returns the value of the specified blueprint variable, or the default value if it doesn't exist."""
        return {
            **(self.locals.get(c.SQLMESH_BLUEPRINT_VARS) or {}),
            **(self.locals.get(c.SQLMESH_BLUEPRINT_VARS_METADATA) or {}),
        }.get(var_name.lower(), default)

    @property
    def variables(self) -> t.Dict[str, t.Any]:
        """Returns config and blueprint variables merged into one mapping.

        Later sources override earlier ones, so blueprint variables take precedence over config
        variables. Missing or `None` entries count as empty, matching `var` and `blueprint_var`.
        """
        return {
            **(self.locals.get(c.SQLMESH_VARS) or {}),
            **(self.locals.get(c.SQLMESH_VARS_METADATA) or {}),
            **(self.locals.get(c.SQLMESH_BLUEPRINT_VARS) or {}),
            **(self.locals.get(c.SQLMESH_BLUEPRINT_VARS_METADATA) or {}),
        }

    def _coerce(self, expr: exp.Expression, typ: t.Any, strict: bool = False) -> t.Any:
        """Coerces the given expression to the specified type on a best-effort basis."""
        return _coerce(expr, typ, self.dialect, self._path, strict)
The class responsible for evaluating SQLMesh Macros/SQL.
SQLMesh supports special preprocessed SQL prefixed with @. Although it provides similar power to
traditional methods like string templating, there is semantic understanding of SQL which prevents
common errors like leading/trailing commas, syntax errors, etc.
SQLMesh SQL allows for macro variables and macro functions. Macro variables take the form of @variable. These are used for variable substitution.
SELECT * FROM foo WHERE ds BETWEEN @start_date AND @end_date
Macro variables can be defined with a special macro function.
@DEF(start_date, '2021-01-01')
Arguments:
- dialect: Dialect of the SQL to evaluate.
- python_env: Serialized Python environment.
def __init__(
    self,
    dialect: DialectType = "",
    python_env: t.Optional[t.Dict[str, Executable]] = None,
    schema: t.Optional[MappingSchema] = None,
    runtime_stage: RuntimeStage = RuntimeStage.LOADING,
    resolve_table: t.Optional[t.Callable[[str | exp.Table], str]] = None,
    resolve_tables: t.Optional[t.Callable[[exp.Expression], exp.Expression]] = None,
    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
    default_catalog: t.Optional[str] = None,
    path: t.Optional[Path] = None,
    environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
    model_fqn: t.Optional[str] = None,
):
    """Build the evaluation environment and register macros and variables.

    Macros come from the global macro registry, plus any macro definitions or macro-marked
    imports found in `python_env`. Value entries from `python_env` become locals.
    """
    self.dialect = dialect
    self.generator = MacroDialect().generator()
    # The stage is stored as the enum's string value, not as the enum member.
    self.locals: t.Dict[str, t.Any] = dict(
        runtime_stage=runtime_stage.value,
        default_catalog=default_catalog,
    )
    self.env = {**ENV, "self": self, "SQL": SQL, "MacroEvaluator": MacroEvaluator}
    self.python_env = python_env or {}
    registered = macro.get_registry()
    self.macros = {
        normalize_macro_name(macro_name): entry.func for macro_name, entry in registered.items()
    }
    self.columns_to_types_called = False
    self.default_catalog = default_catalog

    self._schema = schema
    self._resolve_table = resolve_table
    self._resolve_tables = resolve_tables
    self._snapshots = {} if snapshots is None else snapshots
    self._path = path
    self._environment_naming_info = environment_naming_info
    self._model_fqn = model_fqn

    prepare_env(self.python_env, self.env)

    # python_env entries holding variable mappings whose SqlValue members get parsed.
    variable_keys = (
        c.SQLMESH_VARS,
        c.SQLMESH_VARS_METADATA,
        c.SQLMESH_BLUEPRINT_VARS,
        c.SQLMESH_BLUEPRINT_VARS_METADATA,
    )
    for env_name, executable in self.python_env.items():
        if executable.is_definition:
            self.macros[normalize_macro_name(env_name)] = self.env[executable.name or env_name]
            continue
        if executable.is_import and getattr(self.env.get(env_name), c.SQLMESH_MACRO, None):
            self.macros[normalize_macro_name(env_name)] = self.env[env_name]
            continue
        if not executable.is_value:
            continue

        value = self.env[env_name]
        if env_name in variable_keys:
            parsed: t.Dict[str, t.Any] = {}
            for var_name, var_value in value.items():
                if isinstance(var_value, SqlValue):
                    parsed[var_name] = self.parse_one(var_value.sql)
                else:
                    parsed[var_name] = var_value
            value = parsed
        self.locals[env_name] = value
def send(
    self, name: str, *args: t.Any, **kwargs: t.Any
) -> t.Union[None, exp.Expression, t.List[exp.Expression]]:
    """Call the macro registered under `name`, with this evaluator as its first argument.

    Raises:
        MacroEvalError: If no callable macro is registered under `name`, or if calling it
            fails (the message then includes the formatted exception).
    """
    target = self.macros.get(normalize_macro_name(name))
    if not callable(target):
        raise MacroEvalError(f"Macro '{name}' does not exist.")

    call_args = (self, *args)
    try:
        return call_macro(  # type: ignore
            target, self.dialect, self._path, provided_args=call_args, provided_kwargs=kwargs
        )
    except Exception as e:
        details = format_evaluated_code_exception(e, self.python_env)
        raise MacroEvalError(f"An error occurred during evaluation of '{name}'\n\n" + details)
def transform(
    self, expression: exp.Expression
) -> exp.Expression | t.List[exp.Expression] | None:
    """Expand macro variables, templated identifiers and macro calls in `expression`.

    The input is copied, not mutated. The `prune` callback stops traversal from descending
    into `exp.Lambda` nodes. When anything was expanded, the result is rendered back to SQL
    and re-parsed.

    Raises:
        SQLMeshError: If a macro variable is undefined and its parent is not a StagedFilePath.
    """
    # Mutable flag shared with the nested callbacks below.
    state = {"changed": False}

    def _expand_variable(var_node: MacroVar) -> exp.Expression | t.List[exp.Expression] | None:
        merged = self.variables
        # Variables are case-insensitive (@X is the same as @x), matching how `variables` and
        # blueprint variables are normalized.
        key = var_node.name.lower()

        if key not in self.locals and key not in merged:
            if isinstance(var_node.parent, StagedFilePath):
                return var_node
            raise SQLMeshError(f"Macro variable '{var_node.name}' is undefined.")

        # Locals (e.g. @DEF) win over blueprint variables, which win over config variables.
        value = self.locals.get(key, merged.get(key))
        if isinstance(value, list):
            items = tuple(
                self.transform(item) if isinstance(item, exp.Expression) else item
                for item in value
            )
            return exp.convert(items)
        if isinstance(value, exp.Expression):
            return exp.convert(self.transform(value))
        return exp.convert(value)

    def _expand(current: exp.Expression) -> exp.Expression | t.List[exp.Expression] | None:
        if isinstance(current, MacroVar):
            state["changed"] = True
            return _expand_variable(current)
        if isinstance(current, exp.Identifier) and "@" in current.this:
            rendered = self.template(current.this, {})
            if rendered != current.this:
                state["changed"] = True
            return exp.to_identifier(rendered, quoted=current.quoted or None)
        if isinstance(current, MacroFunc):
            state["changed"] = True
            return self.evaluate(current)
        return current

    result = exp.replace_tree(
        expression.copy(), _expand, prune=lambda n: isinstance(n, exp.Lambda)
    )

    if not state["changed"]:
        return result

    # Expansion can leave the AST in an inconsistent shape; a SQL round trip normalizes it.
    if isinstance(result, list):
        return [self.parse_one(part.sql(dialect=self.dialect, copy=False)) for part in result]
    if isinstance(result, exp.Expression):
        return self.parse_one(result.sql(dialect=self.dialect, copy=False))
    return result
def template(self, text: t.Any, local_variables: t.Dict[str, t.Any]) -> str:
    """Replace `@name` placeholders in `text` with variable and local values.

    Args:
        text: The value to render; it is converted with `str()` first.
        local_variables: Extra variables for this call (e.g. lambda arguments). They override
            locals, which in turn override config/blueprint variables.

    Returns:
        The rendered string. Unknown placeholders are left untouched (`safe_substitute`).
    """
    excluded = ("engine_adapter", "snapshot")
    # Values are converted to SQL text because they end up embedded in a string.
    mapping: t.Dict[str, t.Any] = {}
    for key, value in chain(self.variables.items(), self.locals.items(), local_variables.items()):
        lowered = key.lower()
        if lowered in excluded:
            continue
        mapping[lowered] = convert_sql(value, self.dialect)
    return MacroStrTemplate(str(text)).safe_substitute(CaseInsensitiveMapping(mapping))
Substitute @vars with locals.
Arguments:
- text: The string to perform substitution on.
- local_variables: Local variables in the context so that lambdas can be used.
Returns:
The rendered string.
def evaluate(self, node: MacroFunc) -> exp.Expression | t.List[exp.Expression] | None:
    """Evaluate one macro function node.

    `@DEF` nodes register either a macro (lambda body) or a case-insensitive local, and are
    returned unchanged. `MacroSQL`/`MacroStrReplace` nodes are evaluated as Python through
    `eval_expression`. Any other macro call is dispatched through `send`.

    Returns:
        None when the macro produced None. For list/tuple results, a list of parsed expressions
        with None items dropped. Otherwise, a single parsed expression.

    Raises:
        MacroEvalError: If a positional argument follows a keyword argument.
    """
    if isinstance(node, MacroDef):
        definition = node.expression
        if isinstance(definition, exp.Lambda):
            _, fn = _norm_var_arg_lambda(self, definition)
            self.macros[normalize_macro_name(node.name)] = lambda _, *args: fn(
                args[0] if len(args) == 1 else exp.Tuple(expressions=list(args))
            )
        else:
            # Variables defined through `@DEF` are stored lower-cased (case-insensitive).
            self.locals[node.name.lower()] = self.transform(definition)
        return node

    if isinstance(node, (MacroSQL, MacroStrReplace)):
        result: t.Optional[exp.Expression | t.List[exp.Expression]] = exp.convert(
            self.eval_expression(node)
        )
    else:
        call = t.cast(exp.Anonymous, node.this)
        positional = []
        keyword = {}
        for arg in call.expressions:
            if not isinstance(arg, exp.PropertyEQ):
                if keyword:
                    raise MacroEvalError(
                        "Positional argument cannot follow keyword argument.\n  "
                        f"{call.sql(dialect=self.dialect)} at '{self._path}'"
                    )
                positional.append(arg)
                continue
            keyword[arg.this.name] = arg.expression
        result = self.send(call.name, *positional, **keyword)

    if result is None:
        return None

    if not isinstance(result, (tuple, list)):
        return self.parse_one(result)

    items = [self.parse_one(item) for item in result if item is not None]
    single_container = len(items) == 1 and isinstance(items[0], (exp.Array, exp.Tuple))
    if single_container and node.find_ancestor(MacroFunc):
        # When this result feeds another macro call, _norm_var_arg_lambda() unpacks a lone
        # Tuple/Array into varargs, but passes several items through as-is. Wrapping the output
        # in an Array makes both cases consistent. Top-level results (e.g. in a SELECT) are not
        # wrapped, so they still expand to `SELECT ARRAY(1), ARRAY(2)` rather than
        # `SELECT ARRAY(ARRAY(1), ARRAY(2))`.
        return [exp.Array(expressions=items)]
    return items
def eval_expression(self, node: t.Any) -> t.Any:
    """Translate a SQLGlot expression into Python source and evaluate it.

    Non-expression inputs are returned unchanged.

    Args:
        node: The value to evaluate.

    Returns:
        The result of evaluating the generated Python code, or `node` itself.

    Raises:
        MacroEvalError: If code generation or evaluation fails.
    """
    if not isinstance(node, exp.Expression):
        return node

    # Shown in the error message if Python code generation itself fails.
    generated = node.sql()
    try:
        generated = self.generator.generate(node)
        # NOTE(review): `eval` runs arbitrary Python; only appropriate for trusted project code.
        return eval(generated, self.env, self.locals)
    except Exception as e:
        raise MacroEvalError(
            f"Error trying to eval macro.\n\nGenerated code: {generated}\n\nOriginal sql: {node}\n\n"
            + format_evaluated_code_exception(e, self.python_env)
        )
Converts a SQLGlot expression into executable Python code and evals it.
If the node is not an expression, it will simply be returned.
Arguments:
- node: expression
Returns:
The return value of the evaled Python Code.
def parse_one(
    self, sql: str | exp.Expression, into: t.Optional[exp.IntoType] = None, **opts: t.Any
) -> exp.Expression:
    """Parse `sql` in this evaluator's dialect and return the first statement's syntax tree.

    Args:
        sql: SQL text, or an already-built expression.
        into: Optional expression type to parse into.
        **opts: Extra options forwarded to `sqlglot.maybe_parse`.

    Returns:
        The syntax tree of the first parsed statement.
    """
    dialect = self.dialect
    return sqlglot.maybe_parse(sql, dialect=dialect, into=into, **opts)
Parses the given SQL string and returns a syntax tree for the first parsed SQL statement.
Arguments:
- sql: the SQL code or expression to parse.
- into: the Expression to parse into
- **opts: other options
Returns:
Expression: the syntax tree for the first parsed statement
def columns_to_types(self, model_name: TableName | exp.Column) -> t.Dict[str, exp.DataType]:
    """Look up the column-to-type mapping of the given model.

    While loading without a usable schema, a single placeholder column is returned and
    `columns_to_types_called` is set. A fake schema is only safe at load time; during
    creation or evaluation it would lead to confusing query errors.

    Raises:
        SQLMeshError: If neither the schema nor the model's snapshot provides the mapping.
    """
    schema_missing = self._schema is None or self._schema.empty
    if schema_missing and self.runtime_stage == "loading":
        self.columns_to_types_called = True
        return {"__schema_unavailable_at_load__": exp.DataType.build("unknown")}

    table = exp.to_table(
        normalize_model_name(
            model_name,
            default_catalog=self.default_catalog,
            dialect=self.dialect,
        )
    )

    mapping = self._schema.find(table, ensure_data_types=True) if self._schema else None
    if mapping is None:
        snapshot = self.get_snapshot(table)
        if snapshot and snapshot.node.is_model:
            mapping = snapshot.node.columns_to_types  # type: ignore

    if mapping is None:
        raise SQLMeshError(f"Schema for model '{table}' can't be statically determined.")
    return mapping
Returns the columns-to-types mapping corresponding to the specified model.
def get_snapshot(self, model_name: TableName | exp.Column) -> t.Optional[Snapshot]:
    """Return the snapshot stored under the normalized form of `model_name`, or None."""
    key = normalize_model_name(
        model_name,
        default_catalog=self.default_catalog,
        dialect=self.dialect,
    )
    return self._snapshots.get(key)
Returns the snapshot that corresponds to the given model name.
def resolve_table(self, table: str | exp.Table) -> str:
    """Map a model reference to its physical table name via the configured resolver.

    Raises:
        SQLMeshError: If no `resolve_table` callable was supplied.
    """
    resolver = self._resolve_table
    if resolver:
        return resolver(table)
    raise SQLMeshError(
        "Macro evaluator not properly initialized with resolve_table lambda."
    )
Gets the physical table name for a given model.
def resolve_tables(self, query: exp.Expression) -> exp.Expression:
    """Resolves queries with references to SQLMesh model names to their physical tables.

    Delegates to the ``_resolve_tables`` callable the evaluator was configured with.

    Raises:
        SQLMeshError: If no ``_resolve_tables`` callable is configured (falsy).
    """
    resolver = self._resolve_tables
    if resolver:
        return resolver(query)
    raise SQLMeshError(
        "Macro evaluator not properly initialized with resolve_tables lambda."
    )
Resolves queries with references to SQLMesh model names to their physical tables.
@property
def runtime_stage(self) -> RuntimeStage:
    """Returns the current runtime stage of the macro evaluation.

    The value is read straight from ``self.locals["runtime_stage"]``.
    NOTE(review): annotated as RuntimeStage, but other code compares it to plain strings
    such as "loading" — confirm which type is actually stored.
    """
    stage = self.locals["runtime_stage"]
    return stage
Returns the current runtime stage of the macro evaluation.
@property
def this_model(self) -> str:
    """Returns the resolved name of the surrounding model.

    The name is rendered as SQL in the evaluator's dialect, fully quoted and without comments.

    Raises:
        SQLMeshError: If no (truthy) ``this_model`` entry exists in the evaluator's locals.
    """
    model = self.locals.get("this_model")
    if model:
        return model.sql(dialect=self.dialect, identify=True, comments=False)
    raise SQLMeshError("Model name is not available in the macro evaluator.")
Returns the resolved name of the surrounding model.
@property
def engine_adapter(self) -> EngineAdapter:
    """Returns the engine adapter stored in the evaluator's locals.

    Raises:
        SQLMeshError: If no (truthy) engine adapter is available, which the error message
            attributes to models still loading.
    """
    engine_adapter = self.locals.get("engine_adapter")
    if not engine_adapter:
        raise SQLMeshError(
            "The engine adapter is not available while models are loading."
            " You can gate these calls by checking in Python: evaluator.runtime_stage != 'loading' or SQL: @runtime_stage <> 'loading'."
        )
    # Return the value that was just fetched and validated instead of indexing locals again.
    return engine_adapter
@property
def gateway(self) -> t.Optional[str]:
    """Returns the gateway name, read from the ``c.GATEWAY`` variable (None if unset)."""
    gateway_name = self.var(c.GATEWAY)
    return gateway_name
Returns the gateway name.
@property
def snapshots(self) -> t.Dict[str, Snapshot]:
    """Returns the snapshots if available (the evaluator's own mapping, not a copy)."""
    snapshots_by_name = self._snapshots
    return snapshots_by_name
Returns the snapshots if available.
@property
def this_env(self) -> str:
    """Returns the name of the current environment in before_all and after_all macros.

    Raises:
        SQLMeshError: If ``this_env`` is not present in the evaluator's locals.
    """
    if "this_env" in self.locals:
        return self.locals["this_env"]
    raise SQLMeshError("Environment name is only available in before_all and after_all")
Returns the name of the current environment in before_all and after_all macros.
@property
def schemas(self) -> t.List[str]:
    """Returns the schemas of the current environment in before_all and after_all macros.

    Raises:
        SQLMeshError: If ``schemas`` is not present in the evaluator's locals.
    """
    if "schemas" in self.locals:
        return self.locals["schemas"]
    raise SQLMeshError("Schemas are only available in before_all and after_all")
Returns the schemas of the current environment in before_all and after_all macros.
@property
def views(self) -> t.List[str]:
    """Returns the views of the current environment in before_all and after_all macros.

    Raises:
        SQLMeshError: If ``views`` is not present in the evaluator's locals.
    """
    if "views" in self.locals:
        return self.locals["views"]
    raise SQLMeshError("Views are only available in before_all and after_all")
Returns the views of the current environment in before_all and after_all macros.
def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
    """Returns the value of the specified variable, or the default value if it doesn't exist.

    Regular variables and metadata variables are merged; on a key collision the metadata
    value wins. ``var_name`` is lowercased before the lookup (stored keys are presumably
    lowercase — confirm).
    """
    regular_vars = self.locals.get(c.SQLMESH_VARS) or {}
    metadata_vars = self.locals.get(c.SQLMESH_VARS_METADATA) or {}
    return {**regular_vars, **metadata_vars}.get(var_name.lower(), default)
Returns the value of the specified variable, or the default value if it doesn't exist.
def blueprint_var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
    """Returns the value of the specified blueprint variable, or the default value if it doesn't exist.

    Regular blueprint variables and metadata blueprint variables are merged; on a key
    collision the metadata value wins. ``var_name`` is lowercased before the lookup.
    """
    blueprint_vars = self.locals.get(c.SQLMESH_BLUEPRINT_VARS) or {}
    blueprint_metadata_vars = self.locals.get(c.SQLMESH_BLUEPRINT_VARS_METADATA) or {}
    return {**blueprint_vars, **blueprint_metadata_vars}.get(var_name.lower(), default)
Returns the value of the specified blueprint variable, or the default value if it doesn't exist.
class macro(registry_decorator):
    """Marks a function as a macro and registers it in the global MACROS registry.

    Registered macros can be referenced in SQL statements to make queries more dynamic/cleaner.

    Example:
        from sqlglot import exp
        from sqlmesh.core.macros import MacroEvaluator, macro

        @macro()
        def add_one(evaluator: MacroEvaluator, column: exp.Literal) -> exp.Add:
            return evaluator.parse_one(f"{column} + 1")

    Args:
        name: A custom name for the macro; defaults to the name of the function.
        metadata_only: When true, the decorated function is tagged with ``c.SQLMESH_METADATA``
            before being registered.
    """

    registry_name = "macros"

    def __init__(self, *args: t.Any, metadata_only: bool = False, **kwargs: t.Any) -> None:
        super().__init__(*args, **kwargs)
        self.metadata_only = metadata_only

    def __call__(
        self, func: t.Callable[..., DECORATOR_RETURN_TYPE]
    ) -> t.Callable[..., DECORATOR_RETURN_TYPE]:
        # The metadata tag goes on the raw function before the base decorator registers it.
        if self.metadata_only:
            setattr(func, c.SQLMESH_METADATA, self.metadata_only)
        registered = super().__call__(func)

        # Flag the registered callable so macros can be identified and unwrapped
        # at runtime during serialization.
        setattr(registered, c.SQLMESH_MACRO, True)
        return registered
Specifies that a function is a macro and registers it in the global MACROS registry.
Registered macros can be referenced in SQL statements to make queries more dynamic/cleaner.
Example:
from sqlglot import exp
from sqlmesh.core.macros import MacroEvaluator, macro

@macro()
def add_one(evaluator: MacroEvaluator, column: exp.Literal) -> exp.Add:
    return evaluator.parse_one(f"{column} + 1")
Arguments:
- name: A custom name for the macro, the default is the name of the function.
Inherited Members
@macro()
def each(
    evaluator: MacroEvaluator,
    *args: t.Any,
) -> t.List[t.Any]:
    """Applies a lambda to every item and collects the results.

    Results that are None are dropped from the output list.

    Args:
        evaluator: MacroEvaluator that invoked the macro
        args: The last argument should be a lambda of the form x -> x + 1. The first argument can be
            an Array or var args can be used.

    Returns:
        A list of items that is the result of func
    """
    *raw_items, lambda_expr = args
    collection, mapper = _norm_var_arg_lambda(evaluator, lambda_expr, *raw_items)  # type: ignore

    results = []
    for value in ensure_collection(collection):
        mapped = mapper(value)
        if mapped is not None:
            results.append(mapped)
    return results
Iterates through items calling func on each.
If a func call on item returns None, it will be excluded from the list.
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- args: The last argument should be a lambda of the form x -> x +1. The first argument can be an Array or var args can be used.
Returns:
A list of items that is the result of func
@macro("IF")
def if_(
    evaluator: MacroEvaluator,
    condition: t.Any,
    true: t.Any,
    false: t.Any = None,
) -> t.Any:
    """Evaluates a condition and returns ``true`` if it holds, otherwise ``false``.

    When ``false`` is omitted, None is returned for a falsy condition.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a, b)")).sql()
        'b'

        >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a)"))
    """
    return true if evaluator.eval_expression(condition) else false
Evaluates a given condition and returns the second argument if true or else the third argument.
If false is not passed in, the default return value will be None.
Example:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a, b)")).sql()
'b'
>>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a)"))
@macro("REDUCE")
def reduce_(evaluator: MacroEvaluator, *args: t.Any) -> t.Any:
    """Folds items left-to-right with a two-argument lambda, reducing them to a single item.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@SQL(@REDUCE([100, 200, 300, 400], (x, y) -> x + y))"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        '1000'

    Args:
        evaluator: MacroEvaluator that invoked the macro
        args: The last argument should be a lambda of the form (x, y) -> x + y. The first argument can be
            an Array or var args can be used.
    Returns:
        A single item that is the result of applying func cumulatively to items
    """
    *raw_items, lambda_expr = args
    collection, combine = _norm_var_arg_lambda(evaluator, lambda_expr, *raw_items)  # type: ignore

    def _step(accumulated: t.Any, current: t.Any) -> t.Any:
        # The two-argument lambda receives its operands packed into a single tuple expression.
        return combine(exp.Tuple(expressions=[accumulated, current]))

    return reduce(_step, ensure_collection(collection))
Iterates through items, applying the provided two-argument function cumulatively from left to right, so as to reduce the iterable to a single item.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@SQL(@REDUCE([100, 200, 300, 400], (x, y) -> x + y))" >>> MacroEvaluator().transform(parse_one(sql)).sql() '1000'
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- args: The last argument should be a lambda of the form (x, y) -> x + y. The first argument can be an Array or var args can be used.
Returns:
A single item that is the result of applying func cumulatively to items
@macro("FILTER")
def filter_(evaluator: MacroEvaluator, *args: t.Any) -> t.List[t.Any]:
    """Iterates through items, applying the provided function to each item and removing
    all items for which the function returns False.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y)"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        '2 + 3'

        >>> sql = "@EVAL(@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y))"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        '5'

    Args:
        evaluator: MacroEvaluator that invoked the macro
        args: The last argument should be a lambda of the form x -> x > 1. The first argument can be
            an Array or var args can be used.
    Returns:
        The items for which the func returned True
    """
    *items, func = args
    items, func = _norm_var_arg_lambda(evaluator, func, *items)  # type: ignore
    # Normalize through ensure_collection exactly as @EACH and @REDUCE do, so a single
    # non-array item is treated as a one-element list instead of being iterated directly.
    return [item for item in ensure_collection(items) if evaluator.eval_expression(func(item))]
Iterates through items, applying the provided function to each item and removing all items for which the function returns False.
Example:
>>> from sqlglot import parse_one
>>> from sqlmesh.core.macros import MacroEvaluator
>>> sql = "@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y)"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'2 + 3'
>>> sql = "@EVAL(@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y))"
>>> MacroEvaluator().transform(parse_one(sql)).sql()
'5'
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- args: The last argument should be a lambda of the form x -> x > 1. The first argument can be an Array or var args can be used.
Returns:
The items for which the func returned True
def _optional_expression(
    evaluator: MacroEvaluator,
    condition: exp.Condition,
    expression: exp.Expression,
) -> t.Optional[exp.Expression]:
    """Keeps ``expression`` only when ``condition`` evaluates truthy.

    The following examples express the usage of this function in the context of the macros which wrap it.

    Examples:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
        >>> sql = "select * from city @GROUP_BY(True) country, population"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city GROUP BY country, population'
        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"

    Args:
        evaluator: MacroEvaluator that invoked the macro
        condition: Condition expression
        expression: SQL expression
    Returns:
        Expression if the conditional is True; otherwise None
    """
    if not evaluator.eval_expression(condition):
        return None
    return expression
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the conditional is True; otherwise None
def _optional_expression(
    evaluator: MacroEvaluator,
    condition: exp.Condition,
    expression: exp.Expression,
) -> t.Optional[exp.Expression]:
    """Keeps ``expression`` only when ``condition`` evaluates truthy.

    The following examples express the usage of this function in the context of the macros which wrap it.

    Examples:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
        >>> sql = "select * from city @GROUP_BY(True) country, population"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city GROUP BY country, population'
        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"

    Args:
        evaluator: MacroEvaluator that invoked the macro
        condition: Condition expression
        expression: SQL expression
    Returns:
        Expression if the conditional is True; otherwise None
    """
    if not evaluator.eval_expression(condition):
        return None
    return expression
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the conditional is True; otherwise None
def _optional_expression(
    evaluator: MacroEvaluator,
    condition: exp.Condition,
    expression: exp.Expression,
) -> t.Optional[exp.Expression]:
    """Keeps ``expression`` only when ``condition`` evaluates truthy.

    The following examples express the usage of this function in the context of the macros which wrap it.

    Examples:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
        >>> sql = "select * from city @GROUP_BY(True) country, population"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city GROUP BY country, population'
        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"

    Args:
        evaluator: MacroEvaluator that invoked the macro
        condition: Condition expression
        expression: SQL expression
    Returns:
        Expression if the conditional is True; otherwise None
    """
    if not evaluator.eval_expression(condition):
        return None
    return expression
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the conditional is True; otherwise None
def _optional_expression(
    evaluator: MacroEvaluator,
    condition: exp.Condition,
    expression: exp.Expression,
) -> t.Optional[exp.Expression]:
    """Keeps ``expression`` only when ``condition`` evaluates truthy.

    The following examples express the usage of this function in the context of the macros which wrap it.

    Examples:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
        >>> sql = "select * from city @GROUP_BY(True) country, population"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city GROUP BY country, population'
        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"

    Args:
        evaluator: MacroEvaluator that invoked the macro
        condition: Condition expression
        expression: SQL expression
    Returns:
        Expression if the conditional is True; otherwise None
    """
    if not evaluator.eval_expression(condition):
        return None
    return expression
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the conditional is True; otherwise None
def _optional_expression(
    evaluator: MacroEvaluator,
    condition: exp.Condition,
    expression: exp.Expression,
) -> t.Optional[exp.Expression]:
    """Keeps ``expression`` only when ``condition`` evaluates truthy.

    The following examples express the usage of this function in the context of the macros which wrap it.

    Examples:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
        >>> sql = "select * from city @GROUP_BY(True) country, population"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city GROUP BY country, population'
        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"

    Args:
        evaluator: MacroEvaluator that invoked the macro
        condition: Condition expression
        expression: SQL expression
    Returns:
        Expression if the conditional is True; otherwise None
    """
    if not evaluator.eval_expression(condition):
        return None
    return expression
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the conditional is True; otherwise None
def _optional_expression(
    evaluator: MacroEvaluator,
    condition: exp.Condition,
    expression: exp.Expression,
) -> t.Optional[exp.Expression]:
    """Keeps ``expression`` only when ``condition`` evaluates truthy.

    The following examples express the usage of this function in the context of the macros which wrap it.

    Examples:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
        >>> sql = "select * from city @GROUP_BY(True) country, population"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city GROUP BY country, population'
        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"

    Args:
        evaluator: MacroEvaluator that invoked the macro
        condition: Condition expression
        expression: SQL expression
    Returns:
        Expression if the conditional is True; otherwise None
    """
    if not evaluator.eval_expression(condition):
        return None
    return expression
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the conditional is True; otherwise None
def _optional_expression(
    evaluator: MacroEvaluator,
    condition: exp.Condition,
    expression: exp.Expression,
) -> t.Optional[exp.Expression]:
    """Keeps ``expression`` only when ``condition`` evaluates truthy.

    The following examples express the usage of this function in the context of the macros which wrap it.

    Examples:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
        >>> sql = "select * from city @GROUP_BY(True) country, population"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city GROUP BY country, population'
        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"

    Args:
        evaluator: MacroEvaluator that invoked the macro
        condition: Condition expression
        expression: SQL expression
    Returns:
        Expression if the conditional is True; otherwise None
    """
    if not evaluator.eval_expression(condition):
        return None
    return expression
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the conditional is True; otherwise None
@macro("eval")
def eval_(evaluator: MacroEvaluator, condition: exp.Condition) -> t.Any:
    """Evaluates the given condition in a Python/SQL interpreter and returns the result.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@EVAL(1 + 1)"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        '2'
    """
    result = evaluator.eval_expression(condition)
    return result
Evaluate the given condition in a Python/SQL interpreter.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@EVAL(1 + 1)" >>> MacroEvaluator().transform(parse_one(sql)).sql() '2'
@macro()
def star(
    evaluator: MacroEvaluator,
    relation: exp.Table,
    alias: exp.Column = t.cast(exp.Column, exp.column("")),
    exclude: t.Union[exp.Array, exp.Tuple] = exp.Tuple(expressions=[]),
    prefix: exp.Literal = exp.Literal.string(""),
    suffix: exp.Literal = exp.Literal.string(""),
    quote_identifiers: exp.Boolean = exp.true(),
    except_: t.Union[exp.Array, exp.Tuple] = exp.Tuple(expressions=[]),
) -> t.List[exp.Alias]:
    """Returns a list of projections for the given relation.

    Each remaining column is selected from the relation (or its alias) and aliased as
    ``<prefix><column><suffix>``. When every column type is known, each projection is
    also cast to its type.

    Args:
        evaluator: MacroEvaluator that invoked the macro
        relation: The relation to select star from
        alias: The alias of the relation
        exclude: Columns to exclude
        prefix: A prefix to use for all selections
        suffix: A suffix to use for all selections
        quote_identifiers: Whether or not quote the resulting aliases, defaults to true
        except_: Alias for exclude (TODO: deprecate this, update docs); only consulted when
            ``exclude`` is empty

    Returns:
        An array of columns.

    Raises:
        SQLMeshError: If any argument has an unexpected expression type.

    Example:
        >>> from sqlglot import parse_one, exp
        >>> from sqlglot.schema import MappingSchema
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "SELECT @STAR(foo, bar, exclude := [c], prefix := 'baz_') FROM foo AS bar"
        >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": exp.DataType.build("string"), "b": exp.DataType.build("string"), "c": exp.DataType.build("string"), "d": exp.DataType.build("int")}})).transform(parse_one(sql)).sql()
        'SELECT CAST("bar"."a" AS TEXT) AS "baz_a", CAST("bar"."b" AS TEXT) AS "baz_b", CAST("bar"."d" AS INT) AS "baz_d" FROM foo AS bar'
    """
    if alias and not isinstance(alias, (exp.Identifier, exp.Column)):
        raise SQLMeshError(f"Invalid alias '{alias}'. Expected an identifier.")
    if exclude and not isinstance(exclude, (exp.Array, exp.Tuple)):
        raise SQLMeshError(f"Invalid exclude '{exclude}'. Expected an array.")
    if except_ != exp.tuple_():
        from sqlmesh.core.console import get_console

        get_console().log_warning(
            "The 'except_' argument in @STAR will soon be deprecated. Use 'exclude' instead."
        )
        # Validate the deprecated argument itself; re-checking `exclude` here let a malformed
        # `except_` through to the `.expressions` access below.
        if not isinstance(except_, (exp.Array, exp.Tuple)):
            raise SQLMeshError(f"Invalid except_ '{except_}'. Expected an array.")
    if prefix and not isinstance(prefix, exp.Literal):
        raise SQLMeshError(f"Invalid prefix '{prefix}'. Expected a literal.")
    if suffix and not isinstance(suffix, exp.Literal):
        raise SQLMeshError(f"Invalid suffix '{suffix}'. Expected a literal.")
    if not isinstance(quote_identifiers, exp.Boolean):
        raise SQLMeshError(f"Invalid quote_identifiers '{quote_identifiers}'. Expected a boolean.")

    # `except_` is only used when `exclude` is empty.
    excluded_names = {
        normalize_identifiers(excluded, dialect=evaluator.dialect).name
        for excluded in exclude.expressions or except_.expressions
    }
    quoted = quote_identifiers.this
    # Qualify columns with the alias when one was given, otherwise with the relation itself.
    table_identifier = normalize_identifiers(
        alias if alias.name else relation, dialect=evaluator.dialect
    ).name

    columns_to_types = {
        k: v for k, v in evaluator.columns_to_types(relation).items() if k not in excluded_names
    }
    if columns_to_types_all_known(columns_to_types):
        return [
            exp.cast(
                exp.column(column, table=table_identifier, quoted=quoted),
                dtype,
                dialect=evaluator.dialect,
            ).as_(f"{prefix.this}{column}{suffix.this}", quoted=quoted)
            for column, dtype in columns_to_types.items()
        ]
    # At least one type is unknown: emit plain aliased columns without casts.
    return [
        exp.column(column, table=table_identifier, quoted=quoted).as_(
            f"{prefix.this}{column}{suffix.this}", quoted=quoted
        )
        for column in columns_to_types
    ]
Returns a list of projections for the given relation.
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- relation: The relation to select star from
- alias: The alias of the relation
- exclude: Columns to exclude
- prefix: A prefix to use for all selections
- suffix: A suffix to use for all selections
- quote_identifiers: Whether or not to quote the resulting aliases, defaults to true
- except_: Alias for exclude (TODO: deprecate this, update docs)
Returns:
An array of columns.
Example:
>>> from sqlglot import parse_one, exp >>> from sqlglot.schema import MappingSchema >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT @STAR(foo, bar, exclude := [c], prefix := 'baz_') FROM foo AS bar" >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": exp.DataType.build("string"), "b": exp.DataType.build("string"), "c": exp.DataType.build("string"), "d": exp.DataType.build("int")}})).transform(parse_one(sql)).sql() 'SELECT CAST("bar"."a" AS TEXT) AS "baz_a", CAST("bar"."b" AS TEXT) AS "baz_b", CAST("bar"."d" AS INT) AS "baz_d" FROM foo AS bar'
@macro()
def generate_surrogate_key(
    evaluator: MacroEvaluator,
    *fields: exp.Expression,
    hash_function: exp.Literal = exp.Literal.string("MD5"),
) -> exp.Func:
    """Generates a surrogate key (string) for the given fields.

    Each field is cast to text and NULLs are replaced by a fixed placeholder.
    The pieces are joined with '|' and passed to ``hash_function``.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>>
        >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c) FROM foo"
        >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql, dialect="bigquery")).sql("bigquery")
        "SELECT TO_HEX(MD5(CONCAT(COALESCE(CAST(a AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS STRING), '_sqlmesh_surrogate_key_null_')))) FROM foo"
        >>>
        >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c, hash_function := 'SHA256') FROM foo"
        >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql, dialect="bigquery")).sql("bigquery")
        "SELECT SHA256(CONCAT(COALESCE(CAST(a AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS STRING), '_sqlmesh_surrogate_key_null_'))) FROM foo"
    """
    pieces: t.List[exp.Expression] = []
    for position, field in enumerate(fields):
        if position:
            # Separator between consecutive fields.
            pieces.append(exp.Literal.string("|"))
        # Fresh literal nodes per field so no AST node is shared between parents.
        coalesced = exp.func(
            "COALESCE",
            exp.cast(field, exp.DataType.build("text")),
            exp.Literal.string("_sqlmesh_surrogate_key_null_"),
        )
        pieces.append(coalesced)

    concatenated = exp.func("CONCAT", *pieces)
    hashed = exp.func(hash_function.name, concatenated, dialect=evaluator.dialect)
    # NOTE(review): MD5Digest is presumably the binary-digest variant; it is rewritten
    # as exp.MD5 so the key stays a string.
    return exp.MD5(this=hashed.this) if isinstance(hashed, exp.MD5Digest) else hashed
Generates a surrogate key (string) for the given fields.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c) FROM foo" >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql, dialect="bigquery")).sql("bigquery") "SELECT TO_HEX(MD5(CONCAT(COALESCE(CAST(a AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS STRING), '_sqlmesh_surrogate_key_null_')))) FROM foo" >>> >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c, hash_function := 'SHA256') FROM foo" >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql, dialect="bigquery")).sql("bigquery") "SELECT SHA256(CONCAT(COALESCE(CAST(a AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS STRING), '_sqlmesh_surrogate_key_null_'))) FROM foo"
@macro()
def safe_add(_: MacroEvaluator, *fields: exp.Expression) -> exp.Case:
    """Adds numbers together, substitutes nulls for 0s and only returns null if all fields are null.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "SELECT @SAFE_ADD(a, b) FROM foo"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) + COALESCE(b, 0) END FROM foo'
    """
    every_field_null = exp.and_(*(field.is_(exp.null()) for field in fields))
    zero_filled = [exp.func("COALESCE", field, 0) for field in fields]
    total = reduce(lambda left, right: left + right, zero_filled)  # type: ignore
    return exp.Case().when(every_field_null, exp.null()).else_(total)
Adds numbers together, substitutes nulls for 0s and only returns null if all fields are null.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT @SAFE_ADD(a, b) FROM foo" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) + COALESCE(b, 0) END FROM foo'
@macro()
def safe_sub(_: MacroEvaluator, *fields: exp.Expression) -> exp.Case:
    """Subtract numbers, substitutes nulls for 0s and only returns null if all fields are null.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "SELECT @SAFE_SUB(a, b) FROM foo"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) - COALESCE(b, 0) END FROM foo'
    """
    every_field_null = exp.and_(*(field.is_(exp.null()) for field in fields))
    zero_filled = [exp.func("COALESCE", field, 0) for field in fields]
    # Left fold: ((f1 - f2) - f3) - ...
    difference = reduce(lambda left, right: left - right, zero_filled)  # type: ignore
    return exp.Case().when(every_field_null, exp.null()).else_(difference)
Subtract numbers, substitutes nulls for 0s and only returns null if all fields are null.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT @SAFE_SUB(a, b) FROM foo" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) - COALESCE(b, 0) END FROM foo'
@macro()
def safe_div(_: MacroEvaluator, numerator: exp.Expression, denominator: exp.Expression) -> exp.Div:
    """Divides numbers, returns null if the denominator is 0.

    The denominator is wrapped in NULLIF(denominator, 0).

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "SELECT @SAFE_DIV(a, b) FROM foo"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT a / NULLIF(b, 0) FROM foo'
    """
    null_if_zero = exp.func("NULLIF", denominator, 0)
    return numerator / null_if_zero
Divides numbers, returns null if the denominator is 0.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT @SAFE_DIV(a, b) FROM foo" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT a / NULLIF(b, 0) FROM foo'
@macro()
def union(
    evaluator: MacroEvaluator,
    *args: exp.Expression,
) -> exp.Query:
    """Returns a UNION of the given tables. Only choosing columns that have the same name and type.

    Args:
        evaluator: MacroEvaluator that invoked the macro
        args: Variable arguments that can be:
            - First argument can be a condition (exp.Condition)
            - A union type ('ALL' or 'DISTINCT') as exp.Literal
            - Tables (exp.Table)

    Returns:
        A UNION query over the tables, or a plain SELECT from the first table when the
        condition evaluates to False.

    Raises:
        SQLMeshError: If no tables are given, nothing follows the condition, or the union
            type is not 'ALL' or 'DISTINCT'.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlglot.schema import MappingSchema
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@UNION('distinct', foo, bar)"
        >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql()
        'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar'
        >>> sql = "@UNION(True, 'distinct', foo, bar)"
        >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql()
        'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar'
    """

    if not args:
        raise SQLMeshError("At least one table is required for the @UNION macro.")

    arg_idx = 0
    # The first argument is treated as a condition only when it evaluates to a Python bool.
    condition = evaluator.eval_expression(args[arg_idx])
    if isinstance(condition, bool):
        arg_idx += 1
        if arg_idx >= len(args):
            raise SQLMeshError("Expected more arguments after the condition of the `@UNION` macro.")

    # Check for union type
    type_ = exp.Literal.string("ALL")
    if isinstance(args[arg_idx], exp.Literal):
        type_ = args[arg_idx]  # type: ignore
        arg_idx += 1
    kind = type_.name.upper()
    if kind not in ("ALL", "DISTINCT"):
        raise SQLMeshError(f"Invalid type '{type_}'. Expected 'ALL' or 'DISTINCT'.")

    # Remaining args should be tables
    tables = [
        exp.to_table(e.sql(evaluator.dialect), dialect=evaluator.dialect) for e in args[arg_idx:]
    ]
    if not tables:
        # e.g. @UNION('all') or @UNION(TRUE, 'distinct'): without this check the reduce()
        # calls below would fail with an opaque TypeError.
        raise SQLMeshError("At least one table is required for the @UNION macro.")

    # Columns whose (name, type) pair appears in every table.
    columns = {
        column
        for column, _ in reduce(
            lambda a, b: a & b,  # type: ignore
            (evaluator.columns_to_types(table).items() for table in tables),
        )
    }

    # Column order and types follow the first table.
    projections = [
        exp.cast(column, column_type, dialect=evaluator.dialect).as_(column)
        for column, column_type in evaluator.columns_to_types(tables[0]).items()
        if column in columns
    ]

    # Skip the union only when a bool condition was actually supplied and is False.
    if condition is False:
        return exp.select(*projections).from_(tables[0])

    return reduce(
        lambda a, b: a.union(b, distinct=kind == "DISTINCT"),  # type: ignore
        [exp.select(*projections).from_(table) for table in tables],
    )
Returns a UNION of the given tables, selecting only the columns that have the same name and type in every table.
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- args: Variable arguments that can be:
- First argument can be a condition (exp.Condition)
- A union type ('ALL' or 'DISTINCT') as exp.Literal
- Tables (exp.Table)
Example:
>>> from sqlglot import parse_one >>> from sqlglot.schema import MappingSchema >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@UNION('distinct', foo, bar)" >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql() 'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar' >>> sql = "@UNION(True, 'distinct', foo, bar)" >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql() 'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar'
@macro()
def haversine_distance(
    _: MacroEvaluator,
    lat1: exp.Expression,
    lon1: exp.Expression,
    lat2: exp.Expression,
    lon2: exp.Expression,
    unit: exp.Literal = exp.Literal.string("mi"),
) -> exp.Mul:
    """Returns the haversine distance between two points.

    Args:
        lat1: Latitude of the first point, in degrees.
        lon1: Longitude of the first point, in degrees.
        lat2: Latitude of the second point, in degrees.
        lon2: Longitude of the second point, in degrees.
        unit: 'mi' (default) or 'km', matched case-insensitively.

    Raises:
        SQLMeshError: If the unit is neither 'mi' nor 'km'.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "SELECT @HAVERSINE_DISTANCE(driver_y, driver_x, passenger_y, passenger_x, 'mi') FROM rides"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT 7922 * ASIN(SQRT((POWER(SIN(RADIANS((passenger_y - driver_y) / 2)), 2)) + (COS(RADIANS(driver_y)) * COS(RADIANS(passenger_y)) * POWER(SIN(RADIANS((passenger_x - driver_x) / 2)), 2)))) * 1.0 FROM rides'
    """
    # `.name` yields the literal text (or an identifier's name), so 'MI', 'km', etc. are accepted.
    unit_name = unit.name.lower()
    if unit_name == "mi":
        conversion_rate = 1.0
    elif unit_name == "km":
        # Miles to kilometres.
        conversion_rate = 1.60934
    else:
        raise SQLMeshError(f"Invalid unit '{unit}'. Expected 'mi' or 'km'.")

    # d = 2 * R * asin(sqrt(hav)), with R = 3961 used as the Earth's radius in miles.
    return (
        2
        * 3961
        * exp.func(
            "ASIN",
            exp.func(
                "SQRT",
                exp.func("POWER", exp.func("SIN", exp.func("RADIANS", (lat2 - lat1) / 2)), 2)
                + exp.func("COS", exp.func("RADIANS", lat1))
                * exp.func("COS", exp.func("RADIANS", lat2))
                * exp.func("POWER", exp.func("SIN", exp.func("RADIANS", (lon2 - lon1) / 2)), 2),
            ),
        )
        * conversion_rate
    )
Returns the haversine distance between two points.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT @HAVERSINE_DISTANCE(driver_y, driver_x, passenger_y, passenger_x, 'mi') FROM rides" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT 7922 * ASIN(SQRT((POWER(SIN(RADIANS((passenger_y - driver_y) / 2)), 2)) + (COS(RADIANS(driver_y)) * COS(RADIANS(passenger_y)) * POWER(SIN(RADIANS((passenger_x - driver_x) / 2)), 2)))) * 1.0 FROM rides'
@macro()
def pivot(
    evaluator: MacroEvaluator,
    column: SQL,
    values: t.List[exp.Expression],
    alias: bool = True,
    agg: exp.Expression = exp.Literal.string("SUM"),
    cmp: exp.Expression = exp.Literal.string("="),
    prefix: exp.Expression = exp.Literal.string(""),
    suffix: exp.Expression = exp.Literal.string(""),
    then_value: SQL = SQL("1"),
    else_value: SQL = SQL("0"),
    quote: bool = True,
    distinct: bool = False,
) -> t.List[exp.Expression]:
    """Returns a list of projections as a result of pivoting the given column on the given values.

    For every value, builds ``AGG([DISTINCT] CASE WHEN column CMP value THEN then ELSE else END)``.
    When ``alias`` is set, the projection is aliased as ``prefix + value + suffix``.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "SELECT date_day, @PIVOT(status, ['cancelled', 'completed']) FROM rides GROUP BY 1"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT date_day, SUM(CASE WHEN status = \\'cancelled\\' THEN 1 ELSE 0 END) AS "cancelled", SUM(CASE WHEN status = \\'completed\\' THEN 1 ELSE 0 END) AS "completed" FROM rides GROUP BY 1'
        >>> sql = "SELECT @PIVOT(a, ['v'], then_value := tv, suffix := '_sfx', quote := FALSE)"
        >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql)).sql("bigquery")
        "SELECT SUM(CASE WHEN a = 'v' THEN tv ELSE 0 END) AS v_sfx"
    """
    distinct_keyword = "DISTINCT " if distinct else ""
    projections: t.List[exp.Expression] = []
    for value in values:
        value_sql = value.sql(evaluator.dialect)
        aggregate_sql = (
            f"{agg.name}({distinct_keyword}"
            f"CASE WHEN {column} {cmp.name} {value_sql} THEN {then_value} ELSE {else_value} END) "
        )
        projection = evaluator.parse_one(aggregate_sql)

        if alias:
            projection = projection.as_(
                f"{prefix.name}{value.name}{suffix.name}",
                quoted=quote,
                copy=False,
                dialect=evaluator.dialect,
            )

        projections.append(projection)

    return projections
Returns a list of projections as a result of pivoting the given column on the given values.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT date_day, @PIVOT(status, ['cancelled', 'completed']) FROM rides GROUP BY 1" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT date_day, SUM(CASE WHEN status = \'cancelled\' THEN 1 ELSE 0 END) AS "cancelled", SUM(CASE WHEN status = \'completed\' THEN 1 ELSE 0 END) AS "completed" FROM rides GROUP BY 1' >>> sql = "SELECT @PIVOT(a, ['v'], then_value := tv, suffix := '_sfx', quote := FALSE)" >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql)).sql("bigquery") "SELECT SUM(CASE WHEN a = 'v' THEN tv ELSE 0 END) AS v_sfx"
@macro("AND")
def and_(evaluator: MacroEvaluator, *expressions: t.Optional[exp.Expression]) -> exp.Condition:
    """Returns an AND statement filtering out any NULL expressions.

    If every argument is a NULL literal (or none are given), TRUE is returned.
    """
    non_null = [expression for expression in expressions if not isinstance(expression, exp.Null)]
    return exp.and_(*non_null, dialect=evaluator.dialect) if non_null else exp.true()
Returns an AND statement filtering out any NULL expressions.
@macro("OR")
def or_(evaluator: MacroEvaluator, *expressions: t.Optional[exp.Expression]) -> exp.Condition:
    """Returns an OR statement filtering out any NULL expressions.

    If every argument is a NULL literal (or none are given), TRUE is returned.
    """
    non_null = [expression for expression in expressions if not isinstance(expression, exp.Null)]
    return exp.or_(*non_null, dialect=evaluator.dialect) if non_null else exp.true()
Returns an OR statement filtering out any NULL expressions.
@macro("VAR")
def var(
    evaluator: MacroEvaluator, var_name: exp.Expression, default: t.Optional[exp.Expression] = None
) -> exp.Expression:
    """Returns the value of a variable or the default value if the variable is not set.

    Raises:
        SQLMeshError: If ``var_name`` is not a string literal.
    """
    if var_name.is_string:
        resolved = evaluator.var(var_name.this, default)
        return exp.convert(resolved)
    raise SQLMeshError(f"Invalid variable name '{var_name.sql()}'. Expected a string literal.")
Returns the value of a variable or the default value if the variable is not set.
@macro("BLUEPRINT_VAR")
def blueprint_var(
    evaluator: MacroEvaluator, var_name: exp.Expression, default: t.Optional[exp.Expression] = None
) -> exp.Expression:
    """Returns the value of a blueprint variable or the default value if the variable is not set.

    Raises:
        SQLMeshError: If ``var_name`` is not a string literal.
    """
    if var_name.is_string:
        resolved = evaluator.blueprint_var(var_name.this, default)
        return exp.convert(resolved)
    raise SQLMeshError(
        f"Invalid blueprint variable name '{var_name.sql()}'. Expected a string literal."
    )
Returns the value of a blueprint variable or the default value if the variable is not set.
@macro()
def deduplicate(
    evaluator: MacroEvaluator,
    relation: exp.Expression,
    partition_by: t.List[exp.Expression],
    order_by: t.List[str],
) -> exp.Query:
    """Returns a query that deduplicates rows within a table.

    Keeps the first row per partition using ``QUALIFY ROW_NUMBER() OVER (...) = 1``.

    Args:
        evaluator: MacroEvaluator that invoked the macro
        relation: table or CTE name to deduplicate
        partition_by: column names, or expressions to use to identify a window of rows out of which to select one as the deduplicated row
        order_by: A list of strings representing the ORDER BY clause

    Raises:
        SQLMeshError: If ``partition_by`` or ``order_by`` is not a list, or ``order_by`` is empty.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlglot.schema import MappingSchema
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@deduplicate(demo.table, [user_id, cast(timestamp as date)], ['timestamp desc', 'status asc'])"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM demo.table QUALIFY ROW_NUMBER() OVER (PARTITION BY user_id, CAST(timestamp AS DATE) ORDER BY timestamp DESC, status ASC) = 1'
    """
    order_by_error = "order_by must be a list of strings, optional - nulls ordering: ['<column> <asc|desc> nulls <first|last>']"

    if not isinstance(partition_by, list):
        raise SQLMeshError(
            "partition_by must be a list of columns: [<column>, cast(<column> as <type>)]"
        )
    if not isinstance(order_by, list):
        raise SQLMeshError(order_by_error)

    partition = exp.tuple_(*partition_by)
    # Each ORDER BY item is parsed as an exp.Ordered, then run through the macro evaluator.
    ordering = [
        evaluator.transform(parse_one(item, into=exp.Ordered, dialect=evaluator.dialect))
        for item in order_by
    ]
    if not ordering:
        raise SQLMeshError(order_by_error)

    row_number = exp.Window(
        this=exp.RowNumber(),
        partition_by=partition,
        order=exp.Order(expressions=ordering),
    )
    return exp.select("*").from_(relation).qualify(row_number.eq(1))
Returns a query that deduplicates rows within a table.
Arguments:
- relation: table or CTE name to deduplicate
- partition_by: column names, or expressions to use to identify a window of rows out of which to select one as the deduplicated row
- order_by: A list of strings representing the ORDER BY clause
Example:
>>> from sqlglot import parse_one >>> from sqlglot.schema import MappingSchema >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@deduplicate(demo.table, [user_id, cast(timestamp as date)], ['timestamp desc', 'status asc'])" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM demo.table QUALIFY ROW_NUMBER() OVER (PARTITION BY user_id, CAST(timestamp AS DATE) ORDER BY timestamp DESC, status ASC) = 1'
@macro()
def date_spine(
    evaluator: MacroEvaluator,
    datepart: exp.Expression,
    start_date: exp.Expression,
    end_date: exp.Expression,
) -> exp.Select:
    """Returns a query that produces a date spine with the given datepart, and range of start_date and end_date. Useful for joining as a date lookup table.

    Args:
        evaluator: MacroEvaluator that invoked the macro
        datepart: The datepart to use for the date spine - day, week, month, quarter, year
        start_date: The start date for the date spine in format YYYY-MM-DD
        end_date: The end date for the date spine in format YYYY-MM-DD

    Returns:
        A SELECT over UNNEST(GENERATE_DATE_ARRAY(...)) exposing a single ``date_<datepart>`` column.

    Raises:
        SQLMeshError: If the datepart is unsupported, a string-literal date is not in
            YYYY-MM-DD format, or start_date is after end_date.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlglot.schema import MappingSchema
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@date_spine('week', '2022-01-20', '2024-12-16')"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        "SELECT date_week FROM UNNEST(GENERATE_DATE_ARRAY(CAST(\'2022-01-20\' AS DATE), CAST(\'2024-12-16\' AS DATE), INTERVAL \'1\' WEEK)) AS _exploded(date_week)"
    """
    datepart_name = datepart.name.lower()
    if datepart_name not in ("day", "week", "month", "quarter", "year"):
        raise SQLMeshError(
            f"Invalid datepart '{datepart_name}'. Expected: 'day', 'week', 'month', 'quarter', or 'year'"
        )

    start_date_name = start_date.name
    end_date_name = end_date.name

    # Dates are only validated when both bounds are string literals; other expressions
    # are passed through unvalidated.
    start_date_obj: t.Optional[date] = None
    end_date_obj: t.Optional[date] = None
    if start_date.is_string and end_date.is_string:
        try:
            start_date_obj = datetime.strptime(start_date_name, "%Y-%m-%d").date()
            end_date_obj = datetime.strptime(end_date_name, "%Y-%m-%d").date()
        except ValueError as e:
            raise SQLMeshError(
                f"Invalid date format - start_date and end_date must be in format: YYYY-MM-DD. Error: {e}"
            ) from e

    if start_date_obj is not None and end_date_obj is not None and start_date_obj > end_date_obj:
        raise SQLMeshError(
            f"Invalid date range - start_date '{start_date_name}' is after end_date '{end_date_name}'."
        )

    start_date = exp.cast(start_date, "DATE")
    end_date = exp.cast(end_date, "DATE")

    # NOTE(review): presumably these dialects cannot use a QUARTER interval here,
    # so a quarter is expressed as 3 months.
    if datepart_name == "quarter" and evaluator.dialect in (
        "spark",
        "spark2",
        "databricks",
        "postgres",
    ):
        date_interval = exp.Interval(this=exp.Literal.number(3), unit=exp.var("month"))
    else:
        date_interval = exp.Interval(this=exp.Literal.number(1), unit=exp.var(datepart_name))

    generate_date_array = exp.func(
        "GENERATE_DATE_ARRAY",
        start_date,
        end_date,
        date_interval,
    )

    alias_name = f"date_{datepart_name}"
    exploded = exp.alias_(exp.func("unnest", generate_date_array), "_exploded", table=[alias_name])

    return exp.select(alias_name).from_(exploded)
Returns a query that produces a date spine with the given datepart, and range of start_date and end_date. Useful for joining as a date lookup table.
Arguments:
- datepart: The datepart to use for the date spine - day, week, month, quarter, year
- start_date: The start date for the date spine in format YYYY-MM-DD
- end_date: The end date for the date spine in format YYYY-MM-DD
Example:
>>> from sqlglot import parse_one >>> from sqlglot.schema import MappingSchema >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@date_spine('week', '2022-01-20', '2024-12-16')" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT date_week FROM UNNEST(GENERATE_DATE_ARRAY(CAST('2022-01-20' AS DATE), CAST('2024-12-16' AS DATE), INTERVAL '1' WEEK)) AS _exploded(date_week)"
@macro()
def resolve_template(
    evaluator: MacroEvaluator,
    template: exp.Literal,
    mode: str = "literal",
) -> t.Union[exp.Literal, exp.Table]:
    """
    Generates either a String literal or an exp.Table representing a physical table location, based on rendering the provided template String literal.

    Note: It relies on the @this_model variable being available in the evaluation context (@this_model resolves to an exp.Table object
    representing the current physical table).
    Therefore, the @resolve_template macro must be used at creation or evaluation time and not at load time.

    Args:
        template: Template string literal. Can contain the following placeholders:
            @{catalog_name} -> replaced with the catalog of the exp.Table returned from @this_model
            @{schema_name} -> replaced with the schema of the exp.Table returned from @this_model
            @{table_name} -> replaced with the name of the exp.Table returned from @this_model
        mode: What to return.
            'literal' -> return an exp.Literal string
            'table' -> return an exp.Table

    Raises:
        SQLMeshError: If @this_model is missing at any runtime stage other than loading.

    Example:
        >>> from sqlglot import parse_one, exp
        >>> from sqlmesh.core.macros import MacroEvaluator, RuntimeStage
        >>> sql = "@resolve_template('s3://data-bucket/prod/@{catalog_name}/@{schema_name}/@{table_name}')"
        >>> evaluator = MacroEvaluator(runtime_stage=RuntimeStage.CREATING)
        >>> evaluator.locals.update({"this_model": exp.to_table("test_catalog.sqlmesh__test.test__test_model__2517971505")})
        >>> evaluator.transform(parse_one(sql)).sql()
        "'s3://data-bucket/prod/test_catalog/sqlmesh__test/test__test_model__2517971505'"
    """
    if "this_model" not in evaluator.locals:
        # While LOADING the macro is a no-op and returns the template untouched; at any other
        # stage a missing @this_model could indicate a bug, so it is an error.
        if evaluator.runtime_stage != RuntimeStage.LOADING.value:
            raise SQLMeshError(
                "@this_model must be present in the macro evaluation context in order to use @resolve_template"
            )
        return template

    this_model = exp.to_table(evaluator.locals["this_model"], dialect=evaluator.dialect)
    placeholders = {
        "@{catalog_name}": this_model.catalog,
        "@{schema_name}": this_model.db,
        "@{table_name}": this_model.name,
    }
    rendered: str = template.this
    for placeholder, replacement in placeholders.items():
        rendered = rendered.replace(placeholder, replacement)

    if mode.lower() == "table":
        return exp.to_table(rendered, dialect=evaluator.dialect)
    return exp.Literal.string(rendered)
Generates either a String literal or an exp.Table representing a physical table location, based on rendering the provided template String literal.
Note: It relies on the @this_model variable being available in the evaluation context (@this_model resolves to an exp.Table object representing the current physical table). Therefore, the @resolve_template macro must be used at creation or evaluation time and not at load time.
Arguments:
- template: Template string literal. Can contain the following placeholders: @{catalog_name} -> replaced with the catalog of the exp.Table returned from @this_model @{schema_name} -> replaced with the schema of the exp.Table returned from @this_model @{table_name} -> replaced with the name of the exp.Table returned from @this_model
- mode: What to return. 'literal' -> return an exp.Literal string 'table' -> return an exp.Table
Example:
>>> from sqlglot import parse_one, exp >>> from sqlmesh.core.macros import MacroEvaluator, RuntimeStage >>> sql = "@resolve_template('s3://data-bucket/prod/@{catalog_name}/@{schema_name}/@{table_name}')" >>> evaluator = MacroEvaluator(runtime_stage=RuntimeStage.CREATING) >>> evaluator.locals.update({"this_model": exp.to_table("test_catalog.sqlmesh__test.test__test_model__2517971505")}) >>> evaluator.transform(parse_one(sql)).sql() "'s3://data-bucket/prod/test_catalog/sqlmesh__test/test__test_model__2517971505'"
def normalize_macro_name(name: str) -> str:
    """Return ``name`` upper-cased and prefixed with ``@`` (e.g. ``"star"`` -> ``"@STAR"``)."""
    return "@" + name.upper()
Prefixes the macro name with @ and converts it to uppercase.
def call_macro(
    func: t.Callable,
    dialect: DialectType,
    path: t.Optional[Path],
    provided_args: t.Tuple[t.Any, ...],
    provided_kwargs: t.Dict[str, t.Any],
    **optional_kwargs: t.Any,
) -> t.Any:
    """Invoke a macro function, coercing its arguments to the annotated parameter types.

    Args:
        func: The macro callable.
        dialect: Dialect forwarded to the argument coercion.
        path: Optional path forwarded to the argument coercion.
        provided_args: Positional arguments supplied at the macro call site.
        provided_kwargs: Keyword arguments supplied at the macro call site.
        **optional_kwargs: Extra keyword arguments that are forwarded only when ``func``
            declares a parameter with the same name.

    Returns:
        Whatever ``func`` returns.
    """
    signature = inspect.signature(func)

    # Forward only the optional kwargs that the macro declares, building a new dict so the
    # caller's mapping is left untouched.
    if optional_kwargs:
        provided_kwargs = {
            **provided_kwargs,
            **{
                name: value
                for name, value in optional_kwargs.items()
                if name in signature.parameters
            },
        }

    # Bind the macro's actual parameters to its formal parameters.
    bound = signature.bind(*provided_args, **provided_kwargs)
    bound.apply_defaults()

    try:
        hints = t.get_type_hints(func, localns=get_supported_types())
    except (NameError, TypeError):  # forward references aren't handled
        hints = {}

    # Coerce each annotated argument. Assigning into bound.arguments is reflected in
    # bound.args / bound.kwargs:
    # https://docs.python.org/3/library/inspect.html#inspect.BoundArguments.arguments
    for name, value in bound.arguments.items():
        expected_type = hints.get(name)
        if not expected_type:
            continue

        kind = signature.parameters[name].kind
        if kind is inspect.Parameter.VAR_POSITIONAL:
            coerced = tuple(_coerce(item, expected_type, dialect, path) for item in value)
        elif kind is inspect.Parameter.VAR_KEYWORD:
            coerced = {
                key: _coerce(item, expected_type, dialect, path) for key, item in value.items()
            }
        else:
            coerced = _coerce(value, expected_type, dialect, path)
        bound.arguments[name] = coerced

    return func(*bound.args, **bound.kwargs)