sqlmesh.core.selector
from __future__ import annotations

import fnmatch
import logging
import typing as t
from collections import defaultdict
from pathlib import Path

from sqlmesh.core.dialect import normalize_model_name
from sqlmesh.core.environment import Environment
from sqlmesh.core.loader import update_model_schemas
from sqlmesh.core.model import Model
from sqlmesh.core.state_sync import StateReader
from sqlmesh.utils import UniqueKeyDict
from sqlmesh.utils.dag import DAG
from sqlmesh.utils.git import GitClient

logger = logging.getLogger(__name__)


class Selector:
    """Resolves model selection expressions into concrete sets of models.

    A selection is a model name, a ``tag:<tag>`` reference or a ``git:<branch>``
    reference. Name and tag selections accept ``*`` wildcards as well as a
    leading ``+`` (also include upstream models) and/or a trailing ``+`` (also
    include downstream models). Several sub-selections can be intersected
    with ``&``.
    """

    def __init__(
        self,
        state_reader: StateReader,
        models: UniqueKeyDict[str, Model],
        context_path: Path = Path("."),
        dag: t.Optional[DAG[str]] = None,
        default_catalog: t.Optional[str] = None,
        dialect: t.Optional[str] = None,
    ):
        """Creates a selector over the given local models.

        Args:
            state_reader: Used to look up environments and their snapshots.
            models: The local models; the keys are used as DAG node names.
            context_path: Passed to the git client and to the schema update step.
            dag: A prebuilt dependency graph. When omitted, one is built from
                the ``depends_on`` attribute of every model in ``models``.
            default_catalog: Passed to ``normalize_model_name`` when resolving
                exact (non-wildcard) model names.
            dialect: Passed to ``normalize_model_name`` when resolving exact
                (non-wildcard) model names.
        """
        self._state_reader = state_reader
        self._models = models
        self._context_path = context_path
        self._default_catalog = default_catalog
        self._dialect = dialect
        self._git_client = GitClient(context_path)
        # Not read by any method of this class; retained so that the instance
        # state stays exactly as before.
        self.__models_by_tag: t.Optional[t.Dict[str, t.Set[str]]] = None

        if dag is None:
            self._dag: DAG[str] = DAG()
            for fqn, model in models.items():
                self._dag.add(fqn, model.depends_on)
        else:
            self._dag = dag

    def select_models(
        self,
        model_selections: t.Iterable[str],
        target_env_name: str,
        fallback_env_name: t.Optional[str] = None,
        ensure_finalized_snapshots: bool = False,
    ) -> UniqueKeyDict[str, Model]:
        """Given a set of selections, returns models from the current state with names matching
        the selection while sourcing the remaining models from the target environment.

        Args:
            model_selections: A set of selections.
            target_env_name: The name of the target environment.
            fallback_env_name: The name of the fallback environment that will be used if the target
                environment doesn't exist.
            ensure_finalized_snapshots: Whether to source environment snapshots from the latest
                finalized environment state, or to use whatever snapshots are in the current
                environment state even if the environment is not finalized.

        Returns:
            A dictionary of models.
        """
        target_env = self._state_reader.get_environment(Environment.normalize_name(target_env_name))
        if not target_env and fallback_env_name:
            target_env = self._state_reader.get_environment(
                Environment.normalize_name(fallback_env_name)
            )

        env_models: t.Dict[str, Model] = {}
        if target_env:
            environment_snapshot_infos = (
                target_env.snapshots
                if not ensure_finalized_snapshots
                else target_env.finalized_or_current_snapshots
            )
            env_models = {
                s.name: s.model
                for s in self._state_reader.get_snapshots(
                    environment_snapshot_infos, hydrate_seeds=True
                ).values()
                if s.is_model
            }

        # Selections are expanded against both local and environment models;
        # on a name clash the environment entry wins in this lookup dict.
        all_selected_models = self.expand_model_selections(
            model_selections, models={**self._models, **env_models}
        )

        dag: DAG[str] = DAG()
        # The selected models plus everything downstream of them.
        subdag: t.Set[str] = set()

        for fqn in all_selected_models:
            if fqn not in subdag:
                subdag.add(fqn)
                subdag.update(self._dag.downstream(fqn))

        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")

        all_model_fqns = set(self._models) | set(env_models)
        for fqn in all_model_fqns:
            model: t.Optional[Model] = None
            if fqn not in all_selected_models and fqn in env_models:
                # Not selected: keep the version from the target environment.
                model = env_models[fqn]
            elif fqn in all_selected_models and fqn in self._models:
                # Selected: take the local version. A selected name that only
                # exists in the environment yields no model and is left out.
                model = self._models[fqn]

            if model:
                # model.copy() can't be used here due to a cached state that can be a part of a
                # model instance.
                if model.fqn in subdag:
                    model = type(model).parse_obj(model.dict(exclude={"mapping_schema"}))
                dag.add(model.fqn, model.depends_on)
                models[model.fqn] = model

        update_model_schemas(dag, models, self._context_path)

        return models

    def expand_model_selections(
        self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Model]] = None
    ) -> t.Set[str]:
        """Expands a set of model selections into a set of model names.

        Each selection may consist of several sub-selections joined with ``&``;
        the results of the sub-selections are intersected. The results of the
        individual selections are unioned.

        Args:
            model_selections: A set of model selections.
            models: The models to match against. Falls back to the local models
                when omitted (or empty).

        Returns:
            A set of model names.
        """
        results: t.Set[str] = set()
        models = models or self._models
        # Tag index, built lazily on the first "tag:" sub-selection and then
        # reused for every later one within this call.
        models_by_tag: t.Optional[t.Dict[str, t.Set[str]]] = None

        for selection in model_selections:
            sub_results: t.Optional[t.Set[str]] = None

            for sub_selection in (s.strip() for s in selection.split("&")):
                if not sub_selection:
                    continue

                if sub_selection.startswith("tag:"):
                    if models_by_tag is None:
                        models_by_tag = defaultdict(set)
                        for model in models.values():
                            for tag in model.tags:
                                models_by_tag[tag.lower()].add(model.fqn)
                    matched = self._expand_model_tag(sub_selection[4:], models, models_by_tag)
                elif sub_selection.startswith("git:"):
                    matched = self._expand_git(sub_selection[4:])
                else:
                    matched = self._expand_model_name(sub_selection, models)

                sub_results = matched if sub_results is None else sub_results & matched

            if sub_results:
                results.update(sub_results)
            else:
                logger.warning(f"Expression '{selection}' doesn't match any models.")

        return results

    def _expand_git(self, target_branch: str) -> t.Set[str]:
        """Returns the local models whose file is reported as changed by git.

        The changed files are the union of untracked files, uncommitted changes
        and committed changes relative to ``target_branch``. Only the local
        models are considered.

        Args:
            target_branch: The branch to compare committed changes against.

        Returns:
            A set of model names.
        """
        git_modified_files = {
            *self._git_client.list_untracked_files(),
            *self._git_client.list_uncommitted_changed_files(),
            *self._git_client.list_committed_changed_files(target_branch=target_branch),
        }
        matched_models = {m.fqn for m in self._models.values() if m._path in git_modified_files}

        if not matched_models:
            logger.warning(f"Expression 'git:{target_branch}' doesn't match any models.")

        return matched_models

    def _expand_model_name(self, selection: str, models: t.Dict[str, Model]) -> t.Set[str]:
        """Expands a model name selection into a set of model names.

        The selection is lowercased first. A selection containing ``*`` is
        matched as a wildcard against each model's ``name``; otherwise it is
        normalized and looked up as a key of ``models``.

        Args:
            selection: A model name or wildcard, optionally with a ``+`` prefix and/or suffix.
            models: The models to match against.

        Returns:
            A set of model names.
        """
        results: t.Set[str] = set()

        (
            selection,
            include_upstream,
            include_downstream,
        ) = self._get_value_and_dependency_inclusion(selection.lower())

        matched_models: t.Set[str] = set()

        if "*" in selection:
            for model in models.values():
                if fnmatch.fnmatchcase(model.name, selection):
                    matched_models.add(model.fqn)
        else:
            model_fqn = normalize_model_name(selection, self._default_catalog, self._dialect)
            if model_fqn in models:
                matched_models.add(model_fqn)

        if not matched_models:
            logger.warning(f"Expression '{selection}' doesn't match any models.")

        for model_fqn in matched_models:
            results.update(
                self._get_models(model_fqn, include_upstream, include_downstream, models)
            )
        return results

    def _expand_model_tag(
        self, tag_selection: str, models: t.Dict[str, Model], models_by_tag: t.Dict[str, t.Set[str]]
    ) -> t.Set[str]:
        """
        Expands a set of model tags into a set of model names.
        The tag matching is case-insensitive and supports wildcards and + prefix and suffix to
        include upstream and downstream models.

        Args:
            tag_selection: A tag to match models against.
            models: The models to match against.
            models_by_tag: Model names grouped by lowercased tag.

        Returns:
            A set of model names.
        """
        result: t.Set[str] = set()
        matched_tags: t.Set[str] = set()
        (
            selection,
            include_upstream,
            include_downstream,
        ) = self._get_value_and_dependency_inclusion(tag_selection.lower())

        if "*" in selection:
            for model_tag in models_by_tag:
                if fnmatch.fnmatchcase(model_tag, selection):
                    matched_tags.add(model_tag)
        elif selection in models_by_tag:
            matched_tags.add(selection)

        if not matched_tags:
            logger.warning(f"Expression 'tag:{tag_selection}' doesn't match any models.")

        for tag in matched_tags:
            for model in models_by_tag[tag]:
                result.update(self._get_models(model, include_upstream, include_downstream, models))

        return result

    def _get_models(
        self,
        model_name: str,
        include_upstream: bool,
        include_downstream: bool,
        models: t.Dict[str, Model],
    ) -> t.Set[str]:
        """Returns a model together with its requested DAG neighbours.

        Args:
            model_name: The name of the matched model.
            include_upstream: Whether to add upstream models; only those present
                in ``models`` are added.
            include_downstream: Whether to add downstream models.
            models: The models used to filter upstream names.

        Returns:
            A set of model names that always contains ``model_name``.
        """
        result = {model_name}
        if include_upstream:
            result.update([u for u in self._dag.upstream(model_name) if u in models])
        if include_downstream:
            result.update(self._dag.downstream(model_name))
        return result

    @staticmethod
    def _get_value_and_dependency_inclusion(value: str) -> t.Tuple[str, bool, bool]:
        """Strips the ``+`` markers from a selection value.

        Args:
            value: The selection value, optionally with a ``+`` prefix and/or suffix.

        Returns:
            A tuple of the stripped value, whether upstream models are included
            (leading ``+``) and whether downstream models are included
            (trailing ``+``).
        """
        include_upstream = False
        include_downstream = False
        # startswith / endswith are safe on empty strings, unlike indexing,
        # so "" and a lone "+" no longer raise IndexError.
        if value.startswith("+"):
            value = value[1:]
            include_upstream = True
        if value.endswith("+"):
            value = value[:-1]
            include_downstream = True
        return value, include_upstream, include_downstream
class
Selector:
class Selector:
    """Resolves model selection expressions into concrete sets of models.

    A selection is a model name, a ``tag:<tag>`` reference or a ``git:<branch>``
    reference. Name and tag selections accept ``*`` wildcards as well as a
    leading ``+`` (also include upstream models) and/or a trailing ``+`` (also
    include downstream models). Several sub-selections can be intersected
    with ``&``.
    """

    def __init__(
        self,
        state_reader: StateReader,
        models: UniqueKeyDict[str, Model],
        context_path: Path = Path("."),
        dag: t.Optional[DAG[str]] = None,
        default_catalog: t.Optional[str] = None,
        dialect: t.Optional[str] = None,
    ):
        """Creates a selector over the given local models.

        Args:
            state_reader: Used to look up environments and their snapshots.
            models: The local models; the keys are used as DAG node names.
            context_path: Passed to the git client and to the schema update step.
            dag: A prebuilt dependency graph. When omitted, one is built from
                the ``depends_on`` attribute of every model in ``models``.
            default_catalog: Passed to ``normalize_model_name`` when resolving
                exact (non-wildcard) model names.
            dialect: Passed to ``normalize_model_name`` when resolving exact
                (non-wildcard) model names.
        """
        self._state_reader = state_reader
        self._models = models
        self._context_path = context_path
        self._default_catalog = default_catalog
        self._dialect = dialect
        self._git_client = GitClient(context_path)
        # Not read by any method of this class; retained so that the instance
        # state stays exactly as before.
        self.__models_by_tag: t.Optional[t.Dict[str, t.Set[str]]] = None

        if dag is None:
            self._dag: DAG[str] = DAG()
            for fqn, model in models.items():
                self._dag.add(fqn, model.depends_on)
        else:
            self._dag = dag

    def select_models(
        self,
        model_selections: t.Iterable[str],
        target_env_name: str,
        fallback_env_name: t.Optional[str] = None,
        ensure_finalized_snapshots: bool = False,
    ) -> UniqueKeyDict[str, Model]:
        """Given a set of selections, returns models from the current state with names matching
        the selection while sourcing the remaining models from the target environment.

        Args:
            model_selections: A set of selections.
            target_env_name: The name of the target environment.
            fallback_env_name: The name of the fallback environment that will be used if the target
                environment doesn't exist.
            ensure_finalized_snapshots: Whether to source environment snapshots from the latest
                finalized environment state, or to use whatever snapshots are in the current
                environment state even if the environment is not finalized.

        Returns:
            A dictionary of models.
        """
        target_env = self._state_reader.get_environment(Environment.normalize_name(target_env_name))
        if not target_env and fallback_env_name:
            target_env = self._state_reader.get_environment(
                Environment.normalize_name(fallback_env_name)
            )

        env_models: t.Dict[str, Model] = {}
        if target_env:
            environment_snapshot_infos = (
                target_env.snapshots
                if not ensure_finalized_snapshots
                else target_env.finalized_or_current_snapshots
            )
            env_models = {
                s.name: s.model
                for s in self._state_reader.get_snapshots(
                    environment_snapshot_infos, hydrate_seeds=True
                ).values()
                if s.is_model
            }

        # Selections are expanded against both local and environment models;
        # on a name clash the environment entry wins in this lookup dict.
        all_selected_models = self.expand_model_selections(
            model_selections, models={**self._models, **env_models}
        )

        dag: DAG[str] = DAG()
        # The selected models plus everything downstream of them.
        subdag: t.Set[str] = set()

        for fqn in all_selected_models:
            if fqn not in subdag:
                subdag.add(fqn)
                subdag.update(self._dag.downstream(fqn))

        models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")

        all_model_fqns = set(self._models) | set(env_models)
        for fqn in all_model_fqns:
            model: t.Optional[Model] = None
            if fqn not in all_selected_models and fqn in env_models:
                # Not selected: keep the version from the target environment.
                model = env_models[fqn]
            elif fqn in all_selected_models and fqn in self._models:
                # Selected: take the local version. A selected name that only
                # exists in the environment yields no model and is left out.
                model = self._models[fqn]

            if model:
                # model.copy() can't be used here due to a cached state that can be a part of a
                # model instance.
                if model.fqn in subdag:
                    model = type(model).parse_obj(model.dict(exclude={"mapping_schema"}))
                dag.add(model.fqn, model.depends_on)
                models[model.fqn] = model

        update_model_schemas(dag, models, self._context_path)

        return models

    def expand_model_selections(
        self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Model]] = None
    ) -> t.Set[str]:
        """Expands a set of model selections into a set of model names.

        Each selection may consist of several sub-selections joined with ``&``;
        the results of the sub-selections are intersected. The results of the
        individual selections are unioned.

        Args:
            model_selections: A set of model selections.
            models: The models to match against. Falls back to the local models
                when omitted (or empty).

        Returns:
            A set of model names.
        """
        results: t.Set[str] = set()
        models = models or self._models
        # Tag index, built lazily on the first "tag:" sub-selection and then
        # reused for every later one within this call.
        models_by_tag: t.Optional[t.Dict[str, t.Set[str]]] = None

        for selection in model_selections:
            sub_results: t.Optional[t.Set[str]] = None

            for sub_selection in (s.strip() for s in selection.split("&")):
                if not sub_selection:
                    continue

                if sub_selection.startswith("tag:"):
                    if models_by_tag is None:
                        models_by_tag = defaultdict(set)
                        for model in models.values():
                            for tag in model.tags:
                                models_by_tag[tag.lower()].add(model.fqn)
                    matched = self._expand_model_tag(sub_selection[4:], models, models_by_tag)
                elif sub_selection.startswith("git:"):
                    matched = self._expand_git(sub_selection[4:])
                else:
                    matched = self._expand_model_name(sub_selection, models)

                sub_results = matched if sub_results is None else sub_results & matched

            if sub_results:
                results.update(sub_results)
            else:
                logger.warning(f"Expression '{selection}' doesn't match any models.")

        return results

    def _expand_git(self, target_branch: str) -> t.Set[str]:
        """Returns the local models whose file is reported as changed by git.

        The changed files are the union of untracked files, uncommitted changes
        and committed changes relative to ``target_branch``. Only the local
        models are considered.

        Args:
            target_branch: The branch to compare committed changes against.

        Returns:
            A set of model names.
        """
        git_modified_files = {
            *self._git_client.list_untracked_files(),
            *self._git_client.list_uncommitted_changed_files(),
            *self._git_client.list_committed_changed_files(target_branch=target_branch),
        }
        matched_models = {m.fqn for m in self._models.values() if m._path in git_modified_files}

        if not matched_models:
            logger.warning(f"Expression 'git:{target_branch}' doesn't match any models.")

        return matched_models

    def _expand_model_name(self, selection: str, models: t.Dict[str, Model]) -> t.Set[str]:
        """Expands a model name selection into a set of model names.

        The selection is lowercased first. A selection containing ``*`` is
        matched as a wildcard against each model's ``name``; otherwise it is
        normalized and looked up as a key of ``models``.

        Args:
            selection: A model name or wildcard, optionally with a ``+`` prefix and/or suffix.
            models: The models to match against.

        Returns:
            A set of model names.
        """
        results: t.Set[str] = set()

        (
            selection,
            include_upstream,
            include_downstream,
        ) = self._get_value_and_dependency_inclusion(selection.lower())

        matched_models: t.Set[str] = set()

        if "*" in selection:
            for model in models.values():
                if fnmatch.fnmatchcase(model.name, selection):
                    matched_models.add(model.fqn)
        else:
            model_fqn = normalize_model_name(selection, self._default_catalog, self._dialect)
            if model_fqn in models:
                matched_models.add(model_fqn)

        if not matched_models:
            logger.warning(f"Expression '{selection}' doesn't match any models.")

        for model_fqn in matched_models:
            results.update(
                self._get_models(model_fqn, include_upstream, include_downstream, models)
            )
        return results

    def _expand_model_tag(
        self, tag_selection: str, models: t.Dict[str, Model], models_by_tag: t.Dict[str, t.Set[str]]
    ) -> t.Set[str]:
        """
        Expands a set of model tags into a set of model names.
        The tag matching is case-insensitive and supports wildcards and + prefix and suffix to
        include upstream and downstream models.

        Args:
            tag_selection: A tag to match models against.
            models: The models to match against.
            models_by_tag: Model names grouped by lowercased tag.

        Returns:
            A set of model names.
        """
        result: t.Set[str] = set()
        matched_tags: t.Set[str] = set()
        (
            selection,
            include_upstream,
            include_downstream,
        ) = self._get_value_and_dependency_inclusion(tag_selection.lower())

        if "*" in selection:
            for model_tag in models_by_tag:
                if fnmatch.fnmatchcase(model_tag, selection):
                    matched_tags.add(model_tag)
        elif selection in models_by_tag:
            matched_tags.add(selection)

        if not matched_tags:
            logger.warning(f"Expression 'tag:{tag_selection}' doesn't match any models.")

        for tag in matched_tags:
            for model in models_by_tag[tag]:
                result.update(self._get_models(model, include_upstream, include_downstream, models))

        return result

    def _get_models(
        self,
        model_name: str,
        include_upstream: bool,
        include_downstream: bool,
        models: t.Dict[str, Model],
    ) -> t.Set[str]:
        """Returns a model together with its requested DAG neighbours.

        Args:
            model_name: The name of the matched model.
            include_upstream: Whether to add upstream models; only those present
                in ``models`` are added.
            include_downstream: Whether to add downstream models.
            models: The models used to filter upstream names.

        Returns:
            A set of model names that always contains ``model_name``.
        """
        result = {model_name}
        if include_upstream:
            result.update([u for u in self._dag.upstream(model_name) if u in models])
        if include_downstream:
            result.update(self._dag.downstream(model_name))
        return result

    @staticmethod
    def _get_value_and_dependency_inclusion(value: str) -> t.Tuple[str, bool, bool]:
        """Strips the ``+`` markers from a selection value.

        Args:
            value: The selection value, optionally with a ``+`` prefix and/or suffix.

        Returns:
            A tuple of the stripped value, whether upstream models are included
            (leading ``+``) and whether downstream models are included
            (trailing ``+``).
        """
        include_upstream = False
        include_downstream = False
        # startswith / endswith are safe on empty strings, unlike indexing,
        # so "" and a lone "+" no longer raise IndexError.
        if value.startswith("+"):
            value = value[1:]
            include_upstream = True
        if value.endswith("+"):
            value = value[:-1]
            include_downstream = True
        return value, include_upstream, include_downstream
Selector( state_reader: sqlmesh.core.state_sync.base.StateReader, models: sqlmesh.utils.UniqueKeyDict[str, typing.Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel]], context_path: pathlib.Path = PosixPath('.'), dag: Union[sqlmesh.utils.dag.DAG[str], NoneType] = None, default_catalog: Union[str, NoneType] = None, dialect: Union[str, NoneType] = None)
def __init__(
    self,
    state_reader: StateReader,
    models: UniqueKeyDict[str, Model],
    context_path: Path = Path("."),
    dag: t.Optional[DAG[str]] = None,
    default_catalog: t.Optional[str] = None,
    dialect: t.Optional[str] = None,
):
    """Creates a selector over the given local models.

    Args:
        state_reader: Stored for later environment and snapshot lookups.
        models: The local models; the keys are used as DAG node names.
        context_path: Stored, and also used to create the git client.
        dag: A prebuilt dependency graph. When omitted, one is built from
            the ``depends_on`` attribute of every model in ``models``.
        default_catalog: Stored for later model name normalization.
        dialect: Stored for later model name normalization.
    """
    self._state_reader = state_reader
    self._models = models
    self._context_path = context_path
    self._default_catalog = default_catalog
    self._dialect = dialect
    self._git_client = GitClient(context_path)
    self.__models_by_tag: t.Optional[t.Dict[str, t.Set[str]]] = None

    if dag is not None:
        # A caller-supplied graph is used as is.
        self._dag: DAG[str] = dag
        return

    # No graph given: derive one from the dependencies declared by each model.
    self._dag = DAG()
    for node, node_model in models.items():
        self._dag.add(node, node_model.depends_on)
def
select_models( self, model_selections: Iterable[str], target_env_name: str, fallback_env_name: Union[str, NoneType] = None, ensure_finalized_snapshots: bool = False) -> sqlmesh.utils.UniqueKeyDict[str, typing.Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel]]:
def select_models(
    self,
    model_selections: t.Iterable[str],
    target_env_name: str,
    fallback_env_name: t.Optional[str] = None,
    ensure_finalized_snapshots: bool = False,
) -> UniqueKeyDict[str, Model]:
    """Combines the selected local models with the unselected environment models.

    Models matching the selections are taken from the current (local) state;
    every other model is taken from the target environment.

    Args:
        model_selections: A set of selections.
        target_env_name: The name of the target environment.
        fallback_env_name: The name of the environment to use when the target
            environment doesn't exist.
        ensure_finalized_snapshots: Whether to source environment snapshots from the
            latest finalized environment state, or to use whatever snapshots are in the
            current environment state even if the environment is not finalized.

    Returns:
        A dictionary of models.
    """
    environment = self._state_reader.get_environment(
        Environment.normalize_name(target_env_name)
    )
    if not environment and fallback_env_name:
        environment = self._state_reader.get_environment(
            Environment.normalize_name(fallback_env_name)
        )

    env_models: t.Dict[str, Model] = {}
    if environment:
        if ensure_finalized_snapshots:
            snapshot_infos = environment.finalized_or_current_snapshots
        else:
            snapshot_infos = environment.snapshots
        snapshots = self._state_reader.get_snapshots(snapshot_infos, hydrate_seeds=True)
        env_models = {
            snapshot.name: snapshot.model
            for snapshot in snapshots.values()
            if snapshot.is_model
        }

    # Selections are expanded against local and environment models together.
    selected_fqns = self.expand_model_selections(
        model_selections, models={**self._models, **env_models}
    )

    result_dag: DAG[str] = DAG()

    # The selected models plus everything downstream of them.
    affected_fqns = set()
    for selected_fqn in selected_fqns:
        if selected_fqn in affected_fqns:
            continue
        affected_fqns.add(selected_fqn)
        affected_fqns.update(self._dag.downstream(selected_fqn))

    result: UniqueKeyDict[str, Model] = UniqueKeyDict("models")

    for fqn in set(self._models) | set(env_models):
        chosen: t.Optional[Model] = None
        if fqn in selected_fqns:
            # Selected: take the local version when there is one.
            if fqn in self._models:
                chosen = self._models[fqn]
        elif fqn in env_models:
            # Not selected: keep the version from the target environment.
            chosen = env_models[fqn]

        if not chosen:
            continue

        # model.copy() can't be used here due to a cached state that can be a part of a
        # model instance.
        if chosen.fqn in affected_fqns:
            chosen = type(chosen).parse_obj(chosen.dict(exclude={"mapping_schema"}))
        result_dag.add(chosen.fqn, chosen.depends_on)
        result[chosen.fqn] = chosen

    update_model_schemas(result_dag, result, self._context_path)

    return result
Given a set of selections, returns the models from the current state whose names match the selections, while sourcing the remaining models from the target environment.
Arguments:
- model_selections: A set of selections.
- target_env_name: The name of the target environment.
- fallback_env_name: The name of the fallback environment that will be used if the target environment doesn't exist.
- ensure_finalized_snapshots: Whether to source environment snapshots from the latest finalized environment state, or to use whatever snapshots are in the current environment state even if the environment is not finalized.
Returns:
A dictionary of models.
def
expand_model_selections( self, model_selections: Iterable[str], models: Union[Dict[str, Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel]], NoneType] = None) -> Set[str]:
def expand_model_selections(
    self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Model]] = None
) -> t.Set[str]:
    """Expands a set of model selections into a set of model names.

    Each selection may consist of several sub-selections joined with ``&``;
    the results of the sub-selections are intersected. The results of the
    individual selections are unioned.

    Args:
        model_selections: A set of model selections.
        models: The models to match against. Falls back to the local models
            when omitted (or empty).

    Returns:
        A set of model names.
    """
    results: t.Set[str] = set()
    models = models or self._models
    # Tag index, built lazily on the first "tag:" sub-selection and then
    # reused for every later one within this call.
    models_by_tag: t.Optional[t.Dict[str, t.Set[str]]] = None

    for selection in model_selections:
        sub_results: t.Optional[t.Set[str]] = None

        for sub_selection in (s.strip() for s in selection.split("&")):
            if not sub_selection:
                continue

            if sub_selection.startswith("tag:"):
                if models_by_tag is None:
                    models_by_tag = defaultdict(set)
                    for model in models.values():
                        for tag in model.tags:
                            models_by_tag[tag.lower()].add(model.fqn)
                matched = self._expand_model_tag(sub_selection[4:], models, models_by_tag)
            elif sub_selection.startswith("git:"):
                matched = self._expand_git(sub_selection[4:])
            else:
                matched = self._expand_model_name(sub_selection, models)

            sub_results = matched if sub_results is None else sub_results & matched

        if sub_results:
            results.update(sub_results)
        else:
            logger.warning(f"Expression '{selection}' doesn't match any models.")

    return results
Expands a set of model selections into a set of model names.
Arguments:
- model_selections: A set of model selections.
Returns:
A set of model names.