Edit on GitHub

ContextDiff

ContextDiff encapsulates the differences between two environments. The two environments can be the local environment and a remote environment, or two remote environments. ContextDiff is an important part of SQLMesh. SQLMesh plans use ContextDiff to determine what nodes were changed between two environments. The SQLMesh CLI diff command uses ContextDiff to determine what to visualize.

When creating a ContextDiff object, SQLMesh will compare the snapshots from one environment with those of another remote environment and determine if nodes have been added, removed, or modified.

  1"""
  2# ContextDiff
  3
  4ContextDiff encapsulates the differences between two environments. The two environments can be the local
  5environment and a remote environment, or two remote environments. ContextDiff is an important part of
  6SQLMesh. SQLMesh plans use ContextDiff to determine what nodes were changed between two environments.
  7The SQLMesh CLI diff command uses ContextDiff to determine what to visualize.
  8
  9When creating a ContextDiff object, SQLMesh will compare the snapshots from one environment with those of
 10another remote environment and determine if nodes have been added, removed, or modified.
 11"""
 12
 13from __future__ import annotations
 14
 15import logging
 16import typing as t
 17from functools import cached_property
 18
 19from sqlmesh.core.snapshot import Snapshot, SnapshotId, SnapshotTableInfo
 20from sqlmesh.utils.errors import SQLMeshError
 21from sqlmesh.utils.pydantic import PydanticModel
 22
 23if t.TYPE_CHECKING:
 24    from sqlmesh.core.state_sync import StateReader
 25
 26logger = logging.getLogger(__name__)
 27
 28
 29class ContextDiff(PydanticModel):
 30    """ContextDiff is an object representing the difference between two environments.
 31
 32    The two environments can be the local environment and a remote environment, or two remote
 33    environments.
 34    """
 35
 36    environment: str
 37    """The environment to diff."""
 38    is_new_environment: bool
 39    """Whether the target environment is new."""
 40    is_unfinalized_environment: bool
 41    """Whether the currently stored environment record is in unfinalized state."""
 42    create_from: str
 43    """The name of the environment the target environment will be created from if new."""
 44    added: t.Set[SnapshotId]
 45    """New nodes."""
 46    removed_snapshots: t.Dict[SnapshotId, SnapshotTableInfo]
 47    """Deleted nodes."""
 48    modified_snapshots: t.Dict[str, t.Tuple[Snapshot, Snapshot]]
 49    """Modified snapshots."""
 50    snapshots: t.Dict[SnapshotId, Snapshot]
 51    """Merged snapshots."""
 52    new_snapshots: t.Dict[SnapshotId, Snapshot]
 53    """New snapshots."""
 54    previous_plan_id: t.Optional[str]
 55    """Previous plan id."""
 56    previously_promoted_snapshot_ids: t.Set[SnapshotId]
 57    """Snapshot IDs that were promoted by the previous plan."""
 58    previous_finalized_snapshots: t.Optional[t.List[SnapshotTableInfo]]
 59    """Snapshots from the previous finalized state."""
 60
 61    @classmethod
 62    def create(
 63        cls,
 64        environment: str,
 65        snapshots: t.Dict[str, Snapshot],
 66        create_from: str,
 67        state_reader: StateReader,
 68        ensure_finalized_snapshots: bool = False,
 69    ) -> ContextDiff:
 70        """Create a ContextDiff object.
 71
 72        Args:
 73            environment: The remote environment to diff.
 74            snapshots: The snapshots of the current environment.
 75            create_from: The environment to create the target environment from if it
 76                doesn't exist.
 77            state_reader: StateReader to access the remote environment to diff.
 78            ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized
 79                environment state, or to use whatever snapshots are in the current environment state even if
 80                the environment is not finalized.
 81
 82        Returns:
 83            The ContextDiff object.
 84        """
 85        environment = environment.lower()
 86        env = state_reader.get_environment(environment)
 87
 88        if env is None:
 89            env = state_reader.get_environment(create_from.lower())
 90            is_new_environment = True
 91            previously_promoted_snapshot_ids = set()
 92        else:
 93            is_new_environment = False
 94            previously_promoted_snapshot_ids = {s.snapshot_id for s in env.promoted_snapshots}
 95
 96        environment_snapshot_infos = []
 97        if env:
 98            environment_snapshot_infos = (
 99                env.snapshots
100                if not ensure_finalized_snapshots
101                else env.finalized_or_current_snapshots
102            )
103        remote_snapshot_name_to_info = {
104            snapshot_info.name: snapshot_info for snapshot_info in environment_snapshot_infos
105        }
106        removed = {
107            snapshot_table_info.snapshot_id: snapshot_table_info
108            for snapshot_table_info in environment_snapshot_infos
109            if snapshot_table_info.name not in snapshots
110        }
111        added = {
112            snapshot.snapshot_id
113            for snapshot in snapshots.values()
114            if snapshot.name not in remote_snapshot_name_to_info
115        }
116        modified_snapshot_name_to_snapshot_info = {
117            snapshot.name: remote_snapshot_name_to_info[snapshot.name]
118            for snapshot in snapshots.values()
119            if snapshot.snapshot_id not in added
120            and snapshot.fingerprint != remote_snapshot_name_to_info[snapshot.name].fingerprint
121        }
122        modified_local_seed_snapshot_ids = {
123            s.snapshot_id
124            for s in snapshots.values()
125            if s.is_seed and s.name in modified_snapshot_name_to_snapshot_info
126        }
127        modified_remote_snapshot_ids = {
128            s.snapshot_id for s in modified_snapshot_name_to_snapshot_info.values()
129        }
130
131        stored = {
132            **state_reader.get_snapshots(
133                [
134                    snapshot.snapshot_id
135                    for snapshot in snapshots.values()
136                    if snapshot.snapshot_id not in modified_local_seed_snapshot_ids
137                ]
138            ),
139            **state_reader.get_snapshots(
140                modified_remote_snapshot_ids | modified_local_seed_snapshot_ids,
141                hydrate_seeds=True,
142            ),
143        }
144
145        merged_snapshots = {}
146        modified_snapshots = {}
147        new_snapshots = {}
148
149        for snapshot in snapshots.values():
150            s_id = snapshot.snapshot_id
151            modified_snapshot_info = modified_snapshot_name_to_snapshot_info.get(snapshot.name)
152            existing_snapshot = stored.get(s_id)
153
154            if modified_snapshot_info and snapshot.node_type != modified_snapshot_info.node_type:
155                added.add(snapshot.snapshot_id)
156                removed[modified_snapshot_info.snapshot_id] = modified_snapshot_info
157                modified_snapshot_name_to_snapshot_info.pop(snapshot.name)
158            elif existing_snapshot:
159                # Keep the original node instance to preserve the query cache.
160                existing_snapshot.node = snapshot.node
161
162                merged_snapshots[s_id] = existing_snapshot.copy()
163                if modified_snapshot_info:
164                    modified_snapshots[s_id.name] = (
165                        existing_snapshot,
166                        stored[modified_snapshot_info.snapshot_id],
167                    )
168            else:
169                snapshot = snapshot.copy()
170                merged_snapshots[s_id] = snapshot
171                new_snapshots[snapshot.snapshot_id] = snapshot
172                if modified_snapshot_info:
173                    snapshot.previous_versions = modified_snapshot_info.all_versions
174                    modified_snapshots[s_id.name] = (
175                        snapshot,
176                        stored[modified_snapshot_info.snapshot_id],
177                    )
178
179        return ContextDiff(
180            environment=environment,
181            is_new_environment=is_new_environment,
182            is_unfinalized_environment=bool(env and not env.finalized_ts),
183            create_from=create_from,
184            added=added,
185            removed_snapshots=removed,
186            modified_snapshots=modified_snapshots,
187            snapshots=merged_snapshots,
188            new_snapshots=new_snapshots,
189            previous_plan_id=env.plan_id if env and not is_new_environment else None,
190            previously_promoted_snapshot_ids=previously_promoted_snapshot_ids,
191            previous_finalized_snapshots=env.previous_finalized_snapshots if env else None,
192        )
193
194    @classmethod
195    def create_no_diff(cls, environment: str) -> ContextDiff:
196        """Create a no-op ContextDiff object.
197
198        Args:
199            environment: The environment to diff.
200
201        Returns:
202            The ContextDiff object.
203        """
204        return ContextDiff(
205            environment=environment,
206            is_new_environment=False,
207            is_unfinalized_environment=False,
208            create_from="",
209            added=set(),
210            removed_snapshots={},
211            modified_snapshots={},
212            snapshots={},
213            new_snapshots={},
214            previous_plan_id=None,
215            previously_promoted_snapshot_ids=set(),
216            previous_finalized_snapshots=None,
217        )
218
219    @property
220    def has_changes(self) -> bool:
221        return (
222            self.has_snapshot_changes or self.is_new_environment or self.is_unfinalized_environment
223        )
224
225    @property
226    def has_snapshot_changes(self) -> bool:
227        return bool(self.added or self.removed_snapshots or self.modified_snapshots)
228
229    @property
230    def added_materialized_snapshot_ids(self) -> t.Set[SnapshotId]:
231        """Returns the set of added internal snapshot ids."""
232        return {
233            s_id
234            for s_id in self.added
235            if self.snapshots[s_id].model_kind_name
236            and self.snapshots[s_id].model_kind_name.is_materialized  # type: ignore
237        }
238
239    @property
240    def promotable_snapshot_ids(self) -> t.Set[SnapshotId]:
241        """The set of snapshot ids that have to be promoted in the target environment."""
242        return {
243            *self.previously_promoted_snapshot_ids,
244            *self.added,
245            *self.current_modified_snapshot_ids,
246        } - set(self.removed_snapshots)
247
248    @property
249    def unpromoted_models(self) -> t.Set[SnapshotId]:
250        """The set of snapshot IDs that have not yet been promoted in the target environment."""
251        return set(self.snapshots) - self.previously_promoted_snapshot_ids
252
253    @property
254    def current_modified_snapshot_ids(self) -> t.Set[SnapshotId]:
255        return {current.snapshot_id for current, _ in self.modified_snapshots.values()}
256
257    @cached_property
258    def snapshots_by_name(self) -> t.Dict[str, Snapshot]:
259        return {x.name: x for x in self.snapshots.values()}
260
261    @property
262    def environment_snapshots(self) -> t.List[SnapshotTableInfo]:
263        """Returns current snapshots in the environment."""
264        return [
265            *self.removed_snapshots.values(),
266            *(old.table_info for _, old in self.modified_snapshots.values()),
267            *[
268                s.table_info
269                for s_id, s in self.snapshots.items()
270                if s_id not in self.added and s.name not in self.modified_snapshots
271            ],
272        ]
273
274    def directly_modified(self, name: str) -> bool:
275        """Returns whether or not a node was directly modified in this context.
276
277        Args:
278            name: The snapshot name to check.
279
280        Returns:
281            Whether or not the node was directly modified.
282        """
283
284        if name not in self.modified_snapshots:
285            return False
286
287        current, previous = self.modified_snapshots[name]
288        return current.fingerprint.data_hash != previous.fingerprint.data_hash
289
290    def indirectly_modified(self, name: str) -> bool:
291        """Returns whether or not a node was indirectly modified in this context.
292
293        Args:
294            name: The snapshot name to check.
295
296        Returns:
297            Whether or not the node was indirectly modified.
298        """
299
300        if name not in self.modified_snapshots:
301            return False
302
303        current, previous = self.modified_snapshots[name]
304        return (
305            current.fingerprint.data_hash == previous.fingerprint.data_hash
306            and current.fingerprint.parent_data_hash != previous.fingerprint.parent_data_hash
307        )
308
309    def metadata_updated(self, name: str) -> bool:
310        """Returns whether or not the given node's metadata has been updated.
311
312        Args:
313            name: The node to check.
314
315        Returns:
316            Whether or not the node's metadata has been updated.
317        """
318
319        if name not in self.modified_snapshots:
320            return False
321
322        current, previous = self.modified_snapshots[name]
323        return current.fingerprint.metadata_hash != previous.fingerprint.metadata_hash
324
325    def text_diff(self, name: str) -> str:
326        """Finds the difference of a node between the current and remote environment.
327
328        Args:
329            name: The Snapshot name.
330
331        Returns:
332            A unified text diff of the node.
333        """
334        if name not in self.snapshots_by_name:
335            raise SQLMeshError(f"`{name}` does not exist.")
336        if name not in self.modified_snapshots:
337            return ""
338
339        new, old = self.modified_snapshots[name]
340        try:
341            return old.node.text_diff(new.node)
342        except SQLMeshError as e:
343            logger.warning("Failed to diff model '%s': %s", name, str(e))
344            return ""
class ContextDiff(sqlmesh.utils.pydantic.PydanticModel):
 30class ContextDiff(PydanticModel):
 31    """ContextDiff is an object representing the difference between two environments.
 32
 33    The two environments can be the local environment and a remote environment, or two remote
 34    environments.
 35    """
 36
 37    environment: str
 38    """The environment to diff."""
 39    is_new_environment: bool
 40    """Whether the target environment is new."""
 41    is_unfinalized_environment: bool
 42    """Whether the currently stored environment record is in unfinalized state."""
 43    create_from: str
 44    """The name of the environment the target environment will be created from if new."""
 45    added: t.Set[SnapshotId]
 46    """New nodes."""
 47    removed_snapshots: t.Dict[SnapshotId, SnapshotTableInfo]
 48    """Deleted nodes."""
 49    modified_snapshots: t.Dict[str, t.Tuple[Snapshot, Snapshot]]
 50    """Modified snapshots."""
 51    snapshots: t.Dict[SnapshotId, Snapshot]
 52    """Merged snapshots."""
 53    new_snapshots: t.Dict[SnapshotId, Snapshot]
 54    """New snapshots."""
 55    previous_plan_id: t.Optional[str]
 56    """Previous plan id."""
 57    previously_promoted_snapshot_ids: t.Set[SnapshotId]
 58    """Snapshot IDs that were promoted by the previous plan."""
 59    previous_finalized_snapshots: t.Optional[t.List[SnapshotTableInfo]]
 60    """Snapshots from the previous finalized state."""
 61
 62    @classmethod
 63    def create(
 64        cls,
 65        environment: str,
 66        snapshots: t.Dict[str, Snapshot],
 67        create_from: str,
 68        state_reader: StateReader,
 69        ensure_finalized_snapshots: bool = False,
 70    ) -> ContextDiff:
 71        """Create a ContextDiff object.
 72
 73        Args:
 74            environment: The remote environment to diff.
 75            snapshots: The snapshots of the current environment.
 76            create_from: The environment to create the target environment from if it
 77                doesn't exist.
 78            state_reader: StateReader to access the remote environment to diff.
 79            ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized
 80                environment state, or to use whatever snapshots are in the current environment state even if
 81                the environment is not finalized.
 82
 83        Returns:
 84            The ContextDiff object.
 85        """
 86        environment = environment.lower()
 87        env = state_reader.get_environment(environment)
 88
 89        if env is None:
 90            env = state_reader.get_environment(create_from.lower())
 91            is_new_environment = True
 92            previously_promoted_snapshot_ids = set()
 93        else:
 94            is_new_environment = False
 95            previously_promoted_snapshot_ids = {s.snapshot_id for s in env.promoted_snapshots}
 96
 97        environment_snapshot_infos = []
 98        if env:
 99            environment_snapshot_infos = (
100                env.snapshots
101                if not ensure_finalized_snapshots
102                else env.finalized_or_current_snapshots
103            )
104        remote_snapshot_name_to_info = {
105            snapshot_info.name: snapshot_info for snapshot_info in environment_snapshot_infos
106        }
107        removed = {
108            snapshot_table_info.snapshot_id: snapshot_table_info
109            for snapshot_table_info in environment_snapshot_infos
110            if snapshot_table_info.name not in snapshots
111        }
112        added = {
113            snapshot.snapshot_id
114            for snapshot in snapshots.values()
115            if snapshot.name not in remote_snapshot_name_to_info
116        }
117        modified_snapshot_name_to_snapshot_info = {
118            snapshot.name: remote_snapshot_name_to_info[snapshot.name]
119            for snapshot in snapshots.values()
120            if snapshot.snapshot_id not in added
121            and snapshot.fingerprint != remote_snapshot_name_to_info[snapshot.name].fingerprint
122        }
123        modified_local_seed_snapshot_ids = {
124            s.snapshot_id
125            for s in snapshots.values()
126            if s.is_seed and s.name in modified_snapshot_name_to_snapshot_info
127        }
128        modified_remote_snapshot_ids = {
129            s.snapshot_id for s in modified_snapshot_name_to_snapshot_info.values()
130        }
131
132        stored = {
133            **state_reader.get_snapshots(
134                [
135                    snapshot.snapshot_id
136                    for snapshot in snapshots.values()
137                    if snapshot.snapshot_id not in modified_local_seed_snapshot_ids
138                ]
139            ),
140            **state_reader.get_snapshots(
141                modified_remote_snapshot_ids | modified_local_seed_snapshot_ids,
142                hydrate_seeds=True,
143            ),
144        }
145
146        merged_snapshots = {}
147        modified_snapshots = {}
148        new_snapshots = {}
149
150        for snapshot in snapshots.values():
151            s_id = snapshot.snapshot_id
152            modified_snapshot_info = modified_snapshot_name_to_snapshot_info.get(snapshot.name)
153            existing_snapshot = stored.get(s_id)
154
155            if modified_snapshot_info and snapshot.node_type != modified_snapshot_info.node_type:
156                added.add(snapshot.snapshot_id)
157                removed[modified_snapshot_info.snapshot_id] = modified_snapshot_info
158                modified_snapshot_name_to_snapshot_info.pop(snapshot.name)
159            elif existing_snapshot:
160                # Keep the original node instance to preserve the query cache.
161                existing_snapshot.node = snapshot.node
162
163                merged_snapshots[s_id] = existing_snapshot.copy()
164                if modified_snapshot_info:
165                    modified_snapshots[s_id.name] = (
166                        existing_snapshot,
167                        stored[modified_snapshot_info.snapshot_id],
168                    )
169            else:
170                snapshot = snapshot.copy()
171                merged_snapshots[s_id] = snapshot
172                new_snapshots[snapshot.snapshot_id] = snapshot
173                if modified_snapshot_info:
174                    snapshot.previous_versions = modified_snapshot_info.all_versions
175                    modified_snapshots[s_id.name] = (
176                        snapshot,
177                        stored[modified_snapshot_info.snapshot_id],
178                    )
179
180        return ContextDiff(
181            environment=environment,
182            is_new_environment=is_new_environment,
183            is_unfinalized_environment=bool(env and not env.finalized_ts),
184            create_from=create_from,
185            added=added,
186            removed_snapshots=removed,
187            modified_snapshots=modified_snapshots,
188            snapshots=merged_snapshots,
189            new_snapshots=new_snapshots,
190            previous_plan_id=env.plan_id if env and not is_new_environment else None,
191            previously_promoted_snapshot_ids=previously_promoted_snapshot_ids,
192            previous_finalized_snapshots=env.previous_finalized_snapshots if env else None,
193        )
194
195    @classmethod
196    def create_no_diff(cls, environment: str) -> ContextDiff:
197        """Create a no-op ContextDiff object.
198
199        Args:
200            environment: The environment to diff.
201
202        Returns:
203            The ContextDiff object.
204        """
205        return ContextDiff(
206            environment=environment,
207            is_new_environment=False,
208            is_unfinalized_environment=False,
209            create_from="",
210            added=set(),
211            removed_snapshots={},
212            modified_snapshots={},
213            snapshots={},
214            new_snapshots={},
215            previous_plan_id=None,
216            previously_promoted_snapshot_ids=set(),
217            previous_finalized_snapshots=None,
218        )
219
220    @property
221    def has_changes(self) -> bool:
222        return (
223            self.has_snapshot_changes or self.is_new_environment or self.is_unfinalized_environment
224        )
225
226    @property
227    def has_snapshot_changes(self) -> bool:
228        return bool(self.added or self.removed_snapshots or self.modified_snapshots)
229
230    @property
231    def added_materialized_snapshot_ids(self) -> t.Set[SnapshotId]:
232        """Returns the set of added internal snapshot ids."""
233        return {
234            s_id
235            for s_id in self.added
236            if self.snapshots[s_id].model_kind_name
237            and self.snapshots[s_id].model_kind_name.is_materialized  # type: ignore
238        }
239
240    @property
241    def promotable_snapshot_ids(self) -> t.Set[SnapshotId]:
242        """The set of snapshot ids that have to be promoted in the target environment."""
243        return {
244            *self.previously_promoted_snapshot_ids,
245            *self.added,
246            *self.current_modified_snapshot_ids,
247        } - set(self.removed_snapshots)
248
249    @property
250    def unpromoted_models(self) -> t.Set[SnapshotId]:
251        """The set of snapshot IDs that have not yet been promoted in the target environment."""
252        return set(self.snapshots) - self.previously_promoted_snapshot_ids
253
254    @property
255    def current_modified_snapshot_ids(self) -> t.Set[SnapshotId]:
256        return {current.snapshot_id for current, _ in self.modified_snapshots.values()}
257
258    @cached_property
259    def snapshots_by_name(self) -> t.Dict[str, Snapshot]:
260        return {x.name: x for x in self.snapshots.values()}
261
262    @property
263    def environment_snapshots(self) -> t.List[SnapshotTableInfo]:
264        """Returns current snapshots in the environment."""
265        return [
266            *self.removed_snapshots.values(),
267            *(old.table_info for _, old in self.modified_snapshots.values()),
268            *[
269                s.table_info
270                for s_id, s in self.snapshots.items()
271                if s_id not in self.added and s.name not in self.modified_snapshots
272            ],
273        ]
274
275    def directly_modified(self, name: str) -> bool:
276        """Returns whether or not a node was directly modified in this context.
277
278        Args:
279            name: The snapshot name to check.
280
281        Returns:
282            Whether or not the node was directly modified.
283        """
284
285        if name not in self.modified_snapshots:
286            return False
287
288        current, previous = self.modified_snapshots[name]
289        return current.fingerprint.data_hash != previous.fingerprint.data_hash
290
291    def indirectly_modified(self, name: str) -> bool:
292        """Returns whether or not a node was indirectly modified in this context.
293
294        Args:
295            name: The snapshot name to check.
296
297        Returns:
298            Whether or not the node was indirectly modified.
299        """
300
301        if name not in self.modified_snapshots:
302            return False
303
304        current, previous = self.modified_snapshots[name]
305        return (
306            current.fingerprint.data_hash == previous.fingerprint.data_hash
307            and current.fingerprint.parent_data_hash != previous.fingerprint.parent_data_hash
308        )
309
310    def metadata_updated(self, name: str) -> bool:
311        """Returns whether or not the given node's metadata has been updated.
312
313        Args:
314            name: The node to check.
315
316        Returns:
317            Whether or not the node's metadata has been updated.
318        """
319
320        if name not in self.modified_snapshots:
321            return False
322
323        current, previous = self.modified_snapshots[name]
324        return current.fingerprint.metadata_hash != previous.fingerprint.metadata_hash
325
326    def text_diff(self, name: str) -> str:
327        """Finds the difference of a node between the current and remote environment.
328
329        Args:
330            name: The Snapshot name.
331
332        Returns:
333            A unified text diff of the node.
334        """
335        if name not in self.snapshots_by_name:
336            raise SQLMeshError(f"`{name}` does not exist.")
337        if name not in self.modified_snapshots:
338            return ""
339
340        new, old = self.modified_snapshots[name]
341        try:
342            return old.node.text_diff(new.node)
343        except SQLMeshError as e:
344            logger.warning("Failed to diff model '%s': %s", name, str(e))
345            return ""

ContextDiff is an object representing the difference between two environments.

The two environments can be the local environment and a remote environment, or two remote environments.

environment: str

The environment to diff.

is_new_environment: bool

Whether the target environment is new.

is_unfinalized_environment: bool

Whether the currently stored environment record is in unfinalized state.

create_from: str

The name of the environment the target environment will be created from if new.

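modified_snapshots: Dict[str, Tuple[sqlmesh.core.snapshot.definition.Snapshot, sqlmesh.core.snapshot.definition.Snapshot]]

Modified snapshots.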

previous_plan_id: Union[str, NoneType]

Previous plan id.

previously_promoted_snapshot_ids: Set[sqlmesh.core.snapshot.definition.SnapshotId]

Snapshot IDs that were promoted by the previous plan.

previous_finalized_snapshots: Union[List[sqlmesh.core.snapshot.definition.SnapshotTableInfo], NoneType]

Snapshots from the previous finalized state.

@classmethod
def create( cls, environment: str, snapshots: Dict[str, sqlmesh.core.snapshot.definition.Snapshot], create_from: str, state_reader: sqlmesh.core.state_sync.base.StateReader, ensure_finalized_snapshots: bool = False) -> sqlmesh.core.context_diff.ContextDiff:
 62    @classmethod
 63    def create(
 64        cls,
 65        environment: str,
 66        snapshots: t.Dict[str, Snapshot],
 67        create_from: str,
 68        state_reader: StateReader,
 69        ensure_finalized_snapshots: bool = False,
 70    ) -> ContextDiff:
 71        """Create a ContextDiff object.
 72
 73        Args:
 74            environment: The remote environment to diff.
 75            snapshots: The snapshots of the current environment.
 76            create_from: The environment to create the target environment from if it
 77                doesn't exist.
 78            state_reader: StateReader to access the remote environment to diff.
 79            ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized
 80                environment state, or to use whatever snapshots are in the current environment state even if
 81                the environment is not finalized.
 82
 83        Returns:
 84            The ContextDiff object.
 85        """
 86        environment = environment.lower()
 87        env = state_reader.get_environment(environment)
 88
 89        if env is None:
 90            env = state_reader.get_environment(create_from.lower())
 91            is_new_environment = True
 92            previously_promoted_snapshot_ids = set()
 93        else:
 94            is_new_environment = False
 95            previously_promoted_snapshot_ids = {s.snapshot_id for s in env.promoted_snapshots}
 96
 97        environment_snapshot_infos = []
 98        if env:
 99            environment_snapshot_infos = (
100                env.snapshots
101                if not ensure_finalized_snapshots
102                else env.finalized_or_current_snapshots
103            )
104        remote_snapshot_name_to_info = {
105            snapshot_info.name: snapshot_info for snapshot_info in environment_snapshot_infos
106        }
107        removed = {
108            snapshot_table_info.snapshot_id: snapshot_table_info
109            for snapshot_table_info in environment_snapshot_infos
110            if snapshot_table_info.name not in snapshots
111        }
112        added = {
113            snapshot.snapshot_id
114            for snapshot in snapshots.values()
115            if snapshot.name not in remote_snapshot_name_to_info
116        }
117        modified_snapshot_name_to_snapshot_info = {
118            snapshot.name: remote_snapshot_name_to_info[snapshot.name]
119            for snapshot in snapshots.values()
120            if snapshot.snapshot_id not in added
121            and snapshot.fingerprint != remote_snapshot_name_to_info[snapshot.name].fingerprint
122        }
123        modified_local_seed_snapshot_ids = {
124            s.snapshot_id
125            for s in snapshots.values()
126            if s.is_seed and s.name in modified_snapshot_name_to_snapshot_info
127        }
128        modified_remote_snapshot_ids = {
129            s.snapshot_id for s in modified_snapshot_name_to_snapshot_info.values()
130        }
131
132        stored = {
133            **state_reader.get_snapshots(
134                [
135                    snapshot.snapshot_id
136                    for snapshot in snapshots.values()
137                    if snapshot.snapshot_id not in modified_local_seed_snapshot_ids
138                ]
139            ),
140            **state_reader.get_snapshots(
141                modified_remote_snapshot_ids | modified_local_seed_snapshot_ids,
142                hydrate_seeds=True,
143            ),
144        }
145
146        merged_snapshots = {}
147        modified_snapshots = {}
148        new_snapshots = {}
149
150        for snapshot in snapshots.values():
151            s_id = snapshot.snapshot_id
152            modified_snapshot_info = modified_snapshot_name_to_snapshot_info.get(snapshot.name)
153            existing_snapshot = stored.get(s_id)
154
155            if modified_snapshot_info and snapshot.node_type != modified_snapshot_info.node_type:
156                added.add(snapshot.snapshot_id)
157                removed[modified_snapshot_info.snapshot_id] = modified_snapshot_info
158                modified_snapshot_name_to_snapshot_info.pop(snapshot.name)
159            elif existing_snapshot:
160                # Keep the original node instance to preserve the query cache.
161                existing_snapshot.node = snapshot.node
162
163                merged_snapshots[s_id] = existing_snapshot.copy()
164                if modified_snapshot_info:
165                    modified_snapshots[s_id.name] = (
166                        existing_snapshot,
167                        stored[modified_snapshot_info.snapshot_id],
168                    )
169            else:
170                snapshot = snapshot.copy()
171                merged_snapshots[s_id] = snapshot
172                new_snapshots[snapshot.snapshot_id] = snapshot
173                if modified_snapshot_info:
174                    snapshot.previous_versions = modified_snapshot_info.all_versions
175                    modified_snapshots[s_id.name] = (
176                        snapshot,
177                        stored[modified_snapshot_info.snapshot_id],
178                    )
179
180        return ContextDiff(
181            environment=environment,
182            is_new_environment=is_new_environment,
183            is_unfinalized_environment=bool(env and not env.finalized_ts),
184            create_from=create_from,
185            added=added,
186            removed_snapshots=removed,
187            modified_snapshots=modified_snapshots,
188            snapshots=merged_snapshots,
189            new_snapshots=new_snapshots,
190            previous_plan_id=env.plan_id if env and not is_new_environment else None,
191            previously_promoted_snapshot_ids=previously_promoted_snapshot_ids,
192            previous_finalized_snapshots=env.previous_finalized_snapshots if env else None,
193        )

Create a ContextDiff object.

Arguments:
  • environment: The remote environment to diff.
  • snapshots: The snapshots of the current environment.
  • create_from: The environment to create the target environment from if it doesn't exist.
  • state_reader: StateReader to access the remote environment to diff.
  • ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized environment state, or to use whatever snapshots are in the current environment state even if the environment is not finalized.
Returns:

The ContextDiff object.

@classmethod
def create_no_diff(cls, environment: str) -> sqlmesh.core.context_diff.ContextDiff:
195    @classmethod
196    def create_no_diff(cls, environment: str) -> ContextDiff:
197        """Create a no-op ContextDiff object.
198
199        Args:
200            environment: The environment to diff.
201
202        Returns:
203            The ContextDiff object.
204        """
205        return ContextDiff(
206            environment=environment,
207            is_new_environment=False,
208            is_unfinalized_environment=False,
209            create_from="",
210            added=set(),
211            removed_snapshots={},
212            modified_snapshots={},
213            snapshots={},
214            new_snapshots={},
215            previous_plan_id=None,
216            previously_promoted_snapshot_ids=set(),
217            previous_finalized_snapshots=None,
218        )

Create a no-op ContextDiff object.

Arguments:
  • environment: The environment to diff.
Returns:

The ContextDiff object.

added_materialized_snapshot_ids: Set[sqlmesh.core.snapshot.definition.SnapshotId]

Returns the set of added internal snapshot ids.

promotable_snapshot_ids: Set[sqlmesh.core.snapshot.definition.SnapshotId]

The set of snapshot ids that have to be promoted in the target environment.

The set of snapshot IDs that have not yet been promoted in the target environment.

Returns current snapshots in the environment.

def directly_modified(self, name: str) -> bool:
275    def directly_modified(self, name: str) -> bool:
276        """Returns whether or not a node was directly modified in this context.
277
278        Args:
279            name: The snapshot name to check.
280
281        Returns:
282            Whether or not the node was directly modified.
283        """
284
285        if name not in self.modified_snapshots:
286            return False
287
288        current, previous = self.modified_snapshots[name]
289        return current.fingerprint.data_hash != previous.fingerprint.data_hash

Returns whether or not a node was directly modified in this context.

Arguments:
  • name: The snapshot name to check.
Returns:

Whether or not the node was directly modified.

def indirectly_modified(self, name: str) -> bool:
291    def indirectly_modified(self, name: str) -> bool:
292        """Returns whether or not a node was indirectly modified in this context.
293
294        Args:
295            name: The snapshot name to check.
296
297        Returns:
298            Whether or not the node was indirectly modified.
299        """
300
301        if name not in self.modified_snapshots:
302            return False
303
304        current, previous = self.modified_snapshots[name]
305        return (
306            current.fingerprint.data_hash == previous.fingerprint.data_hash
307            and current.fingerprint.parent_data_hash != previous.fingerprint.parent_data_hash
308        )

Returns whether or not a node was indirectly modified in this context.

Arguments:
  • name: The snapshot name to check.
Returns:

Whether or not the node was indirectly modified.

def metadata_updated(self, name: str) -> bool:
310    def metadata_updated(self, name: str) -> bool:
311        """Returns whether or not the given node's metadata has been updated.
312
313        Args:
314            name: The node to check.
315
316        Returns:
317            Whether or not the node's metadata has been updated.
318        """
319
320        if name not in self.modified_snapshots:
321            return False
322
323        current, previous = self.modified_snapshots[name]
324        return current.fingerprint.metadata_hash != previous.fingerprint.metadata_hash

Returns whether or not the given node's metadata has been updated.

Arguments:
  • name: The node to check.
Returns:

Whether or not the node's metadata has been updated.

def text_diff(self, name: str) -> str:
326    def text_diff(self, name: str) -> str:
327        """Finds the difference of a node between the current and remote environment.
328
329        Args:
330            name: The Snapshot name.
331
332        Returns:
333            A unified text diff of the node.
334        """
335        if name not in self.snapshots_by_name:
336            raise SQLMeshError(f"`{name}` does not exist.")
337        if name not in self.modified_snapshots:
338            return ""
339
340        new, old = self.modified_snapshots[name]
341        try:
342            return old.node.text_diff(new.node)
343        except SQLMeshError as e:
344            logger.warning("Failed to diff model '%s': %s", name, str(e))
345            return ""

Finds the difference of a node between the current and remote environment.

Arguments:
  • name: The Snapshot name.
Returns:

A unified text diff of the node.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
model_post_init