ContextDiff
ContextDiff encapsulates the differences between two environments. The two environments can be the local environment and a remote environment, or two remote environments. ContextDiff is an important part of SQLMesh. SQLMesh plans use ContextDiff to determine what nodes were changed between two environments. The SQLMesh CLI diff command uses ContextDiff to determine what to visualize.
When creating a ContextDiff object, SQLMesh will compare the snapshots from one environment with those of another remote environment and determine if nodes have been added, removed, or modified.
1""" 2# ContextDiff 3 4ContextDiff encapsulates the differences between two environments. The two environments can be the local 5environment and a remote environment, or two remote environments. ContextDiff is an important part of 6SQLMesh. SQLMesh plans use ContextDiff to determine what nodes were changed between two environments. 7The SQLMesh CLI diff command uses ContextDiff to determine what to visualize. 8 9When creating a ContextDiff object, SQLMesh will compare the snapshots from one environment with those of 10another remote environment and determine if nodes have been added, removed, or modified. 11""" 12 13from __future__ import annotations 14 15import sys 16import typing as t 17from difflib import ndiff, unified_diff 18from functools import cached_property 19from sqlmesh.core import constants as c 20from sqlmesh.core.console import get_console 21from sqlmesh.core.macros import RuntimeStage 22from sqlmesh.core.model.common import sorted_python_env_payloads 23from sqlmesh.core.snapshot import Snapshot, SnapshotId, SnapshotTableInfo 24from sqlmesh.utils.errors import SQLMeshError 25from sqlmesh.utils.pydantic import PydanticModel 26 27if sys.version_info >= (3, 12): 28 from importlib import metadata 29else: 30 import importlib_metadata as metadata # type: ignore 31 32 33if t.TYPE_CHECKING: 34 from sqlmesh.core.state_sync import StateReader 35 36from sqlmesh.utils.metaprogramming import Executable # noqa 37from sqlmesh.core.environment import EnvironmentStatements 38 39IGNORED_PACKAGES = {"sqlmesh", "sqlglot", "sqlglotc"} 40 41 42class ContextDiff(PydanticModel): 43 """ContextDiff is an object representing the difference between two environments. 44 45 The two environments can be the local environment and a remote environment, or two remote 46 environments. 47 """ 48 49 environment: str 50 """The environment to diff.""" 51 is_new_environment: bool 52 """Whether the target environment is new.""" 53 is_unfinalized_environment: bool 54 """Whether the currently stored environment record is in unfinalized state.""" 55 normalize_environment_name: bool 56 """Whether the environment name should be normalized.""" 57 previous_gateway_managed_virtual_layer: bool 58 """Whether the previous environment's virtual layer's views were created by the model specified gateways.""" 59 gateway_managed_virtual_layer: bool 60 """Whether the virtual layer's views will be created by the model specified gateways.""" 61 create_from: str 62 """The name of the environment the target environment will be created from if new.""" 63 create_from_env_exists: bool 64 """Whether the create_from environment already exists at plan time.""" 65 added: t.Set[SnapshotId] 66 """New nodes.""" 67 removed_snapshots: t.Dict[SnapshotId, SnapshotTableInfo] 68 """Deleted nodes.""" 69 modified_snapshots: t.Dict[str, t.Tuple[Snapshot, Snapshot]] 70 """Modified snapshots.""" 71 snapshots: t.Dict[SnapshotId, Snapshot] 72 """Merged snapshots.""" 73 new_snapshots: t.Dict[SnapshotId, Snapshot] 74 """New snapshots.""" 75 previous_plan_id: t.Optional[str] 76 """Previous plan id.""" 77 previously_promoted_snapshot_ids: t.Set[SnapshotId] 78 """Snapshot IDs that were promoted by the previous plan.""" 79 previous_finalized_snapshots: t.Optional[t.List[SnapshotTableInfo]] 80 """Snapshots from the previous finalized state.""" 81 previous_requirements: t.Dict[str, str] = {} 82 """Previous requirements.""" 83 requirements: t.Dict[str, str] = {} 84 """Python dependencies.""" 85 previous_environment_statements: t.List[EnvironmentStatements] = [] 86 """Previous environment statements.""" 87 environment_statements: t.List[EnvironmentStatements] 88 """Environment statements.""" 89 diff_rendered: bool = False 90 """Whether the diff should compare raw vs rendered models""" 91 92 @classmethod 93 def create( 94 cls, 95 environment: str, 96 snapshots: t.Dict[str, Snapshot], 97 create_from: str, 98 state_reader: StateReader, 99 ensure_finalized_snapshots: bool = False, 100 provided_requirements: t.Optional[t.Dict[str, str]] = None, 101 excluded_requirements: t.Optional[t.Set[str]] = None, 102 diff_rendered: bool = False, 103 environment_statements: t.Optional[t.List[EnvironmentStatements]] = [], 104 gateway_managed_virtual_layer: bool = False, 105 infer_python_dependencies: bool = True, 106 always_recreate_environment: bool = False, 107 ) -> ContextDiff: 108 """Create a ContextDiff object. 109 110 Args: 111 environment: The remote environment to diff. 112 snapshots: The snapshots of the current environment. 113 create_from: The environment to create the target environment from if it 114 doesn't exist. 115 state_reader: StateReader to access the remote environment to diff. 116 ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized 117 environment state, or to use whatever snapshots are in the current environment state even if 118 the environment is not finalized. 119 provided_requirements: Python dependencies sourced from the lock file. 120 excluded_requirements: Python dependencies to exclude. 121 diff_rendered: Whether to compute the diff of the rendered version of the compared expressions. 122 environment_statements: A list of `before_all` or `after_all` statements associated with the environment. 123 gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the 124 model-specific gateway rather than the default gateway. 125 infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python 126 package requirements. 127 128 Returns: 129 The ContextDiff object. 130 """ 131 environment = environment.lower() 132 existing_env = state_reader.get_environment(environment) 133 create_from_env_exists = False 134 135 recreate_environment = always_recreate_environment and not environment == create_from 136 137 if existing_env is None or existing_env.expired or recreate_environment: 138 env = state_reader.get_environment(create_from.lower()) 139 140 if not env and create_from != c.PROD: 141 get_console().log_warning( 142 f"The environment name '{create_from}' was passed to the `plan` command's `--create-from` argument, but '{create_from}' does not exist. Initializing new environment '{environment}' from scratch." 143 ) 144 145 is_new_environment = True 146 create_from_env_exists = env is not None 147 previously_promoted_snapshot_ids = set() 148 else: 149 env = existing_env 150 is_new_environment = False 151 previously_promoted_snapshot_ids = {s.snapshot_id for s in env.promoted_snapshots} 152 153 environment_snapshot_infos = [] 154 if env: 155 environment_snapshot_infos = ( 156 env.snapshots 157 if not ensure_finalized_snapshots 158 else env.finalized_or_current_snapshots 159 ) 160 remote_snapshot_name_to_info = { 161 snapshot_info.name: snapshot_info for snapshot_info in environment_snapshot_infos 162 } 163 removed = { 164 snapshot_table_info.snapshot_id: snapshot_table_info 165 for snapshot_table_info in environment_snapshot_infos 166 if snapshot_table_info.name not in snapshots 167 } 168 added = { 169 snapshot.snapshot_id 170 for snapshot in snapshots.values() 171 if snapshot.name not in remote_snapshot_name_to_info 172 } 173 modified_snapshot_name_to_snapshot_info = { 174 snapshot.name: remote_snapshot_name_to_info[snapshot.name] 175 for snapshot in snapshots.values() 176 if snapshot.snapshot_id not in added 177 and snapshot.fingerprint != remote_snapshot_name_to_info[snapshot.name].fingerprint 178 } 179 180 stored = state_reader.get_snapshots( 181 [*snapshots.values(), *modified_snapshot_name_to_snapshot_info.values()] 182 ) 183 184 merged_snapshots = {} 185 modified_snapshots = {} 186 new_snapshots = {} 187 188 for snapshot in snapshots.values(): 189 s_id = snapshot.snapshot_id 190 modified_snapshot_info = modified_snapshot_name_to_snapshot_info.get(snapshot.name) 191 existing_snapshot = stored.get(s_id) 192 193 if modified_snapshot_info and snapshot.node_type != modified_snapshot_info.node_type: 194 added.add(snapshot.snapshot_id) 195 removed[modified_snapshot_info.snapshot_id] = modified_snapshot_info 196 modified_snapshot_name_to_snapshot_info.pop(snapshot.name) 197 elif existing_snapshot: 198 # Keep the original node instance to preserve the query cache. 199 existing_snapshot.node = snapshot.node 200 201 merged_snapshots[s_id] = existing_snapshot.copy() 202 if modified_snapshot_info: 203 modified_snapshots[s_id.name] = ( 204 existing_snapshot, 205 stored[modified_snapshot_info.snapshot_id], 206 ) 207 else: 208 snapshot = snapshot.copy() 209 merged_snapshots[s_id] = snapshot 210 new_snapshots[snapshot.snapshot_id] = snapshot 211 if modified_snapshot_info: 212 snapshot.previous_versions = modified_snapshot_info.all_versions 213 modified_snapshots[s_id.name] = ( 214 snapshot, 215 stored[modified_snapshot_info.snapshot_id], 216 ) 217 218 requirements = _build_requirements( 219 provided_requirements or {}, 220 excluded_requirements or set(), 221 snapshots.values(), 222 infer_python_dependencies=infer_python_dependencies, 223 ) 224 225 previous_environment_statements = ( 226 state_reader.get_environment_statements(env.name) if env else [] 227 ) 228 229 if existing_env and always_recreate_environment: 230 previous_plan_id: t.Optional[str] = existing_env.plan_id 231 else: 232 previous_plan_id = env.plan_id if env and not is_new_environment else None 233 234 return ContextDiff( 235 environment=environment, 236 is_new_environment=is_new_environment, 237 is_unfinalized_environment=bool(env and not env.finalized_ts), 238 normalize_environment_name=is_new_environment or bool(env and env.normalize_name), 239 create_from=create_from, 240 create_from_env_exists=create_from_env_exists, 241 added=added, 242 removed_snapshots=removed, 243 modified_snapshots=modified_snapshots, 244 snapshots=merged_snapshots, 245 new_snapshots=new_snapshots, 246 previous_plan_id=previous_plan_id, 247 previously_promoted_snapshot_ids=previously_promoted_snapshot_ids, 248 previous_finalized_snapshots=env.previous_finalized_snapshots if env else None, 249 previous_requirements=env.requirements if env else {}, 250 requirements=requirements, 251 diff_rendered=diff_rendered, 252 previous_environment_statements=previous_environment_statements, 253 environment_statements=environment_statements, 254 previous_gateway_managed_virtual_layer=env.gateway_managed if env else False, 255 gateway_managed_virtual_layer=gateway_managed_virtual_layer, 256 ) 257 258 @classmethod 259 def create_no_diff(cls, environment: str, state_reader: StateReader) -> ContextDiff: 260 """Create a no-op ContextDiff object. 261 262 Args: 263 environment: The target environment. 264 state_reader: StateReader to access the remote environment record. 265 266 Returns: 267 The ContextDiff object. 268 """ 269 env = state_reader.get_environment(environment.lower()) 270 if not env: 271 raise SQLMeshError(f"Environment '{environment}' must exist for this operation.") 272 273 environment_statements = state_reader.get_environment_statements(environment) 274 snapshots = state_reader.get_snapshots(env.snapshots) 275 276 return ContextDiff( 277 environment=env.name, 278 is_new_environment=False, 279 is_unfinalized_environment=False, 280 normalize_environment_name=env.normalize_name, 281 create_from="", 282 create_from_env_exists=False, 283 added=set(), 284 removed_snapshots={}, 285 modified_snapshots={}, 286 snapshots=snapshots, 287 new_snapshots={}, 288 previous_plan_id=env.plan_id, 289 previously_promoted_snapshot_ids={s.snapshot_id for s in env.promoted_snapshots}, 290 previous_finalized_snapshots=env.previous_finalized_snapshots, 291 previous_requirements=env.requirements, 292 requirements=env.requirements, 293 previous_environment_statements=environment_statements, 294 environment_statements=environment_statements, 295 previous_gateway_managed_virtual_layer=env.gateway_managed, 296 gateway_managed_virtual_layer=env.gateway_managed, 297 ) 298 299 @property 300 def has_changes(self) -> bool: 301 return ( 302 self.has_snapshot_changes 303 or self.is_new_environment 304 or self.is_unfinalized_environment 305 or self.has_requirement_changes 306 or self.has_environment_statements_changes 307 or self.previous_gateway_managed_virtual_layer != self.gateway_managed_virtual_layer 308 ) 309 310 @property 311 def has_requirement_changes(self) -> bool: 312 return self.previous_requirements != self.requirements 313 314 @property 315 def has_environment_statements_changes(self) -> bool: 316 return sorted(self.environment_statements, key=lambda s: s.project or "") != sorted( 317 self.previous_environment_statements, key=lambda s: s.project or "" 318 ) 319 320 @property 321 def has_snapshot_changes(self) -> bool: 322 return bool(self.added or self.removed_snapshots or self.modified_snapshots) 323 324 @property 325 def added_materialized_snapshot_ids(self) -> t.Set[SnapshotId]: 326 """Returns the set of added internal snapshot ids.""" 327 return { 328 s_id 329 for s_id in self.added 330 if self.snapshots[s_id].model_kind_name 331 and self.snapshots[s_id].model_kind_name.is_materialized # type: ignore 332 } 333 334 @property 335 def promotable_snapshot_ids(self) -> t.Set[SnapshotId]: 336 """The set of snapshot ids that have to be promoted in the target environment.""" 337 return { 338 *self.previously_promoted_snapshot_ids, 339 *self.added, 340 *self.current_modified_snapshot_ids, 341 } - set(self.removed_snapshots) 342 343 @property 344 def unpromoted_models(self) -> t.Set[SnapshotId]: 345 """The set of snapshot IDs that have not yet been promoted in the target environment.""" 346 return set(self.snapshots) - self.previously_promoted_snapshot_ids 347 348 @property 349 def current_modified_snapshot_ids(self) -> t.Set[SnapshotId]: 350 return {current.snapshot_id for current, _ in self.modified_snapshots.values()} 351 352 @cached_property 353 def snapshots_by_name(self) -> t.Dict[str, Snapshot]: 354 return {x.name: x for x in self.snapshots.values()} 355 356 def requirements_diff(self) -> str: 357 return " " + "\n ".join( 358 ndiff( 359 [ 360 f"{k}=={self.previous_requirements[k]}" 361 for k in sorted(self.previous_requirements) 362 ], 363 [f"{k}=={self.requirements[k]}" for k in sorted(self.requirements)], 364 ) 365 ) 366 367 def environment_statements_diff( 368 self, include_python_env: bool = False 369 ) -> t.List[t.Tuple[str, str]]: 370 def extract_statements(statements: t.List[EnvironmentStatements], attr: str) -> t.List[str]: 371 return [ 372 string 373 for statement in statements 374 for expr in ( 375 sorted_python_env_payloads(statement.python_env) 376 if attr == "python_env" 377 else getattr(statement, attr) 378 ) 379 for string in expr.split("\n") 380 ] 381 382 def compute_diff(attribute: str) -> t.Optional[t.Tuple[str, str]]: 383 previous = extract_statements(self.previous_environment_statements, attribute) 384 current = extract_statements(self.environment_statements, attribute) 385 386 if previous == current: 387 return None 388 389 diff_text = attribute if not attribute == "python_env" else "dependencies" 390 diff_text += ":\n" 391 if attribute == "python_env": 392 diff = list(unified_diff(previous, current)) 393 diff_text += "\n".join(diff[2:] if len(diff) > 1 else diff) 394 return "python", diff_text + "\n" 395 396 diff_lines = list(ndiff(previous, current)) 397 if any(line.startswith(("-", "+")) for line in diff_lines): 398 diff_text += " " + "\n ".join(diff_lines) + "\n" 399 return "sql", diff_text 400 401 return [ 402 diff 403 for attribute in [ 404 RuntimeStage.BEFORE_ALL.value, 405 RuntimeStage.AFTER_ALL.value, 406 *(["python_env"] if include_python_env else []), 407 ] 408 if (diff := compute_diff(attribute)) is not None 409 ] 410 411 @property 412 def environment_snapshots(self) -> t.List[SnapshotTableInfo]: 413 """Returns current snapshots in the environment.""" 414 return [ 415 *self.removed_snapshots.values(), 416 *(old.table_info for _, old in self.modified_snapshots.values()), 417 *[ 418 s.table_info 419 for s_id, s in self.snapshots.items() 420 if s_id not in self.added and s.name not in self.modified_snapshots 421 ], 422 ] 423 424 def directly_modified(self, name: str) -> bool: 425 """Returns whether or not a node was directly modified in this context. 426 427 Args: 428 name: The snapshot name to check. 429 430 Returns: 431 Whether or not the node was directly modified. 432 """ 433 434 if name not in self.modified_snapshots: 435 return False 436 437 current, previous = self.modified_snapshots[name] 438 return current.is_directly_modified(previous) 439 440 def indirectly_modified(self, name: str) -> bool: 441 """Returns whether or not a node was indirectly modified in this context. 442 443 Args: 444 name: The snapshot name to check. 445 446 Returns: 447 Whether or not the node was indirectly modified. 448 """ 449 450 if name not in self.modified_snapshots: 451 return False 452 453 current, previous = self.modified_snapshots[name] 454 return current.is_indirectly_modified(previous) 455 456 def metadata_updated(self, name: str) -> bool: 457 """Returns whether or not the given node's metadata has been updated. 458 459 Args: 460 name: The node to check. 461 462 Returns: 463 Whether or not the node's metadata has been updated. 464 """ 465 466 if name not in self.modified_snapshots: 467 return False 468 469 current, previous = self.modified_snapshots[name] 470 return current.is_metadata_updated(previous) 471 472 def text_diff(self, name: str) -> str: 473 """Finds the difference of a node between the current and remote environment. 474 475 Args: 476 name: The Snapshot name. 477 478 Returns: 479 A unified text diff of the node. 480 """ 481 if name not in self.snapshots_by_name: 482 raise SQLMeshError(f"`{name}` does not exist.") 483 if name not in self.modified_snapshots: 484 return "" 485 486 new, old = self.modified_snapshots[name] 487 try: 488 return old.node.text_diff(new.node, rendered=self.diff_rendered) 489 except SQLMeshError as e: 490 get_console().log_warning(f"Failed to diff model '{name}': {str(e)}.") 491 return "" 492 493 494def _build_requirements( 495 provided_requirements: t.Dict[str, str], 496 excluded_requirements: t.Set[str], 497 snapshots: t.Collection[Snapshot], 498 infer_python_dependencies: bool = True, 499) -> t.Dict[str, str]: 500 requirements = { 501 k: v for k, v in provided_requirements.items() if k not in excluded_requirements 502 } 503 504 if not infer_python_dependencies: 505 return requirements 506 507 distributions = metadata.packages_distributions() 508 509 for snapshot in snapshots: 510 if not snapshot.is_model: 511 continue 512 513 for executable in snapshot.model.python_env.values(): 514 if executable.kind != "import": 515 continue 516 517 try: 518 start = "from " if executable.payload.startswith("from ") else "import " 519 lib = executable.payload.split(start)[1].split()[0].split(".")[0] 520 if lib not in distributions: 521 continue 522 523 for dist in distributions[lib]: 524 if ( 525 dist not in requirements 526 and dist not in IGNORED_PACKAGES 527 and dist not in excluded_requirements 528 ): 529 requirements[dist] = metadata.version(dist) 530 except metadata.PackageNotFoundError: 531 from sqlmesh.core.console import get_console 532 533 get_console().log_warning(f"Failed to find package for {lib}.") 534 535 return requirements
43class ContextDiff(PydanticModel): 44 """ContextDiff is an object representing the difference between two environments. 45 46 The two environments can be the local environment and a remote environment, or two remote 47 environments. 48 """ 49 50 environment: str 51 """The environment to diff.""" 52 is_new_environment: bool 53 """Whether the target environment is new.""" 54 is_unfinalized_environment: bool 55 """Whether the currently stored environment record is in unfinalized state.""" 56 normalize_environment_name: bool 57 """Whether the environment name should be normalized.""" 58 previous_gateway_managed_virtual_layer: bool 59 """Whether the previous environment's virtual layer's views were created by the model specified gateways.""" 60 gateway_managed_virtual_layer: bool 61 """Whether the virtual layer's views will be created by the model specified gateways.""" 62 create_from: str 63 """The name of the environment the target environment will be created from if new.""" 64 create_from_env_exists: bool 65 """Whether the create_from environment already exists at plan time.""" 66 added: t.Set[SnapshotId] 67 """New nodes.""" 68 removed_snapshots: t.Dict[SnapshotId, SnapshotTableInfo] 69 """Deleted nodes.""" 70 modified_snapshots: t.Dict[str, t.Tuple[Snapshot, Snapshot]] 71 """Modified snapshots.""" 72 snapshots: t.Dict[SnapshotId, Snapshot] 73 """Merged snapshots.""" 74 new_snapshots: t.Dict[SnapshotId, Snapshot] 75 """New snapshots.""" 76 previous_plan_id: t.Optional[str] 77 """Previous plan id.""" 78 previously_promoted_snapshot_ids: t.Set[SnapshotId] 79 """Snapshot IDs that were promoted by the previous plan.""" 80 previous_finalized_snapshots: t.Optional[t.List[SnapshotTableInfo]] 81 """Snapshots from the previous finalized state.""" 82 previous_requirements: t.Dict[str, str] = {} 83 """Previous requirements.""" 84 requirements: t.Dict[str, str] = {} 85 """Python dependencies.""" 86 previous_environment_statements: t.List[EnvironmentStatements] = [] 87 """Previous environment statements.""" 88 environment_statements: t.List[EnvironmentStatements] 89 """Environment statements.""" 90 diff_rendered: bool = False 91 """Whether the diff should compare raw vs rendered models""" 92 93 @classmethod 94 def create( 95 cls, 96 environment: str, 97 snapshots: t.Dict[str, Snapshot], 98 create_from: str, 99 state_reader: StateReader, 100 ensure_finalized_snapshots: bool = False, 101 provided_requirements: t.Optional[t.Dict[str, str]] = None, 102 excluded_requirements: t.Optional[t.Set[str]] = None, 103 diff_rendered: bool = False, 104 environment_statements: t.Optional[t.List[EnvironmentStatements]] = [], 105 gateway_managed_virtual_layer: bool = False, 106 infer_python_dependencies: bool = True, 107 always_recreate_environment: bool = False, 108 ) -> ContextDiff: 109 """Create a ContextDiff object. 110 111 Args: 112 environment: The remote environment to diff. 113 snapshots: The snapshots of the current environment. 114 create_from: The environment to create the target environment from if it 115 doesn't exist. 116 state_reader: StateReader to access the remote environment to diff. 117 ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized 118 environment state, or to use whatever snapshots are in the current environment state even if 119 the environment is not finalized. 120 provided_requirements: Python dependencies sourced from the lock file. 121 excluded_requirements: Python dependencies to exclude. 122 diff_rendered: Whether to compute the diff of the rendered version of the compared expressions. 123 environment_statements: A list of `before_all` or `after_all` statements associated with the environment. 124 gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the 125 model-specific gateway rather than the default gateway. 126 infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python 127 package requirements. 128 129 Returns: 130 The ContextDiff object. 131 """ 132 environment = environment.lower() 133 existing_env = state_reader.get_environment(environment) 134 create_from_env_exists = False 135 136 recreate_environment = always_recreate_environment and not environment == create_from 137 138 if existing_env is None or existing_env.expired or recreate_environment: 139 env = state_reader.get_environment(create_from.lower()) 140 141 if not env and create_from != c.PROD: 142 get_console().log_warning( 143 f"The environment name '{create_from}' was passed to the `plan` command's `--create-from` argument, but '{create_from}' does not exist. Initializing new environment '{environment}' from scratch." 144 ) 145 146 is_new_environment = True 147 create_from_env_exists = env is not None 148 previously_promoted_snapshot_ids = set() 149 else: 150 env = existing_env 151 is_new_environment = False 152 previously_promoted_snapshot_ids = {s.snapshot_id for s in env.promoted_snapshots} 153 154 environment_snapshot_infos = [] 155 if env: 156 environment_snapshot_infos = ( 157 env.snapshots 158 if not ensure_finalized_snapshots 159 else env.finalized_or_current_snapshots 160 ) 161 remote_snapshot_name_to_info = { 162 snapshot_info.name: snapshot_info for snapshot_info in environment_snapshot_infos 163 } 164 removed = { 165 snapshot_table_info.snapshot_id: snapshot_table_info 166 for snapshot_table_info in environment_snapshot_infos 167 if snapshot_table_info.name not in snapshots 168 } 169 added = { 170 snapshot.snapshot_id 171 for snapshot in snapshots.values() 172 if snapshot.name not in remote_snapshot_name_to_info 173 } 174 modified_snapshot_name_to_snapshot_info = { 175 snapshot.name: remote_snapshot_name_to_info[snapshot.name] 176 for snapshot in snapshots.values() 177 if snapshot.snapshot_id not in added 178 and snapshot.fingerprint != remote_snapshot_name_to_info[snapshot.name].fingerprint 179 } 180 181 stored = state_reader.get_snapshots( 182 [*snapshots.values(), *modified_snapshot_name_to_snapshot_info.values()] 183 ) 184 185 merged_snapshots = {} 186 modified_snapshots = {} 187 new_snapshots = {} 188 189 for snapshot in snapshots.values(): 190 s_id = snapshot.snapshot_id 191 modified_snapshot_info = modified_snapshot_name_to_snapshot_info.get(snapshot.name) 192 existing_snapshot = stored.get(s_id) 193 194 if modified_snapshot_info and snapshot.node_type != modified_snapshot_info.node_type: 195 added.add(snapshot.snapshot_id) 196 removed[modified_snapshot_info.snapshot_id] = modified_snapshot_info 197 modified_snapshot_name_to_snapshot_info.pop(snapshot.name) 198 elif existing_snapshot: 199 # Keep the original node instance to preserve the query cache. 200 existing_snapshot.node = snapshot.node 201 202 merged_snapshots[s_id] = existing_snapshot.copy() 203 if modified_snapshot_info: 204 modified_snapshots[s_id.name] = ( 205 existing_snapshot, 206 stored[modified_snapshot_info.snapshot_id], 207 ) 208 else: 209 snapshot = snapshot.copy() 210 merged_snapshots[s_id] = snapshot 211 new_snapshots[snapshot.snapshot_id] = snapshot 212 if modified_snapshot_info: 213 snapshot.previous_versions = modified_snapshot_info.all_versions 214 modified_snapshots[s_id.name] = ( 215 snapshot, 216 stored[modified_snapshot_info.snapshot_id], 217 ) 218 219 requirements = _build_requirements( 220 provided_requirements or {}, 221 excluded_requirements or set(), 222 snapshots.values(), 223 infer_python_dependencies=infer_python_dependencies, 224 ) 225 226 previous_environment_statements = ( 227 state_reader.get_environment_statements(env.name) if env else [] 228 ) 229 230 if existing_env and always_recreate_environment: 231 previous_plan_id: t.Optional[str] = existing_env.plan_id 232 else: 233 previous_plan_id = env.plan_id if env and not is_new_environment else None 234 235 return ContextDiff( 236 environment=environment, 237 is_new_environment=is_new_environment, 238 is_unfinalized_environment=bool(env and not env.finalized_ts), 239 normalize_environment_name=is_new_environment or bool(env and env.normalize_name), 240 create_from=create_from, 241 create_from_env_exists=create_from_env_exists, 242 added=added, 243 removed_snapshots=removed, 244 modified_snapshots=modified_snapshots, 245 snapshots=merged_snapshots, 246 new_snapshots=new_snapshots, 247 previous_plan_id=previous_plan_id, 248 previously_promoted_snapshot_ids=previously_promoted_snapshot_ids, 249 previous_finalized_snapshots=env.previous_finalized_snapshots if env else None, 250 previous_requirements=env.requirements if env else {}, 251 requirements=requirements, 252 diff_rendered=diff_rendered, 253 previous_environment_statements=previous_environment_statements, 254 environment_statements=environment_statements, 255 previous_gateway_managed_virtual_layer=env.gateway_managed if env else False, 256 gateway_managed_virtual_layer=gateway_managed_virtual_layer, 257 ) 258 259 @classmethod 260 def create_no_diff(cls, environment: str, state_reader: StateReader) -> ContextDiff: 261 """Create a no-op ContextDiff object. 262 263 Args: 264 environment: The target environment. 265 state_reader: StateReader to access the remote environment record. 266 267 Returns: 268 The ContextDiff object. 269 """ 270 env = state_reader.get_environment(environment.lower()) 271 if not env: 272 raise SQLMeshError(f"Environment '{environment}' must exist for this operation.") 273 274 environment_statements = state_reader.get_environment_statements(environment) 275 snapshots = state_reader.get_snapshots(env.snapshots) 276 277 return ContextDiff( 278 environment=env.name, 279 is_new_environment=False, 280 is_unfinalized_environment=False, 281 normalize_environment_name=env.normalize_name, 282 create_from="", 283 create_from_env_exists=False, 284 added=set(), 285 removed_snapshots={}, 286 modified_snapshots={}, 287 snapshots=snapshots, 288 new_snapshots={}, 289 previous_plan_id=env.plan_id, 290 previously_promoted_snapshot_ids={s.snapshot_id for s in env.promoted_snapshots}, 291 previous_finalized_snapshots=env.previous_finalized_snapshots, 292 previous_requirements=env.requirements, 293 requirements=env.requirements, 294 previous_environment_statements=environment_statements, 295 environment_statements=environment_statements, 296 previous_gateway_managed_virtual_layer=env.gateway_managed, 297 gateway_managed_virtual_layer=env.gateway_managed, 298 ) 299 300 @property 301 def has_changes(self) -> bool: 302 return ( 303 self.has_snapshot_changes 304 or self.is_new_environment 305 or self.is_unfinalized_environment 306 or self.has_requirement_changes 307 or self.has_environment_statements_changes 308 or self.previous_gateway_managed_virtual_layer != self.gateway_managed_virtual_layer 309 ) 310 311 @property 312 def has_requirement_changes(self) -> bool: 313 return self.previous_requirements != self.requirements 314 315 @property 316 def has_environment_statements_changes(self) -> bool: 317 return sorted(self.environment_statements, key=lambda s: s.project or "") != sorted( 318 self.previous_environment_statements, key=lambda s: s.project or "" 319 ) 320 321 @property 322 def has_snapshot_changes(self) -> bool: 323 return bool(self.added or self.removed_snapshots or self.modified_snapshots) 324 325 @property 326 def added_materialized_snapshot_ids(self) -> t.Set[SnapshotId]: 327 """Returns the set of added internal snapshot ids.""" 328 return { 329 s_id 330 for s_id in self.added 331 if self.snapshots[s_id].model_kind_name 332 and self.snapshots[s_id].model_kind_name.is_materialized # type: ignore 333 } 334 335 @property 336 def promotable_snapshot_ids(self) -> t.Set[SnapshotId]: 337 """The set of snapshot ids that have to be promoted in the target environment.""" 338 return { 339 *self.previously_promoted_snapshot_ids, 340 *self.added, 341 *self.current_modified_snapshot_ids, 342 } - set(self.removed_snapshots) 343 344 @property 345 def unpromoted_models(self) -> t.Set[SnapshotId]: 346 """The set of snapshot IDs that have not yet been promoted in the target environment.""" 347 return set(self.snapshots) - self.previously_promoted_snapshot_ids 348 349 @property 350 def current_modified_snapshot_ids(self) -> t.Set[SnapshotId]: 351 return {current.snapshot_id for current, _ in self.modified_snapshots.values()} 352 353 @cached_property 354 def snapshots_by_name(self) -> t.Dict[str, Snapshot]: 355 return {x.name: x for x in self.snapshots.values()} 356 357 def requirements_diff(self) -> str: 358 return " " + "\n ".join( 359 ndiff( 360 [ 361 f"{k}=={self.previous_requirements[k]}" 362 for k in sorted(self.previous_requirements) 363 ], 364 [f"{k}=={self.requirements[k]}" for k in sorted(self.requirements)], 365 ) 366 ) 367 368 def environment_statements_diff( 369 self, include_python_env: bool = False 370 ) -> t.List[t.Tuple[str, str]]: 371 def extract_statements(statements: t.List[EnvironmentStatements], attr: str) -> t.List[str]: 372 return [ 373 string 374 for statement in statements 375 for expr in ( 376 sorted_python_env_payloads(statement.python_env) 377 if attr == "python_env" 378 else getattr(statement, attr) 379 ) 380 for string in expr.split("\n") 381 ] 382 383 def compute_diff(attribute: str) -> t.Optional[t.Tuple[str, str]]: 384 previous = extract_statements(self.previous_environment_statements, attribute) 385 current = extract_statements(self.environment_statements, attribute) 386 387 if previous == current: 388 return None 389 390 diff_text = attribute if not attribute == "python_env" else "dependencies" 391 diff_text += ":\n" 392 if attribute == "python_env": 393 diff = list(unified_diff(previous, current)) 394 diff_text += "\n".join(diff[2:] if len(diff) > 1 else diff) 395 return "python", diff_text + "\n" 396 397 diff_lines = list(ndiff(previous, current)) 398 if any(line.startswith(("-", "+")) for line in diff_lines): 399 diff_text += " " + "\n ".join(diff_lines) + "\n" 400 return "sql", diff_text 401 402 return [ 403 diff 404 for attribute in [ 405 RuntimeStage.BEFORE_ALL.value, 406 RuntimeStage.AFTER_ALL.value, 407 *(["python_env"] if include_python_env else []), 408 ] 409 if (diff := compute_diff(attribute)) is not None 410 ] 411 412 @property 413 def environment_snapshots(self) -> t.List[SnapshotTableInfo]: 414 """Returns current snapshots in the environment.""" 415 return [ 416 *self.removed_snapshots.values(), 417 *(old.table_info for _, old in self.modified_snapshots.values()), 418 *[ 419 s.table_info 420 for s_id, s in self.snapshots.items() 421 if s_id not in self.added and s.name not in self.modified_snapshots 422 ], 423 ] 424 425 def directly_modified(self, name: str) -> bool: 426 """Returns whether or not a node was directly modified in this context. 427 428 Args: 429 name: The snapshot name to check. 430 431 Returns: 432 Whether or not the node was directly modified. 433 """ 434 435 if name not in self.modified_snapshots: 436 return False 437 438 current, previous = self.modified_snapshots[name] 439 return current.is_directly_modified(previous) 440 441 def indirectly_modified(self, name: str) -> bool: 442 """Returns whether or not a node was indirectly modified in this context. 443 444 Args: 445 name: The snapshot name to check. 446 447 Returns: 448 Whether or not the node was indirectly modified. 449 """ 450 451 if name not in self.modified_snapshots: 452 return False 453 454 current, previous = self.modified_snapshots[name] 455 return current.is_indirectly_modified(previous) 456 457 def metadata_updated(self, name: str) -> bool: 458 """Returns whether or not the given node's metadata has been updated. 459 460 Args: 461 name: The node to check. 462 463 Returns: 464 Whether or not the node's metadata has been updated. 465 """ 466 467 if name not in self.modified_snapshots: 468 return False 469 470 current, previous = self.modified_snapshots[name] 471 return current.is_metadata_updated(previous) 472 473 def text_diff(self, name: str) -> str: 474 """Finds the difference of a node between the current and remote environment. 475 476 Args: 477 name: The Snapshot name. 478 479 Returns: 480 A unified text diff of the node. 481 """ 482 if name not in self.snapshots_by_name: 483 raise SQLMeshError(f"`{name}` does not exist.") 484 if name not in self.modified_snapshots: 485 return "" 486 487 new, old = self.modified_snapshots[name] 488 try: 489 return old.node.text_diff(new.node, rendered=self.diff_rendered) 490 except SQLMeshError as e: 491 get_console().log_warning(f"Failed to diff model '{name}': {str(e)}.") 492 return ""
ContextDiff is an object representing the difference between two environments.
The two environments can be the local environment and a remote environment, or two remote environments.
Whether the currently stored environment record is in unfinalized state.
Whether the previous environment's virtual layer's views were created by the model specified gateways.
Whether the virtual layer's views will be created by the model specified gateways.
Deleted nodes.
Modified snapshots.
Merged snapshots.
New snapshots.
Snapshot IDs that were promoted by the previous plan.
Snapshots from the previous finalized state.
Previous environment statements.
Environment statements.
93 @classmethod 94 def create( 95 cls, 96 environment: str, 97 snapshots: t.Dict[str, Snapshot], 98 create_from: str, 99 state_reader: StateReader, 100 ensure_finalized_snapshots: bool = False, 101 provided_requirements: t.Optional[t.Dict[str, str]] = None, 102 excluded_requirements: t.Optional[t.Set[str]] = None, 103 diff_rendered: bool = False, 104 environment_statements: t.Optional[t.List[EnvironmentStatements]] = [], 105 gateway_managed_virtual_layer: bool = False, 106 infer_python_dependencies: bool = True, 107 always_recreate_environment: bool = False, 108 ) -> ContextDiff: 109 """Create a ContextDiff object. 110 111 Args: 112 environment: The remote environment to diff. 113 snapshots: The snapshots of the current environment. 114 create_from: The environment to create the target environment from if it 115 doesn't exist. 116 state_reader: StateReader to access the remote environment to diff. 117 ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized 118 environment state, or to use whatever snapshots are in the current environment state even if 119 the environment is not finalized. 120 provided_requirements: Python dependencies sourced from the lock file. 121 excluded_requirements: Python dependencies to exclude. 122 diff_rendered: Whether to compute the diff of the rendered version of the compared expressions. 123 environment_statements: A list of `before_all` or `after_all` statements associated with the environment. 124 gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the 125 model-specific gateway rather than the default gateway. 126 infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python 127 package requirements. 128 129 Returns: 130 The ContextDiff object. 131 """ 132 environment = environment.lower() 133 existing_env = state_reader.get_environment(environment) 134 create_from_env_exists = False 135 136 recreate_environment = always_recreate_environment and not environment == create_from 137 138 if existing_env is None or existing_env.expired or recreate_environment: 139 env = state_reader.get_environment(create_from.lower()) 140 141 if not env and create_from != c.PROD: 142 get_console().log_warning( 143 f"The environment name '{create_from}' was passed to the `plan` command's `--create-from` argument, but '{create_from}' does not exist. Initializing new environment '{environment}' from scratch." 144 ) 145 146 is_new_environment = True 147 create_from_env_exists = env is not None 148 previously_promoted_snapshot_ids = set() 149 else: 150 env = existing_env 151 is_new_environment = False 152 previously_promoted_snapshot_ids = {s.snapshot_id for s in env.promoted_snapshots} 153 154 environment_snapshot_infos = [] 155 if env: 156 environment_snapshot_infos = ( 157 env.snapshots 158 if not ensure_finalized_snapshots 159 else env.finalized_or_current_snapshots 160 ) 161 remote_snapshot_name_to_info = { 162 snapshot_info.name: snapshot_info for snapshot_info in environment_snapshot_infos 163 } 164 removed = { 165 snapshot_table_info.snapshot_id: snapshot_table_info 166 for snapshot_table_info in environment_snapshot_infos 167 if snapshot_table_info.name not in snapshots 168 } 169 added = { 170 snapshot.snapshot_id 171 for snapshot in snapshots.values() 172 if snapshot.name not in remote_snapshot_name_to_info 173 } 174 modified_snapshot_name_to_snapshot_info = { 175 snapshot.name: remote_snapshot_name_to_info[snapshot.name] 176 for snapshot in snapshots.values() 177 if snapshot.snapshot_id not in added 178 and snapshot.fingerprint != remote_snapshot_name_to_info[snapshot.name].fingerprint 179 } 180 181 stored = state_reader.get_snapshots( 182 [*snapshots.values(), *modified_snapshot_name_to_snapshot_info.values()] 183 ) 184 185 merged_snapshots = {} 186 modified_snapshots = {} 187 new_snapshots = {} 188 189 for snapshot in snapshots.values(): 190 s_id = snapshot.snapshot_id 191 modified_snapshot_info = modified_snapshot_name_to_snapshot_info.get(snapshot.name) 192 existing_snapshot = stored.get(s_id) 193 194 if modified_snapshot_info and snapshot.node_type != modified_snapshot_info.node_type: 195 added.add(snapshot.snapshot_id) 196 removed[modified_snapshot_info.snapshot_id] = modified_snapshot_info 197 modified_snapshot_name_to_snapshot_info.pop(snapshot.name) 198 elif existing_snapshot: 199 # Keep the original node instance to preserve the query cache. 200 existing_snapshot.node = snapshot.node 201 202 merged_snapshots[s_id] = existing_snapshot.copy() 203 if modified_snapshot_info: 204 modified_snapshots[s_id.name] = ( 205 existing_snapshot, 206 stored[modified_snapshot_info.snapshot_id], 207 ) 208 else: 209 snapshot = snapshot.copy() 210 merged_snapshots[s_id] = snapshot 211 new_snapshots[snapshot.snapshot_id] = snapshot 212 if modified_snapshot_info: 213 snapshot.previous_versions = modified_snapshot_info.all_versions 214 modified_snapshots[s_id.name] = ( 215 snapshot, 216 stored[modified_snapshot_info.snapshot_id], 217 ) 218 219 requirements = _build_requirements( 220 provided_requirements or {}, 221 excluded_requirements or set(), 222 snapshots.values(), 223 infer_python_dependencies=infer_python_dependencies, 224 ) 225 226 previous_environment_statements = ( 227 state_reader.get_environment_statements(env.name) if env else [] 228 ) 229 230 if existing_env and always_recreate_environment: 231 previous_plan_id: t.Optional[str] = existing_env.plan_id 232 else: 233 previous_plan_id = env.plan_id if env and not is_new_environment else None 234 235 return ContextDiff( 236 environment=environment, 237 is_new_environment=is_new_environment, 238 is_unfinalized_environment=bool(env and not env.finalized_ts), 239 normalize_environment_name=is_new_environment or bool(env and env.normalize_name), 240 create_from=create_from, 241 create_from_env_exists=create_from_env_exists, 242 added=added, 243 removed_snapshots=removed, 244 modified_snapshots=modified_snapshots, 245 snapshots=merged_snapshots, 246 new_snapshots=new_snapshots, 247 previous_plan_id=previous_plan_id, 248 previously_promoted_snapshot_ids=previously_promoted_snapshot_ids, 249 previous_finalized_snapshots=env.previous_finalized_snapshots if env else None, 250 previous_requirements=env.requirements if env else {}, 251 requirements=requirements, 252 diff_rendered=diff_rendered, 253 previous_environment_statements=previous_environment_statements, 254 environment_statements=environment_statements, 255 previous_gateway_managed_virtual_layer=env.gateway_managed if env else False, 256 gateway_managed_virtual_layer=gateway_managed_virtual_layer, 257 )
Create a ContextDiff object.
Arguments:
- environment: The remote environment to diff.
- snapshots: The snapshots of the current environment.
- create_from: The environment to create the target environment from if it doesn't exist.
- state_reader: StateReader to access the remote environment to diff.
- ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized environment state, or to use whatever snapshots are in the current environment state even if the environment is not finalized.
- provided_requirements: Python dependencies sourced from the lock file.
- excluded_requirements: Python dependencies to exclude.
- diff_rendered: Whether to compute the diff of the rendered version of the compared expressions.
- environment_statements: A list of
before_allorafter_allstatements associated with the environment. - gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the model-specific gateway rather than the default gateway.
- infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python package requirements.
Returns:
The ContextDiff object.
259 @classmethod 260 def create_no_diff(cls, environment: str, state_reader: StateReader) -> ContextDiff: 261 """Create a no-op ContextDiff object. 262 263 Args: 264 environment: The target environment. 265 state_reader: StateReader to access the remote environment record. 266 267 Returns: 268 The ContextDiff object. 269 """ 270 env = state_reader.get_environment(environment.lower()) 271 if not env: 272 raise SQLMeshError(f"Environment '{environment}' must exist for this operation.") 273 274 environment_statements = state_reader.get_environment_statements(environment) 275 snapshots = state_reader.get_snapshots(env.snapshots) 276 277 return ContextDiff( 278 environment=env.name, 279 is_new_environment=False, 280 is_unfinalized_environment=False, 281 normalize_environment_name=env.normalize_name, 282 create_from="", 283 create_from_env_exists=False, 284 added=set(), 285 removed_snapshots={}, 286 modified_snapshots={}, 287 snapshots=snapshots, 288 new_snapshots={}, 289 previous_plan_id=env.plan_id, 290 previously_promoted_snapshot_ids={s.snapshot_id for s in env.promoted_snapshots}, 291 previous_finalized_snapshots=env.previous_finalized_snapshots, 292 previous_requirements=env.requirements, 293 requirements=env.requirements, 294 previous_environment_statements=environment_statements, 295 environment_statements=environment_statements, 296 previous_gateway_managed_virtual_layer=env.gateway_managed, 297 gateway_managed_virtual_layer=env.gateway_managed, 298 )
Create a no-op ContextDiff object.
Arguments:
- environment: The target environment.
- state_reader: StateReader to access the remote environment record.
Returns:
The ContextDiff object.
300 @property 301 def has_changes(self) -> bool: 302 return ( 303 self.has_snapshot_changes 304 or self.is_new_environment 305 or self.is_unfinalized_environment 306 or self.has_requirement_changes 307 or self.has_environment_statements_changes 308 or self.previous_gateway_managed_virtual_layer != self.gateway_managed_virtual_layer 309 )
325 @property 326 def added_materialized_snapshot_ids(self) -> t.Set[SnapshotId]: 327 """Returns the set of added internal snapshot ids.""" 328 return { 329 s_id 330 for s_id in self.added 331 if self.snapshots[s_id].model_kind_name 332 and self.snapshots[s_id].model_kind_name.is_materialized # type: ignore 333 }
Returns the set of added internal snapshot ids.
335 @property 336 def promotable_snapshot_ids(self) -> t.Set[SnapshotId]: 337 """The set of snapshot ids that have to be promoted in the target environment.""" 338 return { 339 *self.previously_promoted_snapshot_ids, 340 *self.added, 341 *self.current_modified_snapshot_ids, 342 } - set(self.removed_snapshots)
The set of snapshot ids that have to be promoted in the target environment.
344 @property 345 def unpromoted_models(self) -> t.Set[SnapshotId]: 346 """The set of snapshot IDs that have not yet been promoted in the target environment.""" 347 return set(self.snapshots) - self.previously_promoted_snapshot_ids
The set of snapshot IDs that have not yet been promoted in the target environment.
368 def environment_statements_diff( 369 self, include_python_env: bool = False 370 ) -> t.List[t.Tuple[str, str]]: 371 def extract_statements(statements: t.List[EnvironmentStatements], attr: str) -> t.List[str]: 372 return [ 373 string 374 for statement in statements 375 for expr in ( 376 sorted_python_env_payloads(statement.python_env) 377 if attr == "python_env" 378 else getattr(statement, attr) 379 ) 380 for string in expr.split("\n") 381 ] 382 383 def compute_diff(attribute: str) -> t.Optional[t.Tuple[str, str]]: 384 previous = extract_statements(self.previous_environment_statements, attribute) 385 current = extract_statements(self.environment_statements, attribute) 386 387 if previous == current: 388 return None 389 390 diff_text = attribute if not attribute == "python_env" else "dependencies" 391 diff_text += ":\n" 392 if attribute == "python_env": 393 diff = list(unified_diff(previous, current)) 394 diff_text += "\n".join(diff[2:] if len(diff) > 1 else diff) 395 return "python", diff_text + "\n" 396 397 diff_lines = list(ndiff(previous, current)) 398 if any(line.startswith(("-", "+")) for line in diff_lines): 399 diff_text += " " + "\n ".join(diff_lines) + "\n" 400 return "sql", diff_text 401 402 return [ 403 diff 404 for attribute in [ 405 RuntimeStage.BEFORE_ALL.value, 406 RuntimeStage.AFTER_ALL.value, 407 *(["python_env"] if include_python_env else []), 408 ] 409 if (diff := compute_diff(attribute)) is not None 410 ]
412 @property 413 def environment_snapshots(self) -> t.List[SnapshotTableInfo]: 414 """Returns current snapshots in the environment.""" 415 return [ 416 *self.removed_snapshots.values(), 417 *(old.table_info for _, old in self.modified_snapshots.values()), 418 *[ 419 s.table_info 420 for s_id, s in self.snapshots.items() 421 if s_id not in self.added and s.name not in self.modified_snapshots 422 ], 423 ]
Returns current snapshots in the environment.
425 def directly_modified(self, name: str) -> bool: 426 """Returns whether or not a node was directly modified in this context. 427 428 Args: 429 name: The snapshot name to check. 430 431 Returns: 432 Whether or not the node was directly modified. 433 """ 434 435 if name not in self.modified_snapshots: 436 return False 437 438 current, previous = self.modified_snapshots[name] 439 return current.is_directly_modified(previous)
Returns whether or not a node was directly modified in this context.
Arguments:
- name: The snapshot name to check.
Returns:
Whether or not the node was directly modified.
441 def indirectly_modified(self, name: str) -> bool: 442 """Returns whether or not a node was indirectly modified in this context. 443 444 Args: 445 name: The snapshot name to check. 446 447 Returns: 448 Whether or not the node was indirectly modified. 449 """ 450 451 if name not in self.modified_snapshots: 452 return False 453 454 current, previous = self.modified_snapshots[name] 455 return current.is_indirectly_modified(previous)
Returns whether or not a node was indirectly modified in this context.
Arguments:
- name: The snapshot name to check.
Returns:
Whether or not the node was indirectly modified.
457 def metadata_updated(self, name: str) -> bool: 458 """Returns whether or not the given node's metadata has been updated. 459 460 Args: 461 name: The node to check. 462 463 Returns: 464 Whether or not the node's metadata has been updated. 465 """ 466 467 if name not in self.modified_snapshots: 468 return False 469 470 current, previous = self.modified_snapshots[name] 471 return current.is_metadata_updated(previous)
Returns whether or not the given node's metadata has been updated.
Arguments:
- name: The node to check.
Returns:
Whether or not the node's metadata has been updated.
473 def text_diff(self, name: str) -> str: 474 """Finds the difference of a node between the current and remote environment. 475 476 Args: 477 name: The Snapshot name. 478 479 Returns: 480 A unified text diff of the node. 481 """ 482 if name not in self.snapshots_by_name: 483 raise SQLMeshError(f"`{name}` does not exist.") 484 if name not in self.modified_snapshots: 485 return "" 486 487 new, old = self.modified_snapshots[name] 488 try: 489 return old.node.text_diff(new.node, rendered=self.diff_rendered) 490 except SQLMeshError as e: 491 get_console().log_warning(f"Failed to diff model '{name}': {str(e)}.") 492 return ""
Finds the difference of a node between the current and remote environment.
Arguments:
- name: The Snapshot name.
Returns:
A unified text diff of the node.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs