sqlmesh.core.macros
1from __future__ import annotations 2 3import inspect 4import sys 5import types 6import typing as t 7from enum import Enum 8from functools import lru_cache, reduce 9from itertools import chain 10from pathlib import Path 11from string import Template 12from datetime import datetime, date 13 14import sqlglot 15from sqlglot import Generator, exp, parse_one 16from sqlglot.executor.env import ENV 17from sqlglot.executor.python import Python 18from sqlglot.helper import csv, ensure_collection 19from sqlglot.optimizer.normalize_identifiers import normalize_identifiers 20from sqlglot.schema import MappingSchema 21 22from sqlmesh.core import constants as c 23from sqlmesh.core.dialect import ( 24 SQLMESH_MACRO_PREFIX, 25 Dialect, 26 MacroDef, 27 MacroFunc, 28 MacroSQL, 29 MacroStrReplace, 30 MacroVar, 31 StagedFilePath, 32 normalize_model_name, 33) 34from sqlmesh.utils import ( 35 DECORATOR_RETURN_TYPE, 36 UniqueKeyDict, 37 columns_to_types_all_known, 38 registry_decorator, 39) 40from sqlmesh.utils.date import DatetimeRanges, to_datetime, to_date 41from sqlmesh.utils.errors import MacroEvalError, SQLMeshError 42from sqlmesh.utils.metaprogramming import ( 43 Executable, 44 SqlValue, 45 format_evaluated_code_exception, 46 prepare_env, 47) 48 49if t.TYPE_CHECKING: 50 from sqlglot.dialects.dialect import DialectType 51 from sqlmesh.core._typing import TableName 52 from sqlmesh.core.engine_adapter import EngineAdapter 53 from sqlmesh.core.snapshot import Snapshot 54 from sqlmesh.core.environment import EnvironmentNamingInfo 55 56 57if sys.version_info >= (3, 10): 58 UNION_TYPES = (t.Union, types.UnionType) 59else: 60 UNION_TYPES = (t.Union,) 61 62 63class RuntimeStage(Enum): 64 LOADING = "loading" 65 CREATING = "creating" 66 EVALUATING = "evaluating" 67 PROMOTING = "promoting" 68 DEMOTING = "demoting" 69 AUDITING = "auditing" 70 TESTING = "testing" 71 BEFORE_ALL = "before_all" 72 AFTER_ALL = "after_all" 73 74 75class MacroStrTemplate(Template): 76 delimiter = SQLMESH_MACRO_PREFIX 
77 78 79EXPRESSIONS_NAME_MAP = {} 80SQL = t.NewType("SQL", str) 81 82 83@lru_cache() 84def get_supported_types() -> t.Dict[str, t.Any]: 85 from sqlmesh.core.context import ExecutionContext 86 87 return { 88 "t": t, 89 "typing": t, 90 "List": t.List, 91 "Tuple": t.Tuple, 92 "Union": t.Union, 93 "DatetimeRanges": DatetimeRanges, 94 "exp": exp, 95 "SQL": SQL, 96 "MacroEvaluator": MacroEvaluator, 97 "ExecutionContext": ExecutionContext, 98 } 99 100 101for klass in sqlglot.Parser.EXPRESSION_PARSERS: 102 name = klass if isinstance(klass, str) else klass.__name__ # type: ignore 103 EXPRESSIONS_NAME_MAP[name.lower()] = name 104 105 106def _macro_sql(sql: str, into: t.Optional[str] = None) -> str: 107 args = [_macro_str_replace(sql)] 108 if into in EXPRESSIONS_NAME_MAP: 109 args.append(f"into=exp.{EXPRESSIONS_NAME_MAP[into]}") 110 return f"self.parse_one({', '.join(args)})" 111 112 113def _macro_func_sql(self: Generator, e: exp.Expr) -> str: 114 func = e.this 115 116 if isinstance(func, exp.Anonymous): 117 return f"""self.send({csv("'" + func.name + "'", self.expressions(func))})""" 118 return self.sql(func) 119 120 121def _macro_str_replace(text: str) -> str: 122 """Stringifies python code for variable replacement 123 Args: 124 text: text string 125 Returns: 126 Stringified python code to execute variable replacement 127 """ 128 return f"self.template({text}, locals())" 129 130 131class CaseInsensitiveMapping(t.Dict[str, t.Any]): 132 def __init__(self, data: t.Dict[str, t.Any]) -> None: 133 super().__init__(data) 134 135 def __getitem__(self, key: str) -> t.Any: 136 return super().__getitem__(key.lower()) 137 138 def get(self, key: str, default: t.Any = None, /) -> t.Any: 139 return super().get(key.lower(), default) 140 141 142class MacroDialect(Python): 143 class Generator(Python.Generator): 144 TRANSFORMS = { 145 **Python.Generator.TRANSFORMS, # type: ignore 146 exp.Column: lambda self, e: f"exp.to_column('{self.sql(e, 'this')}')", 147 exp.Lambda: lambda self, e: 
f"lambda {self.expressions(e)}: {self.sql(e, 'this')}", 148 MacroFunc: _macro_func_sql, 149 MacroSQL: lambda self, e: _macro_sql(self.sql(e, "this"), e.args.get("into")), 150 MacroStrReplace: lambda self, e: _macro_str_replace(self.sql(e, "this")), 151 } 152 153 154class MacroEvaluator: 155 """The class responsible for evaluating SQLMesh Macros/SQL. 156 157 SQLMesh supports special preprocessed SQL prefixed with `@`. Although it provides similar power to 158 traditional methods like string templating, there is semantic understanding of SQL which prevents 159 common errors like leading/trailing commas, syntax errors, etc. 160 161 SQLMesh SQL allows for macro variables and macro functions. Macro variables take the form of @variable. These are used for variable substitution. 162 163 SELECT * FROM foo WHERE ds BETWEEN @start_date AND @end_date 164 165 Macro variables can be defined with a special macro function. 166 167 @DEF(start_date, '2021-01-01') 168 169 Args: 170 dialect: Dialect of the SQL to evaluate. 171 python_env: Serialized Python environment. 
172 """ 173 174 def __init__( 175 self, 176 dialect: DialectType = "", 177 python_env: t.Optional[t.Dict[str, Executable]] = None, 178 schema: t.Optional[MappingSchema] = None, 179 runtime_stage: RuntimeStage = RuntimeStage.LOADING, 180 resolve_table: t.Optional[t.Callable[[str | exp.Table], str]] = None, 181 resolve_tables: t.Optional[t.Callable[[exp.Expr], exp.Expr]] = None, 182 snapshots: t.Optional[t.Dict[str, Snapshot]] = None, 183 default_catalog: t.Optional[str] = None, 184 path: t.Optional[Path] = None, 185 environment_naming_info: t.Optional[EnvironmentNamingInfo] = None, 186 model_fqn: t.Optional[str] = None, 187 ): 188 self.dialect = dialect 189 self.generator = MacroDialect().generator() 190 self.locals: t.Dict[str, t.Any] = { 191 "runtime_stage": runtime_stage.value, 192 "default_catalog": default_catalog, 193 } 194 self.env = { 195 **ENV, 196 "self": self, 197 "SQL": SQL, 198 "MacroEvaluator": MacroEvaluator, 199 } 200 self.python_env = python_env or {} 201 self.macros = {normalize_macro_name(k): v.func for k, v in macro.get_registry().items()} 202 self.columns_to_types_called = False 203 self.default_catalog = default_catalog 204 205 self._schema = schema 206 self._resolve_table = resolve_table 207 self._resolve_tables = resolve_tables 208 self._snapshots = snapshots if snapshots is not None else {} 209 self._path = path 210 self._environment_naming_info = environment_naming_info 211 self._model_fqn = model_fqn 212 213 prepare_env(self.python_env, self.env) 214 for k, v in self.python_env.items(): 215 if v.is_definition: 216 self.macros[normalize_macro_name(k)] = self.env[v.name or k] 217 elif v.is_import and getattr(self.env.get(k), c.SQLMESH_MACRO, None): 218 self.macros[normalize_macro_name(k)] = self.env[k] 219 elif v.is_value: 220 value = self.env[k] 221 if k in ( 222 c.SQLMESH_VARS, 223 c.SQLMESH_VARS_METADATA, 224 c.SQLMESH_BLUEPRINT_VARS, 225 c.SQLMESH_BLUEPRINT_VARS_METADATA, 226 ): 227 value = { 228 var_name: ( 229 
self.parse_one(var_value.sql) 230 if isinstance(var_value, SqlValue) 231 else var_value 232 ) 233 for var_name, var_value in value.items() 234 } 235 236 self.locals[k] = value 237 238 def send( 239 self, name: str, *args: t.Any, **kwargs: t.Any 240 ) -> t.Union[None, exp.Expr, t.List[exp.Expr]]: 241 func = self.macros.get(normalize_macro_name(name)) 242 243 if not callable(func): 244 raise MacroEvalError(f"Macro '{name}' does not exist.") 245 246 try: 247 return call_macro( 248 func, self.dialect, self._path, provided_args=(self, *args), provided_kwargs=kwargs 249 ) # type: ignore 250 except Exception as e: 251 raise MacroEvalError( 252 f"An error occurred during evaluation of '{name}'\n\n" 253 + format_evaluated_code_exception(e, self.python_env) 254 ) 255 256 def transform(self, expression: exp.Expr) -> exp.Expr | t.List[exp.Expr] | None: 257 changed = False 258 259 def evaluate_macros( 260 node: exp.Expr, 261 ) -> exp.Expr | t.List[exp.Expr] | None: 262 nonlocal changed 263 264 if isinstance(node, MacroVar): 265 changed = True 266 variables = self.variables 267 268 # This makes all variables case-insensitive, e.g. @X is the same as @x. We do this 269 # for consistency, since `variables` and `blueprint_variables` are normalized. 270 var_name = node.name.lower() 271 272 if var_name not in self.locals and var_name not in variables: 273 if not isinstance(node.parent, StagedFilePath): 274 raise SQLMeshError(f"Macro variable '{node.name}' is undefined.") 275 276 return node 277 278 # Precedence order is locals (e.g. 
@DEF) > blueprint variables > config variables 279 value = self.locals.get(var_name, variables.get(var_name)) 280 if isinstance(value, list): 281 return exp.convert( 282 tuple(self.transform(v) if isinstance(v, exp.Expr) else v for v in value) 283 ) 284 285 return exp.convert(self.transform(value) if isinstance(value, exp.Expr) else value) 286 if isinstance(node, exp.Identifier) and "@" in node.this: 287 text = self.template(node.this, {}) 288 if node.this != text: 289 changed = True 290 return exp.to_identifier(text, quoted=node.quoted or None) 291 if isinstance(node, MacroFunc): 292 changed = True 293 return self.evaluate(node) 294 return node 295 296 transformed = exp.replace_tree( 297 expression.copy(), evaluate_macros, prune=lambda n: isinstance(n, exp.Lambda) 298 ) 299 300 if changed: 301 # the transformations could have corrupted the ast, turning this into sql and reparsing ensures 302 # that the ast is correct 303 if isinstance(transformed, list): 304 return [ 305 self.parse_one(node.sql(dialect=self.dialect, copy=False)) 306 for node in transformed 307 ] 308 if isinstance(transformed, exp.Expr): 309 return self.parse_one(transformed.sql(dialect=self.dialect, copy=False)) 310 311 return transformed 312 313 def template(self, text: t.Any, local_variables: t.Dict[str, t.Any]) -> str: 314 """Substitute @vars with locals. 315 316 Args: 317 text: The string to do substitition on. 318 local_variables: Local variables in the context so that lambdas can be used. 319 320 Returns: 321 The rendered string. 
322 """ 323 # We try to convert all variables into sqlglot expressions because they're going to be converted 324 # into strings; in sql we don't convert strings because that would result in adding quotes 325 base_mapping = { 326 k.lower(): convert_sql(v, self.dialect) 327 for k, v in chain(self.variables.items(), self.locals.items(), local_variables.items()) 328 if k.lower() 329 not in ( 330 "engine_adapter", 331 "snapshot", 332 ) 333 } 334 return MacroStrTemplate(str(text)).safe_substitute(CaseInsensitiveMapping(base_mapping)) 335 336 def evaluate(self, node: MacroFunc) -> exp.Expr | t.List[exp.Expr] | None: 337 if isinstance(node, MacroDef): 338 if isinstance(node.expression, exp.Lambda): 339 _, fn = _norm_var_arg_lambda(self, node.expression) 340 self.macros[normalize_macro_name(node.name)] = lambda _, *args: fn( 341 args[0] if len(args) == 1 else exp.Tuple(expressions=list(args)) 342 ) 343 else: 344 # Make variables defined through `@DEF` case-insensitive 345 self.locals[node.name.lower()] = self.transform(node.expression) 346 347 return node 348 349 if isinstance(node, (MacroSQL, MacroStrReplace)): 350 result: t.Optional[exp.Expr | t.List[exp.Expr]] = exp.convert( 351 self.eval_expression(node) 352 ) 353 else: 354 func = t.cast(exp.Anonymous, node.this) 355 356 args = [] 357 kwargs = {} 358 for e in func.expressions: 359 if isinstance(e, exp.PropertyEQ): 360 kwargs[e.this.name] = e.expression 361 else: 362 if kwargs: 363 raise MacroEvalError( 364 "Positional argument cannot follow keyword argument.\n " 365 f"{func.sql(dialect=self.dialect)} at '{self._path}'" 366 ) 367 368 args.append(e) 369 370 result = self.send(func.name, *args, **kwargs) 371 372 if result is None: 373 return None 374 375 if isinstance(result, (tuple, list)): 376 result = [self.parse_one(item) for item in result if item is not None] 377 378 if ( 379 len(result) == 1 380 and isinstance(result[0], (exp.Array, exp.Tuple)) 381 and node.find_ancestor(MacroFunc) 382 ): 383 """ 384 if: 385 - the 
output of evaluating this node is being passed as an argument to another macro function 386 - and that output is something that _norm_var_arg_lambda() will unpack into varargs 387 > (a list containing a single item of type exp.Tuple/exp.Array) 388 then we will get inconsistent behaviour depending on if this node emits a list with a single item vs multiple items. 389 390 In the first case, emitting a list containing a single array item will cause that array to get unpacked and its *members* passed to the calling macro 391 In the second case, emitting a list containing multiple array items will cause each item to get passed as-is to the calling macro 392 393 To prevent this inconsistency, we wrap this node output in an exp.Array so that _norm_var_arg_lambda() can "unpack" that into the 394 actual argument we want to pass to the parent macro function 395 396 Note we only do this for evaluation results that get passed as an argument to another macro, because when the final 397 result is given to something like SELECT, we still want that to be unpacked into a list of items like: 398 - SELECT ARRAY(1), ARRAY(2) 399 rather than a single item like: 400 - SELECT ARRAY(ARRAY(1), ARRAY(2)) 401 """ 402 result = [exp.Array(expressions=result)] 403 else: 404 result = self.parse_one(result) 405 406 return result 407 408 def eval_expression(self, node: t.Any) -> t.Any: 409 """Converts a SQLGlot expression into executable Python code and evals it. 410 411 If the node is not an expression, it will simply be returned. 412 413 Args: 414 node: expression 415 Returns: 416 The return value of the evaled Python Code. 
417 """ 418 if not isinstance(node, exp.Expr): 419 return node 420 code = node.sql() 421 try: 422 code = self.generator.generate(node) 423 return eval(code, self.env, self.locals) 424 except Exception as e: 425 raise MacroEvalError( 426 f"Error trying to eval macro.\n\nGenerated code: {code}\n\nOriginal sql: {node}\n\n" 427 + format_evaluated_code_exception(e, self.python_env) 428 ) 429 430 def parse_one( 431 self, sql: str | exp.Expr, into: t.Optional[exp.IntoType] = None, **opts: t.Any 432 ) -> exp.Expr: 433 """Parses the given SQL string and returns a syntax tree for the first 434 parsed SQL statement. 435 436 Args: 437 sql: the SQL code or expression to parse. 438 into: the Expression to parse into 439 **opts: other options 440 441 Returns: 442 Expression: the syntax tree for the first parsed statement 443 """ 444 return sqlglot.maybe_parse(sql, dialect=self.dialect, into=into, **opts) 445 446 def columns_to_types(self, model_name: TableName | exp.Column) -> t.Dict[str, exp.DataType]: 447 """Returns the columns-to-types mapping corresponding to the specified model.""" 448 449 # We only return this dummy schema at load time, because if we don't actually know the 450 # target model's schema at creation/evaluation time, returning a dummy schema could lead 451 # to unintelligible errors when the query is executed 452 if (self._schema is None or self._schema.empty) and self.runtime_stage == "loading": 453 self.columns_to_types_called = True 454 return {"__schema_unavailable_at_load__": exp.DataType.build("unknown")} 455 456 normalized_model_name = normalize_model_name( 457 model_name, 458 default_catalog=self.default_catalog, 459 dialect=self.dialect, 460 ) 461 model_name = exp.to_table(normalized_model_name) 462 463 columns_to_types = ( 464 self._schema.find(model_name, ensure_data_types=True) if self._schema else None 465 ) 466 if columns_to_types is None: 467 snapshot = self.get_snapshot(model_name) 468 if snapshot and snapshot.node.is_model: 469 columns_to_types 
= snapshot.node.columns_to_types # type: ignore 470 471 if columns_to_types is None: 472 raise SQLMeshError(f"Schema for model '{model_name}' can't be statically determined.") 473 474 return columns_to_types 475 476 def get_snapshot(self, model_name: TableName | exp.Column) -> t.Optional[Snapshot]: 477 """Returns the snapshot that corresponds to the given model name.""" 478 return self._snapshots.get( 479 normalize_model_name( 480 model_name, 481 default_catalog=self.default_catalog, 482 dialect=self.dialect, 483 ) 484 ) 485 486 def resolve_table(self, table: str | exp.Table) -> str: 487 """Gets the physical table name for a given model.""" 488 if not self._resolve_table: 489 raise SQLMeshError( 490 "Macro evaluator not properly initialized with resolve_table lambda." 491 ) 492 return self._resolve_table(table) 493 494 def resolve_tables(self, query: exp.Expr) -> exp.Expr: 495 """Resolves queries with references to SQLMesh model names to their physical tables.""" 496 if not self._resolve_tables: 497 raise SQLMeshError( 498 "Macro evaluator not properly initialized with resolve_tables lambda." 
499 ) 500 return self._resolve_tables(query) 501 502 @property 503 def runtime_stage(self) -> RuntimeStage: 504 """Returns the current runtime stage of the macro evaluation.""" 505 return self.locals["runtime_stage"] 506 507 @property 508 def this_model(self) -> str: 509 """Returns the resolved name of the surrounding model.""" 510 this_model = self.locals.get("this_model") 511 if not this_model: 512 raise SQLMeshError("Model name is not available in the macro evaluator.") 513 return this_model.sql(dialect=self.dialect, identify=True, comments=False) 514 515 @property 516 def this_model_fqn(self) -> str: 517 if self._model_fqn is None: 518 raise SQLMeshError("Model name is not available in the macro evaluator.") 519 return self._model_fqn 520 521 @property 522 def engine_adapter(self) -> EngineAdapter: 523 engine_adapter = self.locals.get("engine_adapter") 524 if not engine_adapter: 525 raise SQLMeshError( 526 "The engine adapter is not available while models are loading." 527 " You can gate these calls by checking in Python: evaluator.runtime_stage != 'loading' or SQL: @runtime_stage <> 'loading'." 
528 ) 529 return self.locals["engine_adapter"] 530 531 @property 532 def gateway(self) -> t.Optional[str]: 533 """Returns the gateway name.""" 534 return self.var(c.GATEWAY) 535 536 @property 537 def snapshots(self) -> t.Dict[str, Snapshot]: 538 """Returns the snapshots if available.""" 539 return self._snapshots 540 541 @property 542 def this_env(self) -> str: 543 """Returns the name of the current environment in before after all.""" 544 if "this_env" not in self.locals: 545 raise SQLMeshError("Environment name is only available in before_all and after_all") 546 return self.locals["this_env"] 547 548 @property 549 def schemas(self) -> t.List[str]: 550 """Returns the schemas of the current environment in before after all macros.""" 551 if "schemas" not in self.locals: 552 raise SQLMeshError("Schemas are only available in before_all and after_all") 553 return self.locals["schemas"] 554 555 @property 556 def views(self) -> t.List[str]: 557 """Returns the views of the current environment in before after all macros.""" 558 if "views" not in self.locals: 559 raise SQLMeshError("Views are only available in before_all and after_all") 560 return self.locals["views"] 561 562 def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]: 563 """Returns the value of the specified variable, or the default value if it doesn't exist.""" 564 return { 565 **(self.locals.get(c.SQLMESH_VARS) or {}), 566 **(self.locals.get(c.SQLMESH_VARS_METADATA) or {}), 567 }.get(var_name.lower(), default) 568 569 def blueprint_var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]: 570 """Returns the value of the specified blueprint variable, or the default value if it doesn't exist.""" 571 return { 572 **(self.locals.get(c.SQLMESH_BLUEPRINT_VARS) or {}), 573 **(self.locals.get(c.SQLMESH_BLUEPRINT_VARS_METADATA) or {}), 574 }.get(var_name.lower(), default) 575 576 @property 577 def variables(self) -> t.Dict[str, t.Any]: 578 return { 579 
**self.locals.get(c.SQLMESH_VARS, {}), 580 **self.locals.get(c.SQLMESH_VARS_METADATA, {}), 581 **self.locals.get(c.SQLMESH_BLUEPRINT_VARS, {}), 582 **self.locals.get(c.SQLMESH_BLUEPRINT_VARS_METADATA, {}), 583 } 584 585 def _coerce(self, expr: exp.Expr, typ: t.Any, strict: bool = False) -> t.Any: 586 """Coerces the given expression to the specified type on a best-effort basis.""" 587 return _coerce(expr, typ, self.dialect, self._path, strict) 588 589 590class macro(registry_decorator): 591 """Specifies a function is a macro and registers it the global MACROS registry. 592 593 Registered macros can be referenced in SQL statements to make queries more dynamic/cleaner. 594 595 Example: 596 from sqlglot import exp 597 from sqlmesh.core.macros import MacroEvaluator, macro 598 599 @macro() 600 def add_one(evaluator: MacroEvaluator, column: exp.Literal) -> exp.Add: 601 return evaluator.parse_one(f"{column} + 1") 602 603 Args: 604 name: A custom name for the macro, the default is the name of the function. 605 """ 606 607 registry_name = "macros" 608 609 def __init__(self, *args: t.Any, metadata_only: bool = False, **kwargs: t.Any) -> None: 610 super().__init__(*args, **kwargs) 611 self.metadata_only = metadata_only 612 613 def __call__( 614 self, func: t.Callable[..., DECORATOR_RETURN_TYPE] 615 ) -> t.Callable[..., DECORATOR_RETURN_TYPE]: 616 if self.metadata_only: 617 setattr(func, c.SQLMESH_METADATA, self.metadata_only) 618 wrapper = super().__call__(func) 619 620 # This is used to identify macros at runtime to unwrap during serialization. 621 setattr(wrapper, c.SQLMESH_MACRO, True) 622 return wrapper 623 624 625ExecutableOrMacro = t.Union[Executable, macro] 626MacroRegistry = UniqueKeyDict[str, ExecutableOrMacro] 627 628 629def _norm_var_arg_lambda( 630 evaluator: MacroEvaluator, func: exp.Lambda, *items: t.Any 631) -> t.Tuple[t.Iterable, t.Callable]: 632 """ 633 Converts sql literal array and lambda into actual python iterable + callable. 
634 635 In order to support expressions like @EACH([a, b, c], x -> @SQL('@x')), the lambda var x 636 needs be passed to the local state. 637 638 Args: 639 evaluator: MacroEvaluator that invoked the macro 640 func: Lambda SQLGlot expression. 641 items: Array or items of SQLGlot expressions. 642 """ 643 644 def substitute( 645 node: exp.Expr, args: t.Dict[str, exp.Expr] 646 ) -> exp.Expr | t.List[exp.Expr] | None: 647 if isinstance(node, (exp.Identifier, exp.Var)): 648 if not isinstance(node.parent, exp.Column): 649 name = node.name.lower() 650 if name in args: 651 return args[name].copy() 652 if name in evaluator.locals: 653 return exp.convert(evaluator.locals[name]) 654 if SQLMESH_MACRO_PREFIX in node.name: 655 return node.__class__( 656 this=evaluator.template(node.name, {k: v.name for k, v in args.items()}) 657 ) 658 elif isinstance(node, MacroFunc): 659 local_copy = evaluator.locals.copy() 660 evaluator.locals.update(args) 661 result = evaluator.transform(node) 662 evaluator.locals = local_copy 663 return result 664 return node 665 666 if len(items) == 1: 667 item = items[0] 668 expressions = ( 669 item.expressions 670 if isinstance(item, (exp.Array, exp.Tuple)) 671 else [item.this] 672 if isinstance(item, exp.Paren) 673 else item 674 ) 675 else: 676 expressions = items 677 678 if not callable(func): 679 return expressions, lambda args: func.this.transform( 680 substitute, 681 { 682 expression.name.lower(): arg 683 for expression, arg in zip( 684 func.expressions, args.expressions if isinstance(args, exp.Tuple) else [args] 685 ) 686 }, 687 ) 688 689 return expressions, func 690 691 692@macro() 693def each( 694 evaluator: MacroEvaluator, 695 *args: t.Any, 696) -> t.List[t.Any]: 697 """Iterates through items calling func on each. 698 699 If a func call on item returns None, it will be excluded from the list. 700 701 Args: 702 evaluator: MacroEvaluator that invoked the macro 703 args: The last argument should be a lambda of the form x -> x +1. 
The first argument can be 704 an Array or var args can be used. 705 706 Returns: 707 A list of items that is the result of func 708 """ 709 *items, func = args 710 items, func = _norm_var_arg_lambda(evaluator, func, *items) # type: ignore 711 return [item for item in map(func, ensure_collection(items)) if item is not None] 712 713 714@macro("IF") 715def if_( 716 evaluator: MacroEvaluator, 717 condition: t.Any, 718 true: t.Any, 719 false: t.Any = None, 720) -> t.Any: 721 """Evaluates a given condition and returns the second argument if true or else the third argument. 722 723 If false is not passed in, the default return value will be None. 724 725 Example: 726 >>> from sqlglot import parse_one 727 >>> from sqlmesh.core.macros import MacroEvaluator 728 >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a, b)")).sql() 729 'b' 730 731 >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a)")) 732 """ 733 734 if evaluator.eval_expression(condition): 735 return true 736 return false 737 738 739@macro("REDUCE") 740def reduce_(evaluator: MacroEvaluator, *args: t.Any) -> t.Any: 741 """Iterates through items applying provided function that takes two arguments 742 cumulatively to the items of iterable items, from left to right, so as to reduce 743 the iterable to a single item. 744 745 Example: 746 >>> from sqlglot import parse_one 747 >>> from sqlmesh.core.macros import MacroEvaluator 748 >>> sql = "@SQL(@REDUCE([100, 200, 300, 400], (x, y) -> x + y))" 749 >>> MacroEvaluator().transform(parse_one(sql)).sql() 750 '1000' 751 752 Args: 753 evaluator: MacroEvaluator that invoked the macro 754 args: The last argument should be a lambda of the form (x, y) -> x + y. The first argument can be 755 an Array or var args can be used. 
756 Returns: 757 A single item that is the result of applying func cumulatively to items 758 """ 759 *items, func = args 760 items, func = _norm_var_arg_lambda(evaluator, func, *items) # type: ignore 761 return reduce(lambda a, b: func(exp.Tuple(expressions=[a, b])), ensure_collection(items)) 762 763 764@macro("FILTER") 765def filter_(evaluator: MacroEvaluator, *args: t.Any) -> t.List[t.Any]: 766 """Iterates through items, applying provided function to each item and removing 767 all items where the function returns False 768 769 Example: 770 >>> from sqlglot import parse_one 771 >>> from sqlmesh.core.macros import MacroEvaluator 772 >>> sql = "@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y)" 773 >>> MacroEvaluator().transform(parse_one(sql)).sql() 774 '2 + 3' 775 776 >>> sql = "@EVAL(@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y))" 777 >>> MacroEvaluator().transform(parse_one(sql)).sql() 778 '5' 779 780 Args: 781 evaluator: MacroEvaluator that invoked the macro 782 args: The last argument should be a lambda of the form x -> x > 1. The first argument can be 783 an Array or var args can be used. 784 Returns: 785 The items for which the func returned True 786 """ 787 *items, func = args 788 items, func = _norm_var_arg_lambda(evaluator, func, *items) # type: ignore 789 return list(filter(lambda arg: evaluator.eval_expression(func(arg)), items)) 790 791 792def _optional_expression( 793 evaluator: MacroEvaluator, 794 condition: exp.Condition, 795 expression: exp.Expr, 796) -> t.Optional[exp.Expr]: 797 """Inserts expression when the condition is True 798 799 The following examples express the usage of this function in the context of the macros which wrap it. 
800 801 Examples: 802 >>> from sqlglot import parse_one 803 >>> from sqlmesh.core.macros import MacroEvaluator 804 >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" 805 >>> MacroEvaluator().transform(parse_one(sql)).sql() 806 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' 807 >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" 808 >>> MacroEvaluator().transform(parse_one(sql)).sql() 809 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' 810 >>> sql = "select * from city @GROUP_BY(True) country, population" 811 >>> MacroEvaluator().transform(parse_one(sql)).sql() 812 'SELECT * FROM city GROUP BY country, population' 813 >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" 814 >>> MacroEvaluator().transform(parse_one(sql)).sql() 815 "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'" 816 817 Args: 818 evaluator: MacroEvaluator that invoked the macro 819 condition: Condition expression 820 expression: SQL expression 821 Returns: 822 Expression if the conditional is True; otherwise None 823 """ 824 return expression if evaluator.eval_expression(condition) else None 825 826 827with_ = macro("WITH")(_optional_expression) 828join = macro("JOIN")(_optional_expression) 829where = macro("WHERE")(_optional_expression) 830group_by = macro("GROUP_BY")(_optional_expression) 831having = macro("HAVING")(_optional_expression) 832order_by = macro("ORDER_BY")(_optional_expression) 833limit = macro("LIMIT")(_optional_expression) 834 835 836@macro("eval") 837def eval_(evaluator: MacroEvaluator, condition: exp.Condition) -> t.Any: 838 """Evaluate the given condition in a Python/SQL interpretor. 
839 840 Example: 841 >>> from sqlglot import parse_one 842 >>> from sqlmesh.core.macros import MacroEvaluator 843 >>> sql = "@EVAL(1 + 1)" 844 >>> MacroEvaluator().transform(parse_one(sql)).sql() 845 '2' 846 """ 847 return evaluator.eval_expression(condition) 848 849 850# macros with union types need to use t.Union since | isn't available until 3.9 851@macro() 852def star( 853 evaluator: MacroEvaluator, 854 relation: exp.Table, 855 alias: exp.Column = t.cast(exp.Column, exp.column("")), 856 exclude: t.Union[exp.Array, exp.Tuple] = exp.Tuple(expressions=[]), 857 prefix: exp.Literal = exp.Literal.string(""), 858 suffix: exp.Literal = exp.Literal.string(""), 859 quote_identifiers: exp.Boolean = exp.true(), 860 except_: t.Union[exp.Array, exp.Tuple] = exp.Tuple(expressions=[]), 861) -> t.List[exp.Expr]: 862 """Returns a list of projections for the given relation. 863 864 Args: 865 evaluator: MacroEvaluator that invoked the macro 866 relation: The relation to select star from 867 alias: The alias of the relation 868 exclude: Columns to exclude 869 prefix: A prefix to use for all selections 870 suffix: A suffix to use for all selections 871 quote_identifiers: Whether or not quote the resulting aliases, defaults to true 872 except_: Alias for exclude (TODO: deprecate this, update docs) 873 874 Returns: 875 An array of columns. 
876 877 Example: 878 >>> from sqlglot import parse_one, exp 879 >>> from sqlglot.schema import MappingSchema 880 >>> from sqlmesh.core.macros import MacroEvaluator 881 >>> sql = "SELECT @STAR(foo, bar, exclude := [c], prefix := 'baz_') FROM foo AS bar" 882 >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": exp.DataType.build("string"), "b": exp.DataType.build("string"), "c": exp.DataType.build("string"), "d": exp.DataType.build("int")}})).transform(parse_one(sql)).sql() 883 'SELECT CAST("bar"."a" AS TEXT) AS "baz_a", CAST("bar"."b" AS TEXT) AS "baz_b", CAST("bar"."d" AS INT) AS "baz_d" FROM foo AS bar' 884 """ 885 if alias and not isinstance(alias, (exp.Identifier, exp.Column)): 886 raise SQLMeshError(f"Invalid alias '{alias}'. Expected an identifier.") 887 if exclude and not isinstance(exclude, (exp.Array, exp.Tuple)): 888 raise SQLMeshError(f"Invalid exclude '{exclude}'. Expected an array.") 889 if except_ != exp.tuple_(): 890 from sqlmesh.core.console import get_console 891 892 get_console().log_warning( 893 "The 'except_' argument in @STAR will soon be deprecated. Use 'exclude' instead." 894 ) 895 if not isinstance(exclude, (exp.Array, exp.Tuple)): 896 raise SQLMeshError(f"Invalid exclude_ '{exclude}'. Expected an array.") 897 if prefix and not isinstance(prefix, exp.Literal): 898 raise SQLMeshError(f"Invalid prefix '{prefix}'. Expected a literal.") 899 if suffix and not isinstance(suffix, exp.Literal): 900 raise SQLMeshError(f"Invalid suffix '{suffix}'. Expected a literal.") 901 if not isinstance(quote_identifiers, exp.Boolean): 902 raise SQLMeshError(f"Invalid quote_identifiers '{quote_identifiers}'. 
Expected a boolean.") 903 904 excluded_names = { 905 normalize_identifiers(excluded, dialect=evaluator.dialect).name 906 for excluded in exclude.expressions or except_.expressions 907 } 908 quoted = quote_identifiers.this 909 table_identifier = normalize_identifiers( 910 alias if alias.name else relation, dialect=evaluator.dialect 911 ).name 912 913 columns_to_types = { 914 k: v for k, v in evaluator.columns_to_types(relation).items() if k not in excluded_names 915 } 916 if columns_to_types_all_known(columns_to_types): 917 return [ 918 exp.cast( 919 exp.column(column, table=table_identifier, quoted=quoted), 920 dtype, 921 dialect=evaluator.dialect, 922 ).as_(f"{prefix.this}{column}{suffix.this}", quoted=quoted) 923 for column, dtype in columns_to_types.items() 924 ] 925 return [ 926 exp.column(column, table=table_identifier, quoted=quoted).as_( 927 f"{prefix.this}{column}{suffix.this}", quoted=quoted 928 ) 929 for column, type_ in columns_to_types.items() 930 ] 931 932 933@macro() 934def generate_surrogate_key( 935 evaluator: MacroEvaluator, 936 *fields: exp.Expr, 937 hash_function: exp.Literal = exp.Literal.string("MD5"), 938) -> exp.Func: 939 """Generates a surrogate key (string) for the given fields. 
940 941 Example: 942 >>> from sqlglot import parse_one 943 >>> from sqlmesh.core.macros import MacroEvaluator 944 >>> 945 >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c) FROM foo" 946 >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql, dialect="bigquery")).sql("bigquery") 947 "SELECT TO_HEX(MD5(CONCAT(COALESCE(CAST(a AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS STRING), '_sqlmesh_surrogate_key_null_')))) FROM foo" 948 >>> 949 >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c, hash_function := 'SHA256') FROM foo" 950 >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql, dialect="bigquery")).sql("bigquery") 951 "SELECT SHA256(CONCAT(COALESCE(CAST(a AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS STRING), '_sqlmesh_surrogate_key_null_'))) FROM foo" 952 """ 953 string_fields: t.List[exp.Expr] = [] 954 for i, field in enumerate(fields): 955 if i > 0: 956 string_fields.append(exp.Literal.string("|")) 957 string_fields.append( 958 exp.func( 959 "COALESCE", 960 exp.cast(field, exp.DataType.build("text")), 961 exp.Literal.string("_sqlmesh_surrogate_key_null_"), 962 ) 963 ) 964 965 func = exp.func( 966 hash_function.name, 967 exp.func("CONCAT", *string_fields), 968 dialect=evaluator.dialect, 969 ) 970 if isinstance(func, exp.MD5Digest): 971 func = exp.MD5(this=func.this) 972 973 return func 974 975 976@macro() 977def safe_add(_: MacroEvaluator, *fields: exp.Expr) -> exp.Case: 978 """Adds numbers together, substitutes nulls for 0s and only returns null if all fields are null. 
979 980 Example: 981 >>> from sqlglot import parse_one 982 >>> from sqlmesh.core.macros import MacroEvaluator 983 >>> sql = "SELECT @SAFE_ADD(a, b) FROM foo" 984 >>> MacroEvaluator().transform(parse_one(sql)).sql() 985 'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) + COALESCE(b, 0) END FROM foo' 986 """ 987 return ( 988 exp.Case() 989 .when(exp.and_(*(field.is_(exp.null()) for field in fields)), exp.null()) 990 .else_(reduce(lambda a, b: a + b, [exp.func("COALESCE", field, 0) for field in fields])) # type: ignore 991 ) 992 993 994@macro() 995def safe_sub(_: MacroEvaluator, *fields: exp.Expr) -> exp.Case: 996 """Subtract numbers, substitutes nulls for 0s and only returns null if all fields are null. 997 998 Example: 999 >>> from sqlglot import parse_one 1000 >>> from sqlmesh.core.macros import MacroEvaluator 1001 >>> sql = "SELECT @SAFE_SUB(a, b) FROM foo" 1002 >>> MacroEvaluator().transform(parse_one(sql)).sql() 1003 'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) - COALESCE(b, 0) END FROM foo' 1004 """ 1005 return ( 1006 exp.Case() 1007 .when(exp.and_(*(field.is_(exp.null()) for field in fields)), exp.null()) 1008 .else_(reduce(lambda a, b: a - b, [exp.func("COALESCE", field, 0) for field in fields])) # type: ignore 1009 ) 1010 1011 1012@macro() 1013def safe_div(_: MacroEvaluator, numerator: exp.Expr, denominator: exp.Expr) -> exp.Div: 1014 """Divides numbers, returns null if the denominator is 0. 1015 1016 Example: 1017 >>> from sqlglot import parse_one 1018 >>> from sqlmesh.core.macros import MacroEvaluator 1019 >>> sql = "SELECT @SAFE_DIV(a, b) FROM foo" 1020 >>> MacroEvaluator().transform(parse_one(sql)).sql() 1021 'SELECT a / NULLIF(b, 0) FROM foo' 1022 """ 1023 return numerator / exp.func("NULLIF", denominator, 0) 1024 1025 1026@macro() 1027def union( 1028 evaluator: MacroEvaluator, 1029 *args: exp.Expr, 1030) -> exp.Query: 1031 """Returns a UNION of the given tables. 
Only choosing columns that have the same name and type. 1032 1033 Args: 1034 evaluator: MacroEvaluator that invoked the macro 1035 args: Variable arguments that can be: 1036 - First argument can be a condition (exp.Condition) 1037 - A union type ('ALL' or 'DISTINCT') as exp.Literal 1038 - Tables (exp.Table) 1039 1040 Example: 1041 >>> from sqlglot import parse_one 1042 >>> from sqlglot.schema import MappingSchema 1043 >>> from sqlmesh.core.macros import MacroEvaluator 1044 >>> sql = "@UNION('distinct', foo, bar)" 1045 >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql() 1046 'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar' 1047 >>> sql = "@UNION(True, 'distinct', foo, bar)" 1048 >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql() 1049 'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar' 1050 """ 1051 1052 if not args: 1053 raise SQLMeshError("At least one table is required for the @UNION macro.") 1054 1055 arg_idx = 0 1056 # Check for condition 1057 condition = evaluator.eval_expression(args[arg_idx]) 1058 if isinstance(condition, bool): 1059 arg_idx += 1 1060 if arg_idx >= len(args): 1061 raise SQLMeshError("Expected more arguments after the condition of the `@UNION` macro.") 1062 1063 # Check for union type 1064 type_ = exp.Literal.string("ALL") 1065 if isinstance(args[arg_idx], exp.Literal): 1066 type_ = args[arg_idx] # type: ignore 1067 arg_idx += 1 1068 kind = type_.name.upper() 1069 if kind not in ("ALL", "DISTINCT"): 1070 raise SQLMeshError(f"Invalid type '{type_}'. 
Expected 'ALL' or 'DISTINCT'.") 1071 1072 # Remaining args should be tables 1073 tables = [ 1074 exp.to_table(e.sql(evaluator.dialect), dialect=evaluator.dialect) for e in args[arg_idx:] 1075 ] 1076 1077 columns = { 1078 column 1079 for column, _ in reduce( 1080 lambda a, b: a & b, # type: ignore 1081 (evaluator.columns_to_types(table).items() for table in tables), 1082 ) 1083 } 1084 1085 projections = [ 1086 exp.cast(column, type_, dialect=evaluator.dialect).as_(column) 1087 for column, type_ in evaluator.columns_to_types(tables[0]).items() 1088 if column in columns 1089 ] 1090 1091 # Skip the union if condition is False 1092 if condition == False: 1093 return exp.select(*projections).from_(tables[0]) 1094 1095 return reduce( 1096 lambda a, b: a.union(b, distinct=kind == "DISTINCT"), # type: ignore 1097 [exp.select(*projections).from_(t) for t in tables], 1098 ) 1099 1100 1101@macro() 1102def haversine_distance( 1103 _: MacroEvaluator, 1104 lat1: exp.Expr, 1105 lon1: exp.Expr, 1106 lat2: exp.Expr, 1107 lon2: exp.Expr, 1108 unit: exp.Literal = exp.Literal.string("mi"), 1109) -> exp.Mul: 1110 """Returns the haversine distance between two points. 1111 1112 Example: 1113 >>> from sqlglot import parse_one 1114 >>> from sqlmesh.core.macros import MacroEvaluator 1115 >>> sql = "SELECT @HAVERSINE_DISTANCE(driver_y, driver_x, passenger_y, passenger_x, 'mi') FROM rides" 1116 >>> MacroEvaluator().transform(parse_one(sql)).sql() 1117 'SELECT 7922 * ASIN(SQRT((POWER(SIN(RADIANS((passenger_y - driver_y) / 2)), 2)) + (COS(RADIANS(driver_y)) * COS(RADIANS(passenger_y)) * POWER(SIN(RADIANS((passenger_x - driver_x) / 2)), 2)))) * 1.0 FROM rides' 1118 """ 1119 if unit.this == "mi": 1120 conversion_rate = 1.0 1121 elif unit.this == "km": 1122 conversion_rate = 1.60934 1123 else: 1124 raise SQLMeshError(f"Invalid unit '{unit}'. 
Expected 'mi' or 'km'.") 1125 1126 return ( 1127 2 1128 * 3961 1129 * exp.func( 1130 "ASIN", 1131 exp.func( 1132 "SQRT", 1133 exp.func("POWER", exp.func("SIN", exp.func("RADIANS", (lat2 - lat1) / 2)), 2) 1134 + exp.func("COS", exp.func("RADIANS", lat1)) 1135 * exp.func("COS", exp.func("RADIANS", lat2)) 1136 * exp.func("POWER", exp.func("SIN", exp.func("RADIANS", (lon2 - lon1) / 2)), 2), 1137 ), 1138 ) 1139 * conversion_rate 1140 ) 1141 1142 1143@macro() 1144def pivot( 1145 evaluator: MacroEvaluator, 1146 column: SQL, 1147 values: t.List[exp.Expr], 1148 alias: bool = True, 1149 agg: exp.Expr = exp.Literal.string("SUM"), 1150 cmp: exp.Expr = exp.Literal.string("="), 1151 prefix: exp.Expr = exp.Literal.string(""), 1152 suffix: exp.Expr = exp.Literal.string(""), 1153 then_value: SQL = SQL("1"), 1154 else_value: SQL = SQL("0"), 1155 quote: bool = True, 1156 distinct: bool = False, 1157) -> t.List[exp.Expr]: 1158 """Returns a list of projections as a result of pivoting the given column on the given values. 
1159 1160 Example: 1161 >>> from sqlglot import parse_one 1162 >>> from sqlmesh.core.macros import MacroEvaluator 1163 >>> sql = "SELECT date_day, @PIVOT(status, ['cancelled', 'completed']) FROM rides GROUP BY 1" 1164 >>> MacroEvaluator().transform(parse_one(sql)).sql() 1165 'SELECT date_day, SUM(CASE WHEN status = \\'cancelled\\' THEN 1 ELSE 0 END) AS "cancelled", SUM(CASE WHEN status = \\'completed\\' THEN 1 ELSE 0 END) AS "completed" FROM rides GROUP BY 1' 1166 >>> sql = "SELECT @PIVOT(a, ['v'], then_value := tv, suffix := '_sfx', quote := FALSE)" 1167 >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql)).sql("bigquery") 1168 "SELECT SUM(CASE WHEN a = 'v' THEN tv ELSE 0 END) AS v_sfx" 1169 """ 1170 aggregates: t.List[exp.Expr] = [] 1171 for value in values: 1172 proj = f"{agg.name}(" 1173 if distinct: 1174 proj += "DISTINCT " 1175 1176 proj += f"CASE WHEN {column} {cmp.name} {value.sql(evaluator.dialect)} THEN {then_value} ELSE {else_value} END) " 1177 node: exp.Expr = evaluator.parse_one(proj) 1178 1179 if alias: 1180 node = node.as_( 1181 f"{prefix.name}{value.name}{suffix.name}", 1182 quoted=quote, 1183 copy=False, 1184 dialect=evaluator.dialect, 1185 ) 1186 1187 aggregates.append(node) 1188 1189 return aggregates 1190 1191 1192@macro("AND") 1193def and_(evaluator: MacroEvaluator, *expressions: t.Optional[exp.Expr]) -> exp.Condition: 1194 """Returns an AND statement filtering out any NULL expressions.""" 1195 conditions = [e for e in expressions if not isinstance(e, exp.Null)] 1196 1197 if not conditions: 1198 return exp.true() 1199 1200 return exp.and_(*conditions, dialect=evaluator.dialect) 1201 1202 1203@macro("OR") 1204def or_(evaluator: MacroEvaluator, *expressions: t.Optional[exp.Expr]) -> exp.Condition: 1205 """Returns an OR statement filtering out any NULL expressions.""" 1206 conditions = [e for e in expressions if not isinstance(e, exp.Null)] 1207 1208 if not conditions: 1209 return exp.true() 1210 1211 return exp.or_(*conditions, 
dialect=evaluator.dialect) 1212 1213 1214@macro("VAR") 1215def var( 1216 evaluator: MacroEvaluator, var_name: exp.Expr, default: t.Optional[exp.Expr] = None 1217) -> exp.Expr: 1218 """Returns the value of a variable or the default value if the variable is not set.""" 1219 if not var_name.is_string: 1220 raise SQLMeshError(f"Invalid variable name '{var_name.sql()}'. Expected a string literal.") 1221 1222 return exp.convert(evaluator.var(var_name.this, default)) 1223 1224 1225@macro("BLUEPRINT_VAR") 1226def blueprint_var( 1227 evaluator: MacroEvaluator, var_name: exp.Expr, default: t.Optional[exp.Expr] = None 1228) -> exp.Expr: 1229 """Returns the value of a blueprint variable or the default value if the variable is not set.""" 1230 if not var_name.is_string: 1231 raise SQLMeshError( 1232 f"Invalid blueprint variable name '{var_name.sql()}'. Expected a string literal." 1233 ) 1234 1235 return exp.convert(evaluator.blueprint_var(var_name.this, default)) 1236 1237 1238@macro() 1239def deduplicate( 1240 evaluator: MacroEvaluator, 1241 relation: exp.Expr, 1242 partition_by: t.List[exp.Expr], 1243 order_by: t.List[str], 1244) -> exp.Query: 1245 """Returns a QUERY to deduplicate rows within a table 1246 1247 Args: 1248 relation: table or CTE name to deduplicate 1249 partition_by: column names, or expressions to use to identify a window of rows out of which to select one as the deduplicated row 1250 order_by: A list of strings representing the ORDER BY clause 1251 1252 Example: 1253 >>> from sqlglot import parse_one 1254 >>> from sqlglot.schema import MappingSchema 1255 >>> from sqlmesh.core.macros import MacroEvaluator 1256 >>> sql = "@deduplicate(demo.table, [user_id, cast(timestamp as date)], ['timestamp desc', 'status asc'])" 1257 >>> MacroEvaluator().transform(parse_one(sql)).sql() 1258 'SELECT * FROM demo.table QUALIFY ROW_NUMBER() OVER (PARTITION BY user_id, CAST(timestamp AS DATE) ORDER BY timestamp DESC, status ASC) = 1' 1259 """ 1260 if not 
isinstance(partition_by, list): 1261 raise SQLMeshError( 1262 "partition_by must be a list of columns: [<column>, cast(<column> as <type>)]" 1263 ) 1264 1265 if not isinstance(order_by, list): 1266 raise SQLMeshError( 1267 "order_by must be a list of strings, optional - nulls ordering: ['<column> <asc|desc> nulls <first|last>']" 1268 ) 1269 1270 partition_clause = exp.tuple_(*partition_by) 1271 1272 order_expressions = [ 1273 evaluator.transform(parse_one(order_item, into=exp.Ordered, dialect=evaluator.dialect)) 1274 for order_item in order_by 1275 ] 1276 1277 if not order_expressions: 1278 raise SQLMeshError( 1279 "order_by must be a list of strings, optional - nulls ordering: ['<column> <asc|desc> nulls <first|last>']" 1280 ) 1281 1282 order_clause = exp.Order(expressions=order_expressions) 1283 1284 window_function = exp.Window( 1285 this=exp.RowNumber(), partition_by=partition_clause, order=order_clause 1286 ) 1287 1288 first_unique_row = window_function.eq(1) 1289 1290 query = exp.select("*").from_(relation).qualify(first_unique_row) 1291 1292 return query 1293 1294 1295@macro() 1296def date_spine( 1297 evaluator: MacroEvaluator, 1298 datepart: exp.Expr, 1299 start_date: exp.Expr, 1300 end_date: exp.Expr, 1301) -> exp.Select: 1302 """Returns a query that produces a date spine with the given datepart, and range of start_date and end_date. Useful for joining as a date lookup table. 
1303 1304 Args: 1305 datepart: The datepart to use for the date spine - day, week, month, quarter, year 1306 start_date: The start date for the date spine in format YYYY-MM-DD 1307 end_date: The end date for the date spine in format YYYY-MM-DD 1308 1309 Example: 1310 >>> from sqlglot import parse_one 1311 >>> from sqlglot.schema import MappingSchema 1312 >>> from sqlmesh.core.macros import MacroEvaluator 1313 >>> sql = "@date_spine('week', '2022-01-20', '2024-12-16')" 1314 >>> MacroEvaluator().transform(parse_one(sql)).sql() 1315 "SELECT date_week FROM UNNEST(GENERATE_DATE_ARRAY(CAST(\'2022-01-20\' AS DATE), CAST(\'2024-12-16\' AS DATE), INTERVAL \'1\' WEEK)) AS _exploded(date_week)" 1316 """ 1317 datepart_name = datepart.name.lower() 1318 if datepart_name not in ("day", "week", "month", "quarter", "year"): 1319 raise SQLMeshError( 1320 f"Invalid datepart '{datepart_name}'. Expected: 'day', 'week', 'month', 'quarter', or 'year'" 1321 ) 1322 1323 start_date_name = start_date.name 1324 end_date_name = end_date.name 1325 1326 try: 1327 if start_date.is_string and end_date.is_string: 1328 start_date_obj = datetime.strptime(start_date_name, "%Y-%m-%d").date() 1329 end_date_obj = datetime.strptime(end_date_name, "%Y-%m-%d").date() 1330 else: 1331 start_date_obj = None 1332 end_date_obj = None 1333 except Exception as e: 1334 raise SQLMeshError( 1335 f"Invalid date format - start_date and end_date must be in format: YYYY-MM-DD. Error: {e}" 1336 ) 1337 1338 if start_date_obj and end_date_obj: 1339 if start_date_obj > end_date_obj: 1340 raise SQLMeshError( 1341 f"Invalid date range - start_date '{start_date_name}' is after end_date '{end_date_name}'." 
1342 ) 1343 1344 start_date = exp.cast(start_date, "DATE") 1345 end_date = exp.cast(end_date, "DATE") 1346 1347 if datepart_name == "quarter" and evaluator.dialect in ( 1348 "spark", 1349 "spark2", 1350 "databricks", 1351 "postgres", 1352 ): 1353 date_interval = exp.Interval(this=exp.Literal.number(3), unit=exp.var("month")) 1354 else: 1355 date_interval = exp.Interval(this=exp.Literal.number(1), unit=exp.var(datepart_name)) 1356 1357 generate_date_array = exp.func( 1358 "GENERATE_DATE_ARRAY", 1359 start_date, 1360 end_date, 1361 date_interval, 1362 ) 1363 1364 alias_name = f"date_{datepart_name}" 1365 exploded = exp.alias_(exp.func("unnest", generate_date_array), "_exploded", table=[alias_name]) 1366 1367 return exp.select(alias_name).from_(exploded) 1368 1369 1370@macro() 1371def resolve_template( 1372 evaluator: MacroEvaluator, 1373 template: exp.Literal, 1374 mode: str = "literal", 1375) -> t.Union[exp.Literal, exp.Table]: 1376 """ 1377 Generates either a String literal or an exp.Table representing a physical table location, based on rendering the provided template String literal. 1378 1379 Note: It relies on the @this_model variable being available in the evaluation context (@this_model resolves to an exp.Table object 1380 representing the current physical table). 1381 Therefore, the @resolve_template macro must be used at creation or evaluation time and not at load time. 1382 1383 Args: 1384 template: Template string literal. Can contain the following placeholders: 1385 @{catalog_name} -> replaced with the catalog of the exp.Table returned from @this_model 1386 @{schema_name} -> replaced with the schema of the exp.Table returned from @this_model 1387 @{table_name} -> replaced with the name of the exp.Table returned from @this_model 1388 mode: What to return. 
1389 'literal' -> return an exp.Literal string 1390 'table' -> return an exp.Table 1391 1392 Example: 1393 >>> from sqlglot import parse_one, exp 1394 >>> from sqlmesh.core.macros import MacroEvaluator, RuntimeStage 1395 >>> sql = "@resolve_template('s3://data-bucket/prod/@{catalog_name}/@{schema_name}/@{table_name}')" 1396 >>> evaluator = MacroEvaluator(runtime_stage=RuntimeStage.CREATING) 1397 >>> evaluator.locals.update({"this_model": exp.to_table("test_catalog.sqlmesh__test.test__test_model__2517971505")}) 1398 >>> evaluator.transform(parse_one(sql)).sql() 1399 "'s3://data-bucket/prod/test_catalog/sqlmesh__test/test__test_model__2517971505'" 1400 """ 1401 if "this_model" in evaluator.locals: 1402 this_model = exp.to_table(evaluator.locals["this_model"], dialect=evaluator.dialect) 1403 template_str: str = template.this 1404 result = ( 1405 template_str.replace("@{catalog_name}", this_model.catalog) 1406 .replace("@{schema_name}", this_model.db) 1407 .replace("@{table_name}", this_model.name) 1408 ) 1409 1410 if mode.lower() == "table": 1411 return exp.to_table(result, dialect=evaluator.dialect) 1412 return exp.Literal.string(result) 1413 if evaluator.runtime_stage != RuntimeStage.LOADING.value: 1414 # only error if we are CREATING, EVALUATING or TESTING and @this_model is not present; this could indicate a bug 1415 # otherwise, for LOADING, it's a no-op 1416 raise SQLMeshError( 1417 "@this_model must be present in the macro evaluation context in order to use @resolve_template" 1418 ) 1419 1420 return template 1421 1422 1423def normalize_macro_name(name: str) -> str: 1424 """Prefix macro name with @ and upcase""" 1425 return f"@{name.upper()}" 1426 1427 1428for m in macro.get_registry().values(): 1429 setattr(m, c.SQLMESH_BUILTIN, True) 1430 1431 1432def call_macro( 1433 func: t.Callable, 1434 dialect: DialectType, 1435 path: t.Optional[Path], 1436 provided_args: t.Tuple[t.Any, ...], 1437 provided_kwargs: t.Dict[str, t.Any], 1438 **optional_kwargs: t.Any, 1439) 
-> t.Any: 1440 # Bind the macro's actual parameters to its formal parameters 1441 sig = inspect.signature(func) 1442 1443 if optional_kwargs: 1444 provided_kwargs = provided_kwargs.copy() 1445 1446 for k, v in optional_kwargs.items(): 1447 if k in sig.parameters: 1448 provided_kwargs[k] = v 1449 1450 bound = sig.bind(*provided_args, **provided_kwargs) 1451 bound.apply_defaults() 1452 1453 try: 1454 annotations = t.get_type_hints(func, localns=get_supported_types()) 1455 except (NameError, TypeError): # forward references aren't handled 1456 annotations = {} 1457 1458 # If the macro is annotated, we try coerce the actual parameters to the corresponding types 1459 if annotations: 1460 for arg, value in bound.arguments.items(): 1461 typ = annotations.get(arg) 1462 if not typ: 1463 continue 1464 1465 # Changes to bound.arguments will reflect in bound.args and bound.kwargs 1466 # https://docs.python.org/3/library/inspect.html#inspect.BoundArguments.arguments 1467 param = sig.parameters[arg] 1468 if param.kind is inspect.Parameter.VAR_POSITIONAL: 1469 bound.arguments[arg] = tuple(_coerce(v, typ, dialect, path) for v in value) 1470 elif param.kind is inspect.Parameter.VAR_KEYWORD: 1471 bound.arguments[arg] = {k: _coerce(v, typ, dialect, path) for k, v in value.items()} 1472 else: 1473 bound.arguments[arg] = _coerce(value, typ, dialect, path) 1474 1475 return func(*bound.args, **bound.kwargs) 1476 1477 1478def _coerce( 1479 expr: t.Any, 1480 typ: t.Any, 1481 dialect: DialectType, 1482 path: t.Optional[Path] = None, 1483 strict: bool = False, 1484) -> t.Any: 1485 """Coerces the given expression to the specified type on a best-effort basis.""" 1486 base_err_msg = f"Failed to coerce expression '{expr}' to type '{typ}'." 
1487 try: 1488 if typ is None or typ is t.Any or not isinstance(expr, exp.Expr): 1489 return expr 1490 base = t.get_origin(typ) or typ 1491 1492 # We need to handle Union and TypeVars first since we cannot use isinstance with it 1493 if base in UNION_TYPES: 1494 for branch in t.get_args(typ): 1495 try: 1496 return _coerce(expr, branch, dialect, path, strict=True) 1497 except Exception: 1498 pass 1499 raise SQLMeshError(base_err_msg) 1500 if base is SQL and isinstance(expr, exp.Expr): 1501 return expr.sql(dialect) 1502 1503 if base is t.Literal: 1504 if not isinstance(expr, (exp.Literal, exp.Boolean)): 1505 raise SQLMeshError( 1506 f"{base_err_msg} Coercion to {base} requires a literal expression." 1507 ) 1508 literal_type_args = t.get_args(typ) 1509 try: 1510 for literal_type_arg in literal_type_args: 1511 expr_is_bool = isinstance(expr.this, bool) 1512 literal_is_bool = isinstance(literal_type_arg, bool) 1513 if (expr_is_bool and literal_is_bool and literal_type_arg == expr.this) or ( 1514 not expr_is_bool 1515 and not literal_is_bool 1516 and str(literal_type_arg) == str(expr.this) 1517 ): 1518 return type(literal_type_arg)(expr.this) 1519 except Exception: 1520 raise SQLMeshError(base_err_msg) 1521 raise SQLMeshError(base_err_msg) 1522 1523 if isinstance(expr, base): 1524 return expr 1525 if issubclass(base, exp.Expr): 1526 d = Dialect.get_or_raise(dialect) 1527 into = base if base in d.parser_class.EXPRESSION_PARSERS else None 1528 if into is None: 1529 if isinstance(expr, exp.Literal): 1530 coerced = parse_one(expr.this) 1531 else: 1532 raise SQLMeshError( 1533 f"{base_err_msg} Coercion to {base} requires a literal expression." 
1534 ) 1535 else: 1536 coerced = parse_one( 1537 expr.this if isinstance(expr, exp.Literal) else expr.sql(), into=into 1538 ) 1539 if isinstance(coerced, base): 1540 return coerced 1541 raise SQLMeshError(base_err_msg) 1542 1543 if base in (int, float, str) and isinstance(expr, exp.Literal): 1544 return base(expr.this) 1545 if base is str and isinstance(expr, exp.Column) and not expr.table: 1546 return expr.name 1547 if base is bool and isinstance(expr, exp.Boolean): 1548 return expr.this 1549 if base is datetime and isinstance(expr, exp.Literal): 1550 return to_datetime(expr.this) 1551 if base is date and isinstance(expr, exp.Literal): 1552 return to_date(expr.this) 1553 if base is tuple and isinstance(expr, (exp.Tuple, exp.Array)): 1554 generic = t.get_args(typ) 1555 if not generic: 1556 return tuple(expr.expressions) 1557 if generic[-1] is ...: 1558 return tuple(_coerce(expr, generic[0], dialect, path) for expr in expr.expressions) 1559 if len(generic) == len(expr.expressions): 1560 return tuple( 1561 _coerce(expr, generic[i], dialect, path) 1562 for i, expr in enumerate(expr.expressions) 1563 ) 1564 raise SQLMeshError(f"{base_err_msg} Expected {len(generic)} items.") 1565 if base is list and isinstance(expr, (exp.Array, exp.Tuple)): 1566 generic = t.get_args(typ) 1567 if not generic: 1568 return expr.expressions 1569 return [_coerce(expr, generic[0], dialect, path) for expr in expr.expressions] 1570 raise SQLMeshError(base_err_msg) 1571 except Exception: 1572 if strict: 1573 raise 1574 1575 from sqlmesh.core.console import get_console 1576 1577 get_console().log_error( 1578 f"Coercion of expression '{expr}' to type '{typ}' failed. 
Using non coerced expression at '{path}'", 1579 ) 1580 return expr 1581 1582 1583def convert_sql(v: t.Any, dialect: DialectType) -> t.Any: 1584 try: 1585 return _cache_convert_sql(v, dialect, v.__class__) 1586 # dicts aren't hashable but are convertable 1587 except TypeError: 1588 return _convert_sql(v, dialect) 1589 1590 1591def _convert_sql(v: t.Any, dialect: DialectType) -> t.Any: 1592 if not isinstance(v, str): 1593 try: 1594 v = exp.convert(v) 1595 # we use bare Exception instead of ValueError because there's 1596 # a recursive error with MagicMock. 1597 except Exception: 1598 pass 1599 1600 if isinstance(v, exp.Expr): 1601 if (isinstance(v, exp.Column) and not v.table) or ( 1602 isinstance(v, exp.Identifier) or v.is_string 1603 ): 1604 return v.name 1605 v = v.sql(dialect=dialect) 1606 return v 1607 1608 1609@lru_cache(maxsize=16384) 1610def _cache_convert_sql(v: t.Any, dialect: DialectType, t: type) -> t.Any: 1611 return _convert_sql(v, dialect)
class RuntimeStage(Enum):
    """Stages of a SQLMesh run at which macros can be evaluated.

    A macro evaluator's ``runtime_stage`` is compared against these members' string values;
    for example, ``@resolve_template`` is a no-op during ``LOADING`` but raises in other stages
    when ``@this_model`` is not available in the evaluation context.
    """

    LOADING = "loading"
    CREATING = "creating"
    EVALUATING = "evaluating"
    PROMOTING = "promoting"
    DEMOTING = "demoting"
    AUDITING = "auditing"
    TESTING = "testing"
    BEFORE_ALL = "before_all"
    AFTER_ALL = "after_all"
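An enumeration of the runtime stages at which macros can be evaluated: loading, creating, evaluating, promoting, demoting, auditing, testing, before_all and after_all.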
Inherited Members
- enum.Enum
- name
- value
A `string.Template` subclass for template substitutions whose delimiter is `SQLMESH_MACRO_PREFIX` instead of the default `$`.
Inherited Members
- string.Template
- Template
- idpattern
- braceidpattern
- flags
- template
- substitute
- safe_substitute
@lru_cache()
def get_supported_types() -> t.Dict[str, t.Any]:
    """Return the namespace used to resolve names in macro type annotations.

    ``call_macro`` passes this mapping as ``localns`` to ``typing.get_type_hints`` so that
    annotations written as ``t.List[...]``, ``List[...]``, ``exp.Table``, ``SQL``,
    ``MacroEvaluator`` and so on can be resolved. The result is cached, so every call
    returns the same dict object.
    """
    # Local import; presumably deferred to avoid an import cycle with the context module.
    from sqlmesh.core.context import ExecutionContext

    # Entries are added in a fixed order; the resulting keys/values match the original literal.
    namespace: t.Dict[str, t.Any] = {"t": t, "typing": t}
    namespace.update((("List", t.List), ("Tuple", t.Tuple), ("Union", t.Union)))
    namespace.update(
        DatetimeRanges=DatetimeRanges,
        exp=exp,
        SQL=SQL,
        MacroEvaluator=MacroEvaluator,
        ExecutionContext=ExecutionContext,
    )
    return namespace
class CaseInsensitiveMapping(t.Dict[str, t.Any]):
    """A ``dict`` whose lookups lower-case the requested key.

    Keys are stored exactly as given to the constructor. ``m[key]``, ``m.get(key)`` and
    ``key in m`` lower-case ``key`` before looking it up, so the mapping behaves
    case-insensitively when the stored keys are themselves lower-case.
    NOTE(review): callers presumably normalize keys before construction — confirm.

    Other inherited ``dict`` methods (``__setitem__``, ``pop``, ``setdefault``, ``update``, ...)
    are not overridden and therefore remain case-sensitive.
    """

    def __init__(self, data: t.Dict[str, t.Any]) -> None:
        super().__init__(data)

    def __getitem__(self, key: str) -> t.Any:
        # Raises KeyError if the lower-cased key is absent.
        return super().__getitem__(key.lower())

    def get(self, key: str, default: t.Any = None, /) -> t.Any:
        return super().get(key.lower(), default)

    def __contains__(self, key: object) -> bool:
        """Return True if ``key`` or, for strings, ``key.lower()`` is stored.

        Keeps ``in`` consistent with ``[]`` and ``get``: when ``"foo"`` is stored,
        ``"FOO" in m`` is True, just as ``m["FOO"]`` succeeds. The key is checked as
        given first, so any key that was already reported as present still is.
        """
        if super().__contains__(key):
            return True
        return isinstance(key, str) and super().__contains__(key.lower())
139 def get(self, key: str, default: t.Any = None, /) -> t.Any: 140 return super().get(key.lower(), default)
Return the value for `key.lower()` if it is in the dictionary, else `default`.
Inherited Members
- builtins.dict
- setdefault
- pop
- popitem
- keys
- items
- values
- update
- fromkeys
- clear
- copy
class MacroDialect(Python):
    """sqlglot dialect that renders expressions as Python source code rather than SQL.

    Builds on sqlglot's executor ``Python`` dialect and adds generation rules for SQLMesh
    macro nodes (``MacroFunc``, ``MacroSQL``, ``MacroStrReplace``) plus overrides for
    columns and lambdas.
    """

    class Generator(Python.Generator):
        # Start from the executor's Python transforms; entries listed after the unpacking
        # override the base ones for the same expression type.
        TRANSFORMS = {
            **Python.Generator.TRANSFORMS,  # type: ignore
            # Emits a call that rebuilds a Column from its `this` part only; a table
            # qualifier on the column is not included in the generated code.
            exp.Column: lambda self, e: f"exp.to_column('{self.sql(e, 'this')}')",
            # Emits a native Python lambda: `lambda <params>: <body>`.
            exp.Lambda: lambda self, e: f"lambda {self.expressions(e)}: {self.sql(e, 'this')}",
            MacroFunc: _macro_func_sql,
            # Passes the rendered inner expression and the node's optional `into` arg.
            MacroSQL: lambda self, e: _macro_sql(self.sql(e, "this"), e.args.get("into")),
            MacroStrReplace: lambda self, e: _macro_str_replace(self.sql(e, "this")),
        }
Mapping of an escaped sequence (`\n`) to its unescaped version (an actual newline character).
Inherited Members
- sqlglot.dialects.dialect.Dialect
- Dialect
- INDEX_OFFSET
- WEEK_OFFSET
- UNNEST_COLUMN_ONLY
- ALIAS_POST_TABLESAMPLE
- TABLESAMPLE_SIZE_IS_PERCENT
- NORMALIZATION_STRATEGY
- IDENTIFIERS_CAN_START_WITH_DIGIT
- DPIPE_IS_STRING_CONCAT
- STRICT_STRING_CONCAT
- SUPPORTS_USER_DEFINED_TYPES
- COPY_PARAMS_ARE_CSV
- NORMALIZE_FUNCTIONS
- PRESERVE_ORIGINAL_NAMES
- LOG_BASE_FIRST
- NULL_ORDERING
- TYPED_DIVISION
- SAFE_DIVISION
- CONCAT_COALESCE
- HEX_LOWERCASE
- DATE_FORMAT
- DATEINT_FORMAT
- TIME_FORMAT
- TIME_MAPPING
- FORMAT_MAPPING
- PSEUDOCOLUMNS
- PREFER_CTE_ALIAS_COLUMN
- FORCE_EARLY_ALIAS_REF_EXPANSION
- EXPAND_ONLY_GROUP_ALIAS_REF
- ANNOTATE_ALL_SCOPES
- DISABLES_ALIAS_REF_EXPANSION
- SUPPORTS_ALIAS_REFS_IN_JOIN_CONDITIONS
- SUPPORTS_ORDER_BY_ALL
- PROJECTION_ALIASES_SHADOW_SOURCE_NAMES
- TABLES_REFERENCEABLE_AS_COLUMNS
- SUPPORTS_STRUCT_STAR_EXPANSION
- EXCLUDES_PSEUDOCOLUMNS_FROM_STAR
- QUERY_RESULTS_ARE_STRUCTS
- REQUIRES_PARENTHESIZED_STRUCT_ACCESS
- SUPPORTS_NULL_TYPE
- COALESCE_COMPARISON_NON_STANDARD
- HAS_DISTINCT_ARRAY_CONSTRUCTORS
- SUPPORTS_FIXED_SIZE_ARRAYS
- STRICT_JSON_PATH_SYNTAX
- ON_CONDITION_EMPTY_BEFORE_ERROR
- ARRAY_AGG_INCLUDES_NULLS
- ARRAY_FUNCS_PROPAGATES_NULLS
- PROMOTE_TO_INFERRED_DATETIME_TYPE
- SUPPORTS_VALUES_DEFAULT
- NUMBERS_CAN_BE_UNDERSCORE_SEPARATED
- HEX_STRING_IS_INTEGER_TYPE
- REGEXP_EXTRACT_DEFAULT_GROUP
- REGEXP_EXTRACT_POSITION_OVERFLOW_RETURNS_NULL
- SET_OP_DISTINCT_BY_DEFAULT
- CREATABLE_KIND_MAPPING
- ALTER_TABLE_SUPPORTS_CASCADE
- ALTER_TABLE_ADD_REQUIRED_FOR_EACH_COLUMN
- TRY_CAST_REQUIRES_STRING
- SAFE_TO_ELIMINATE_DOUBLE_NEGATION
- INITCAP_DEFAULT_DELIMITER_CHARS
- BYTE_STRING_IS_BYTES_TYPE
- UUID_IS_STRING_TYPE
- JSON_EXTRACT_SCALAR_SCALAR_ONLY
- DEFAULT_FUNCTIONS_COLUMN_NAMES
- DEFAULT_NULL_TYPE
- LEAST_GREATEST_IGNORES_NULLS
- PRIORITIZE_NON_LITERAL_TYPES
- ALIAS_POST_VERSION
- DATE_PART_MAPPING
- COERCES_TO
- EXPRESSION_METADATA
- SUPPORTED_SETTINGS
- get_or_raise
- format_time
- version
- settings
- normalize_identifier
- case_sensitive
- can_quote
- quote_identifier
- to_json_path
- parse
- parse_into
- generate
- transpile
- tokenize
- tokenizer
- jsonpath_tokenizer
- parser
- generator
- generate_values_aliases
- sqlglot.executor.python.Python
- Tokenizer
    class Generator(Python.Generator):
        """Python-source generator extended with SQLMesh macro rendering rules."""

        # Start from the executor's Python transforms; entries listed after the unpacking
        # override the base ones for the same expression type.
        TRANSFORMS = {
            **Python.Generator.TRANSFORMS,  # type: ignore
            # Emits a call that rebuilds a Column from its `this` part only; a table
            # qualifier on the column is not included in the generated code.
            exp.Column: lambda self, e: f"exp.to_column('{self.sql(e, 'this')}')",
            # Emits a native Python lambda: `lambda <params>: <body>`.
            exp.Lambda: lambda self, e: f"lambda {self.expressions(e)}: {self.sql(e, 'this')}",
            MacroFunc: _macro_func_sql,
            # Passes the rendered inner expression and the node's optional `into` arg.
            MacroSQL: lambda self, e: _macro_sql(self.sql(e, "this"), e.args.get("into")),
            MacroStrReplace: lambda self, e: _macro_str_replace(self.sql(e, "this")),
        }
Generator converts a given syntax tree to the corresponding SQL string.
Arguments:
- pretty: Whether to format the produced SQL string. Default: False.
- identify: Determines when an identifier should be quoted. Possible values are: False (default): Never quote, except in cases where it's mandated by the dialect. True: Always quote except for special cases. 'safe': Only quote identifiers that are case insensitive.
- normalize: Whether to normalize identifiers to lowercase. Default: False.
- pad: The pad size in a formatted string. For example, this affects the indentation of a projection in a query, relative to its nesting level. Default: 2.
- indent: The indentation size in a formatted string. For example, this affects the indentation of subqueries and filters under a `WHERE` clause. Default: 2.
- normalize_functions: How to normalize function names. Possible values are: "upper" or True (default): Convert names to uppercase. "lower": Convert names to lowercase. False: Disables function name normalization.
- unsupported_level: Determines the generator's behavior when it encounters unsupported expressions. Default ErrorLevel.WARN.
- max_unsupported: Maximum number of unsupported messages to include in a raised UnsupportedError. This is only relevant if unsupported_level is ErrorLevel.RAISE. Default: 3
- leading_comma: Whether the comma is leading or trailing in select expressions. This is only relevant when generating in pretty mode. Default: False
- max_text_width: The max number of characters in a segment before creating new lines in pretty mode. The default is on the smaller end because the length only represents a segment and not the true line length. Default: 80
- comments: Whether to preserve comments in the output SQL code. Default: True
Inherited Members
- sqlglot.generator.Generator
- Generator
- NULL_ORDERING_SUPPORTED
- WINDOW_FUNCS_WITH_NULL_ORDERING
- IGNORE_NULLS_IN_FUNC
- IGNORE_NULLS_BEFORE_ORDER
- LOCKING_READS_SUPPORTED
- EXCEPT_INTERSECT_SUPPORT_ALL_CLAUSE
- WRAP_DERIVED_VALUES
- CREATE_FUNCTION_RETURN_AS
- MATCHED_BY_SOURCE
- SINGLE_STRING_INTERVAL
- INTERVAL_ALLOWS_PLURAL_FORM
- LIMIT_FETCH
- LIMIT_ONLY_LITERALS
- RENAME_TABLE_WITH_DB
- GROUPINGS_SEP
- INDEX_ON
- INOUT_SEPARATOR
- JOIN_HINTS
- DIRECTED_JOINS
- TABLE_HINTS
- QUERY_HINTS
- QUERY_HINT_SEP
- IS_BOOL_ALLOWED
- DUPLICATE_KEY_UPDATE_WITH_SET
- LIMIT_IS_TOP
- RETURNING_END
- EXTRACT_ALLOWS_QUOTES
- TZ_TO_WITH_TIME_ZONE
- NVL2_SUPPORTED
- VALUES_AS_TABLE
- ALTER_TABLE_INCLUDE_COLUMN_KEYWORD
- UNNEST_WITH_ORDINALITY
- AGGREGATE_FILTER_SUPPORTED
- SEMI_ANTI_JOIN_WITH_SIDE
- COMPUTED_COLUMN_WITH_TYPE
- SUPPORTS_TABLE_COPY
- TABLESAMPLE_REQUIRES_PARENS
- TABLESAMPLE_SIZE_IS_ROWS
- TABLESAMPLE_KEYWORDS
- TABLESAMPLE_WITH_METHOD
- TABLESAMPLE_SEED_KEYWORD
- COLLATE_IS_FUNC
- DATA_TYPE_SPECIFIERS_ALLOWED
- ENSURE_BOOLS
- CTE_RECURSIVE_KEYWORD_REQUIRED
- SUPPORTS_SINGLE_ARG_CONCAT
- LAST_DAY_SUPPORTS_DATE_PART
- SUPPORTS_TABLE_ALIAS_COLUMNS
- UNPIVOT_ALIASES_ARE_IDENTIFIERS
- JSON_KEY_VALUE_PAIR_SEP
- INSERT_OVERWRITE
- SUPPORTS_SELECT_INTO
- SUPPORTS_UNLOGGED_TABLES
- SUPPORTS_CREATE_TABLE_LIKE
- LIKE_PROPERTY_INSIDE_SCHEMA
- MULTI_ARG_DISTINCT
- JSON_TYPE_REQUIRED_FOR_EXTRACTION
- JSON_PATH_BRACKETED_KEY_SUPPORTED
- JSON_PATH_SINGLE_QUOTE_ESCAPE
- SUPPORTED_JSON_PATH_PARTS
- CAN_IMPLEMENT_ARRAY_ANY
- SUPPORTS_TO_NUMBER
- SUPPORTS_WINDOW_EXCLUDE
- SET_OP_MODIFIERS
- COPY_PARAMS_ARE_WRAPPED
- COPY_PARAMS_EQ_REQUIRED
- COPY_HAS_INTO_KEYWORD
- UNICODE_SUBSTITUTE
- STAR_EXCEPT
- HEX_FUNC
- WITH_PROPERTIES_PREFIX
- QUOTE_JSON_PATH
- PAD_FILL_PATTERN_IS_REQUIRED
- SUPPORTS_EXPLODING_PROJECTIONS
- ARRAY_CONCAT_IS_VAR_LEN
- SUPPORTS_CONVERT_TIMEZONE
- SUPPORTS_MEDIAN
- SUPPORTS_UNIX_SECONDS
- ALTER_SET_WRAPPED
- NORMALIZE_EXTRACT_DATE_PARTS
- PARSE_JSON_NAME
- ARRAY_SIZE_NAME
- ALTER_SET_TYPE
- ARRAY_SIZE_DIM_REQUIRED
- SUPPORTS_BETWEEN_FLAGS
- SUPPORTS_LIKE_QUANTIFIERS
- MATCH_AGAINST_TABLE_PREFIX
- SET_ASSIGNMENT_REQUIRES_VARIABLE_KEYWORD
- DECLARE_DEFAULT_ASSIGNMENT
- UPDATE_STATEMENT_SUPPORTS_FROM
- STAR_EXCLUDE_REQUIRES_DERIVED_TABLE
- TYPE_MAPPING
- UNSUPPORTED_TYPES
- TIME_PART_SINGULARS
- TOKEN_MAPPING
- STRUCT_DELIMITER
- PARAMETER_TOKEN
- NAMED_PLACEHOLDER_TOKEN
- EXPRESSION_PRECEDES_PROPERTIES_CREATABLES
- PROPERTIES_LOCATION
- RESERVED_KEYWORDS
- WITH_SEPARATED_COMMENTS
- EXCLUDE_COMMENTS
- UNWRAPPED_INTERVAL_VALUES
- PARAMETERIZABLE_TEXT_TYPES
- EXPRESSIONS_WITHOUT_NESTED_CTES
- RESPECT_IGNORE_NULLS_UNSUPPORTED_EXPRESSIONS
- SAFE_JSON_PATH_KEY_RE
- SENTINEL_LINE_BREAK
- pretty
- identify
- normalize
- pad
- unsupported_level
- max_unsupported
- leading_comma
- max_text_width
- comments
- dialect
- normalize_functions
- unsupported_messages
- generate
- preprocess
- unsupported
- sep
- seg
- sanitize_comment
- maybe_comment
- wrap
- no_identify
- normalize_func
- indent
- sql
- uncache_sql
- cache_sql
- characterset_sql
- column_parts
- column_sql
- pseudocolumn_sql
- columnposition_sql
- columndef_sql
- columnconstraint_sql
- computedcolumnconstraint_sql
- autoincrementcolumnconstraint_sql
- compresscolumnconstraint_sql
- generatedasidentitycolumnconstraint_sql
- generatedasrowcolumnconstraint_sql
- periodforsystemtimeconstraint_sql
- notnullcolumnconstraint_sql
- primarykeycolumnconstraint_sql
- uniquecolumnconstraint_sql
- inoutcolumnconstraint_sql
- createable_sql
- create_sql
- sequenceproperties_sql
- triggerproperties_sql
- triggerreferencing_sql
- triggerevent_sql
- clone_sql
- describe_sql
- heredoc_sql
- prepend_ctes
- with_sql
- cte_sql
- tablealias_sql
- bitstring_sql
- hexstring_sql
- bytestring_sql
- unicodestring_sql
- rawstring_sql
- datatypeparam_sql
- datatype_sql
- directory_sql
- delete_sql
- drop_sql
- set_operation
- set_operations
- fetch_sql
- limitoptions_sql
- filter_sql
- hint_sql
- indexparameters_sql
- index_sql
- identifier_sql
- hex_sql
- lowerhex_sql
- inputoutputformat_sql
- national_sql
- partition_sql
- properties_sql
- root_properties
- properties
- with_properties
- locate_properties
- property_name
- property_sql
- likeproperty_sql
- fallbackproperty_sql
- journalproperty_sql
- freespaceproperty_sql
- checksumproperty_sql
- mergeblockratioproperty_sql
- datablocksizeproperty_sql
- blockcompressionproperty_sql
- isolatedloadingproperty_sql
- partitionboundspec_sql
- partitionedofproperty_sql
- lockingproperty_sql
- withdataproperty_sql
- withsystemversioningproperty_sql
- insert_sql
- introducer_sql
- kill_sql
- pseudotype_sql
- objectidentifier_sql
- onconflict_sql
- returning_sql
- rowformatdelimitedproperty_sql
- withtablehint_sql
- indextablehint_sql
- historicaldata_sql
- table_parts
- table_sql
- tablefromrows_sql
- tablesample_sql
- pivot_sql
- version_sql
- tuple_sql
- update_sql
- values_sql
- var_sql
- into_sql
- from_sql
- groupingsets_sql
- rollup_sql
- rollupindex_sql
- rollupproperty_sql
- cube_sql
- group_sql
- having_sql
- connect_sql
- prior_sql
- join_sql
- lambda_sql
- lateral_op
- lateral_sql
- limit_sql
- offset_sql
- setitem_sql
- set_sql
- queryband_sql
- pragma_sql
- lock_sql
- literal_sql
- escape_str
- loaddata_sql
- null_sql
- boolean_sql
- booland_sql
- boolor_sql
- order_sql
- withfill_sql
- cluster_sql
- distribute_sql
- sort_sql
- ordered_sql
- matchrecognizemeasure_sql
- matchrecognize_sql
- query_modifiers
- options_modifier
- for_modifiers
- queryoption_sql
- offset_limit_modifiers
- after_limit_modifiers
- select_sql
- schema_sql
- schema_columns_sql
- star_sql
- parameter_sql
- sessionparameter_sql
- placeholder_sql
- subquery_sql
- qualify_sql
- unnest_sql
- prewhere_sql
- where_sql
- window_sql
- partition_by_sql
- windowspec_sql
- withingroup_sql
- between_sql
- bracket_offset_expressions
- bracket_sql
- all_sql
- any_sql
- exists_sql
- case_sql
- constraint_sql
- nextvaluefor_sql
- extract_sql
- trim_sql
- convert_concat_args
- concat_sql
- concatws_sql
- check_sql
- foreignkey_sql
- primarykey_sql
- if_sql
- matchagainst_sql
- jsonkeyvalue_sql
- jsonpath_sql
- json_path_part
- formatjson_sql
- formatphrase_sql
- jsonarray_sql
- jsonarrayagg_sql
- jsoncolumndef_sql
- jsonschema_sql
- jsontable_sql
- openjsoncolumndef_sql
- openjson_sql
- in_sql
- in_unnest_op
- interval_sql
- return_sql
- reference_sql
- anonymous_sql
- paren_sql
- neg_sql
- not_sql
- alias_sql
- pivotalias_sql
- aliases_sql
- atindex_sql
- attimezone_sql
- fromtimezone_sql
- add_sql
- and_sql
- or_sql
- xor_sql
- connector_sql
- bitwiseand_sql
- bitwiseleftshift_sql
- bitwisenot_sql
- bitwiseor_sql
- bitwiserightshift_sql
- bitwisexor_sql
- cast_sql
- strtotime_sql
- currentdate_sql
- collate_sql
- command_sql
- comment_sql
- mergetreettlaction_sql
- mergetreettl_sql
- transaction_sql
- commit_sql
- rollback_sql
- altercolumn_sql
- alterindex_sql
- alterdiststyle_sql
- altersortkey_sql
- alterrename_sql
- renamecolumn_sql
- alterset_sql
- alter_sql
- altersession_sql
- add_column_sql
- droppartition_sql
- addconstraint_sql
- addpartition_sql
- distinct_sql
- ignorenulls_sql
- respectnulls_sql
- havingmax_sql
- intdiv_sql
- dpipe_sql
- div_sql
- safedivide_sql
- overlaps_sql
- distance_sql
- dot_sql
- eq_sql
- propertyeq_sql
- escape_sql
- glob_sql
- gt_sql
- gte_sql
- is_sql
- like_sql
- ilike_sql
- match_sql
- similarto_sql
- lt_sql
- lte_sql
- mod_sql
- mul_sql
- neq_sql
- nullsafeeq_sql
- nullsafeneq_sql
- sub_sql
- trycast_sql
- jsoncast_sql
- try_sql
- log_sql
- use_sql
- binary
- ceil_floor
- function_fallback_sql
- func
- format_args
- too_wide
- format_time
- expressions
- op_expressions
- naked_property
- tag_sql
- token_sql
- userdefinedfunction_sql
- joinhint_sql
- kwarg_sql
- when_sql
- whens_sql
- merge_sql
- tochar_sql
- tonumber_sql
- dictproperty_sql
- dictrange_sql
- dictsubproperty_sql
- duplicatekeyproperty_sql
- uniquekeyproperty_sql
- distributedbyproperty_sql
- oncluster_sql
- clusteredbyproperty_sql
- anyvalue_sql
- querytransform_sql
- indexconstraintoption_sql
- checkcolumnconstraint_sql
- indexcolumnconstraint_sql
- nvl2_sql
- comprehension_sql
- columnprefix_sql
- opclass_sql
- predict_sql
- generateembedding_sql
- mltranslate_sql
- mlforecast_sql
- featuresattime_sql
- vectorsearch_sql
- forin_sql
- refresh_sql
- toarray_sql
- tsordstotime_sql
- tsordstotimestamp_sql
- tsordstodatetime_sql
- tsordstodate_sql
- unixdate_sql
- lastday_sql
- dateadd_sql
- arrayany_sql
- struct_sql
- partitionrange_sql
- truncatetable_sql
- convert_sql
- copyparameter_sql
- credentials_sql
- copy_sql
- semicolon_sql
- datadeletionproperty_sql
- maskingpolicycolumnconstraint_sql
- gapfill_sql
- scope_resolution
- scoperesolution_sql
- parsejson_sql
- rand_sql
- changes_sql
- pad_sql
- summarize_sql
- explodinggenerateseries_sql
- converttimezone_sql
- json_sql
- jsonvalue_sql
- skipjsoncolumn_sql
- conditionalinsert_sql
- multitableinserts_sql
- oncondition_sql
- jsonextractquote_sql
- jsonexists_sql
- arrayagg_sql
- slice_sql
- apply_sql
- grant_sql
- revoke_sql
- grantprivilege_sql
- grantprincipal_sql
- columns_sql
- overlay_sql
- todouble_sql
- string_sql
- median_sql
- overflowtruncatebehavior_sql
- unixseconds_sql
- arraysize_sql
- attach_sql
- detach_sql
- attachoption_sql
- watermarkcolumnconstraint_sql
- encodeproperty_sql
- includeproperty_sql
- xmlelement_sql
- xmlkeyvalueoption_sql
- partitionbyrangeproperty_sql
- partitionbyrangepropertydynamic_sql
- unpivotcolumns_sql
- analyzesample_sql
- analyzestatistics_sql
- analyzehistogram_sql
- analyzedelete_sql
- analyzelistchainedrows_sql
- analyzevalidate_sql
- analyze_sql
- xmltable_sql
- xmlnamespace_sql
- export_sql
- declare_sql
- declareitem_sql
- recursivewithsearch_sql
- parameterizedagg_sql
- anonymousaggfunc_sql
- combinedaggfunc_sql
- combinedparameterizedagg_sql
- show_sql
- install_sql
- get_put_sql
- translatecharacters_sql
- decodecase_sql
- semanticview_sql
- getextract_sql
- datefromunixdate_sql
- space_sql
- buildproperty_sql
- refreshtriggerproperty_sql
- modelattribute_sql
- directorystage_sql
- uuid_sql
- initcap_sql
- localtime_sql
- localtimestamp_sql
- weekstart_sql
- chr_sql
- block_sql
- storedprocedure_sql
- ifblock_sql
- whileblock_sql
- execute_sql
- executesql_sql
- altermodifysqlsecurity_sql
class MacroEvaluator:
    """The class responsible for evaluating SQLMesh Macros/SQL.

    SQLMesh supports special preprocessed SQL prefixed with `@`. Although it provides similar power to
    traditional methods like string templating, there is semantic understanding of SQL which prevents
    common errors like leading/trailing commas, syntax errors, etc.

    SQLMesh SQL allows for macro variables and macro functions. Macro variables take the form of @variable. These are used for variable substitution.

    SELECT * FROM foo WHERE ds BETWEEN @start_date AND @end_date

    Macro variables can be defined with a special macro function.

    @DEF(start_date, '2021-01-01')

    Args:
        dialect: Dialect of the SQL to evaluate.
        python_env: Serialized Python environment.
        schema: Optional schema consulted by `columns_to_types`.
        runtime_stage: Stage in which evaluation happens; its string value is stored in the
            locals under `runtime_stage`.
        resolve_table: Optional callable backing `resolve_table`.
        resolve_tables: Optional callable backing `resolve_tables`.
        snapshots: Optional mapping of normalized model names to snapshots (defaults to empty).
        default_catalog: Default catalog used when normalizing model names.
        path: Optional path included in some error messages and forwarded to `call_macro`/`_coerce`.
        environment_naming_info: Optional environment naming info; stored on the evaluator.
        model_fqn: Fully qualified name of the surrounding model, exposed via `this_model_fqn`.
    """

    def __init__(
        self,
        dialect: DialectType = "",
        python_env: t.Optional[t.Dict[str, Executable]] = None,
        schema: t.Optional[MappingSchema] = None,
        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
        resolve_table: t.Optional[t.Callable[[str | exp.Table], str]] = None,
        resolve_tables: t.Optional[t.Callable[[exp.Expr], exp.Expr]] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        default_catalog: t.Optional[str] = None,
        path: t.Optional[Path] = None,
        environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
        model_fqn: t.Optional[str] = None,
    ):
        self.dialect = dialect
        self.generator = MacroDialect().generator()
        # The runtime stage is stored as the enum's string value so it can be compared against
        # plain strings from Python (evaluator.runtime_stage) and SQL (@runtime_stage).
        self.locals: t.Dict[str, t.Any] = {
            "runtime_stage": runtime_stage.value,
            "default_catalog": default_catalog,
        }
        # Globals used by eval() in eval_expression.
        self.env = {
            **ENV,
            "self": self,
            "SQL": SQL,
            "MacroEvaluator": MacroEvaluator,
        }
        self.python_env = python_env or {}
        self.macros = {normalize_macro_name(k): v.func for k, v in macro.get_registry().items()}
        self.columns_to_types_called = False
        self.default_catalog = default_catalog

        self._schema = schema
        self._resolve_table = resolve_table
        self._resolve_tables = resolve_tables
        self._snapshots = snapshots if snapshots is not None else {}
        self._path = path
        self._environment_naming_info = environment_naming_info
        self._model_fqn = model_fqn

        prepare_env(self.python_env, self.env)
        for k, v in self.python_env.items():
            if v.is_definition:
                self.macros[normalize_macro_name(k)] = self.env[v.name or k]
            elif v.is_import and getattr(self.env.get(k), c.SQLMESH_MACRO, None):
                # Only imported objects flagged with the SQLMESH_MACRO attribute are macros.
                self.macros[normalize_macro_name(k)] = self.env[k]
            elif v.is_value:
                value = self.env[k]
                if k in (
                    c.SQLMESH_VARS,
                    c.SQLMESH_VARS_METADATA,
                    c.SQLMESH_BLUEPRINT_VARS,
                    c.SQLMESH_BLUEPRINT_VARS_METADATA,
                ):
                    # Variables serialized as SqlValue are parsed back into sqlglot expressions.
                    value = {
                        var_name: (
                            self.parse_one(var_value.sql)
                            if isinstance(var_value, SqlValue)
                            else var_value
                        )
                        for var_name, var_value in value.items()
                    }

                self.locals[k] = value

    def send(
        self, name: str, *args: t.Any, **kwargs: t.Any
    ) -> t.Union[None, exp.Expr, t.List[exp.Expr]]:
        """Invokes the macro registered under `name`, passing this evaluator as the first argument.

        Raises:
            MacroEvalError: If no callable macro is registered under `name`, or the macro raises.
        """
        func = self.macros.get(normalize_macro_name(name))

        if not callable(func):
            raise MacroEvalError(f"Macro '{name}' does not exist.")

        try:
            return call_macro(
                func, self.dialect, self._path, provided_args=(self, *args), provided_kwargs=kwargs
            )  # type: ignore
        except Exception as e:
            # Chain the original exception so it is reported as the direct cause.
            raise MacroEvalError(
                f"An error occurred during evaluation of '{name}'\n\n"
                + format_evaluated_code_exception(e, self.python_env)
            ) from e

    def transform(self, expression: exp.Expr) -> exp.Expr | t.List[exp.Expr] | None:
        """Expands macro variables, templated identifiers and macro calls in a copy of `expression`."""
        changed = False

        def evaluate_macros(
            node: exp.Expr,
        ) -> exp.Expr | t.List[exp.Expr] | None:
            nonlocal changed

            if isinstance(node, MacroVar):
                changed = True
                variables = self.variables

                # This makes all variables case-insensitive, e.g. @X is the same as @x. We do this
                # for consistency, since `variables` and `blueprint_variables` are normalized.
                var_name = node.name.lower()

                if var_name not in self.locals and var_name not in variables:
                    if not isinstance(node.parent, StagedFilePath):
                        raise SQLMeshError(f"Macro variable '{node.name}' is undefined.")

                    return node

                # Precedence order is locals (e.g. @DEF) > blueprint variables > config variables
                value = self.locals.get(var_name, variables.get(var_name))
                if isinstance(value, list):
                    return exp.convert(
                        tuple(self.transform(v) if isinstance(v, exp.Expr) else v for v in value)
                    )

                return exp.convert(self.transform(value) if isinstance(value, exp.Expr) else value)
            if isinstance(node, exp.Identifier) and "@" in node.this:
                text = self.template(node.this, {})
                if node.this != text:
                    changed = True
                    return exp.to_identifier(text, quoted=node.quoted or None)
            if isinstance(node, MacroFunc):
                changed = True
                return self.evaluate(node)
            return node

        transformed = exp.replace_tree(
            expression.copy(), evaluate_macros, prune=lambda n: isinstance(n, exp.Lambda)
        )

        if changed:
            # the transformations could have corrupted the ast, turning this into sql and reparsing ensures
            # that the ast is correct
            if isinstance(transformed, list):
                return [
                    self.parse_one(node.sql(dialect=self.dialect, copy=False))
                    for node in transformed
                ]
            if isinstance(transformed, exp.Expr):
                return self.parse_one(transformed.sql(dialect=self.dialect, copy=False))

        return transformed

    def template(self, text: t.Any, local_variables: t.Dict[str, t.Any]) -> str:
        """Substitute @vars with locals.

        Args:
            text: The string to do substitution on.
            local_variables: Local variables in the context so that lambdas can be used.

        Returns:
            The rendered string.
        """
        # We try to convert all variables into sqlglot expressions because they're going to be converted
        # into strings; in sql we don't convert strings because that would result in adding quotes
        base_mapping = {
            k.lower(): convert_sql(v, self.dialect)
            for k, v in chain(self.variables.items(), self.locals.items(), local_variables.items())
            if k.lower()
            not in (
                "engine_adapter",
                "snapshot",
            )
        }
        return MacroStrTemplate(str(text)).safe_substitute(CaseInsensitiveMapping(base_mapping))

    def evaluate(self, node: MacroFunc) -> exp.Expr | t.List[exp.Expr] | None:
        """Evaluates a single macro function node and returns its (re-parsed) result."""
        if isinstance(node, MacroDef):
            if isinstance(node.expression, exp.Lambda):
                _, fn = _norm_var_arg_lambda(self, node.expression)
                self.macros[normalize_macro_name(node.name)] = lambda _, *args: fn(
                    args[0] if len(args) == 1 else exp.Tuple(expressions=list(args))
                )
            else:
                # Make variables defined through `@DEF` case-insensitive
                self.locals[node.name.lower()] = self.transform(node.expression)

            return node

        if isinstance(node, (MacroSQL, MacroStrReplace)):
            result: t.Optional[exp.Expr | t.List[exp.Expr]] = exp.convert(
                self.eval_expression(node)
            )
        else:
            func = t.cast(exp.Anonymous, node.this)

            args = []
            kwargs = {}
            for e in func.expressions:
                if isinstance(e, exp.PropertyEQ):
                    kwargs[e.this.name] = e.expression
                else:
                    if kwargs:
                        raise MacroEvalError(
                            "Positional argument cannot follow keyword argument.\n "
                            f"{func.sql(dialect=self.dialect)} at '{self._path}'"
                        )

                    args.append(e)

            result = self.send(func.name, *args, **kwargs)

        if result is None:
            return None

        if isinstance(result, (tuple, list)):
            result = [self.parse_one(item) for item in result if item is not None]

            if (
                len(result) == 1
                and isinstance(result[0], (exp.Array, exp.Tuple))
                and node.find_ancestor(MacroFunc)
            ):
                # If:
                # - the output of evaluating this node is being passed as an argument to another
                #   macro function
                # - and that output is something that _norm_var_arg_lambda() will unpack into varargs
                #   (a list containing a single item of type exp.Tuple/exp.Array)
                # then we will get inconsistent behaviour depending on if this node emits a list with
                # a single item vs multiple items.
                #
                # In the first case, emitting a list containing a single array item will cause that
                # array to get unpacked and its *members* passed to the calling macro.
                # In the second case, emitting a list containing multiple array items will cause each
                # item to get passed as-is to the calling macro.
                #
                # To prevent this inconsistency, we wrap this node output in an exp.Array so that
                # _norm_var_arg_lambda() can "unpack" that into the actual argument we want to pass
                # to the parent macro function.
                #
                # Note we only do this for evaluation results that get passed as an argument to
                # another macro, because when the final result is given to something like SELECT, we
                # still want that to be unpacked into a list of items like:
                #   - SELECT ARRAY(1), ARRAY(2)
                # rather than a single item like:
                #   - SELECT ARRAY(ARRAY(1), ARRAY(2))
                result = [exp.Array(expressions=result)]
        else:
            result = self.parse_one(result)

        return result

    def eval_expression(self, node: t.Any) -> t.Any:
        """Converts a SQLGlot expression into executable Python code and evals it.

        If the node is not an expression, it will simply be returned.

        Args:
            node: expression
        Returns:
            The return value of the evaled Python Code.
        """
        if not isinstance(node, exp.Expr):
            return node
        # Seed with the plain SQL so the error message is meaningful even if code generation fails.
        code = node.sql()
        try:
            code = self.generator.generate(node)
            return eval(code, self.env, self.locals)
        except Exception as e:
            raise MacroEvalError(
                f"Error trying to eval macro.\n\nGenerated code: {code}\n\nOriginal sql: {node}\n\n"
                + format_evaluated_code_exception(e, self.python_env)
            )

    def parse_one(
        self, sql: str | exp.Expr, into: t.Optional[exp.IntoType] = None, **opts: t.Any
    ) -> exp.Expr:
        """Parses the given SQL string and returns a syntax tree for the first
        parsed SQL statement.

        Args:
            sql: the SQL code or expression to parse.
            into: the Expression to parse into
            **opts: other options

        Returns:
            Expression: the syntax tree for the first parsed statement
        """
        return sqlglot.maybe_parse(sql, dialect=self.dialect, into=into, **opts)

    def columns_to_types(self, model_name: TableName | exp.Column) -> t.Dict[str, exp.DataType]:
        """Returns the columns-to-types mapping corresponding to the specified model."""

        # We only return this dummy schema at load time, because if we don't actually know the
        # target model's schema at creation/evaluation time, returning a dummy schema could lead
        # to unintelligible errors when the query is executed
        if (self._schema is None or self._schema.empty) and self.runtime_stage == "loading":
            self.columns_to_types_called = True
            return {"__schema_unavailable_at_load__": exp.DataType.build("unknown")}

        normalized_model_name = normalize_model_name(
            model_name,
            default_catalog=self.default_catalog,
            dialect=self.dialect,
        )
        model_name = exp.to_table(normalized_model_name)

        columns_to_types = (
            self._schema.find(model_name, ensure_data_types=True) if self._schema else None
        )
        if columns_to_types is None:
            snapshot = self.get_snapshot(model_name)
            if snapshot and snapshot.node.is_model:
                columns_to_types = snapshot.node.columns_to_types  # type: ignore

        if columns_to_types is None:
            raise SQLMeshError(f"Schema for model '{model_name}' can't be statically determined.")

        return columns_to_types

    def get_snapshot(self, model_name: TableName | exp.Column) -> t.Optional[Snapshot]:
        """Returns the snapshot that corresponds to the given model name."""
        return self._snapshots.get(
            normalize_model_name(
                model_name,
                default_catalog=self.default_catalog,
                dialect=self.dialect,
            )
        )

    def resolve_table(self, table: str | exp.Table) -> str:
        """Gets the physical table name for a given model."""
        if not self._resolve_table:
            raise SQLMeshError(
                "Macro evaluator not properly initialized with resolve_table lambda."
            )
        return self._resolve_table(table)

    def resolve_tables(self, query: exp.Expr) -> exp.Expr:
        """Resolves queries with references to SQLMesh model names to their physical tables."""
        if not self._resolve_tables:
            raise SQLMeshError(
                "Macro evaluator not properly initialized with resolve_tables lambda."
            )
        return self._resolve_tables(query)

    @property
    def runtime_stage(self) -> str:
        """Returns the current runtime stage of the macro evaluation.

        This is the string value of a RuntimeStage member (e.g. "loading"), as stored in the
        locals by __init__, so compare it against strings rather than enum members.
        """
        return self.locals["runtime_stage"]

    @property
    def this_model(self) -> str:
        """Returns the resolved name of the surrounding model."""
        this_model = self.locals.get("this_model")
        if not this_model:
            raise SQLMeshError("Model name is not available in the macro evaluator.")
        return this_model.sql(dialect=self.dialect, identify=True, comments=False)

    @property
    def this_model_fqn(self) -> str:
        """Returns the fully qualified name of the surrounding model passed as `model_fqn`."""
        if self._model_fqn is None:
            raise SQLMeshError("Model name is not available in the macro evaluator.")
        return self._model_fqn

    @property
    def engine_adapter(self) -> EngineAdapter:
        """Returns the engine adapter stored in the locals.

        Raises:
            SQLMeshError: If no engine adapter is available (e.g. while models are loading).
        """
        engine_adapter = self.locals.get("engine_adapter")
        if not engine_adapter:
            raise SQLMeshError(
                "The engine adapter is not available while models are loading."
                " You can gate these calls by checking in Python: evaluator.runtime_stage != 'loading' or SQL: @runtime_stage <> 'loading'."
            )
        return engine_adapter

    @property
    def gateway(self) -> t.Optional[str]:
        """Returns the gateway name."""
        return self.var(c.GATEWAY)

    @property
    def snapshots(self) -> t.Dict[str, Snapshot]:
        """Returns the snapshots if available."""
        return self._snapshots

    @property
    def this_env(self) -> str:
        """Returns the name of the current environment; only available in before_all and after_all macros."""
        if "this_env" not in self.locals:
            raise SQLMeshError("Environment name is only available in before_all and after_all")
        return self.locals["this_env"]

    @property
    def schemas(self) -> t.List[str]:
        """Returns the schemas of the current environment in before_all and after_all macros."""
        if "schemas" not in self.locals:
            raise SQLMeshError("Schemas are only available in before_all and after_all")
        return self.locals["schemas"]

    @property
    def views(self) -> t.List[str]:
        """Returns the views of the current environment in before_all and after_all macros."""
        if "views" not in self.locals:
            raise SQLMeshError("Views are only available in before_all and after_all")
        return self.locals["views"]

    def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
        """Returns the value of the specified variable, or the default value if it doesn't exist."""
        return {
            **(self.locals.get(c.SQLMESH_VARS) or {}),
            **(self.locals.get(c.SQLMESH_VARS_METADATA) or {}),
        }.get(var_name.lower(), default)

    def blueprint_var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
        """Returns the value of the specified blueprint variable, or the default value if it doesn't exist."""
        return {
            **(self.locals.get(c.SQLMESH_BLUEPRINT_VARS) or {}),
            **(self.locals.get(c.SQLMESH_BLUEPRINT_VARS_METADATA) or {}),
        }.get(var_name.lower(), default)

    @property
    def variables(self) -> t.Dict[str, t.Any]:
        """Returns config and blueprint variables merged into a single mapping.

        Later entries win on key collisions, so blueprint variables override config variables.
        Missing or None entries are treated as empty, matching `var` and `blueprint_var`.
        """
        return {
            **(self.locals.get(c.SQLMESH_VARS) or {}),
            **(self.locals.get(c.SQLMESH_VARS_METADATA) or {}),
            **(self.locals.get(c.SQLMESH_BLUEPRINT_VARS) or {}),
            **(self.locals.get(c.SQLMESH_BLUEPRINT_VARS_METADATA) or {}),
        }

    def _coerce(self, expr: exp.Expr, typ: t.Any, strict: bool = False) -> t.Any:
        """Coerces the given expression to the specified type on a best-effort basis."""
        return _coerce(expr, typ, self.dialect, self._path, strict)
The class responsible for evaluating SQLMesh Macros/SQL.
SQLMesh supports special preprocessed SQL prefixed with @. Although it provides similar power to
traditional methods like string templating, there is semantic understanding of SQL which prevents
common errors like leading/trailing commas, syntax errors, etc.
SQLMesh SQL allows for macro variables and macro functions. Macro variables take the form of @variable. These are used for variable substitution.
SELECT * FROM foo WHERE ds BETWEEN @start_date AND @end_date
Macro variables can be defined with a special macro function.
@DEF(start_date, '2021-01-01')
Arguments:
- dialect: Dialect of the SQL to evaluate.
- python_env: Serialized Python environment.
def __init__(
    self,
    dialect: DialectType = "",
    python_env: t.Optional[t.Dict[str, Executable]] = None,
    schema: t.Optional[MappingSchema] = None,
    runtime_stage: RuntimeStage = RuntimeStage.LOADING,
    resolve_table: t.Optional[t.Callable[[str | exp.Table], str]] = None,
    resolve_tables: t.Optional[t.Callable[[exp.Expr], exp.Expr]] = None,
    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
    default_catalog: t.Optional[str] = None,
    path: t.Optional[Path] = None,
    environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
    model_fqn: t.Optional[str] = None,
):
    """Set up evaluator state and register macros/variables from the serialized python env."""
    self.dialect = dialect
    # Generator used by eval_expression to turn macro expressions into Python source.
    self.generator = MacroDialect().generator()
    # Macro-visible locals; the runtime stage is stored as the enum's string value.
    self.locals: t.Dict[str, t.Any] = {
        "runtime_stage": runtime_stage.value,
        "default_catalog": default_catalog,
    }
    # Globals passed to eval(): sqlglot's executor ENV plus a few names macro code can reference.
    self.env = {
        **ENV,
        "self": self,
        "SQL": SQL,
        "MacroEvaluator": MacroEvaluator,
    }
    self.python_env = python_env or {}
    # Start from the globally registered macros, keyed by normalized name.
    self.macros = {normalize_macro_name(k): v.func for k, v in macro.get_registry().items()}
    self.columns_to_types_called = False
    self.default_catalog = default_catalog

    self._schema = schema
    self._resolve_table = resolve_table
    self._resolve_tables = resolve_tables
    # A missing snapshots mapping is replaced by an empty dict.
    self._snapshots = snapshots if snapshots is not None else {}
    self._path = path
    self._environment_naming_info = environment_naming_info
    self._model_fqn = model_fqn

    # Populate self.env from python_env; the loop below reads the resulting objects back out of it.
    prepare_env(self.python_env, self.env)
    for k, v in self.python_env.items():
        if v.is_definition:
            # Definitions become macros, looked up by their defined name when one is recorded.
            self.macros[normalize_macro_name(k)] = self.env[v.name or k]
        elif v.is_import and getattr(self.env.get(k), c.SQLMESH_MACRO, None):
            # Imports are registered only when the object carries the SQLMESH_MACRO attribute.
            self.macros[normalize_macro_name(k)] = self.env[k]
        elif v.is_value:
            value = self.env[k]
            if k in (
                c.SQLMESH_VARS,
                c.SQLMESH_VARS_METADATA,
                c.SQLMESH_BLUEPRINT_VARS,
                c.SQLMESH_BLUEPRINT_VARS_METADATA,
            ):
                # Variable dicts may hold SqlValue wrappers; parse those into sqlglot expressions.
                value = {
                    var_name: (
                        self.parse_one(var_value.sql)
                        if isinstance(var_value, SqlValue)
                        else var_value
                    )
                    for var_name, var_value in value.items()
                }

            self.locals[k] = value
def send(
    self, name: str, *args: t.Any, **kwargs: t.Any
) -> t.Union[None, exp.Expr, t.List[exp.Expr]]:
    """Invoke the macro registered under `name`, passing this evaluator as the first argument.

    Args:
        name: Macro name; normalized with normalize_macro_name before lookup.
        *args: Positional arguments forwarded to the macro.
        **kwargs: Keyword arguments forwarded to the macro.

    Returns:
        Whatever the macro returns.

    Raises:
        MacroEvalError: If no callable macro is registered under `name`, or if the macro raises.
    """
    func = self.macros.get(normalize_macro_name(name))

    if not callable(func):
        raise MacroEvalError(f"Macro '{name}' does not exist.")

    try:
        return call_macro(
            func, self.dialect, self._path, provided_args=(self, *args), provided_kwargs=kwargs
        )  # type: ignore
    except Exception as e:
        # Chain explicitly so the macro's own exception is reported as the direct cause.
        raise MacroEvalError(
            f"An error occurred during evaluation of '{name}'\n\n"
            + format_evaluated_code_exception(e, self.python_env)
        ) from e
def transform(self, expression: exp.Expr) -> exp.Expr | t.List[exp.Expr] | None:
    """Expand macro variables, templated identifiers and macro function calls in `expression`.

    A copy of the input is transformed; replace_tree is told (via `prune`) not to descend into
    exp.Lambda nodes. If any substitution happened, the result is rendered back to SQL and re-parsed.

    Args:
        expression: The expression to transform.

    Returns:
        The transformed expression. Per the return annotation and the isinstance checks below, this
        may also be a list of expressions or None, depending on what evaluation produced.
    """
    changed = False

    def evaluate_macros(
        node: exp.Expr,
    ) -> exp.Expr | t.List[exp.Expr] | None:
        nonlocal changed

        if isinstance(node, MacroVar):
            changed = True
            variables = self.variables

            # This makes all variables case-insensitive, e.g. @X is the same as @x. We do this
            # for consistency, since `variables` and `blueprint_variables` are normalized.
            var_name = node.name.lower()

            if var_name not in self.locals and var_name not in variables:
                # Undefined variables are an error, except inside a StagedFilePath where the
                # node is left untouched.
                if not isinstance(node.parent, StagedFilePath):
                    raise SQLMeshError(f"Macro variable '{node.name}' is undefined.")

                return node

            # Precedence order is locals (e.g. @DEF) > blueprint variables > config variables
            value = self.locals.get(var_name, variables.get(var_name))
            if isinstance(value, list):
                # Lists become tuples; expression members are transformed recursively.
                return exp.convert(
                    tuple(self.transform(v) if isinstance(v, exp.Expr) else v for v in value)
                )

            return exp.convert(self.transform(value) if isinstance(value, exp.Expr) else value)
        if isinstance(node, exp.Identifier) and "@" in node.this:
            # Identifiers containing '@' are rendered through string templating.
            text = self.template(node.this, {})
            if node.this != text:
                changed = True
                return exp.to_identifier(text, quoted=node.quoted or None)
        if isinstance(node, MacroFunc):
            changed = True
            return self.evaluate(node)
        return node

    transformed = exp.replace_tree(
        expression.copy(), evaluate_macros, prune=lambda n: isinstance(n, exp.Lambda)
    )

    if changed:
        # the transformations could have corrupted the ast, turning this into sql and reparsing ensures
        # that the ast is correct
        if isinstance(transformed, list):
            return [
                self.parse_one(node.sql(dialect=self.dialect, copy=False))
                for node in transformed
            ]
        if isinstance(transformed, exp.Expr):
            return self.parse_one(transformed.sql(dialect=self.dialect, copy=False))

    return transformed
def template(self, text: t.Any, local_variables: t.Dict[str, t.Any]) -> str:
    """Render `@name` placeholders in `text` using the evaluator's variables.

    Args:
        text: The value to substitute into; it is converted with str() first.
        local_variables: Extra variables (e.g. lambda arguments) so that lambdas can be used;
            they override variables and locals that share the same lowercase name.

    Returns:
        The rendered string. Placeholders with no matching variable are left as-is.
    """
    skipped_keys = ("engine_adapter", "snapshot")
    substitutions: t.Dict[str, t.Any] = {}
    # Later sources overwrite earlier ones: config/blueprint variables, then locals, then the
    # caller-provided local variables.
    for source in (self.variables, self.locals, local_variables):
        for key, raw_value in source.items():
            lowered_key = key.lower()
            if lowered_key in skipped_keys:
                continue
            # Values are rendered as SQL text because they are spliced into a string; plain
            # strings are not converted, which would otherwise add quotes.
            substitutions[lowered_key] = convert_sql(raw_value, self.dialect)
    rendered = MacroStrTemplate(str(text)).safe_substitute(CaseInsensitiveMapping(substitutions))
    return rendered
Substitute @vars with locals.
Arguments:
- text: The string to do substitution on.
- local_variables: Local variables in the context so that lambdas can be used.
Returns:
The rendered string.
def evaluate(self, node: MacroFunc) -> exp.Expr | t.List[exp.Expr] | None:
    """Evaluate a single macro function node.

    - MacroDef: registers a lambda as a macro, or stores the transformed value as a lowercase
      local; the node itself is returned.
    - MacroSQL / MacroStrReplace: the node is evaluated as Python via eval_expression.
    - Any other macro call: arguments are split into positional and keyword (PropertyEQ) arguments
      and dispatched through send().

    Returns:
        None if the macro produced None; a list of parsed expressions if it produced a tuple/list
        (None items dropped); otherwise a single parsed expression.

    Raises:
        MacroEvalError: If a positional argument follows a keyword argument (and whatever send raises).
    """
    if isinstance(node, MacroDef):
        if isinstance(node.expression, exp.Lambda):
            _, fn = _norm_var_arg_lambda(self, node.expression)
            # A single argument is passed through as-is; several are packed into one exp.Tuple.
            self.macros[normalize_macro_name(node.name)] = lambda _, *args: fn(
                args[0] if len(args) == 1 else exp.Tuple(expressions=list(args))
            )
        else:
            # Make variables defined through `@DEF` case-insensitive
            self.locals[node.name.lower()] = self.transform(node.expression)

        return node

    if isinstance(node, (MacroSQL, MacroStrReplace)):
        result: t.Optional[exp.Expr | t.List[exp.Expr]] = exp.convert(
            self.eval_expression(node)
        )
    else:
        func = t.cast(exp.Anonymous, node.this)

        args = []
        kwargs = {}
        for e in func.expressions:
            if isinstance(e, exp.PropertyEQ):
                # `name := value` style arguments are passed as keyword arguments.
                kwargs[e.this.name] = e.expression
            else:
                if kwargs:
                    raise MacroEvalError(
                        "Positional argument cannot follow keyword argument.\n "
                        f"{func.sql(dialect=self.dialect)} at '{self._path}'"
                    )

                args.append(e)

        result = self.send(func.name, *args, **kwargs)

    if result is None:
        return None

    if isinstance(result, (tuple, list)):
        result = [self.parse_one(item) for item in result if item is not None]

        if (
            len(result) == 1
            and isinstance(result[0], (exp.Array, exp.Tuple))
            and node.find_ancestor(MacroFunc)
        ):
            """
            if:
             - the output of evaluating this node is being passed as an argument to another macro function
             - and that output is something that _norm_var_arg_lambda() will unpack into varargs
               > (a list containing a single item of type exp.Tuple/exp.Array)
            then we will get inconsistent behaviour depending on if this node emits a list with a single item vs multiple items.

            In the first case, emitting a list containing a single array item will cause that array to get unpacked and its *members* passed to the calling macro
            In the second case, emitting a list containing multiple array items will cause each item to get passed as-is to the calling macro

            To prevent this inconsistency, we wrap this node output in an exp.Array so that _norm_var_arg_lambda() can "unpack" that into the
            actual argument we want to pass to the parent macro function

            Note we only do this for evaluation results that get passed as an argument to another macro, because when the final
            result is given to something like SELECT, we still want that to be unpacked into a list of items like:
             - SELECT ARRAY(1), ARRAY(2)
            rather than a single item like:
             - SELECT ARRAY(ARRAY(1), ARRAY(2))
            """
            result = [exp.Array(expressions=result)]
    else:
        result = self.parse_one(result)

    return result
def eval_expression(self, node: t.Any) -> t.Any:
    """Translate a SQLGlot expression into Python source with the macro generator and eval it.

    Non-expression inputs are returned unchanged.

    Args:
        node: The value to evaluate.

    Returns:
        The result of evaluating the generated Python code, or `node` itself if it is not an expression.

    Raises:
        MacroEvalError: If code generation or evaluation fails.
    """
    if isinstance(node, exp.Expr):
        # Seed with the plain SQL so the error message has something to show even when
        # Python code generation itself is what fails.
        python_code = node.sql()
        try:
            python_code = self.generator.generate(node)
            # NOTE(review): eval runs with self.env/self.locals; assumes macro SQL is trusted project code.
            return eval(python_code, self.env, self.locals)
        except Exception as e:
            raise MacroEvalError(
                f"Error trying to eval macro.\n\nGenerated code: {python_code}\n\nOriginal sql: {node}\n\n"
                + format_evaluated_code_exception(e, self.python_env)
            )
    return node
Converts a SQLGlot expression into executable Python code and evals it.
If the node is not an expression, it will simply be returned.
Arguments:
- node: expression
Returns:
The return value of the evaled Python Code.
def parse_one(
    self, sql: str | exp.Expr, into: t.Optional[exp.IntoType] = None, **opts: t.Any
) -> exp.Expr:
    """Parse `sql` in this evaluator's dialect and return the tree of the first statement.

    Delegates to sqlglot.maybe_parse, so an argument that is already an expression is not re-parsed.

    Args:
        sql: SQL text or an existing expression.
        into: Optional expression type to parse into.
        **opts: Extra options forwarded to sqlglot.maybe_parse.

    Returns:
        The syntax tree of the first parsed statement.
    """
    return sqlglot.maybe_parse(sql, into=into, dialect=self.dialect, **opts)
Parses the given SQL string and returns a syntax tree for the first parsed SQL statement.
Arguments:
- sql: the SQL code or expression to parse.
- into: the Expression to parse into
- **opts: other options
Returns:
Expression: the syntax tree for the first parsed statement
def columns_to_types(self, model_name: TableName | exp.Column) -> t.Dict[str, exp.DataType]:
    """Return the column-name-to-type mapping for `model_name`.

    During loading without a usable schema, a placeholder mapping with the single column
    `__schema_unavailable_at_load__` is returned and `columns_to_types_called` is set.
    Otherwise the schema is consulted first, then the matching model snapshot.

    Raises:
        SQLMeshError: If neither the schema nor a model snapshot provides the columns.
    """
    # The dummy schema is only acceptable at load time; at creation/evaluation time it would
    # lead to unintelligible errors when the query is executed.
    schema_missing = self._schema is None or self._schema.empty
    if schema_missing and self.runtime_stage == "loading":
        self.columns_to_types_called = True
        return {"__schema_unavailable_at_load__": exp.DataType.build("unknown")}

    table = exp.to_table(
        normalize_model_name(
            model_name,
            default_catalog=self.default_catalog,
            dialect=self.dialect,
        )
    )

    found = self._schema.find(table, ensure_data_types=True) if self._schema else None
    if found is None:
        snapshot = self.get_snapshot(table)
        if snapshot and snapshot.node.is_model:
            found = snapshot.node.columns_to_types  # type: ignore

    if found is None:
        raise SQLMeshError(f"Schema for model '{table}' can't be statically determined.")

    return found
Returns the columns-to-types mapping corresponding to the specified model.
def get_snapshot(self, model_name: TableName | exp.Column) -> t.Optional[Snapshot]:
    """Look up the snapshot for `model_name` after normalizing it; None if there is no match."""
    normalized_name = normalize_model_name(
        model_name,
        default_catalog=self.default_catalog,
        dialect=self.dialect,
    )
    return self._snapshots.get(normalized_name)
Returns the snapshot that corresponds to the given model name.
def resolve_table(self, table: str | exp.Table) -> str:
    """Return the physical table name for the given model via the configured resolver.

    Raises:
        SQLMeshError: If no resolve_table callable was provided.
    """
    resolver = self._resolve_table
    if resolver:
        return resolver(table)
    raise SQLMeshError(
        "Macro evaluator not properly initialized with resolve_table lambda."
    )
Gets the physical table name for a given model.
def resolve_tables(self, query: exp.Expr) -> exp.Expr:
    """Rewrite references to SQLMesh model names in `query` to their physical tables.

    Raises:
        SQLMeshError: If no resolve_tables callable was provided.
    """
    resolver = self._resolve_tables
    if resolver:
        return resolver(query)
    raise SQLMeshError(
        "Macro evaluator not properly initialized with resolve_tables lambda."
    )
Resolves queries with references to SQLMesh model names to their physical tables.
@property
def runtime_stage(self) -> RuntimeStage:
    """The runtime stage in which the macros are currently being evaluated."""
    # NOTE(review): other evaluator code compares this value with plain strings such as
    # 'loading'; confirm whether the stored value is a RuntimeStage member or its value.
    stage = self.locals["runtime_stage"]
    return stage
Returns the current runtime stage of the macro evaluation.
@property
def this_model(self) -> str:
    """The resolved name of the surrounding model, rendered as SQL with quoted identifiers.

    Raises:
        SQLMeshError: If no model name was made available to the evaluator.
    """
    model = self.locals.get("this_model")
    if model:
        return model.sql(dialect=self.dialect, identify=True, comments=False)
    raise SQLMeshError("Model name is not available in the macro evaluator.")
Returns the resolved name of the surrounding model.
@property
def engine_adapter(self) -> EngineAdapter:
    """The engine adapter that macros can use to interact with the database.

    Raises:
        SQLMeshError: If no adapter is set, e.g. while models are loading.
    """
    adapter = self.locals.get("engine_adapter")
    if adapter:
        return adapter
    raise SQLMeshError(
        "The engine adapter is not available while models are loading."
        " You can gate these calls by checking in Python: evaluator.runtime_stage != 'loading' or SQL: @runtime_stage <> 'loading'."
    )
@property
def gateway(self) -> t.Optional[str]:
    """The gateway name, read from the gateway variable; None when it is not set."""
    gateway_name = self.var(c.GATEWAY)
    return gateway_name
Returns the gateway name.
@property
def snapshots(self) -> t.Dict[str, Snapshot]:
    """The snapshots this evaluator knows about, keyed by normalized model name."""
    known_snapshots = self._snapshots
    return known_snapshots
Returns the snapshots if available.
@property
def this_env(self) -> str:
    """The name of the current environment, available to before_all / after_all statements.

    Raises:
        SQLMeshError: If the environment name was not provided to the evaluator.
    """
    scope = self.locals
    if "this_env" in scope:
        return scope["this_env"]
    raise SQLMeshError("Environment name is only available in before_all and after_all")
Returns the name of the current environment in before_all and after_all macros.
@property
def schemas(self) -> t.List[str]:
    """The schemas of the current environment, available to before_all / after_all macros.

    Raises:
        SQLMeshError: If the schemas were not provided to the evaluator.
    """
    scope = self.locals
    if "schemas" in scope:
        return scope["schemas"]
    raise SQLMeshError("Schemas are only available in before_all and after_all")
Returns the schemas of the current environment in before_all and after_all macros.
@property
def views(self) -> t.List[str]:
    """The views of the current environment, available to before_all / after_all macros.

    Raises:
        SQLMeshError: If the views were not provided to the evaluator.
    """
    scope = self.locals
    if "views" in scope:
        return scope["views"]
    raise SQLMeshError("Views are only available in before_all and after_all")
Returns the views of the current environment in before_all and after_all macros.
def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
    """Looks up a variable, returning ``default`` when it is not defined.

    Metadata variables take precedence over regular variables of the same name.

    Args:
        var_name: The variable name; it is lowercased before the lookup.
        default: The value returned when no variable with that name exists.

    Returns:
        The variable's value, or ``default``.
    """
    key = var_name.lower()
    # Search in precedence order: metadata variables shadow regular ones.
    for scope_name in (c.SQLMESH_VARS_METADATA, c.SQLMESH_VARS):
        values = self.locals.get(scope_name) or {}
        if key in values:
            return values[key]
    return default
Returns the value of the specified variable, or the default value if it doesn't exist.
def blueprint_var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
    """Looks up a blueprint variable, returning ``default`` when it is not defined.

    Metadata blueprint variables take precedence over regular blueprint variables of the
    same name.

    Args:
        var_name: The variable name; it is lowercased before the lookup.
        default: The value returned when no blueprint variable with that name exists.

    Returns:
        The blueprint variable's value, or ``default``.
    """
    key = var_name.lower()
    # Search in precedence order: metadata blueprint variables shadow regular ones.
    for scope_name in (c.SQLMESH_BLUEPRINT_VARS_METADATA, c.SQLMESH_BLUEPRINT_VARS):
        values = self.locals.get(scope_name) or {}
        if key in values:
            return values[key]
    return default
Returns the value of the specified blueprint variable, or the default value if it doesn't exist.
class macro(registry_decorator):
    """Marks a function as a macro and registers it in the global MACROS registry.

    Registered macros can be referenced in SQL statements to make queries more dynamic/cleaner.

    Example:
        from sqlglot import exp
        from sqlmesh.core.macros import MacroEvaluator, macro

        @macro()
        def add_one(evaluator: MacroEvaluator, column: exp.Literal) -> exp.Add:
            return evaluator.parse_one(f"{column} + 1")

    Args:
        name: A custom name for the macro, the default is the name of the function.
        metadata_only: When truthy, the decorated function is tagged with the
            ``c.SQLMESH_METADATA`` attribute before it is registered.
    """

    registry_name = "macros"

    def __init__(self, *args: t.Any, metadata_only: bool = False, **kwargs: t.Any) -> None:
        super().__init__(*args, **kwargs)
        self.metadata_only = metadata_only

    def __call__(
        self, func: t.Callable[..., DECORATOR_RETURN_TYPE]
    ) -> t.Callable[..., DECORATOR_RETURN_TYPE]:
        if self.metadata_only:
            setattr(func, c.SQLMESH_METADATA, self.metadata_only)
        registered = super().__call__(func)

        # Flag the registered callable so that it can be recognized as a macro at runtime
        # and unwrapped during serialization.
        setattr(registered, c.SQLMESH_MACRO, True)
        return registered
Specifies that a function is a macro and registers it in the global MACROS registry.
Registered macros can be referenced in SQL statements to make queries more dynamic/cleaner.
Example:
from sqlglot import exp from sqlmesh.core.macros import MacroEvaluator, macro
@macro() def add_one(evaluator: MacroEvaluator, column: exp.Literal) -> exp.Add: return evaluator.parse_one(f"{column} + 1")
Arguments:
- name: A custom name for the macro, the default is the name of the function.
Inherited Members
@macro()
def each(
    evaluator: MacroEvaluator,
    *args: t.Any,
) -> t.List[t.Any]:
    """Applies func to every item and collects the results.

    Results that are None are left out of the returned list.

    Args:
        evaluator: MacroEvaluator that invoked the macro
        args: The last argument should be a lambda of the form x -> x + 1. The preceding
            arguments are either a single Array or the items themselves as var args.

    Returns:
        The non-None results of func, in item order.
    """
    *items, func = args
    items, func = _norm_var_arg_lambda(evaluator, func, *items)  # type: ignore
    mapped = (func(item) for item in ensure_collection(items))
    return [result for result in mapped if result is not None]
Iterates through items, calling func on each.
If a func call on item returns None, it will be excluded from the list.
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- args: The last argument should be a lambda of the form x -> x + 1. The first argument can be an Array or var args can be used.
Returns:
A list of items that is the result of func
@macro("IF")
def if_(
    evaluator: MacroEvaluator,
    condition: t.Any,
    true: t.Any,
    false: t.Any = None,
) -> t.Any:
    """Returns the second argument if the condition evaluates truthy, else the third.

    When the third argument is omitted, a falsy condition yields None.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a, b)")).sql()
        'b'

        >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a)"))
    """
    return true if evaluator.eval_expression(condition) else false
Evaluates a given condition and returns the second argument if true or else the third argument.
If false is not passed in, the default return value will be None.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a, b)")).sql() 'b'>>> MacroEvaluator().transform(parse_one("@IF('a' = 1, a)"))
@macro("REDUCE")
def reduce_(evaluator: MacroEvaluator, *args: t.Any) -> t.Any:
    """Folds the items into a single item with a two-argument function, left to right.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@SQL(@REDUCE([100, 200, 300, 400], (x, y) -> x + y))"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        '1000'

    Args:
        evaluator: MacroEvaluator that invoked the macro
        args: The last argument should be a lambda of the form (x, y) -> x + y. The
            preceding arguments are either a single Array or the items as var args.
    Returns:
        A single item that is the result of applying func cumulatively to items
    """
    *items, func = args
    items, func = _norm_var_arg_lambda(evaluator, func, *items)  # type: ignore

    def _combine(accumulated: t.Any, current: t.Any) -> t.Any:
        # func receives both operands packed into a single tuple expression.
        return func(exp.Tuple(expressions=[accumulated, current]))

    return reduce(_combine, ensure_collection(items))
Iterates through items, applying the provided two-argument function cumulatively from left to right, so as to reduce the iterable to a single item.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@SQL(@REDUCE([100, 200, 300, 400], (x, y) -> x + y))" >>> MacroEvaluator().transform(parse_one(sql)).sql() '1000'
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- args: The last argument should be a lambda of the form (x, y) -> x + y. The first argument can be an Array or var args can be used.
Returns:
A single item that is the result of applying func cumulatively to items
@macro("FILTER")
def filter_(evaluator: MacroEvaluator, *args: t.Any) -> t.List[t.Any]:
    """Keeps only the items for which the provided function evaluates truthy.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y)"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        '2 + 3'

        >>> sql = "@EVAL(@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y))"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        '5'

    Args:
        evaluator: MacroEvaluator that invoked the macro
        args: The last argument should be a lambda of the form x -> x > 1. The preceding
            arguments are either a single Array or the items themselves as var args.
    Returns:
        The items for which the func returned True
    """
    *items, func = args
    items, func = _norm_var_arg_lambda(evaluator, func, *items)  # type: ignore
    # Normalize with ensure_collection exactly like @EACH and @REDUCE do, so a single
    # non-array item is treated as a one-element collection instead of being iterated.
    return [item for item in ensure_collection(items) if evaluator.eval_expression(func(item))]
Iterates through items, applying the provided function to each item and removing all items for which the function returns False.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y)" >>> MacroEvaluator().transform(parse_one(sql)).sql() '2 + 3'>>> sql = "@EVAL(@REDUCE(@FILTER([1, 2, 3], x -> x > 1), (x, y) -> x + y))" >>> MacroEvaluator().transform(parse_one(sql)).sql() '5'
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- args: The last argument should be a lambda of the form x -> x > 1. The first argument can be an Array or var args can be used.
Returns:
The items for which the func returned True
def _optional_expression(
    evaluator: MacroEvaluator,
    condition: exp.Condition,
    expression: exp.Expr,
) -> t.Optional[exp.Expr]:
    """Returns the given SQL expression only when the condition evaluates truthy.

    The following examples show how this function behaves inside the macros that wrap it.

    Examples:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
        >>> sql = "select * from city @GROUP_BY(True) country, population"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city GROUP BY country, population'
        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"

    Args:
        evaluator: MacroEvaluator that invoked the macro
        condition: Condition expression
        expression: SQL expression
    Returns:
        The expression if the condition is truthy; otherwise None
    """
    if not evaluator.eval_expression(condition):
        return None
    return expression
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the condition is True; otherwise None
def _optional_expression(
    evaluator: MacroEvaluator,
    condition: exp.Condition,
    expression: exp.Expr,
) -> t.Optional[exp.Expr]:
    """Returns the given SQL expression only when the condition evaluates truthy.

    The following examples show how this function behaves inside the macros that wrap it.

    Examples:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
        >>> sql = "select * from city @GROUP_BY(True) country, population"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city GROUP BY country, population'
        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"

    Args:
        evaluator: MacroEvaluator that invoked the macro
        condition: Condition expression
        expression: SQL expression
    Returns:
        The expression if the condition is truthy; otherwise None
    """
    if not evaluator.eval_expression(condition):
        return None
    return expression
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the condition is True; otherwise None
def _optional_expression(
    evaluator: MacroEvaluator,
    condition: exp.Condition,
    expression: exp.Expr,
) -> t.Optional[exp.Expr]:
    """Returns the given SQL expression only when the condition evaluates truthy.

    The following examples show how this function behaves inside the macros that wrap it.

    Examples:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
        >>> sql = "select * from city @GROUP_BY(True) country, population"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city GROUP BY country, population'
        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"

    Args:
        evaluator: MacroEvaluator that invoked the macro
        condition: Condition expression
        expression: SQL expression
    Returns:
        The expression if the condition is truthy; otherwise None
    """
    if not evaluator.eval_expression(condition):
        return None
    return expression
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the condition is True; otherwise None
def _optional_expression(
    evaluator: MacroEvaluator,
    condition: exp.Condition,
    expression: exp.Expr,
) -> t.Optional[exp.Expr]:
    """Returns the given SQL expression only when the condition evaluates truthy.

    The following examples show how this function behaves inside the macros that wrap it.

    Examples:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
        >>> sql = "select * from city @GROUP_BY(True) country, population"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city GROUP BY country, population'
        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"

    Args:
        evaluator: MacroEvaluator that invoked the macro
        condition: Condition expression
        expression: SQL expression
    Returns:
        The expression if the condition is truthy; otherwise None
    """
    if not evaluator.eval_expression(condition):
        return None
    return expression
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the condition is True; otherwise None
def _optional_expression(
    evaluator: MacroEvaluator,
    condition: exp.Condition,
    expression: exp.Expr,
) -> t.Optional[exp.Expr]:
    """Returns the given SQL expression only when the condition evaluates truthy.

    The following examples show how this function behaves inside the macros that wrap it.

    Examples:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
        >>> sql = "select * from city @GROUP_BY(True) country, population"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city GROUP BY country, population'
        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"

    Args:
        evaluator: MacroEvaluator that invoked the macro
        condition: Condition expression
        expression: SQL expression
    Returns:
        The expression if the condition is truthy; otherwise None
    """
    if not evaluator.eval_expression(condition):
        return None
    return expression
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the condition is True; otherwise None
def _optional_expression(
    evaluator: MacroEvaluator,
    condition: exp.Condition,
    expression: exp.Expr,
) -> t.Optional[exp.Expr]:
    """Returns the given SQL expression only when the condition evaluates truthy.

    The following examples show how this function behaves inside the macros that wrap it.

    Examples:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
        >>> sql = "select * from city @GROUP_BY(True) country, population"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city GROUP BY country, population'
        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"

    Args:
        evaluator: MacroEvaluator that invoked the macro
        condition: Condition expression
        expression: SQL expression
    Returns:
        The expression if the condition is truthy; otherwise None
    """
    if not evaluator.eval_expression(condition):
        return None
    return expression
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the condition is True; otherwise None
def _optional_expression(
    evaluator: MacroEvaluator,
    condition: exp.Condition,
    expression: exp.Expr,
) -> t.Optional[exp.Expr]:
    """Returns the given SQL expression only when the condition evaluates truthy.

    The following examples show how this function behaves inside the macros that wrap it.

    Examples:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'WITH all_cities AS (SELECT * FROM city) SELECT all_cities'
        >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name'
        >>> sql = "select * from city @GROUP_BY(True) country, population"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM city GROUP BY country, population'
        >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"

    Args:
        evaluator: MacroEvaluator that invoked the macro
        condition: Condition expression
        expression: SQL expression
    Returns:
        The expression if the condition is truthy; otherwise None
    """
    if not evaluator.eval_expression(condition):
        return None
    return expression
Inserts expression when the condition is True
The following examples express the usage of this function in the context of the macros which wrap it.
Examples:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@WITH(True) all_cities as (select * from city) select all_cities" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'WITH all_cities AS (SELECT * FROM city) SELECT all_cities' >>> sql = "select * from city left outer @JOIN(True) country on city.country = country.name" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city LEFT OUTER JOIN country ON city.country = country.name' >>> sql = "select * from city @GROUP_BY(True) country, population" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM city GROUP BY country, population' >>> sql = "select * from city group by country @HAVING(True) population > 100 and country = 'Mexico'" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT * FROM city GROUP BY country HAVING population > 100 AND country = 'Mexico'"
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- condition: Condition expression
- expression: SQL expression
Returns:
Expression if the condition is True; otherwise None
@macro("eval")
def eval_(evaluator: MacroEvaluator, condition: exp.Condition) -> t.Any:
    """Evaluates the given condition with the evaluator's Python/SQL interpreter.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@EVAL(1 + 1)"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        '2'
    """
    result = evaluator.eval_expression(condition)
    return result
Evaluate the given condition in a Python/SQL interpreter.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@EVAL(1 + 1)" >>> MacroEvaluator().transform(parse_one(sql)).sql() '2'
@macro()
def star(
    evaluator: MacroEvaluator,
    relation: exp.Table,
    alias: exp.Column = t.cast(exp.Column, exp.column("")),
    exclude: t.Union[exp.Array, exp.Tuple] = exp.Tuple(expressions=[]),
    prefix: exp.Literal = exp.Literal.string(""),
    suffix: exp.Literal = exp.Literal.string(""),
    quote_identifiers: exp.Boolean = exp.true(),
    except_: t.Union[exp.Array, exp.Tuple] = exp.Tuple(expressions=[]),
) -> t.List[exp.Expr]:
    """Returns a list of projections for the given relation.

    Args:
        evaluator: MacroEvaluator that invoked the macro
        relation: The relation to select star from
        alias: The alias of the relation
        exclude: Columns to exclude
        prefix: A prefix to use for all selections
        suffix: A suffix to use for all selections
        quote_identifiers: Whether or not to quote the resulting aliases, defaults to true
        except_: Alias for exclude (TODO: deprecate this, update docs)

    Returns:
        An array of columns. If the types of all remaining columns are known, each column is
        cast to its type; otherwise the columns are projected without casts.

    Raises:
        SQLMeshError: If an argument does not have the expected expression type.

    Example:
        >>> from sqlglot import parse_one, exp
        >>> from sqlglot.schema import MappingSchema
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "SELECT @STAR(foo, bar, exclude := [c], prefix := 'baz_') FROM foo AS bar"
        >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": exp.DataType.build("string"), "b": exp.DataType.build("string"), "c": exp.DataType.build("string"), "d": exp.DataType.build("int")}})).transform(parse_one(sql)).sql()
        'SELECT CAST("bar"."a" AS TEXT) AS "baz_a", CAST("bar"."b" AS TEXT) AS "baz_b", CAST("bar"."d" AS INT) AS "baz_d" FROM foo AS bar'
    """
    if alias and not isinstance(alias, (exp.Identifier, exp.Column)):
        raise SQLMeshError(f"Invalid alias '{alias}'. Expected an identifier.")
    if exclude and not isinstance(exclude, (exp.Array, exp.Tuple)):
        raise SQLMeshError(f"Invalid exclude '{exclude}'. Expected an array.")
    if except_ != exp.tuple_():
        from sqlmesh.core.console import get_console

        get_console().log_warning(
            "The 'except_' argument in @STAR will soon be deprecated. Use 'exclude' instead."
        )
        # Validate the deprecated argument itself (``exclude`` is already checked above);
        # otherwise a non-array ``except_`` would go unvalidated and its ``.expressions``
        # would not describe the columns the caller meant to exclude.
        if not isinstance(except_, (exp.Array, exp.Tuple)):
            raise SQLMeshError(f"Invalid except_ '{except_}'. Expected an array.")
    if prefix and not isinstance(prefix, exp.Literal):
        raise SQLMeshError(f"Invalid prefix '{prefix}'. Expected a literal.")
    if suffix and not isinstance(suffix, exp.Literal):
        raise SQLMeshError(f"Invalid suffix '{suffix}'. Expected a literal.")
    if not isinstance(quote_identifiers, exp.Boolean):
        raise SQLMeshError(f"Invalid quote_identifiers '{quote_identifiers}'. Expected a boolean.")

    # Names are normalized with the evaluator's dialect so they can be compared with the
    # column names returned by evaluator.columns_to_types.
    excluded_names = {
        normalize_identifiers(excluded, dialect=evaluator.dialect).name
        for excluded in exclude.expressions or except_.expressions
    }
    quoted = quote_identifiers.this
    # Qualify columns with the alias when one was given, otherwise with the relation name.
    table_identifier = normalize_identifiers(
        alias if alias.name else relation, dialect=evaluator.dialect
    ).name

    columns_to_types = {
        k: v for k, v in evaluator.columns_to_types(relation).items() if k not in excluded_names
    }
    if columns_to_types_all_known(columns_to_types):
        return [
            exp.cast(
                exp.column(column, table=table_identifier, quoted=quoted),
                dtype,
                dialect=evaluator.dialect,
            ).as_(f"{prefix.this}{column}{suffix.this}", quoted=quoted)
            for column, dtype in columns_to_types.items()
        ]
    return [
        exp.column(column, table=table_identifier, quoted=quoted).as_(
            f"{prefix.this}{column}{suffix.this}", quoted=quoted
        )
        for column in columns_to_types
    ]
Returns a list of projections for the given relation.
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- relation: The relation to select star from
- alias: The alias of the relation
- exclude: Columns to exclude
- prefix: A prefix to use for all selections
- suffix: A suffix to use for all selections
- quote_identifiers: Whether or not to quote the resulting aliases, defaults to true
- except_: Alias for exclude (TODO: deprecate this, update docs)
Returns:
An array of columns.
Example:
>>> from sqlglot import parse_one, exp >>> from sqlglot.schema import MappingSchema >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT @STAR(foo, bar, exclude := [c], prefix := 'baz_') FROM foo AS bar" >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": exp.DataType.build("string"), "b": exp.DataType.build("string"), "c": exp.DataType.build("string"), "d": exp.DataType.build("int")}})).transform(parse_one(sql)).sql() 'SELECT CAST("bar"."a" AS TEXT) AS "baz_a", CAST("bar"."b" AS TEXT) AS "baz_b", CAST("bar"."d" AS INT) AS "baz_d" FROM foo AS bar'
@macro()
def generate_surrogate_key(
    evaluator: MacroEvaluator,
    *fields: exp.Expr,
    hash_function: exp.Literal = exp.Literal.string("MD5"),
) -> exp.Func:
    """Generates a surrogate key (string) for the given fields.

    Each field is cast to text, NULLs are replaced with a sentinel string, and the
    values are concatenated with '|' separators before being hashed.

    Args:
        evaluator: MacroEvaluator that invoked the macro.
        fields: The expressions that make up the key.
        hash_function: Name of the SQL hash function to apply, defaults to MD5.

    Returns:
        The hash function call wrapping the concatenated fields.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>>
        >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c) FROM foo"
        >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql, dialect="bigquery")).sql("bigquery")
        "SELECT TO_HEX(MD5(CONCAT(COALESCE(CAST(a AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS STRING), '_sqlmesh_surrogate_key_null_')))) FROM foo"
        >>>
        >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c, hash_function := 'SHA256') FROM foo"
        >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql, dialect="bigquery")).sql("bigquery")
        "SELECT SHA256(CONCAT(COALESCE(CAST(a AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS STRING), '_sqlmesh_surrogate_key_null_'))) FROM foo"
    """
    concat_args: t.List[exp.Expr] = []
    for field in fields:
        if concat_args:
            # A separator between fields keeps ("ab", "c") and ("a", "bc") from colliding.
            concat_args.append(exp.Literal.string("|"))
        as_text = exp.cast(field, exp.DataType.build("text"))
        concat_args.append(
            exp.func("COALESCE", as_text, exp.Literal.string("_sqlmesh_surrogate_key_null_"))
        )

    hashed = exp.func(
        hash_function.name,
        exp.func("CONCAT", *concat_args),
        dialect=evaluator.dialect,
    )
    # exp.func may return an MD5Digest for some dialects; rewrap it as exp.MD5 so the key
    # renders as a string (see the BigQuery TO_HEX example above).
    if isinstance(hashed, exp.MD5Digest):
        hashed = exp.MD5(this=hashed.this)

    return hashed
Generates a surrogate key (string) for the given fields.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c) FROM foo" >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql, dialect="bigquery")).sql("bigquery") "SELECT TO_HEX(MD5(CONCAT(COALESCE(CAST(a AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS STRING), '_sqlmesh_surrogate_key_null_')))) FROM foo" >>> >>> sql = "SELECT @GENERATE_SURROGATE_KEY(a, b, c, hash_function := 'SHA256') FROM foo" >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql, dialect="bigquery")).sql("bigquery") "SELECT SHA256(CONCAT(COALESCE(CAST(a AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(b AS STRING), '_sqlmesh_surrogate_key_null_'), '|', COALESCE(CAST(c AS STRING), '_sqlmesh_surrogate_key_null_'))) FROM foo"
@macro()
def safe_add(_: MacroEvaluator, *fields: exp.Expr) -> exp.Case:
    """Adds numbers together, substitutes nulls for 0s and only returns null if all fields are null.

    Args:
        fields: The numeric expressions to add.

    Returns:
        A CASE expression that is NULL when every field is NULL, and otherwise the
        left-to-right sum of the fields with NULLs treated as 0.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "SELECT @SAFE_ADD(a, b) FROM foo"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) + COALESCE(b, 0) END FROM foo'
    """
    every_field_null = exp.and_(*[field.is_(exp.null()) for field in fields])
    zero_filled = [exp.func("COALESCE", field, 0) for field in fields]
    running_total = reduce(lambda left, right: left + right, zero_filled)  # type: ignore
    return exp.Case().when(every_field_null, exp.null()).else_(running_total)
Adds numbers together, substitutes nulls for 0s and only returns null if all fields are null.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT @SAFE_ADD(a, b) FROM foo" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) + COALESCE(b, 0) END FROM foo'
@macro()
def safe_sub(_: MacroEvaluator, *fields: exp.Expr) -> exp.Case:
    """Subtract numbers, substitutes nulls for 0s and only returns null if all fields are null.

    Args:
        fields: The numeric expressions; each one after the first is subtracted in order.

    Returns:
        A CASE expression that is NULL when every field is NULL, and otherwise the
        left-to-right difference of the fields with NULLs treated as 0.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "SELECT @SAFE_SUB(a, b) FROM foo"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) - COALESCE(b, 0) END FROM foo'
    """
    every_field_null = exp.and_(*[field.is_(exp.null()) for field in fields])
    zero_filled = [exp.func("COALESCE", field, 0) for field in fields]
    difference = reduce(lambda left, right: left - right, zero_filled)  # type: ignore
    return exp.Case().when(every_field_null, exp.null()).else_(difference)
Subtract numbers, substitutes nulls for 0s and only returns null if all fields are null.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT @SAFE_SUB(a, b) FROM foo" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT CASE WHEN a IS NULL AND b IS NULL THEN NULL ELSE COALESCE(a, 0) - COALESCE(b, 0) END FROM foo'
@macro()
def safe_div(_: MacroEvaluator, numerator: exp.Expr, denominator: exp.Expr) -> exp.Div:
    """Divides numbers, returns null if the denominator is 0.

    Args:
        numerator: The dividend expression.
        denominator: The divisor expression; wrapped in NULLIF(..., 0).

    Returns:
        A division whose denominator becomes NULL when it equals 0.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "SELECT @SAFE_DIV(a, b) FROM foo"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT a / NULLIF(b, 0) FROM foo'
    """
    zero_guarded = exp.func("NULLIF", denominator, 0)
    return numerator / zero_guarded
Divides numbers, returns null if the denominator is 0.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT @SAFE_DIV(a, b) FROM foo" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT a / NULLIF(b, 0) FROM foo'
@macro()
def union(
    evaluator: MacroEvaluator,
    *args: exp.Expr,
) -> exp.Query:
    """Returns a UNION of the given tables, selecting only the columns that have the same name and type in every table.

    Args:
        evaluator: MacroEvaluator that invoked the macro
        args: Variable arguments that can be:
            - An optional first argument that evaluates to a boolean condition; when it
              evaluates to False, only the first table is selected and no UNION is built
            - An optional union type ('ALL' or 'DISTINCT') as exp.Literal, defaults to 'ALL'
            - Tables (exp.Table), at least one

    Returns:
        The UNION query, or a single SELECT from the first table when the condition is False.

    Raises:
        SQLMeshError: If no table is given or the union type is not 'ALL' or 'DISTINCT'.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlglot.schema import MappingSchema
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@UNION('distinct', foo, bar)"
        >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql()
        'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar'
        >>> sql = "@UNION(True, 'distinct', foo, bar)"
        >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql()
        'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar'
    """

    if not args:
        raise SQLMeshError("At least one table is required for the @UNION macro.")

    arg_idx = 0
    # The first argument is only treated as a condition when it evaluates to a real bool.
    condition = evaluator.eval_expression(args[arg_idx])
    is_conditional = isinstance(condition, bool)
    if is_conditional:
        arg_idx += 1
        if arg_idx >= len(args):
            raise SQLMeshError("Expected more arguments after the condition of the `@UNION` macro.")

    # Check for union type
    union_type = exp.Literal.string("ALL")
    if isinstance(args[arg_idx], exp.Literal):
        union_type = args[arg_idx]  # type: ignore
        arg_idx += 1
    kind = union_type.name.upper()
    if kind not in ("ALL", "DISTINCT"):
        raise SQLMeshError(f"Invalid type '{union_type}'. Expected 'ALL' or 'DISTINCT'.")

    # Remaining args should be tables; e.g. @UNION(True, 'all') would otherwise leave none.
    if arg_idx >= len(args):
        raise SQLMeshError("At least one table is required for the @UNION macro.")
    tables = [
        exp.to_table(e.sql(evaluator.dialect), dialect=evaluator.dialect) for e in args[arg_idx:]
    ]

    # Intersect the (name, type) pairs of all tables so only identical columns survive.
    columns = {
        column
        for column, _ in reduce(
            lambda a, b: a & b,  # type: ignore
            (evaluator.columns_to_types(table).items() for table in tables),
        )
    }

    # Project the shared columns in the first table's column order.
    projections = [
        exp.cast(column, column_type, dialect=evaluator.dialect).as_(column)
        for column, column_type in evaluator.columns_to_types(tables[0]).items()
        if column in columns
    ]

    # Skip the union if the condition evaluated to False
    if is_conditional and not condition:
        return exp.select(*projections).from_(tables[0])

    return reduce(
        lambda a, b: a.union(b, distinct=kind == "DISTINCT"),  # type: ignore
        # Copy the projections per SELECT so the branches do not share AST nodes.
        [
            exp.select(*(projection.copy() for projection in projections)).from_(table)
            for table in tables
        ],
    )
Returns a UNION of the given tables, selecting only the columns that have the same name and type in every table.
Arguments:
- evaluator: MacroEvaluator that invoked the macro
- args: Variable arguments that can be:
- The first argument can be a condition that evaluates to a boolean; if it evaluates to false, only the first table is selected and no UNION is built
- A union type ('ALL' or 'DISTINCT') as exp.Literal; defaults to 'ALL' when omitted
- Tables (exp.Table)
Example:
>>> from sqlglot import parse_one >>> from sqlglot.schema import MappingSchema >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@UNION('distinct', foo, bar)" >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql() 'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar' >>> sql = "@UNION(True, 'distinct', foo, bar)" >>> MacroEvaluator(schema=MappingSchema({"foo": {"a": "int", "b": "string", "c": "string"}, "bar": {"c": "string", "a": "int", "b": "int"}})).transform(parse_one(sql)).sql() 'SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM foo UNION SELECT CAST(a AS INT) AS a, CAST(c AS TEXT) AS c FROM bar'
@macro()
def haversine_distance(
    _: MacroEvaluator,
    lat1: exp.Expr,
    lon1: exp.Expr,
    lat2: exp.Expr,
    lon2: exp.Expr,
    unit: exp.Literal = exp.Literal.string("mi"),
) -> exp.Mul:
    """Returns the haversine distance between two points.

    Args:
        lat1: Latitude of the first point, in degrees (it is passed through RADIANS).
        lon1: Longitude of the first point, in degrees.
        lat2: Latitude of the second point, in degrees.
        lon2: Longitude of the second point, in degrees.
        unit: 'mi' (default) or 'km'.

    Returns:
        The distance expression, scaled to the requested unit.

    Raises:
        SQLMeshError: If unit is neither 'mi' nor 'km'.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "SELECT @HAVERSINE_DISTANCE(driver_y, driver_x, passenger_y, passenger_x, 'mi') FROM rides"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT 7922 * ASIN(SQRT((POWER(SIN(RADIANS((passenger_y - driver_y) / 2)), 2)) + (COS(RADIANS(driver_y)) * COS(RADIANS(passenger_y)) * POWER(SIN(RADIANS((passenger_x - driver_x) / 2)), 2)))) * 1.0 FROM rides'
    """
    miles_to_unit = {"mi": 1.0, "km": 1.60934}
    conversion_rate = miles_to_unit.get(unit.this)
    if conversion_rate is None:
        raise SQLMeshError(f"Invalid unit '{unit}'. Expected 'mi' or 'km'.")

    half_dlat_term = exp.func("POWER", exp.func("SIN", exp.func("RADIANS", (lat2 - lat1) / 2)), 2)
    half_dlon_term = exp.func("POWER", exp.func("SIN", exp.func("RADIANS", (lon2 - lon1) / 2)), 2)
    cosine_product = exp.func("COS", exp.func("RADIANS", lat1)) * exp.func(
        "COS", exp.func("RADIANS", lat2)
    )
    arc = exp.func("ASIN", exp.func("SQRT", half_dlat_term + cosine_product * half_dlon_term))

    # 2 * 3961 is the formula's 2 * R, with R taken as 3961 miles; the result is then converted.
    return 2 * 3961 * arc * conversion_rate
Returns the haversine distance between two points.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT @HAVERSINE_DISTANCE(driver_y, driver_x, passenger_y, passenger_x, 'mi') FROM rides" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT 7922 * ASIN(SQRT((POWER(SIN(RADIANS((passenger_y - driver_y) / 2)), 2)) + (COS(RADIANS(driver_y)) * COS(RADIANS(passenger_y)) * POWER(SIN(RADIANS((passenger_x - driver_x) / 2)), 2)))) * 1.0 FROM rides'
@macro()
def pivot(
    evaluator: MacroEvaluator,
    column: SQL,
    values: t.List[exp.Expr],
    alias: bool = True,
    agg: exp.Expr = exp.Literal.string("SUM"),
    cmp: exp.Expr = exp.Literal.string("="),
    prefix: exp.Expr = exp.Literal.string(""),
    suffix: exp.Expr = exp.Literal.string(""),
    then_value: SQL = SQL("1"),
    else_value: SQL = SQL("0"),
    quote: bool = True,
    distinct: bool = False,
) -> t.List[exp.Expr]:
    """Returns a list of projections as a result of pivoting the given column on the given values.

    Each value yields `AGG([DISTINCT] CASE WHEN column CMP value THEN then_value ELSE else_value END)`,
    optionally aliased as prefix + value name + suffix.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "SELECT date_day, @PIVOT(status, ['cancelled', 'completed']) FROM rides GROUP BY 1"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT date_day, SUM(CASE WHEN status = \\'cancelled\\' THEN 1 ELSE 0 END) AS "cancelled", SUM(CASE WHEN status = \\'completed\\' THEN 1 ELSE 0 END) AS "completed" FROM rides GROUP BY 1'
        >>> sql = "SELECT @PIVOT(a, ['v'], then_value := tv, suffix := '_sfx', quote := FALSE)"
        >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql)).sql("bigquery")
        "SELECT SUM(CASE WHEN a = 'v' THEN tv ELSE 0 END) AS v_sfx"
    """
    distinct_keyword = "DISTINCT " if distinct else ""
    pivoted: t.List[exp.Expr] = []
    for value in values:
        aggregate_sql = (
            f"{agg.name}({distinct_keyword}"
            f"CASE WHEN {column} {cmp.name} {value.sql(evaluator.dialect)} "
            f"THEN {then_value} ELSE {else_value} END) "
        )
        projection: exp.Expr = evaluator.parse_one(aggregate_sql)

        if alias:
            projection = projection.as_(
                f"{prefix.name}{value.name}{suffix.name}",
                quoted=quote,
                copy=False,
                dialect=evaluator.dialect,
            )

        pivoted.append(projection)

    return pivoted
Returns a list of projections as a result of pivoting the given column on the given values.
Example:
>>> from sqlglot import parse_one >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "SELECT date_day, @PIVOT(status, ['cancelled', 'completed']) FROM rides GROUP BY 1" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT date_day, SUM(CASE WHEN status = \'cancelled\' THEN 1 ELSE 0 END) AS "cancelled", SUM(CASE WHEN status = \'completed\' THEN 1 ELSE 0 END) AS "completed" FROM rides GROUP BY 1' >>> sql = "SELECT @PIVOT(a, ['v'], then_value := tv, suffix := '_sfx', quote := FALSE)" >>> MacroEvaluator(dialect="bigquery").transform(parse_one(sql)).sql("bigquery") "SELECT SUM(CASE WHEN a = 'v' THEN tv ELSE 0 END) AS v_sfx"
@macro("AND")
def and_(evaluator: MacroEvaluator, *expressions: t.Optional[exp.Expr]) -> exp.Condition:
    """Returns an AND statement filtering out any NULL expressions.

    If no non-NULL expression remains, TRUE is returned.
    """
    kept = [expression for expression in expressions if not isinstance(expression, exp.Null)]
    if kept:
        return exp.and_(*kept, dialect=evaluator.dialect)
    return exp.true()
Returns an AND statement filtering out any NULL expressions.
@macro("OR")
def or_(evaluator: MacroEvaluator, *expressions: t.Optional[exp.Expr]) -> exp.Condition:
    """Returns an OR statement filtering out any NULL expressions.

    If no non-NULL expression remains, TRUE (not FALSE) is returned, so when the result
    is used as a filter it keeps every row instead of rejecting them all.
    """
    kept = [expression for expression in expressions if not isinstance(expression, exp.Null)]
    if kept:
        return exp.or_(*kept, dialect=evaluator.dialect)
    return exp.true()
Returns an OR statement filtering out any NULL expressions. If every expression is NULL, TRUE is returned.
@macro("VAR")
def var(
    evaluator: MacroEvaluator, var_name: exp.Expr, default: t.Optional[exp.Expr] = None
) -> exp.Expr:
    """Returns the value of a variable or the default value if the variable is not set.

    Args:
        evaluator: MacroEvaluator that invoked the macro.
        var_name: The variable name; must be a string literal.
        default: Value used when the variable is not set.

    Raises:
        SQLMeshError: If var_name is not a string literal.
    """
    if var_name.is_string:
        return exp.convert(evaluator.var(var_name.this, default))
    raise SQLMeshError(f"Invalid variable name '{var_name.sql()}'. Expected a string literal.")
Returns the value of a variable or the default value if the variable is not set.
@macro("BLUEPRINT_VAR")
def blueprint_var(
    evaluator: MacroEvaluator, var_name: exp.Expr, default: t.Optional[exp.Expr] = None
) -> exp.Expr:
    """Returns the value of a blueprint variable or the default value if the variable is not set.

    Args:
        evaluator: MacroEvaluator that invoked the macro.
        var_name: The blueprint variable name; must be a string literal.
        default: Value used when the blueprint variable is not set.

    Raises:
        SQLMeshError: If var_name is not a string literal.
    """
    if var_name.is_string:
        return exp.convert(evaluator.blueprint_var(var_name.this, default))
    raise SQLMeshError(
        f"Invalid blueprint variable name '{var_name.sql()}'. Expected a string literal."
    )
Returns the value of a blueprint variable or the default value if the variable is not set.
@macro()
def deduplicate(
    evaluator: MacroEvaluator,
    relation: exp.Expr,
    partition_by: t.List[exp.Expr],
    order_by: t.List[str],
) -> exp.Query:
    """Returns a query that deduplicates rows within a table.

    One row per partition is kept: the first row under the given ordering, selected
    with QUALIFY ROW_NUMBER() ... = 1.

    Args:
        evaluator: MacroEvaluator that invoked the macro
        relation: table or CTE name to deduplicate
        partition_by: column names, or expressions to use to identify a window of rows out of which to select one as the deduplicated row
        order_by: A list of strings representing the ORDER BY clause

    Raises:
        SQLMeshError: If partition_by or order_by is not a list, or order_by is empty.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlglot.schema import MappingSchema
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@deduplicate(demo.table, [user_id, cast(timestamp as date)], ['timestamp desc', 'status asc'])"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        'SELECT * FROM demo.table QUALIFY ROW_NUMBER() OVER (PARTITION BY user_id, CAST(timestamp AS DATE) ORDER BY timestamp DESC, status ASC) = 1'
    """
    order_by_error = "order_by must be a list of strings, optional - nulls ordering: ['<column> <asc|desc> nulls <first|last>']"

    if not isinstance(partition_by, list):
        raise SQLMeshError(
            "partition_by must be a list of columns: [<column>, cast(<column> as <type>)]"
        )
    if not isinstance(order_by, list):
        raise SQLMeshError(order_by_error)

    partition_clause = exp.tuple_(*partition_by)

    # Each ORDER BY item is parsed as an Ordered node and then run through macro evaluation.
    ordered_items = []
    for item in order_by:
        parsed_item = parse_one(item, into=exp.Ordered, dialect=evaluator.dialect)
        ordered_items.append(evaluator.transform(parsed_item))

    if not ordered_items:
        raise SQLMeshError(order_by_error)

    row_number = exp.Window(
        this=exp.RowNumber(),
        partition_by=partition_clause,
        order=exp.Order(expressions=ordered_items),
    )

    return exp.select("*").from_(relation).qualify(row_number.eq(1))
Returns a query that deduplicates rows within a table, keeping one row per partition.
Arguments:
- relation: table or CTE name to deduplicate
- partition_by: column names, or expressions to use to identify a window of rows out of which to select one as the deduplicated row
- order_by: A list of strings representing the ORDER BY clause
Example:
>>> from sqlglot import parse_one >>> from sqlglot.schema import MappingSchema >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@deduplicate(demo.table, [user_id, cast(timestamp as date)], ['timestamp desc', 'status asc'])" >>> MacroEvaluator().transform(parse_one(sql)).sql() 'SELECT * FROM demo.table QUALIFY ROW_NUMBER() OVER (PARTITION BY user_id, CAST(timestamp AS DATE) ORDER BY timestamp DESC, status ASC) = 1'
@macro()
def date_spine(
    evaluator: MacroEvaluator,
    datepart: exp.Expr,
    start_date: exp.Expr,
    end_date: exp.Expr,
) -> exp.Select:
    """Returns a query that produces a date spine with the given datepart, and range of start_date and end_date. Useful for joining as a date lookup table.

    Args:
        datepart: The datepart to use for the date spine - day, week, month, quarter, year
        start_date: The start date for the date spine in format YYYY-MM-DD
        end_date: The end date for the date spine in format YYYY-MM-DD

    Returns:
        A SELECT over an unnested GENERATE_DATE_ARRAY, exposing a single column named
        date_<datepart>.

    Raises:
        SQLMeshError: If the datepart is unsupported, or string-literal dates are malformed
            or out of order.

    Example:
        >>> from sqlglot import parse_one
        >>> from sqlglot.schema import MappingSchema
        >>> from sqlmesh.core.macros import MacroEvaluator
        >>> sql = "@date_spine('week', '2022-01-20', '2024-12-16')"
        >>> MacroEvaluator().transform(parse_one(sql)).sql()
        "SELECT date_week FROM UNNEST(GENERATE_DATE_ARRAY(CAST(\'2022-01-20\' AS DATE), CAST(\'2024-12-16\' AS DATE), INTERVAL \'1\' WEEK)) AS _exploded(date_week)"
    """
    datepart_name = datepart.name.lower()
    if datepart_name not in ("day", "week", "month", "quarter", "year"):
        raise SQLMeshError(
            f"Invalid datepart '{datepart_name}'. Expected: 'day', 'week', 'month', 'quarter', or 'year'"
        )

    start_date_name = start_date.name
    end_date_name = end_date.name

    # Only string literals can be validated at macro-evaluation time; other expressions
    # (columns, function calls, ...) are passed through unchecked.
    if start_date.is_string and end_date.is_string:
        try:
            start_date_obj = datetime.strptime(start_date_name, "%Y-%m-%d").date()
            end_date_obj = datetime.strptime(end_date_name, "%Y-%m-%d").date()
        except ValueError as e:
            # Chain the cause so the original parsing error stays visible in tracebacks.
            raise SQLMeshError(
                f"Invalid date format - start_date and end_date must be in format: YYYY-MM-DD. Error: {e}"
            ) from e

        if start_date_obj > end_date_obj:
            raise SQLMeshError(
                f"Invalid date range - start_date '{start_date_name}' is after end_date '{end_date_name}'."
            )

    start_date = exp.cast(start_date, "DATE")
    end_date = exp.cast(end_date, "DATE")

    # NOTE(review): these dialects get a 3-month interval instead of a QUARTER unit,
    # presumably because they lack QUARTER intervals — confirm.
    if datepart_name == "quarter" and evaluator.dialect in (
        "spark",
        "spark2",
        "databricks",
        "postgres",
    ):
        date_interval = exp.Interval(this=exp.Literal.number(3), unit=exp.var("month"))
    else:
        date_interval = exp.Interval(this=exp.Literal.number(1), unit=exp.var(datepart_name))

    generate_date_array = exp.func(
        "GENERATE_DATE_ARRAY",
        start_date,
        end_date,
        date_interval,
    )

    # Unnest into a derived table "_exploded" whose single column is date_<datepart>.
    alias_name = f"date_{datepart_name}"
    exploded = exp.alias_(exp.func("unnest", generate_date_array), "_exploded", table=[alias_name])

    return exp.select(alias_name).from_(exploded)
Returns a query that produces a date spine with the given datepart, and range of start_date and end_date. Useful for joining as a date lookup table.
Arguments:
- datepart: The datepart to use for the date spine - day, week, month, quarter, year
- start_date: The start date for the date spine in format YYYY-MM-DD
- end_date: The end date for the date spine in format YYYY-MM-DD
Example:
>>> from sqlglot import parse_one >>> from sqlglot.schema import MappingSchema >>> from sqlmesh.core.macros import MacroEvaluator >>> sql = "@date_spine('week', '2022-01-20', '2024-12-16')" >>> MacroEvaluator().transform(parse_one(sql)).sql() "SELECT date_week FROM UNNEST(GENERATE_DATE_ARRAY(CAST('2022-01-20' AS DATE), CAST('2024-12-16' AS DATE), INTERVAL '1' WEEK)) AS _exploded(date_week)"
@macro()
def resolve_template(
    evaluator: MacroEvaluator,
    template: exp.Literal,
    mode: str = "literal",
) -> t.Union[exp.Literal, exp.Table]:
    """
    Generates either a String literal or an exp.Table representing a physical table location, based on rendering the provided template String literal.

    Note: It relies on the @this_model variable being available in the evaluation context (@this_model resolves to an exp.Table object
    representing the current physical table).
    Therefore, the @resolve_template macro must be used at creation or evaluation time and not at load time.

    Args:
        template: Template string literal. Can contain the following placeholders:
            @{catalog_name} -> replaced with the catalog of the exp.Table returned from @this_model
            @{schema_name} -> replaced with the schema of the exp.Table returned from @this_model
            @{table_name} -> replaced with the name of the exp.Table returned from @this_model
        mode: What to return.
            'literal' -> return an exp.Literal string
            'table' -> return an exp.Table
            Any other value is treated like 'literal'.

    Returns:
        The rendered template, or the unchanged template while loading without @this_model.

    Raises:
        SQLMeshError: If @this_model is missing outside the loading stage.

    Example:
        >>> from sqlglot import parse_one, exp
        >>> from sqlmesh.core.macros import MacroEvaluator, RuntimeStage
        >>> sql = "@resolve_template('s3://data-bucket/prod/@{catalog_name}/@{schema_name}/@{table_name}')"
        >>> evaluator = MacroEvaluator(runtime_stage=RuntimeStage.CREATING)
        >>> evaluator.locals.update({"this_model": exp.to_table("test_catalog.sqlmesh__test.test__test_model__2517971505")})
        >>> evaluator.transform(parse_one(sql)).sql()
        "'s3://data-bucket/prod/test_catalog/sqlmesh__test/test__test_model__2517971505'"
    """
    if "this_model" not in evaluator.locals:
        # Missing @this_model is only expected while LOADING; in any other stage it likely
        # indicates a bug. NOTE(review): this compares against RuntimeStage.LOADING.value,
        # which assumes evaluator.runtime_stage yields the raw string value — confirm.
        if evaluator.runtime_stage != RuntimeStage.LOADING.value:
            raise SQLMeshError(
                "@this_model must be present in the macro evaluation context in order to use @resolve_template"
            )
        return template

    this_model = exp.to_table(evaluator.locals["this_model"], dialect=evaluator.dialect)
    substitutions = {
        "@{catalog_name}": this_model.catalog,
        "@{schema_name}": this_model.db,
        "@{table_name}": this_model.name,
    }
    rendered: str = template.this
    for placeholder, replacement in substitutions.items():
        rendered = rendered.replace(placeholder, replacement)

    if mode.lower() == "table":
        return exp.to_table(rendered, dialect=evaluator.dialect)
    return exp.Literal.string(rendered)
Generates either a String literal or an exp.Table representing a physical table location, based on rendering the provided template String literal.
Note: It relies on the @this_model variable being available in the evaluation context (@this_model resolves to an exp.Table object representing the current physical table). Therefore, the @resolve_template macro must be used at creation or evaluation time and not at load time.
Arguments:
- template: Template string literal. It can contain the following placeholders: @{catalog_name} is replaced with the catalog of the exp.Table returned from @this_model; @{schema_name} is replaced with its schema; @{table_name} is replaced with its name.
- mode: What to return: 'literal' returns an exp.Literal string; 'table' returns an exp.Table.
Example:
>>> from sqlglot import parse_one, exp >>> from sqlmesh.core.macros import MacroEvaluator, RuntimeStage >>> sql = "@resolve_template('s3://data-bucket/prod/@{catalog_name}/@{schema_name}/@{table_name}')" >>> evaluator = MacroEvaluator(runtime_stage=RuntimeStage.CREATING) >>> evaluator.locals.update({"this_model": exp.to_table("test_catalog.sqlmesh__test.test__test_model__2517971505")}) >>> evaluator.transform(parse_one(sql)).sql() "'s3://data-bucket/prod/test_catalog/sqlmesh__test/test__test_model__2517971505'"
def normalize_macro_name(name: str) -> str:
    """Returns the canonical macro name: the upper-cased name prefixed with "@"."""
    return "@" + name.upper()
Prefixes the macro name with @ and converts it to uppercase.
def call_macro(
    func: t.Callable,
    dialect: DialectType,
    path: t.Optional[Path],
    provided_args: t.Tuple[t.Any, ...],
    provided_kwargs: t.Dict[str, t.Any],
    **optional_kwargs: t.Any,
) -> t.Any:
    """Invokes a macro function after binding and type-coercing its arguments.

    Args:
        func: The macro callable to invoke.
        dialect: Dialect passed to the argument coercion.
        path: Path passed to the argument coercion.
        provided_args: Positional arguments supplied to the macro.
        provided_kwargs: Keyword arguments supplied to the macro; this dict is not mutated.
        optional_kwargs: Extra keyword arguments that are forwarded only when `func` declares a
            parameter with the same name. They override same-named entries in provided_kwargs.

    Returns:
        Whatever `func` returns.

    Raises:
        TypeError: If the arguments cannot be bound to `func`'s signature.
    """
    signature = inspect.signature(func)

    # Forward only those optional kwargs that the macro actually accepts by name.
    if optional_kwargs:
        provided_kwargs = {
            **provided_kwargs,
            **{
                name: value
                for name, value in optional_kwargs.items()
                if name in signature.parameters
            },
        }

    # Bind the macro's actual parameters to its formal parameters
    bound = signature.bind(*provided_args, **provided_kwargs)
    bound.apply_defaults()

    try:
        hints = t.get_type_hints(func, localns=get_supported_types())
    except (NameError, TypeError):  # forward references aren't handled
        hints = {}

    # Coerce annotated arguments to their declared types. Writes to bound.arguments are
    # reflected in bound.args / bound.kwargs:
    # https://docs.python.org/3/library/inspect.html#inspect.BoundArguments.arguments
    for name, value in bound.arguments.items():
        target_type = hints.get(name)
        if not target_type:
            continue

        kind = signature.parameters[name].kind
        if kind is inspect.Parameter.VAR_POSITIONAL:
            coerced = tuple(_coerce(item, target_type, dialect, path) for item in value)
        elif kind is inspect.Parameter.VAR_KEYWORD:
            coerced = {
                key: _coerce(item, target_type, dialect, path) for key, item in value.items()
            }
        else:
            coerced = _coerce(value, target_type, dialect, path)
        bound.arguments[name] = coerced

    return func(*bound.args, **bound.kwargs)