sqlmesh.core.plan.common
"""Plan helpers: rebuild decisions and cross-version restatement interval discovery."""

from __future__ import annotations

import logging
import typing as t
from dataclasses import dataclass, field

from sqlmesh.core.state_sync import StateReader
from sqlmesh.core.snapshot import Snapshot, SnapshotId, SnapshotIdAndVersion, SnapshotNameVersion
from sqlmesh.core.snapshot.definition import Interval
from sqlmesh.utils.dag import DAG
from sqlmesh.utils.date import now_timestamp

logger = logging.getLogger(__name__)


def should_force_rebuild(old: Snapshot, new: Snapshot) -> bool:
    """Return True when ``new`` has to be rebuilt instead of reusing what ``old`` produced.

    Args:
        old: The snapshot being replaced.
        new: The snapshot replacing it.

    Returns:
        True for indirectly-non-breaking, non-forward-only views, for seeds whose change is
        not a metadata-only update on top of ``old``, and otherwise whatever
        ``is_breaking_kind_change(old, new)`` decides.
    """
    if new.is_view and new.is_indirect_non_breaking and not new.is_forward_only:
        # View models always need to be rebuilt to reflect updated upstream dependencies
        return True
    if new.is_seed and not (
        new.is_metadata
        and new.previous_version
        and new.previous_version.snapshot_id(new.name) == old.snapshot_id
    ):
        # Seed models always need to be rebuilt to reflect changes in the seed file
        # Unless only their metadata has been updated (eg description added) and the seed file has not been touched
        return True
    return is_breaking_kind_change(old, new)


def is_breaking_kind_change(old: Snapshot, new: Snapshot) -> bool:
    """Return True when the change from ``old`` to ``new`` requires a rebuild.

    Args:
        old: The snapshot being replaced.
        new: The snapshot replacing it.

    Returns:
        True if exactly one of the two is a model, if the virtual environment mode changed,
        or if the model kind changed and either snapshot is non-incremental or the
        partitioning differs. False otherwise.
    """
    if new.is_model != old.is_model:
        # If one is a model and the other isn't, then we need to rebuild
        return True
    if not new.is_model or not old.is_model:
        # If neither are models, then we don't need to rebuild
        # Note that the remaining checks only apply to model snapshots
        return False
    if old.virtual_environment_mode != new.virtual_environment_mode:
        # If the virtual environment mode has changed, then we need to rebuild
        return True
    if old.model.kind.name == new.model.kind.name:
        # If the kind hasn't changed, then we don't need to rebuild
        return False
    if not old.is_incremental or not new.is_incremental:
        # If either is not incremental, then we need to rebuild
        return True
    if old.model.partitioned_by == new.model.partitioned_by:
        # If the partitioning hasn't changed, then we don't need to rebuild
        return False
    return True


@dataclass
class SnapshotIntervalClearRequest:
    """A request to clear one interval of one snapshot, plus the environments it is promoted in."""

    # affected snapshot
    snapshot: SnapshotIdAndVersion

    # which interval to clear
    interval: Interval

    # which environments this snapshot is currently promoted
    # note that this can be empty if the snapshot exists because its ttl has not expired
    # but it is not part of any particular environment
    environment_names: t.Set[str] = field(default_factory=set)

    @property
    def snapshot_id(self) -> SnapshotId:
        """The id of the affected snapshot."""
        return self.snapshot.snapshot_id

    @property
    def sorted_environment_names(self) -> t.List[str]:
        """The environment names as a sorted list."""
        return sorted(self.environment_names)


def identify_restatement_intervals_across_snapshot_versions(
    state_reader: StateReader,
    prod_restatements: t.Dict[str, Interval],
    disable_restatement_models: t.Set[str],
    loaded_snapshots: t.Dict[SnapshotId, Snapshot],
    current_ts: t.Optional[int] = None,
) -> t.Dict[SnapshotId, SnapshotIntervalClearRequest]:
    """
    Given a map of snapshot names + intervals to restate in prod:
     - Look up matching snapshots (match based on name - regardless of version, to get all versions)
     - For each match, also match downstream snapshots in each dev environment while filtering out models that have restatement disabled
     - Return a mapping of every affected snapshot id to the interval that needs to be cleared for it

    The goal here is to produce a list of intervals to invalidate across all dev snapshots so that a subsequent plan or
    cadence run in those environments causes the intervals to be repopulated.

    Args:
        state_reader: Used to list environments, fetch each environment and look up snapshots by name / id.
        prod_restatements: Snapshot name -> interval that is being restated in prod.
        disable_restatement_models: Snapshot names that are filtered out of the affected set.
        loaded_snapshots: Snapshots already in memory. Their name/version pairs are treated as prod's
            and are never cleared.
        current_ts: Timestamp handed to the state reader when excluding expired snapshots.
            Only ``None`` falls back to the current time; an explicit ``0`` is passed through.

    Returns:
        A dict of snapshot id -> clear request. Empty when :prod_restatements is empty.
    """
    if not prod_restatements:
        return {}

    # Although :loaded_snapshots is sourced from RestatementStage.all_snapshots, since the only time we ever need
    # to clear intervals across all environments is for prod, the :loaded_snapshots here are always from prod
    prod_name_versions: t.Set[SnapshotNameVersion] = {
        s.name_version for s in loaded_snapshots.values()
    }

    snapshot_intervals_to_clear: t.Dict[SnapshotId, SnapshotIntervalClearRequest] = {}

    for env_summary in state_reader.get_environments_summary():
        # Fetch the full environment object one at a time to avoid loading all environments into memory at once
        env = state_reader.get_environment(env_summary.name)
        if not env:
            logger.warning("Environment %s not found", env_summary.name)
            continue

        snapshots_by_name = {s.name: s.table_info for s in env.snapshots}

        # We don't just restate matching snapshots, we also have to restate anything downstream of them
        # so that if A gets restated in prod and dev has A <- B <- C, B and C get restated in dev
        env_dag = DAG({s.name: {p.name for p in s.parents} for s in env.snapshots})

        for restate_snapshot_name, interval in prod_restatements.items():
            if restate_snapshot_name not in snapshots_by_name:
                # snapshot is not promoted in this environment
                continue

            affected_snapshot_names = [
                x
                for x in ([restate_snapshot_name] + env_dag.downstream(restate_snapshot_name))
                if x not in disable_restatement_models
            ]

            for affected_snapshot_name in affected_snapshot_names:
                affected_snapshot = snapshots_by_name[affected_snapshot_name]

                # Don't clear intervals for a dev snapshot if it shares the same physical version with prod.
                # Otherwise, prod will be affected by what should be a dev operation
                if affected_snapshot.name_version in prod_name_versions:
                    continue

                clear_request = snapshot_intervals_to_clear.get(affected_snapshot.snapshot_id)
                if clear_request is None:
                    # NOTE(review): the first interval seen for a snapshot id wins; a later restatement that
                    # reaches the same snapshot with a different interval does not widen it - confirm intended
                    clear_request = SnapshotIntervalClearRequest(
                        snapshot=affected_snapshot.id_and_version, interval=interval
                    )
                    snapshot_intervals_to_clear[affected_snapshot.snapshot_id] = clear_request

                clear_request.environment_names.add(env.name)

    # snapshot_intervals_to_clear now contains the entire hierarchy of affected snapshots based
    # on building the DAG for each environment and including downstream snapshots
    # but, what if there are affected snapshots that aren't part of any environment?
    unique_snapshot_names = {snapshot_id.name for snapshot_id in snapshot_intervals_to_clear}

    # Only fall back to "now" when no timestamp was supplied; `current_ts or ...` would also discard 0
    if current_ts is None:
        current_ts = now_timestamp()

    all_matching_non_prod_snapshots = {
        s.snapshot_id: s
        for s in state_reader.get_snapshots_by_names(
            snapshot_names=unique_snapshot_names, current_ts=current_ts, exclude_expired=True
        )
        # Don't clear intervals for a snapshot if it shares the same physical version with prod.
        # Otherwise, prod will be affected by what should be a dev operation
        if s.name_version not in prod_name_versions
    }

    # identify the ones that we haven't picked up yet, which are the ones that don't exist in any environment
    if remaining_snapshot_ids := set(all_matching_non_prod_snapshots).difference(
        snapshot_intervals_to_clear
    ):
        # these snapshot id's exist in isolation and may be related to a downstream dependency of the :prod_restatements,
        # rather than directly related, so we can't simply look up the interval to clear based on :prod_restatements.
        # To figure out the interval that should be cleared, we can match to the existing list based on name
        # and conservatively take the widest interval that shows up
        snapshot_name_to_widest_interval: t.Dict[str, Interval] = {}
        for s_id, clear_request in snapshot_intervals_to_clear.items():
            current_start, current_end = snapshot_name_to_widest_interval.get(
                s_id.name, clear_request.interval
            )
            next_start, next_end = clear_request.interval

            next_start = min(current_start, next_start)
            next_end = max(current_end, next_end)

            snapshot_name_to_widest_interval[s_id.name] = (next_start, next_end)

        for remaining_snapshot_id in remaining_snapshot_ids:
            remaining_snapshot = all_matching_non_prod_snapshots[remaining_snapshot_id]
            snapshot_intervals_to_clear[remaining_snapshot_id] = SnapshotIntervalClearRequest(
                snapshot=remaining_snapshot,
                interval=snapshot_name_to_widest_interval[remaining_snapshot_id.name],
            )

    # for any affected full_history_restatement_only snapshots, we need to widen the intervals being restated to
    # include the whole time range for that snapshot. This requires a call to state to load the full snapshot record,
    # so we only do it if necessary
    full_history_restatement_snapshot_ids = [
        # FIXME: full_history_restatement_only is just one indicator that the snapshot can only be fully refreshed, the other one is Model.depends_on_self
        # however, to figure out depends_on_self, we have to render all the model queries which, alongside having to fetch full snapshots from state,
        # is problematic in secure environments that are deliberately isolated from arbitrary user code (since rendering a query may require user macros to be present)
        # So for now, these are not considered
        s_id
        for s_id, s in snapshot_intervals_to_clear.items()
        if s.snapshot.full_history_restatement_only
    ]
    if full_history_restatement_snapshot_ids:
        # only load full snapshot records that we haven't already loaded
        # (ids are read through .snapshot_id exactly as before; presumably that returns the id itself - confirm)
        additional_snapshots = state_reader.get_snapshots(
            [
                s_id.snapshot_id
                for s_id in full_history_restatement_snapshot_ids
                if s_id.snapshot_id not in loaded_snapshots
            ]
        )

        all_snapshots = loaded_snapshots | additional_snapshots

        for full_snapshot_id in full_history_restatement_snapshot_ids:
            full_snapshot = all_snapshots[full_snapshot_id]
            intervals_to_clear = snapshot_intervals_to_clear[full_snapshot_id]

            original_start, original_end = intervals_to_clear.interval

            # get_removal_interval() widens intervals if necessary
            new_interval = full_snapshot.get_removal_interval(
                start=original_start, end=original_end
            )

            intervals_to_clear.interval = new_interval

    return snapshot_intervals_to_clear
logger =
<Logger sqlmesh.core.plan.common (WARNING)>
def
should_force_rebuild( old: sqlmesh.core.snapshot.definition.Snapshot, new: sqlmesh.core.snapshot.definition.Snapshot) -> bool:
def should_force_rebuild(old: Snapshot, new: Snapshot) -> bool:
    """Decide whether ``new`` must be rebuilt rather than reuse what ``old`` produced.

    Args:
        old: The snapshot being replaced.
        new: The snapshot replacing it.

    Returns:
        True for an indirectly-non-breaking view that is not forward-only, True for a seed
        unless the change is metadata-only on top of ``old``, and otherwise the result of
        ``is_breaking_kind_change(old, new)``.
    """
    # Views have to be recreated so that they pick up changes in their upstream dependencies
    view_needs_refresh = (
        new.is_view and new.is_indirect_non_breaking and not new.is_forward_only
    )
    if view_needs_refresh:
        return True

    if new.is_seed:
        # A seed can only skip the rebuild when nothing but its metadata changed (e.g. a
        # description was added) and it directly follows ``old``, i.e. the seed file is untouched
        metadata_only_update = (
            new.is_metadata
            and new.previous_version
            and new.previous_version.snapshot_id(new.name) == old.snapshot_id
        )
        if not metadata_only_update:
            return True

    return is_breaking_kind_change(old, new)
def
is_breaking_kind_change( old: sqlmesh.core.snapshot.definition.Snapshot, new: sqlmesh.core.snapshot.definition.Snapshot) -> bool:
def is_breaking_kind_change(old: Snapshot, new: Snapshot) -> bool:
    """Tell whether going from ``old`` to ``new`` is a change that requires a rebuild.

    Args:
        old: The snapshot being replaced.
        new: The snapshot replacing it.

    Returns:
        True if exactly one of the two is a model, if the virtual environment mode changed,
        or if the model kind changed and either snapshot is non-incremental or the
        partitioning differs. False otherwise.
    """
    new_is_model, old_is_model = new.is_model, old.is_model

    # Model on one side only: always a rebuild
    if new_is_model != old_is_model:
        return True

    # Both flags agree here, so this means neither is a model; everything below is model-only
    if not (new_is_model and old_is_model):
        return False

    # A different virtual environment mode forces a rebuild
    if old.virtual_environment_mode != new.virtual_environment_mode:
        return True

    # Same kind: nothing to rebuild
    if old.model.kind.name == new.model.kind.name:
        return False

    # The kind changed; unless both sides are incremental this is always a rebuild
    both_incremental = old.is_incremental and new.is_incremental
    if not both_incremental:
        return True

    # Incremental -> incremental only needs a rebuild when the partitioning changed as well
    return not (old.model.partitioned_by == new.model.partitioned_by)
@dataclass
class
SnapshotIntervalClearRequest:
@dataclass
class SnapshotIntervalClearRequest:
    """Describes one interval that has to be cleared for one snapshot.

    Attributes are documented inline; ``environment_names`` defaults to a fresh empty set
    per instance.
    """

    # The snapshot whose interval is to be cleared
    snapshot: SnapshotIdAndVersion

    # The interval to clear
    interval: Interval

    # Names of the environments the snapshot is currently promoted in.
    # May be empty: a snapshot can still exist because its ttl has not expired
    # without belonging to any particular environment
    environment_names: t.Set[str] = field(default_factory=set)

    @property
    def snapshot_id(self) -> SnapshotId:
        """The id of the affected snapshot, read from ``self.snapshot``."""
        return self.snapshot.snapshot_id

    @property
    def sorted_environment_names(self) -> t.List[str]:
        """A new list holding ``environment_names`` in sorted order."""
        return sorted(self.environment_names)
SnapshotIntervalClearRequest( snapshot: sqlmesh.core.snapshot.definition.SnapshotIdAndVersion, interval: Tuple[int, int], environment_names: Set[str] = <factory>)
snapshot_id: sqlmesh.core.snapshot.definition.SnapshotId
def
identify_restatement_intervals_across_snapshot_versions( state_reader: sqlmesh.core.state_sync.base.StateReader, prod_restatements: Dict[str, Tuple[int, int]], disable_restatement_models: Set[str], loaded_snapshots: Dict[sqlmesh.core.snapshot.definition.SnapshotId, sqlmesh.core.snapshot.definition.Snapshot], current_ts: Optional[int] = None) -> Dict[sqlmesh.core.snapshot.definition.SnapshotId, SnapshotIntervalClearRequest]:
def identify_restatement_intervals_across_snapshot_versions(
    state_reader: StateReader,
    prod_restatements: t.Dict[str, Interval],
    disable_restatement_models: t.Set[str],
    loaded_snapshots: t.Dict[SnapshotId, Snapshot],
    current_ts: t.Optional[int] = None,
) -> t.Dict[SnapshotId, SnapshotIntervalClearRequest]:
    """
    Given a map of snapshot names + intervals to restate in prod:
     - Look up matching snapshots (match based on name - regardless of version, to get all versions)
     - For each match, also match downstream snapshots in each dev environment while filtering out models that have restatement disabled
     - Return a mapping of every affected snapshot id to the interval that needs to be cleared for it

    The goal here is to produce a list of intervals to invalidate across all dev snapshots so that a subsequent plan or
    cadence run in those environments causes the intervals to be repopulated.

    Args:
        state_reader: Used to list environments, fetch each environment and look up snapshots by name / id.
        prod_restatements: Snapshot name -> interval that is being restated in prod.
        disable_restatement_models: Snapshot names that are filtered out of the affected set.
        loaded_snapshots: Snapshots already in memory. Their name/version pairs are treated as prod's
            and are never cleared.
        current_ts: Timestamp handed to the state reader when excluding expired snapshots.
            Only ``None`` falls back to the current time; an explicit ``0`` is passed through.

    Returns:
        A dict of snapshot id -> clear request. Empty when :prod_restatements is empty.
    """
    if not prod_restatements:
        return {}

    # Although :loaded_snapshots is sourced from RestatementStage.all_snapshots, since the only time we ever need
    # to clear intervals across all environments is for prod, the :loaded_snapshots here are always from prod
    prod_name_versions: t.Set[SnapshotNameVersion] = {
        s.name_version for s in loaded_snapshots.values()
    }

    snapshot_intervals_to_clear: t.Dict[SnapshotId, SnapshotIntervalClearRequest] = {}

    for env_summary in state_reader.get_environments_summary():
        # Fetch the full environment object one at a time to avoid loading all environments into memory at once
        env = state_reader.get_environment(env_summary.name)
        if not env:
            logger.warning("Environment %s not found", env_summary.name)
            continue

        snapshots_by_name = {s.name: s.table_info for s in env.snapshots}

        # We don't just restate matching snapshots, we also have to restate anything downstream of them
        # so that if A gets restated in prod and dev has A <- B <- C, B and C get restated in dev
        env_dag = DAG({s.name: {p.name for p in s.parents} for s in env.snapshots})

        for restate_snapshot_name, interval in prod_restatements.items():
            if restate_snapshot_name not in snapshots_by_name:
                # snapshot is not promoted in this environment
                continue

            affected_snapshot_names = [
                x
                for x in ([restate_snapshot_name] + env_dag.downstream(restate_snapshot_name))
                if x not in disable_restatement_models
            ]

            for affected_snapshot_name in affected_snapshot_names:
                affected_snapshot = snapshots_by_name[affected_snapshot_name]

                # Don't clear intervals for a dev snapshot if it shares the same physical version with prod.
                # Otherwise, prod will be affected by what should be a dev operation
                if affected_snapshot.name_version in prod_name_versions:
                    continue

                clear_request = snapshot_intervals_to_clear.get(affected_snapshot.snapshot_id)
                if clear_request is None:
                    # NOTE(review): the first interval seen for a snapshot id wins; a later restatement that
                    # reaches the same snapshot with a different interval does not widen it - confirm intended
                    clear_request = SnapshotIntervalClearRequest(
                        snapshot=affected_snapshot.id_and_version, interval=interval
                    )
                    snapshot_intervals_to_clear[affected_snapshot.snapshot_id] = clear_request

                clear_request.environment_names.add(env.name)

    # snapshot_intervals_to_clear now contains the entire hierarchy of affected snapshots based
    # on building the DAG for each environment and including downstream snapshots
    # but, what if there are affected snapshots that aren't part of any environment?
    unique_snapshot_names = {snapshot_id.name for snapshot_id in snapshot_intervals_to_clear}

    # Only fall back to "now" when no timestamp was supplied; `current_ts or ...` would also discard 0
    if current_ts is None:
        current_ts = now_timestamp()

    all_matching_non_prod_snapshots = {
        s.snapshot_id: s
        for s in state_reader.get_snapshots_by_names(
            snapshot_names=unique_snapshot_names, current_ts=current_ts, exclude_expired=True
        )
        # Don't clear intervals for a snapshot if it shares the same physical version with prod.
        # Otherwise, prod will be affected by what should be a dev operation
        if s.name_version not in prod_name_versions
    }

    # identify the ones that we haven't picked up yet, which are the ones that don't exist in any environment
    if remaining_snapshot_ids := set(all_matching_non_prod_snapshots).difference(
        snapshot_intervals_to_clear
    ):
        # these snapshot id's exist in isolation and may be related to a downstream dependency of the :prod_restatements,
        # rather than directly related, so we can't simply look up the interval to clear based on :prod_restatements.
        # To figure out the interval that should be cleared, we can match to the existing list based on name
        # and conservatively take the widest interval that shows up
        snapshot_name_to_widest_interval: t.Dict[str, Interval] = {}
        for s_id, clear_request in snapshot_intervals_to_clear.items():
            current_start, current_end = snapshot_name_to_widest_interval.get(
                s_id.name, clear_request.interval
            )
            next_start, next_end = clear_request.interval

            next_start = min(current_start, next_start)
            next_end = max(current_end, next_end)

            snapshot_name_to_widest_interval[s_id.name] = (next_start, next_end)

        for remaining_snapshot_id in remaining_snapshot_ids:
            remaining_snapshot = all_matching_non_prod_snapshots[remaining_snapshot_id]
            snapshot_intervals_to_clear[remaining_snapshot_id] = SnapshotIntervalClearRequest(
                snapshot=remaining_snapshot,
                interval=snapshot_name_to_widest_interval[remaining_snapshot_id.name],
            )

    # for any affected full_history_restatement_only snapshots, we need to widen the intervals being restated to
    # include the whole time range for that snapshot. This requires a call to state to load the full snapshot record,
    # so we only do it if necessary
    full_history_restatement_snapshot_ids = [
        # FIXME: full_history_restatement_only is just one indicator that the snapshot can only be fully refreshed, the other one is Model.depends_on_self
        # however, to figure out depends_on_self, we have to render all the model queries which, alongside having to fetch full snapshots from state,
        # is problematic in secure environments that are deliberately isolated from arbitrary user code (since rendering a query may require user macros to be present)
        # So for now, these are not considered
        s_id
        for s_id, s in snapshot_intervals_to_clear.items()
        if s.snapshot.full_history_restatement_only
    ]
    if full_history_restatement_snapshot_ids:
        # only load full snapshot records that we haven't already loaded
        # (ids are read through .snapshot_id exactly as before; presumably that returns the id itself - confirm)
        additional_snapshots = state_reader.get_snapshots(
            [
                s_id.snapshot_id
                for s_id in full_history_restatement_snapshot_ids
                if s_id.snapshot_id not in loaded_snapshots
            ]
        )

        all_snapshots = loaded_snapshots | additional_snapshots

        for full_snapshot_id in full_history_restatement_snapshot_ids:
            full_snapshot = all_snapshots[full_snapshot_id]
            intervals_to_clear = snapshot_intervals_to_clear[full_snapshot_id]

            original_start, original_end = intervals_to_clear.interval

            # get_removal_interval() widens intervals if necessary
            new_interval = full_snapshot.get_removal_interval(
                start=original_start, end=original_end
            )

            intervals_to_clear.interval = new_interval

    return snapshot_intervals_to_clear
Given a map of snapshot names + intervals to restate in prod:
- Look up matching snapshots (match based on name - regardless of version, to get all versions)
- For each match, also match downstream snapshots in each dev environment while filtering out models that have restatement disabled
- Return a mapping of every affected snapshot id to the interval that needs to be cleared for it
The goal here is to produce a list of intervals to invalidate across all dev snapshots so that a subsequent plan or cadence run in those environments causes the intervals to be repopulated.