Edit on GitHub

sqlmesh.core.state_sync.common

  1from __future__ import annotations
  2
  3import logging
  4import typing as t
  5from functools import wraps
  6import itertools
  7import abc
  8
  9from dataclasses import dataclass
 10
 11from pydantic_core.core_schema import ValidationInfo
 12from sqlglot import exp
 13
 14from sqlmesh.utils.pydantic import PydanticModel, field_validator
 15from sqlmesh.core.environment import Environment, EnvironmentStatements, EnvironmentNamingInfo
 16from sqlmesh.core.snapshot import (
 17    Snapshot,
 18    SnapshotId,
 19    SnapshotTableCleanupTask,
 20    SnapshotTableInfo,
 21)
 22
 23if t.TYPE_CHECKING:
 24    from sqlmesh.core.state_sync.base import Versions, StateReader
 25
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Batch size used by iter_expired_snapshot_batches when the caller does not pass one.
EXPIRED_SNAPSHOT_DEFAULT_BATCH_SIZE = 200
 29
 30
 31def transactional() -> t.Callable[[t.Callable], t.Callable]:
 32    def decorator(func: t.Callable) -> t.Callable:
 33        @wraps(func)
 34        def wrapper(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
 35            if not hasattr(self, "_transaction"):
 36                return func(self, *args, **kwargs)
 37
 38            with self._transaction():
 39                return func(self, *args, **kwargs)
 40
 41        return wrapper
 42
 43    return decorator
 44
 45
# Element type variable used in chunk_iterable's signature.
T = t.TypeVar("T")
 47
 48
 49def chunk_iterable(iterable: t.Iterable[T], size: int = 10) -> t.Iterable[t.Iterable[T]]:
 50    iterator = iter(iterable)
 51    for first in iterator:
 52        yield itertools.chain([first], itertools.islice(iterator, size - 1))
 53
 54
class EnvironmentWithStatements(PydanticModel):
    """An Environment bundled with the EnvironmentStatements attached to it."""

    # The environment itself.
    environment: Environment
    # Statements attached to the environment; defaults to an empty list.
    statements: t.List[EnvironmentStatements] = []
 58
 59
@dataclass
class VersionsChunk:
    """StateStream chunk that carries the Versions of the streamed state."""

    versions: Versions
 63
 64
class SnapshotsChunk:
    """StateStream chunk that wraps an iterator of Snapshot objects.

    Iterating the chunk hands back the wrapped iterator itself rather than a copy,
    so the chunk can only be consumed as often as that iterator allows.
    """

    def __init__(self, items: t.Iterator[Snapshot]):
        self.items = items

    def __iter__(self) -> t.Iterator[Snapshot]:
        # The underlying iterator is returned as-is; nothing is buffered or re-created.
        return self.items
 71
 72
class EnvironmentsChunk:
    """StateStream chunk that wraps an iterator of EnvironmentWithStatements objects.

    Iterating the chunk hands back the wrapped iterator itself rather than a copy,
    so the chunk can only be consumed as often as that iterator allows.
    """

    def __init__(self, items: t.Iterator[EnvironmentWithStatements]):
        self.items = items

    def __iter__(self) -> t.Iterator[EnvironmentWithStatements]:
        # The underlying iterator is returned as-is; nothing is buffered or re-created.
        return self.items
 79
 80
# The kinds of chunk that iterating over a StateStream can produce.
StateStreamContents = t.Union[VersionsChunk, SnapshotsChunk, EnvironmentsChunk]
 82
 83
 84class StateStream(abc.ABC):
 85    """
 86    Represents a stream of state either going into the StateSync (perhaps loaded from a file)
 87    or out of the StateSync (perhaps being dumped to a file)
 88
 89    Iterating over the stream produces the following chunks:
 90
 91        VersionsChunk: The versions of the objects contained in this StateStream
 92        SnapshotsChunk: Is itself an iterator that streams Snapshot objects. Note that they should be fully populated with any relevant Intervals
 93        EnvironmentsChunk: Is itself an iterator emitting a stream of Environments with any EnvironmentStatements attached
 94
 95    The idea here is to give some structure to the stream and ensure that callers have the opportunity to process all its components while not
 96    needing to worry about the order they are emitted in
 97    """
 98
 99    @abc.abstractmethod
100    def __iter__(self) -> t.Iterator[StateStreamContents]:
101        pass
102
103    @classmethod
104    def from_iterators(
105        cls: t.Type["StateStream"],
106        versions: Versions,
107        snapshots: t.Iterator[Snapshot],
108        environments: t.Iterator[EnvironmentWithStatements],
109    ) -> "StateStream":
110        class _StateStream(cls):  # type: ignore
111            def __iter__(self) -> t.Iterator[StateStreamContents]:
112                yield VersionsChunk(versions)
113
114                yield SnapshotsChunk(snapshots)
115
116                yield EnvironmentsChunk(environments)
117
118        return _StateStream()
119
120
class ExpiredBatchRange(PydanticModel):
    """Range over rows ordered by (updated_ts, name, identifier) that one batch covers.

    The range begins strictly after ``start``. Its ``end`` is either a concrete
    RowBoundary, which is inclusive, or a LimitBoundary, in which case
    ``where_filter`` adds no upper bound at all.
    """

    # Exclusive lower bound of the range.
    start: RowBoundary
    # Inclusive upper bound (RowBoundary) or a batch-size limit (LimitBoundary).
    end: t.Union[RowBoundary, LimitBoundary]

    @classmethod
    def init_batch_range(cls, batch_size: int) -> ExpiredBatchRange:
        """Return a range that starts at the lowest boundary and ends at a ``batch_size`` limit."""
        return ExpiredBatchRange(
            start=RowBoundary.lowest_boundary(),
            end=LimitBoundary(batch_size=batch_size),
        )

    @classmethod
    def all_batch_range(cls) -> ExpiredBatchRange:
        """Return a range that spans from the lowest boundary to the highest boundary."""
        return ExpiredBatchRange(
            start=RowBoundary.lowest_boundary(),
            end=RowBoundary.highest_boundary(),
        )

    @classmethod
    def _expanded_tuple_comparison(
        cls,
        columns: t.List[exp.Column],
        values: t.List[t.Union[exp.Literal, exp.Neg]],
        operator: t.Type[exp.Expr],
    ) -> exp.Condition:
        """Generate expanded tuple comparison that works across all SQL engines.

        Converts tuple comparisons like (a, b, c) OP (x, y, z) into an expanded form
        that's compatible with all SQL engines, since native tuple comparisons have
        inconsistent support across engines (especially DuckDB, MySQL, SQLite).

        Repro of problem with DuckDB:
            "SELECT * FROM VALUES(1,'2') as test(a,b) WHERE ((a, b) > (1, 'foo')) AND ((a, b) <= (10, 'baz'))"

        Args:
            columns: List of column expressions to compare
            values: List of value expressions to compare against, positionally matched to ``columns``
            operator: The comparison operator class (exp.GT, exp.GTE, exp.LT, exp.LTE)

        Examples:
            (a, b, c) > (x, y, z) expands to:
                a > x OR (a = x AND b > y) OR (a = x AND b = y AND c > z)

            (a, b, c) <= (x, y, z) expands to:
                a < x OR (a = x AND b < y) OR (a = x AND b = y AND c <= z)

            (a, b, c) >= (x, y, z) expands to:
                a > x OR (a = x AND b > y) OR (a = x AND b = y AND c >= z)

        Returns:
            An expanded OR expression representing the tuple comparison. When there is
            only a single column, the lone comparison is returned without an OR.

        Raises:
            ValueError: If ``operator`` is not one of exp.GT, exp.GTE, exp.LT or exp.LTE.
        """
        if operator not in (exp.GT, exp.GTE, exp.LT, exp.LTE):
            raise ValueError(f"Unsupported operator: {operator}. Use GT, GTE, LT, or LTE.")

        # For <= and >=, we use the strict operator for all but the last column
        # e.g., (a, b) <= (x, y) becomes: a < x OR (a = x AND b <= y)
        # For < and >, we use the strict operator throughout
        # e.g., (a, b) > (x, y) becomes: a > x OR (a = x AND b > y)
        strict_operator: t.Type[exp.Expr]
        final_operator: t.Type[exp.Expr]

        if operator in (exp.LTE, exp.GTE):
            # For inclusive operators (<=, >=), use strict form for intermediate columns
            # but keep inclusive form for the last column
            strict_operator = exp.LT if operator == exp.LTE else exp.GT
            final_operator = operator  # Keep LTE/GTE for last column
        else:
            # For strict operators (<, >), use them throughout
            strict_operator = operator
            final_operator = operator

        # One OR-branch per column: all earlier columns equal, current column compared.
        conditions: t.List[exp.Expr] = []
        for i in range(len(columns)):
            # Build equality conditions for all columns before current
            equality_conditions = [exp.EQ(this=columns[j], expression=values[j]) for j in range(i)]

            # Use the final operator for the last column, strict for others
            comparison_op = final_operator if i == len(columns) - 1 else strict_operator
            comparison_condition = comparison_op(this=columns[i], expression=values[i])

            if equality_conditions:
                conditions.append(exp.and_(*equality_conditions, comparison_condition))
            else:
                conditions.append(comparison_condition)

        return exp.or_(*conditions) if len(conditions) > 1 else t.cast(exp.Condition, conditions[0])

    @property
    def where_filter(self) -> exp.Condition:
        """Build the condition selecting rows that fall inside this range.

        The condition compares the ``updated_ts``, ``name`` and ``identifier`` columns
        against the boundaries: strictly greater than ``start`` and, when ``end`` is a
        RowBoundary, less than or equal to ``end``. When ``end`` is a LimitBoundary
        only the lower bound is produced.
        """
        # Use expanded tuple comparisons for cross-engine compatibility
        # Native tuple comparisons like (a, b) > (x, y) don't work reliably across all SQL engines
        columns = [
            exp.column("updated_ts"),
            exp.column("name"),
            exp.column("identifier"),
        ]
        start_values = [
            exp.Literal.number(self.start.updated_ts),
            exp.Literal.string(self.start.name),
            exp.Literal.string(self.start.identifier),
        ]

        start_condition = self._expanded_tuple_comparison(columns, start_values, exp.GT)

        range_filter: exp.Condition
        if isinstance(self.end, RowBoundary):
            end_values = [
                exp.Literal.number(self.end.updated_ts),
                exp.Literal.string(self.end.name),
                exp.Literal.string(self.end.identifier),
            ]
            end_condition = self._expanded_tuple_comparison(columns, end_values, exp.LTE)
            range_filter = exp.and_(start_condition, end_condition)
        else:
            # A LimitBoundary carries no row position, so no upper bound is added here.
            range_filter = start_condition
        return range_filter
238
239
class RowBoundary(PydanticModel):
    """A position in the (updated_ts, name, identifier) ordering, used to bound a batch range."""

    updated_ts: int
    name: str
    identifier: str

    @classmethod
    def lowest_boundary(cls) -> RowBoundary:
        """Return the boundary with a zero timestamp and an empty name and identifier."""
        return RowBoundary(updated_ts=0, name="", identifier="")

    @classmethod
    def highest_boundary(cls) -> RowBoundary:
        """Return the boundary at the largest supported timestamp, with an empty name and identifier."""
        # 9999-12-31T23:59:59.999Z in epoch milliseconds
        # NOTE(review): name and identifier are empty here as well, so an inclusive
        # comparison against this boundary would not match a row whose updated_ts equals
        # this exact value and whose name is non-empty -- presumably unreachable; confirm.
        return RowBoundary(updated_ts=253_402_300_799_999, name="", identifier="")
253
254
class LimitBoundary(PydanticModel):
    """End-of-range marker that carries a batch size instead of a concrete row position."""

    # Batch size for the range; presumably applied as a row limit by whoever
    # executes the query -- that code is not shown here.
    batch_size: int

    @classmethod
    def init_batch_boundary(cls, batch_size: int) -> LimitBoundary:
        """Return a LimitBoundary for the given ``batch_size``."""
        return LimitBoundary(batch_size=batch_size)
261
262
class PromotionResult(PydanticModel):
    """Snapshot table infos that were added and removed, plus naming info for the removed ones.

    ``removed_environment_naming_info`` may only be set when ``removed`` is non-empty.
    """

    added: t.List[SnapshotTableInfo]
    removed: t.List[SnapshotTableInfo]
    removed_environment_naming_info: t.Optional[EnvironmentNamingInfo]

    @field_validator("removed_environment_naming_info")
    def _validate_removed_environment_naming_info(
        cls, v: t.Optional[EnvironmentNamingInfo], info: ValidationInfo
    ) -> t.Optional[EnvironmentNamingInfo]:
        """Reject naming info for removed snapshots when nothing was removed.

        Raises:
            ValueError: If a value is given while the already-validated ``removed``
                field is empty or missing.
        """
        has_removed = bool(info.data.get("removed"))
        if not v or has_removed:
            return v
        raise ValueError("removed_environment_naming_info must be None if removed is empty")
275
276
class ExpiredSnapshotBatch(PydanticModel):
    """A batch of expired snapshots to be cleaned up."""

    # IDs of the expired snapshots contained in this batch.
    expired_snapshot_ids: t.Set[SnapshotId]
    # Table cleanup tasks associated with this batch.
    cleanup_tasks: t.List[SnapshotTableCleanupTask]
    # Range covered by this batch; iter_expired_snapshot_batches uses its end
    # as the start of the next batch.
    batch_range: ExpiredBatchRange
283
284
def iter_expired_snapshot_batches(
    state_reader: StateReader,
    *,
    current_ts: int,
    ignore_ttl: bool = False,
    batch_size: t.Optional[int] = None,
) -> t.Iterator[ExpiredSnapshotBatch]:
    """Lazily page through expired snapshots, one batch at a time.

    Every page is requested from ``state_reader.get_expired_snapshots``. The end
    boundary reported by a batch becomes the start of the following request, and
    iteration stops as soon as the reader returns ``None``.

    Args:
        state_reader: StateReader instance to query expired snapshots from.
        current_ts: Timestamp used to evaluate expiration.
        ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
        batch_size: Maximum number of snapshots to fetch per batch. Falls back to
            EXPIRED_SNAPSHOT_DEFAULT_BATCH_SIZE when None.

    Yields:
        Each non-None batch returned by the state reader, in request order.
    """
    if batch_size is None:
        batch_size = EXPIRED_SNAPSHOT_DEFAULT_BATCH_SIZE

    page_range = ExpiredBatchRange.init_batch_range(batch_size=batch_size)

    while True:
        page = state_reader.get_expired_snapshots(
            current_ts=current_ts,
            ignore_ttl=ignore_ttl,
            batch_range=page_range,
        )
        if page is None:
            break

        yield page

        # Resume right after the last row of the page that was just handed out.
        resume_from = page.batch_range.end
        assert isinstance(resume_from, RowBoundary), (
            "Only RowBoundary is supported for pagination currently"
        )
        page_range = ExpiredBatchRange(
            start=resume_from,
            end=LimitBoundary(batch_size=batch_size),
        )
logger = <Logger sqlmesh.core.state_sync.common (WARNING)>
EXPIRED_SNAPSHOT_DEFAULT_BATCH_SIZE = 200
def transactional() -> Callable[[Callable], Callable]:
32def transactional() -> t.Callable[[t.Callable], t.Callable]:
33    def decorator(func: t.Callable) -> t.Callable:
34        @wraps(func)
35        def wrapper(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
36            if not hasattr(self, "_transaction"):
37                return func(self, *args, **kwargs)
38
39            with self._transaction():
40                return func(self, *args, **kwargs)
41
42        return wrapper
43
44    return decorator
def chunk_iterable(iterable: Iterable[~T], size: int = 10) -> Iterable[Iterable[~T]]:
50def chunk_iterable(iterable: t.Iterable[T], size: int = 10) -> t.Iterable[t.Iterable[T]]:
51    iterator = iter(iterable)
52    for first in iterator:
53        yield itertools.chain([first], itertools.islice(iterator, size - 1))
class EnvironmentWithStatements(sqlmesh.utils.pydantic.PydanticModel):
56class EnvironmentWithStatements(PydanticModel):
57    environment: Environment
58    statements: t.List[EnvironmentStatements] = []

Usage documentation: Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
@dataclass
class VersionsChunk:
61@dataclass
62class VersionsChunk:
63    versions: Versions
VersionsChunk(versions: sqlmesh.core.state_sync.base.Versions)
class SnapshotsChunk:
66class SnapshotsChunk:
67    def __init__(self, items: t.Iterator[Snapshot]):
68        self.items = items
69
70    def __iter__(self) -> t.Iterator[Snapshot]:
71        return self.items
SnapshotsChunk(items: Iterator[sqlmesh.core.snapshot.definition.Snapshot])
67    def __init__(self, items: t.Iterator[Snapshot]):
68        self.items = items
items
class EnvironmentsChunk:
74class EnvironmentsChunk:
75    def __init__(self, items: t.Iterator[EnvironmentWithStatements]):
76        self.items = items
77
78    def __iter__(self) -> t.Iterator[EnvironmentWithStatements]:
79        return self.items
EnvironmentsChunk( items: Iterator[EnvironmentWithStatements])
75    def __init__(self, items: t.Iterator[EnvironmentWithStatements]):
76        self.items = items
items
StateStreamContents = typing.Union[VersionsChunk, SnapshotsChunk, EnvironmentsChunk]
class StateStream(abc.ABC):
 85class StateStream(abc.ABC):
 86    """
 87    Represents a stream of state either going into the StateSync (perhaps loaded from a file)
 88    or out of the StateSync (perhaps being dumped to a file)
 89
 90    Iterating over the stream produces the following chunks:
 91
 92        VersionsChunk: The versions of the objects contained in this StateStream
 93        SnapshotsChunk: Is itself an iterator that streams Snapshot objects. Note that they should be fully populated with any relevant Intervals
 94        EnvironmentsChunk: Is itself an iterator emitting a stream of Environments with any EnvironmentStatements attached
 95
 96    The idea here is to give some structure to the stream and ensure that callers have the opportunity to process all its components while not
 97    needing to worry about the order they are emitted in
 98    """
 99
100    @abc.abstractmethod
101    def __iter__(self) -> t.Iterator[StateStreamContents]:
102        pass
103
104    @classmethod
105    def from_iterators(
106        cls: t.Type["StateStream"],
107        versions: Versions,
108        snapshots: t.Iterator[Snapshot],
109        environments: t.Iterator[EnvironmentWithStatements],
110    ) -> "StateStream":
111        class _StateStream(cls):  # type: ignore
112            def __iter__(self) -> t.Iterator[StateStreamContents]:
113                yield VersionsChunk(versions)
114
115                yield SnapshotsChunk(snapshots)
116
117                yield EnvironmentsChunk(environments)
118
119        return _StateStream()

Represents a stream of state either going into the StateSync (perhaps loaded from a file) or out of the StateSync (perhaps being dumped to a file)

Iterating over the stream produces the following chunks:

  • VersionsChunk: The versions of the objects contained in this StateStream.
  • SnapshotsChunk: Is itself an iterator that streams Snapshot objects. Note that they should be fully populated with any relevant Intervals.
  • EnvironmentsChunk: Is itself an iterator emitting a stream of Environments with any EnvironmentStatements attached.

The idea here is to give some structure to the stream and ensure that callers have the opportunity to process all its components while not needing to worry about the order they are emitted in

@classmethod
def from_iterators( cls: Type[StateStream], versions: sqlmesh.core.state_sync.base.Versions, snapshots: Iterator[sqlmesh.core.snapshot.definition.Snapshot], environments: Iterator[EnvironmentWithStatements]) -> StateStream:
104    @classmethod
105    def from_iterators(
106        cls: t.Type["StateStream"],
107        versions: Versions,
108        snapshots: t.Iterator[Snapshot],
109        environments: t.Iterator[EnvironmentWithStatements],
110    ) -> "StateStream":
111        class _StateStream(cls):  # type: ignore
112            def __iter__(self) -> t.Iterator[StateStreamContents]:
113                yield VersionsChunk(versions)
114
115                yield SnapshotsChunk(snapshots)
116
117                yield EnvironmentsChunk(environments)
118
119        return _StateStream()
class ExpiredBatchRange(sqlmesh.utils.pydantic.PydanticModel):
122class ExpiredBatchRange(PydanticModel):
123    start: RowBoundary
124    end: t.Union[RowBoundary, LimitBoundary]
125
126    @classmethod
127    def init_batch_range(cls, batch_size: int) -> ExpiredBatchRange:
128        return ExpiredBatchRange(
129            start=RowBoundary.lowest_boundary(),
130            end=LimitBoundary(batch_size=batch_size),
131        )
132
133    @classmethod
134    def all_batch_range(cls) -> ExpiredBatchRange:
135        return ExpiredBatchRange(
136            start=RowBoundary.lowest_boundary(),
137            end=RowBoundary.highest_boundary(),
138        )
139
140    @classmethod
141    def _expanded_tuple_comparison(
142        cls,
143        columns: t.List[exp.Column],
144        values: t.List[t.Union[exp.Literal, exp.Neg]],
145        operator: t.Type[exp.Expr],
146    ) -> exp.Condition:
147        """Generate expanded tuple comparison that works across all SQL engines.
148
149        Converts tuple comparisons like (a, b, c) OP (x, y, z) into an expanded form
150        that's compatible with all SQL engines, since native tuple comparisons have
151        inconsistent support across engines (especially DuckDB, MySQL, SQLite).
152
153        Repro of problem with DuckDB:
154            "SELECT * FROM VALUES(1,'2') as test(a,b) WHERE ((a, b) > (1, 'foo')) AND ((a, b) <= (10, 'baz'))"
155
156        Args:
157            columns: List of column expressions to compare
158            values: List of value expressions to compare against
159            operator: The comparison operator class (exp.GT, exp.GTE, exp.LT, exp.LTE)
160
161        Examples:
162            (a, b, c) > (x, y, z) expands to:
163                a > x OR (a = x AND b > y) OR (a = x AND b = y AND c > z)
164
165            (a, b, c) <= (x, y, z) expands to:
166                a < x OR (a = x AND b < y) OR (a = x AND b = y AND c <= z)
167
168            (a, b, c) >= (x, y, z) expands to:
169                a > x OR (a = x AND b > y) OR (a = x AND b = y AND c >= z)
170
171        Returns:
172            An expanded OR expression representing the tuple comparison
173        """
174        if operator not in (exp.GT, exp.GTE, exp.LT, exp.LTE):
175            raise ValueError(f"Unsupported operator: {operator}. Use GT, GTE, LT, or LTE.")
176
177        # For <= and >=, we use the strict operator for all but the last column
178        # e.g., (a, b) <= (x, y) becomes: a < x OR (a = x AND b <= y)
179        # For < and >, we use the strict operator throughout
180        # e.g., (a, b) > (x, y) becomes: a > x OR (a = x AND b > y)
181        strict_operator: t.Type[exp.Expr]
182        final_operator: t.Type[exp.Expr]
183
184        if operator in (exp.LTE, exp.GTE):
185            # For inclusive operators (<=, >=), use strict form for intermediate columns
186            # but keep inclusive form for the last column
187            strict_operator = exp.LT if operator == exp.LTE else exp.GT
188            final_operator = operator  # Keep LTE/GTE for last column
189        else:
190            # For strict operators (<, >), use them throughout
191            strict_operator = operator
192            final_operator = operator
193
194        conditions: t.List[exp.Expr] = []
195        for i in range(len(columns)):
196            # Build equality conditions for all columns before current
197            equality_conditions = [exp.EQ(this=columns[j], expression=values[j]) for j in range(i)]
198
199            # Use the final operator for the last column, strict for others
200            comparison_op = final_operator if i == len(columns) - 1 else strict_operator
201            comparison_condition = comparison_op(this=columns[i], expression=values[i])
202
203            if equality_conditions:
204                conditions.append(exp.and_(*equality_conditions, comparison_condition))
205            else:
206                conditions.append(comparison_condition)
207
208        return exp.or_(*conditions) if len(conditions) > 1 else t.cast(exp.Condition, conditions[0])
209
210    @property
211    def where_filter(self) -> exp.Condition:
212        # Use expanded tuple comparisons for cross-engine compatibility
213        # Native tuple comparisons like (a, b) > (x, y) don't work reliably across all SQL engines
214        columns = [
215            exp.column("updated_ts"),
216            exp.column("name"),
217            exp.column("identifier"),
218        ]
219        start_values = [
220            exp.Literal.number(self.start.updated_ts),
221            exp.Literal.string(self.start.name),
222            exp.Literal.string(self.start.identifier),
223        ]
224
225        start_condition = self._expanded_tuple_comparison(columns, start_values, exp.GT)
226
227        range_filter: exp.Condition
228        if isinstance(self.end, RowBoundary):
229            end_values = [
230                exp.Literal.number(self.end.updated_ts),
231                exp.Literal.string(self.end.name),
232                exp.Literal.string(self.end.identifier),
233            ]
234            end_condition = self._expanded_tuple_comparison(columns, end_values, exp.LTE)
235            range_filter = exp.and_(start_condition, end_condition)
236        else:
237            range_filter = start_condition
238        return range_filter

Usage documentation: Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
start: RowBoundary
end: Union[RowBoundary, LimitBoundary]
@classmethod
def init_batch_range(cls, batch_size: int) -> ExpiredBatchRange:
126    @classmethod
127    def init_batch_range(cls, batch_size: int) -> ExpiredBatchRange:
128        return ExpiredBatchRange(
129            start=RowBoundary.lowest_boundary(),
130            end=LimitBoundary(batch_size=batch_size),
131        )
@classmethod
def all_batch_range(cls) -> ExpiredBatchRange:
133    @classmethod
134    def all_batch_range(cls) -> ExpiredBatchRange:
135        return ExpiredBatchRange(
136            start=RowBoundary.lowest_boundary(),
137            end=RowBoundary.highest_boundary(),
138        )
where_filter: sqlglot.expressions.core.Condition
210    @property
211    def where_filter(self) -> exp.Condition:
212        # Use expanded tuple comparisons for cross-engine compatibility
213        # Native tuple comparisons like (a, b) > (x, y) don't work reliably across all SQL engines
214        columns = [
215            exp.column("updated_ts"),
216            exp.column("name"),
217            exp.column("identifier"),
218        ]
219        start_values = [
220            exp.Literal.number(self.start.updated_ts),
221            exp.Literal.string(self.start.name),
222            exp.Literal.string(self.start.identifier),
223        ]
224
225        start_condition = self._expanded_tuple_comparison(columns, start_values, exp.GT)
226
227        range_filter: exp.Condition
228        if isinstance(self.end, RowBoundary):
229            end_values = [
230                exp.Literal.number(self.end.updated_ts),
231                exp.Literal.string(self.end.name),
232                exp.Literal.string(self.end.identifier),
233            ]
234            end_condition = self._expanded_tuple_comparison(columns, end_values, exp.LTE)
235            range_filter = exp.and_(start_condition, end_condition)
236        else:
237            range_filter = start_condition
238        return range_filter
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class RowBoundary(sqlmesh.utils.pydantic.PydanticModel):
class RowBoundary(PydanticModel):
    """A boundary identified by an ``(updated_ts, name, identifier)`` triple."""

    updated_ts: int
    name: str
    identifier: str

    @classmethod
    def lowest_boundary(cls) -> RowBoundary:
        """Return a boundary with a zero timestamp and empty name and identifier."""
        floor_ts = 0
        return RowBoundary(updated_ts=floor_ts, name="", identifier="")

    @classmethod
    def highest_boundary(cls) -> RowBoundary:
        """Return a boundary at the year-9999 timestamp with empty name and identifier."""
        # Epoch milliseconds for 9999-12-31T23:59:59.999Z.
        ceiling_ts = 253_402_300_799_999
        return RowBoundary(updated_ts=ceiling_ts, name="", identifier="")

Usage documentation: Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
updated_ts: int
name: str
identifier: str
@classmethod
def lowest_boundary(cls) -> RowBoundary:
@classmethod
def lowest_boundary(cls) -> RowBoundary:
    """Return a boundary with a zero timestamp and empty name and identifier."""
    floor_ts = 0
    return RowBoundary(updated_ts=floor_ts, name="", identifier="")
@classmethod
def highest_boundary(cls) -> RowBoundary:
@classmethod
def highest_boundary(cls) -> RowBoundary:
    """Return a boundary at the year-9999 timestamp with empty name and identifier."""
    # Epoch milliseconds for 9999-12-31T23:59:59.999Z.
    ceiling_ts = 253_402_300_799_999
    return RowBoundary(updated_ts=ceiling_ts, name="", identifier="")
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class LimitBoundary(sqlmesh.utils.pydantic.PydanticModel):
class LimitBoundary(PydanticModel):
    """A boundary expressed as a batch size rather than as a specific row."""

    batch_size: int

    @classmethod
    def init_batch_boundary(cls, batch_size: int) -> LimitBoundary:
        """Build a ``LimitBoundary`` carrying the given ``batch_size``."""
        boundary = LimitBoundary(batch_size=batch_size)
        return boundary

Usage documentation: Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
batch_size: int
@classmethod
def init_batch_boundary(cls, batch_size: int) -> LimitBoundary:
@classmethod
def init_batch_boundary(cls, batch_size: int) -> LimitBoundary:
    """Build a ``LimitBoundary`` carrying the given ``batch_size``."""
    boundary = LimitBoundary(batch_size=batch_size)
    return boundary
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class PromotionResult(sqlmesh.utils.pydantic.PydanticModel):
class PromotionResult(PydanticModel):
    """Added and removed snapshot table infos, plus optional naming info for the removed ones."""

    added: t.List[SnapshotTableInfo]
    removed: t.List[SnapshotTableInfo]
    removed_environment_naming_info: t.Optional[EnvironmentNamingInfo]

    @field_validator("removed_environment_naming_info")
    def _validate_removed_environment_naming_info(
        cls, v: t.Optional[EnvironmentNamingInfo], info: ValidationInfo
    ) -> t.Optional[EnvironmentNamingInfo]:
        """Reject naming info that is supplied while ``removed`` is empty or absent.

        Raises:
            ValueError: If ``v`` is truthy and the already-validated data has no
                truthy ``removed`` entry.
        """
        # Nothing to cross-check when no naming info was provided.
        if not v:
            return v
        # Naming info is only meaningful alongside at least one removed item.
        if info.data.get("removed"):
            return v
        raise ValueError("removed_environment_naming_info must be None if removed is empty")

Usage documentation: Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
removed_environment_naming_info: Optional[sqlmesh.core.environment.EnvironmentNamingInfo]
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class ExpiredSnapshotBatch(sqlmesh.utils.pydantic.PydanticModel):
class ExpiredSnapshotBatch(PydanticModel):
    """A batch of expired snapshots to be cleaned up.

    Attributes:
        expired_snapshot_ids: IDs of the snapshots contained in this batch.
        cleanup_tasks: Table cleanup tasks associated with this batch.
        batch_range: The range this batch covers; its ``end`` is used as the
            ``start`` of the next range by ``iter_expired_snapshot_batches``.
    """

    expired_snapshot_ids: t.Set[SnapshotId]
    cleanup_tasks: t.List[SnapshotTableCleanupTask]
    batch_range: ExpiredBatchRange

A batch of expired snapshots to be cleaned up.

expired_snapshot_ids: Set[sqlmesh.core.snapshot.definition.SnapshotId]
batch_range: ExpiredBatchRange
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
def iter_expired_snapshot_batches( state_reader: sqlmesh.core.state_sync.base.StateReader, *, current_ts: int, ignore_ttl: bool = False, batch_size: Optional[int] = None) -> Iterator[ExpiredSnapshotBatch]:
def iter_expired_snapshot_batches(
    state_reader: StateReader,
    *,
    current_ts: int,
    ignore_ttl: bool = False,
    batch_size: t.Optional[int] = None,
) -> t.Iterator[ExpiredSnapshotBatch]:
    """Lazily page through expired snapshots, one batch at a time.

    Each batch is fetched from ``state_reader.get_expired_snapshots``; the end
    boundary of a yielded batch becomes the start of the next requested range.
    Iteration stops as soon as the reader returns ``None``.

    Args:
        state_reader: StateReader instance to query expired snapshots from.
        current_ts: Timestamp used to evaluate expiration.
        ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
        batch_size: Maximum number of snapshots to fetch per batch. Falls back to
            ``EXPIRED_SNAPSHOT_DEFAULT_BATCH_SIZE`` when ``None``.

    Yields:
        Every non-``None`` batch returned by the reader, in the order fetched.
    """
    if batch_size is None:
        batch_size = EXPIRED_SNAPSHOT_DEFAULT_BATCH_SIZE

    def _fetch(requested_range: ExpiredBatchRange) -> t.Optional[ExpiredSnapshotBatch]:
        return state_reader.get_expired_snapshots(
            current_ts=current_ts,
            ignore_ttl=ignore_ttl,
            batch_range=requested_range,
        )

    page = _fetch(ExpiredBatchRange.init_batch_range(batch_size=batch_size))
    while page is not None:
        yield page

        # The next page resumes from where this one ended, which requires a
        # concrete row position rather than a size-only limit.
        resume_from = page.batch_range.end
        assert isinstance(resume_from, RowBoundary), (
            "Only RowBoundary is supported for pagination currently"
        )
        following_range = ExpiredBatchRange(
            start=resume_from,
            end=LimitBoundary(batch_size=batch_size),
        )
        page = _fetch(following_range)

Yields expired snapshot batches.

Arguments:
  • state_reader: StateReader instance to query expired snapshots from.
  • current_ts: Timestamp used to evaluate expiration.
  • ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
  • batch_size: Maximum number of snapshots to fetch per batch.