sqlmesh.core.environment
from __future__ import annotations

import json
import re
import typing as t

from pydantic import Field

from sqlmesh.core import constants as c
from sqlmesh.core.config import EnvironmentSuffixTarget
from sqlmesh.core.engine_adapter.base import EngineAdapter
from sqlmesh.core.macros import RuntimeStage
from sqlmesh.core.renderer import render_statements
from sqlmesh.core.snapshot import SnapshotId, SnapshotTableInfo, Snapshot
from sqlmesh.utils import word_characters_only
from sqlmesh.utils.date import TimeLike, now_timestamp
from sqlmesh.utils.errors import SQLMeshError
from sqlmesh.utils.jinja import JinjaMacroRegistry
from sqlmesh.utils.metaprogramming import Executable
from sqlmesh.utils.pydantic import PydanticModel, field_validator, ValidationInfo

T = t.TypeVar("T", bound="EnvironmentNamingInfo")
PydanticType = t.TypeVar("PydanticType", bound="PydanticModel")


class EnvironmentNamingInfo(PydanticModel):
    """
    Information required for creating an object within an environment

    Args:
        name: The name of the environment.
        suffix_target: Indicates whether to append the environment name to the schema or table name.
        catalog_name_override: The name of the catalog to use for this environment if an override was provided
        normalize_name: Indicates whether the environment's name will be normalized. For example, if it's
            `dev`, then it will become `DEV` when targeting Snowflake.
        gateway_managed: Determines whether the virtual layer's views are created by the model-specific
            gateways, otherwise the default gateway is used. Default: False.
    """

    name: str = c.PROD
    suffix_target: EnvironmentSuffixTarget = Field(default=EnvironmentSuffixTarget.SCHEMA)
    catalog_name_override: t.Optional[str] = None
    normalize_name: bool = True
    gateway_managed: bool = False

    @property
    def is_dev(self) -> bool:
        """True when the lowercased name differs from `c.PROD`."""
        return self.name.lower() != c.PROD

    @field_validator("name", mode="before")
    @classmethod
    def _sanitize_name(cls, v: str) -> str:
        """Pass the name through `word_characters_only` and lowercase the result."""
        return word_characters_only(v).lower()

    @field_validator("normalize_name", "gateway_managed", mode="before")
    @classmethod
    def _validate_boolean_field(cls, v: t.Any, info: ValidationInfo) -> bool:
        """Coerce the value to bool; None falls back to True for `normalize_name` and False otherwise."""
        if v is None:
            return info.field_name == "normalize_name"
        return bool(v)

    @t.overload
    @classmethod
    def sanitize_name(cls, v: str) -> str: ...

    @t.overload
    @classmethod
    def sanitize_name(cls, v: Environment) -> Environment: ...

    @classmethod
    def sanitize_name(cls, v: str | Environment) -> str | Environment:
        """
        Sanitizes the environment name so we create names that are valid names for database objects.
        This means alphanumeric and underscores only. Invalid characters are replaced with underscores.

        An `Environment` instance is returned unchanged.

        Raises:
            TypeError: If `v` is neither a string nor an `Environment`.
        """
        if isinstance(v, Environment):
            return v
        if not isinstance(v, str):
            raise TypeError(f"Expected str or Environment, got {type(v).__name__}")
        return cls._sanitize_name(v)

    @classmethod
    def sanitize_names(cls, values: t.Iterable[str]) -> t.Set[str]:
        """Return the set of sanitized forms of the given names."""
        return {cls.sanitize_name(value) for value in values}

    @classmethod
    def from_environment_catalog_mapping(
        cls: t.Type[T],
        environment_catalog_mapping: t.Dict[re.Pattern, str],
        name: str = c.PROD,
        **kwargs: t.Any,
    ) -> T:
        """Build an instance, using the catalog of the first pattern that matches `name`.

        Args:
            environment_catalog_mapping: Patterns mapped to catalog names, checked in iteration order.
            name: The environment name the patterns are matched against.
            kwargs: Additional keyword arguments forwarded to the constructor.

        Returns:
            An instance with `catalog_name_override` set when a pattern matched, otherwise
            an instance built only from `name` and `kwargs`.
        """
        construction_kwargs = dict(name=name, **kwargs)
        for re_pattern, catalog_name in environment_catalog_mapping.items():
            if re.match(re_pattern, name):
                return cls(
                    catalog_name_override=catalog_name,
                    **construction_kwargs,
                )
        return cls(**construction_kwargs)


class EnvironmentSummary(PydanticModel):
    """Represents summary information of an isolated environment.

    Args:
        name: The name of the environment.
        start_at: The start time of the environment.
        end_at: The end time of the environment.
        plan_id: The ID of the plan that last updated this environment.
        previous_plan_id: The ID of the previous plan that updated this environment.
        expiration_ts: The timestamp when this environment will expire.
        finalized_ts: The timestamp when this environment was finalized.
    """

    name: str
    start_at: TimeLike
    end_at: t.Optional[TimeLike] = None
    plan_id: str
    previous_plan_id: t.Optional[str] = None
    expiration_ts: t.Optional[int] = None
    finalized_ts: t.Optional[int] = None

    @property
    def expired(self) -> bool:
        """True when an expiration timestamp is set and is not later than `now_timestamp()`."""
        return self.expiration_ts is not None and self.expiration_ts <= now_timestamp()


class Environment(EnvironmentNamingInfo, EnvironmentSummary):
    """Represents an isolated environment.

    Environments are isolated workspaces that hold pointers to physical tables.

    Args:
        snapshots: The snapshots that are part of this environment.
        promoted_snapshot_ids: The IDs of the snapshots that are promoted in this environment
            (i.e. for which the views are created). If not specified, all snapshots are promoted.
        previous_finalized_snapshots: Snapshots that were part of this environment last time it was finalized.
        requirements: A mapping of library versions for all the snapshots in this environment.
    """

    # The underscore-suffixed fields hold either raw dicts or model objects;
    # the same-named properties below convert them to models on first access.
    snapshots_: t.List[t.Any] = Field(alias="snapshots")
    promoted_snapshot_ids_: t.Optional[t.List[t.Any]] = Field(
        default=None, alias="promoted_snapshot_ids"
    )
    previous_finalized_snapshots_: t.Optional[t.List[t.Any]] = Field(
        default=None, alias="previous_finalized_snapshots"
    )
    requirements: t.Dict[str, str] = {}

    @field_validator("snapshots_", "previous_finalized_snapshots_", mode="before")
    @classmethod
    def _load_snapshots(cls, v: str | t.List[t.Any] | None) -> t.List[t.Any] | None:
        """Decode a JSON string, or check that the first element is a dict or SnapshotTableInfo."""
        if isinstance(v, str):
            return json.loads(v)
        if v and not isinstance(next(iter(v)), (dict, SnapshotTableInfo)):
            raise ValueError("Must be a list of SnapshotTableInfo dicts or objects")
        return v

    @field_validator("promoted_snapshot_ids_", mode="before")
    @classmethod
    def _load_snapshot_ids(cls, v: str | t.List[t.Any] | None) -> t.List[t.Any] | None:
        """Decode a JSON string, or check that the first element is a dict or SnapshotId."""
        if isinstance(v, str):
            return json.loads(v)
        if v and not isinstance(next(iter(v)), (dict, SnapshotId)):
            raise ValueError("Must be a list of SnapshotId dicts or objects")
        return v

    @field_validator("requirements", mode="before")
    @classmethod
    def _load_requirements(cls, v: t.Any) -> t.Any:
        """Decode a JSON string if given one; any falsy value becomes an empty dict."""
        if isinstance(v, str):
            v = json.loads(v)
        return v or {}

    @property
    def snapshots(self) -> t.List[SnapshotTableInfo]:
        """The environment's snapshots as `SnapshotTableInfo` objects (empty list if none)."""
        return self._convert_list_to_models_and_store("snapshots_", SnapshotTableInfo) or []

    def snapshot_dicts(self) -> t.List[dict]:
        """The environment's snapshots as dicts."""
        return self._convert_list_to_dicts(self.snapshots_)

    @property
    def promoted_snapshot_ids(self) -> t.Optional[t.List[SnapshotId]]:
        """The promoted snapshot IDs as `SnapshotId` objects, or the stored falsy value."""
        return self._convert_list_to_models_and_store("promoted_snapshot_ids_", SnapshotId)

    def promoted_snapshot_id_dicts(self) -> t.List[dict]:
        """The promoted snapshot IDs as dicts."""
        return self._convert_list_to_dicts(self.promoted_snapshot_ids_)

    @property
    def promoted_snapshots(self) -> t.List[SnapshotTableInfo]:
        """Snapshots whose ID is promoted; all snapshots when no promoted IDs are set."""
        if self.promoted_snapshot_ids is None:
            return self.snapshots

        promoted_snapshot_ids = set(self.promoted_snapshot_ids)
        return [s for s in self.snapshots if s.snapshot_id in promoted_snapshot_ids]

    @property
    def previous_finalized_snapshots(self) -> t.Optional[t.List[SnapshotTableInfo]]:
        """The previously finalized snapshots as `SnapshotTableInfo` objects, if any."""
        return self._convert_list_to_models_and_store(
            "previous_finalized_snapshots_", SnapshotTableInfo
        )

    def previous_finalized_snapshot_dicts(self) -> t.List[dict]:
        """The previously finalized snapshots as dicts."""
        return self._convert_list_to_dicts(self.previous_finalized_snapshots_)

    @property
    def finalized_or_current_snapshots(self) -> t.List[SnapshotTableInfo]:
        """Current snapshots if finalized; otherwise the previous finalized ones, falling back to current."""
        return (
            self.snapshots
            if self.finalized_ts
            else self.previous_finalized_snapshots or self.snapshots
        )

    @property
    def naming_info(self) -> EnvironmentNamingInfo:
        """A standalone `EnvironmentNamingInfo` copied from this environment's naming fields."""
        return EnvironmentNamingInfo(
            name=self.name,
            suffix_target=self.suffix_target,
            catalog_name_override=self.catalog_name_override,
            normalize_name=self.normalize_name,
            gateway_managed=self.gateway_managed,
        )

    @property
    def summary(self) -> EnvironmentSummary:
        """A standalone `EnvironmentSummary` copied from this environment's summary fields."""
        return EnvironmentSummary(
            name=self.name,
            start_at=self.start_at,
            end_at=self.end_at,
            plan_id=self.plan_id,
            previous_plan_id=self.previous_plan_id,
            expiration_ts=self.expiration_ts,
            finalized_ts=self.finalized_ts,
        )

    def can_partially_promote(self, existing_environment: Environment) -> bool:
        """Returns True if the existing environment can be partially promoted to the current environment.

        Partial promotion means that we don't need to re-create views for snapshots that are already promoted in the
        target environment.
        """
        return (
            bool(existing_environment.finalized_ts)
            and not existing_environment.expired
            and existing_environment.gateway_managed == self.gateway_managed
            and existing_environment.name == c.PROD
        )

    def _convert_list_to_models_and_store(
        self, field: str, type_: t.Type[PydanticType]
    ) -> t.Optional[t.List[PydanticType]]:
        """Parse the named list field into `type_` objects (judging by its first element) and store it back."""
        value = getattr(self, field)
        if value and not isinstance(value[0], type_):
            value = [type_.parse_obj(obj) for obj in value]
            setattr(self, field, value)
        return value

    def _convert_list_to_dicts(self, value: t.Optional[t.List[t.Any]]) -> t.List[dict]:
        """Return the list as dicts, judging by its first element; an empty or missing list gives []."""
        if not value:
            return []
        return value if isinstance(value[0], dict) else [v.dict() for v in value]


class EnvironmentStatements(PydanticModel):
    """Statements attached to an environment, plus what is passed to `render_statements` to render them.

    `render` looks statements up by attribute name using `runtime_stage.value`.
    """

    before_all: t.List[str]
    after_all: t.List[str]
    python_env: t.Dict[str, Executable]
    jinja_macros: t.Optional[JinjaMacroRegistry] = None
    project: t.Optional[str] = None

    def render_before_all(
        self,
        dialect: str,
        default_catalog: t.Optional[str] = None,
        **render_kwargs: t.Any,
    ) -> t.List[str]:
        """Render the statements for `RuntimeStage.BEFORE_ALL`."""
        return self.render(RuntimeStage.BEFORE_ALL, dialect, default_catalog, **render_kwargs)

    def render_after_all(
        self,
        dialect: str,
        default_catalog: t.Optional[str] = None,
        **render_kwargs: t.Any,
    ) -> t.List[str]:
        """Render the statements for `RuntimeStage.AFTER_ALL`."""
        return self.render(RuntimeStage.AFTER_ALL, dialect, default_catalog, **render_kwargs)

    def render(
        self,
        runtime_stage: RuntimeStage,
        dialect: str,
        default_catalog: t.Optional[str] = None,
        **render_kwargs: t.Any,
    ) -> t.List[str]:
        """Render the statements stored under the attribute named by `runtime_stage.value`.

        Extra keyword arguments are forwarded to `render_statements` unchanged.
        """
        return render_statements(
            statements=getattr(self, runtime_stage.value),
            dialect=dialect,
            default_catalog=default_catalog,
            python_env=self.python_env,
            jinja_macros=self.jinja_macros,
            runtime_stage=runtime_stage,
            **render_kwargs,
        )


def execute_environment_statements(
    adapter: EngineAdapter,
    environment_statements: t.List[EnvironmentStatements],
    runtime_stage: RuntimeStage,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str] = None,
    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    selected_models: t.Optional[t.Set[str]] = None,
) -> None:
    """Render every statement for `runtime_stage`, then execute them inside `adapter.transaction()`.

    All statements are rendered before any is executed. Nothing is executed (and no
    transaction is opened) when rendering produces no expressions.

    Raises:
        SQLMeshError: If rendering fails or if executing a statement fails. The
            original exception is attached as the cause.
    """
    try:
        rendered_expressions = [
            expr
            for statements in environment_statements
            for expr in statements.render(
                runtime_stage=runtime_stage,
                dialect=adapter.dialect,
                default_catalog=default_catalog,
                snapshots=snapshots,
                start=start,
                end=end,
                execution_time=execution_time,
                environment_naming_info=environment_naming_info,
                engine_adapter=adapter,
                selected_models=selected_models,
            )
        ]
    except Exception as e:
        raise SQLMeshError(
            f"An error occurred during rendering of the '{runtime_stage.value}' statements:\n\n{e}"
        ) from e
    if rendered_expressions:
        with adapter.transaction():
            for expr in rendered_expressions:
                try:
                    adapter.execute(expr)
                except Exception as e:
                    raise SQLMeshError(
                        f"An error occurred during execution of the following '{runtime_stage.value}' statement:\n\n{expr}\n\n{e}"
                    ) from e
class EnvironmentNamingInfo(PydanticModel):
    """
    Naming details needed to create an object inside an environment.

    Args:
        name: The name of the environment.
        suffix_target: Indicates whether to append the environment name to the schema or table name.
        catalog_name_override: The name of the catalog to use for this environment if an override was provided
        normalize_name: Indicates whether the environment's name will be normalized. For example, if it's
            `dev`, then it will become `DEV` when targeting Snowflake.
        gateway_managed: Determines whether the virtual layer's views are created by the model-specific
            gateways, otherwise the default gateway is used. Default: False.
    """

    name: str = c.PROD
    suffix_target: EnvironmentSuffixTarget = Field(default=EnvironmentSuffixTarget.SCHEMA)
    catalog_name_override: t.Optional[str] = None
    normalize_name: bool = True
    gateway_managed: bool = False

    @property
    def is_dev(self) -> bool:
        """Whether the lowercased name is anything other than `c.PROD`."""
        lowered = self.name.lower()
        return lowered != c.PROD

    @field_validator("name", mode="before")
    @classmethod
    def _sanitize_name(cls, v: str) -> str:
        """Run the name through `word_characters_only`, then lowercase it."""
        cleaned = word_characters_only(v)
        return cleaned.lower()

    @field_validator("normalize_name", "gateway_managed", mode="before")
    @classmethod
    def _validate_boolean_field(cls, v: t.Any, info: ValidationInfo) -> bool:
        """Coerce to bool; a None value yields True for `normalize_name` and False otherwise."""
        if v is not None:
            return bool(v)
        return info.field_name == "normalize_name"

    @t.overload
    @classmethod
    def sanitize_name(cls, v: str) -> str: ...

    @t.overload
    @classmethod
    def sanitize_name(cls, v: Environment) -> Environment: ...

    @classmethod
    def sanitize_name(cls, v: str | Environment) -> str | Environment:
        """
        Make an environment name safe to use in database object names
        (alphanumeric characters and underscores only; invalid characters
        become underscores). `Environment` instances pass through untouched.

        Raises:
            TypeError: If `v` is neither a string nor an `Environment`.
        """
        if isinstance(v, Environment):
            return v
        if isinstance(v, str):
            return cls._sanitize_name(v)
        raise TypeError(f"Expected str or Environment, got {type(v).__name__}")

    @classmethod
    def sanitize_names(cls, values: t.Iterable[str]) -> t.Set[str]:
        """Sanitize each name and collect the results into a set."""
        return set(map(cls.sanitize_name, values))

    @classmethod
    def from_environment_catalog_mapping(
        cls: t.Type[T],
        environment_catalog_mapping: t.Dict[re.Pattern, str],
        name: str = c.PROD,
        **kwargs: t.Any,
    ) -> T:
        """Construct an instance, taking the catalog override from the first pattern matching `name`.

        Patterns are tried in the mapping's iteration order with `re.match`. When none
        matches, the instance is built from `name` and `kwargs` alone.
        """
        shared_kwargs = {"name": name, **kwargs}
        for pattern, catalog in environment_catalog_mapping.items():
            if not re.match(pattern, name):
                continue
            return cls(catalog_name_override=catalog, **shared_kwargs)
        return cls(**shared_kwargs)
Information required for creating an object within an environment
Arguments:
- name: The name of the environment.
- suffix_target: Indicates whether to append the environment name to the schema or table name.
- catalog_name_override: The name of the catalog to use for this environment if an override was provided
- normalize_name: Indicates whether the environment's name will be normalized. For example, if it's `dev`, then it will become `DEV` when targeting Snowflake.
- gateway_managed: Determines whether the virtual layer's views are created by the model-specific gateways, otherwise the default gateway is used. Default: False.
@classmethod
def sanitize_name(cls, v: str | Environment) -> str | Environment:
    """
    Make an environment name safe to use in database object names
    (alphanumeric characters and underscores only; invalid characters
    become underscores). `Environment` instances pass through untouched.

    Raises:
        TypeError: If `v` is neither a string nor an `Environment`.
    """
    if isinstance(v, Environment):
        return v
    if isinstance(v, str):
        return cls._sanitize_name(v)
    raise TypeError(f"Expected str or Environment, got {type(v).__name__}")
Sanitizes the environment name so we create names that are valid names for database objects. This means alphanumeric and underscores only. Invalid characters are replaced with underscores.
@classmethod
def from_environment_catalog_mapping(
    cls: t.Type[T],
    environment_catalog_mapping: t.Dict[re.Pattern, str],
    name: str = c.PROD,
    **kwargs: t.Any,
) -> T:
    """Construct an instance, taking the catalog override from the first pattern matching `name`.

    Args:
        environment_catalog_mapping: Patterns mapped to catalog names, tried in iteration order.
        name: The environment name each pattern is matched against with `re.match`.
        kwargs: Extra keyword arguments passed through to the constructor.

    Returns:
        An instance with `catalog_name_override` set when a pattern matched; otherwise
        an instance built from `name` and `kwargs` alone.
    """
    shared_kwargs = {"name": name, **kwargs}
    for pattern, catalog in environment_catalog_mapping.items():
        if not re.match(pattern, name):
            continue
        return cls(catalog_name_override=catalog, **shared_kwargs)
    return cls(**shared_kwargs)
Configuration for the model; it should be a dictionary conforming to pydantic's `ConfigDict` (`pydantic.config.ConfigDict`).
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class EnvironmentSummary(PydanticModel):
    """Summary information about an isolated environment.

    Args:
        name: The name of the environment.
        start_at: The start time of the environment.
        end_at: The end time of the environment.
        plan_id: The ID of the plan that last updated this environment.
        previous_plan_id: The ID of the previous plan that updated this environment.
        expiration_ts: The timestamp when this environment will expire.
        finalized_ts: The timestamp when this environment was finalized.
    """

    name: str
    start_at: TimeLike
    end_at: t.Optional[TimeLike] = None
    plan_id: str
    previous_plan_id: t.Optional[str] = None
    expiration_ts: t.Optional[int] = None
    finalized_ts: t.Optional[int] = None

    @property
    def expired(self) -> bool:
        """Whether an expiration timestamp is set and is not later than `now_timestamp()`."""
        deadline = self.expiration_ts
        if deadline is None:
            # No expiration configured means the environment never expires.
            return False
        return deadline <= now_timestamp()
Represents summary information of an isolated environment.
Arguments:
- name: The name of the environment.
- start_at: The start time of the environment.
- end_at: The end time of the environment.
- plan_id: The ID of the plan that last updated this environment.
- previous_plan_id: The ID of the previous plan that updated this environment.
- expiration_ts: The timestamp when this environment will expire.
- finalized_ts: The timestamp when this environment was finalized.
Configuration for the model; it should be a dictionary conforming to pydantic's `ConfigDict` (`pydantic.config.ConfigDict`).
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class Environment(EnvironmentNamingInfo, EnvironmentSummary):
    """Represents an isolated environment.

    Environments are isolated workspaces that hold pointers to physical tables.

    Args:
        snapshots: The snapshots that are part of this environment.
        promoted_snapshot_ids: The IDs of the snapshots that are promoted in this environment
            (i.e. for which the views are created). If not specified, all snapshots are promoted.
        previous_finalized_snapshots: Snapshots that were part of this environment last time it was finalized.
        requirements: A mapping of library versions for all the snapshots in this environment.
    """

    # The underscore-suffixed fields hold either raw dicts or model objects;
    # the same-named properties below convert them to models on first access.
    snapshots_: t.List[t.Any] = Field(alias="snapshots")
    promoted_snapshot_ids_: t.Optional[t.List[t.Any]] = Field(
        default=None, alias="promoted_snapshot_ids"
    )
    previous_finalized_snapshots_: t.Optional[t.List[t.Any]] = Field(
        default=None, alias="previous_finalized_snapshots"
    )
    requirements: t.Dict[str, str] = {}

    @field_validator("snapshots_", "previous_finalized_snapshots_", mode="before")
    @classmethod
    def _load_snapshots(cls, v: str | t.List[t.Any] | None) -> t.List[t.Any] | None:
        """Decode a JSON string, or check that the first element is a dict or SnapshotTableInfo.

        Raises:
            ValueError: If a non-empty value starts with an element of any other type.
        """
        if isinstance(v, str):
            return json.loads(v)
        if v and not isinstance(next(iter(v)), (dict, SnapshotTableInfo)):
            raise ValueError("Must be a list of SnapshotTableInfo dicts or objects")
        return v

    @field_validator("promoted_snapshot_ids_", mode="before")
    @classmethod
    def _load_snapshot_ids(cls, v: str | t.List[t.Any] | None) -> t.List[t.Any] | None:
        """Decode a JSON string, or check that the first element is a dict or SnapshotId.

        Raises:
            ValueError: If a non-empty value starts with an element of any other type.
        """
        if isinstance(v, str):
            return json.loads(v)
        if v and not isinstance(next(iter(v)), (dict, SnapshotId)):
            raise ValueError("Must be a list of SnapshotId dicts or objects")
        return v

    @field_validator("requirements", mode="before")
    @classmethod
    def _load_requirements(cls, v: t.Any) -> t.Any:
        """Decode a JSON string if given one; any falsy value becomes an empty dict."""
        if isinstance(v, str):
            v = json.loads(v)
        return v or {}

    @property
    def snapshots(self) -> t.List[SnapshotTableInfo]:
        """The environment's snapshots as `SnapshotTableInfo` objects (empty list if none)."""
        return self._convert_list_to_models_and_store("snapshots_", SnapshotTableInfo) or []

    def snapshot_dicts(self) -> t.List[dict]:
        """The environment's snapshots as dicts."""
        return self._convert_list_to_dicts(self.snapshots_)

    @property
    def promoted_snapshot_ids(self) -> t.Optional[t.List[SnapshotId]]:
        """The promoted snapshot IDs as `SnapshotId` objects, or the stored falsy value."""
        return self._convert_list_to_models_and_store("promoted_snapshot_ids_", SnapshotId)

    def promoted_snapshot_id_dicts(self) -> t.List[dict]:
        """The promoted snapshot IDs as dicts."""
        return self._convert_list_to_dicts(self.promoted_snapshot_ids_)

    @property
    def promoted_snapshots(self) -> t.List[SnapshotTableInfo]:
        """Snapshots whose ID is promoted; all snapshots when no promoted IDs are set."""
        if self.promoted_snapshot_ids is None:
            return self.snapshots

        promoted_snapshot_ids = set(self.promoted_snapshot_ids)
        return [s for s in self.snapshots if s.snapshot_id in promoted_snapshot_ids]

    @property
    def previous_finalized_snapshots(self) -> t.Optional[t.List[SnapshotTableInfo]]:
        """The previously finalized snapshots as `SnapshotTableInfo` objects, if any."""
        return self._convert_list_to_models_and_store(
            "previous_finalized_snapshots_", SnapshotTableInfo
        )

    def previous_finalized_snapshot_dicts(self) -> t.List[dict]:
        """The previously finalized snapshots as dicts."""
        return self._convert_list_to_dicts(self.previous_finalized_snapshots_)

    @property
    def finalized_or_current_snapshots(self) -> t.List[SnapshotTableInfo]:
        """Current snapshots if finalized; otherwise the previous finalized ones, falling back to current."""
        return (
            self.snapshots
            if self.finalized_ts
            else self.previous_finalized_snapshots or self.snapshots
        )

    @property
    def naming_info(self) -> EnvironmentNamingInfo:
        """A standalone `EnvironmentNamingInfo` copied from this environment's naming fields."""
        return EnvironmentNamingInfo(
            name=self.name,
            suffix_target=self.suffix_target,
            catalog_name_override=self.catalog_name_override,
            normalize_name=self.normalize_name,
            gateway_managed=self.gateway_managed,
        )

    @property
    def summary(self) -> EnvironmentSummary:
        """A standalone `EnvironmentSummary` copied from this environment's summary fields."""
        return EnvironmentSummary(
            name=self.name,
            start_at=self.start_at,
            end_at=self.end_at,
            plan_id=self.plan_id,
            previous_plan_id=self.previous_plan_id,
            expiration_ts=self.expiration_ts,
            finalized_ts=self.finalized_ts,
        )

    def can_partially_promote(self, existing_environment: Environment) -> bool:
        """Returns True if the existing environment can be partially promoted to the current environment.

        Partial promotion means that we don't need to re-create views for snapshots that are already promoted in the
        target environment.
        """
        return (
            bool(existing_environment.finalized_ts)
            and not existing_environment.expired
            and existing_environment.gateway_managed == self.gateway_managed
            and existing_environment.name == c.PROD
        )

    def _convert_list_to_models_and_store(
        self, field: str, type_: t.Type[PydanticType]
    ) -> t.Optional[t.List[PydanticType]]:
        """Parse the named list field into `type_` objects and store the result back on the instance.

        Only the first element is inspected to decide whether parsing is needed.
        An empty or None value is returned as-is.
        """
        value = getattr(self, field)
        if value and not isinstance(value[0], type_):
            value = [type_.parse_obj(obj) for obj in value]
            setattr(self, field, value)
        return value

    def _convert_list_to_dicts(self, value: t.Optional[t.List[t.Any]]) -> t.List[dict]:
        """Return the list as dicts, judging by its first element; an empty or missing list gives []."""
        if not value:
            return []
        return value if isinstance(value[0], dict) else [v.dict() for v in value]
Represents an isolated environment.
Environments are isolated workspaces that hold pointers to physical tables.
Arguments:
- snapshots: The snapshots that are part of this environment.
- promoted_snapshot_ids: The IDs of the snapshots that are promoted in this environment (i.e. for which the views are created). If not specified, all snapshots are promoted.
- previous_finalized_snapshots: Snapshots that were part of this environment last time it was finalized.
- requirements: A mapping of library versions for all the snapshots in this environment.
@property
def summary(self) -> EnvironmentSummary:
    """A standalone `EnvironmentSummary` copied from this environment's summary fields."""
    copied_fields = (
        "name",
        "start_at",
        "end_at",
        "plan_id",
        "previous_plan_id",
        "expiration_ts",
        "finalized_ts",
    )
    values = {field_name: getattr(self, field_name) for field_name in copied_fields}
    return EnvironmentSummary(**values)
def can_partially_promote(self, existing_environment: Environment) -> bool:
    """Tell whether the existing environment can be partially promoted to this one.

    Partial promotion means views do not have to be re-created for snapshots
    that are already promoted in the target environment.

    Returns:
        True only when the existing environment is finalized, not expired, has the
        same `gateway_managed` setting as this one, and is named `c.PROD`.
    """
    other = existing_environment
    if not other.finalized_ts:
        return False
    if other.expired:
        return False
    if other.gateway_managed != self.gateway_managed:
        return False
    return other.name == c.PROD
Returns True if the existing environment can be partially promoted to the current environment.
Partial promotion means that we don't need to re-create views for snapshots that are already promoted in the target environment.
Configuration for the model; it should be a dictionary conforming to pydantic's `ConfigDict` (`pydantic.config.ConfigDict`).
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class EnvironmentStatements(PydanticModel):
    """Statements attached to an environment, plus what is passed to `render_statements` to render them.

    `render` looks statements up by attribute name using `runtime_stage.value`.
    """

    before_all: t.List[str]
    after_all: t.List[str]
    python_env: t.Dict[str, Executable]
    jinja_macros: t.Optional[JinjaMacroRegistry] = None
    project: t.Optional[str] = None

    def render_before_all(
        self,
        dialect: str,
        default_catalog: t.Optional[str] = None,
        **render_kwargs: t.Any,
    ) -> t.List[str]:
        """Render the statements for `RuntimeStage.BEFORE_ALL`."""
        stage = RuntimeStage.BEFORE_ALL
        return self.render(stage, dialect, default_catalog, **render_kwargs)

    def render_after_all(
        self,
        dialect: str,
        default_catalog: t.Optional[str] = None,
        **render_kwargs: t.Any,
    ) -> t.List[str]:
        """Render the statements for `RuntimeStage.AFTER_ALL`."""
        stage = RuntimeStage.AFTER_ALL
        return self.render(stage, dialect, default_catalog, **render_kwargs)

    def render(
        self,
        runtime_stage: RuntimeStage,
        dialect: str,
        default_catalog: t.Optional[str] = None,
        **render_kwargs: t.Any,
    ) -> t.List[str]:
        """Render the statements stored under the attribute named by `runtime_stage.value`.

        Extra keyword arguments are forwarded to `render_statements` unchanged.
        """
        stage_statements = getattr(self, runtime_stage.value)
        return render_statements(
            statements=stage_statements,
            dialect=dialect,
            default_catalog=default_catalog,
            python_env=self.python_env,
            jinja_macros=self.jinja_macros,
            runtime_stage=runtime_stage,
            **render_kwargs,
        )
Usage documentation: see "Models" in the Pydantic documentation.
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` `Signature` (`inspect.Signature`) of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to `__args__`, `__origin__`, `__parameters__` in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a `RootModel` (`pydantic.root_model.RootModel`).
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding `FieldInfo` (`pydantic.fields.FieldInfo`) objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding `ComputedFieldInfo` (`pydantic.fields.ComputedFieldInfo`) objects.
- __pydantic_extra__: A dictionary containing extra values, if `extra` (`pydantic.config.ConfigDict.extra`) is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
def render(
    self,
    runtime_stage: RuntimeStage,
    dialect: str,
    default_catalog: t.Optional[str] = None,
    **render_kwargs: t.Any,
) -> t.List[str]:
    """Render the statements stored under the attribute named by `runtime_stage.value`.

    Args:
        runtime_stage: Selects the statements to render; also passed on to `render_statements`.
        dialect: Passed on to `render_statements`.
        default_catalog: Passed on to `render_statements`.
        render_kwargs: Extra keyword arguments forwarded to `render_statements` unchanged.

    Returns:
        Whatever `render_statements` returns for the selected statements.
    """
    stage_statements = getattr(self, runtime_stage.value)
    return render_statements(
        statements=stage_statements,
        dialect=dialect,
        default_catalog=default_catalog,
        python_env=self.python_env,
        jinja_macros=self.jinja_macros,
        runtime_stage=runtime_stage,
        **render_kwargs,
    )
Configuration for the model; it should be a dictionary conforming to pydantic's `ConfigDict` (`pydantic.config.ConfigDict`).
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
def execute_environment_statements(
    adapter: EngineAdapter,
    environment_statements: t.List[EnvironmentStatements],
    runtime_stage: RuntimeStage,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str] = None,
    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    selected_models: t.Optional[t.Set[str]] = None,
) -> None:
    """Render every statement for `runtime_stage`, then execute them inside `adapter.transaction()`.

    All statements are rendered before any is executed. Nothing is executed (and no
    transaction is opened) when rendering produces no expressions.

    Args:
        adapter: Supplies the dialect for rendering and executes the rendered expressions.
        environment_statements: The statement collections to render, in order.
        runtime_stage: The stage whose statements are rendered.
        environment_naming_info: Forwarded to each `render` call.
        default_catalog: Forwarded to each `render` call.
        snapshots: Forwarded to each `render` call.
        start: Forwarded to each `render` call.
        end: Forwarded to each `render` call.
        execution_time: Forwarded to each `render` call.
        selected_models: Forwarded to each `render` call.

    Raises:
        SQLMeshError: If rendering fails or if executing a statement fails. The
            original exception is attached as the cause.
    """
    try:
        rendered_expressions = [
            expr
            for statements in environment_statements
            for expr in statements.render(
                runtime_stage=runtime_stage,
                dialect=adapter.dialect,
                default_catalog=default_catalog,
                snapshots=snapshots,
                start=start,
                end=end,
                execution_time=execution_time,
                environment_naming_info=environment_naming_info,
                engine_adapter=adapter,
                selected_models=selected_models,
            )
        ]
    except Exception as e:
        # Chain explicitly so the traceback marks the render failure as the direct cause.
        raise SQLMeshError(
            f"An error occurred during rendering of the '{runtime_stage.value}' statements:\n\n{e}"
        ) from e

    if not rendered_expressions:
        return

    with adapter.transaction():
        for expr in rendered_expressions:
            try:
                adapter.execute(expr)
            except Exception as e:
                raise SQLMeshError(
                    f"An error occurred during execution of the following '{runtime_stage.value}' statement:\n\n{expr}\n\n{e}"
                ) from e