sqlmesh.core.selector
from __future__ import annotations

import fnmatch
import typing as t
from pathlib import Path
from itertools import zip_longest
import abc

from sqlglot import exp
from sqlglot.errors import ParseError
from sqlglot.tokens import Token, TokenType, Tokenizer as BaseTokenizer
from sqlglot.dialects.dialect import Dialect, DialectType
from sqlglot.helper import seq_get

from sqlmesh.core import constants as c
from sqlmesh.core.dialect import normalize_model_name
from sqlmesh.core.environment import Environment
from sqlmesh.core.model import update_model_schemas
from sqlmesh.core.audit import StandaloneAudit
from sqlmesh.utils import UniqueKeyDict
from sqlmesh.utils.dag import DAG
from sqlmesh.utils.git import GitClient
from sqlmesh.utils.errors import SQLMeshError


if t.TYPE_CHECKING:
    from typing_extensions import Literal as Lit  # noqa
    from sqlmesh.core.model import Model
    from sqlmesh.core.node import Node
    from sqlmesh.core.state_sync import StateReader


class Selector(abc.ABC):
    """Base class that turns selection expressions into sets of model fqns.

    Subclasses decide how a model is named for matching purposes by implementing
    `_model_name`, `_pattern_to_model_fqns` and `_matches_resource_type`.
    """

    def __init__(
        self,
        state_reader: StateReader,
        models: UniqueKeyDict[str, Model],
        context_path: Path = Path("."),
        dag: t.Optional[DAG[str]] = None,
        default_catalog: t.Optional[str] = None,
        dialect: t.Optional[str] = None,
        cache_dir: t.Optional[Path] = None,
    ):
        """Store the selection context.

        Args:
            state_reader: Used to look up environments and their snapshots.
            models: The local models, keyed by fqn.
            context_path: Handed to the git client and used to derive the default cache directory.
            dag: An existing DAG of model fqns. When omitted, one is built from `models`.
            default_catalog: The catalog used when normalizing model names.
            dialect: The dialect used when normalizing model names.
            cache_dir: The cache directory. Defaults to `context_path / c.CACHE`.
        """
        self._state_reader = state_reader
        self._models = models
        self._context_path = context_path
        self._cache_dir = cache_dir if cache_dir else context_path / c.CACHE
        self._default_catalog = default_catalog
        self._dialect = dialect
        self._git_client = GitClient(context_path)

        if dag is None:
            self._dag: DAG[str] = DAG()
            for fqn, model in models.items():
                self._dag.add(fqn, model.depends_on)
        else:
            self._dag = dag

    def select_models(
        self,
        model_selections: t.Iterable[str],
        target_env_name: str,
        fallback_env_name: t.Optional[str] = None,
        ensure_finalized_snapshots: bool = False,
    ) -> UniqueKeyDict[str, Model]:
        """Given a set of selections returns models from the current state with names matching the
        selection while sourcing the remaining models from the target environment.

        Args:
            model_selections: A set of selections.
            target_env_name: The name of the target environment.
            fallback_env_name: The name of the fallback environment that will be used if the target
                environment doesn't exist.
            ensure_finalized_snapshots: Whether to source environment snapshots from the latest finalized
                environment state, or to use whatever snapshots are in the current environment state even if
                the environment is not finalized.

        Returns:
            A dictionary of models.

        Raises:
            SQLMeshError: If a model sourced from the environment fails while its dependencies
                are being resolved locally.
        """
        target_env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name))
        if target_env and target_env.expired:
            # An expired environment is treated the same as a missing one.
            target_env = None

        if not target_env and fallback_env_name:
            target_env = self._state_reader.get_environment(
                Environment.sanitize_name(fallback_env_name)
            )

        env_models: t.Dict[str, Model] = {}
        if target_env:
            environment_snapshot_infos = (
                target_env.snapshots
                if not ensure_finalized_snapshots
                else target_env.finalized_or_current_snapshots
            )
            env_models = {
                s.name: s.model
                for s in self._state_reader.get_snapshots(environment_snapshot_infos).values()
                if s.is_model
            }

        # Local models take precedence over environment models that share an fqn.
        all_selected_models = self.expand_model_selections(
            model_selections, models={**env_models, **self._models}
        )

        dag: DAG[str] = DAG()
        subdag = set()

        # The sub-DAG consists of every selected model plus everything downstream of it.
        for fqn in all_selected_models:
            if fqn not in subdag:
                subdag.add(fqn)
                subdag.update(self._dag.downstream(fqn))

        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        all_model_fqns = set(self._models) | set(env_models)
        needs_update = False

        def get_model(fqn: str) -> t.Optional[Model]:
            if fqn not in all_selected_models and fqn in env_models:
                # Unselected modified or added model.
                model_from_env = env_models[fqn]
                try:
                    # this triggers a render_query() which can throw an exception
                    model_from_env.depends_on
                    return model_from_env
                except Exception as e:
                    raise SQLMeshError(
                        f"Model '{model_from_env.name}' sourced from state cannot be rendered "
                        f"in the local environment due to:\n> {str(e)}"
                    ) from e
            if fqn in all_selected_models and fqn in self._models:
                # Selected modified or removed model.
                return self._models[fqn]
            return None

        for fqn in all_model_fqns:
            model = get_model(fqn)

            if not model:
                continue

            # NOTE(review): the dependency check below is assumed to be nested under this
            # condition; the listing this was restored from had lost its indentation - confirm.
            if model.fqn in subdag:
                dag.add(model.fqn, model.depends_on)

                for dep in model.depends_on:
                    schema = model.mapping_schema

                    # Walk the nested mapping schema down to the entry for this dependency.
                    for part in exp.to_table(dep).parts:
                        schema = schema.get(part.sql()) or {}

                    parent = get_model(dep)

                    parent_schema = {
                        c: t.sql(dialect=model.dialect)
                        for c, t in ((parent and parent.columns_to_types) or {}).items()
                    }

                    if schema != parent_schema:
                        # The recorded schema no longer matches the parent that will actually
                        # be used, so it is cleared and recomputed below.
                        model = model.copy(update={"mapping_schema": {}})
                        needs_update = True
                        break

            models[model.fqn] = model

        if needs_update:
            update_model_schemas(dag, models=models, cache_dir=self._cache_dir)

        return models

    def expand_model_selections(
        self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Node]] = None
    ) -> t.Set[str]:
        """Expands a set of model selections into a set of model fqns that can be looked up in the Context.

        Args:
            model_selections: A set of model selections.
            models: The nodes to select from. When omitted or empty, the selector's own models are used.

        Returns:
            A set of model fqns.

        Raises:
            ParseError: If a selection cannot be parsed.
        """

        # Every selection is parenthesized and the selections are OR-ed together.
        node = parse(" | ".join(f"({s})" for s in model_selections))

        all_models: t.Dict[str, Node] = models or dict(self._models)
        models_by_tags: t.Dict[str, t.Set[str]] = {}

        for fqn, model in all_models.items():
            for tag in model.tags:
                # Tags are matched case-insensitively.
                tag = tag.lower()
                models_by_tags.setdefault(tag, set())
                models_by_tags[tag].add(model.fqn)

        def evaluate(node: exp.Expr) -> t.Set[str]:
            if isinstance(node, exp.Var):
                pattern = node.this
                if "*" in pattern:
                    return {
                        fqn
                        for fqn, model in all_models.items()
                        if fnmatch.fnmatchcase(self._model_name(model), node.this)
                    }
                return self._pattern_to_model_fqns(pattern, all_models)
            if isinstance(node, exp.And):
                return evaluate(node.left) & evaluate(node.right)
            if isinstance(node, exp.Or):
                return evaluate(node.left) | evaluate(node.right)
            if isinstance(node, exp.Paren):
                return evaluate(node.this)
            if isinstance(node, exp.Not):
                return set(all_models) - evaluate(node.this)
            if isinstance(node, Git):
                target_branch = node.name
                git_modified_files = {
                    *self._git_client.list_untracked_files(),
                    *self._git_client.list_uncommitted_changed_files(),
                    *self._git_client.list_committed_changed_files(target_branch=target_branch),
                }
                return {m.fqn for m in all_models.values() if m._path in git_modified_files}
            if isinstance(node, Tag):
                pattern = node.name.lower()

                if "*" in pattern:
                    return {
                        model
                        for tag, models in models_by_tags.items()
                        for model in models
                        if fnmatch.fnmatchcase(tag, pattern)
                    }
                return models_by_tags.get(pattern, set())
            if isinstance(node, ResourceType):
                resource_type = node.name.lower()
                return {
                    fqn
                    for fqn, model in all_models.items()
                    if self._matches_resource_type(resource_type, model)
                }
            if isinstance(node, Direction):
                selected = set()

                for model_name in evaluate(node.this):
                    selected.add(model_name)
                    if node.args.get("up"):
                        # Upstream nodes are only kept when they are selectable.
                        for u in self._dag.upstream(model_name):
                            if u in all_models:
                                selected.add(u)
                    if node.args.get("down"):
                        selected.update(self._dag.downstream(model_name))
                return selected
            raise ParseError(f"Unexpected node {node}")

        return evaluate(node)

    @abc.abstractmethod
    def _model_name(self, model: Node) -> str:
        """Given a model, return the name that a selector pattern containing wildcards should be fnmatch'd on"""
        pass

    @abc.abstractmethod
    def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
        """Given a pattern, return the keys of the matching models from :all_models"""
        pass

    @abc.abstractmethod
    def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
        """Indicate whether or not the supplied model matches the supplied resource type"""
        pass


class NativeSelector(Selector):
    """Implementation of selectors that matches objects based on SQLMesh native names"""

    def _model_name(self, model: Node) -> str:
        return model.name

    def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
        # A pattern without wildcards names exactly one model once it has been normalized.
        fqn = normalize_model_name(pattern, self._default_catalog, self._dialect)
        return {fqn} if fqn in all_models else set()

    def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
        if resource_type == "model":
            return model.is_model
        if resource_type == "audit":
            return isinstance(model, StandaloneAudit)

        raise SQLMeshError(f"Unsupported resource type: {resource_type}")


class DbtSelector(Selector):
    """Implementation of selectors that matches objects based on the DBT names instead of the SQLMesh native names"""

    def _model_name(self, model: Node) -> str:
        if dbt_fqn := model.dbt_fqn:
            return dbt_fqn
        raise SQLMeshError("dbt node information must be populated to use dbt selectors")

    def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
        """Return the keys of the models whose dbt fqn contains the pattern's components in sequence.

        Models without a dbt fqn are never matched.
        """
        # a pattern like "staging.customers" should match a model called "jaffle_shop.staging.customers"
        # but not a model called "jaffle_shop.customers.staging"
        # also a pattern like "aging" should not match "staging" so we need to consider components; not substrings
        pattern_components = pattern.split(".")
        first_pattern_component = pattern_components[0]

        def _matches_from(fqn_components: t.List[str]) -> bool:
            """Check the pattern against the fqn components that start at one candidate position."""
            for pattern_component, fqn_component in zip_longest(
                pattern_components, fqn_components
            ):
                if pattern_component and not fqn_component:
                    # the pattern still goes but we have run out of fqn components to match; no match
                    return False
                if fqn_component and not pattern_component:
                    # all elements of the pattern have matched elements of the fqn; match
                    return True
                if pattern_component != fqn_component:
                    # the pattern explicitly doesnt match a component; no match
                    return False
            # all components of the pattern matched all components of the fqn
            return True

        matches = set()
        for fqn, model in all_models.items():
            if not model.dbt_fqn:
                continue

            dbt_fqn_components = model.dbt_fqn.split(".")
            # Try every position at which the first pattern component occurs. Looking only at
            # the first occurrence would reject an fqn such as "analytics.analytics.orders"
            # for the pattern "analytics.orders".
            if any(
                _matches_from(dbt_fqn_components[starting_idx:])
                for starting_idx, component in enumerate(dbt_fqn_components)
                if component == first_pattern_component
            ):
                matches.add(fqn)
        return matches

    def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
        """
        ref: https://docs.getdbt.com/reference/node-selection/methods#resource_type

        # supported by SQLMesh
        "model"
        "seed"
        "source" # external model
        "test" # standalone audit

        # not supported by SQLMesh yet, commented out to throw an error if someone tries to use them
        "analysis"
        "exposure"
        "metric"
        "saved_query"
        "semantic_model"
        "snapshot"
        "unit_test"
        """
        if resource_type not in ("model", "seed", "source", "test"):
            raise SQLMeshError(f"Unsupported resource type: {resource_type}")

        if isinstance(model, StandaloneAudit):
            return resource_type == "test"

        if resource_type == "model":
            return model.is_model and not model.kind.is_external and not model.kind.is_seed
        if resource_type == "source":
            return model.kind.is_external
        if resource_type == "seed":
            return model.kind.is_seed

        return False


class SelectorDialect(Dialect):
    """Dialect whose tokenizer is used by `parse` to split selection expressions."""

    IDENTIFIERS_CAN_START_WITH_DIGIT = True

    class Tokenizer(BaseTokenizer):
        SINGLE_TOKENS = {
            "(": TokenType.L_PAREN,
            ")": TokenType.R_PAREN,
            "&": TokenType.AMP,
            "|": TokenType.PIPE,
            "^": TokenType.CARET,
            "+": TokenType.PLUS,
            "*": TokenType.STAR,
            ":": TokenType.COLON,
        }

        KEYWORDS = {}
        IDENTIFIERS = ["\\"]  # there are no identifiers but need to put something here
        IDENTIFIER_START = ""
        IDENTIFIER_END = ""


class Git(exp.Expression):
    """Node produced by `parse` for a selection prefixed with `git:`."""


class Tag(exp.Expression):
    """Node produced by `parse` for a selection prefixed with `tag:`."""


class ResourceType(exp.Expression):
    """Node produced by `parse` for a selection prefixed with `resource_type:`."""


class Direction(exp.Expression):
    """Node produced by `parse` for a selection with a leading and / or trailing `+`."""


def parse(selector: str, dialect: DialectType = None) -> exp.Expr:
    """Parse a selection expression into an expression tree.

    The grammar handled here consists of `^` (negation), `&`, `|`, parentheses, a leading
    and / or trailing `+` on a term, the prefixes `tag:`, `resource_type:` and `git:`,
    and a leading and / or trailing `*` on a name.

    Args:
        selector: The selection expression.
        dialect: Not used by this function.

    Returns:
        The root of the parsed expression tree.

    Raises:
        ParseError: If a term or a closing parenthesis is missing.
    """
    tokens = SelectorDialect().tokenize(selector)
    i = 0

    def _curr() -> t.Optional[Token]:
        return seq_get(tokens, i)

    def _prev() -> Token:
        return tokens[i - 1]

    def _advance(num: int = 1) -> Token:
        nonlocal i
        i += num
        return _prev()

    def _next() -> t.Optional[Token]:
        return seq_get(tokens, i + 1)

    def _error(msg: str) -> str:
        return f"{msg} at index {i}: {selector}"

    def _match(token_type: TokenType, raise_unmatched: bool = False) -> t.Optional[Token]:
        token = _curr()
        if token and token.token_type == token_type:
            return _advance()
        if raise_unmatched:
            raise ParseError(_error(f"Expected {token_type}"))
        return None

    def _parse_kind(kind: str) -> bool:
        # Consumes "<kind>:" when the next two tokens spell it out, case-insensitively.
        token = _curr()
        next_token = _next()

        if (
            token
            and token.token_type == TokenType.VAR
            and token.text.lower() == kind
            and next_token
            and next_token.token_type == TokenType.COLON
        ):
            _advance(2)
            return True
        return False

    def _parse_var() -> exp.Expr:
        upstream = _match(TokenType.PLUS)
        downstream = None
        tag = _parse_kind("tag")
        resource_type = False if tag else _parse_kind("resource_type")
        git = False if resource_type else _parse_kind("git")
        lstar = "*" if _match(TokenType.STAR) else ""
        directions = {}

        if _match(TokenType.VAR) or _match(TokenType.NUMBER):
            name = _prev().text
            rstar = "*" if _match(TokenType.STAR) else ""
            downstream = _match(TokenType.PLUS)
            this: exp.Expr = exp.Var(this=f"{lstar}{name}{rstar}")

        elif _match(TokenType.L_PAREN):
            this = exp.Paren(this=_parse_conjunction())
            downstream = _match(TokenType.PLUS)
            _match(TokenType.R_PAREN, True)
        elif lstar:
            # A lone "*" selects everything.
            this = exp.var("*")
        else:
            raise ParseError(_error("Expected model name."))

        if upstream:
            directions["up"] = True
        if downstream:
            directions["down"] = True

        if tag:
            this = Tag(this=this)
        if resource_type:
            this = ResourceType(this=this)
        if git:
            this = Git(this=this)
        if directions:
            this = Direction(this=this, **directions)
        return this

    def _parse_unary() -> exp.Expr:
        if _match(TokenType.CARET):
            return exp.Not(this=_parse_unary())
        return _parse_var()

    def _parse_conjunction() -> exp.Expr:
        this = _parse_unary()

        # Loop rather than test once so that a chain such as "a & b & c" is consumed in
        # full; a single test leaves "& c" behind, where it is either dropped or trips the
        # closing parenthesis check of an enclosing group.
        while _match(TokenType.AMP):
            this = exp.And(this=this, expression=_parse_unary())
        if _match(TokenType.PIPE):
            this = exp.Or(this=this, expression=_parse_conjunction())

        return this

    return _parse_conjunction()
class
Selector(abc.ABC):
class Selector(abc.ABC):
    """Abstract base for objects that resolve selection expressions to model fqns.

    Concrete subclasses supply the naming rules through `_model_name`,
    `_pattern_to_model_fqns` and `_matches_resource_type`.
    """

    def __init__(
        self,
        state_reader: StateReader,
        models: UniqueKeyDict[str, Model],
        context_path: Path = Path("."),
        dag: t.Optional[DAG[str]] = None,
        default_catalog: t.Optional[str] = None,
        dialect: t.Optional[str] = None,
        cache_dir: t.Optional[Path] = None,
    ):
        """Record the selection context.

        Args:
            state_reader: Used to look up environments and their snapshots.
            models: The local models, keyed by fqn.
            context_path: Handed to the git client and used to derive the default cache directory.
            dag: An existing DAG of model fqns. When omitted, one is built from `models`.
            default_catalog: Stored on the instance as `_default_catalog`.
            dialect: Stored on the instance as `_dialect`.
            cache_dir: The cache directory. Defaults to `context_path / c.CACHE`.
        """
        self._state_reader = state_reader
        self._models = models
        self._context_path = context_path
        self._cache_dir = cache_dir or context_path / c.CACHE
        self._default_catalog = default_catalog
        self._dialect = dialect
        self._git_client = GitClient(context_path)

        if dag is not None:
            self._dag: DAG[str] = dag
        else:
            # No DAG supplied: derive one from the dependencies of the local models.
            self._dag = DAG()
            for model_fqn, local_model in models.items():
                self._dag.add(model_fqn, local_model.depends_on)

    def select_models(
        self,
        model_selections: t.Iterable[str],
        target_env_name: str,
        fallback_env_name: t.Optional[str] = None,
        ensure_finalized_snapshots: bool = False,
    ) -> UniqueKeyDict[str, Model]:
        """Combine the selected local models with the unselected models of an environment.

        Args:
            model_selections: A set of selections.
            target_env_name: The name of the target environment.
            fallback_env_name: The name of the environment to use when the target environment
                is missing or expired.
            ensure_finalized_snapshots: When set, the environment's
                `finalized_or_current_snapshots` are used instead of its `snapshots`.

        Returns:
            A dictionary of models.

        Raises:
            SQLMeshError: If a model sourced from the environment fails while its dependencies
                are being resolved locally.
        """
        env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name))
        if env and env.expired:
            env = None
        if not env and fallback_env_name:
            env = self._state_reader.get_environment(Environment.sanitize_name(fallback_env_name))

        env_models: t.Dict[str, Model] = {}
        if env:
            if ensure_finalized_snapshots:
                snapshot_infos = env.finalized_or_current_snapshots
            else:
                snapshot_infos = env.snapshots
            snapshots = self._state_reader.get_snapshots(snapshot_infos)
            env_models = {s.name: s.model for s in snapshots.values() if s.is_model}

        # On an fqn clash the local model wins over the environment's model.
        all_selected_models = self.expand_model_selections(
            model_selections, models={**env_models, **self._models}
        )

        dag: DAG[str] = DAG()
        subdag = set()
        for selected_fqn in all_selected_models:
            if selected_fqn in subdag:
                continue
            subdag.add(selected_fqn)
            subdag.update(self._dag.downstream(selected_fqn))

        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        needs_update = False

        def get_model(fqn: str) -> t.Optional[Model]:
            is_selected = fqn in all_selected_models
            if not is_selected and fqn in env_models:
                # Unselected modified or added model.
                model_from_env = env_models[fqn]
                try:
                    # this triggers a render_query() which can throw an exception
                    model_from_env.depends_on
                except Exception as e:
                    raise SQLMeshError(
                        f"Model '{model_from_env.name}' sourced from state cannot be rendered "
                        f"in the local environment due to:\n> {str(e)}"
                    ) from e
                return model_from_env
            if is_selected and fqn in self._models:
                # Selected modified or removed model.
                return self._models[fqn]
            return None

        for candidate_fqn in set(self._models) | set(env_models):
            model = get_model(candidate_fqn)
            if not model:
                continue

            # NOTE(review): the dependency check below is assumed to be nested under this
            # condition; the listing this was restored from had lost its indentation - confirm.
            if model.fqn in subdag:
                dag.add(model.fqn, model.depends_on)

                for dep in model.depends_on:
                    # Descend through the nested mapping schema to this dependency's entry.
                    mapped_schema = model.mapping_schema
                    for part in exp.to_table(dep).parts:
                        mapped_schema = mapped_schema.get(part.sql()) or {}

                    parent = get_model(dep)
                    parent_columns = (parent and parent.columns_to_types) or {}
                    parent_schema = {
                        column: dtype.sql(dialect=model.dialect)
                        for column, dtype in parent_columns.items()
                    }

                    if mapped_schema != parent_schema:
                        # Stale schema: clear it and have it recomputed after the loop.
                        model = model.copy(update={"mapping_schema": {}})
                        needs_update = True
                        break

            models[model.fqn] = model

        if needs_update:
            update_model_schemas(dag, models=models, cache_dir=self._cache_dir)
        return models

    def expand_model_selections(
        self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Node]] = None
    ) -> t.Set[str]:
        """Resolve selection expressions to the fqns of the matching models.

        Args:
            model_selections: A set of model selections.
            models: The nodes to select from. When omitted or empty, the selector's own models are used.

        Returns:
            A set of model fqns.

        Raises:
            ParseError: If the parsed tree contains a node type that is not handled here.
        """
        # Each selection is parenthesized and the selections are OR-ed together.
        selection_expr = " | ".join(f"({s})" for s in model_selections)
        root = parse(selection_expr)

        all_models: t.Dict[str, Node] = models or dict(self._models)

        # Lower-cased tag -> fqns of the models carrying it.
        models_by_tags: t.Dict[str, t.Set[str]] = {}
        for tagged_model in all_models.values():
            for tag in tagged_model.tags:
                models_by_tags.setdefault(tag.lower(), set()).add(tagged_model.fqn)

        def evaluate(node: exp.Expr) -> t.Set[str]:
            if isinstance(node, exp.Var):
                pattern = node.this
                if "*" not in pattern:
                    return self._pattern_to_model_fqns(pattern, all_models)
                return {
                    fqn
                    for fqn, model in all_models.items()
                    if fnmatch.fnmatchcase(self._model_name(model), pattern)
                }
            if isinstance(node, exp.And):
                return evaluate(node.left) & evaluate(node.right)
            if isinstance(node, exp.Or):
                return evaluate(node.left) | evaluate(node.right)
            if isinstance(node, exp.Paren):
                return evaluate(node.this)
            if isinstance(node, exp.Not):
                return set(all_models) - evaluate(node.this)
            if isinstance(node, Git):
                changed_files = set(self._git_client.list_untracked_files())
                changed_files.update(self._git_client.list_uncommitted_changed_files())
                changed_files.update(
                    self._git_client.list_committed_changed_files(target_branch=node.name)
                )
                return {m.fqn for m in all_models.values() if m._path in changed_files}
            if isinstance(node, Tag):
                pattern = node.name.lower()
                if "*" not in pattern:
                    return models_by_tags.get(pattern, set())
                tagged: t.Set[str] = set()
                for tag, tagged_fqns in models_by_tags.items():
                    if fnmatch.fnmatchcase(tag, pattern):
                        tagged.update(tagged_fqns)
                return tagged
            if isinstance(node, ResourceType):
                resource_type = node.name.lower()
                return {
                    fqn
                    for fqn, model in all_models.items()
                    if self._matches_resource_type(resource_type, model)
                }
            if isinstance(node, Direction):
                include_up = node.args.get("up")
                include_down = node.args.get("down")
                selected = set()
                for model_name in evaluate(node.this):
                    selected.add(model_name)
                    if include_up:
                        # Upstream nodes are only kept when they are selectable.
                        selected.update(
                            u for u in self._dag.upstream(model_name) if u in all_models
                        )
                    if include_down:
                        selected.update(self._dag.downstream(model_name))
                return selected
            raise ParseError(f"Unexpected node {node}")

        return evaluate(root)

    @abc.abstractmethod
    def _model_name(self, model: Node) -> str:
        """Given a model, return the name that a selector pattern containing wildcards should be fnmatch'd on"""

    @abc.abstractmethod
    def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
        """Given a pattern, return the keys of the matching models from :all_models"""

    @abc.abstractmethod
    def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
        """Indicate whether or not the supplied model matches the supplied resource type"""
Helper class that provides a standard way to create an ABC using inheritance.
def
select_models( self, model_selections: Iterable[str], target_env_name: str, fallback_env_name: Optional[str] = None, ensure_finalized_snapshots: bool = False) -> sqlmesh.utils.UniqueKeyDict[str, typing.Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel]]:
def select_models(
    self,
    model_selections: t.Iterable[str],
    target_env_name: str,
    fallback_env_name: t.Optional[str] = None,
    ensure_finalized_snapshots: bool = False,
) -> UniqueKeyDict[str, Model]:
    """Combine the selected local models with the unselected models of an environment.

    Args:
        model_selections: A set of selections.
        target_env_name: The name of the target environment.
        fallback_env_name: The name of the environment to use when the target environment
            is missing or expired.
        ensure_finalized_snapshots: When set, the environment's
            `finalized_or_current_snapshots` are used instead of its `snapshots`.

    Returns:
        A dictionary of models.

    Raises:
        SQLMeshError: If a model sourced from the environment fails while its dependencies
            are being resolved locally.
    """
    env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name))
    if env and env.expired:
        env = None
    if not env and fallback_env_name:
        env = self._state_reader.get_environment(Environment.sanitize_name(fallback_env_name))

    env_models: t.Dict[str, Model] = {}
    if env:
        if ensure_finalized_snapshots:
            snapshot_infos = env.finalized_or_current_snapshots
        else:
            snapshot_infos = env.snapshots
        snapshots = self._state_reader.get_snapshots(snapshot_infos)
        env_models = {s.name: s.model for s in snapshots.values() if s.is_model}

    # On an fqn clash the local model wins over the environment's model.
    all_selected_models = self.expand_model_selections(
        model_selections, models={**env_models, **self._models}
    )

    dag: DAG[str] = DAG()
    subdag = set()
    for selected_fqn in all_selected_models:
        if selected_fqn in subdag:
            continue
        subdag.add(selected_fqn)
        subdag.update(self._dag.downstream(selected_fqn))

    models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
    needs_update = False

    def get_model(fqn: str) -> t.Optional[Model]:
        is_selected = fqn in all_selected_models
        if not is_selected and fqn in env_models:
            # Unselected modified or added model.
            model_from_env = env_models[fqn]
            try:
                # this triggers a render_query() which can throw an exception
                model_from_env.depends_on
            except Exception as e:
                raise SQLMeshError(
                    f"Model '{model_from_env.name}' sourced from state cannot be rendered "
                    f"in the local environment due to:\n> {str(e)}"
                ) from e
            return model_from_env
        if is_selected and fqn in self._models:
            # Selected modified or removed model.
            return self._models[fqn]
        return None

    for candidate_fqn in set(self._models) | set(env_models):
        model = get_model(candidate_fqn)
        if not model:
            continue

        # NOTE(review): the dependency check below is assumed to be nested under this
        # condition; the listing this was restored from had lost its indentation - confirm.
        if model.fqn in subdag:
            dag.add(model.fqn, model.depends_on)

            for dep in model.depends_on:
                # Descend through the nested mapping schema to this dependency's entry.
                mapped_schema = model.mapping_schema
                for part in exp.to_table(dep).parts:
                    mapped_schema = mapped_schema.get(part.sql()) or {}

                parent = get_model(dep)
                parent_columns = (parent and parent.columns_to_types) or {}
                parent_schema = {
                    column: dtype.sql(dialect=model.dialect)
                    for column, dtype in parent_columns.items()
                }

                if mapped_schema != parent_schema:
                    # Stale schema: clear it and have it recomputed after the loop.
                    model = model.copy(update={"mapping_schema": {}})
                    needs_update = True
                    break

        models[model.fqn] = model

    if needs_update:
        update_model_schemas(dag, models=models, cache_dir=self._cache_dir)
    return models
Given a set of selections, returns the models from the current state whose names match the selection, while sourcing the remaining models from the target environment.
Arguments:
- model_selections: A set of selections.
- target_env_name: The name of the target environment.
- fallback_env_name: The name of the fallback environment that will be used if the target environment doesn't exist.
- ensure_finalized_snapshots: Whether to source environment snapshots from the latest finalized environment state, or to use whatever snapshots are in the current environment state even if the environment is not finalized.
Returns:
A dictionary of models.
def
expand_model_selections( self, model_selections: Iterable[str], models: Optional[Dict[str, Annotated[Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel, sqlmesh.core.audit.definition.StandaloneAudit], FieldInfo(annotation=NoneType, required=True, discriminator='source_type')]]] = None) -> Set[str]:
def expand_model_selections(
    self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Node]] = None
) -> t.Set[str]:
    """Resolve selection expressions to the fqns of the matching models.

    Args:
        model_selections: A set of model selections.
        models: The nodes to select from. When omitted or empty, the selector's own models are used.

    Returns:
        A set of model fqns.

    Raises:
        ParseError: If the parsed tree contains a node type that is not handled here.
    """
    # Each selection is parenthesized and the selections are OR-ed together.
    selection_expr = " | ".join(f"({s})" for s in model_selections)
    root = parse(selection_expr)

    all_models: t.Dict[str, Node] = models or dict(self._models)

    # Lower-cased tag -> fqns of the models carrying it.
    models_by_tags: t.Dict[str, t.Set[str]] = {}
    for tagged_model in all_models.values():
        for tag in tagged_model.tags:
            models_by_tags.setdefault(tag.lower(), set()).add(tagged_model.fqn)

    def evaluate(node: exp.Expr) -> t.Set[str]:
        if isinstance(node, exp.Var):
            pattern = node.this
            if "*" not in pattern:
                return self._pattern_to_model_fqns(pattern, all_models)
            return {
                fqn
                for fqn, model in all_models.items()
                if fnmatch.fnmatchcase(self._model_name(model), pattern)
            }
        if isinstance(node, exp.And):
            return evaluate(node.left) & evaluate(node.right)
        if isinstance(node, exp.Or):
            return evaluate(node.left) | evaluate(node.right)
        if isinstance(node, exp.Paren):
            return evaluate(node.this)
        if isinstance(node, exp.Not):
            return set(all_models) - evaluate(node.this)
        if isinstance(node, Git):
            changed_files = set(self._git_client.list_untracked_files())
            changed_files.update(self._git_client.list_uncommitted_changed_files())
            changed_files.update(
                self._git_client.list_committed_changed_files(target_branch=node.name)
            )
            return {m.fqn for m in all_models.values() if m._path in changed_files}
        if isinstance(node, Tag):
            pattern = node.name.lower()
            if "*" not in pattern:
                return models_by_tags.get(pattern, set())
            tagged: t.Set[str] = set()
            for tag, tagged_fqns in models_by_tags.items():
                if fnmatch.fnmatchcase(tag, pattern):
                    tagged.update(tagged_fqns)
            return tagged
        if isinstance(node, ResourceType):
            resource_type = node.name.lower()
            return {
                fqn
                for fqn, model in all_models.items()
                if self._matches_resource_type(resource_type, model)
            }
        if isinstance(node, Direction):
            include_up = node.args.get("up")
            include_down = node.args.get("down")
            selected = set()
            for model_name in evaluate(node.this):
                selected.add(model_name)
                if include_up:
                    # Upstream nodes are only kept when they are selectable.
                    selected.update(u for u in self._dag.upstream(model_name) if u in all_models)
                if include_down:
                    selected.update(self._dag.downstream(model_name))
            return selected
        raise ParseError(f"Unexpected node {node}")

    return evaluate(root)
Expands a set of model selections into a set of model fqns that can be looked up in the Context.
Arguments:
- model_selections: A set of model selections.
Returns:
A set of model fqns.
class NativeSelector(Selector):
    """Implementation of selectors that matches objects based on SQLMesh native names"""

    def _model_name(self, model: Node) -> str:
        """Return the model's own name; wildcard patterns are matched against it."""
        return model.name

    def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
        """Normalize the pattern into an fqn and return it if it is a key of `all_models`."""
        normalized = normalize_model_name(pattern, self._default_catalog, self._dialect)
        if normalized in all_models:
            return {normalized}
        return set()

    def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
        """Check the node against "model" or "audit"; any other resource type raises SQLMeshError."""
        if resource_type == "audit":
            return isinstance(model, StandaloneAudit)
        if resource_type == "model":
            return model.is_model

        raise SQLMeshError(f"Unsupported resource type: {resource_type}")
Implementation of selectors that matches objects based on SQLMesh native names
Inherited Members
class DbtSelector(Selector):
    """Implementation of selectors that matches objects based on the DBT names instead of the SQLMesh native names"""

    def _model_name(self, model: Node) -> str:
        """Return the model's dbt fqn, raising SQLMeshError when it is not populated."""
        if dbt_fqn := model.dbt_fqn:
            return dbt_fqn
        raise SQLMeshError("dbt node information must be populated to use dbt selectors")

    def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
        """Return the keys of the models whose dbt fqn contains the pattern's components in sequence.

        Models without a dbt fqn are never matched.
        """
        # a pattern like "staging.customers" should match a model called "jaffle_shop.staging.customers"
        # but not a model called "jaffle_shop.customers.staging"
        # also a pattern like "aging" should not match "staging" so we need to consider components; not substrings
        pattern_components = pattern.split(".")
        first_pattern_component = pattern_components[0]

        def _matches_from(fqn_components: t.List[str]) -> bool:
            """Check the pattern against the fqn components that start at one candidate position."""
            for pattern_component, fqn_component in zip_longest(
                pattern_components, fqn_components
            ):
                if pattern_component and not fqn_component:
                    # the pattern still goes but we have run out of fqn components to match; no match
                    return False
                if fqn_component and not pattern_component:
                    # all elements of the pattern have matched elements of the fqn; match
                    return True
                if pattern_component != fqn_component:
                    # the pattern explicitly doesnt match a component; no match
                    return False
            # all components of the pattern matched all components of the fqn
            return True

        matches = set()
        for fqn, model in all_models.items():
            if not model.dbt_fqn:
                continue

            dbt_fqn_components = model.dbt_fqn.split(".")
            # Try every position at which the first pattern component occurs. Looking only at
            # the first occurrence would reject an fqn such as "analytics.analytics.orders"
            # for the pattern "analytics.orders".
            if any(
                _matches_from(dbt_fqn_components[starting_idx:])
                for starting_idx, component in enumerate(dbt_fqn_components)
                if component == first_pattern_component
            ):
                matches.add(fqn)
        return matches

    def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
        """
        ref: https://docs.getdbt.com/reference/node-selection/methods#resource_type

        # supported by SQLMesh
        "model"
        "seed"
        "source" # external model
        "test" # standalone audit

        # not supported by SQLMesh yet, commented out to throw an error if someone tries to use them
        "analysis"
        "exposure"
        "metric"
        "saved_query"
        "semantic_model"
        "snapshot"
        "unit_test"
        """
        if resource_type not in ("model", "seed", "source", "test"):
            raise SQLMeshError(f"Unsupported resource type: {resource_type}")

        # A standalone audit can only ever satisfy the "test" resource type.
        if isinstance(model, StandaloneAudit):
            return resource_type == "test"

        if resource_type == "model":
            return model.is_model and not model.kind.is_external and not model.kind.is_seed
        if resource_type == "source":
            return model.kind.is_external
        if resource_type == "seed":
            return model.kind.is_seed

        # Reached for "test" when the node is not a standalone audit.
        return False
Implementation of selectors that match objects based on their dbt names instead of their SQLMesh native names
Inherited Members
class
SelectorDialect(sqlglot.dialects.dialect.Dialect):
class SelectorDialect(Dialect):
    """sqlglot dialect used to tokenize model selector strings."""

    # NOTE(review): presumably lets names that begin with a digit tokenize as one name — confirm against sqlglot.
    IDENTIFIERS_CAN_START_WITH_DIGIT = True

    class Tokenizer(BaseTokenizer):
        # The only single-character tokens are the selector operators consumed by `parse`.
        SINGLE_TOKENS = {
            "(": TokenType.L_PAREN,
            ")": TokenType.R_PAREN,
            "&": TokenType.AMP,
            "|": TokenType.PIPE,
            "^": TokenType.CARET,
            "+": TokenType.PLUS,
            "*": TokenType.STAR,
            ":": TokenType.COLON,
        }

        # No keywords are defined for selectors.
        KEYWORDS = {}
        IDENTIFIERS = ["\\"]  # there are no identifiers but need to put something here
        IDENTIFIER_START = ""
        IDENTIFIER_END = ""
tokenizer_class =
<class 'SelectorDialect.Tokenizer'>
VALID_INTERVAL_UNITS: Set[str] =
{'WEEK', 'NANOSECOND', 'MICROSECONDS', 'EPOCH_MILLISECOND', 'CENTURIES', 'HRS', 'CENTS', 'EPOCH_MILLISECONDS', 'USECONDS', 'WEEK_ISO', 'WY', 'MSEC', 'TIMEZONE_MINUTE', 'NANOSECS', 'SECONDS', 'Y', 'MIN', 'CENTURY', 'YEAR', 'DAYOFWEEK', 'MINUTE', 'EPOCH_MICROSECOND', 'MSECOND', 'EPOCH_NANOSECONDS', 'MI', 'NS', 'WEEKOFYEAR_ISO', 'MICROSEC', 'EPOCH_SECONDS', 'YYY', 'WEEKDAY', 'MILLENNIUM', 'MIL', 'YEARS', 'WEEKOFYEARISO', 'SECOND', 'EPOCH_SECOND', 'CENT', 'SECS', 'NSECONDS', 'MSECONDS', 'DAYOFWEEKISO', 'MILLENIA', 'DAY OF YEAR', 'EPOCH', 'MILLISEC', 'C', 'TZH', 'DAY', 'MONS', 'WOY', 'MILLISECON', 'DW', 'W', 'WK', 'HOURS', 'WEEKISO', 'USECS', 'DECADE', 'H', 'MS', 'MILS', 'DD', 'MICROSECOND', 'DAYOFMONTH', 'DAYS', 'EPOCH_NANOSECOND', 'QUARTER', 'TZM', 'WEEKDAY_ISO', 'YR', 'DOW', 'DW_ISO', 'DAY OF WEEK', 'TIMEZONE_HOUR', 'MONTHS', 'DOW_ISO', 'USEC', 'DY', 'MILLISECOND', 'MICROSECS', 'MILLISECONDS', 'YRS', 'USECOND', 'EPOCH_MICROSECONDS', 'MILLISECS', 'MINUTES', 'NANOSEC', 'MSECS', 'QTR', 'Q', 'HOUR', 'NSECOND', 'S', 'HR', 'DECADES', 'MINS', 'M', 'NSEC', 'MONTH', 'MM', 'HH', 'DEC', 'MON', 'QTRS', 'SEC', 'QUARTERS', 'YYYY', 'YY', 'DAYOFYEAR', 'DAYOFWEEK_ISO', 'DECS', 'US', 'WEEKOFYEAR', 'D', 'DOY'}
Inherited Members
- sqlglot.dialects.dialect.Dialect
- Dialect
- INDEX_OFFSET
- WEEK_OFFSET
- UNNEST_COLUMN_ONLY
- ALIAS_POST_TABLESAMPLE
- TABLESAMPLE_SIZE_IS_PERCENT
- NORMALIZATION_STRATEGY
- DPIPE_IS_STRING_CONCAT
- STRICT_STRING_CONCAT
- SUPPORTS_USER_DEFINED_TYPES
- COPY_PARAMS_ARE_CSV
- NORMALIZE_FUNCTIONS
- PRESERVE_ORIGINAL_NAMES
- LOG_BASE_FIRST
- NULL_ORDERING
- TYPED_DIVISION
- SAFE_DIVISION
- CONCAT_COALESCE
- HEX_LOWERCASE
- DATE_FORMAT
- DATEINT_FORMAT
- TIME_FORMAT
- TIME_MAPPING
- FORMAT_MAPPING
- UNESCAPED_SEQUENCES
- PSEUDOCOLUMNS
- PREFER_CTE_ALIAS_COLUMN
- FORCE_EARLY_ALIAS_REF_EXPANSION
- EXPAND_ONLY_GROUP_ALIAS_REF
- ANNOTATE_ALL_SCOPES
- DISABLES_ALIAS_REF_EXPANSION
- SUPPORTS_ALIAS_REFS_IN_JOIN_CONDITIONS
- SUPPORTS_ORDER_BY_ALL
- PROJECTION_ALIASES_SHADOW_SOURCE_NAMES
- TABLES_REFERENCEABLE_AS_COLUMNS
- SUPPORTS_STRUCT_STAR_EXPANSION
- EXCLUDES_PSEUDOCOLUMNS_FROM_STAR
- QUERY_RESULTS_ARE_STRUCTS
- REQUIRES_PARENTHESIZED_STRUCT_ACCESS
- SUPPORTS_NULL_TYPE
- COALESCE_COMPARISON_NON_STANDARD
- HAS_DISTINCT_ARRAY_CONSTRUCTORS
- SUPPORTS_FIXED_SIZE_ARRAYS
- STRICT_JSON_PATH_SYNTAX
- ON_CONDITION_EMPTY_BEFORE_ERROR
- ARRAY_AGG_INCLUDES_NULLS
- ARRAY_FUNCS_PROPAGATES_NULLS
- PROMOTE_TO_INFERRED_DATETIME_TYPE
- SUPPORTS_VALUES_DEFAULT
- NUMBERS_CAN_BE_UNDERSCORE_SEPARATED
- HEX_STRING_IS_INTEGER_TYPE
- REGEXP_EXTRACT_DEFAULT_GROUP
- REGEXP_EXTRACT_POSITION_OVERFLOW_RETURNS_NULL
- SET_OP_DISTINCT_BY_DEFAULT
- CREATABLE_KIND_MAPPING
- ALTER_TABLE_SUPPORTS_CASCADE
- ALTER_TABLE_ADD_REQUIRED_FOR_EACH_COLUMN
- TRY_CAST_REQUIRES_STRING
- SAFE_TO_ELIMINATE_DOUBLE_NEGATION
- INITCAP_DEFAULT_DELIMITER_CHARS
- BYTE_STRING_IS_BYTES_TYPE
- UUID_IS_STRING_TYPE
- JSON_EXTRACT_SCALAR_SCALAR_ONLY
- DEFAULT_FUNCTIONS_COLUMN_NAMES
- DEFAULT_NULL_TYPE
- LEAST_GREATEST_IGNORES_NULLS
- PRIORITIZE_NON_LITERAL_TYPES
- ALIAS_POST_VERSION
- DATE_PART_MAPPING
- COERCES_TO
- EXPRESSION_METADATA
- SUPPORTED_SETTINGS
- get_or_raise
- format_time
- version
- settings
- normalize_identifier
- case_sensitive
- can_quote
- quote_identifier
- to_json_path
- parse
- parse_into
- generate
- transpile
- tokenize
- tokenizer
- jsonpath_tokenizer
- parser
- generator
- generate_values_aliases
class
SelectorDialect.Tokenizer(sqlglot.tokens.Tokenizer):
class Tokenizer(BaseTokenizer):
    # The only single-character tokens are the selector operators consumed by `parse`.
    SINGLE_TOKENS = {
        "(": TokenType.L_PAREN,
        ")": TokenType.R_PAREN,
        "&": TokenType.AMP,
        "|": TokenType.PIPE,
        "^": TokenType.CARET,
        "+": TokenType.PLUS,
        "*": TokenType.STAR,
        ":": TokenType.COLON,
    }

    # No keywords are defined for selectors.
    KEYWORDS = {}
    IDENTIFIERS = ["\\"]  # there are no identifiers but need to put something here
    IDENTIFIER_START = ""
    IDENTIFIER_END = ""
SINGLE_TOKENS =
{'(': <TokenType.L_PAREN: 1>, ')': <TokenType.R_PAREN: 2>, '&': <TokenType.AMP: 36>, '|': <TokenType.PIPE: 39>, '^': <TokenType.CARET: 42>, '+': <TokenType.PLUS: 10>, '*': <TokenType.STAR: 20>, ':': <TokenType.COLON: 11>}
Inherited Members
- sqlglot.tokens.Tokenizer
- Tokenizer
- BIT_STRINGS
- BYTE_STRINGS
- HEX_STRINGS
- RAW_STRINGS
- HEREDOC_STRINGS
- UNICODE_STRINGS
- QUOTES
- STRING_ESCAPES
- VAR_SINGLE_TOKENS
- ESCAPE_FOLLOW_CHARS
- IDENTIFIER_ESCAPES
- HEREDOC_TAG_IS_IDENTIFIER
- HEREDOC_STRING_ALTERNATIVE
- STRING_ESCAPES_ALLOWED_IN_RAW_STRINGS
- NESTED_COMMENTS
- HINT_START
- TOKENS_PRECEDING_HINT
- COMMANDS
- COMMAND_PREFIX_TOKENS
- NUMERIC_LITERALS
- COMMENTS
- dialect
- tokenize
- sql
- size
- tokens
class
Git(sqlglot.expressions.core.Expression):
Inherited Members
- sqlglot.expressions.core.Expr
- Expr
- arg_types
- is_var_len_args
- is_subquery
- is_cast
- is_primitive
- dump
- load
- sqlglot.expressions.core.Expression
- this
- expression
- expressions
- text
- is_string
- is_number
- to_py
- is_int
- is_star
- alias
- alias_column_names
- name
- alias_or_name
- output_name
- type
- is_type
- is_leaf
- meta
- copy
- add_comments
- pop_comments
- append
- set
- depth
- iter_expressions
- find
- find_all
- find_ancestor
- parent_select
- same_parent
- root
- walk
- dfs
- bfs
- unnest
- unalias
- unnest_operands
- flatten
- to_s
- sql
- transform
- replace
- pop
- assert_is
- error_messages
- and_
- or_
- not_
- update_positions
- as_
- isin
- between
- is_
- like
- ilike
- eq
- neq
- rlike
- div
- asc
- desc
- args
- parent
- arg_key
- index
- comments
class
Tag(sqlglot.expressions.core.Expression):
Inherited Members
- sqlglot.expressions.core.Expr
- Expr
- arg_types
- is_var_len_args
- is_subquery
- is_cast
- is_primitive
- dump
- load
- sqlglot.expressions.core.Expression
- this
- expression
- expressions
- text
- is_string
- is_number
- to_py
- is_int
- is_star
- alias
- alias_column_names
- name
- alias_or_name
- output_name
- type
- is_type
- is_leaf
- meta
- copy
- add_comments
- pop_comments
- append
- set
- depth
- iter_expressions
- find
- find_all
- find_ancestor
- parent_select
- same_parent
- root
- walk
- dfs
- bfs
- unnest
- unalias
- unnest_operands
- flatten
- to_s
- sql
- transform
- replace
- pop
- assert_is
- error_messages
- and_
- or_
- not_
- update_positions
- as_
- isin
- between
- is_
- like
- ilike
- eq
- neq
- rlike
- div
- asc
- desc
- args
- parent
- arg_key
- index
- comments
class
ResourceType(sqlglot.expressions.core.Expression):
Inherited Members
- sqlglot.expressions.core.Expr
- Expr
- arg_types
- is_var_len_args
- is_subquery
- is_cast
- is_primitive
- dump
- load
- sqlglot.expressions.core.Expression
- this
- expression
- expressions
- text
- is_string
- is_number
- to_py
- is_int
- is_star
- alias
- alias_column_names
- name
- alias_or_name
- output_name
- type
- is_type
- is_leaf
- meta
- copy
- add_comments
- pop_comments
- append
- set
- depth
- iter_expressions
- find
- find_all
- find_ancestor
- parent_select
- same_parent
- root
- walk
- dfs
- bfs
- unnest
- unalias
- unnest_operands
- flatten
- to_s
- sql
- transform
- replace
- pop
- assert_is
- error_messages
- and_
- or_
- not_
- update_positions
- as_
- isin
- between
- is_
- like
- ilike
- eq
- neq
- rlike
- div
- asc
- desc
- args
- parent
- arg_key
- index
- comments
class
Direction(sqlglot.expressions.core.Expression):
Inherited Members
- sqlglot.expressions.core.Expr
- Expr
- arg_types
- is_var_len_args
- is_subquery
- is_cast
- is_primitive
- dump
- load
- sqlglot.expressions.core.Expression
- this
- expression
- expressions
- text
- is_string
- is_number
- to_py
- is_int
- is_star
- alias
- alias_column_names
- name
- alias_or_name
- output_name
- type
- is_type
- is_leaf
- meta
- copy
- add_comments
- pop_comments
- append
- set
- depth
- iter_expressions
- find
- find_all
- find_ancestor
- parent_select
- same_parent
- root
- walk
- dfs
- bfs
- unnest
- unalias
- unnest_operands
- flatten
- to_s
- sql
- transform
- replace
- pop
- assert_is
- error_messages
- and_
- or_
- not_
- update_positions
- as_
- isin
- between
- is_
- like
- ilike
- eq
- neq
- rlike
- div
- asc
- desc
- args
- parent
- arg_key
- index
- comments
def
parse( selector: str, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> sqlglot.expressions.core.Expr:
def parse(selector: str, dialect: DialectType = None) -> exp.Expr:
    """Parse a selector string into an expression tree.

    The selector is tokenized with `SelectorDialect` and parsed by recursive descent:

      - `a | b` becomes `exp.Or`, `a & b` becomes `exp.And`; `&` binds tighter than `|`.
      - `^a` becomes `exp.Not`.
      - `(...)` becomes `exp.Paren`.
      - a name, optionally with a leading and/or trailing `*`, becomes `exp.Var`.
      - a `tag:`, `resource_type:` or `git:` prefix wraps the operand in `Tag`,
        `ResourceType` or `Git` respectively.
      - a leading `+` and/or trailing `+` wraps the operand in `Direction` with
        `up=True` and/or `down=True`.

    Args:
        selector: The selector string to parse.
        dialect: Not referenced by this function.

    Returns:
        The root expression of the parsed selector.

    Raises:
        ParseError: if a model name is expected but missing, or a `(` is not closed.
    """
    tokens = SelectorDialect().tokenize(selector)
    i = 0  # index of the current token; shared by the nested helpers below

    def _curr() -> t.Optional[Token]:
        return seq_get(tokens, i)

    def _prev() -> Token:
        return tokens[i - 1]

    def _advance(num: int = 1) -> Token:
        # Move forward `num` tokens and return the last token consumed.
        nonlocal i
        i += num
        return _prev()

    def _next() -> t.Optional[Token]:
        return seq_get(tokens, i + 1)

    def _error(msg: str) -> str:
        # `i` is a token index, not a character offset into the selector.
        return f"{msg} at index {i}: {selector}"

    def _match(token_type: TokenType, raise_unmatched: bool = False) -> t.Optional[Token]:
        # Consume and return the current token if it has the given type.
        token = _curr()
        if token and token.token_type == token_type:
            return _advance()
        if raise_unmatched:
            raise ParseError(_error(f"Expected {token_type}"))
        return None

    def _parse_kind(kind: str) -> bool:
        # Consume a `<kind>:` prefix (case-insensitive) if present.
        token = _curr()
        next_token = _next()

        if (
            token
            and token.token_type == TokenType.VAR
            and token.text.lower() == kind
            and next_token
            and next_token.token_type == TokenType.COLON
        ):
            _advance(2)
            return True
        return False

    def _parse_var() -> exp.Expr:
        upstream = _match(TokenType.PLUS)
        downstream = None
        tag = _parse_kind("tag")
        resource_type = False if tag else _parse_kind("resource_type")
        git = False if resource_type else _parse_kind("git")
        lstar = "*" if _match(TokenType.STAR) else ""
        directions: t.Dict[str, bool] = {}

        if _match(TokenType.VAR) or _match(TokenType.NUMBER):
            name = _prev().text
            rstar = "*" if _match(TokenType.STAR) else ""
            downstream = _match(TokenType.PLUS)
            this: exp.Expr = exp.Var(this=f"{lstar}{name}{rstar}")

        elif _match(TokenType.L_PAREN):
            this = exp.Paren(this=_parse_conjunction())
            # A "+" right before the closing paren is still accepted, as before.
            downstream = _match(TokenType.PLUS)
            _match(TokenType.R_PAREN, True)
            # A "+" after the closing paren marks the whole group as downstream;
            # it used to be left unconsumed and therefore silently ignored.
            downstream = downstream or _match(TokenType.PLUS)
        elif lstar:
            # A bare "*" with no name selects everything.
            this = exp.var("*")
        else:
            raise ParseError(_error("Expected model name."))

        if upstream:
            directions["up"] = True
        if downstream:
            directions["down"] = True

        # The kind wrappers go innermost so that the direction applies to their result.
        if tag:
            this = Tag(this=this)
        if resource_type:
            this = ResourceType(this=this)
        if git:
            this = Git(this=this)
        if directions:
            this = Direction(this=this, **directions)
        return this

    def _parse_unary() -> exp.Expr:
        if _match(TokenType.CARET):
            return exp.Not(this=_parse_unary())
        return _parse_var()

    def _parse_conjunction() -> exp.Expr:
        this = _parse_unary()

        # Loop so that chains such as "a & b & c" are fully consumed (left-associative);
        # a single `if` here dropped everything after the second operand.
        while _match(TokenType.AMP):
            this = exp.And(this=this, expression=_parse_unary())
        if _match(TokenType.PIPE):
            this = exp.Or(this=this, expression=_parse_conjunction())

        return this

    return _parse_conjunction()