Edit on GitHub

sqlmesh.core.environment

  1from __future__ import annotations
  2
  3import json
  4import re
  5import typing as t
  6
  7from pydantic import Field
  8
  9from sqlmesh.core import constants as c
 10from sqlmesh.core.config import EnvironmentSuffixTarget
 11from sqlmesh.core.engine_adapter.base import EngineAdapter
 12from sqlmesh.core.macros import RuntimeStage
 13from sqlmesh.core.renderer import render_statements
 14from sqlmesh.core.snapshot import SnapshotId, SnapshotTableInfo, Snapshot
 15from sqlmesh.utils import word_characters_only
 16from sqlmesh.utils.date import TimeLike, now_timestamp
 17from sqlmesh.utils.errors import SQLMeshError
 18from sqlmesh.utils.jinja import JinjaMacroRegistry
 19from sqlmesh.utils.metaprogramming import Executable
 20from sqlmesh.utils.pydantic import PydanticModel, field_validator, ValidationInfo
 21
 22T = t.TypeVar("T", bound="EnvironmentNamingInfo")
 23PydanticType = t.TypeVar("PydanticType", bound="PydanticModel")
 24
 25
 26class EnvironmentNamingInfo(PydanticModel):
 27    """
 28    Information required for creating an object within an environment
 29
 30    Args:
 31        name: The name of the environment.
 32        suffix_target: Indicates whether to append the environment name to the schema or table name.
 33        catalog_name_override: The name of the catalog to use for this environment if an override was provided
 34        normalize_name: Indicates whether the environment's name will be normalized. For example, if it's
 35            `dev`, then it will become `DEV` when targeting Snowflake.
 36        gateway_managed: Determines whether the virtual layer's views are created by the model-specific
 37            gateways, otherwise the default gateway is used. Default: False.
 38    """
 39
 40    name: str = c.PROD
 41    suffix_target: EnvironmentSuffixTarget = Field(default=EnvironmentSuffixTarget.SCHEMA)
 42    catalog_name_override: t.Optional[str] = None
 43    normalize_name: bool = True
 44    gateway_managed: bool = False
 45
 46    @property
 47    def is_dev(self) -> bool:
 48        return self.name.lower() != c.PROD
 49
 50    @field_validator("name", mode="before")
 51    @classmethod
 52    def _sanitize_name(cls, v: str) -> str:
 53        return word_characters_only(v).lower()
 54
 55    @field_validator("normalize_name", "gateway_managed", mode="before")
 56    @classmethod
 57    def _validate_boolean_field(cls, v: t.Any, info: ValidationInfo) -> bool:
 58        if v is None:
 59            return info.field_name == "normalize_name"
 60        return bool(v)
 61
 62    @t.overload
 63    @classmethod
 64    def sanitize_name(cls, v: str) -> str: ...
 65
 66    @t.overload
 67    @classmethod
 68    def sanitize_name(cls, v: Environment) -> Environment: ...
 69
 70    @classmethod
 71    def sanitize_name(cls, v: str | Environment) -> str | Environment:
 72        """
 73        Sanitizes the environment name so we create names that are valid names for database objects.
 74        This means alphanumeric and underscores only. Invalid characters are replaced with underscores.
 75        """
 76        if isinstance(v, Environment):
 77            return v
 78        if not isinstance(v, str):
 79            raise TypeError(f"Expected str or Environment, got {type(v).__name__}")
 80        return cls._sanitize_name(v)
 81
 82    @classmethod
 83    def sanitize_names(cls, values: t.Iterable[str]) -> t.Set[str]:
 84        return {cls.sanitize_name(value) for value in values}
 85
 86    @classmethod
 87    def from_environment_catalog_mapping(
 88        cls: t.Type[T],
 89        environment_catalog_mapping: t.Dict[re.Pattern, str],
 90        name: str = c.PROD,
 91        **kwargs: t.Any,
 92    ) -> T:
 93        construction_kwargs = dict(name=name, **kwargs)
 94        for re_pattern, catalog_name in environment_catalog_mapping.items():
 95            if re.match(re_pattern, name):
 96                return cls(
 97                    catalog_name_override=catalog_name,
 98                    **construction_kwargs,
 99                )
100        return cls(**construction_kwargs)
101
102
103class EnvironmentSummary(PydanticModel):
104    """Represents summary information of an isolated environment.
105
106    Args:
107        name: The name of the environment.
108        start_at: The start time of the environment.
109        end_at: The end time of the environment.
110        plan_id: The ID of the plan that last updated this environment.
111        previous_plan_id: The ID of the previous plan that updated this environment.
112        expiration_ts: The timestamp when this environment will expire.
113        finalized_ts: The timestamp when this environment was finalized.
114    """
115
116    name: str
117    start_at: TimeLike
118    end_at: t.Optional[TimeLike] = None
119    plan_id: str
120    previous_plan_id: t.Optional[str] = None
121    expiration_ts: t.Optional[int] = None
122    finalized_ts: t.Optional[int] = None
123
124    @property
125    def expired(self) -> bool:
126        return self.expiration_ts is not None and self.expiration_ts <= now_timestamp()
127
128
129class Environment(EnvironmentNamingInfo, EnvironmentSummary):
130    """Represents an isolated environment.
131
132    Environments are isolated workspaces that hold pointers to physical tables.
133
134    Args:
135        snapshots: The snapshots that are part of this environment.
136        promoted_snapshot_ids: The IDs of the snapshots that are promoted in this environment
137            (i.e. for which the views are created). If not specified, all snapshots are promoted.
138        previous_finalized_snapshots: Snapshots that were part of this environment last time it was finalized.
139        requirements: A mapping of library versions for all the snapshots in this environment.
140    """
141
142    snapshots_: t.List[t.Any] = Field(alias="snapshots")
143    promoted_snapshot_ids_: t.Optional[t.List[t.Any]] = Field(
144        default=None, alias="promoted_snapshot_ids"
145    )
146    previous_finalized_snapshots_: t.Optional[t.List[t.Any]] = Field(
147        default=None, alias="previous_finalized_snapshots"
148    )
149    requirements: t.Dict[str, str] = {}
150
151    @field_validator("snapshots_", "previous_finalized_snapshots_", mode="before")
152    @classmethod
153    def _load_snapshots(cls, v: str | t.List[t.Any] | None) -> t.List[t.Any] | None:
154        if isinstance(v, str):
155            return json.loads(v)
156        if v and not isinstance(next(iter(v)), (dict, SnapshotTableInfo)):
157            raise ValueError("Must be a list of SnapshotTableInfo dicts or objects")
158        return v
159
160    @field_validator("promoted_snapshot_ids_", mode="before")
161    @classmethod
162    def _load_snapshot_ids(cls, v: str | t.List[t.Any] | None) -> t.List[t.Any] | None:
163        if isinstance(v, str):
164            return json.loads(v)
165        if v and not isinstance(next(iter(v)), (dict, SnapshotId)):
166            raise ValueError("Must be a list of SnapshotId dicts or objects")
167        return v
168
169    @field_validator("requirements", mode="before")
170    def _load_requirements(cls, v: t.Any) -> t.Any:
171        if isinstance(v, str):
172            v = json.loads(v)
173        return v or {}
174
175    @property
176    def snapshots(self) -> t.List[SnapshotTableInfo]:
177        return self._convert_list_to_models_and_store("snapshots_", SnapshotTableInfo) or []
178
179    def snapshot_dicts(self) -> t.List[dict]:
180        return self._convert_list_to_dicts(self.snapshots_)
181
182    @property
183    def promoted_snapshot_ids(self) -> t.Optional[t.List[SnapshotId]]:
184        return self._convert_list_to_models_and_store("promoted_snapshot_ids_", SnapshotId)
185
186    def promoted_snapshot_id_dicts(self) -> t.List[dict]:
187        return self._convert_list_to_dicts(self.promoted_snapshot_ids_)
188
189    @property
190    def promoted_snapshots(self) -> t.List[SnapshotTableInfo]:
191        if self.promoted_snapshot_ids is None:
192            return self.snapshots
193
194        promoted_snapshot_ids = set(self.promoted_snapshot_ids)
195        return [s for s in self.snapshots if s.snapshot_id in promoted_snapshot_ids]
196
197    @property
198    def previous_finalized_snapshots(self) -> t.Optional[t.List[SnapshotTableInfo]]:
199        return self._convert_list_to_models_and_store(
200            "previous_finalized_snapshots_", SnapshotTableInfo
201        )
202
203    def previous_finalized_snapshot_dicts(self) -> t.List[dict]:
204        return self._convert_list_to_dicts(self.previous_finalized_snapshots_)
205
206    @property
207    def finalized_or_current_snapshots(self) -> t.List[SnapshotTableInfo]:
208        return (
209            self.snapshots
210            if self.finalized_ts
211            else self.previous_finalized_snapshots or self.snapshots
212        )
213
214    @property
215    def naming_info(self) -> EnvironmentNamingInfo:
216        return EnvironmentNamingInfo(
217            name=self.name,
218            suffix_target=self.suffix_target,
219            catalog_name_override=self.catalog_name_override,
220            normalize_name=self.normalize_name,
221            gateway_managed=self.gateway_managed,
222        )
223
224    @property
225    def summary(self) -> EnvironmentSummary:
226        return EnvironmentSummary(
227            name=self.name,
228            start_at=self.start_at,
229            end_at=self.end_at,
230            plan_id=self.plan_id,
231            previous_plan_id=self.previous_plan_id,
232            expiration_ts=self.expiration_ts,
233            finalized_ts=self.finalized_ts,
234        )
235
236    def can_partially_promote(self, existing_environment: Environment) -> bool:
237        """Returns True if the existing environment can be partially promoted to the current environment.
238
239        Partial promotion means that we don't need to re-create views for snapshots that are already promoted in the
240        target environment.
241        """
242        return (
243            bool(existing_environment.finalized_ts)
244            and not existing_environment.expired
245            and existing_environment.gateway_managed == self.gateway_managed
246            and existing_environment.name == c.PROD
247        )
248
249    def _convert_list_to_models_and_store(
250        self, field: str, type_: t.Type[PydanticType]
251    ) -> t.Optional[t.List[PydanticType]]:
252        value = getattr(self, field)
253        if value and not isinstance(value[0], type_):
254            value = [type_.parse_obj(obj) for obj in value]
255            setattr(self, field, value)
256        return value
257
258    def _convert_list_to_dicts(self, value: t.Optional[t.List[t.Any]]) -> t.List[dict]:
259        if not value:
260            return []
261        return value if isinstance(value[0], dict) else [v.dict() for v in value]
262
263
264class EnvironmentStatements(PydanticModel):
265    before_all: t.List[str]
266    after_all: t.List[str]
267    python_env: t.Dict[str, Executable]
268    jinja_macros: t.Optional[JinjaMacroRegistry] = None
269    project: t.Optional[str] = None
270
271    def render_before_all(
272        self,
273        dialect: str,
274        default_catalog: t.Optional[str] = None,
275        **render_kwargs: t.Any,
276    ) -> t.List[str]:
277        return self.render(RuntimeStage.BEFORE_ALL, dialect, default_catalog, **render_kwargs)
278
279    def render_after_all(
280        self,
281        dialect: str,
282        default_catalog: t.Optional[str] = None,
283        **render_kwargs: t.Any,
284    ) -> t.List[str]:
285        return self.render(RuntimeStage.AFTER_ALL, dialect, default_catalog, **render_kwargs)
286
287    def render(
288        self,
289        runtime_stage: RuntimeStage,
290        dialect: str,
291        default_catalog: t.Optional[str] = None,
292        **render_kwargs: t.Any,
293    ) -> t.List[str]:
294        return render_statements(
295            statements=getattr(self, runtime_stage.value),
296            dialect=dialect,
297            default_catalog=default_catalog,
298            python_env=self.python_env,
299            jinja_macros=self.jinja_macros,
300            runtime_stage=runtime_stage,
301            **render_kwargs,
302        )
303
304
305def execute_environment_statements(
306    adapter: EngineAdapter,
307    environment_statements: t.List[EnvironmentStatements],
308    runtime_stage: RuntimeStage,
309    environment_naming_info: EnvironmentNamingInfo,
310    default_catalog: t.Optional[str] = None,
311    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
312    start: t.Optional[TimeLike] = None,
313    end: t.Optional[TimeLike] = None,
314    execution_time: t.Optional[TimeLike] = None,
315    selected_models: t.Optional[t.Set[str]] = None,
316) -> None:
317    try:
318        rendered_expressions = [
319            expr
320            for statements in environment_statements
321            for expr in statements.render(
322                runtime_stage=runtime_stage,
323                dialect=adapter.dialect,
324                default_catalog=default_catalog,
325                snapshots=snapshots,
326                start=start,
327                end=end,
328                execution_time=execution_time,
329                environment_naming_info=environment_naming_info,
330                engine_adapter=adapter,
331                selected_models=selected_models,
332            )
333        ]
334    except Exception as e:
335        raise SQLMeshError(
336            f"An error occurred during rendering of the '{runtime_stage.value}' statements:\n\n{e}"
337        )
338    if rendered_expressions:
339        with adapter.transaction():
340            for expr in rendered_expressions:
341                try:
342                    adapter.execute(expr)
343                except Exception as e:
344                    raise SQLMeshError(
345                        f"An error occurred during execution of the following '{runtime_stage.value}' statement:\n\n{expr}\n\n{e}"
346                    )
class EnvironmentNamingInfo(sqlmesh.utils.pydantic.PydanticModel):
 27class EnvironmentNamingInfo(PydanticModel):
 28    """
 29    Information required for creating an object within an environment
 30
 31    Args:
 32        name: The name of the environment.
 33        suffix_target: Indicates whether to append the environment name to the schema or table name.
 34        catalog_name_override: The name of the catalog to use for this environment if an override was provided
 35        normalize_name: Indicates whether the environment's name will be normalized. For example, if it's
 36            `dev`, then it will become `DEV` when targeting Snowflake.
 37        gateway_managed: Determines whether the virtual layer's views are created by the model-specific
 38            gateways, otherwise the default gateway is used. Default: False.
 39    """
 40
 41    name: str = c.PROD
 42    suffix_target: EnvironmentSuffixTarget = Field(default=EnvironmentSuffixTarget.SCHEMA)
 43    catalog_name_override: t.Optional[str] = None
 44    normalize_name: bool = True
 45    gateway_managed: bool = False
 46
 47    @property
 48    def is_dev(self) -> bool:
 49        return self.name.lower() != c.PROD
 50
 51    @field_validator("name", mode="before")
 52    @classmethod
 53    def _sanitize_name(cls, v: str) -> str:
 54        return word_characters_only(v).lower()
 55
 56    @field_validator("normalize_name", "gateway_managed", mode="before")
 57    @classmethod
 58    def _validate_boolean_field(cls, v: t.Any, info: ValidationInfo) -> bool:
 59        if v is None:
 60            return info.field_name == "normalize_name"
 61        return bool(v)
 62
 63    @t.overload
 64    @classmethod
 65    def sanitize_name(cls, v: str) -> str: ...
 66
 67    @t.overload
 68    @classmethod
 69    def sanitize_name(cls, v: Environment) -> Environment: ...
 70
 71    @classmethod
 72    def sanitize_name(cls, v: str | Environment) -> str | Environment:
 73        """
 74        Sanitizes the environment name so we create names that are valid names for database objects.
 75        This means alphanumeric and underscores only. Invalid characters are replaced with underscores.
 76        """
 77        if isinstance(v, Environment):
 78            return v
 79        if not isinstance(v, str):
 80            raise TypeError(f"Expected str or Environment, got {type(v).__name__}")
 81        return cls._sanitize_name(v)
 82
 83    @classmethod
 84    def sanitize_names(cls, values: t.Iterable[str]) -> t.Set[str]:
 85        return {cls.sanitize_name(value) for value in values}
 86
 87    @classmethod
 88    def from_environment_catalog_mapping(
 89        cls: t.Type[T],
 90        environment_catalog_mapping: t.Dict[re.Pattern, str],
 91        name: str = c.PROD,
 92        **kwargs: t.Any,
 93    ) -> T:
 94        construction_kwargs = dict(name=name, **kwargs)
 95        for re_pattern, catalog_name in environment_catalog_mapping.items():
 96            if re.match(re_pattern, name):
 97                return cls(
 98                    catalog_name_override=catalog_name,
 99                    **construction_kwargs,
100                )
101        return cls(**construction_kwargs)

Information required for creating an object within an environment

Arguments:
  • name: The name of the environment.
  • suffix_target: Indicates whether to append the environment name to the schema or table name.
  • catalog_name_override: The name of the catalog to use for this environment if an override was provided
  • normalize_name: Indicates whether the environment's name will be normalized. For example, if it's dev, then it will become DEV when targeting Snowflake.
  • gateway_managed: Determines whether the virtual layer's views are created by the model-specific gateways, otherwise the default gateway is used. Default: False.
name: str
catalog_name_override: Optional[str]
normalize_name: bool
gateway_managed: bool
is_dev: bool
47    @property
48    def is_dev(self) -> bool:
49        return self.name.lower() != c.PROD
@classmethod
def sanitize_name( cls, v: str | Environment) -> str | Environment:
71    @classmethod
72    def sanitize_name(cls, v: str | Environment) -> str | Environment:
73        """
74        Sanitizes the environment name so we create names that are valid names for database objects.
75        This means alphanumeric and underscores only. Invalid characters are replaced with underscores.
76        """
77        if isinstance(v, Environment):
78            return v
79        if not isinstance(v, str):
80            raise TypeError(f"Expected str or Environment, got {type(v).__name__}")
81        return cls._sanitize_name(v)

Sanitizes the environment name so we create names that are valid names for database objects. This means alphanumeric and underscores only. Invalid characters are replaced with underscores.

@classmethod
def sanitize_names(cls, values: Iterable[str]) -> Set[str]:
83    @classmethod
84    def sanitize_names(cls, values: t.Iterable[str]) -> t.Set[str]:
85        return {cls.sanitize_name(value) for value in values}
@classmethod
def from_environment_catalog_mapping( cls: Type[~T], environment_catalog_mapping: Dict[re.Pattern, str], name: str = 'prod', **kwargs: Any) -> ~T:
 87    @classmethod
 88    def from_environment_catalog_mapping(
 89        cls: t.Type[T],
 90        environment_catalog_mapping: t.Dict[re.Pattern, str],
 91        name: str = c.PROD,
 92        **kwargs: t.Any,
 93    ) -> T:
 94        construction_kwargs = dict(name=name, **kwargs)
 95        for re_pattern, catalog_name in environment_catalog_mapping.items():
 96            if re.match(re_pattern, name):
 97                return cls(
 98                    catalog_name_override=catalog_name,
 99                    **construction_kwargs,
100                )
101        return cls(**construction_kwargs)
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class EnvironmentSummary(sqlmesh.utils.pydantic.PydanticModel):
104class EnvironmentSummary(PydanticModel):
105    """Represents summary information of an isolated environment.
106
107    Args:
108        name: The name of the environment.
109        start_at: The start time of the environment.
110        end_at: The end time of the environment.
111        plan_id: The ID of the plan that last updated this environment.
112        previous_plan_id: The ID of the previous plan that updated this environment.
113        expiration_ts: The timestamp when this environment will expire.
114        finalized_ts: The timestamp when this environment was finalized.
115    """
116
117    name: str
118    start_at: TimeLike
119    end_at: t.Optional[TimeLike] = None
120    plan_id: str
121    previous_plan_id: t.Optional[str] = None
122    expiration_ts: t.Optional[int] = None
123    finalized_ts: t.Optional[int] = None
124
125    @property
126    def expired(self) -> bool:
127        return self.expiration_ts is not None and self.expiration_ts <= now_timestamp()

Represents summary information of an isolated environment.

Arguments:
  • name: The name of the environment.
  • start_at: The start time of the environment.
  • end_at: The end time of the environment.
  • plan_id: The ID of the plan that last updated this environment.
  • previous_plan_id: The ID of the previous plan that updated this environment.
  • expiration_ts: The timestamp when this environment will expire.
  • finalized_ts: The timestamp when this environment was finalized.
name: str
start_at: Union[datetime.date, datetime.datetime, str, int, float]
end_at: Union[datetime.date, datetime.datetime, str, int, float, NoneType]
plan_id: str
previous_plan_id: Optional[str]
expiration_ts: Optional[int]
finalized_ts: Optional[int]
expired: bool
125    @property
126    def expired(self) -> bool:
127        return self.expiration_ts is not None and self.expiration_ts <= now_timestamp()
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class Environment(EnvironmentNamingInfo, EnvironmentSummary):
130class Environment(EnvironmentNamingInfo, EnvironmentSummary):
131    """Represents an isolated environment.
132
133    Environments are isolated workspaces that hold pointers to physical tables.
134
135    Args:
136        snapshots: The snapshots that are part of this environment.
137        promoted_snapshot_ids: The IDs of the snapshots that are promoted in this environment
138            (i.e. for which the views are created). If not specified, all snapshots are promoted.
139        previous_finalized_snapshots: Snapshots that were part of this environment last time it was finalized.
140        requirements: A mapping of library versions for all the snapshots in this environment.
141    """
142
143    snapshots_: t.List[t.Any] = Field(alias="snapshots")
144    promoted_snapshot_ids_: t.Optional[t.List[t.Any]] = Field(
145        default=None, alias="promoted_snapshot_ids"
146    )
147    previous_finalized_snapshots_: t.Optional[t.List[t.Any]] = Field(
148        default=None, alias="previous_finalized_snapshots"
149    )
150    requirements: t.Dict[str, str] = {}
151
152    @field_validator("snapshots_", "previous_finalized_snapshots_", mode="before")
153    @classmethod
154    def _load_snapshots(cls, v: str | t.List[t.Any] | None) -> t.List[t.Any] | None:
155        if isinstance(v, str):
156            return json.loads(v)
157        if v and not isinstance(next(iter(v)), (dict, SnapshotTableInfo)):
158            raise ValueError("Must be a list of SnapshotTableInfo dicts or objects")
159        return v
160
161    @field_validator("promoted_snapshot_ids_", mode="before")
162    @classmethod
163    def _load_snapshot_ids(cls, v: str | t.List[t.Any] | None) -> t.List[t.Any] | None:
164        if isinstance(v, str):
165            return json.loads(v)
166        if v and not isinstance(next(iter(v)), (dict, SnapshotId)):
167            raise ValueError("Must be a list of SnapshotId dicts or objects")
168        return v
169
170    @field_validator("requirements", mode="before")
171    def _load_requirements(cls, v: t.Any) -> t.Any:
172        if isinstance(v, str):
173            v = json.loads(v)
174        return v or {}
175
176    @property
177    def snapshots(self) -> t.List[SnapshotTableInfo]:
178        return self._convert_list_to_models_and_store("snapshots_", SnapshotTableInfo) or []
179
180    def snapshot_dicts(self) -> t.List[dict]:
181        return self._convert_list_to_dicts(self.snapshots_)
182
183    @property
184    def promoted_snapshot_ids(self) -> t.Optional[t.List[SnapshotId]]:
185        return self._convert_list_to_models_and_store("promoted_snapshot_ids_", SnapshotId)
186
187    def promoted_snapshot_id_dicts(self) -> t.List[dict]:
188        return self._convert_list_to_dicts(self.promoted_snapshot_ids_)
189
190    @property
191    def promoted_snapshots(self) -> t.List[SnapshotTableInfo]:
192        if self.promoted_snapshot_ids is None:
193            return self.snapshots
194
195        promoted_snapshot_ids = set(self.promoted_snapshot_ids)
196        return [s for s in self.snapshots if s.snapshot_id in promoted_snapshot_ids]
197
198    @property
199    def previous_finalized_snapshots(self) -> t.Optional[t.List[SnapshotTableInfo]]:
200        return self._convert_list_to_models_and_store(
201            "previous_finalized_snapshots_", SnapshotTableInfo
202        )
203
204    def previous_finalized_snapshot_dicts(self) -> t.List[dict]:
205        return self._convert_list_to_dicts(self.previous_finalized_snapshots_)
206
207    @property
208    def finalized_or_current_snapshots(self) -> t.List[SnapshotTableInfo]:
209        return (
210            self.snapshots
211            if self.finalized_ts
212            else self.previous_finalized_snapshots or self.snapshots
213        )
214
215    @property
216    def naming_info(self) -> EnvironmentNamingInfo:
217        return EnvironmentNamingInfo(
218            name=self.name,
219            suffix_target=self.suffix_target,
220            catalog_name_override=self.catalog_name_override,
221            normalize_name=self.normalize_name,
222            gateway_managed=self.gateway_managed,
223        )
224
225    @property
226    def summary(self) -> EnvironmentSummary:
227        return EnvironmentSummary(
228            name=self.name,
229            start_at=self.start_at,
230            end_at=self.end_at,
231            plan_id=self.plan_id,
232            previous_plan_id=self.previous_plan_id,
233            expiration_ts=self.expiration_ts,
234            finalized_ts=self.finalized_ts,
235        )
236
237    def can_partially_promote(self, existing_environment: Environment) -> bool:
238        """Returns True if the existing environment can be partially promoted to the current environment.
239
240        Partial promotion means that we don't need to re-create views for snapshots that are already promoted in the
241        target environment.
242        """
243        return (
244            bool(existing_environment.finalized_ts)
245            and not existing_environment.expired
246            and existing_environment.gateway_managed == self.gateway_managed
247            and existing_environment.name == c.PROD
248        )
249
250    def _convert_list_to_models_and_store(
251        self, field: str, type_: t.Type[PydanticType]
252    ) -> t.Optional[t.List[PydanticType]]:
253        value = getattr(self, field)
254        if value and not isinstance(value[0], type_):
255            value = [type_.parse_obj(obj) for obj in value]
256            setattr(self, field, value)
257        return value
258
259    def _convert_list_to_dicts(self, value: t.Optional[t.List[t.Any]]) -> t.List[dict]:
260        if not value:
261            return []
262        return value if isinstance(value[0], dict) else [v.dict() for v in value]

Represents an isolated environment.

Environments are isolated workspaces that hold pointers to physical tables.

Arguments:
  • snapshots: The snapshots that are part of this environment.
  • promoted_snapshot_ids: The IDs of the snapshots that are promoted in this environment (i.e. for which the views are created). If not specified, all snapshots are promoted.
  • previous_finalized_snapshots: Snapshots that were part of this environment last time it was finalized.
  • requirements: A mapping of library versions for all the snapshots in this environment.
snapshots_: List[Any]
promoted_snapshot_ids_: Optional[List[Any]]
previous_finalized_snapshots_: Optional[List[Any]]
requirements: Dict[str, str]
176    @property
177    def snapshots(self) -> t.List[SnapshotTableInfo]:
178        return self._convert_list_to_models_and_store("snapshots_", SnapshotTableInfo) or []
def snapshot_dicts(self) -> List[dict]:
180    def snapshot_dicts(self) -> t.List[dict]:
181        return self._convert_list_to_dicts(self.snapshots_)
promoted_snapshot_ids: Optional[List[sqlmesh.core.snapshot.definition.SnapshotId]]
183    @property
184    def promoted_snapshot_ids(self) -> t.Optional[t.List[SnapshotId]]:
185        return self._convert_list_to_models_and_store("promoted_snapshot_ids_", SnapshotId)
def promoted_snapshot_id_dicts(self) -> List[dict]:
187    def promoted_snapshot_id_dicts(self) -> t.List[dict]:
188        return self._convert_list_to_dicts(self.promoted_snapshot_ids_)
promoted_snapshots: List[sqlmesh.core.snapshot.definition.SnapshotTableInfo]
190    @property
191    def promoted_snapshots(self) -> t.List[SnapshotTableInfo]:
192        if self.promoted_snapshot_ids is None:
193            return self.snapshots
194
195        promoted_snapshot_ids = set(self.promoted_snapshot_ids)
196        return [s for s in self.snapshots if s.snapshot_id in promoted_snapshot_ids]
previous_finalized_snapshots: Optional[List[sqlmesh.core.snapshot.definition.SnapshotTableInfo]]
198    @property
199    def previous_finalized_snapshots(self) -> t.Optional[t.List[SnapshotTableInfo]]:
200        return self._convert_list_to_models_and_store(
201            "previous_finalized_snapshots_", SnapshotTableInfo
202        )
def previous_finalized_snapshot_dicts(self) -> List[dict]:
204    def previous_finalized_snapshot_dicts(self) -> t.List[dict]:
205        return self._convert_list_to_dicts(self.previous_finalized_snapshots_)
finalized_or_current_snapshots: List[sqlmesh.core.snapshot.definition.SnapshotTableInfo]
207    @property
208    def finalized_or_current_snapshots(self) -> t.List[SnapshotTableInfo]:
209        return (
210            self.snapshots
211            if self.finalized_ts
212            else self.previous_finalized_snapshots or self.snapshots
213        )
naming_info: EnvironmentNamingInfo
215    @property
216    def naming_info(self) -> EnvironmentNamingInfo:
217        return EnvironmentNamingInfo(
218            name=self.name,
219            suffix_target=self.suffix_target,
220            catalog_name_override=self.catalog_name_override,
221            normalize_name=self.normalize_name,
222            gateway_managed=self.gateway_managed,
223        )
summary: EnvironmentSummary
225    @property
226    def summary(self) -> EnvironmentSummary:
227        return EnvironmentSummary(
228            name=self.name,
229            start_at=self.start_at,
230            end_at=self.end_at,
231            plan_id=self.plan_id,
232            previous_plan_id=self.previous_plan_id,
233            expiration_ts=self.expiration_ts,
234            finalized_ts=self.finalized_ts,
235        )
def can_partially_promote(self, existing_environment: Environment) -> bool:
237    def can_partially_promote(self, existing_environment: Environment) -> bool:
238        """Returns True if the existing environment can be partially promoted to the current environment.
239
240        Partial promotion means that we don't need to re-create views for snapshots that are already promoted in the
241        target environment.
242        """
243        return (
244            bool(existing_environment.finalized_ts)
245            and not existing_environment.expired
246            and existing_environment.gateway_managed == self.gateway_managed
247            and existing_environment.name == c.PROD
248        )

Returns True if the existing environment can be partially promoted to the current environment.

Partial promotion means that we don't need to re-create views for snapshots that are already promoted in the target environment.

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
EnvironmentNamingInfo
name
suffix_target
catalog_name_override
normalize_name
gateway_managed
is_dev
sanitize_name
sanitize_names
from_environment_catalog_mapping
EnvironmentSummary
start_at
end_at
plan_id
previous_plan_id
expiration_ts
finalized_ts
expired
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class EnvironmentStatements(sqlmesh.utils.pydantic.PydanticModel):
265class EnvironmentStatements(PydanticModel):
266    before_all: t.List[str]
267    after_all: t.List[str]
268    python_env: t.Dict[str, Executable]
269    jinja_macros: t.Optional[JinjaMacroRegistry] = None
270    project: t.Optional[str] = None
271
272    def render_before_all(
273        self,
274        dialect: str,
275        default_catalog: t.Optional[str] = None,
276        **render_kwargs: t.Any,
277    ) -> t.List[str]:
278        return self.render(RuntimeStage.BEFORE_ALL, dialect, default_catalog, **render_kwargs)
279
280    def render_after_all(
281        self,
282        dialect: str,
283        default_catalog: t.Optional[str] = None,
284        **render_kwargs: t.Any,
285    ) -> t.List[str]:
286        return self.render(RuntimeStage.AFTER_ALL, dialect, default_catalog, **render_kwargs)
287
288    def render(
289        self,
290        runtime_stage: RuntimeStage,
291        dialect: str,
292        default_catalog: t.Optional[str] = None,
293        **render_kwargs: t.Any,
294    ) -> t.List[str]:
295        return render_statements(
296            statements=getattr(self, runtime_stage.value),
297            dialect=dialect,
298            default_catalog=default_catalog,
299            python_env=self.python_env,
300            jinja_macros=self.jinja_macros,
301            runtime_stage=runtime_stage,
302            **render_kwargs,
303        )

!!! abstract "Usage Documentation" Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
before_all: List[str]
after_all: List[str]
jinja_macros: Optional[sqlmesh.utils.jinja.JinjaMacroRegistry]
project: Optional[str]
def render_before_all( self, dialect: str, default_catalog: Optional[str] = None, **render_kwargs: Any) -> List[str]:
272    def render_before_all(
273        self,
274        dialect: str,
275        default_catalog: t.Optional[str] = None,
276        **render_kwargs: t.Any,
277    ) -> t.List[str]:
278        return self.render(RuntimeStage.BEFORE_ALL, dialect, default_catalog, **render_kwargs)
def render_after_all( self, dialect: str, default_catalog: Optional[str] = None, **render_kwargs: Any) -> List[str]:
280    def render_after_all(
281        self,
282        dialect: str,
283        default_catalog: t.Optional[str] = None,
284        **render_kwargs: t.Any,
285    ) -> t.List[str]:
286        return self.render(RuntimeStage.AFTER_ALL, dialect, default_catalog, **render_kwargs)
def render( self, runtime_stage: sqlmesh.core.macros.RuntimeStage, dialect: str, default_catalog: Optional[str] = None, **render_kwargs: Any) -> List[str]:
288    def render(
289        self,
290        runtime_stage: RuntimeStage,
291        dialect: str,
292        default_catalog: t.Optional[str] = None,
293        **render_kwargs: t.Any,
294    ) -> t.List[str]:
295        return render_statements(
296            statements=getattr(self, runtime_stage.value),
297            dialect=dialect,
298            default_catalog=default_catalog,
299            python_env=self.python_env,
300            jinja_macros=self.jinja_macros,
301            runtime_stage=runtime_stage,
302            **render_kwargs,
303        )
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
def execute_environment_statements( adapter: sqlmesh.core.engine_adapter.base.EngineAdapter, environment_statements: List[EnvironmentStatements], runtime_stage: sqlmesh.core.macros.RuntimeStage, environment_naming_info: EnvironmentNamingInfo, default_catalog: Optional[str] = None, snapshots: Optional[Dict[str, sqlmesh.core.snapshot.definition.Snapshot]] = None, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, selected_models: Optional[Set[str]] = None) -> None:
306def execute_environment_statements(
307    adapter: EngineAdapter,
308    environment_statements: t.List[EnvironmentStatements],
309    runtime_stage: RuntimeStage,
310    environment_naming_info: EnvironmentNamingInfo,
311    default_catalog: t.Optional[str] = None,
312    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
313    start: t.Optional[TimeLike] = None,
314    end: t.Optional[TimeLike] = None,
315    execution_time: t.Optional[TimeLike] = None,
316    selected_models: t.Optional[t.Set[str]] = None,
317) -> None:
318    try:
319        rendered_expressions = [
320            expr
321            for statements in environment_statements
322            for expr in statements.render(
323                runtime_stage=runtime_stage,
324                dialect=adapter.dialect,
325                default_catalog=default_catalog,
326                snapshots=snapshots,
327                start=start,
328                end=end,
329                execution_time=execution_time,
330                environment_naming_info=environment_naming_info,
331                engine_adapter=adapter,
332                selected_models=selected_models,
333            )
334        ]
335    except Exception as e:
336        raise SQLMeshError(
337            f"An error occurred during rendering of the '{runtime_stage.value}' statements:\n\n{e}"
338        )
339    if rendered_expressions:
340        with adapter.transaction():
341            for expr in rendered_expressions:
342                try:
343                    adapter.execute(expr)
344                except Exception as e:
345                    raise SQLMeshError(
346                        f"An error occurred during execution of the following '{runtime_stage.value}' statement:\n\n{expr}\n\n{e}"
347                    )