Edit on GitHub

ContextDiff

ContextDiff encapsulates the differences between two environments. The two environments can be the local environment and a remote environment, or two remote environments. ContextDiff is an important part of SQLMesh. SQLMesh plans use ContextDiff to determine what nodes were changed between two environments. The SQLMesh CLI diff command uses ContextDiff to determine what to visualize.

When creating a ContextDiff object, SQLMesh will compare the snapshots from one environment with those of another remote environment and determine if nodes have been added, removed, or modified.

  1"""
  2# ContextDiff
  3
  4ContextDiff encapsulates the differences between two environments. The two environments can be the local
  5environment and a remote environment, or two remote environments. ContextDiff is an important part of
  6SQLMesh. SQLMesh plans use ContextDiff to determine what nodes were changed between two environments.
  7The SQLMesh CLI diff command uses ContextDiff to determine what to visualize.
  8
  9When creating a ContextDiff object, SQLMesh will compare the snapshots from one environment with those of
 10another remote environment and determine if nodes have been added, removed, or modified.
 11"""
 12
 13from __future__ import annotations
 14
 15import sys
 16import typing as t
 17from difflib import ndiff, unified_diff
 18from functools import cached_property
 19from sqlmesh.core import constants as c
 20from sqlmesh.core.console import get_console
 21from sqlmesh.core.macros import RuntimeStage
 22from sqlmesh.core.model.common import sorted_python_env_payloads
 23from sqlmesh.core.snapshot import Snapshot, SnapshotId, SnapshotTableInfo
 24from sqlmesh.utils.errors import SQLMeshError
 25from sqlmesh.utils.pydantic import PydanticModel
 26
 27if sys.version_info >= (3, 12):
 28    from importlib import metadata
 29else:
 30    import importlib_metadata as metadata  # type: ignore
 31
 32
 33if t.TYPE_CHECKING:
 34    from sqlmesh.core.state_sync import StateReader
 35
 36from sqlmesh.utils.metaprogramming import Executable  # noqa
 37from sqlmesh.core.environment import EnvironmentStatements
 38
 39IGNORED_PACKAGES = {"sqlmesh", "sqlglot", "sqlglotc"}
 40
 41
 42class ContextDiff(PydanticModel):
 43    """ContextDiff is an object representing the difference between two environments.
 44
 45    The two environments can be the local environment and a remote environment, or two remote
 46    environments.
 47    """
 48
 49    environment: str
 50    """The environment to diff."""
 51    is_new_environment: bool
 52    """Whether the target environment is new."""
 53    is_unfinalized_environment: bool
 54    """Whether the currently stored environment record is in unfinalized state."""
 55    normalize_environment_name: bool
 56    """Whether the environment name should be normalized."""
 57    previous_gateway_managed_virtual_layer: bool
 58    """Whether the previous environment's virtual layer's views were created by the model specified gateways."""
 59    gateway_managed_virtual_layer: bool
 60    """Whether the virtual layer's views will be created by the model specified gateways."""
 61    create_from: str
 62    """The name of the environment the target environment will be created from if new."""
 63    create_from_env_exists: bool
 64    """Whether the create_from environment already exists at plan time."""
 65    added: t.Set[SnapshotId]
 66    """New nodes."""
 67    removed_snapshots: t.Dict[SnapshotId, SnapshotTableInfo]
 68    """Deleted nodes."""
 69    modified_snapshots: t.Dict[str, t.Tuple[Snapshot, Snapshot]]
 70    """Modified snapshots."""
 71    snapshots: t.Dict[SnapshotId, Snapshot]
 72    """Merged snapshots."""
 73    new_snapshots: t.Dict[SnapshotId, Snapshot]
 74    """New snapshots."""
 75    previous_plan_id: t.Optional[str]
 76    """Previous plan id."""
 77    previously_promoted_snapshot_ids: t.Set[SnapshotId]
 78    """Snapshot IDs that were promoted by the previous plan."""
 79    previous_finalized_snapshots: t.Optional[t.List[SnapshotTableInfo]]
 80    """Snapshots from the previous finalized state."""
 81    previous_requirements: t.Dict[str, str] = {}
 82    """Previous requirements."""
 83    requirements: t.Dict[str, str] = {}
 84    """Python dependencies."""
 85    previous_environment_statements: t.List[EnvironmentStatements] = []
 86    """Previous environment statements."""
 87    environment_statements: t.List[EnvironmentStatements]
 88    """Environment statements."""
 89    diff_rendered: bool = False
 90    """Whether the diff should compare raw vs rendered models"""
 91
 92    @classmethod
 93    def create(
 94        cls,
 95        environment: str,
 96        snapshots: t.Dict[str, Snapshot],
 97        create_from: str,
 98        state_reader: StateReader,
 99        ensure_finalized_snapshots: bool = False,
100        provided_requirements: t.Optional[t.Dict[str, str]] = None,
101        excluded_requirements: t.Optional[t.Set[str]] = None,
102        diff_rendered: bool = False,
103        environment_statements: t.Optional[t.List[EnvironmentStatements]] = [],
104        gateway_managed_virtual_layer: bool = False,
105        infer_python_dependencies: bool = True,
106        always_recreate_environment: bool = False,
107    ) -> ContextDiff:
108        """Create a ContextDiff object.
109
110        Args:
111            environment: The remote environment to diff.
112            snapshots: The snapshots of the current environment.
113            create_from: The environment to create the target environment from if it
114                doesn't exist.
115            state_reader: StateReader to access the remote environment to diff.
116            ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized
117                environment state, or to use whatever snapshots are in the current environment state even if
118                the environment is not finalized.
119            provided_requirements: Python dependencies sourced from the lock file.
120            excluded_requirements: Python dependencies to exclude.
121            diff_rendered: Whether to compute the diff of the rendered version of the compared expressions.
122            environment_statements: A list of `before_all` or `after_all` statements associated with the environment.
123            gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the
124                model-specific gateway rather than the default gateway.
125            infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python
126                package requirements.
127
128        Returns:
129            The ContextDiff object.
130        """
131        environment = environment.lower()
132        existing_env = state_reader.get_environment(environment)
133        create_from_env_exists = False
134
135        recreate_environment = always_recreate_environment and not environment == create_from
136
137        if existing_env is None or existing_env.expired or recreate_environment:
138            env = state_reader.get_environment(create_from.lower())
139
140            if not env and create_from != c.PROD:
141                get_console().log_warning(
142                    f"The environment name '{create_from}' was passed to the `plan` command's `--create-from` argument, but '{create_from}' does not exist. Initializing new environment '{environment}' from scratch."
143                )
144
145            is_new_environment = True
146            create_from_env_exists = env is not None
147            previously_promoted_snapshot_ids = set()
148        else:
149            env = existing_env
150            is_new_environment = False
151            previously_promoted_snapshot_ids = {s.snapshot_id for s in env.promoted_snapshots}
152
153        environment_snapshot_infos = []
154        if env:
155            environment_snapshot_infos = (
156                env.snapshots
157                if not ensure_finalized_snapshots
158                else env.finalized_or_current_snapshots
159            )
160        remote_snapshot_name_to_info = {
161            snapshot_info.name: snapshot_info for snapshot_info in environment_snapshot_infos
162        }
163        removed = {
164            snapshot_table_info.snapshot_id: snapshot_table_info
165            for snapshot_table_info in environment_snapshot_infos
166            if snapshot_table_info.name not in snapshots
167        }
168        added = {
169            snapshot.snapshot_id
170            for snapshot in snapshots.values()
171            if snapshot.name not in remote_snapshot_name_to_info
172        }
173        modified_snapshot_name_to_snapshot_info = {
174            snapshot.name: remote_snapshot_name_to_info[snapshot.name]
175            for snapshot in snapshots.values()
176            if snapshot.snapshot_id not in added
177            and snapshot.fingerprint != remote_snapshot_name_to_info[snapshot.name].fingerprint
178        }
179
180        stored = state_reader.get_snapshots(
181            [*snapshots.values(), *modified_snapshot_name_to_snapshot_info.values()]
182        )
183
184        merged_snapshots = {}
185        modified_snapshots = {}
186        new_snapshots = {}
187
188        for snapshot in snapshots.values():
189            s_id = snapshot.snapshot_id
190            modified_snapshot_info = modified_snapshot_name_to_snapshot_info.get(snapshot.name)
191            existing_snapshot = stored.get(s_id)
192
193            if modified_snapshot_info and snapshot.node_type != modified_snapshot_info.node_type:
194                added.add(snapshot.snapshot_id)
195                removed[modified_snapshot_info.snapshot_id] = modified_snapshot_info
196                modified_snapshot_name_to_snapshot_info.pop(snapshot.name)
197            elif existing_snapshot:
198                # Keep the original node instance to preserve the query cache.
199                existing_snapshot.node = snapshot.node
200
201                merged_snapshots[s_id] = existing_snapshot.copy()
202                if modified_snapshot_info:
203                    modified_snapshots[s_id.name] = (
204                        existing_snapshot,
205                        stored[modified_snapshot_info.snapshot_id],
206                    )
207            else:
208                snapshot = snapshot.copy()
209                merged_snapshots[s_id] = snapshot
210                new_snapshots[snapshot.snapshot_id] = snapshot
211                if modified_snapshot_info:
212                    snapshot.previous_versions = modified_snapshot_info.all_versions
213                    modified_snapshots[s_id.name] = (
214                        snapshot,
215                        stored[modified_snapshot_info.snapshot_id],
216                    )
217
218        requirements = _build_requirements(
219            provided_requirements or {},
220            excluded_requirements or set(),
221            snapshots.values(),
222            infer_python_dependencies=infer_python_dependencies,
223        )
224
225        previous_environment_statements = (
226            state_reader.get_environment_statements(env.name) if env else []
227        )
228
229        if existing_env and always_recreate_environment:
230            previous_plan_id: t.Optional[str] = existing_env.plan_id
231        else:
232            previous_plan_id = env.plan_id if env and not is_new_environment else None
233
234        return ContextDiff(
235            environment=environment,
236            is_new_environment=is_new_environment,
237            is_unfinalized_environment=bool(env and not env.finalized_ts),
238            normalize_environment_name=is_new_environment or bool(env and env.normalize_name),
239            create_from=create_from,
240            create_from_env_exists=create_from_env_exists,
241            added=added,
242            removed_snapshots=removed,
243            modified_snapshots=modified_snapshots,
244            snapshots=merged_snapshots,
245            new_snapshots=new_snapshots,
246            previous_plan_id=previous_plan_id,
247            previously_promoted_snapshot_ids=previously_promoted_snapshot_ids,
248            previous_finalized_snapshots=env.previous_finalized_snapshots if env else None,
249            previous_requirements=env.requirements if env else {},
250            requirements=requirements,
251            diff_rendered=diff_rendered,
252            previous_environment_statements=previous_environment_statements,
253            environment_statements=environment_statements,
254            previous_gateway_managed_virtual_layer=env.gateway_managed if env else False,
255            gateway_managed_virtual_layer=gateway_managed_virtual_layer,
256        )
257
258    @classmethod
259    def create_no_diff(cls, environment: str, state_reader: StateReader) -> ContextDiff:
260        """Create a no-op ContextDiff object.
261
262        Args:
263            environment: The target environment.
264            state_reader: StateReader to access the remote environment record.
265
266        Returns:
267            The ContextDiff object.
268        """
269        env = state_reader.get_environment(environment.lower())
270        if not env:
271            raise SQLMeshError(f"Environment '{environment}' must exist for this operation.")
272
273        environment_statements = state_reader.get_environment_statements(environment)
274        snapshots = state_reader.get_snapshots(env.snapshots)
275
276        return ContextDiff(
277            environment=env.name,
278            is_new_environment=False,
279            is_unfinalized_environment=False,
280            normalize_environment_name=env.normalize_name,
281            create_from="",
282            create_from_env_exists=False,
283            added=set(),
284            removed_snapshots={},
285            modified_snapshots={},
286            snapshots=snapshots,
287            new_snapshots={},
288            previous_plan_id=env.plan_id,
289            previously_promoted_snapshot_ids={s.snapshot_id for s in env.promoted_snapshots},
290            previous_finalized_snapshots=env.previous_finalized_snapshots,
291            previous_requirements=env.requirements,
292            requirements=env.requirements,
293            previous_environment_statements=environment_statements,
294            environment_statements=environment_statements,
295            previous_gateway_managed_virtual_layer=env.gateway_managed,
296            gateway_managed_virtual_layer=env.gateway_managed,
297        )
298
299    @property
300    def has_changes(self) -> bool:
301        return (
302            self.has_snapshot_changes
303            or self.is_new_environment
304            or self.is_unfinalized_environment
305            or self.has_requirement_changes
306            or self.has_environment_statements_changes
307            or self.previous_gateway_managed_virtual_layer != self.gateway_managed_virtual_layer
308        )
309
310    @property
311    def has_requirement_changes(self) -> bool:
312        return self.previous_requirements != self.requirements
313
314    @property
315    def has_environment_statements_changes(self) -> bool:
316        return sorted(self.environment_statements, key=lambda s: s.project or "") != sorted(
317            self.previous_environment_statements, key=lambda s: s.project or ""
318        )
319
320    @property
321    def has_snapshot_changes(self) -> bool:
322        return bool(self.added or self.removed_snapshots or self.modified_snapshots)
323
324    @property
325    def added_materialized_snapshot_ids(self) -> t.Set[SnapshotId]:
326        """Returns the set of added internal snapshot ids."""
327        return {
328            s_id
329            for s_id in self.added
330            if self.snapshots[s_id].model_kind_name
331            and self.snapshots[s_id].model_kind_name.is_materialized  # type: ignore
332        }
333
334    @property
335    def promotable_snapshot_ids(self) -> t.Set[SnapshotId]:
336        """The set of snapshot ids that have to be promoted in the target environment."""
337        return {
338            *self.previously_promoted_snapshot_ids,
339            *self.added,
340            *self.current_modified_snapshot_ids,
341        } - set(self.removed_snapshots)
342
343    @property
344    def unpromoted_models(self) -> t.Set[SnapshotId]:
345        """The set of snapshot IDs that have not yet been promoted in the target environment."""
346        return set(self.snapshots) - self.previously_promoted_snapshot_ids
347
348    @property
349    def current_modified_snapshot_ids(self) -> t.Set[SnapshotId]:
350        return {current.snapshot_id for current, _ in self.modified_snapshots.values()}
351
352    @cached_property
353    def snapshots_by_name(self) -> t.Dict[str, Snapshot]:
354        return {x.name: x for x in self.snapshots.values()}
355
356    def requirements_diff(self) -> str:
357        return "    " + "\n    ".join(
358            ndiff(
359                [
360                    f"{k}=={self.previous_requirements[k]}"
361                    for k in sorted(self.previous_requirements)
362                ],
363                [f"{k}=={self.requirements[k]}" for k in sorted(self.requirements)],
364            )
365        )
366
367    def environment_statements_diff(
368        self, include_python_env: bool = False
369    ) -> t.List[t.Tuple[str, str]]:
370        def extract_statements(statements: t.List[EnvironmentStatements], attr: str) -> t.List[str]:
371            return [
372                string
373                for statement in statements
374                for expr in (
375                    sorted_python_env_payloads(statement.python_env)
376                    if attr == "python_env"
377                    else getattr(statement, attr)
378                )
379                for string in expr.split("\n")
380            ]
381
382        def compute_diff(attribute: str) -> t.Optional[t.Tuple[str, str]]:
383            previous = extract_statements(self.previous_environment_statements, attribute)
384            current = extract_statements(self.environment_statements, attribute)
385
386            if previous == current:
387                return None
388
389            diff_text = attribute if not attribute == "python_env" else "dependencies"
390            diff_text += ":\n"
391            if attribute == "python_env":
392                diff = list(unified_diff(previous, current))
393                diff_text += "\n".join(diff[2:] if len(diff) > 1 else diff)
394                return "python", diff_text + "\n"
395
396            diff_lines = list(ndiff(previous, current))
397            if any(line.startswith(("-", "+")) for line in diff_lines):
398                diff_text += "  " + "\n  ".join(diff_lines) + "\n"
399            return "sql", diff_text
400
401        return [
402            diff
403            for attribute in [
404                RuntimeStage.BEFORE_ALL.value,
405                RuntimeStage.AFTER_ALL.value,
406                *(["python_env"] if include_python_env else []),
407            ]
408            if (diff := compute_diff(attribute)) is not None
409        ]
410
411    @property
412    def environment_snapshots(self) -> t.List[SnapshotTableInfo]:
413        """Returns current snapshots in the environment."""
414        return [
415            *self.removed_snapshots.values(),
416            *(old.table_info for _, old in self.modified_snapshots.values()),
417            *[
418                s.table_info
419                for s_id, s in self.snapshots.items()
420                if s_id not in self.added and s.name not in self.modified_snapshots
421            ],
422        ]
423
424    def directly_modified(self, name: str) -> bool:
425        """Returns whether or not a node was directly modified in this context.
426
427        Args:
428            name: The snapshot name to check.
429
430        Returns:
431            Whether or not the node was directly modified.
432        """
433
434        if name not in self.modified_snapshots:
435            return False
436
437        current, previous = self.modified_snapshots[name]
438        return current.is_directly_modified(previous)
439
440    def indirectly_modified(self, name: str) -> bool:
441        """Returns whether or not a node was indirectly modified in this context.
442
443        Args:
444            name: The snapshot name to check.
445
446        Returns:
447            Whether or not the node was indirectly modified.
448        """
449
450        if name not in self.modified_snapshots:
451            return False
452
453        current, previous = self.modified_snapshots[name]
454        return current.is_indirectly_modified(previous)
455
456    def metadata_updated(self, name: str) -> bool:
457        """Returns whether or not the given node's metadata has been updated.
458
459        Args:
460            name: The node to check.
461
462        Returns:
463            Whether or not the node's metadata has been updated.
464        """
465
466        if name not in self.modified_snapshots:
467            return False
468
469        current, previous = self.modified_snapshots[name]
470        return current.is_metadata_updated(previous)
471
472    def text_diff(self, name: str) -> str:
473        """Finds the difference of a node between the current and remote environment.
474
475        Args:
476            name: The Snapshot name.
477
478        Returns:
479            A unified text diff of the node.
480        """
481        if name not in self.snapshots_by_name:
482            raise SQLMeshError(f"`{name}` does not exist.")
483        if name not in self.modified_snapshots:
484            return ""
485
486        new, old = self.modified_snapshots[name]
487        try:
488            return old.node.text_diff(new.node, rendered=self.diff_rendered)
489        except SQLMeshError as e:
490            get_console().log_warning(f"Failed to diff model '{name}': {str(e)}.")
491            return ""
492
493
494def _build_requirements(
495    provided_requirements: t.Dict[str, str],
496    excluded_requirements: t.Set[str],
497    snapshots: t.Collection[Snapshot],
498    infer_python_dependencies: bool = True,
499) -> t.Dict[str, str]:
500    requirements = {
501        k: v for k, v in provided_requirements.items() if k not in excluded_requirements
502    }
503
504    if not infer_python_dependencies:
505        return requirements
506
507    distributions = metadata.packages_distributions()
508
509    for snapshot in snapshots:
510        if not snapshot.is_model:
511            continue
512
513        for executable in snapshot.model.python_env.values():
514            if executable.kind != "import":
515                continue
516
517            try:
518                start = "from " if executable.payload.startswith("from ") else "import "
519                lib = executable.payload.split(start)[1].split()[0].split(".")[0]
520                if lib not in distributions:
521                    continue
522
523                for dist in distributions[lib]:
524                    if (
525                        dist not in requirements
526                        and dist not in IGNORED_PACKAGES
527                        and dist not in excluded_requirements
528                    ):
529                        requirements[dist] = metadata.version(dist)
530            except metadata.PackageNotFoundError:
531                from sqlmesh.core.console import get_console
532
533                get_console().log_warning(f"Failed to find package for {lib}.")
534
535    return requirements
IGNORED_PACKAGES = {'sqlglot', 'sqlmesh', 'sqlglotc'}
class ContextDiff(sqlmesh.utils.pydantic.PydanticModel):
 43class ContextDiff(PydanticModel):
 44    """ContextDiff is an object representing the difference between two environments.
 45
 46    The two environments can be the local environment and a remote environment, or two remote
 47    environments.
 48    """
 49
 50    environment: str
 51    """The environment to diff."""
 52    is_new_environment: bool
 53    """Whether the target environment is new."""
 54    is_unfinalized_environment: bool
 55    """Whether the currently stored environment record is in unfinalized state."""
 56    normalize_environment_name: bool
 57    """Whether the environment name should be normalized."""
 58    previous_gateway_managed_virtual_layer: bool
 59    """Whether the previous environment's virtual layer's views were created by the model specified gateways."""
 60    gateway_managed_virtual_layer: bool
 61    """Whether the virtual layer's views will be created by the model specified gateways."""
 62    create_from: str
 63    """The name of the environment the target environment will be created from if new."""
 64    create_from_env_exists: bool
 65    """Whether the create_from environment already exists at plan time."""
 66    added: t.Set[SnapshotId]
 67    """New nodes."""
 68    removed_snapshots: t.Dict[SnapshotId, SnapshotTableInfo]
 69    """Deleted nodes."""
 70    modified_snapshots: t.Dict[str, t.Tuple[Snapshot, Snapshot]]
 71    """Modified snapshots."""
 72    snapshots: t.Dict[SnapshotId, Snapshot]
 73    """Merged snapshots."""
 74    new_snapshots: t.Dict[SnapshotId, Snapshot]
 75    """New snapshots."""
 76    previous_plan_id: t.Optional[str]
 77    """Previous plan id."""
 78    previously_promoted_snapshot_ids: t.Set[SnapshotId]
 79    """Snapshot IDs that were promoted by the previous plan."""
 80    previous_finalized_snapshots: t.Optional[t.List[SnapshotTableInfo]]
 81    """Snapshots from the previous finalized state."""
 82    previous_requirements: t.Dict[str, str] = {}
 83    """Previous requirements."""
 84    requirements: t.Dict[str, str] = {}
 85    """Python dependencies."""
 86    previous_environment_statements: t.List[EnvironmentStatements] = []
 87    """Previous environment statements."""
 88    environment_statements: t.List[EnvironmentStatements]
 89    """Environment statements."""
 90    diff_rendered: bool = False
 91    """Whether the diff should compare raw vs rendered models"""
 92
 93    @classmethod
 94    def create(
 95        cls,
 96        environment: str,
 97        snapshots: t.Dict[str, Snapshot],
 98        create_from: str,
 99        state_reader: StateReader,
100        ensure_finalized_snapshots: bool = False,
101        provided_requirements: t.Optional[t.Dict[str, str]] = None,
102        excluded_requirements: t.Optional[t.Set[str]] = None,
103        diff_rendered: bool = False,
104        environment_statements: t.Optional[t.List[EnvironmentStatements]] = [],
105        gateway_managed_virtual_layer: bool = False,
106        infer_python_dependencies: bool = True,
107        always_recreate_environment: bool = False,
108    ) -> ContextDiff:
109        """Create a ContextDiff object.
110
111        Args:
112            environment: The remote environment to diff.
113            snapshots: The snapshots of the current environment.
114            create_from: The environment to create the target environment from if it
115                doesn't exist.
116            state_reader: StateReader to access the remote environment to diff.
117            ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized
118                environment state, or to use whatever snapshots are in the current environment state even if
119                the environment is not finalized.
120            provided_requirements: Python dependencies sourced from the lock file.
121            excluded_requirements: Python dependencies to exclude.
122            diff_rendered: Whether to compute the diff of the rendered version of the compared expressions.
123            environment_statements: A list of `before_all` or `after_all` statements associated with the environment.
124            gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the
125                model-specific gateway rather than the default gateway.
126            infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python
127                package requirements.
128
129        Returns:
130            The ContextDiff object.
131        """
132        environment = environment.lower()
133        existing_env = state_reader.get_environment(environment)
134        create_from_env_exists = False
135
136        recreate_environment = always_recreate_environment and not environment == create_from
137
138        if existing_env is None or existing_env.expired or recreate_environment:
139            env = state_reader.get_environment(create_from.lower())
140
141            if not env and create_from != c.PROD:
142                get_console().log_warning(
143                    f"The environment name '{create_from}' was passed to the `plan` command's `--create-from` argument, but '{create_from}' does not exist. Initializing new environment '{environment}' from scratch."
144                )
145
146            is_new_environment = True
147            create_from_env_exists = env is not None
148            previously_promoted_snapshot_ids = set()
149        else:
150            env = existing_env
151            is_new_environment = False
152            previously_promoted_snapshot_ids = {s.snapshot_id for s in env.promoted_snapshots}
153
154        environment_snapshot_infos = []
155        if env:
156            environment_snapshot_infos = (
157                env.snapshots
158                if not ensure_finalized_snapshots
159                else env.finalized_or_current_snapshots
160            )
161        remote_snapshot_name_to_info = {
162            snapshot_info.name: snapshot_info for snapshot_info in environment_snapshot_infos
163        }
164        removed = {
165            snapshot_table_info.snapshot_id: snapshot_table_info
166            for snapshot_table_info in environment_snapshot_infos
167            if snapshot_table_info.name not in snapshots
168        }
169        added = {
170            snapshot.snapshot_id
171            for snapshot in snapshots.values()
172            if snapshot.name not in remote_snapshot_name_to_info
173        }
174        modified_snapshot_name_to_snapshot_info = {
175            snapshot.name: remote_snapshot_name_to_info[snapshot.name]
176            for snapshot in snapshots.values()
177            if snapshot.snapshot_id not in added
178            and snapshot.fingerprint != remote_snapshot_name_to_info[snapshot.name].fingerprint
179        }
180
181        stored = state_reader.get_snapshots(
182            [*snapshots.values(), *modified_snapshot_name_to_snapshot_info.values()]
183        )
184
185        merged_snapshots = {}
186        modified_snapshots = {}
187        new_snapshots = {}
188
189        for snapshot in snapshots.values():
190            s_id = snapshot.snapshot_id
191            modified_snapshot_info = modified_snapshot_name_to_snapshot_info.get(snapshot.name)
192            existing_snapshot = stored.get(s_id)
193
194            if modified_snapshot_info and snapshot.node_type != modified_snapshot_info.node_type:
195                added.add(snapshot.snapshot_id)
196                removed[modified_snapshot_info.snapshot_id] = modified_snapshot_info
197                modified_snapshot_name_to_snapshot_info.pop(snapshot.name)
198            elif existing_snapshot:
199                # Keep the original node instance to preserve the query cache.
200                existing_snapshot.node = snapshot.node
201
202                merged_snapshots[s_id] = existing_snapshot.copy()
203                if modified_snapshot_info:
204                    modified_snapshots[s_id.name] = (
205                        existing_snapshot,
206                        stored[modified_snapshot_info.snapshot_id],
207                    )
208            else:
209                snapshot = snapshot.copy()
210                merged_snapshots[s_id] = snapshot
211                new_snapshots[snapshot.snapshot_id] = snapshot
212                if modified_snapshot_info:
213                    snapshot.previous_versions = modified_snapshot_info.all_versions
214                    modified_snapshots[s_id.name] = (
215                        snapshot,
216                        stored[modified_snapshot_info.snapshot_id],
217                    )
218
219        requirements = _build_requirements(
220            provided_requirements or {},
221            excluded_requirements or set(),
222            snapshots.values(),
223            infer_python_dependencies=infer_python_dependencies,
224        )
225
226        previous_environment_statements = (
227            state_reader.get_environment_statements(env.name) if env else []
228        )
229
230        if existing_env and always_recreate_environment:
231            previous_plan_id: t.Optional[str] = existing_env.plan_id
232        else:
233            previous_plan_id = env.plan_id if env and not is_new_environment else None
234
235        return ContextDiff(
236            environment=environment,
237            is_new_environment=is_new_environment,
238            is_unfinalized_environment=bool(env and not env.finalized_ts),
239            normalize_environment_name=is_new_environment or bool(env and env.normalize_name),
240            create_from=create_from,
241            create_from_env_exists=create_from_env_exists,
242            added=added,
243            removed_snapshots=removed,
244            modified_snapshots=modified_snapshots,
245            snapshots=merged_snapshots,
246            new_snapshots=new_snapshots,
247            previous_plan_id=previous_plan_id,
248            previously_promoted_snapshot_ids=previously_promoted_snapshot_ids,
249            previous_finalized_snapshots=env.previous_finalized_snapshots if env else None,
250            previous_requirements=env.requirements if env else {},
251            requirements=requirements,
252            diff_rendered=diff_rendered,
253            previous_environment_statements=previous_environment_statements,
254            environment_statements=environment_statements,
255            previous_gateway_managed_virtual_layer=env.gateway_managed if env else False,
256            gateway_managed_virtual_layer=gateway_managed_virtual_layer,
257        )
258
259    @classmethod
260    def create_no_diff(cls, environment: str, state_reader: StateReader) -> ContextDiff:
261        """Create a no-op ContextDiff object.
262
263        Args:
264            environment: The target environment.
265            state_reader: StateReader to access the remote environment record.
266
267        Returns:
268            The ContextDiff object.
269        """
270        env = state_reader.get_environment(environment.lower())
271        if not env:
272            raise SQLMeshError(f"Environment '{environment}' must exist for this operation.")
273
274        environment_statements = state_reader.get_environment_statements(environment)
275        snapshots = state_reader.get_snapshots(env.snapshots)
276
277        return ContextDiff(
278            environment=env.name,
279            is_new_environment=False,
280            is_unfinalized_environment=False,
281            normalize_environment_name=env.normalize_name,
282            create_from="",
283            create_from_env_exists=False,
284            added=set(),
285            removed_snapshots={},
286            modified_snapshots={},
287            snapshots=snapshots,
288            new_snapshots={},
289            previous_plan_id=env.plan_id,
290            previously_promoted_snapshot_ids={s.snapshot_id for s in env.promoted_snapshots},
291            previous_finalized_snapshots=env.previous_finalized_snapshots,
292            previous_requirements=env.requirements,
293            requirements=env.requirements,
294            previous_environment_statements=environment_statements,
295            environment_statements=environment_statements,
296            previous_gateway_managed_virtual_layer=env.gateway_managed,
297            gateway_managed_virtual_layer=env.gateway_managed,
298        )
299
300    @property
301    def has_changes(self) -> bool:
302        return (
303            self.has_snapshot_changes
304            or self.is_new_environment
305            or self.is_unfinalized_environment
306            or self.has_requirement_changes
307            or self.has_environment_statements_changes
308            or self.previous_gateway_managed_virtual_layer != self.gateway_managed_virtual_layer
309        )
310
311    @property
312    def has_requirement_changes(self) -> bool:
313        return self.previous_requirements != self.requirements
314
315    @property
316    def has_environment_statements_changes(self) -> bool:
317        return sorted(self.environment_statements, key=lambda s: s.project or "") != sorted(
318            self.previous_environment_statements, key=lambda s: s.project or ""
319        )
320
321    @property
322    def has_snapshot_changes(self) -> bool:
323        return bool(self.added or self.removed_snapshots or self.modified_snapshots)
324
325    @property
326    def added_materialized_snapshot_ids(self) -> t.Set[SnapshotId]:
327        """Returns the set of added internal snapshot ids."""
328        return {
329            s_id
330            for s_id in self.added
331            if self.snapshots[s_id].model_kind_name
332            and self.snapshots[s_id].model_kind_name.is_materialized  # type: ignore
333        }
334
335    @property
336    def promotable_snapshot_ids(self) -> t.Set[SnapshotId]:
337        """The set of snapshot ids that have to be promoted in the target environment."""
338        return {
339            *self.previously_promoted_snapshot_ids,
340            *self.added,
341            *self.current_modified_snapshot_ids,
342        } - set(self.removed_snapshots)
343
344    @property
345    def unpromoted_models(self) -> t.Set[SnapshotId]:
346        """The set of snapshot IDs that have not yet been promoted in the target environment."""
347        return set(self.snapshots) - self.previously_promoted_snapshot_ids
348
349    @property
350    def current_modified_snapshot_ids(self) -> t.Set[SnapshotId]:
351        return {current.snapshot_id for current, _ in self.modified_snapshots.values()}
352
353    @cached_property
354    def snapshots_by_name(self) -> t.Dict[str, Snapshot]:
355        return {x.name: x for x in self.snapshots.values()}
356
357    def requirements_diff(self) -> str:
358        return "    " + "\n    ".join(
359            ndiff(
360                [
361                    f"{k}=={self.previous_requirements[k]}"
362                    for k in sorted(self.previous_requirements)
363                ],
364                [f"{k}=={self.requirements[k]}" for k in sorted(self.requirements)],
365            )
366        )
367
368    def environment_statements_diff(
369        self, include_python_env: bool = False
370    ) -> t.List[t.Tuple[str, str]]:
371        def extract_statements(statements: t.List[EnvironmentStatements], attr: str) -> t.List[str]:
372            return [
373                string
374                for statement in statements
375                for expr in (
376                    sorted_python_env_payloads(statement.python_env)
377                    if attr == "python_env"
378                    else getattr(statement, attr)
379                )
380                for string in expr.split("\n")
381            ]
382
383        def compute_diff(attribute: str) -> t.Optional[t.Tuple[str, str]]:
384            previous = extract_statements(self.previous_environment_statements, attribute)
385            current = extract_statements(self.environment_statements, attribute)
386
387            if previous == current:
388                return None
389
390            diff_text = attribute if not attribute == "python_env" else "dependencies"
391            diff_text += ":\n"
392            if attribute == "python_env":
393                diff = list(unified_diff(previous, current))
394                diff_text += "\n".join(diff[2:] if len(diff) > 1 else diff)
395                return "python", diff_text + "\n"
396
397            diff_lines = list(ndiff(previous, current))
398            if any(line.startswith(("-", "+")) for line in diff_lines):
399                diff_text += "  " + "\n  ".join(diff_lines) + "\n"
400            return "sql", diff_text
401
402        return [
403            diff
404            for attribute in [
405                RuntimeStage.BEFORE_ALL.value,
406                RuntimeStage.AFTER_ALL.value,
407                *(["python_env"] if include_python_env else []),
408            ]
409            if (diff := compute_diff(attribute)) is not None
410        ]
411
412    @property
413    def environment_snapshots(self) -> t.List[SnapshotTableInfo]:
414        """Returns current snapshots in the environment."""
415        return [
416            *self.removed_snapshots.values(),
417            *(old.table_info for _, old in self.modified_snapshots.values()),
418            *[
419                s.table_info
420                for s_id, s in self.snapshots.items()
421                if s_id not in self.added and s.name not in self.modified_snapshots
422            ],
423        ]
424
425    def directly_modified(self, name: str) -> bool:
426        """Returns whether or not a node was directly modified in this context.
427
428        Args:
429            name: The snapshot name to check.
430
431        Returns:
432            Whether or not the node was directly modified.
433        """
434
435        if name not in self.modified_snapshots:
436            return False
437
438        current, previous = self.modified_snapshots[name]
439        return current.is_directly_modified(previous)
440
441    def indirectly_modified(self, name: str) -> bool:
442        """Returns whether or not a node was indirectly modified in this context.
443
444        Args:
445            name: The snapshot name to check.
446
447        Returns:
448            Whether or not the node was indirectly modified.
449        """
450
451        if name not in self.modified_snapshots:
452            return False
453
454        current, previous = self.modified_snapshots[name]
455        return current.is_indirectly_modified(previous)
456
457    def metadata_updated(self, name: str) -> bool:
458        """Returns whether or not the given node's metadata has been updated.
459
460        Args:
461            name: The node to check.
462
463        Returns:
464            Whether or not the node's metadata has been updated.
465        """
466
467        if name not in self.modified_snapshots:
468            return False
469
470        current, previous = self.modified_snapshots[name]
471        return current.is_metadata_updated(previous)
472
473    def text_diff(self, name: str) -> str:
474        """Finds the difference of a node between the current and remote environment.
475
476        Args:
477            name: The Snapshot name.
478
479        Returns:
480            A unified text diff of the node.
481        """
482        if name not in self.snapshots_by_name:
483            raise SQLMeshError(f"`{name}` does not exist.")
484        if name not in self.modified_snapshots:
485            return ""
486
487        new, old = self.modified_snapshots[name]
488        try:
489            return old.node.text_diff(new.node, rendered=self.diff_rendered)
490        except SQLMeshError as e:
491            get_console().log_warning(f"Failed to diff model '{name}': {str(e)}.")
492            return ""

ContextDiff is an object representing the difference between two environments.

The two environments can be the local environment and a remote environment, or two remote environments.

environment: str

The environment to diff.

is_new_environment: bool

Whether the target environment is new.

is_unfinalized_environment: bool

Whether the currently stored environment record is in unfinalized state.

normalize_environment_name: bool

Whether the environment name should be normalized.

previous_gateway_managed_virtual_layer: bool

Whether the previous environment's virtual layer's views were created by the model specified gateways.

gateway_managed_virtual_layer: bool

Whether the virtual layer's views will be created by the model specified gateways.

create_from: str

The name of the environment the target environment will be created from if new.

create_from_env_exists: bool

Whether the create_from environment already exists at plan time.

Modified snapshots.

previous_plan_id: Optional[str]

Previous plan id.

previously_promoted_snapshot_ids: Set[sqlmesh.core.snapshot.definition.SnapshotId]

Snapshot IDs that were promoted by the previous plan.

previous_finalized_snapshots: Optional[List[sqlmesh.core.snapshot.definition.SnapshotTableInfo]]

Snapshots from the previous finalized state.

previous_requirements: Dict[str, str]

Previous requirements.

requirements: Dict[str, str]

Python dependencies.

previous_environment_statements: List[sqlmesh.core.environment.EnvironmentStatements]

Previous environment statements.

environment_statements: List[sqlmesh.core.environment.EnvironmentStatements]

Environment statements.

diff_rendered: bool

Whether the diff should compare the rendered versions of the models instead of their raw definitions.

@classmethod
def create( cls, environment: str, snapshots: Dict[str, sqlmesh.core.snapshot.definition.Snapshot], create_from: str, state_reader: sqlmesh.core.state_sync.base.StateReader, ensure_finalized_snapshots: bool = False, provided_requirements: Optional[Dict[str, str]] = None, excluded_requirements: Optional[Set[str]] = None, diff_rendered: bool = False, environment_statements: Optional[List[sqlmesh.core.environment.EnvironmentStatements]] = [], gateway_managed_virtual_layer: bool = False, infer_python_dependencies: bool = True, always_recreate_environment: bool = False) -> ContextDiff:
 93    @classmethod
 94    def create(
 95        cls,
 96        environment: str,
 97        snapshots: t.Dict[str, Snapshot],
 98        create_from: str,
 99        state_reader: StateReader,
100        ensure_finalized_snapshots: bool = False,
101        provided_requirements: t.Optional[t.Dict[str, str]] = None,
102        excluded_requirements: t.Optional[t.Set[str]] = None,
103        diff_rendered: bool = False,
104        environment_statements: t.Optional[t.List[EnvironmentStatements]] = [],
105        gateway_managed_virtual_layer: bool = False,
106        infer_python_dependencies: bool = True,
107        always_recreate_environment: bool = False,
108    ) -> ContextDiff:
109        """Create a ContextDiff object.
110
111        Args:
112            environment: The remote environment to diff.
113            snapshots: The snapshots of the current environment.
114            create_from: The environment to create the target environment from if it
115                doesn't exist.
116            state_reader: StateReader to access the remote environment to diff.
117            ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized
118                environment state, or to use whatever snapshots are in the current environment state even if
119                the environment is not finalized.
120            provided_requirements: Python dependencies sourced from the lock file.
121            excluded_requirements: Python dependencies to exclude.
122            diff_rendered: Whether to compute the diff of the rendered version of the compared expressions.
123            environment_statements: A list of `before_all` or `after_all` statements associated with the environment.
124            gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the
125                model-specific gateway rather than the default gateway.
126            infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python
127                package requirements.
128
129        Returns:
130            The ContextDiff object.
131        """
132        environment = environment.lower()
133        existing_env = state_reader.get_environment(environment)
134        create_from_env_exists = False
135
136        recreate_environment = always_recreate_environment and not environment == create_from
137
138        if existing_env is None or existing_env.expired or recreate_environment:
139            env = state_reader.get_environment(create_from.lower())
140
141            if not env and create_from != c.PROD:
142                get_console().log_warning(
143                    f"The environment name '{create_from}' was passed to the `plan` command's `--create-from` argument, but '{create_from}' does not exist. Initializing new environment '{environment}' from scratch."
144                )
145
146            is_new_environment = True
147            create_from_env_exists = env is not None
148            previously_promoted_snapshot_ids = set()
149        else:
150            env = existing_env
151            is_new_environment = False
152            previously_promoted_snapshot_ids = {s.snapshot_id for s in env.promoted_snapshots}
153
154        environment_snapshot_infos = []
155        if env:
156            environment_snapshot_infos = (
157                env.snapshots
158                if not ensure_finalized_snapshots
159                else env.finalized_or_current_snapshots
160            )
161        remote_snapshot_name_to_info = {
162            snapshot_info.name: snapshot_info for snapshot_info in environment_snapshot_infos
163        }
164        removed = {
165            snapshot_table_info.snapshot_id: snapshot_table_info
166            for snapshot_table_info in environment_snapshot_infos
167            if snapshot_table_info.name not in snapshots
168        }
169        added = {
170            snapshot.snapshot_id
171            for snapshot in snapshots.values()
172            if snapshot.name not in remote_snapshot_name_to_info
173        }
174        modified_snapshot_name_to_snapshot_info = {
175            snapshot.name: remote_snapshot_name_to_info[snapshot.name]
176            for snapshot in snapshots.values()
177            if snapshot.snapshot_id not in added
178            and snapshot.fingerprint != remote_snapshot_name_to_info[snapshot.name].fingerprint
179        }
180
181        stored = state_reader.get_snapshots(
182            [*snapshots.values(), *modified_snapshot_name_to_snapshot_info.values()]
183        )
184
185        merged_snapshots = {}
186        modified_snapshots = {}
187        new_snapshots = {}
188
189        for snapshot in snapshots.values():
190            s_id = snapshot.snapshot_id
191            modified_snapshot_info = modified_snapshot_name_to_snapshot_info.get(snapshot.name)
192            existing_snapshot = stored.get(s_id)
193
194            if modified_snapshot_info and snapshot.node_type != modified_snapshot_info.node_type:
195                added.add(snapshot.snapshot_id)
196                removed[modified_snapshot_info.snapshot_id] = modified_snapshot_info
197                modified_snapshot_name_to_snapshot_info.pop(snapshot.name)
198            elif existing_snapshot:
199                # Keep the original node instance to preserve the query cache.
200                existing_snapshot.node = snapshot.node
201
202                merged_snapshots[s_id] = existing_snapshot.copy()
203                if modified_snapshot_info:
204                    modified_snapshots[s_id.name] = (
205                        existing_snapshot,
206                        stored[modified_snapshot_info.snapshot_id],
207                    )
208            else:
209                snapshot = snapshot.copy()
210                merged_snapshots[s_id] = snapshot
211                new_snapshots[snapshot.snapshot_id] = snapshot
212                if modified_snapshot_info:
213                    snapshot.previous_versions = modified_snapshot_info.all_versions
214                    modified_snapshots[s_id.name] = (
215                        snapshot,
216                        stored[modified_snapshot_info.snapshot_id],
217                    )
218
219        requirements = _build_requirements(
220            provided_requirements or {},
221            excluded_requirements or set(),
222            snapshots.values(),
223            infer_python_dependencies=infer_python_dependencies,
224        )
225
226        previous_environment_statements = (
227            state_reader.get_environment_statements(env.name) if env else []
228        )
229
230        if existing_env and always_recreate_environment:
231            previous_plan_id: t.Optional[str] = existing_env.plan_id
232        else:
233            previous_plan_id = env.plan_id if env and not is_new_environment else None
234
235        return ContextDiff(
236            environment=environment,
237            is_new_environment=is_new_environment,
238            is_unfinalized_environment=bool(env and not env.finalized_ts),
239            normalize_environment_name=is_new_environment or bool(env and env.normalize_name),
240            create_from=create_from,
241            create_from_env_exists=create_from_env_exists,
242            added=added,
243            removed_snapshots=removed,
244            modified_snapshots=modified_snapshots,
245            snapshots=merged_snapshots,
246            new_snapshots=new_snapshots,
247            previous_plan_id=previous_plan_id,
248            previously_promoted_snapshot_ids=previously_promoted_snapshot_ids,
249            previous_finalized_snapshots=env.previous_finalized_snapshots if env else None,
250            previous_requirements=env.requirements if env else {},
251            requirements=requirements,
252            diff_rendered=diff_rendered,
253            previous_environment_statements=previous_environment_statements,
254            environment_statements=environment_statements,
255            previous_gateway_managed_virtual_layer=env.gateway_managed if env else False,
256            gateway_managed_virtual_layer=gateway_managed_virtual_layer,
257        )

Create a ContextDiff object.

Arguments:
  • environment: The remote environment to diff.
  • snapshots: The snapshots of the current environment.
  • create_from: The environment to create the target environment from if it doesn't exist.
  • state_reader: StateReader to access the remote environment to diff.
  • ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized environment state, or to use whatever snapshots are in the current environment state even if the environment is not finalized.
  • provided_requirements: Python dependencies sourced from the lock file.
  • excluded_requirements: Python dependencies to exclude.
  • diff_rendered: Whether to compute the diff of the rendered version of the compared expressions.
  • environment_statements: A list of before_all or after_all statements associated with the environment.
  • gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the model-specific gateway rather than the default gateway.
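  • infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python package requirements.
  • always_recreate_environment: Whether to treat the target environment as new and diff against the create_from environment even if the target already exists. Has no effect when the target environment is the create_from environment itself.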
Returns:

The ContextDiff object.

@classmethod
def create_no_diff( cls, environment: str, state_reader: sqlmesh.core.state_sync.base.StateReader) -> ContextDiff:
259    @classmethod
260    def create_no_diff(cls, environment: str, state_reader: StateReader) -> ContextDiff:
261        """Create a no-op ContextDiff object.
262
263        Args:
264            environment: The target environment.
265            state_reader: StateReader to access the remote environment record.
266
267        Returns:
268            The ContextDiff object.
269        """
270        env = state_reader.get_environment(environment.lower())
271        if not env:
272            raise SQLMeshError(f"Environment '{environment}' must exist for this operation.")
273
274        environment_statements = state_reader.get_environment_statements(environment)
275        snapshots = state_reader.get_snapshots(env.snapshots)
276
277        return ContextDiff(
278            environment=env.name,
279            is_new_environment=False,
280            is_unfinalized_environment=False,
281            normalize_environment_name=env.normalize_name,
282            create_from="",
283            create_from_env_exists=False,
284            added=set(),
285            removed_snapshots={},
286            modified_snapshots={},
287            snapshots=snapshots,
288            new_snapshots={},
289            previous_plan_id=env.plan_id,
290            previously_promoted_snapshot_ids={s.snapshot_id for s in env.promoted_snapshots},
291            previous_finalized_snapshots=env.previous_finalized_snapshots,
292            previous_requirements=env.requirements,
293            requirements=env.requirements,
294            previous_environment_statements=environment_statements,
295            environment_statements=environment_statements,
296            previous_gateway_managed_virtual_layer=env.gateway_managed,
297            gateway_managed_virtual_layer=env.gateway_managed,
298        )

Create a no-op ContextDiff object.

Arguments:
  • environment: The target environment.
  • state_reader: StateReader to access the remote environment record.
Returns:

The ContextDiff object.

has_changes: bool
300    @property
301    def has_changes(self) -> bool:
302        return (
303            self.has_snapshot_changes
304            or self.is_new_environment
305            or self.is_unfinalized_environment
306            or self.has_requirement_changes
307            or self.has_environment_statements_changes
308            or self.previous_gateway_managed_virtual_layer != self.gateway_managed_virtual_layer
309        )
has_requirement_changes: bool
311    @property
312    def has_requirement_changes(self) -> bool:
313        return self.previous_requirements != self.requirements
has_environment_statements_changes: bool
315    @property
316    def has_environment_statements_changes(self) -> bool:
317        return sorted(self.environment_statements, key=lambda s: s.project or "") != sorted(
318            self.previous_environment_statements, key=lambda s: s.project or ""
319        )
has_snapshot_changes: bool
321    @property
322    def has_snapshot_changes(self) -> bool:
323        return bool(self.added or self.removed_snapshots or self.modified_snapshots)
added_materialized_snapshot_ids: Set[sqlmesh.core.snapshot.definition.SnapshotId]
325    @property
326    def added_materialized_snapshot_ids(self) -> t.Set[SnapshotId]:
327        """Returns the set of added internal snapshot ids."""
328        return {
329            s_id
330            for s_id in self.added
331            if self.snapshots[s_id].model_kind_name
332            and self.snapshots[s_id].model_kind_name.is_materialized  # type: ignore
333        }

Returns the set of added snapshot ids whose model kind is materialized.

promotable_snapshot_ids: Set[sqlmesh.core.snapshot.definition.SnapshotId]
335    @property
336    def promotable_snapshot_ids(self) -> t.Set[SnapshotId]:
337        """The set of snapshot ids that have to be promoted in the target environment."""
338        return {
339            *self.previously_promoted_snapshot_ids,
340            *self.added,
341            *self.current_modified_snapshot_ids,
342        } - set(self.removed_snapshots)

The set of snapshot ids that have to be promoted in the target environment.

unpromoted_models: Set[sqlmesh.core.snapshot.definition.SnapshotId]
344    @property
345    def unpromoted_models(self) -> t.Set[SnapshotId]:
346        """The set of snapshot IDs that have not yet been promoted in the target environment."""
347        return set(self.snapshots) - self.previously_promoted_snapshot_ids

The set of snapshot IDs that have not yet been promoted in the target environment.

current_modified_snapshot_ids: Set[sqlmesh.core.snapshot.definition.SnapshotId]
349    @property
350    def current_modified_snapshot_ids(self) -> t.Set[SnapshotId]:
351        return {current.snapshot_id for current, _ in self.modified_snapshots.values()}
snapshots_by_name: Dict[str, sqlmesh.core.snapshot.definition.Snapshot]
353    @cached_property
354    def snapshots_by_name(self) -> t.Dict[str, Snapshot]:
355        return {x.name: x for x in self.snapshots.values()}
def requirements_diff(self) -> str:
357    def requirements_diff(self) -> str:
358        return "    " + "\n    ".join(
359            ndiff(
360                [
361                    f"{k}=={self.previous_requirements[k]}"
362                    for k in sorted(self.previous_requirements)
363                ],
364                [f"{k}=={self.requirements[k]}" for k in sorted(self.requirements)],
365            )
366        )
def environment_statements_diff(self, include_python_env: bool = False) -> List[Tuple[str, str]]:
368    def environment_statements_diff(
369        self, include_python_env: bool = False
370    ) -> t.List[t.Tuple[str, str]]:
371        def extract_statements(statements: t.List[EnvironmentStatements], attr: str) -> t.List[str]:
372            return [
373                string
374                for statement in statements
375                for expr in (
376                    sorted_python_env_payloads(statement.python_env)
377                    if attr == "python_env"
378                    else getattr(statement, attr)
379                )
380                for string in expr.split("\n")
381            ]
382
383        def compute_diff(attribute: str) -> t.Optional[t.Tuple[str, str]]:
384            previous = extract_statements(self.previous_environment_statements, attribute)
385            current = extract_statements(self.environment_statements, attribute)
386
387            if previous == current:
388                return None
389
390            diff_text = attribute if not attribute == "python_env" else "dependencies"
391            diff_text += ":\n"
392            if attribute == "python_env":
393                diff = list(unified_diff(previous, current))
394                diff_text += "\n".join(diff[2:] if len(diff) > 1 else diff)
395                return "python", diff_text + "\n"
396
397            diff_lines = list(ndiff(previous, current))
398            if any(line.startswith(("-", "+")) for line in diff_lines):
399                diff_text += "  " + "\n  ".join(diff_lines) + "\n"
400            return "sql", diff_text
401
402        return [
403            diff
404            for attribute in [
405                RuntimeStage.BEFORE_ALL.value,
406                RuntimeStage.AFTER_ALL.value,
407                *(["python_env"] if include_python_env else []),
408            ]
409            if (diff := compute_diff(attribute)) is not None
410        ]
environment_snapshots: List[sqlmesh.core.snapshot.definition.SnapshotTableInfo]
412    @property
413    def environment_snapshots(self) -> t.List[SnapshotTableInfo]:
414        """Returns current snapshots in the environment."""
415        return [
416            *self.removed_snapshots.values(),
417            *(old.table_info for _, old in self.modified_snapshots.values()),
418            *[
419                s.table_info
420                for s_id, s in self.snapshots.items()
421                if s_id not in self.added and s.name not in self.modified_snapshots
422            ],
423        ]

Returns current snapshots in the environment.

def directly_modified(self, name: str) -> bool:
425    def directly_modified(self, name: str) -> bool:
426        """Returns whether or not a node was directly modified in this context.
427
428        Args:
429            name: The snapshot name to check.
430
431        Returns:
432            Whether or not the node was directly modified.
433        """
434
435        if name not in self.modified_snapshots:
436            return False
437
438        current, previous = self.modified_snapshots[name]
439        return current.is_directly_modified(previous)

Returns whether or not a node was directly modified in this context.

Arguments:
  • name: The snapshot name to check.
Returns:

Whether or not the node was directly modified.

def indirectly_modified(self, name: str) -> bool:
441    def indirectly_modified(self, name: str) -> bool:
442        """Returns whether or not a node was indirectly modified in this context.
443
444        Args:
445            name: The snapshot name to check.
446
447        Returns:
448            Whether or not the node was indirectly modified.
449        """
450
451        if name not in self.modified_snapshots:
452            return False
453
454        current, previous = self.modified_snapshots[name]
455        return current.is_indirectly_modified(previous)

Returns whether or not a node was indirectly modified in this context.

Arguments:
  • name: The snapshot name to check.
Returns:

Whether or not the node was indirectly modified.

def metadata_updated(self, name: str) -> bool:
457    def metadata_updated(self, name: str) -> bool:
458        """Returns whether or not the given node's metadata has been updated.
459
460        Args:
461            name: The node to check.
462
463        Returns:
464            Whether or not the node's metadata has been updated.
465        """
466
467        if name not in self.modified_snapshots:
468            return False
469
470        current, previous = self.modified_snapshots[name]
471        return current.is_metadata_updated(previous)

Returns whether or not the given node's metadata has been updated.

Arguments:
  • name: The node to check.
Returns:

Whether or not the node's metadata has been updated.

def text_diff(self, name: str) -> str:
473    def text_diff(self, name: str) -> str:
474        """Finds the difference of a node between the current and remote environment.
475
476        Args:
477            name: The Snapshot name.
478
479        Returns:
480            A unified text diff of the node.
481        """
482        if name not in self.snapshots_by_name:
483            raise SQLMeshError(f"`{name}` does not exist.")
484        if name not in self.modified_snapshots:
485            return ""
486
487        new, old = self.modified_snapshots[name]
488        try:
489            return old.node.text_diff(new.node, rendered=self.diff_rendered)
490        except SQLMeshError as e:
491            get_console().log_warning(f"Failed to diff model '{name}': {str(e)}.")
492            return ""

Finds the difference of a node between the current and remote environment.

Arguments:
  • name: The Snapshot name.
Returns:

A unified text diff of the node.

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model; it should be a dictionary conforming to pydantic's `ConfigDict` (`pydantic.config.ConfigDict`).

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields