sqlmesh.core.state_sync.common
1from __future__ import annotations 2 3import logging 4import typing as t 5from functools import wraps 6import itertools 7import abc 8 9from dataclasses import dataclass 10 11from pydantic_core.core_schema import ValidationInfo 12from sqlglot import exp 13 14from sqlmesh.utils.pydantic import PydanticModel, field_validator 15from sqlmesh.core.environment import Environment, EnvironmentStatements, EnvironmentNamingInfo 16from sqlmesh.core.snapshot import ( 17 Snapshot, 18 SnapshotId, 19 SnapshotTableCleanupTask, 20 SnapshotTableInfo, 21) 22 23if t.TYPE_CHECKING: 24 from sqlmesh.core.state_sync.base import Versions, StateReader 25 26logger = logging.getLogger(__name__) 27 28EXPIRED_SNAPSHOT_DEFAULT_BATCH_SIZE = 200 29 30 31def transactional() -> t.Callable[[t.Callable], t.Callable]: 32 def decorator(func: t.Callable) -> t.Callable: 33 @wraps(func) 34 def wrapper(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any: 35 if not hasattr(self, "_transaction"): 36 return func(self, *args, **kwargs) 37 38 with self._transaction(): 39 return func(self, *args, **kwargs) 40 41 return wrapper 42 43 return decorator 44 45 46T = t.TypeVar("T") 47 48 49def chunk_iterable(iterable: t.Iterable[T], size: int = 10) -> t.Iterable[t.Iterable[T]]: 50 iterator = iter(iterable) 51 for first in iterator: 52 yield itertools.chain([first], itertools.islice(iterator, size - 1)) 53 54 55class EnvironmentWithStatements(PydanticModel): 56 environment: Environment 57 statements: t.List[EnvironmentStatements] = [] 58 59 60@dataclass 61class VersionsChunk: 62 versions: Versions 63 64 65class SnapshotsChunk: 66 def __init__(self, items: t.Iterator[Snapshot]): 67 self.items = items 68 69 def __iter__(self) -> t.Iterator[Snapshot]: 70 return self.items 71 72 73class EnvironmentsChunk: 74 def __init__(self, items: t.Iterator[EnvironmentWithStatements]): 75 self.items = items 76 77 def __iter__(self) -> t.Iterator[EnvironmentWithStatements]: 78 return self.items 79 80 81StateStreamContents = 
t.Union[VersionsChunk, SnapshotsChunk, EnvironmentsChunk] 82 83 84class StateStream(abc.ABC): 85 """ 86 Represents a stream of state either going into the StateSync (perhaps loaded from a file) 87 or out of the StateSync (perhaps being dumped to a file) 88 89 Iterating over the stream produces the following chunks: 90 91 VersionsChunk: The versions of the objects contained in this StateStream 92 SnapshotsChunk: Is itself an iterator that streams Snapshot objects. Note that they should be fully populated with any relevant Intervals 93 EnvironmentsChunk: Is itself an iterator emitting a stream of Environments with any EnvironmentStatements attached 94 95 The idea here is to give some structure to the stream and ensure that callers have the opportunity to process all its components while not 96 needing to worry about the order they are emitted in 97 """ 98 99 @abc.abstractmethod 100 def __iter__(self) -> t.Iterator[StateStreamContents]: 101 pass 102 103 @classmethod 104 def from_iterators( 105 cls: t.Type["StateStream"], 106 versions: Versions, 107 snapshots: t.Iterator[Snapshot], 108 environments: t.Iterator[EnvironmentWithStatements], 109 ) -> "StateStream": 110 class _StateStream(cls): # type: ignore 111 def __iter__(self) -> t.Iterator[StateStreamContents]: 112 yield VersionsChunk(versions) 113 114 yield SnapshotsChunk(snapshots) 115 116 yield EnvironmentsChunk(environments) 117 118 return _StateStream() 119 120 121class ExpiredBatchRange(PydanticModel): 122 start: RowBoundary 123 end: t.Union[RowBoundary, LimitBoundary] 124 125 @classmethod 126 def init_batch_range(cls, batch_size: int) -> ExpiredBatchRange: 127 return ExpiredBatchRange( 128 start=RowBoundary.lowest_boundary(), 129 end=LimitBoundary(batch_size=batch_size), 130 ) 131 132 @classmethod 133 def all_batch_range(cls) -> ExpiredBatchRange: 134 return ExpiredBatchRange( 135 start=RowBoundary.lowest_boundary(), 136 end=RowBoundary.highest_boundary(), 137 ) 138 139 @classmethod 140 def 
_expanded_tuple_comparison( 141 cls, 142 columns: t.List[exp.Column], 143 values: t.List[t.Union[exp.Literal, exp.Neg]], 144 operator: t.Type[exp.Expr], 145 ) -> exp.Condition: 146 """Generate expanded tuple comparison that works across all SQL engines. 147 148 Converts tuple comparisons like (a, b, c) OP (x, y, z) into an expanded form 149 that's compatible with all SQL engines, since native tuple comparisons have 150 inconsistent support across engines (especially DuckDB, MySQL, SQLite). 151 152 Repro of problem with DuckDB: 153 "SELECT * FROM VALUES(1,'2') as test(a,b) WHERE ((a, b) > (1, 'foo')) AND ((a, b) <= (10, 'baz'))" 154 155 Args: 156 columns: List of column expressions to compare 157 values: List of value expressions to compare against 158 operator: The comparison operator class (exp.GT, exp.GTE, exp.LT, exp.LTE) 159 160 Examples: 161 (a, b, c) > (x, y, z) expands to: 162 a > x OR (a = x AND b > y) OR (a = x AND b = y AND c > z) 163 164 (a, b, c) <= (x, y, z) expands to: 165 a < x OR (a = x AND b < y) OR (a = x AND b = y AND c <= z) 166 167 (a, b, c) >= (x, y, z) expands to: 168 a > x OR (a = x AND b > y) OR (a = x AND b = y AND c >= z) 169 170 Returns: 171 An expanded OR expression representing the tuple comparison 172 """ 173 if operator not in (exp.GT, exp.GTE, exp.LT, exp.LTE): 174 raise ValueError(f"Unsupported operator: {operator}. 
Use GT, GTE, LT, or LTE.") 175 176 # For <= and >=, we use the strict operator for all but the last column 177 # e.g., (a, b) <= (x, y) becomes: a < x OR (a = x AND b <= y) 178 # For < and >, we use the strict operator throughout 179 # e.g., (a, b) > (x, y) becomes: a > x OR (a = x AND b > x) 180 strict_operator: t.Type[exp.Expr] 181 final_operator: t.Type[exp.Expr] 182 183 if operator in (exp.LTE, exp.GTE): 184 # For inclusive operators (<=, >=), use strict form for intermediate columns 185 # but keep inclusive form for the last column 186 strict_operator = exp.LT if operator == exp.LTE else exp.GT 187 final_operator = operator # Keep LTE/GTE for last column 188 else: 189 # For strict operators (<, >), use them throughout 190 strict_operator = operator 191 final_operator = operator 192 193 conditions: t.List[exp.Expr] = [] 194 for i in range(len(columns)): 195 # Build equality conditions for all columns before current 196 equality_conditions = [exp.EQ(this=columns[j], expression=values[j]) for j in range(i)] 197 198 # Use the final operator for the last column, strict for others 199 comparison_op = final_operator if i == len(columns) - 1 else strict_operator 200 comparison_condition = comparison_op(this=columns[i], expression=values[i]) 201 202 if equality_conditions: 203 conditions.append(exp.and_(*equality_conditions, comparison_condition)) 204 else: 205 conditions.append(comparison_condition) 206 207 return exp.or_(*conditions) if len(conditions) > 1 else t.cast(exp.Condition, conditions[0]) 208 209 @property 210 def where_filter(self) -> exp.Condition: 211 # Use expanded tuple comparisons for cross-engine compatibility 212 # Native tuple comparisons like (a, b) > (x, y) don't work reliably across all SQL engines 213 columns = [ 214 exp.column("updated_ts"), 215 exp.column("name"), 216 exp.column("identifier"), 217 ] 218 start_values = [ 219 exp.Literal.number(self.start.updated_ts), 220 exp.Literal.string(self.start.name), 221 
exp.Literal.string(self.start.identifier), 222 ] 223 224 start_condition = self._expanded_tuple_comparison(columns, start_values, exp.GT) 225 226 range_filter: exp.Condition 227 if isinstance(self.end, RowBoundary): 228 end_values = [ 229 exp.Literal.number(self.end.updated_ts), 230 exp.Literal.string(self.end.name), 231 exp.Literal.string(self.end.identifier), 232 ] 233 end_condition = self._expanded_tuple_comparison(columns, end_values, exp.LTE) 234 range_filter = exp.and_(start_condition, end_condition) 235 else: 236 range_filter = start_condition 237 return range_filter 238 239 240class RowBoundary(PydanticModel): 241 updated_ts: int 242 name: str 243 identifier: str 244 245 @classmethod 246 def lowest_boundary(cls) -> RowBoundary: 247 return RowBoundary(updated_ts=0, name="", identifier="") 248 249 @classmethod 250 def highest_boundary(cls) -> RowBoundary: 251 # 9999-12-31T23:59:59.999Z in epoch milliseconds 252 return RowBoundary(updated_ts=253_402_300_799_999, name="", identifier="") 253 254 255class LimitBoundary(PydanticModel): 256 batch_size: int 257 258 @classmethod 259 def init_batch_boundary(cls, batch_size: int) -> LimitBoundary: 260 return LimitBoundary(batch_size=batch_size) 261 262 263class PromotionResult(PydanticModel): 264 added: t.List[SnapshotTableInfo] 265 removed: t.List[SnapshotTableInfo] 266 removed_environment_naming_info: t.Optional[EnvironmentNamingInfo] 267 268 @field_validator("removed_environment_naming_info") 269 def _validate_removed_environment_naming_info( 270 cls, v: t.Optional[EnvironmentNamingInfo], info: ValidationInfo 271 ) -> t.Optional[EnvironmentNamingInfo]: 272 if v and not info.data.get("removed"): 273 raise ValueError("removed_environment_naming_info must be None if removed is empty") 274 return v 275 276 277class ExpiredSnapshotBatch(PydanticModel): 278 """A batch of expired snapshots to be cleaned up.""" 279 280 expired_snapshot_ids: t.Set[SnapshotId] 281 cleanup_tasks: t.List[SnapshotTableCleanupTask] 282 
batch_range: ExpiredBatchRange 283 284 285def iter_expired_snapshot_batches( 286 state_reader: StateReader, 287 *, 288 current_ts: int, 289 ignore_ttl: bool = False, 290 batch_size: t.Optional[int] = None, 291) -> t.Iterator[ExpiredSnapshotBatch]: 292 """Yields expired snapshot batches. 293 294 Args: 295 state_reader: StateReader instance to query expired snapshots from. 296 current_ts: Timestamp used to evaluate expiration. 297 ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced). 298 batch_size: Maximum number of snapshots to fetch per batch. 299 """ 300 301 batch_size = batch_size if batch_size is not None else EXPIRED_SNAPSHOT_DEFAULT_BATCH_SIZE 302 batch_range = ExpiredBatchRange.init_batch_range(batch_size=batch_size) 303 304 while True: 305 batch = state_reader.get_expired_snapshots( 306 current_ts=current_ts, 307 ignore_ttl=ignore_ttl, 308 batch_range=batch_range, 309 ) 310 311 if batch is None: 312 return 313 314 yield batch 315 316 assert isinstance(batch.batch_range.end, RowBoundary), ( 317 "Only RowBoundary is supported for pagination currently" 318 ) 319 batch_range = ExpiredBatchRange( 320 start=batch.batch_range.end, 321 end=LimitBoundary(batch_size=batch_size), 322 )
def transactional() -> t.Callable[[t.Callable], t.Callable]:
    """Create a decorator that wraps a method call in its owner's transaction.

    The returned decorator is meant for instance methods. When the instance defines
    ``_transaction``, the method runs inside ``with self._transaction():``. Otherwise the
    method is invoked directly with no transaction handling.
    """

    def decorator(func: t.Callable) -> t.Callable:
        @wraps(func)
        def wrapper(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
            supports_transactions = hasattr(self, "_transaction")
            if supports_transactions:
                with self._transaction():
                    return func(self, *args, **kwargs)
            return func(self, *args, **kwargs)

        return wrapper

    return decorator
class EnvironmentWithStatements(PydanticModel):
    """An environment bundled with the environment statements attached to it."""

    environment: Environment
    # Empty list when the environment has no statements attached.
    statements: t.List[EnvironmentStatements] = []
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- __pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class SnapshotsChunk:
    """A ``StateStream`` chunk that hands out a stream of ``Snapshot`` objects.

    Nothing is buffered: iterating the chunk returns the very object it was built with.
    An iterator passed in can therefore be consumed only once.
    """

    def __init__(self, items: t.Iterator[Snapshot]):
        # Held by reference; no copy is taken.
        self.items = items

    def __iter__(self) -> t.Iterator[Snapshot]:
        wrapped = self.items
        return wrapped
class EnvironmentsChunk:
    """A ``StateStream`` chunk that hands out a stream of ``EnvironmentWithStatements``.

    Nothing is buffered: iterating the chunk returns the very object it was built with.
    An iterator passed in can therefore be consumed only once.
    """

    def __init__(self, items: t.Iterator[EnvironmentWithStatements]):
        # Held by reference; no copy is taken.
        self.items = items

    def __iter__(self) -> t.Iterator[EnvironmentWithStatements]:
        wrapped = self.items
        return wrapped
class StateStream(abc.ABC):
    """
    A stream of state flowing into the StateSync (for example, loaded from a file) or out of
    it (for example, dumped to a file).

    Iterating the stream yields these chunks:

    - VersionsChunk: the versions of the objects contained in this stream.
    - SnapshotsChunk: an iterator of Snapshot objects, which should be fully populated with
      any relevant intervals.
    - EnvironmentsChunk: an iterator of environments, each with any attached
      EnvironmentStatements.

    The structure lets callers process every component without depending on the order in
    which the chunks are emitted.
    """

    @abc.abstractmethod
    def __iter__(self) -> t.Iterator[StateStreamContents]:
        """Yield the chunks that make up this stream."""

    @classmethod
    def from_iterators(
        cls: t.Type["StateStream"],
        versions: Versions,
        snapshots: t.Iterator[Snapshot],
        environments: t.Iterator[EnvironmentWithStatements],
    ) -> "StateStream":
        """Wrap already-available sources in a concrete ``StateStream``.

        Each pass over the result creates the chunks lazily, in the order versions,
        snapshots, environments. The chunks share the given iterators.
        """

        class _StateStream(cls):  # type: ignore
            def __iter__(self) -> t.Iterator[StateStreamContents]:
                # Versions first, then the two streaming chunks.
                yield VersionsChunk(versions)
                yield SnapshotsChunk(snapshots)
                yield EnvironmentsChunk(environments)

        stream = _StateStream()
        return stream
Represents a stream of state either going into the StateSync (perhaps loaded from a file) or out of the StateSync (perhaps being dumped to a file)
Iterating over the stream produces the following chunks:
- VersionsChunk: The versions of the objects contained in this StateStream.
- SnapshotsChunk: An iterator that streams Snapshot objects. These should be fully populated with any relevant intervals.
- EnvironmentsChunk: An iterator that streams Environments, each with any attached EnvironmentStatements.

The goal is to give the stream some structure, so that callers can process all of its components without depending on the order in which they are emitted.
@classmethod
def from_iterators(
    cls: t.Type["StateStream"],
    versions: Versions,
    snapshots: t.Iterator[Snapshot],
    environments: t.Iterator[EnvironmentWithStatements],
) -> "StateStream":
    """Wrap already-available sources in a concrete ``StateStream``.

    Each pass over the result creates the chunks lazily, in the order versions,
    snapshots, environments. The chunks share the given iterators.
    """

    class _StateStream(cls):  # type: ignore
        def __iter__(self) -> t.Iterator[StateStreamContents]:
            # Versions first, then the two streaming chunks.
            yield VersionsChunk(versions)
            yield SnapshotsChunk(snapshots)
            yield EnvironmentsChunk(environments)

    stream = _StateStream()
    return stream
class ExpiredBatchRange(PydanticModel):
    """A range of rows, ordered by ``(updated_ts, name, identifier)``, for one expired-snapshot batch.

    ``start`` is an exclusive lower bound. ``end`` is either an inclusive ``RowBoundary`` or a
    ``LimitBoundary`` carrying a batch size. ``where_filter`` does not apply the batch size.
    """

    start: RowBoundary
    end: t.Union[RowBoundary, LimitBoundary]

    @classmethod
    def init_batch_range(cls, batch_size: int) -> ExpiredBatchRange:
        """Return the first page: rows after the lowest boundary, with a ``batch_size`` limit."""
        return cls(
            start=RowBoundary.lowest_boundary(),
            end=LimitBoundary(batch_size=batch_size),
        )

    @classmethod
    def all_batch_range(cls) -> ExpiredBatchRange:
        """Return a range spanning from the lowest to the highest row boundary."""
        return cls(
            start=RowBoundary.lowest_boundary(),
            end=RowBoundary.highest_boundary(),
        )

    @classmethod
    def _expanded_tuple_comparison(
        cls,
        columns: t.List[exp.Column],
        values: t.List[t.Union[exp.Literal, exp.Neg]],
        operator: t.Type[exp.Expr],
    ) -> exp.Condition:
        """Generate an expanded tuple comparison that works across all SQL engines.

        Converts tuple comparisons like (a, b, c) OP (x, y, z) into an expanded form
        that's compatible with all SQL engines. Native tuple comparisons have
        inconsistent support across engines (especially DuckDB, MySQL, SQLite).

        Repro of problem with DuckDB:
            "SELECT * FROM VALUES(1,'2') as test(a,b) WHERE ((a, b) > (1, 'foo')) AND ((a, b) <= (10, 'baz'))"

        Args:
            columns: Column expressions to compare, most significant first.
            values: Value expressions to compare against; must match ``columns`` in length.
            operator: The comparison operator class (exp.GT, exp.GTE, exp.LT, exp.LTE).

        Examples:
            (a, b, c) > (x, y, z) expands to:
            a > x OR (a = x AND b > y) OR (a = x AND b = y AND c > z)

            (a, b, c) <= (x, y, z) expands to:
            a < x OR (a = x AND b < y) OR (a = x AND b = y AND c <= z)

            (a, b, c) >= (x, y, z) expands to:
            a > x OR (a = x AND b > y) OR (a = x AND b = y AND c >= z)

        Returns:
            An expanded OR expression representing the tuple comparison, or the single
            comparison itself when only one column is given.

        Raises:
            ValueError: If ``operator`` is unsupported, ``columns`` is empty, or ``columns``
                and ``values`` differ in length.
        """
        if operator not in (exp.GT, exp.GTE, exp.LT, exp.LTE):
            raise ValueError(f"Unsupported operator: {operator}. Use GT, GTE, LT, or LTE.")
        if not columns:
            raise ValueError("At least one column is required for a tuple comparison.")
        if len(columns) != len(values):
            raise ValueError(
                f"Tuple comparison needs one value per column, got {len(columns)} columns "
                f"and {len(values)} values."
            )

        # For <= and >=, we use the strict operator for all but the last column
        # e.g., (a, b) <= (x, y) becomes: a < x OR (a = x AND b <= y)
        # For < and >, we use the strict operator throughout
        # e.g., (a, b) > (x, y) becomes: a > x OR (a = x AND b > y)
        strict_operator: t.Type[exp.Expr]
        final_operator: t.Type[exp.Expr] = operator

        if operator in (exp.LTE, exp.GTE):
            strict_operator = exp.LT if operator == exp.LTE else exp.GT
        else:
            strict_operator = operator

        last_index = len(columns) - 1
        conditions: t.List[exp.Expr] = []
        for i, (column, value) in enumerate(zip(columns, values)):
            # Every more-significant column must equal its bound for this term to apply.
            equality_conditions = [
                exp.EQ(this=prev_column, expression=prev_value)
                for prev_column, prev_value in zip(columns[:i], values[:i])
            ]

            # Use the final operator for the last column, strict for others
            comparison_op = final_operator if i == last_index else strict_operator
            comparison_condition = comparison_op(this=column, expression=value)

            if equality_conditions:
                conditions.append(exp.and_(*equality_conditions, comparison_condition))
            else:
                conditions.append(comparison_condition)

        return exp.or_(*conditions) if len(conditions) > 1 else t.cast(exp.Condition, conditions[0])

    @property
    def where_filter(self) -> exp.Condition:
        """Build the WHERE condition selecting rows inside this range.

        Always applies ``> start``. ``<= end`` is added only when ``end`` is a ``RowBoundary``.
        """
        # Use expanded tuple comparisons for cross-engine compatibility
        # Native tuple comparisons like (a, b) > (x, y) don't work reliably across all SQL engines
        columns = [
            exp.column("updated_ts"),
            exp.column("name"),
            exp.column("identifier"),
        ]
        start_values = [
            exp.Literal.number(self.start.updated_ts),
            exp.Literal.string(self.start.name),
            exp.Literal.string(self.start.identifier),
        ]

        start_condition = self._expanded_tuple_comparison(columns, start_values, exp.GT)

        range_filter: exp.Condition
        if isinstance(self.end, RowBoundary):
            end_values = [
                exp.Literal.number(self.end.updated_ts),
                exp.Literal.string(self.end.name),
                exp.Literal.string(self.end.identifier),
            ]
            end_condition = self._expanded_tuple_comparison(columns, end_values, exp.LTE)
            range_filter = exp.and_(start_condition, end_condition)
        else:
            range_filter = start_condition
        return range_filter
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- __pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
@property
def where_filter(self) -> exp.Condition:
    """Build the WHERE condition selecting rows inside this batch range.

    Rows are ordered by ``(updated_ts, name, identifier)``. The lower bound is exclusive
    (``> start``). When ``end`` is a ``RowBoundary``, an inclusive upper bound (``<= end``)
    is added. Otherwise, only the lower bound is returned.
    """
    # Tuple comparisons are expanded by hand because native row-value comparisons such as
    # (a, b) > (x, y) are not supported consistently across SQL engines.
    key_columns = [exp.column(col_name) for col_name in ("updated_ts", "name", "identifier")]

    def boundary_literals(boundary):
        return [
            exp.Literal.number(boundary.updated_ts),
            exp.Literal.string(boundary.name),
            exp.Literal.string(boundary.identifier),
        ]

    lower = self._expanded_tuple_comparison(key_columns, boundary_literals(self.start), exp.GT)
    if not isinstance(self.end, RowBoundary):
        return lower

    upper = self._expanded_tuple_comparison(key_columns, boundary_literals(self.end), exp.LTE)
    return exp.and_(lower, upper)
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class RowBoundary(PydanticModel):
    """A position in the ``(updated_ts, name, identifier)`` ordering of expired-snapshot rows."""

    updated_ts: int
    name: str
    identifier: str

    @classmethod
    def lowest_boundary(cls) -> RowBoundary:
        """Return the smallest boundary: ``updated_ts=0`` with empty ``name`` and ``identifier``."""
        # ``cls`` (not a hard-coded class) so subclasses get instances of themselves.
        return cls(updated_ts=0, name="", identifier="")

    @classmethod
    def highest_boundary(cls) -> RowBoundary:
        """Return a boundary at the maximum supported timestamp with empty ``name``/``identifier``."""
        # 9999-12-31T23:59:59.999Z in epoch milliseconds
        return cls(updated_ts=253_402_300_799_999, name="", identifier="")
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- __pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class LimitBoundary(PydanticModel):
    """A range end expressed as a maximum number of rows rather than a row position."""

    batch_size: int

    @classmethod
    def init_batch_boundary(cls, batch_size: int) -> LimitBoundary:
        """Return a boundary carrying ``batch_size``."""
        # ``cls`` (not a hard-coded class) so subclasses get instances of themselves.
        return cls(batch_size=batch_size)
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- __pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class PromotionResult(PydanticModel):
    """Outcome of a promotion: snapshot tables added and removed.

    ``removed_environment_naming_info`` may only be set when ``removed`` is non-empty.
    """

    added: t.List[SnapshotTableInfo]
    removed: t.List[SnapshotTableInfo]
    removed_environment_naming_info: t.Optional[EnvironmentNamingInfo]

    @field_validator("removed_environment_naming_info")
    def _validate_removed_environment_naming_info(
        cls, v: t.Optional[EnvironmentNamingInfo], info: ValidationInfo
    ) -> t.Optional[EnvironmentNamingInfo]:
        # Nothing to check when no naming info was supplied.
        if not v:
            return v
        # Naming info is only meaningful alongside at least one removed table.
        if info.data.get("removed"):
            return v
        raise ValueError("removed_environment_naming_info must be None if removed is empty")
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- __pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class ExpiredSnapshotBatch(PydanticModel):
    """A batch of expired snapshots to be cleaned up."""

    # IDs of the expired snapshots in this batch.
    expired_snapshot_ids: t.Set[SnapshotId]
    # Cleanup tasks for this batch (presumably one per table to drop — confirm with producer).
    cleanup_tasks: t.List[SnapshotTableCleanupTask]
    # Range this batch covers; iter_expired_snapshot_batches uses its ``end`` as the next page's ``start``.
    batch_range: ExpiredBatchRange
A batch of expired snapshots to be cleaned up.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
def iter_expired_snapshot_batches(
    state_reader: StateReader,
    *,
    current_ts: int,
    ignore_ttl: bool = False,
    batch_size: t.Optional[int] = None,
) -> t.Iterator[ExpiredSnapshotBatch]:
    """Yields expired snapshot batches.

    Batches are requested lazily from ``state_reader``. Each request after the first starts
    from the ``end`` boundary of the previous batch's range. Iteration stops once the reader
    returns ``None``.

    Args:
        state_reader: StateReader instance to query expired snapshots from.
        current_ts: Timestamp used to evaluate expiration.
        ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
        batch_size: Maximum number of snapshots to fetch per batch. Defaults to
            ``EXPIRED_SNAPSHOT_DEFAULT_BATCH_SIZE`` when ``None``.

    Raises:
        ValueError: If ``batch_size`` is less than 1. Because this is a generator, the error
            surfaces on the first ``next()`` rather than at call time.
        AssertionError: If a returned batch's range does not end on a ``RowBoundary``, which is
            required to resume pagination.
    """
    if batch_size is None:
        batch_size = EXPIRED_SNAPSHOT_DEFAULT_BATCH_SIZE
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    batch_range = ExpiredBatchRange.init_batch_range(batch_size=batch_size)

    while True:
        batch = state_reader.get_expired_snapshots(
            current_ts=current_ts,
            ignore_ttl=ignore_ttl,
            batch_range=batch_range,
        )

        if batch is None:
            return

        yield batch

        next_start = batch.batch_range.end
        # Raised explicitly rather than via ``assert`` so the check survives ``python -O``.
        if not isinstance(next_start, RowBoundary):
            raise AssertionError("Only RowBoundary is supported for pagination currently")
        batch_range = ExpiredBatchRange(
            start=next_start,
            end=LimitBoundary(batch_size=batch_size),
        )
Yields expired snapshot batches.
Arguments:
- state_reader: StateReader instance to query expired snapshots from.
- current_ts: Timestamp used to evaluate expiration.
- ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
- batch_size: Maximum number of snapshots to fetch per batch.