Edit on GitHub

sqlmesh.core.selector

  1from __future__ import annotations
  2
  3import fnmatch
  4import typing as t
  5from pathlib import Path
  6from itertools import zip_longest
  7import abc
  8
  9from sqlglot import exp
 10from sqlglot.errors import ParseError
 11from sqlglot.tokens import Token, TokenType, Tokenizer as BaseTokenizer
 12from sqlglot.dialects.dialect import Dialect, DialectType
 13from sqlglot.helper import seq_get
 14
 15from sqlmesh.core import constants as c
 16from sqlmesh.core.dialect import normalize_model_name
 17from sqlmesh.core.environment import Environment
 18from sqlmesh.core.model import update_model_schemas
 19from sqlmesh.core.audit import StandaloneAudit
 20from sqlmesh.utils import UniqueKeyDict
 21from sqlmesh.utils.dag import DAG
 22from sqlmesh.utils.git import GitClient
 23from sqlmesh.utils.errors import SQLMeshError
 24
 25
 26if t.TYPE_CHECKING:
 27    from typing_extensions import Literal as Lit  # noqa
 28    from sqlmesh.core.model import Model
 29    from sqlmesh.core.node import Node
 30    from sqlmesh.core.state_sync import StateReader
 31
 32
 33class Selector(abc.ABC):
 34    def __init__(
 35        self,
 36        state_reader: StateReader,
 37        models: UniqueKeyDict[str, Model],
 38        context_path: Path = Path("."),
 39        dag: t.Optional[DAG[str]] = None,
 40        default_catalog: t.Optional[str] = None,
 41        dialect: t.Optional[str] = None,
 42        cache_dir: t.Optional[Path] = None,
 43    ):
 44        self._state_reader = state_reader
 45        self._models = models
 46        self._context_path = context_path
 47        self._cache_dir = cache_dir if cache_dir else context_path / c.CACHE
 48        self._default_catalog = default_catalog
 49        self._dialect = dialect
 50        self._git_client = GitClient(context_path)
 51
 52        if dag is None:
 53            self._dag: DAG[str] = DAG()
 54            for fqn, model in models.items():
 55                self._dag.add(fqn, model.depends_on)
 56        else:
 57            self._dag = dag
 58
 59    def select_models(
 60        self,
 61        model_selections: t.Iterable[str],
 62        target_env_name: str,
 63        fallback_env_name: t.Optional[str] = None,
 64        ensure_finalized_snapshots: bool = False,
 65    ) -> UniqueKeyDict[str, Model]:
 66        """Given a set of selections returns models from the current state with names matching the
 67        selection while sourcing the remaining models from the target environment.
 68
 69        Args:
 70            model_selections: A set of selections.
 71            target_env_name: The name of the target environment.
 72            fallback_env_name: The name of the fallback environment that will be used if the target
 73                environment doesn't exist.
 74            ensure_finalized_snapshots: Whether to source environment snapshots from the latest finalized
 75                environment state, or to use whatever snapshots are in the current environment state even if
 76                the environment is not finalized.
 77
 78        Returns:
 79            A dictionary of models.
 80        """
 81        target_env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name))
 82        if target_env and target_env.expired:
 83            target_env = None
 84
 85        if not target_env and fallback_env_name:
 86            target_env = self._state_reader.get_environment(
 87                Environment.sanitize_name(fallback_env_name)
 88            )
 89
 90        env_models: t.Dict[str, Model] = {}
 91        if target_env:
 92            environment_snapshot_infos = (
 93                target_env.snapshots
 94                if not ensure_finalized_snapshots
 95                else target_env.finalized_or_current_snapshots
 96            )
 97            env_models = {
 98                s.name: s.model
 99                for s in self._state_reader.get_snapshots(environment_snapshot_infos).values()
100                if s.is_model
101            }
102
103        all_selected_models = self.expand_model_selections(
104            model_selections, models={**env_models, **self._models}
105        )
106
107        dag: DAG[str] = DAG()
108        subdag = set()
109
110        for fqn in all_selected_models:
111            if fqn not in subdag:
112                subdag.add(fqn)
113                subdag.update(self._dag.downstream(fqn))
114
115        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
116        all_model_fqns = set(self._models) | set(env_models)
117        needs_update = False
118
119        def get_model(fqn: str) -> t.Optional[Model]:
120            if fqn not in all_selected_models and fqn in env_models:
121                # Unselected modified or added model.
122                model_from_env = env_models[fqn]
123                try:
124                    # this triggers a render_query() which can throw an exception
125                    model_from_env.depends_on
126                    return model_from_env
127                except Exception as e:
128                    raise SQLMeshError(
129                        f"Model '{model_from_env.name}' sourced from state cannot be rendered "
130                        f"in the local environment due to:\n> {str(e)}"
131                    ) from e
132            if fqn in all_selected_models and fqn in self._models:
133                # Selected modified or removed model.
134                return self._models[fqn]
135            return None
136
137        for fqn in all_model_fqns:
138            model = get_model(fqn)
139
140            if not model:
141                continue
142
143            if model.fqn in subdag:
144                dag.add(model.fqn, model.depends_on)
145
146                for dep in model.depends_on:
147                    schema = model.mapping_schema
148
149                    for part in exp.to_table(dep).parts:
150                        schema = schema.get(part.sql()) or {}
151
152                    parent = get_model(dep)
153
154                    parent_schema = {
155                        c: t.sql(dialect=model.dialect)
156                        for c, t in ((parent and parent.columns_to_types) or {}).items()
157                    }
158
159                    if schema != parent_schema:
160                        model = model.copy(update={"mapping_schema": {}})
161                        needs_update = True
162                        break
163
164            models[model.fqn] = model
165
166        if needs_update:
167            update_model_schemas(dag, models=models, cache_dir=self._cache_dir)
168
169        return models
170
171    def expand_model_selections(
172        self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Node]] = None
173    ) -> t.Set[str]:
174        """Expands a set of model selections into a set of model fqns that can be looked up in the Context.
175
176        Args:
177            model_selections: A set of model selections.
178
179        Returns:
180            A set of model fqns.
181        """
182
183        node = parse(" | ".join(f"({s})" for s in model_selections))
184
185        all_models: t.Dict[str, Node] = models or dict(self._models)
186        models_by_tags: t.Dict[str, t.Set[str]] = {}
187
188        for fqn, model in all_models.items():
189            for tag in model.tags:
190                tag = tag.lower()
191                models_by_tags.setdefault(tag, set())
192                models_by_tags[tag].add(model.fqn)
193
194        def evaluate(node: exp.Expr) -> t.Set[str]:
195            if isinstance(node, exp.Var):
196                pattern = node.this
197                if "*" in pattern:
198                    return {
199                        fqn
200                        for fqn, model in all_models.items()
201                        if fnmatch.fnmatchcase(self._model_name(model), node.this)
202                    }
203                return self._pattern_to_model_fqns(pattern, all_models)
204            if isinstance(node, exp.And):
205                return evaluate(node.left) & evaluate(node.right)
206            if isinstance(node, exp.Or):
207                return evaluate(node.left) | evaluate(node.right)
208            if isinstance(node, exp.Paren):
209                return evaluate(node.this)
210            if isinstance(node, exp.Not):
211                return set(all_models) - evaluate(node.this)
212            if isinstance(node, Git):
213                target_branch = node.name
214                git_modified_files = {
215                    *self._git_client.list_untracked_files(),
216                    *self._git_client.list_uncommitted_changed_files(),
217                    *self._git_client.list_committed_changed_files(target_branch=target_branch),
218                }
219                return {m.fqn for m in all_models.values() if m._path in git_modified_files}
220            if isinstance(node, Tag):
221                pattern = node.name.lower()
222
223                if "*" in pattern:
224                    return {
225                        model
226                        for tag, models in models_by_tags.items()
227                        for model in models
228                        if fnmatch.fnmatchcase(tag, pattern)
229                    }
230                return models_by_tags.get(pattern, set())
231            if isinstance(node, ResourceType):
232                resource_type = node.name.lower()
233                return {
234                    fqn
235                    for fqn, model in all_models.items()
236                    if self._matches_resource_type(resource_type, model)
237                }
238            if isinstance(node, Direction):
239                selected = set()
240
241                for model_name in evaluate(node.this):
242                    selected.add(model_name)
243                    if node.args.get("up"):
244                        for u in self._dag.upstream(model_name):
245                            if u in all_models:
246                                selected.add(u)
247                    if node.args.get("down"):
248                        selected.update(self._dag.downstream(model_name))
249                return selected
250            raise ParseError(f"Unexpected node {node}")
251
252        return evaluate(node)
253
254    @abc.abstractmethod
255    def _model_name(self, model: Node) -> str:
256        """Given a model, return the name that a selector pattern contining wildcards should be fnmatch'd on"""
257        pass
258
259    @abc.abstractmethod
260    def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
261        """Given a pattern, return the keys of the matching models from :all_models"""
262        pass
263
264    @abc.abstractmethod
265    def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
266        """Indicate whether or not the supplied model matches the supplied resource type"""
267        pass
268
269
class NativeSelector(Selector):
    """Selector that resolves names and patterns against SQLMesh-native model names."""

    def _model_name(self, model: Node) -> str:
        # Wildcard selections are matched against the plain SQLMesh name of the model.
        name = model.name
        return name

    def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
        # A non-wildcard pattern names exactly one model once normalized with the project's
        # default catalog and dialect.
        normalized = normalize_model_name(pattern, self._default_catalog, self._dialect)
        if normalized not in all_models:
            return set()
        return {normalized}

    def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
        predicates: t.Dict[str, t.Callable[[Node], bool]] = {
            "model": lambda node: node.is_model,
            "audit": lambda node: isinstance(node, StandaloneAudit),
        }
        predicate = predicates.get(resource_type)
        if predicate is None:
            raise SQLMeshError(f"Unsupported resource type: {resource_type}")
        return predicate(model)
287
288
class DbtSelector(Selector):
    """Implementation of selectors that matches objects based on the DBT names instead of the SQLMesh native names"""

    def _model_name(self, model: Node) -> str:
        if dbt_fqn := model.dbt_fqn:
            return dbt_fqn
        raise SQLMeshError("dbt node information must be populated to use dbt selectors")

    @staticmethod
    def _components_match_from(
        pattern_components: t.List[str], fqn_components: t.List[str]
    ) -> bool:
        """Return whether the pattern matches ``fqn_components`` starting at its first element.

        Every pattern component must equal the fqn component at the same position. Extra trailing
        fqn components are allowed, but running out of fqn components before the pattern ends
        is not.
        """
        for pattern_component, fqn_component in zip_longest(pattern_components, fqn_components):
            if pattern_component and not fqn_component:
                # the pattern still goes but we have run out of fqn components to match; no match
                return False
            if fqn_component and not pattern_component:
                # all elements of the pattern have matched elements of the fqn; match
                return True
            if pattern_component != fqn_component:
                # the pattern explicitly doesnt match a component; no match
                return False
        # all components of the pattern matched all remaining components of the fqn
        return True

    def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
        # a pattern like "staging.customers" should match a model called "jaffle_shop.staging.customers"
        # but not a model called "jaffle_shop.customers.staging"
        # also a pattern like "aging" should not match "staging" so we need to consider components; not substrings
        pattern_components = pattern.split(".")
        first_pattern_component = pattern_components[0]
        matches = set()
        for fqn, model in all_models.items():
            dbt_fqn = model.dbt_fqn
            if not dbt_fqn:
                continue

            dbt_fqn_components = dbt_fqn.split(".")
            # Try every position where the first pattern component occurs, not only the first one:
            # "marts.orders" must match "marts.marts.orders" (project and folder sharing a name).
            if any(
                component == first_pattern_component
                and self._components_match_from(pattern_components, dbt_fqn_components[idx:])
                for idx, component in enumerate(dbt_fqn_components)
            ):
                matches.add(fqn)
        return matches

    def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
        """
        ref: https://docs.getdbt.com/reference/node-selection/methods#resource_type

        # supported by SQLMesh
        "model"
        "seed"
        "source" # external model
        "test" # standalone audit

        # not supported by SQLMesh yet; selecting any of these (or any other value) raises
        # SQLMeshError
        "analysis"
        "exposure"
        "metric"
        "saved_query"
        "semantic_model"
        "snapshot"
        "unit_test"
        """
        if resource_type not in ("model", "seed", "source", "test"):
            raise SQLMeshError(f"Unsupported resource type: {resource_type}")

        if isinstance(model, StandaloneAudit):
            return resource_type == "test"

        if resource_type == "model":
            return model.is_model and not model.kind.is_external and not model.kind.is_seed
        if resource_type == "source":
            return model.kind.is_external
        if resource_type == "seed":
            return model.kind.is_seed

        return False
364
365
class SelectorDialect(Dialect):
    """Minimal sqlglot dialect used only to tokenize selector strings for `parse`."""

    IDENTIFIERS_CAN_START_WITH_DIGIT = True

    class Tokenizer(BaseTokenizer):
        # The selector grammar's punctuation. Any other character is left to the base tokenizer's
        # default scanning (presumably ending up inside name tokens; verify against sqlglot).
        SINGLE_TOKENS = dict(
            [
                # grouping
                ("(", TokenType.L_PAREN),
                (")", TokenType.R_PAREN),
                # intersection, union and complement
                ("&", TokenType.AMP),
                ("|", TokenType.PIPE),
                ("^", TokenType.CARET),
                # dependency expansion and wildcards
                ("+", TokenType.PLUS),
                ("*", TokenType.STAR),
                # separator after tag: / resource_type: / git:
                (":", TokenType.COLON),
            ]
        )

        # No keywords are defined for the selector language.
        KEYWORDS = dict()
        # there are no identifiers but need to put something here
        IDENTIFIERS = ["\\"]
        IDENTIFIER_START = IDENTIFIER_END = ""
385
386
class Git(exp.Expression):
    """Selector node for ``git:<branch>``.

    It selects models whose files are untracked, have uncommitted changes, or were changed in
    commits relative to ``<branch>``.
    """
389
390
class Tag(exp.Expression):
    """Selector node for ``tag:<pattern>``; the pattern is matched case-insensitively and may contain ``*``."""
393
394
class ResourceType(exp.Expression):
    """Selector node for ``resource_type:<type>``, resolved by the selector's `_matches_resource_type`."""
397
398
class Direction(exp.Expression):
    """Expands its wrapped selection along the DAG.

    The ``up`` arg (set by a leading ``+``) adds upstream models. The ``down`` arg (set by a
    trailing ``+``) adds downstream models.
    """
401
402
def parse(selector: str, dialect: DialectType = None) -> exp.Expr:
    """Parse a selector string into an expression tree.

    Supported syntax, from loosest to tightest binding:

        a | b            union (right-associative)
        a & b & c        intersection (left-associative, binds tighter than ``|``)
        ^a               complement
        +a / a+          add upstream / downstream models of ``a``; a trailing ``+`` after a
                         closing ``)`` applies to the whole group
        tag:x, resource_type:x, git:x
                         special selectors; at most one such prefix per atom
        *a*, *           fnmatch-style wildcards around a name
        (...)            grouping

    Args:
        selector: The selector string.
        dialect: Currently unused; accepted for API compatibility.

    Returns:
        The parsed expression.

    Raises:
        ParseError: If the selector is malformed or has trailing, unparsed input.
    """
    tokens = SelectorDialect().tokenize(selector)
    i = 0

    def _curr() -> t.Optional[Token]:
        return seq_get(tokens, i)

    def _prev() -> Token:
        return tokens[i - 1]

    def _advance(num: int = 1) -> Token:
        nonlocal i
        i += num
        return _prev()

    def _next() -> t.Optional[Token]:
        return seq_get(tokens, i + 1)

    def _error(msg: str) -> str:
        return f"{msg} at index {i}: {selector}"

    def _match(token_type: TokenType, raise_unmatched: bool = False) -> t.Optional[Token]:
        token = _curr()
        if token and token.token_type == token_type:
            return _advance()
        if raise_unmatched:
            raise ParseError(_error(f"Expected {token_type}"))
        return None

    def _parse_kind(kind: str) -> bool:
        """Consume ``<kind>:`` if it comes next and report whether it did."""
        token = _curr()
        next_token = _next()

        if (
            token
            and token.token_type == TokenType.VAR
            and token.text.lower() == kind
            and next_token
            and next_token.token_type == TokenType.COLON
        ):
            _advance(2)
            return True
        return False

    def _parse_var() -> exp.Expr:
        """Parse one atom: optional ``+``, optional kind prefix, a name/group/star, optional ``+``."""
        upstream = _match(TokenType.PLUS)
        # Kind prefixes are mutually exclusive: once one is consumed, don't look for another
        # (otherwise e.g. "tag:git:x" would silently become Git(Tag(...))).
        tag = _parse_kind("tag")
        resource_type = False if tag else _parse_kind("resource_type")
        git = False if (tag or resource_type) else _parse_kind("git")
        lstar = "*" if _match(TokenType.STAR) else ""

        if _match(TokenType.VAR) or _match(TokenType.NUMBER):
            name = _prev().text
            rstar = "*" if _match(TokenType.STAR) else ""
            this: exp.Expr = exp.Var(this=f"{lstar}{name}{rstar}")
        elif _match(TokenType.L_PAREN):
            this = exp.Paren(this=_parse_conjunction())
            _match(TokenType.R_PAREN, True)
        elif lstar:
            this = exp.var("*")
        else:
            raise ParseError(_error("Expected model name."))

        # The trailing "+" binds to the atom just parsed, including a closed "(...)" group.
        downstream = _match(TokenType.PLUS)

        directions: t.Dict[str, bool] = {}
        if upstream:
            directions["up"] = True
        if downstream:
            directions["down"] = True

        if tag:
            this = Tag(this=this)
        if resource_type:
            this = ResourceType(this=this)
        if git:
            this = Git(this=this)
        if directions:
            this = Direction(this=this, **directions)
        return this

    def _parse_unary() -> exp.Expr:
        if _match(TokenType.CARET):
            return exp.Not(this=_parse_unary())
        return _parse_var()

    def _parse_conjunction() -> exp.Expr:
        this = _parse_unary()

        # Any number of "&" operands (previously only one was consumed, so "a & b & c" failed).
        while _match(TokenType.AMP):
            this = exp.And(this=this, expression=_parse_unary())
        if _match(TokenType.PIPE):
            this = exp.Or(this=this, expression=_parse_conjunction())

        return this

    expression = _parse_conjunction()

    leftover = _curr()
    if leftover is not None:
        # Reject trailing input (e.g. an unbalanced ")") instead of silently ignoring it.
        raise ParseError(_error(f"Unexpected token '{leftover.text}'"))

    return expression
class Selector(abc.ABC):
 34class Selector(abc.ABC):
 35    def __init__(
 36        self,
 37        state_reader: StateReader,
 38        models: UniqueKeyDict[str, Model],
 39        context_path: Path = Path("."),
 40        dag: t.Optional[DAG[str]] = None,
 41        default_catalog: t.Optional[str] = None,
 42        dialect: t.Optional[str] = None,
 43        cache_dir: t.Optional[Path] = None,
 44    ):
 45        self._state_reader = state_reader
 46        self._models = models
 47        self._context_path = context_path
 48        self._cache_dir = cache_dir if cache_dir else context_path / c.CACHE
 49        self._default_catalog = default_catalog
 50        self._dialect = dialect
 51        self._git_client = GitClient(context_path)
 52
 53        if dag is None:
 54            self._dag: DAG[str] = DAG()
 55            for fqn, model in models.items():
 56                self._dag.add(fqn, model.depends_on)
 57        else:
 58            self._dag = dag
 59
 60    def select_models(
 61        self,
 62        model_selections: t.Iterable[str],
 63        target_env_name: str,
 64        fallback_env_name: t.Optional[str] = None,
 65        ensure_finalized_snapshots: bool = False,
 66    ) -> UniqueKeyDict[str, Model]:
 67        """Given a set of selections returns models from the current state with names matching the
 68        selection while sourcing the remaining models from the target environment.
 69
 70        Args:
 71            model_selections: A set of selections.
 72            target_env_name: The name of the target environment.
 73            fallback_env_name: The name of the fallback environment that will be used if the target
 74                environment doesn't exist.
 75            ensure_finalized_snapshots: Whether to source environment snapshots from the latest finalized
 76                environment state, or to use whatever snapshots are in the current environment state even if
 77                the environment is not finalized.
 78
 79        Returns:
 80            A dictionary of models.
 81        """
 82        target_env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name))
 83        if target_env and target_env.expired:
 84            target_env = None
 85
 86        if not target_env and fallback_env_name:
 87            target_env = self._state_reader.get_environment(
 88                Environment.sanitize_name(fallback_env_name)
 89            )
 90
 91        env_models: t.Dict[str, Model] = {}
 92        if target_env:
 93            environment_snapshot_infos = (
 94                target_env.snapshots
 95                if not ensure_finalized_snapshots
 96                else target_env.finalized_or_current_snapshots
 97            )
 98            env_models = {
 99                s.name: s.model
100                for s in self._state_reader.get_snapshots(environment_snapshot_infos).values()
101                if s.is_model
102            }
103
104        all_selected_models = self.expand_model_selections(
105            model_selections, models={**env_models, **self._models}
106        )
107
108        dag: DAG[str] = DAG()
109        subdag = set()
110
111        for fqn in all_selected_models:
112            if fqn not in subdag:
113                subdag.add(fqn)
114                subdag.update(self._dag.downstream(fqn))
115
116        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
117        all_model_fqns = set(self._models) | set(env_models)
118        needs_update = False
119
120        def get_model(fqn: str) -> t.Optional[Model]:
121            if fqn not in all_selected_models and fqn in env_models:
122                # Unselected modified or added model.
123                model_from_env = env_models[fqn]
124                try:
125                    # this triggers a render_query() which can throw an exception
126                    model_from_env.depends_on
127                    return model_from_env
128                except Exception as e:
129                    raise SQLMeshError(
130                        f"Model '{model_from_env.name}' sourced from state cannot be rendered "
131                        f"in the local environment due to:\n> {str(e)}"
132                    ) from e
133            if fqn in all_selected_models and fqn in self._models:
134                # Selected modified or removed model.
135                return self._models[fqn]
136            return None
137
138        for fqn in all_model_fqns:
139            model = get_model(fqn)
140
141            if not model:
142                continue
143
144            if model.fqn in subdag:
145                dag.add(model.fqn, model.depends_on)
146
147                for dep in model.depends_on:
148                    schema = model.mapping_schema
149
150                    for part in exp.to_table(dep).parts:
151                        schema = schema.get(part.sql()) or {}
152
153                    parent = get_model(dep)
154
155                    parent_schema = {
156                        c: t.sql(dialect=model.dialect)
157                        for c, t in ((parent and parent.columns_to_types) or {}).items()
158                    }
159
160                    if schema != parent_schema:
161                        model = model.copy(update={"mapping_schema": {}})
162                        needs_update = True
163                        break
164
165            models[model.fqn] = model
166
167        if needs_update:
168            update_model_schemas(dag, models=models, cache_dir=self._cache_dir)
169
170        return models
171
172    def expand_model_selections(
173        self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Node]] = None
174    ) -> t.Set[str]:
175        """Expands a set of model selections into a set of model fqns that can be looked up in the Context.
176
177        Args:
178            model_selections: A set of model selections.
179
180        Returns:
181            A set of model fqns.
182        """
183
184        node = parse(" | ".join(f"({s})" for s in model_selections))
185
186        all_models: t.Dict[str, Node] = models or dict(self._models)
187        models_by_tags: t.Dict[str, t.Set[str]] = {}
188
189        for fqn, model in all_models.items():
190            for tag in model.tags:
191                tag = tag.lower()
192                models_by_tags.setdefault(tag, set())
193                models_by_tags[tag].add(model.fqn)
194
195        def evaluate(node: exp.Expr) -> t.Set[str]:
196            if isinstance(node, exp.Var):
197                pattern = node.this
198                if "*" in pattern:
199                    return {
200                        fqn
201                        for fqn, model in all_models.items()
202                        if fnmatch.fnmatchcase(self._model_name(model), node.this)
203                    }
204                return self._pattern_to_model_fqns(pattern, all_models)
205            if isinstance(node, exp.And):
206                return evaluate(node.left) & evaluate(node.right)
207            if isinstance(node, exp.Or):
208                return evaluate(node.left) | evaluate(node.right)
209            if isinstance(node, exp.Paren):
210                return evaluate(node.this)
211            if isinstance(node, exp.Not):
212                return set(all_models) - evaluate(node.this)
213            if isinstance(node, Git):
214                target_branch = node.name
215                git_modified_files = {
216                    *self._git_client.list_untracked_files(),
217                    *self._git_client.list_uncommitted_changed_files(),
218                    *self._git_client.list_committed_changed_files(target_branch=target_branch),
219                }
220                return {m.fqn for m in all_models.values() if m._path in git_modified_files}
221            if isinstance(node, Tag):
222                pattern = node.name.lower()
223
224                if "*" in pattern:
225                    return {
226                        model
227                        for tag, models in models_by_tags.items()
228                        for model in models
229                        if fnmatch.fnmatchcase(tag, pattern)
230                    }
231                return models_by_tags.get(pattern, set())
232            if isinstance(node, ResourceType):
233                resource_type = node.name.lower()
234                return {
235                    fqn
236                    for fqn, model in all_models.items()
237                    if self._matches_resource_type(resource_type, model)
238                }
239            if isinstance(node, Direction):
240                selected = set()
241
242                for model_name in evaluate(node.this):
243                    selected.add(model_name)
244                    if node.args.get("up"):
245                        for u in self._dag.upstream(model_name):
246                            if u in all_models:
247                                selected.add(u)
248                    if node.args.get("down"):
249                        selected.update(self._dag.downstream(model_name))
250                return selected
251            raise ParseError(f"Unexpected node {node}")
252
253        return evaluate(node)
254
255    @abc.abstractmethod
256    def _model_name(self, model: Node) -> str:
257        """Given a model, return the name that a selector pattern contining wildcards should be fnmatch'd on"""
258        pass
259
260    @abc.abstractmethod
261    def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
262        """Given a pattern, return the keys of the matching models from :all_models"""
263        pass
264
265    @abc.abstractmethod
266    def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
267        """Indicate whether or not the supplied model matches the supplied resource type"""
268        pass

Abstract base class for model selectors; subclasses define how selector names and patterns are matched against models.

def select_models( self, model_selections: Iterable[str], target_env_name: str, fallback_env_name: Optional[str] = None, ensure_finalized_snapshots: bool = False) -> sqlmesh.utils.UniqueKeyDict[str, typing.Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel]]:
 60    def select_models(
 61        self,
 62        model_selections: t.Iterable[str],
 63        target_env_name: str,
 64        fallback_env_name: t.Optional[str] = None,
 65        ensure_finalized_snapshots: bool = False,
 66    ) -> UniqueKeyDict[str, Model]:
 67        """Given a set of selections returns models from the current state with names matching the
 68        selection while sourcing the remaining models from the target environment.
 69
 70        Args:
 71            model_selections: A set of selections.
 72            target_env_name: The name of the target environment.
 73            fallback_env_name: The name of the fallback environment that will be used if the target
 74                environment doesn't exist.
 75            ensure_finalized_snapshots: Whether to source environment snapshots from the latest finalized
 76                environment state, or to use whatever snapshots are in the current environment state even if
 77                the environment is not finalized.
 78
 79        Returns:
 80            A dictionary of models.
 81        """
 82        target_env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name))
 83        if target_env and target_env.expired:
 84            target_env = None
 85
 86        if not target_env and fallback_env_name:
 87            target_env = self._state_reader.get_environment(
 88                Environment.sanitize_name(fallback_env_name)
 89            )
 90
 91        env_models: t.Dict[str, Model] = {}
 92        if target_env:
 93            environment_snapshot_infos = (
 94                target_env.snapshots
 95                if not ensure_finalized_snapshots
 96                else target_env.finalized_or_current_snapshots
 97            )
 98            env_models = {
 99                s.name: s.model
100                for s in self._state_reader.get_snapshots(environment_snapshot_infos).values()
101                if s.is_model
102            }
103
104        all_selected_models = self.expand_model_selections(
105            model_selections, models={**env_models, **self._models}
106        )
107
108        dag: DAG[str] = DAG()
109        subdag = set()
110
111        for fqn in all_selected_models:
112            if fqn not in subdag:
113                subdag.add(fqn)
114                subdag.update(self._dag.downstream(fqn))
115
116        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
117        all_model_fqns = set(self._models) | set(env_models)
118        needs_update = False
119
120        def get_model(fqn: str) -> t.Optional[Model]:
121            if fqn not in all_selected_models and fqn in env_models:
122                # Unselected modified or added model.
123                model_from_env = env_models[fqn]
124                try:
125                    # this triggers a render_query() which can throw an exception
126                    model_from_env.depends_on
127                    return model_from_env
128                except Exception as e:
129                    raise SQLMeshError(
130                        f"Model '{model_from_env.name}' sourced from state cannot be rendered "
131                        f"in the local environment due to:\n> {str(e)}"
132                    ) from e
133            if fqn in all_selected_models and fqn in self._models:
134                # Selected modified or removed model.
135                return self._models[fqn]
136            return None
137
138        for fqn in all_model_fqns:
139            model = get_model(fqn)
140
141            if not model:
142                continue
143
144            if model.fqn in subdag:
145                dag.add(model.fqn, model.depends_on)
146
147                for dep in model.depends_on:
148                    schema = model.mapping_schema
149
150                    for part in exp.to_table(dep).parts:
151                        schema = schema.get(part.sql()) or {}
152
153                    parent = get_model(dep)
154
155                    parent_schema = {
156                        c: t.sql(dialect=model.dialect)
157                        for c, t in ((parent and parent.columns_to_types) or {}).items()
158                    }
159
160                    if schema != parent_schema:
161                        model = model.copy(update={"mapping_schema": {}})
162                        needs_update = True
163                        break
164
165            models[model.fqn] = model
166
167        if needs_update:
168            update_model_schemas(dag, models=models, cache_dir=self._cache_dir)
169
170        return models

Given a set of selections, returns models from the current state with names matching the selection while sourcing the remaining models from the target environment.

Arguments:
  • model_selections: A set of selections.
  • target_env_name: The name of the target environment.
  • fallback_env_name: The name of the fallback environment that will be used if the target environment doesn't exist.
  • ensure_finalized_snapshots: Whether to source environment snapshots from the latest finalized environment state, or to use whatever snapshots are in the current environment state even if the environment is not finalized.
Returns:

A dictionary of models.

def expand_model_selections( self, model_selections: Iterable[str], models: Optional[Dict[str, Annotated[Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel, sqlmesh.core.audit.definition.StandaloneAudit], FieldInfo(annotation=NoneType, required=True, discriminator='source_type')]]] = None) -> Set[str]:
def expand_model_selections(
    self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Node]] = None
) -> t.Set[str]:
    """Expands a set of model selections into a set of model fqns that can be looked up in the Context.

    The selections are combined as a union: a model is returned if it matches any of them.

    Args:
        model_selections: A set of model selections.
        models: The nodes to select from, keyed by fqn. Defaults to the selector's own models when
            omitted (``None``); an explicitly empty mapping selects from nothing.

    Returns:
        A set of model fqns. Empty when ``model_selections`` is empty.

    Raises:
        ParseError: If a selection is malformed.
    """
    selections = list(model_selections)
    if not selections:
        # An empty expression would otherwise fail to parse with "Expected model name.".
        return set()

    # Parenthesize each selection so its operators cannot bind across the union.
    node = parse(" | ".join(f"({s})" for s in selections))

    all_models: t.Dict[str, Node] = dict(self._models) if models is None else models

    # Tags are compared case-insensitively.
    models_by_tags: t.Dict[str, t.Set[str]] = {}
    for model in all_models.values():
        for tag in model.tags:
            models_by_tags.setdefault(tag.lower(), set()).add(model.fqn)

    def evaluate(node: exp.Expr) -> t.Set[str]:
        if isinstance(node, exp.Var):
            pattern = node.this
            if "*" in pattern:
                return {
                    fqn
                    for fqn, model in all_models.items()
                    if fnmatch.fnmatchcase(self._model_name(model), pattern)
                }
            return self._pattern_to_model_fqns(pattern, all_models)
        if isinstance(node, exp.And):
            return evaluate(node.left) & evaluate(node.right)
        if isinstance(node, exp.Or):
            return evaluate(node.left) | evaluate(node.right)
        if isinstance(node, exp.Paren):
            return evaluate(node.this)
        if isinstance(node, exp.Not):
            return set(all_models) - evaluate(node.this)
        if isinstance(node, Git):
            target_branch = node.name
            git_modified_files = {
                *self._git_client.list_untracked_files(),
                *self._git_client.list_uncommitted_changed_files(),
                *self._git_client.list_committed_changed_files(target_branch=target_branch),
            }
            return {m.fqn for m in all_models.values() if m._path in git_modified_files}
        if isinstance(node, Tag):
            pattern = node.name.lower()

            if "*" in pattern:
                return {
                    fqn
                    for tag, tagged_fqns in models_by_tags.items()
                    if fnmatch.fnmatchcase(tag, pattern)
                    for fqn in tagged_fqns
                }
            return models_by_tags.get(pattern, set())
        if isinstance(node, ResourceType):
            resource_type = node.name.lower()
            return {
                fqn
                for fqn, model in all_models.items()
                if self._matches_resource_type(resource_type, model)
            }
        if isinstance(node, Direction):
            selected: t.Set[str] = set()

            for model_name in evaluate(node.this):
                selected.add(model_name)
                if node.args.get("up"):
                    # Upstream dependencies may include tables that are not known models.
                    for u in self._dag.upstream(model_name):
                        if u in all_models:
                            selected.add(u)
                if node.args.get("down"):
                    selected.update(self._dag.downstream(model_name))
            return selected
        raise ParseError(f"Unexpected node {node}")

    return evaluate(node)

Expands a set of model selections into a set of model fqns that can be looked up in the Context.

Arguments:
  • model_selections: A set of model selections.
  • models: The nodes to select from, keyed by fqn; defaults to the selector's own models.
Returns:

A set of model fqns.

class NativeSelector(Selector):
class NativeSelector(Selector):
    """Selector that matches nodes by their SQLMesh native names."""

    def _model_name(self, model: Node) -> str:
        # Wildcard patterns are fnmatch'd against the node's native name.
        return model.name

    def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
        # A wildcard-free pattern is normalized into a fully qualified name using the selector's
        # default catalog and dialect, then looked up directly.
        normalized = normalize_model_name(pattern, self._default_catalog, self._dialect)
        if normalized not in all_models:
            return set()
        return {normalized}

    def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
        # Native selectors only understand "model" and "audit".
        if resource_type not in ("model", "audit"):
            raise SQLMeshError(f"Unsupported resource type: {resource_type}")
        if resource_type == "audit":
            return isinstance(model, StandaloneAudit)
        return model.is_model

Implementation of selectors that match objects based on their SQLMesh native names.

class DbtSelector(Selector):
class DbtSelector(Selector):
    """Implementation of selectors that match objects based on their dbt names instead of their SQLMesh native names."""

    def _model_name(self, model: Node) -> str:
        if dbt_fqn := model.dbt_fqn:
            return dbt_fqn
        raise SQLMeshError("dbt node information must be populated to use dbt selectors")

    @staticmethod
    def _components_match(pattern_components: t.List[str], fqn_components: t.List[str]) -> bool:
        """Whether ``pattern_components`` match ``fqn_components`` component by component from the start.

        The fqn may have extra trailing components (a directory-like pattern such as ``staging``
        matches everything beneath it), but the pattern may not be longer than the fqn.
        """
        for pattern_component, fqn_component in zip_longest(pattern_components, fqn_components):
            if pattern_component and not fqn_component:
                # the pattern still goes but we have run out of fqn components to match; no match
                return False
            if fqn_component and not pattern_component:
                # all elements of the pattern have matched elements of the fqn; match
                return True
            if pattern_component != fqn_component:
                # the pattern explicitly doesn't match a component; no match
                return False
        # all components of the pattern matched all components of the fqn
        return True

    def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
        # a pattern like "staging.customers" should match a model called "jaffle_shop.staging.customers"
        # but not a model called "jaffle_shop.customers.staging"
        # also a pattern like "aging" should not match "staging" so we need to consider components; not substrings
        pattern_components = pattern.split(".")
        first_pattern_component = pattern_components[0]
        matches = set()
        for fqn, model in all_models.items():
            if not model.dbt_fqn:
                continue

            dbt_fqn_components = model.dbt_fqn.split(".")
            # The first pattern component can occur more than once in an fqn (for example
            # "proj.marts.staging.staging.orders"), so try every occurrence, not only the first.
            if any(
                self._components_match(pattern_components, dbt_fqn_components[idx:])
                for idx, component in enumerate(dbt_fqn_components)
                if component == first_pattern_component
            ):
                matches.add(fqn)
        return matches

    def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
        """Whether ``model`` is of the given dbt resource type.

        ref: https://docs.getdbt.com/reference/node-selection/methods#resource_type

        Supported by SQLMesh:
            "model"
            "seed"
            "source"  # external model
            "test"  # standalone audit

        Not supported by SQLMesh yet (selecting them raises an error):
            "analysis", "exposure", "metric", "saved_query", "semantic_model", "snapshot", "unit_test"

        Raises:
            SQLMeshError: If ``resource_type`` is not one of the supported types.
        """
        if resource_type not in ("model", "seed", "source", "test"):
            raise SQLMeshError(f"Unsupported resource type: {resource_type}")

        if isinstance(model, StandaloneAudit):
            return resource_type == "test"

        if resource_type == "model":
            return model.is_model and not model.kind.is_external and not model.kind.is_seed
        if resource_type == "source":
            return model.kind.is_external
        if resource_type == "seed":
            return model.kind.is_seed

        return False

Implementation of selectors that match objects based on their dbt names instead of their SQLMesh native names.

class SelectorDialect(sqlglot.dialects.dialect.Dialect):
class SelectorDialect(Dialect):
    """sqlglot dialect used by ``parse`` to tokenize selector expressions."""

    # Whether an unquoted identifier can start with a digit.
    IDENTIFIERS_CAN_START_WITH_DIGIT = True

    class Tokenizer(BaseTokenizer):
        # Single-character tokens of the selector grammar, as consumed by ``parse``: grouping,
        # intersection (&), union (|), negation (^), upstream/downstream (+), wildcard (*) and
        # the separator of kind prefixes such as "tag:".
        SINGLE_TOKENS = {
            "(": TokenType.L_PAREN,
            ")": TokenType.R_PAREN,
            "&": TokenType.AMP,
            "|": TokenType.PIPE,
            "^": TokenType.CARET,
            "+": TokenType.PLUS,
            "*": TokenType.STAR,
            ":": TokenType.COLON,
        }

        # No keywords: words like "tag" or "git" stay VAR tokens and are only treated as kind
        # prefixes by ``parse`` when followed by ":".
        KEYWORDS = {}
        IDENTIFIERS = ["\\"]  # there are no identifiers but need to put something here
        IDENTIFIER_START = ""
        IDENTIFIER_END = ""
IDENTIFIERS_CAN_START_WITH_DIGIT = True

Whether an unquoted identifier can start with a digit.

SUPPORTS_COLUMN_JOIN_MARKS = False

Whether the old-style outer join (+) syntax is supported.

INITCAP_SUPPORTS_CUSTOM_DELIMITERS = False
tokenizer_class = <class 'SelectorDialect.Tokenizer'>
jsonpath_tokenizer_class = <class 'sqlglot.dialects.dialect.JSONPathTokenizer'>
parser_class = <class 'sqlglot.parsers.base.BaseParser'>
generator_class = <class 'sqlglot.generator.Generator'>
TIME_TRIE: Dict = {}
FORMAT_TRIE: Dict = {}
INVERSE_TIME_MAPPING: Dict[str, str] = {}
INVERSE_TIME_TRIE: Dict = {}
INVERSE_FORMAT_MAPPING: Dict[str, str] = {}
INVERSE_FORMAT_TRIE: Dict = {}
INVERSE_CREATABLE_KIND_MAPPING: dict[str, str] = {}
ESCAPED_SEQUENCES: Dict[str, str] = {}
QUOTE_START = "'"
QUOTE_END = "'"
IDENTIFIER_START = '\\'
IDENTIFIER_END = '\\'
VALID_INTERVAL_UNITS: Set[str] = {'WEEK', 'NANOSECOND', 'MICROSECONDS', 'EPOCH_MILLISECOND', 'CENTURIES', 'HRS', 'CENTS', 'EPOCH_MILLISECONDS', 'USECONDS', 'WEEK_ISO', 'WY', 'MSEC', 'TIMEZONE_MINUTE', 'NANOSECS', 'SECONDS', 'Y', 'MIN', 'CENTURY', 'YEAR', 'DAYOFWEEK', 'MINUTE', 'EPOCH_MICROSECOND', 'MSECOND', 'EPOCH_NANOSECONDS', 'MI', 'NS', 'WEEKOFYEAR_ISO', 'MICROSEC', 'EPOCH_SECONDS', 'YYY', 'WEEKDAY', 'MILLENNIUM', 'MIL', 'YEARS', 'WEEKOFYEARISO', 'SECOND', 'EPOCH_SECOND', 'CENT', 'SECS', 'NSECONDS', 'MSECONDS', 'DAYOFWEEKISO', 'MILLENIA', 'DAY OF YEAR', 'EPOCH', 'MILLISEC', 'C', 'TZH', 'DAY', 'MONS', 'WOY', 'MILLISECON', 'DW', 'W', 'WK', 'HOURS', 'WEEKISO', 'USECS', 'DECADE', 'H', 'MS', 'MILS', 'DD', 'MICROSECOND', 'DAYOFMONTH', 'DAYS', 'EPOCH_NANOSECOND', 'QUARTER', 'TZM', 'WEEKDAY_ISO', 'YR', 'DOW', 'DW_ISO', 'DAY OF WEEK', 'TIMEZONE_HOUR', 'MONTHS', 'DOW_ISO', 'USEC', 'DY', 'MILLISECOND', 'MICROSECS', 'MILLISECONDS', 'YRS', 'USECOND', 'EPOCH_MICROSECONDS', 'MILLISECS', 'MINUTES', 'NANOSEC', 'MSECS', 'QTR', 'Q', 'HOUR', 'NSECOND', 'S', 'HR', 'DECADES', 'MINS', 'M', 'NSEC', 'MONTH', 'MM', 'HH', 'DEC', 'MON', 'QTRS', 'SEC', 'QUARTERS', 'YYYY', 'YY', 'DAYOFYEAR', 'DAYOFWEEK_ISO', 'DECS', 'US', 'WEEKOFYEAR', 'D', 'DOY'}
BIT_START: Optional[str] = None
BIT_END: Optional[str] = None
HEX_START: Optional[str] = None
HEX_END: Optional[str] = None
BYTE_START: Optional[str] = None
BYTE_END: Optional[str] = None
UNICODE_START: Optional[str] = None
UNICODE_END: Optional[str] = None
STRINGS_SUPPORT_ESCAPED_SEQUENCES = False
BYTE_STRINGS_SUPPORT_ESCAPED_SEQUENCES = False
Inherited Members
sqlglot.dialects.dialect.Dialect
Dialect
INDEX_OFFSET
WEEK_OFFSET
UNNEST_COLUMN_ONLY
ALIAS_POST_TABLESAMPLE
TABLESAMPLE_SIZE_IS_PERCENT
NORMALIZATION_STRATEGY
DPIPE_IS_STRING_CONCAT
STRICT_STRING_CONCAT
SUPPORTS_USER_DEFINED_TYPES
COPY_PARAMS_ARE_CSV
NORMALIZE_FUNCTIONS
PRESERVE_ORIGINAL_NAMES
LOG_BASE_FIRST
NULL_ORDERING
TYPED_DIVISION
SAFE_DIVISION
CONCAT_COALESCE
HEX_LOWERCASE
DATE_FORMAT
DATEINT_FORMAT
TIME_FORMAT
TIME_MAPPING
FORMAT_MAPPING
UNESCAPED_SEQUENCES
PSEUDOCOLUMNS
PREFER_CTE_ALIAS_COLUMN
FORCE_EARLY_ALIAS_REF_EXPANSION
EXPAND_ONLY_GROUP_ALIAS_REF
ANNOTATE_ALL_SCOPES
DISABLES_ALIAS_REF_EXPANSION
SUPPORTS_ALIAS_REFS_IN_JOIN_CONDITIONS
SUPPORTS_ORDER_BY_ALL
PROJECTION_ALIASES_SHADOW_SOURCE_NAMES
TABLES_REFERENCEABLE_AS_COLUMNS
SUPPORTS_STRUCT_STAR_EXPANSION
EXCLUDES_PSEUDOCOLUMNS_FROM_STAR
QUERY_RESULTS_ARE_STRUCTS
REQUIRES_PARENTHESIZED_STRUCT_ACCESS
SUPPORTS_NULL_TYPE
COALESCE_COMPARISON_NON_STANDARD
HAS_DISTINCT_ARRAY_CONSTRUCTORS
SUPPORTS_FIXED_SIZE_ARRAYS
STRICT_JSON_PATH_SYNTAX
ON_CONDITION_EMPTY_BEFORE_ERROR
ARRAY_AGG_INCLUDES_NULLS
ARRAY_FUNCS_PROPAGATES_NULLS
PROMOTE_TO_INFERRED_DATETIME_TYPE
SUPPORTS_VALUES_DEFAULT
NUMBERS_CAN_BE_UNDERSCORE_SEPARATED
HEX_STRING_IS_INTEGER_TYPE
REGEXP_EXTRACT_DEFAULT_GROUP
REGEXP_EXTRACT_POSITION_OVERFLOW_RETURNS_NULL
SET_OP_DISTINCT_BY_DEFAULT
CREATABLE_KIND_MAPPING
ALTER_TABLE_SUPPORTS_CASCADE
ALTER_TABLE_ADD_REQUIRED_FOR_EACH_COLUMN
TRY_CAST_REQUIRES_STRING
SAFE_TO_ELIMINATE_DOUBLE_NEGATION
INITCAP_DEFAULT_DELIMITER_CHARS
BYTE_STRING_IS_BYTES_TYPE
UUID_IS_STRING_TYPE
JSON_EXTRACT_SCALAR_SCALAR_ONLY
DEFAULT_FUNCTIONS_COLUMN_NAMES
DEFAULT_NULL_TYPE
LEAST_GREATEST_IGNORES_NULLS
PRIORITIZE_NON_LITERAL_TYPES
ALIAS_POST_VERSION
DATE_PART_MAPPING
COERCES_TO
EXPRESSION_METADATA
SUPPORTED_SETTINGS
get_or_raise
format_time
version
settings
normalize_identifier
case_sensitive
can_quote
quote_identifier
to_json_path
parse
parse_into
generate
transpile
tokenize
tokenizer
jsonpath_tokenizer
parser
generator
generate_values_aliases
class SelectorDialect.Tokenizer(sqlglot.tokens.Tokenizer):
class Tokenizer(BaseTokenizer):
    # Single-character tokens of the selector grammar, as consumed by ``parse``: grouping,
    # intersection (&), union (|), negation (^), upstream/downstream (+), wildcard (*) and
    # the separator of kind prefixes such as "tag:".
    SINGLE_TOKENS = {
        "(": TokenType.L_PAREN,
        ")": TokenType.R_PAREN,
        "&": TokenType.AMP,
        "|": TokenType.PIPE,
        "^": TokenType.CARET,
        "+": TokenType.PLUS,
        "*": TokenType.STAR,
        ":": TokenType.COLON,
    }

    # No keywords: words like "tag" or "git" stay VAR tokens and are only treated as kind
    # prefixes by ``parse`` when followed by ":".
    KEYWORDS = {}
    IDENTIFIERS = ["\\"]  # there are no identifiers but need to put something here
    IDENTIFIER_START = ""
    IDENTIFIER_END = ""
SINGLE_TOKENS = {'(': <TokenType.L_PAREN: 1>, ')': <TokenType.R_PAREN: 2>, '&': <TokenType.AMP: 36>, '|': <TokenType.PIPE: 39>, '^': <TokenType.CARET: 42>, '+': <TokenType.PLUS: 10>, '*': <TokenType.STAR: 20>, ':': <TokenType.COLON: 11>}
KEYWORDS = {}
IDENTIFIERS = ['\\']
IDENTIFIER_START = ''
IDENTIFIER_END = ''
BYTE_STRING_ESCAPES: ClassVar[List[str]] = ["'"]
Inherited Members
sqlglot.tokens.Tokenizer
Tokenizer
BIT_STRINGS
BYTE_STRINGS
HEX_STRINGS
RAW_STRINGS
HEREDOC_STRINGS
UNICODE_STRINGS
QUOTES
STRING_ESCAPES
VAR_SINGLE_TOKENS
ESCAPE_FOLLOW_CHARS
IDENTIFIER_ESCAPES
HEREDOC_TAG_IS_IDENTIFIER
HEREDOC_STRING_ALTERNATIVE
STRING_ESCAPES_ALLOWED_IN_RAW_STRINGS
NESTED_COMMENTS
HINT_START
TOKENS_PRECEDING_HINT
COMMANDS
COMMAND_PREFIX_TOKENS
NUMERIC_LITERALS
COMMENTS
dialect
tokenize
sql
size
tokens
class Git(sqlglot.expressions.core.Expression):
class Git(exp.Expression):
    """Selector node produced for ``git:<branch>``.

    ``this`` is the parsed operand; its ``name`` is used as the target branch when collecting
    untracked, uncommitted and committed changed files.
    """
key: ClassVar[str] = 'git'
required_args: ClassVar[Set[str]] = {'this'}
Inherited Members
sqlglot.expressions.core.Expr
Expr
arg_types
is_var_len_args
is_subquery
is_cast
is_primitive
dump
load
sqlglot.expressions.core.Expression
this
expression
expressions
text
is_string
is_number
to_py
is_int
is_star
alias
alias_column_names
name
alias_or_name
output_name
type
is_type
is_leaf
meta
copy
add_comments
pop_comments
append
set
depth
iter_expressions
find
find_all
find_ancestor
parent_select
same_parent
root
walk
dfs
bfs
unnest
unalias
unnest_operands
flatten
to_s
sql
transform
replace
pop
assert_is
error_messages
and_
or_
not_
update_positions
as_
isin
between
is_
like
ilike
eq
neq
rlike
div
asc
desc
args
parent
arg_key
index
comments
class Tag(sqlglot.expressions.core.Expression):
class Tag(exp.Expression):
    """Selector node produced for ``tag:<pattern>``.

    ``this`` is the parsed operand; its lower-cased ``name`` is compared against lower-cased model
    tags, with ``*`` wildcards supported.
    """
key: ClassVar[str] = 'tag'
required_args: ClassVar[Set[str]] = {'this'}
Inherited Members
sqlglot.expressions.core.Expr
Expr
arg_types
is_var_len_args
is_subquery
is_cast
is_primitive
dump
load
sqlglot.expressions.core.Expression
this
expression
expressions
text
is_string
is_number
to_py
is_int
is_star
alias
alias_column_names
name
alias_or_name
output_name
type
is_type
is_leaf
meta
copy
add_comments
pop_comments
append
set
depth
iter_expressions
find
find_all
find_ancestor
parent_select
same_parent
root
walk
dfs
bfs
unnest
unalias
unnest_operands
flatten
to_s
sql
transform
replace
pop
assert_is
error_messages
and_
or_
not_
update_positions
as_
isin
between
is_
like
ilike
eq
neq
rlike
div
asc
desc
args
parent
arg_key
index
comments
class ResourceType(sqlglot.expressions.core.Expression):
class ResourceType(exp.Expression):
    """Selector node produced for ``resource_type:<type>``.

    ``this`` is the parsed operand; its lower-cased ``name`` is passed to the selector's
    ``_matches_resource_type``.
    """
key: ClassVar[str] = 'resourcetype'
required_args: ClassVar[Set[str]] = {'this'}
Inherited Members
sqlglot.expressions.core.Expr
Expr
arg_types
is_var_len_args
is_subquery
is_cast
is_primitive
dump
load
sqlglot.expressions.core.Expression
this
expression
expressions
text
is_string
is_number
to_py
is_int
is_star
alias
alias_column_names
name
alias_or_name
output_name
type
is_type
is_leaf
meta
copy
add_comments
pop_comments
append
set
depth
iter_expressions
find
find_all
find_ancestor
parent_select
same_parent
root
walk
dfs
bfs
unnest
unalias
unnest_operands
flatten
to_s
sql
transform
replace
pop
assert_is
error_messages
and_
or_
not_
update_positions
as_
isin
between
is_
like
ilike
eq
neq
rlike
div
asc
desc
args
parent
arg_key
index
comments
class Direction(sqlglot.expressions.core.Expression):
class Direction(exp.Expression):
    """Selector node wrapping a selection with graph operators.

    ``this`` is the wrapped selection; the ``up`` arg (a leading ``+``) adds upstream models and the
    ``down`` arg (a trailing ``+``) adds downstream models.
    """
key: ClassVar[str] = 'direction'
required_args: ClassVar[Set[str]] = {'this'}
Inherited Members
sqlglot.expressions.core.Expr
Expr
arg_types
is_var_len_args
is_subquery
is_cast
is_primitive
dump
load
sqlglot.expressions.core.Expression
this
expression
expressions
text
is_string
is_number
to_py
is_int
is_star
alias
alias_column_names
name
alias_or_name
output_name
type
is_type
is_leaf
meta
copy
add_comments
pop_comments
append
set
depth
iter_expressions
find
find_all
find_ancestor
parent_select
same_parent
root
walk
dfs
bfs
unnest
unalias
unnest_operands
flatten
to_s
sql
transform
replace
pop
assert_is
error_messages
and_
or_
not_
update_positions
as_
isin
between
is_
like
ilike
eq
neq
rlike
div
asc
desc
args
parent
arg_key
index
comments
def parse( selector: str, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> sqlglot.expressions.core.Expr:
def parse(selector: str, dialect: DialectType = None) -> exp.Expr:
    """Parse a selector expression into a tree of sqlglot expressions.

    Syntax, from loosest to tightest binding:

    * ``a | b`` union and ``a & b`` intersection (``&`` binds tighter, chains like ``a & b & c``
      are allowed).
    * ``^a`` negation.
    * ``+a`` / ``a+`` upstream / downstream graph operators (also after a group: ``(a | b)+``).
    * ``tag:<pattern>``, ``resource_type:<type>`` or ``git:<branch>`` kind prefix (at most one).
    * ``*`` wildcard before and/or after a name, or on its own.
    * ``( ... )`` grouping.

    Args:
        selector: The selector expression.
        dialect: Unused; tokenization always uses ``SelectorDialect``.

    Returns:
        The root expression of the parsed selector.

    Raises:
        ParseError: If the selector is malformed or contains tokens that are not part of the expression.
    """
    tokens = SelectorDialect().tokenize(selector)
    i = 0

    def _curr() -> t.Optional[Token]:
        return seq_get(tokens, i)

    def _prev() -> Token:
        return tokens[i - 1]

    def _advance(num: int = 1) -> Token:
        nonlocal i
        i += num
        return _prev()

    def _next() -> t.Optional[Token]:
        return seq_get(tokens, i + 1)

    def _error(msg: str) -> str:
        # `i` is a token index, not a character offset into `selector`.
        return f"{msg} at index {i}: {selector}"

    def _match(token_type: TokenType, raise_unmatched: bool = False) -> t.Optional[Token]:
        """Consume and return the current token if it is of `token_type`."""
        token = _curr()
        if token and token.token_type == token_type:
            return _advance()
        if raise_unmatched:
            raise ParseError(_error(f"Expected {token_type}"))
        return None

    def _parse_kind(kind: str) -> bool:
        """Consume a case-insensitive ``<kind>:`` prefix if it comes next."""
        token = _curr()
        next_token = _next()

        if (
            token
            and token.token_type == TokenType.VAR
            and token.text.lower() == kind
            and next_token
            and next_token.token_type == TokenType.COLON
        ):
            _advance(2)
            return True
        return False

    def _parse_var() -> exp.Expr:
        """Parse a single operand with its optional prefix, wildcards and graph operators."""
        upstream = _match(TokenType.PLUS)
        # Kind prefixes are mutually exclusive, so e.g. "tag:git:x" is not also read as "git:".
        tag = _parse_kind("tag")
        resource_type = not tag and _parse_kind("resource_type")
        git = not (tag or resource_type) and _parse_kind("git")
        lstar = "*" if _match(TokenType.STAR) else ""

        this: exp.Expr
        if _match(TokenType.VAR) or _match(TokenType.NUMBER):
            name = _prev().text
            rstar = "*" if _match(TokenType.STAR) else ""
            this = exp.Var(this=f"{lstar}{name}{rstar}")
        elif _match(TokenType.L_PAREN):
            this = exp.Paren(this=_parse_conjunction())
            _match(TokenType.R_PAREN, True)
        elif lstar:
            this = exp.var("*")
        else:
            raise ParseError(_error("Expected model name."))

        # The downstream "+" follows the complete operand (after a closing parenthesis for a
        # group), so in "(a | (b)+)" it applies to "b" rather than to the enclosing group.
        downstream = _match(TokenType.PLUS)

        if tag:
            this = Tag(this=this)
        if resource_type:
            this = ResourceType(this=this)
        if git:
            this = Git(this=this)

        directions: t.Dict[str, bool] = {}
        if upstream:
            directions["up"] = True
        if downstream:
            directions["down"] = True
        if directions:
            this = Direction(this=this, **directions)
        return this

    def _parse_unary() -> exp.Expr:
        if _match(TokenType.CARET):
            return exp.Not(this=_parse_unary())
        return _parse_var()

    def _parse_conjunction() -> exp.Expr:
        this = _parse_unary()

        # Loop so that chains such as "a & b & c" are fully consumed (left-associative).
        while _match(TokenType.AMP):
            this = exp.And(this=this, expression=_parse_unary())
        if _match(TokenType.PIPE):
            this = exp.Or(this=this, expression=_parse_conjunction())

        return this

    expression = _parse_conjunction()

    # Anything left over was not part of the expression; silently ignoring it would select the wrong models.
    leftover = _curr()
    if leftover:
        raise ParseError(_error(f"Unexpected token {leftover.text!r}"))
    return expression