Edit on GitHub

sqlmesh.core.environment

  1from __future__ import annotations
  2
  3import json
  4import re
  5import typing as t
  6
  7from pydantic import Field
  8
  9from sqlmesh.core import constants as c
 10from sqlmesh.core.config import EnvironmentSuffixTarget
 11from sqlmesh.core.snapshot import SnapshotId, SnapshotTableInfo
 12from sqlmesh.utils import word_characters_only
 13from sqlmesh.utils.date import TimeLike
 14from sqlmesh.utils.pydantic import PydanticModel, field_validator
 15
 16T = t.TypeVar("T", bound="EnvironmentNamingInfo")
 17
 18
 19class EnvironmentNamingInfo(PydanticModel):
 20    """
 21    Information required for creating an object within an environment
 22
 23    Args:
 24        name: The name of the environment.
 25        suffix_target: Indicates whether to append the environment name to the schema or table name.
 26        catalog_name_override: The name of the catalog to use for this environment if an override was provided
 27
 28    """
 29
 30    name: str = c.PROD
 31    suffix_target: EnvironmentSuffixTarget = Field(default=EnvironmentSuffixTarget.SCHEMA)
 32    catalog_name_override: t.Optional[str] = None
 33
 34    @field_validator("name", mode="before")
 35    @classmethod
 36    def _normalize_name(cls, v: str) -> str:
 37        return word_characters_only(v).lower()
 38
 39    @t.overload
 40    @classmethod
 41    def normalize_name(cls, v: str) -> str: ...
 42
 43    @t.overload
 44    @classmethod
 45    def normalize_name(cls, v: Environment) -> Environment: ...
 46
 47    @classmethod
 48    def normalize_name(cls, v: str | Environment) -> str | Environment:
 49        """
 50        Normalizes the environment name so we create names that are valid names for database objects.
 51        This means alphanumeric and underscores only. Invalid characters are replaced with underscores.
 52        """
 53        if isinstance(v, Environment):
 54            return v
 55        if not isinstance(v, str):
 56            raise TypeError(f"Expected str or Environment, got {type(v).__name__}")
 57        return cls._normalize_name(v)
 58
 59    @classmethod
 60    def normalize_names(cls, values: t.Iterable[str]) -> t.Set[str]:
 61        return {cls.normalize_name(value) for value in values}
 62
 63    @classmethod
 64    def from_environment_catalog_mapping(
 65        cls: t.Type[T],
 66        environment_catalog_mapping: t.Dict[re.Pattern, str],
 67        name: str = c.PROD,
 68        **kwargs: t.Any,
 69    ) -> T:
 70        construction_kwargs = dict(name=name, **kwargs)
 71        for re_pattern, catalog_name in environment_catalog_mapping.items():
 72            if re.match(re_pattern, name):
 73                return cls(
 74                    catalog_name_override=catalog_name,
 75                    **construction_kwargs,
 76                )
 77        return cls(**construction_kwargs)
 78
 79
 80class Environment(EnvironmentNamingInfo):
 81    """Represents an isolated environment.
 82
 83    Environments are isolated workspaces that hold pointers to physical tables.
 84
 85    Args:
 86        snapshots: The snapshots that are part of this environment.
 87        start_at: The start time of the environment.
 88        end_at: The end time of the environment.
 89        plan_id: The ID of the plan that last updated this environment.
 90        previous_plan_id: The ID of the previous plan that updated this environment.
 91        expiration_ts: The timestamp when this environment will expire.
 92        finalized_ts: The timestamp when this environment was finalized.
 93        promoted_snapshot_ids: The IDs of the snapshots that are promoted in this environment
 94            (i.e. for which the views are created). If not specified, all snapshots are promoted.
 95        previous_finalized_snapshots: Snapshots that were part of this environment last time it was finalized.
 96    """
 97
 98    snapshots: t.List[SnapshotTableInfo]
 99    start_at: TimeLike
100    end_at: t.Optional[TimeLike] = None
101    plan_id: str
102    previous_plan_id: t.Optional[str] = None
103    expiration_ts: t.Optional[int] = None
104    finalized_ts: t.Optional[int] = None
105    promoted_snapshot_ids: t.Optional[t.List[SnapshotId]] = None
106    previous_finalized_snapshots: t.Optional[t.List[SnapshotTableInfo]] = None
107
108    @field_validator("snapshots", "previous_finalized_snapshots", mode="before")
109    @classmethod
110    def _convert_snapshots(
111        cls, v: str | t.List[SnapshotTableInfo] | None
112    ) -> t.List[SnapshotTableInfo] | None:
113        if isinstance(v, str):
114            return [SnapshotTableInfo.parse_obj(obj) for obj in json.loads(v)]
115        return v
116
117    @field_validator("promoted_snapshot_ids", mode="before")
118    @classmethod
119    def _convert_snapshot_ids(cls, v: str | t.List[SnapshotId]) -> t.List[SnapshotId]:
120        if isinstance(v, str):
121            return [SnapshotId.parse_obj(obj) for obj in json.loads(v)]
122        return v
123
124    @property
125    def promoted_snapshots(self) -> t.List[SnapshotTableInfo]:
126        if self.promoted_snapshot_ids is None:
127            return self.snapshots
128
129        promoted_snapshot_ids = set(self.promoted_snapshot_ids)
130        return [s for s in self.snapshots if s.snapshot_id in promoted_snapshot_ids]
131
132    @property
133    def finalized_or_current_snapshots(self) -> t.List[SnapshotTableInfo]:
134        return (
135            self.snapshots
136            if self.finalized_ts
137            else self.previous_finalized_snapshots or self.snapshots
138        )
139
140    @property
141    def naming_info(self) -> EnvironmentNamingInfo:
142        return EnvironmentNamingInfo(
143            name=self.name,
144            suffix_target=self.suffix_target,
145            catalog_name_override=self.catalog_name_override,
146        )
class EnvironmentNamingInfo(sqlmesh.utils.pydantic.PydanticModel):
class EnvironmentNamingInfo(PydanticModel):
    """
    Naming information required to create an object within an environment.

    Args:
        name: The name of the environment. Supplied values are passed through
            ``_normalize_name`` (registered as a "before" validator for this field).
        suffix_target: Indicates whether to append the environment name to the schema or table name.
        catalog_name_override: The name of the catalog to use for this environment if an override was provided.
    """

    name: str = c.PROD
    suffix_target: EnvironmentSuffixTarget = Field(default=EnvironmentSuffixTarget.SCHEMA)
    catalog_name_override: t.Optional[str] = None

    @field_validator("name", mode="before")
    @classmethod
    def _normalize_name(cls, v: str) -> str:
        """Reduce ``v`` with ``word_characters_only`` and lowercase the result."""
        cleaned = word_characters_only(v)
        return cleaned.lower()

    @t.overload
    @classmethod
    def normalize_name(cls, v: str) -> str: ...

    @t.overload
    @classmethod
    def normalize_name(cls, v: Environment) -> Environment: ...

    @classmethod
    def normalize_name(cls, v: str | Environment) -> str | Environment:
        """
        Normalizes the environment name so we create names that are valid names for database objects.
        This means alphanumeric and underscores only. Invalid characters are replaced with underscores
        and the name is lowercased.

        Args:
            v: Either a raw environment name or an ``Environment`` instance.

        Returns:
            The normalized name when ``v`` is a string; ``v`` itself, untouched, when it is
            already an ``Environment``.

        Raises:
            TypeError: If ``v`` is neither a ``str`` nor an ``Environment``.
        """
        # An Environment is handed back as-is; only raw strings are normalized here.
        if isinstance(v, Environment):
            return v
        if isinstance(v, str):
            return cls._normalize_name(v)
        raise TypeError(f"Expected str or Environment, got {type(v).__name__}")

    @classmethod
    def normalize_names(cls, values: t.Iterable[str]) -> t.Set[str]:
        """Normalize every name in ``values`` and return the distinct results as a set."""
        normalized: t.Set[str] = set()
        for value in values:
            normalized.add(cls.normalize_name(value))
        return normalized

    @classmethod
    def from_environment_catalog_mapping(
        cls: t.Type[T],
        environment_catalog_mapping: t.Dict[re.Pattern, str],
        name: str = c.PROD,
        **kwargs: t.Any,
    ) -> T:
        """
        Build an instance, taking the catalog override from the first pattern that matches ``name``.

        Args:
            environment_catalog_mapping: Mapping of regex pattern to catalog name. Patterns are
                tried in the mapping's iteration order using ``re.match``.
            name: The environment name to construct the instance with and to match against.
            **kwargs: Additional keyword arguments forwarded to the constructor.

        Returns:
            A new instance of ``cls``. ``catalog_name_override`` is set to the catalog of the first
            matching pattern; when nothing matches, only ``name`` and ``kwargs`` are passed.
        """
        base_kwargs = dict(name=name, **kwargs)
        for pattern, catalog in environment_catalog_mapping.items():
            if re.match(pattern, name) is None:
                continue
            # First match wins; later patterns are not consulted.
            return cls(catalog_name_override=catalog, **base_kwargs)
        return cls(**base_kwargs)

Naming information required to create an object within an environment.

Arguments:
  • name: The name of the environment.
  • suffix_target: Indicates whether to append the environment name to the schema or table name.
  • catalog_name_override: The name of the catalog to use for this environment if an override was provided.
@classmethod
def normalize_name(cls, v: 'str | Environment') -> 'str | Environment':
48    @classmethod
49    def normalize_name(cls, v: str | Environment) -> str | Environment:
50        """
51        Normalizes the environment name so we create names that are valid names for database objects.
52        This means alphanumeric and underscores only. Invalid characters are replaced with underscores.
53        """
54        if isinstance(v, Environment):
55            return v
56        if not isinstance(v, str):
57            raise TypeError(f"Expected str or Environment, got {type(v).__name__}")
58        return cls._normalize_name(v)

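Normalizes the environment name so we create names that are valid names for database objects. This means alphanumeric and underscores only: invalid characters are replaced with underscores and the name is lowercased. An Environment instance passed in place of a name is returned unchanged.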

@classmethod
def normalize_names(cls, values: Iterable[str]) -> Set[str]:
60    @classmethod
61    def normalize_names(cls, values: t.Iterable[str]) -> t.Set[str]:
62        return {cls.normalize_name(value) for value in values}
@classmethod
def from_environment_catalog_mapping( cls: Type[~T], environment_catalog_mapping: Dict[re.Pattern, str], name: str = 'prod', **kwargs: Any) -> ~T:
64    @classmethod
65    def from_environment_catalog_mapping(
66        cls: t.Type[T],
67        environment_catalog_mapping: t.Dict[re.Pattern, str],
68        name: str = c.PROD,
69        **kwargs: t.Any,
70    ) -> T:
71        construction_kwargs = dict(name=name, **kwargs)
72        for re_pattern, catalog_name in environment_catalog_mapping.items():
73            if re.match(re_pattern, name):
74                return cls(
75                    catalog_name_override=catalog_name,
76                    **construction_kwargs,
77                )
78        return cls(**construction_kwargs)
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
model_post_init
class Environment(EnvironmentNamingInfo):
 81class Environment(EnvironmentNamingInfo):
 82    """Represents an isolated environment.
 83
 84    Environments are isolated workspaces that hold pointers to physical tables.
 85
 86    Args:
 87        snapshots: The snapshots that are part of this environment.
 88        start_at: The start time of the environment.
 89        end_at: The end time of the environment.
 90        plan_id: The ID of the plan that last updated this environment.
 91        previous_plan_id: The ID of the previous plan that updated this environment.
 92        expiration_ts: The timestamp when this environment will expire.
 93        finalized_ts: The timestamp when this environment was finalized.
 94        promoted_snapshot_ids: The IDs of the snapshots that are promoted in this environment
 95            (i.e. for which the views are created). If not specified, all snapshots are promoted.
 96        previous_finalized_snapshots: Snapshots that were part of this environment last time it was finalized.
 97    """
 98
 99    snapshots: t.List[SnapshotTableInfo]
100    start_at: TimeLike
101    end_at: t.Optional[TimeLike] = None
102    plan_id: str
103    previous_plan_id: t.Optional[str] = None
104    expiration_ts: t.Optional[int] = None
105    finalized_ts: t.Optional[int] = None
106    promoted_snapshot_ids: t.Optional[t.List[SnapshotId]] = None
107    previous_finalized_snapshots: t.Optional[t.List[SnapshotTableInfo]] = None
108
109    @field_validator("snapshots", "previous_finalized_snapshots", mode="before")
110    @classmethod
111    def _convert_snapshots(
112        cls, v: str | t.List[SnapshotTableInfo] | None
113    ) -> t.List[SnapshotTableInfo] | None:
114        if isinstance(v, str):
115            return [SnapshotTableInfo.parse_obj(obj) for obj in json.loads(v)]
116        return v
117
118    @field_validator("promoted_snapshot_ids", mode="before")
119    @classmethod
120    def _convert_snapshot_ids(cls, v: str | t.List[SnapshotId]) -> t.List[SnapshotId]:
121        if isinstance(v, str):
122            return [SnapshotId.parse_obj(obj) for obj in json.loads(v)]
123        return v
124
125    @property
126    def promoted_snapshots(self) -> t.List[SnapshotTableInfo]:
127        if self.promoted_snapshot_ids is None:
128            return self.snapshots
129
130        promoted_snapshot_ids = set(self.promoted_snapshot_ids)
131        return [s for s in self.snapshots if s.snapshot_id in promoted_snapshot_ids]
132
133    @property
134    def finalized_or_current_snapshots(self) -> t.List[SnapshotTableInfo]:
135        return (
136            self.snapshots
137            if self.finalized_ts
138            else self.previous_finalized_snapshots or self.snapshots
139        )
140
141    @property
142    def naming_info(self) -> EnvironmentNamingInfo:
143        return EnvironmentNamingInfo(
144            name=self.name,
145            suffix_target=self.suffix_target,
146            catalog_name_override=self.catalog_name_override,
147        )

Represents an isolated environment.

Environments are isolated workspaces that hold pointers to physical tables.

Arguments:
  • snapshots: The snapshots that are part of this environment.
  • start_at: The start time of the environment.
  • end_at: The end time of the environment.
  • plan_id: The ID of the plan that last updated this environment.
  • previous_plan_id: The ID of the previous plan that updated this environment.
  • expiration_ts: The timestamp when this environment will expire.
  • finalized_ts: The timestamp when this environment was finalized.
  • promoted_snapshot_ids: The IDs of the snapshots that are promoted in this environment (i.e. for which the views are created). If not specified, all snapshots are promoted.
  • previous_finalized_snapshots: Snapshots that were part of this environment last time it was finalized.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
EnvironmentNamingInfo
normalize_name
normalize_names
from_environment_catalog_mapping
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
model_post_init