sqlmesh.core.environment
from __future__ import annotations

import json
import re
import typing as t

from pydantic import Field

from sqlmesh.core import constants as c
from sqlmesh.core.config import EnvironmentSuffixTarget
from sqlmesh.core.snapshot import SnapshotId, SnapshotTableInfo
from sqlmesh.utils import word_characters_only
from sqlmesh.utils.date import TimeLike
from sqlmesh.utils.pydantic import PydanticModel, field_validator

T = t.TypeVar("T", bound="EnvironmentNamingInfo")


class EnvironmentNamingInfo(PydanticModel):
    """
    Information required for creating an object within an environment.

    Args:
        name: The name of the environment.
        suffix_target: Indicates whether to append the environment name to the schema or table name.
        catalog_name_override: The name of the catalog to use for this environment if an override was provided.
    """

    name: str = c.PROD
    suffix_target: EnvironmentSuffixTarget = Field(default=EnvironmentSuffixTarget.SCHEMA)
    catalog_name_override: t.Optional[str] = None

    @field_validator("name", mode="before")
    @classmethod
    def _normalize_name(cls, v: str) -> str:
        """Pass the raw name through ``word_characters_only`` and lowercase the result.

        Registered as a "before" validator on the ``name`` field.
        """
        sanitized = word_characters_only(v)
        return sanitized.lower()

    @t.overload
    @classmethod
    def normalize_name(cls, v: str) -> str: ...

    @t.overload
    @classmethod
    def normalize_name(cls, v: Environment) -> Environment: ...

    @classmethod
    def normalize_name(cls, v: str | Environment) -> str | Environment:
        """
        Normalizes the environment name so we create names that are valid names for database objects.
        This means alphanumeric and underscores only. Invalid characters are replaced with underscores.

        Args:
            v: Either a raw environment name or an ``Environment`` instance.

        Returns:
            The normalized (and lowercased) name when given a string; the same
            ``Environment`` object, untouched, when given an ``Environment``.

        Raises:
            TypeError: If ``v`` is neither a ``str`` nor an ``Environment``.
        """
        if isinstance(v, str):
            return cls._normalize_name(v)
        if isinstance(v, Environment):
            # Returned as-is; no renaming is applied here.
            return v
        raise TypeError(f"Expected str or Environment, got {type(v).__name__}")

    @classmethod
    def normalize_names(cls, values: t.Iterable[str]) -> t.Set[str]:
        """Normalize every name in ``values`` and return the results as a set."""
        return {cls.normalize_name(raw_name) for raw_name in values}

    @classmethod
    def from_environment_catalog_mapping(
        cls: t.Type[T],
        environment_catalog_mapping: t.Dict[re.Pattern, str],
        name: str = c.PROD,
        **kwargs: t.Any,
    ) -> T:
        """Build an instance, taking the catalog override from the first matching pattern.

        Args:
            environment_catalog_mapping: Regex patterns mapped to catalog names. Patterns are
                tried in the mapping's iteration order using ``re.match`` (anchored at the
                start of ``name``).
            name: The environment name. It is matched exactly as passed in.
            kwargs: Extra keyword arguments forwarded to the constructor.

        Returns:
            A new instance of ``cls``. ``catalog_name_override`` is set from the mapping only
            when some pattern matches ``name``.
        """
        # NOTE(review): if ``kwargs`` itself contains ``catalog_name_override`` and a pattern
        # matches, the constructor call below receives that keyword twice and raises TypeError.
        base_kwargs = {"name": name, **kwargs}
        for pattern, catalog in environment_catalog_mapping.items():
            if re.match(pattern, name) is None:
                continue
            return cls(catalog_name_override=catalog, **base_kwargs)
        return cls(**base_kwargs)


class Environment(EnvironmentNamingInfo):
    """Represents an isolated environment.

    Environments are isolated workspaces that hold pointers to physical tables.

    Args:
        snapshots: The snapshots that are part of this environment.
        start_at: The start time of the environment.
        end_at: The end time of the environment.
        plan_id: The ID of the plan that last updated this environment.
        previous_plan_id: The ID of the previous plan that updated this environment.
        expiration_ts: The timestamp when this environment will expire.
        finalized_ts: The timestamp when this environment was finalized.
        promoted_snapshot_ids: The IDs of the snapshots that are promoted in this environment
            (i.e. for which the views are created). If not specified, all snapshots are promoted.
        previous_finalized_snapshots: Snapshots that were part of this environment last time it was finalized.
    """

    snapshots: t.List[SnapshotTableInfo]
    start_at: TimeLike
    end_at: t.Optional[TimeLike] = None
    plan_id: str
    previous_plan_id: t.Optional[str] = None
    expiration_ts: t.Optional[int] = None
    finalized_ts: t.Optional[int] = None
    promoted_snapshot_ids: t.Optional[t.List[SnapshotId]] = None
    previous_finalized_snapshots: t.Optional[t.List[SnapshotTableInfo]] = None

    @field_validator("snapshots", "previous_finalized_snapshots", mode="before")
    @classmethod
    def _convert_snapshots(
        cls, v: str | t.List[SnapshotTableInfo] | None
    ) -> t.List[SnapshotTableInfo] | None:
        """Decode a JSON string into ``SnapshotTableInfo`` objects; pass other values through."""
        if not isinstance(v, str):
            return v
        return [SnapshotTableInfo.parse_obj(item) for item in json.loads(v)]

    @field_validator("promoted_snapshot_ids", mode="before")
    @classmethod
    def _convert_snapshot_ids(
        cls, v: str | t.List[SnapshotId] | None
    ) -> t.List[SnapshotId] | None:
        """Decode a JSON string into ``SnapshotId`` objects; pass other values through.

        The field is optional, so ``None`` can arrive here and is returned unchanged.
        """
        if not isinstance(v, str):
            return v
        return [SnapshotId.parse_obj(item) for item in json.loads(v)]

    @property
    def promoted_snapshots(self) -> t.List[SnapshotTableInfo]:
        """The snapshots whose IDs are in ``promoted_snapshot_ids``, or all snapshots if that is ``None``."""
        ids = self.promoted_snapshot_ids
        if ids is None:
            return self.snapshots

        # Build the set once so each membership test below is a hash lookup.
        promoted_ids = set(ids)
        return [snapshot for snapshot in self.snapshots if snapshot.snapshot_id in promoted_ids]

    @property
    def finalized_or_current_snapshots(self) -> t.List[SnapshotTableInfo]:
        """The current snapshots if ``finalized_ts`` is set, else the previously finalized ones.

        Falls back to the current snapshots when ``previous_finalized_snapshots`` is
        ``None`` or empty.
        """
        if self.finalized_ts:
            return self.snapshots
        return self.previous_finalized_snapshots or self.snapshots

    @property
    def naming_info(self) -> EnvironmentNamingInfo:
        """A plain ``EnvironmentNamingInfo`` carrying only this environment's naming fields."""
        return EnvironmentNamingInfo(
            name=self.name,
            suffix_target=self.suffix_target,
            catalog_name_override=self.catalog_name_override,
        )
class EnvironmentNamingInfo(PydanticModel):
    """
    Information required for creating an object within an environment.

    Args:
        name: The name of the environment.
        suffix_target: Indicates whether to append the environment name to the schema or table name.
        catalog_name_override: The name of the catalog to use for this environment if an override was provided.
    """

    name: str = c.PROD
    suffix_target: EnvironmentSuffixTarget = Field(default=EnvironmentSuffixTarget.SCHEMA)
    catalog_name_override: t.Optional[str] = None

    @field_validator("name", mode="before")
    @classmethod
    def _normalize_name(cls, v: str) -> str:
        """Pass the raw name through ``word_characters_only`` and lowercase the result.

        Registered as a "before" validator on the ``name`` field.
        """
        sanitized = word_characters_only(v)
        return sanitized.lower()

    @t.overload
    @classmethod
    def normalize_name(cls, v: str) -> str: ...

    @t.overload
    @classmethod
    def normalize_name(cls, v: Environment) -> Environment: ...

    @classmethod
    def normalize_name(cls, v: str | Environment) -> str | Environment:
        """
        Normalizes the environment name so we create names that are valid names for database objects.
        This means alphanumeric and underscores only. Invalid characters are replaced with underscores.

        Args:
            v: Either a raw environment name or an ``Environment`` instance.

        Returns:
            The normalized (and lowercased) name when given a string; the same
            ``Environment`` object, untouched, when given an ``Environment``.

        Raises:
            TypeError: If ``v`` is neither a ``str`` nor an ``Environment``.
        """
        if isinstance(v, str):
            return cls._normalize_name(v)
        if isinstance(v, Environment):
            # Returned as-is; no renaming is applied here.
            return v
        raise TypeError(f"Expected str or Environment, got {type(v).__name__}")

    @classmethod
    def normalize_names(cls, values: t.Iterable[str]) -> t.Set[str]:
        """Normalize every name in ``values`` and return the results as a set."""
        return {cls.normalize_name(raw_name) for raw_name in values}

    @classmethod
    def from_environment_catalog_mapping(
        cls: t.Type[T],
        environment_catalog_mapping: t.Dict[re.Pattern, str],
        name: str = c.PROD,
        **kwargs: t.Any,
    ) -> T:
        """Build an instance, taking the catalog override from the first matching pattern.

        Args:
            environment_catalog_mapping: Regex patterns mapped to catalog names. Patterns are
                tried in the mapping's iteration order using ``re.match`` (anchored at the
                start of ``name``).
            name: The environment name. It is matched exactly as passed in.
            kwargs: Extra keyword arguments forwarded to the constructor.

        Returns:
            A new instance of ``cls``. ``catalog_name_override`` is set from the mapping only
            when some pattern matches ``name``.
        """
        # NOTE(review): if ``kwargs`` itself contains ``catalog_name_override`` and a pattern
        # matches, the constructor call below receives that keyword twice and raises TypeError.
        base_kwargs = {"name": name, **kwargs}
        for pattern, catalog in environment_catalog_mapping.items():
            if re.match(pattern, name) is None:
                continue
            return cls(catalog_name_override=catalog, **base_kwargs)
        return cls(**base_kwargs)
Information required for creating an object within an environment.
Arguments:
- name: The name of the environment.
- suffix_target: Indicates whether to append the environment name to the schema or table name.
- catalog_name_override: The name of the catalog to use for this environment if an override was provided.
@classmethod
def normalize_name(cls, v: 'str | Environment') -> 'str | Environment':
@classmethod
def normalize_name(cls, v: str | Environment) -> str | Environment:
    """
    Normalizes the environment name so we create names that are valid names for database objects.
    This means alphanumeric and underscores only. Invalid characters are replaced with underscores.

    Args:
        v: Either a raw environment name or an ``Environment`` instance.

    Returns:
        The result of ``cls._normalize_name`` when given a string; the same
        ``Environment`` object, untouched, when given an ``Environment``.

    Raises:
        TypeError: If ``v`` is neither a ``str`` nor an ``Environment``.
    """
    if isinstance(v, str):
        return cls._normalize_name(v)
    if isinstance(v, Environment):
        # Returned as-is; no renaming is applied here.
        return v
    raise TypeError(f"Expected str or Environment, got {type(v).__name__}")
Normalizes the environment name so we create names that are valid names for database objects. This means alphanumeric and underscores only. Invalid characters are replaced with underscores.
@classmethod
def from_environment_catalog_mapping(cls: Type[~T], environment_catalog_mapping: Dict[re.Pattern, str], name: str = 'prod', **kwargs: Any) -> ~T:
@classmethod
def from_environment_catalog_mapping(
    cls: t.Type[T],
    environment_catalog_mapping: t.Dict[re.Pattern, str],
    name: str = c.PROD,
    **kwargs: t.Any,
) -> T:
    """Build an instance, taking the catalog override from the first matching pattern.

    Args:
        environment_catalog_mapping: Regex patterns mapped to catalog names. Patterns are
            tried in the mapping's iteration order using ``re.match`` (anchored at the
            start of ``name``).
        name: The environment name. It is matched exactly as passed in.
        kwargs: Extra keyword arguments forwarded to the constructor.

    Returns:
        A new instance of ``cls``. ``catalog_name_override`` is set from the mapping only
        when some pattern matches ``name``.
    """
    # NOTE(review): if ``kwargs`` itself contains ``catalog_name_override`` and a pattern
    # matches, the constructor call below receives that keyword twice and raises TypeError.
    base_kwargs = {"name": name, **kwargs}
    for pattern, catalog in environment_catalog_mapping.items():
        if re.match(pattern, name) is None:
            continue
        return cls(catalog_name_override=catalog, **base_kwargs)
    return cls(**base_kwargs)
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class Environment(EnvironmentNamingInfo):
    """Represents an isolated environment.

    Environments are isolated workspaces that hold pointers to physical tables.

    Args:
        snapshots: The snapshots that are part of this environment.
        start_at: The start time of the environment.
        end_at: The end time of the environment.
        plan_id: The ID of the plan that last updated this environment.
        previous_plan_id: The ID of the previous plan that updated this environment.
        expiration_ts: The timestamp when this environment will expire.
        finalized_ts: The timestamp when this environment was finalized.
        promoted_snapshot_ids: The IDs of the snapshots that are promoted in this environment
            (i.e. for which the views are created). If not specified, all snapshots are promoted.
        previous_finalized_snapshots: Snapshots that were part of this environment last time it was finalized.
    """

    snapshots: t.List[SnapshotTableInfo]
    start_at: TimeLike
    end_at: t.Optional[TimeLike] = None
    plan_id: str
    previous_plan_id: t.Optional[str] = None
    expiration_ts: t.Optional[int] = None
    finalized_ts: t.Optional[int] = None
    promoted_snapshot_ids: t.Optional[t.List[SnapshotId]] = None
    previous_finalized_snapshots: t.Optional[t.List[SnapshotTableInfo]] = None

    @field_validator("snapshots", "previous_finalized_snapshots", mode="before")
    @classmethod
    def _convert_snapshots(
        cls, v: str | t.List[SnapshotTableInfo] | None
    ) -> t.List[SnapshotTableInfo] | None:
        """Decode a JSON string into ``SnapshotTableInfo`` objects; pass other values through."""
        if not isinstance(v, str):
            return v
        return [SnapshotTableInfo.parse_obj(item) for item in json.loads(v)]

    @field_validator("promoted_snapshot_ids", mode="before")
    @classmethod
    def _convert_snapshot_ids(
        cls, v: str | t.List[SnapshotId] | None
    ) -> t.List[SnapshotId] | None:
        """Decode a JSON string into ``SnapshotId`` objects; pass other values through.

        The field is optional, so ``None`` can arrive here and is returned unchanged.
        """
        if not isinstance(v, str):
            return v
        return [SnapshotId.parse_obj(item) for item in json.loads(v)]

    @property
    def promoted_snapshots(self) -> t.List[SnapshotTableInfo]:
        """The snapshots whose IDs are in ``promoted_snapshot_ids``, or all snapshots if that is ``None``."""
        ids = self.promoted_snapshot_ids
        if ids is None:
            return self.snapshots

        # Build the set once so each membership test below is a hash lookup.
        promoted_ids = set(ids)
        return [snapshot for snapshot in self.snapshots if snapshot.snapshot_id in promoted_ids]

    @property
    def finalized_or_current_snapshots(self) -> t.List[SnapshotTableInfo]:
        """The current snapshots if ``finalized_ts`` is set, else the previously finalized ones.

        Falls back to the current snapshots when ``previous_finalized_snapshots`` is
        ``None`` or empty.
        """
        if self.finalized_ts:
            return self.snapshots
        return self.previous_finalized_snapshots or self.snapshots

    @property
    def naming_info(self) -> EnvironmentNamingInfo:
        """A plain ``EnvironmentNamingInfo`` carrying only this environment's naming fields."""
        return EnvironmentNamingInfo(
            name=self.name,
            suffix_target=self.suffix_target,
            catalog_name_override=self.catalog_name_override,
        )
Represents an isolated environment.
Environments are isolated workspaces that hold pointers to physical tables.
Arguments:
- snapshots: The snapshots that are part of this environment.
- start_at: The start time of the environment.
- end_at: The end time of the environment.
- plan_id: The ID of the plan that last updated this environment.
- previous_plan_id: The ID of the previous plan that updated this environment.
- expiration_ts: The timestamp when this environment will expire.
- finalized_ts: The timestamp when this environment was finalized.
- promoted_snapshot_ids: The IDs of the snapshots that are promoted in this environment (i.e. for which the views are created). If not specified, all snapshots are promoted.
- previous_finalized_snapshots: Snapshots that were part of this environment last time it was finalized.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs