ContextDiff
ContextDiff encapsulates the differences between two environments. The two environments can be the local environment and a remote environment, or two remote environments. ContextDiff is an important part of SQLMesh. SQLMesh plans use ContextDiff to determine what nodes were changed between two environments. The SQLMesh CLI diff command uses ContextDiff to determine what to visualize.
When creating a ContextDiff object, SQLMesh compares the snapshots of one environment with those of another (remote) environment and determines whether nodes have been added, removed, or modified.
"""
# ContextDiff

ContextDiff encapsulates the differences between two environments. The two environments can be the local
environment and a remote environment, or two remote environments. ContextDiff is an important part of
SQLMesh. SQLMesh plans use ContextDiff to determine what nodes were changed between two environments.
The SQLMesh CLI diff command uses ContextDiff to determine what to visualize.

When creating a ContextDiff object, SQLMesh compares the snapshots of one environment with those of
another (remote) environment and determines whether nodes have been added, removed, or modified.
"""

from __future__ import annotations

import logging
import typing as t
from functools import cached_property

from sqlmesh.core.snapshot import Snapshot, SnapshotId, SnapshotTableInfo
from sqlmesh.utils.errors import SQLMeshError
from sqlmesh.utils.pydantic import PydanticModel

if t.TYPE_CHECKING:
    from sqlmesh.core.state_sync import StateReader

logger = logging.getLogger(__name__)


class ContextDiff(PydanticModel):
    """ContextDiff is an object representing the difference between two environments.

    The two environments can be the local environment and a remote environment, or two remote
    environments.
    """

    environment: str
    """The environment to diff."""
    is_new_environment: bool
    """Whether the target environment is new."""
    is_unfinalized_environment: bool
    """Whether the currently stored environment record is in an unfinalized state."""
    create_from: str
    """The name of the environment the target environment will be created from if new."""
    added: t.Set[SnapshotId]
    """Snapshot IDs of new nodes."""
    removed_snapshots: t.Dict[SnapshotId, SnapshotTableInfo]
    """Deleted nodes, keyed by snapshot ID."""
    modified_snapshots: t.Dict[str, t.Tuple[Snapshot, Snapshot]]
    """Modified snapshots, keyed by name, as (current, previous) pairs."""
    snapshots: t.Dict[SnapshotId, Snapshot]
    """Merged snapshots: every local snapshot, using the stored copy when one exists in state."""
    new_snapshots: t.Dict[SnapshotId, Snapshot]
    """New snapshots: local snapshots that do not exist in state yet."""
    previous_plan_id: t.Optional[str]
    """Previous plan id."""
    previously_promoted_snapshot_ids: t.Set[SnapshotId]
    """Snapshot IDs that were promoted by the previous plan."""
    previous_finalized_snapshots: t.Optional[t.List[SnapshotTableInfo]]
    """Snapshots from the previous finalized state."""

    @classmethod
    def create(
        cls,
        environment: str,
        snapshots: t.Dict[str, Snapshot],
        create_from: str,
        state_reader: StateReader,
        ensure_finalized_snapshots: bool = False,
    ) -> ContextDiff:
        """Create a ContextDiff object.

        Local and remote snapshots are matched by name. A local name that is absent remotely is
        added, a remote name that is absent locally is removed, and a name whose fingerprint
        differs is modified. If a modified node's node type also changed, it is instead recorded
        as a removal of the remote snapshot plus an addition of the local one.

        Args:
            environment: The remote environment to diff.
            snapshots: The snapshots of the current environment, keyed by name.
            create_from: The environment to create the target environment from if it
                doesn't exist.
            state_reader: StateReader to access the remote environment to diff.
            ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized
                environment state, or to use whatever snapshots are in the current environment state even if
                the environment is not finalized.

        Returns:
            The ContextDiff object.
        """
        environment = environment.lower()
        env = state_reader.get_environment(environment)

        if env is None:
            # The target doesn't exist yet: diff against the environment it will be created from.
            env = state_reader.get_environment(create_from.lower())
            is_new_environment = True
            previously_promoted_snapshot_ids = set()
        else:
            is_new_environment = False
            previously_promoted_snapshot_ids = {s.snapshot_id for s in env.promoted_snapshots}

        environment_snapshot_infos = []
        if env:
            environment_snapshot_infos = (
                env.snapshots
                if not ensure_finalized_snapshots
                else env.finalized_or_current_snapshots
            )
        remote_snapshot_name_to_info = {
            snapshot_info.name: snapshot_info for snapshot_info in environment_snapshot_infos
        }
        removed = {
            snapshot_table_info.snapshot_id: snapshot_table_info
            for snapshot_table_info in environment_snapshot_infos
            if snapshot_table_info.name not in snapshots
        }
        added = {
            snapshot.snapshot_id
            for snapshot in snapshots.values()
            if snapshot.name not in remote_snapshot_name_to_info
        }
        modified_snapshot_name_to_snapshot_info = {
            snapshot.name: remote_snapshot_name_to_info[snapshot.name]
            for snapshot in snapshots.values()
            if snapshot.snapshot_id not in added
            and snapshot.fingerprint != remote_snapshot_name_to_info[snapshot.name].fingerprint
        }
        modified_local_seed_snapshot_ids = {
            s.snapshot_id
            for s in snapshots.values()
            if s.is_seed and s.name in modified_snapshot_name_to_snapshot_info
        }
        modified_remote_snapshot_ids = {
            s.snapshot_id for s in modified_snapshot_name_to_snapshot_info.values()
        }

        # Only modified seeds (local versions and the remote versions they replace) are fetched
        # with hydrate_seeds=True; presumably so their contents are available for diffing.
        stored = {
            **state_reader.get_snapshots(
                [
                    snapshot.snapshot_id
                    for snapshot in snapshots.values()
                    if snapshot.snapshot_id not in modified_local_seed_snapshot_ids
                ]
            ),
            **state_reader.get_snapshots(
                modified_remote_snapshot_ids | modified_local_seed_snapshot_ids,
                hydrate_seeds=True,
            ),
        }

        merged_snapshots: t.Dict[SnapshotId, Snapshot] = {}
        modified_snapshots: t.Dict[str, t.Tuple[Snapshot, Snapshot]] = {}
        new_snapshots: t.Dict[SnapshotId, Snapshot] = {}

        for snapshot in snapshots.values():
            s_id = snapshot.snapshot_id
            modified_snapshot_info = modified_snapshot_name_to_snapshot_info.get(snapshot.name)
            existing_snapshot = stored.get(s_id)

            if modified_snapshot_info and snapshot.node_type != modified_snapshot_info.node_type:
                # A node whose type changed is not treated as a modification: record the old
                # snapshot as removed and the new one as added.
                added.add(s_id)
                removed[modified_snapshot_info.snapshot_id] = modified_snapshot_info
                modified_snapshot_name_to_snapshot_info.pop(snapshot.name)
                # Still fall through so the new snapshot lands in `merged_snapshots` (and in
                # `new_snapshots` if it isn't stored yet). Every ID in `added` must be present
                # in `snapshots`; `added_materialized_snapshot_ids` indexes it directly.
                modified_snapshot_info = None

            if existing_snapshot:
                # Keep the original node instance to preserve the query cache.
                existing_snapshot.node = snapshot.node

                merged_snapshots[s_id] = existing_snapshot.copy()
                if modified_snapshot_info:
                    modified_snapshots[s_id.name] = (
                        existing_snapshot,
                        stored[modified_snapshot_info.snapshot_id],
                    )
            else:
                snapshot = snapshot.copy()
                merged_snapshots[s_id] = snapshot
                new_snapshots[snapshot.snapshot_id] = snapshot
                if modified_snapshot_info:
                    # Carry over the version history of the snapshot being replaced.
                    snapshot.previous_versions = modified_snapshot_info.all_versions
                    modified_snapshots[s_id.name] = (
                        snapshot,
                        stored[modified_snapshot_info.snapshot_id],
                    )

        return ContextDiff(
            environment=environment,
            is_new_environment=is_new_environment,
            is_unfinalized_environment=bool(env and not env.finalized_ts),
            create_from=create_from,
            added=added,
            removed_snapshots=removed,
            modified_snapshots=modified_snapshots,
            snapshots=merged_snapshots,
            new_snapshots=new_snapshots,
            previous_plan_id=env.plan_id if env and not is_new_environment else None,
            previously_promoted_snapshot_ids=previously_promoted_snapshot_ids,
            previous_finalized_snapshots=env.previous_finalized_snapshots if env else None,
        )

    @classmethod
    def create_no_diff(cls, environment: str) -> ContextDiff:
        """Create a no-op ContextDiff object.

        Every collection is empty and every flag is False, so `has_changes` is False.

        Args:
            environment: The environment to diff.

        Returns:
            The ContextDiff object.
        """
        return ContextDiff(
            environment=environment,
            is_new_environment=False,
            is_unfinalized_environment=False,
            create_from="",
            added=set(),
            removed_snapshots={},
            modified_snapshots={},
            snapshots={},
            new_snapshots={},
            previous_plan_id=None,
            previously_promoted_snapshot_ids=set(),
            previous_finalized_snapshots=None,
        )

    @property
    def has_changes(self) -> bool:
        """True if there are snapshot changes, the target environment is new, or its stored
        record is unfinalized."""
        return (
            self.has_snapshot_changes or self.is_new_environment or self.is_unfinalized_environment
        )

    @property
    def has_snapshot_changes(self) -> bool:
        """True if any snapshot was added, removed, or modified."""
        return bool(self.added or self.removed_snapshots or self.modified_snapshots)

    @property
    def added_materialized_snapshot_ids(self) -> t.Set[SnapshotId]:
        """Returns the set of added snapshot IDs whose model kind is materialized."""
        return {
            s_id
            for s_id in self.added
            if self.snapshots[s_id].model_kind_name
            and self.snapshots[s_id].model_kind_name.is_materialized  # type: ignore
        }

    @property
    def promotable_snapshot_ids(self) -> t.Set[SnapshotId]:
        """The set of snapshot ids that have to be promoted in the target environment."""
        return {
            *self.previously_promoted_snapshot_ids,
            *self.added,
            *self.current_modified_snapshot_ids,
        } - set(self.removed_snapshots)

    @property
    def unpromoted_models(self) -> t.Set[SnapshotId]:
        """The set of snapshot IDs that have not yet been promoted in the target environment."""
        return set(self.snapshots) - self.previously_promoted_snapshot_ids

    @property
    def current_modified_snapshot_ids(self) -> t.Set[SnapshotId]:
        """IDs of the current (new) side of every modified snapshot pair."""
        return {current.snapshot_id for current, _ in self.modified_snapshots.values()}

    @cached_property
    def snapshots_by_name(self) -> t.Dict[str, Snapshot]:
        """Merged snapshots keyed by name (computed once and cached)."""
        return {x.name: x for x in self.snapshots.values()}

    @property
    def environment_snapshots(self) -> t.List[SnapshotTableInfo]:
        """Returns the snapshots currently in the remote environment.

        These are the removed snapshots, the previous versions of modified snapshots, and the
        unchanged snapshots.
        """
        return [
            *self.removed_snapshots.values(),
            *(old.table_info for _, old in self.modified_snapshots.values()),
            *[
                s.table_info
                for s_id, s in self.snapshots.items()
                if s_id not in self.added and s.name not in self.modified_snapshots
            ],
        ]

    def directly_modified(self, name: str) -> bool:
        """Returns whether or not a node was directly modified in this context.

        A node is directly modified when its own data hash changed.

        Args:
            name: The snapshot name to check.

        Returns:
            Whether or not the node was directly modified.
        """
        pair = self.modified_snapshots.get(name)
        if pair is None:
            return False

        current, previous = pair
        return current.fingerprint.data_hash != previous.fingerprint.data_hash

    def indirectly_modified(self, name: str) -> bool:
        """Returns whether or not a node was indirectly modified in this context.

        A node is indirectly modified when its own data hash is unchanged but its parents'
        data hash changed.

        Args:
            name: The snapshot name to check.

        Returns:
            Whether or not the node was indirectly modified.
        """
        pair = self.modified_snapshots.get(name)
        if pair is None:
            return False

        current, previous = pair
        return (
            current.fingerprint.data_hash == previous.fingerprint.data_hash
            and current.fingerprint.parent_data_hash != previous.fingerprint.parent_data_hash
        )

    def metadata_updated(self, name: str) -> bool:
        """Returns whether or not the given node's metadata has been updated.

        Args:
            name: The node to check.

        Returns:
            Whether or not the node's metadata has been updated.
        """
        pair = self.modified_snapshots.get(name)
        if pair is None:
            return False

        current, previous = pair
        return current.fingerprint.metadata_hash != previous.fingerprint.metadata_hash

    def text_diff(self, name: str) -> str:
        """Finds the difference of a node between the current and remote environment.

        Args:
            name: The Snapshot name.

        Returns:
            A unified text diff of the node. Returns an empty string if the node was not
            modified, or if computing the diff raised a SQLMeshError (a warning is logged).

        Raises:
            SQLMeshError: If no snapshot with the given name exists in this diff.
        """
        if name not in self.snapshots_by_name:
            raise SQLMeshError(f"`{name}` does not exist.")

        pair = self.modified_snapshots.get(name)
        if pair is None:
            return ""

        new, old = pair
        try:
            return old.node.text_diff(new.node)
        except SQLMeshError as e:
            # Best effort: log and fall back to an empty diff instead of propagating.
            logger.warning("Failed to diff model '%s': %s", name, str(e))
            return ""
class ContextDiff(PydanticModel):
    """ContextDiff is an object representing the difference between two environments.

    The two environments can be the local environment and a remote environment, or two remote
    environments.
    """

    environment: str
    """The environment to diff."""
    is_new_environment: bool
    """Whether the target environment is new."""
    is_unfinalized_environment: bool
    """Whether the currently stored environment record is in an unfinalized state."""
    create_from: str
    """The name of the environment the target environment will be created from if new."""
    added: t.Set[SnapshotId]
    """Snapshot IDs of new nodes."""
    removed_snapshots: t.Dict[SnapshotId, SnapshotTableInfo]
    """Deleted nodes, keyed by snapshot ID."""
    modified_snapshots: t.Dict[str, t.Tuple[Snapshot, Snapshot]]
    """Modified snapshots, keyed by name, as (current, previous) pairs."""
    snapshots: t.Dict[SnapshotId, Snapshot]
    """Merged snapshots: every local snapshot, using the stored copy when one exists in state."""
    new_snapshots: t.Dict[SnapshotId, Snapshot]
    """New snapshots: local snapshots that do not exist in state yet."""
    previous_plan_id: t.Optional[str]
    """Previous plan id."""
    previously_promoted_snapshot_ids: t.Set[SnapshotId]
    """Snapshot IDs that were promoted by the previous plan."""
    previous_finalized_snapshots: t.Optional[t.List[SnapshotTableInfo]]
    """Snapshots from the previous finalized state."""

    @classmethod
    def create(
        cls,
        environment: str,
        snapshots: t.Dict[str, Snapshot],
        create_from: str,
        state_reader: StateReader,
        ensure_finalized_snapshots: bool = False,
    ) -> ContextDiff:
        """Create a ContextDiff object.

        Local and remote snapshots are matched by name. A local name that is absent remotely is
        added, a remote name that is absent locally is removed, and a name whose fingerprint
        differs is modified. If a modified node's node type also changed, it is instead recorded
        as a removal of the remote snapshot plus an addition of the local one.

        Args:
            environment: The remote environment to diff.
            snapshots: The snapshots of the current environment, keyed by name.
            create_from: The environment to create the target environment from if it
                doesn't exist.
            state_reader: StateReader to access the remote environment to diff.
            ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized
                environment state, or to use whatever snapshots are in the current environment state even if
                the environment is not finalized.

        Returns:
            The ContextDiff object.
        """
        environment = environment.lower()
        env = state_reader.get_environment(environment)

        if env is None:
            # The target doesn't exist yet: diff against the environment it will be created from.
            env = state_reader.get_environment(create_from.lower())
            is_new_environment = True
            previously_promoted_snapshot_ids = set()
        else:
            is_new_environment = False
            previously_promoted_snapshot_ids = {s.snapshot_id for s in env.promoted_snapshots}

        environment_snapshot_infos = []
        if env:
            environment_snapshot_infos = (
                env.snapshots
                if not ensure_finalized_snapshots
                else env.finalized_or_current_snapshots
            )
        remote_snapshot_name_to_info = {
            snapshot_info.name: snapshot_info for snapshot_info in environment_snapshot_infos
        }
        removed = {
            snapshot_table_info.snapshot_id: snapshot_table_info
            for snapshot_table_info in environment_snapshot_infos
            if snapshot_table_info.name not in snapshots
        }
        added = {
            snapshot.snapshot_id
            for snapshot in snapshots.values()
            if snapshot.name not in remote_snapshot_name_to_info
        }
        modified_snapshot_name_to_snapshot_info = {
            snapshot.name: remote_snapshot_name_to_info[snapshot.name]
            for snapshot in snapshots.values()
            if snapshot.snapshot_id not in added
            and snapshot.fingerprint != remote_snapshot_name_to_info[snapshot.name].fingerprint
        }
        modified_local_seed_snapshot_ids = {
            s.snapshot_id
            for s in snapshots.values()
            if s.is_seed and s.name in modified_snapshot_name_to_snapshot_info
        }
        modified_remote_snapshot_ids = {
            s.snapshot_id for s in modified_snapshot_name_to_snapshot_info.values()
        }

        # Only modified seeds (local versions and the remote versions they replace) are fetched
        # with hydrate_seeds=True; presumably so their contents are available for diffing.
        stored = {
            **state_reader.get_snapshots(
                [
                    snapshot.snapshot_id
                    for snapshot in snapshots.values()
                    if snapshot.snapshot_id not in modified_local_seed_snapshot_ids
                ]
            ),
            **state_reader.get_snapshots(
                modified_remote_snapshot_ids | modified_local_seed_snapshot_ids,
                hydrate_seeds=True,
            ),
        }

        merged_snapshots: t.Dict[SnapshotId, Snapshot] = {}
        modified_snapshots: t.Dict[str, t.Tuple[Snapshot, Snapshot]] = {}
        new_snapshots: t.Dict[SnapshotId, Snapshot] = {}

        for snapshot in snapshots.values():
            s_id = snapshot.snapshot_id
            modified_snapshot_info = modified_snapshot_name_to_snapshot_info.get(snapshot.name)
            existing_snapshot = stored.get(s_id)

            if modified_snapshot_info and snapshot.node_type != modified_snapshot_info.node_type:
                # A node whose type changed is not treated as a modification: record the old
                # snapshot as removed and the new one as added.
                added.add(s_id)
                removed[modified_snapshot_info.snapshot_id] = modified_snapshot_info
                modified_snapshot_name_to_snapshot_info.pop(snapshot.name)
                # Still fall through so the new snapshot lands in `merged_snapshots` (and in
                # `new_snapshots` if it isn't stored yet). Every ID in `added` must be present
                # in `snapshots`; `added_materialized_snapshot_ids` indexes it directly.
                modified_snapshot_info = None

            if existing_snapshot:
                # Keep the original node instance to preserve the query cache.
                existing_snapshot.node = snapshot.node

                merged_snapshots[s_id] = existing_snapshot.copy()
                if modified_snapshot_info:
                    modified_snapshots[s_id.name] = (
                        existing_snapshot,
                        stored[modified_snapshot_info.snapshot_id],
                    )
            else:
                snapshot = snapshot.copy()
                merged_snapshots[s_id] = snapshot
                new_snapshots[snapshot.snapshot_id] = snapshot
                if modified_snapshot_info:
                    # Carry over the version history of the snapshot being replaced.
                    snapshot.previous_versions = modified_snapshot_info.all_versions
                    modified_snapshots[s_id.name] = (
                        snapshot,
                        stored[modified_snapshot_info.snapshot_id],
                    )

        return ContextDiff(
            environment=environment,
            is_new_environment=is_new_environment,
            is_unfinalized_environment=bool(env and not env.finalized_ts),
            create_from=create_from,
            added=added,
            removed_snapshots=removed,
            modified_snapshots=modified_snapshots,
            snapshots=merged_snapshots,
            new_snapshots=new_snapshots,
            previous_plan_id=env.plan_id if env and not is_new_environment else None,
            previously_promoted_snapshot_ids=previously_promoted_snapshot_ids,
            previous_finalized_snapshots=env.previous_finalized_snapshots if env else None,
        )

    @classmethod
    def create_no_diff(cls, environment: str) -> ContextDiff:
        """Create a no-op ContextDiff object.

        Every collection is empty and every flag is False, so `has_changes` is False.

        Args:
            environment: The environment to diff.

        Returns:
            The ContextDiff object.
        """
        return ContextDiff(
            environment=environment,
            is_new_environment=False,
            is_unfinalized_environment=False,
            create_from="",
            added=set(),
            removed_snapshots={},
            modified_snapshots={},
            snapshots={},
            new_snapshots={},
            previous_plan_id=None,
            previously_promoted_snapshot_ids=set(),
            previous_finalized_snapshots=None,
        )

    @property
    def has_changes(self) -> bool:
        """True if there are snapshot changes, the target environment is new, or its stored
        record is unfinalized."""
        return (
            self.has_snapshot_changes or self.is_new_environment or self.is_unfinalized_environment
        )

    @property
    def has_snapshot_changes(self) -> bool:
        """True if any snapshot was added, removed, or modified."""
        return bool(self.added or self.removed_snapshots or self.modified_snapshots)

    @property
    def added_materialized_snapshot_ids(self) -> t.Set[SnapshotId]:
        """Returns the set of added snapshot IDs whose model kind is materialized."""
        return {
            s_id
            for s_id in self.added
            if self.snapshots[s_id].model_kind_name
            and self.snapshots[s_id].model_kind_name.is_materialized  # type: ignore
        }

    @property
    def promotable_snapshot_ids(self) -> t.Set[SnapshotId]:
        """The set of snapshot ids that have to be promoted in the target environment."""
        return {
            *self.previously_promoted_snapshot_ids,
            *self.added,
            *self.current_modified_snapshot_ids,
        } - set(self.removed_snapshots)

    @property
    def unpromoted_models(self) -> t.Set[SnapshotId]:
        """The set of snapshot IDs that have not yet been promoted in the target environment."""
        return set(self.snapshots) - self.previously_promoted_snapshot_ids

    @property
    def current_modified_snapshot_ids(self) -> t.Set[SnapshotId]:
        """IDs of the current (new) side of every modified snapshot pair."""
        return {current.snapshot_id for current, _ in self.modified_snapshots.values()}

    @cached_property
    def snapshots_by_name(self) -> t.Dict[str, Snapshot]:
        """Merged snapshots keyed by name (computed once and cached)."""
        return {x.name: x for x in self.snapshots.values()}

    @property
    def environment_snapshots(self) -> t.List[SnapshotTableInfo]:
        """Returns the snapshots currently in the remote environment.

        These are the removed snapshots, the previous versions of modified snapshots, and the
        unchanged snapshots.
        """
        return [
            *self.removed_snapshots.values(),
            *(old.table_info for _, old in self.modified_snapshots.values()),
            *[
                s.table_info
                for s_id, s in self.snapshots.items()
                if s_id not in self.added and s.name not in self.modified_snapshots
            ],
        ]

    def directly_modified(self, name: str) -> bool:
        """Returns whether or not a node was directly modified in this context.

        A node is directly modified when its own data hash changed.

        Args:
            name: The snapshot name to check.

        Returns:
            Whether or not the node was directly modified.
        """
        pair = self.modified_snapshots.get(name)
        if pair is None:
            return False

        current, previous = pair
        return current.fingerprint.data_hash != previous.fingerprint.data_hash

    def indirectly_modified(self, name: str) -> bool:
        """Returns whether or not a node was indirectly modified in this context.

        A node is indirectly modified when its own data hash is unchanged but its parents'
        data hash changed.

        Args:
            name: The snapshot name to check.

        Returns:
            Whether or not the node was indirectly modified.
        """
        pair = self.modified_snapshots.get(name)
        if pair is None:
            return False

        current, previous = pair
        return (
            current.fingerprint.data_hash == previous.fingerprint.data_hash
            and current.fingerprint.parent_data_hash != previous.fingerprint.parent_data_hash
        )

    def metadata_updated(self, name: str) -> bool:
        """Returns whether or not the given node's metadata has been updated.

        Args:
            name: The node to check.

        Returns:
            Whether or not the node's metadata has been updated.
        """
        pair = self.modified_snapshots.get(name)
        if pair is None:
            return False

        current, previous = pair
        return current.fingerprint.metadata_hash != previous.fingerprint.metadata_hash

    def text_diff(self, name: str) -> str:
        """Finds the difference of a node between the current and remote environment.

        Args:
            name: The Snapshot name.

        Returns:
            A unified text diff of the node. Returns an empty string if the node was not
            modified, or if computing the diff raised a SQLMeshError (a warning is logged).

        Raises:
            SQLMeshError: If no snapshot with the given name exists in this diff.
        """
        if name not in self.snapshots_by_name:
            raise SQLMeshError(f"`{name}` does not exist.")

        pair = self.modified_snapshots.get(name)
        if pair is None:
            return ""

        new, old = pair
        try:
            return old.node.text_diff(new.node)
        except SQLMeshError as e:
            # Best effort: log and fall back to an empty diff instead of propagating.
            logger.warning("Failed to diff model '%s': %s", name, str(e))
            return ""
ContextDiff is an object representing the difference between two environments.
The two environments can be the local environment and a remote environment, or two remote environments.
Whether the currently stored environment record is in an unfinalized state.
Deleted nodes, keyed by snapshot ID.
Modified snapshots, keyed by name, as (current, previous) pairs.
Merged snapshots: every local snapshot, using the stored copy when one exists in state.
New snapshots: local snapshots that do not exist in state yet.
Snapshot IDs that were promoted by the previous plan.
Snapshots from the previous finalized state.
@classmethod
def create(
    cls,
    environment: str,
    snapshots: t.Dict[str, Snapshot],
    create_from: str,
    state_reader: StateReader,
    ensure_finalized_snapshots: bool = False,
) -> ContextDiff:
    """Create a ContextDiff object.

    Local and remote snapshots are matched by name. A local name that is absent remotely is
    added, a remote name that is absent locally is removed, and a name whose fingerprint
    differs is modified. If a modified node's node type also changed, it is instead recorded
    as a removal of the remote snapshot plus an addition of the local one.

    Args:
        environment: The remote environment to diff.
        snapshots: The snapshots of the current environment, keyed by name.
        create_from: The environment to create the target environment from if it
            doesn't exist.
        state_reader: StateReader to access the remote environment to diff.
        ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized
            environment state, or to use whatever snapshots are in the current environment state even if
            the environment is not finalized.

    Returns:
        The ContextDiff object.
    """
    environment = environment.lower()
    env = state_reader.get_environment(environment)

    if env is None:
        # The target doesn't exist yet: diff against the environment it will be created from.
        env = state_reader.get_environment(create_from.lower())
        is_new_environment = True
        previously_promoted_snapshot_ids = set()
    else:
        is_new_environment = False
        previously_promoted_snapshot_ids = {s.snapshot_id for s in env.promoted_snapshots}

    environment_snapshot_infos = []
    if env:
        environment_snapshot_infos = (
            env.snapshots
            if not ensure_finalized_snapshots
            else env.finalized_or_current_snapshots
        )
    remote_snapshot_name_to_info = {
        snapshot_info.name: snapshot_info for snapshot_info in environment_snapshot_infos
    }
    removed = {
        snapshot_table_info.snapshot_id: snapshot_table_info
        for snapshot_table_info in environment_snapshot_infos
        if snapshot_table_info.name not in snapshots
    }
    added = {
        snapshot.snapshot_id
        for snapshot in snapshots.values()
        if snapshot.name not in remote_snapshot_name_to_info
    }
    modified_snapshot_name_to_snapshot_info = {
        snapshot.name: remote_snapshot_name_to_info[snapshot.name]
        for snapshot in snapshots.values()
        if snapshot.snapshot_id not in added
        and snapshot.fingerprint != remote_snapshot_name_to_info[snapshot.name].fingerprint
    }
    modified_local_seed_snapshot_ids = {
        s.snapshot_id
        for s in snapshots.values()
        if s.is_seed and s.name in modified_snapshot_name_to_snapshot_info
    }
    modified_remote_snapshot_ids = {
        s.snapshot_id for s in modified_snapshot_name_to_snapshot_info.values()
    }

    # Only modified seeds (local versions and the remote versions they replace) are fetched
    # with hydrate_seeds=True; presumably so their contents are available for diffing.
    stored = {
        **state_reader.get_snapshots(
            [
                snapshot.snapshot_id
                for snapshot in snapshots.values()
                if snapshot.snapshot_id not in modified_local_seed_snapshot_ids
            ]
        ),
        **state_reader.get_snapshots(
            modified_remote_snapshot_ids | modified_local_seed_snapshot_ids,
            hydrate_seeds=True,
        ),
    }

    merged_snapshots: t.Dict[SnapshotId, Snapshot] = {}
    modified_snapshots: t.Dict[str, t.Tuple[Snapshot, Snapshot]] = {}
    new_snapshots: t.Dict[SnapshotId, Snapshot] = {}

    for snapshot in snapshots.values():
        s_id = snapshot.snapshot_id
        modified_snapshot_info = modified_snapshot_name_to_snapshot_info.get(snapshot.name)
        existing_snapshot = stored.get(s_id)

        if modified_snapshot_info and snapshot.node_type != modified_snapshot_info.node_type:
            # A node whose type changed is not treated as a modification: record the old
            # snapshot as removed and the new one as added.
            added.add(s_id)
            removed[modified_snapshot_info.snapshot_id] = modified_snapshot_info
            modified_snapshot_name_to_snapshot_info.pop(snapshot.name)
            # Still fall through so the new snapshot lands in `merged_snapshots` (and in
            # `new_snapshots` if it isn't stored yet). Every ID in `added` must be present
            # in `snapshots`; `added_materialized_snapshot_ids` indexes it directly.
            modified_snapshot_info = None

        if existing_snapshot:
            # Keep the original node instance to preserve the query cache.
            existing_snapshot.node = snapshot.node

            merged_snapshots[s_id] = existing_snapshot.copy()
            if modified_snapshot_info:
                modified_snapshots[s_id.name] = (
                    existing_snapshot,
                    stored[modified_snapshot_info.snapshot_id],
                )
        else:
            snapshot = snapshot.copy()
            merged_snapshots[s_id] = snapshot
            new_snapshots[snapshot.snapshot_id] = snapshot
            if modified_snapshot_info:
                # Carry over the version history of the snapshot being replaced.
                snapshot.previous_versions = modified_snapshot_info.all_versions
                modified_snapshots[s_id.name] = (
                    snapshot,
                    stored[modified_snapshot_info.snapshot_id],
                )

    return ContextDiff(
        environment=environment,
        is_new_environment=is_new_environment,
        is_unfinalized_environment=bool(env and not env.finalized_ts),
        create_from=create_from,
        added=added,
        removed_snapshots=removed,
        modified_snapshots=modified_snapshots,
        snapshots=merged_snapshots,
        new_snapshots=new_snapshots,
        previous_plan_id=env.plan_id if env and not is_new_environment else None,
        previously_promoted_snapshot_ids=previously_promoted_snapshot_ids,
        previous_finalized_snapshots=env.previous_finalized_snapshots if env else None,
    )
Create a ContextDiff object.
Arguments:
- environment: The remote environment to diff.
- snapshots: The snapshots of the current environment.
- create_from: The environment to create the target environment from if it doesn't exist.
- state_reader: StateReader to access the remote environment to diff.
- ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized environment state, or to use whatever snapshots are in the current environment state even if the environment is not finalized.
Returns:
The ContextDiff object.
@classmethod
def create_no_diff(cls, environment: str) -> ContextDiff:
    """Create a no-op ContextDiff object.

    Every collection is empty and every flag is False, so `has_changes` is False.

    Args:
        environment: The environment to diff.

    Returns:
        The ContextDiff object.
    """
    return ContextDiff(
        environment=environment,
        is_new_environment=False,
        is_unfinalized_environment=False,
        create_from="",
        added=set(),
        removed_snapshots={},
        modified_snapshots={},
        snapshots={},
        new_snapshots={},
        previous_plan_id=None,
        previously_promoted_snapshot_ids=set(),
        previous_finalized_snapshots=None,
    )
Create a no-op ContextDiff object.
Arguments:
- environment: The environment to diff.
Returns:
The ContextDiff object.
Returns the set of added snapshot IDs whose model kind is materialized.
The set of snapshot ids that have to be promoted in the target environment.
The set of snapshot IDs that have not yet been promoted in the target environment.
Returns the snapshots currently in the remote environment: removed snapshots, previous versions of modified snapshots, and unchanged snapshots.
def directly_modified(self, name: str) -> bool:
    """Returns whether or not a node was directly modified in this context.

    A node is directly modified when its own data hash changed.

    Args:
        name: The snapshot name to check.

    Returns:
        Whether or not the node was directly modified.
    """
    pair = self.modified_snapshots.get(name)
    if pair is None:
        return False

    current, previous = pair
    return current.fingerprint.data_hash != previous.fingerprint.data_hash
Returns whether or not a node was directly modified in this context.
Arguments:
- name: The snapshot name to check.
Returns:
Whether or not the node was directly modified.
def indirectly_modified(self, name: str) -> bool:
    """Returns whether or not a node was indirectly modified in this context.

    A node is indirectly modified when its own data hash is unchanged but its parents'
    data hash changed.

    Args:
        name: The snapshot name to check.

    Returns:
        Whether or not the node was indirectly modified.
    """
    pair = self.modified_snapshots.get(name)
    if pair is None:
        return False

    current, previous = pair
    return (
        current.fingerprint.data_hash == previous.fingerprint.data_hash
        and current.fingerprint.parent_data_hash != previous.fingerprint.parent_data_hash
    )
Returns whether or not a node was indirectly modified in this context.
Arguments:
- name: The snapshot name to check.
Returns:
Whether or not the node was indirectly modified.
def metadata_updated(self, name: str) -> bool:
    """Returns whether or not the given node's metadata has been updated.

    Args:
        name: The node to check.

    Returns:
        Whether or not the node's metadata has been updated.
    """
    pair = self.modified_snapshots.get(name)
    if pair is None:
        return False

    current, previous = pair
    return current.fingerprint.metadata_hash != previous.fingerprint.metadata_hash
Returns whether or not the given node's metadata has been updated.
Arguments:
- name: The node to check.
Returns:
Whether or not the node's metadata has been updated.
def text_diff(self, name: str) -> str:
    """Finds the difference of a node between the current and remote environment.

    Args:
        name: The Snapshot name.

    Returns:
        A unified text diff of the node. Returns an empty string if the node was not
        modified, or if computing the diff raised a SQLMeshError (a warning is logged).

    Raises:
        SQLMeshError: If no snapshot with the given name exists in this diff.
    """
    if name not in self.snapshots_by_name:
        raise SQLMeshError(f"`{name}` does not exist.")

    pair = self.modified_snapshots.get(name)
    if pair is None:
        return ""

    new, old = pair
    try:
        return old.node.text_diff(new.node)
    except SQLMeshError as e:
        # Best effort: log and fall back to an empty diff instead of propagating.
        logger.warning("Failed to diff model '%s': %s", name, str(e))
        return ""
Finds the difference of a node between the current and remote environment.
Arguments:
- name: The Snapshot name.
Returns:
A unified text diff of the node.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs