Edit on GitHub

sqlmesh.core.plan.common

  1from __future__ import annotations
  2import typing as t
  3import logging
  4from dataclasses import dataclass, field
  5
  6from sqlmesh.core.state_sync import StateReader
  7from sqlmesh.core.snapshot import Snapshot, SnapshotId, SnapshotIdAndVersion, SnapshotNameVersion
  8from sqlmesh.core.snapshot.definition import Interval
  9from sqlmesh.utils.dag import DAG
 10from sqlmesh.utils.date import now_timestamp
 11
 12logger = logging.getLogger(__name__)
 13
 14
 15def should_force_rebuild(old: Snapshot, new: Snapshot) -> bool:
 16    if new.is_view and new.is_indirect_non_breaking and not new.is_forward_only:
 17        # View models always need to be rebuilt to reflect updated upstream dependencies
 18        return True
 19    if new.is_seed and not (
 20        new.is_metadata
 21        and new.previous_version
 22        and new.previous_version.snapshot_id(new.name) == old.snapshot_id
 23    ):
 24        # Seed models always need to be rebuilt to reflect changes in the seed file
 25        # Unless only their metadata has been updated (eg description added) and the seed file has not been touched
 26        return True
 27    return is_breaking_kind_change(old, new)
 28
 29
 30def is_breaking_kind_change(old: Snapshot, new: Snapshot) -> bool:
 31    if new.is_model != old.is_model:
 32        # If one is a model and the other isn't, then we need to rebuild
 33        return True
 34    if not new.is_model or not old.is_model:
 35        # If neither are models, then we don't need to rebuild
 36        # Note that the remaining checks only apply to model snapshots
 37        return False
 38    if old.virtual_environment_mode != new.virtual_environment_mode:
 39        # If the virtual environment mode has changed, then we need to rebuild
 40        return True
 41    if old.model.kind.name == new.model.kind.name:
 42        # If the kind hasn't changed, then we don't need to rebuild
 43        return False
 44    if not old.is_incremental or not new.is_incremental:
 45        # If either is not incremental, then we need to rebuild
 46        return True
 47    if old.model.partitioned_by == new.model.partitioned_by:
 48        # If the partitioning hasn't changed, then we don't need to rebuild
 49        return False
 50    return True
 51
 52
 53@dataclass
 54class SnapshotIntervalClearRequest:
 55    # affected snapshot
 56    snapshot: SnapshotIdAndVersion
 57
 58    # which interval to clear
 59    interval: Interval
 60
 61    # which environments this snapshot is currently promoted
 62    # note that this can be empty if the snapshot exists because its ttl has not expired
 63    # but it is not part of any particular environment
 64    environment_names: t.Set[str] = field(default_factory=set)
 65
 66    @property
 67    def snapshot_id(self) -> SnapshotId:
 68        return self.snapshot.snapshot_id
 69
 70    @property
 71    def sorted_environment_names(self) -> t.List[str]:
 72        return list(sorted(self.environment_names))
 73
 74
 75def identify_restatement_intervals_across_snapshot_versions(
 76    state_reader: StateReader,
 77    prod_restatements: t.Dict[str, Interval],
 78    disable_restatement_models: t.Set[str],
 79    loaded_snapshots: t.Dict[SnapshotId, Snapshot],
 80    current_ts: t.Optional[int] = None,
 81) -> t.Dict[SnapshotId, SnapshotIntervalClearRequest]:
 82    """
 83    Given a map of snapshot names + intervals to restate in prod:
 84        - Look up matching snapshots (match based on name - regardless of version, to get all versions)
 85        - For each match, also match downstream snapshots in each dev environment while filtering out models that have restatement disabled
 86        - Return a list of all snapshots that are affected + the interval that needs to be cleared for each
 87
 88    The goal here is to produce a list of intervals to invalidate across all dev snapshots so that a subsequent plan or
 89    cadence run in those environments causes the intervals to be repopulated.
 90    """
 91    if not prod_restatements:
 92        return {}
 93
 94    # Although :loaded_snapshots is sourced from RestatementStage.all_snapshots, since the only time we ever need
 95    # to clear intervals across all environments is for prod, the :loaded_snapshots here are always from prod
 96    prod_name_versions: t.Set[SnapshotNameVersion] = {
 97        s.name_version for s in loaded_snapshots.values()
 98    }
 99
100    snapshot_intervals_to_clear: t.Dict[SnapshotId, SnapshotIntervalClearRequest] = {}
101
102    for env_summary in state_reader.get_environments_summary():
103        # Fetch the full environment object one at a time to avoid loading all environments into memory at once
104        env = state_reader.get_environment(env_summary.name)
105        if not env:
106            logger.warning("Environment %s not found", env_summary.name)
107            continue
108
109        snapshots_by_name = {s.name: s.table_info for s in env.snapshots}
110
111        # We dont just restate matching snapshots, we also have to restate anything downstream of them
112        # so that if A gets restated in prod and dev has A <- B <- C, B and C get restated in dev
113        env_dag = DAG({s.name: {p.name for p in s.parents} for s in env.snapshots})
114
115        for restate_snapshot_name, interval in prod_restatements.items():
116            if restate_snapshot_name not in snapshots_by_name:
117                # snapshot is not promoted in this environment
118                continue
119
120            affected_snapshot_names = [
121                x
122                for x in ([restate_snapshot_name] + env_dag.downstream(restate_snapshot_name))
123                if x not in disable_restatement_models
124            ]
125
126            for affected_snapshot_name in affected_snapshot_names:
127                affected_snapshot = snapshots_by_name[affected_snapshot_name]
128
129                # Don't clear intervals for a dev snapshot if it shares the same physical version with prod.
130                # Otherwise, prod will be affected by what should be a dev operation
131                if affected_snapshot.name_version in prod_name_versions:
132                    continue
133
134                clear_request = snapshot_intervals_to_clear.get(affected_snapshot.snapshot_id)
135                if not clear_request:
136                    clear_request = SnapshotIntervalClearRequest(
137                        snapshot=affected_snapshot.id_and_version, interval=interval
138                    )
139                    snapshot_intervals_to_clear[affected_snapshot.snapshot_id] = clear_request
140
141                clear_request.environment_names |= set([env.name])
142
143    # snapshot_intervals_to_clear now contains the entire hierarchy of affected snapshots based
144    # on building the DAG for each environment and including downstream snapshots
145    # but, what if there are affected snapshots that arent part of any environment?
146    unique_snapshot_names = set(snapshot_id.name for snapshot_id in snapshot_intervals_to_clear)
147
148    current_ts = current_ts or now_timestamp()
149    all_matching_non_prod_snapshots = {
150        s.snapshot_id: s
151        for s in state_reader.get_snapshots_by_names(
152            snapshot_names=unique_snapshot_names, current_ts=current_ts, exclude_expired=True
153        )
154        # Don't clear intervals for a snapshot if it shares the same physical version with prod.
155        # Otherwise, prod will be affected by what should be a dev operation
156        if s.name_version not in prod_name_versions
157    }
158
159    # identify the ones that we havent picked up yet, which are the ones that dont exist in any environment
160    if remaining_snapshot_ids := set(all_matching_non_prod_snapshots).difference(
161        snapshot_intervals_to_clear
162    ):
163        # these snapshot id's exist in isolation and may be related to a downstream dependency of the :prod_restatements,
164        # rather than directly related, so we can't simply look up the interval to clear based on :prod_restatements.
165        # To figure out the interval that should be cleared, we can match to the existing list based on name
166        # and conservatively take the widest interval that shows up
167        snapshot_name_to_widest_interval: t.Dict[str, Interval] = {}
168        for s_id, clear_request in snapshot_intervals_to_clear.items():
169            current_start, current_end = snapshot_name_to_widest_interval.get(
170                s_id.name, clear_request.interval
171            )
172            next_start, next_end = clear_request.interval
173
174            next_start = min(current_start, next_start)
175            next_end = max(current_end, next_end)
176
177            snapshot_name_to_widest_interval[s_id.name] = (next_start, next_end)
178
179        for remaining_snapshot_id in remaining_snapshot_ids:
180            remaining_snapshot = all_matching_non_prod_snapshots[remaining_snapshot_id]
181            snapshot_intervals_to_clear[remaining_snapshot_id] = SnapshotIntervalClearRequest(
182                snapshot=remaining_snapshot,
183                interval=snapshot_name_to_widest_interval[remaining_snapshot_id.name],
184            )
185
186    # for any affected full_history_restatement_only snapshots, we need to widen the intervals being restated to
187    # include the whole time range for that snapshot. This requires a call to state to load the full snapshot record,
188    # so we only do it if necessary
189    full_history_restatement_snapshot_ids = [
190        # FIXME: full_history_restatement_only is just one indicator that the snapshot can only be fully refreshed, the other one is Model.depends_on_self
191        # however, to figure out depends_on_self, we have to render all the model queries which, alongside having to fetch full snapshots from state,
192        # is problematic in secure environments that are deliberately isolated from arbitrary user code (since rendering a query may require user macros to be present)
193        # So for now, these are not considered
194        s_id
195        for s_id, s in snapshot_intervals_to_clear.items()
196        if s.snapshot.full_history_restatement_only
197    ]
198    if full_history_restatement_snapshot_ids:
199        # only load full snapshot records that we havent already loaded
200        additional_snapshots = state_reader.get_snapshots(
201            [
202                s.snapshot_id
203                for s in full_history_restatement_snapshot_ids
204                if s.snapshot_id not in loaded_snapshots
205            ]
206        )
207
208        all_snapshots = loaded_snapshots | additional_snapshots
209
210        for full_snapshot_id in full_history_restatement_snapshot_ids:
211            full_snapshot = all_snapshots[full_snapshot_id]
212            intervals_to_clear = snapshot_intervals_to_clear[full_snapshot_id]
213
214            original_start, original_end = intervals_to_clear.interval
215
216            # get_removal_interval() widens intervals if necessary
217            new_interval = full_snapshot.get_removal_interval(
218                start=original_start, end=original_end
219            )
220
221            intervals_to_clear.interval = new_interval
222
223    return snapshot_intervals_to_clear
logger = <Logger sqlmesh.core.plan.common (WARNING)>
def should_force_rebuild( old: sqlmesh.core.snapshot.definition.Snapshot, new: sqlmesh.core.snapshot.definition.Snapshot) -> bool:
16def should_force_rebuild(old: Snapshot, new: Snapshot) -> bool:
17    if new.is_view and new.is_indirect_non_breaking and not new.is_forward_only:
18        # View models always need to be rebuilt to reflect updated upstream dependencies
19        return True
20    if new.is_seed and not (
21        new.is_metadata
22        and new.previous_version
23        and new.previous_version.snapshot_id(new.name) == old.snapshot_id
24    ):
25        # Seed models always need to be rebuilt to reflect changes in the seed file
26        # Unless only their metadata has been updated (eg description added) and the seed file has not been touched
27        return True
28    return is_breaking_kind_change(old, new)
def is_breaking_kind_change( old: sqlmesh.core.snapshot.definition.Snapshot, new: sqlmesh.core.snapshot.definition.Snapshot) -> bool:
31def is_breaking_kind_change(old: Snapshot, new: Snapshot) -> bool:
32    if new.is_model != old.is_model:
33        # If one is a model and the other isn't, then we need to rebuild
34        return True
35    if not new.is_model or not old.is_model:
36        # If neither are models, then we don't need to rebuild
37        # Note that the remaining checks only apply to model snapshots
38        return False
39    if old.virtual_environment_mode != new.virtual_environment_mode:
40        # If the virtual environment mode has changed, then we need to rebuild
41        return True
42    if old.model.kind.name == new.model.kind.name:
43        # If the kind hasn't changed, then we don't need to rebuild
44        return False
45    if not old.is_incremental or not new.is_incremental:
46        # If either is not incremental, then we need to rebuild
47        return True
48    if old.model.partitioned_by == new.model.partitioned_by:
49        # If the partitioning hasn't changed, then we don't need to rebuild
50        return False
51    return True
@dataclass
class SnapshotIntervalClearRequest:
54@dataclass
55class SnapshotIntervalClearRequest:
56    # affected snapshot
57    snapshot: SnapshotIdAndVersion
58
59    # which interval to clear
60    interval: Interval
61
62    # which environments this snapshot is currently promoted
63    # note that this can be empty if the snapshot exists because its ttl has not expired
64    # but it is not part of any particular environment
65    environment_names: t.Set[str] = field(default_factory=set)
66
67    @property
68    def snapshot_id(self) -> SnapshotId:
69        return self.snapshot.snapshot_id
70
71    @property
72    def sorted_environment_names(self) -> t.List[str]:
73        return list(sorted(self.environment_names))
SnapshotIntervalClearRequest( snapshot: sqlmesh.core.snapshot.definition.SnapshotIdAndVersion, interval: Tuple[int, int], environment_names: Set[str] = <factory>)
interval: Tuple[int, int]
environment_names: Set[str]
67    @property
68    def snapshot_id(self) -> SnapshotId:
69        return self.snapshot.snapshot_id
sorted_environment_names: List[str]
71    @property
72    def sorted_environment_names(self) -> t.List[str]:
73        return list(sorted(self.environment_names))
def identify_restatement_intervals_across_snapshot_versions( state_reader: sqlmesh.core.state_sync.base.StateReader, prod_restatements: Dict[str, Tuple[int, int]], disable_restatement_models: Set[str], loaded_snapshots: Dict[sqlmesh.core.snapshot.definition.SnapshotId, sqlmesh.core.snapshot.definition.Snapshot], current_ts: Optional[int] = None) -> Dict[sqlmesh.core.snapshot.definition.SnapshotId, SnapshotIntervalClearRequest]:
 76def identify_restatement_intervals_across_snapshot_versions(
 77    state_reader: StateReader,
 78    prod_restatements: t.Dict[str, Interval],
 79    disable_restatement_models: t.Set[str],
 80    loaded_snapshots: t.Dict[SnapshotId, Snapshot],
 81    current_ts: t.Optional[int] = None,
 82) -> t.Dict[SnapshotId, SnapshotIntervalClearRequest]:
 83    """
 84    Given a map of snapshot names + intervals to restate in prod:
 85        - Look up matching snapshots (match based on name - regardless of version, to get all versions)
 86        - For each match, also match downstream snapshots in each dev environment while filtering out models that have restatement disabled
 87        - Return a list of all snapshots that are affected + the interval that needs to be cleared for each
 88
 89    The goal here is to produce a list of intervals to invalidate across all dev snapshots so that a subsequent plan or
 90    cadence run in those environments causes the intervals to be repopulated.
 91    """
 92    if not prod_restatements:
 93        return {}
 94
 95    # Although :loaded_snapshots is sourced from RestatementStage.all_snapshots, since the only time we ever need
 96    # to clear intervals across all environments is for prod, the :loaded_snapshots here are always from prod
 97    prod_name_versions: t.Set[SnapshotNameVersion] = {
 98        s.name_version for s in loaded_snapshots.values()
 99    }
100
101    snapshot_intervals_to_clear: t.Dict[SnapshotId, SnapshotIntervalClearRequest] = {}
102
103    for env_summary in state_reader.get_environments_summary():
104        # Fetch the full environment object one at a time to avoid loading all environments into memory at once
105        env = state_reader.get_environment(env_summary.name)
106        if not env:
107            logger.warning("Environment %s not found", env_summary.name)
108            continue
109
110        snapshots_by_name = {s.name: s.table_info for s in env.snapshots}
111
112        # We dont just restate matching snapshots, we also have to restate anything downstream of them
113        # so that if A gets restated in prod and dev has A <- B <- C, B and C get restated in dev
114        env_dag = DAG({s.name: {p.name for p in s.parents} for s in env.snapshots})
115
116        for restate_snapshot_name, interval in prod_restatements.items():
117            if restate_snapshot_name not in snapshots_by_name:
118                # snapshot is not promoted in this environment
119                continue
120
121            affected_snapshot_names = [
122                x
123                for x in ([restate_snapshot_name] + env_dag.downstream(restate_snapshot_name))
124                if x not in disable_restatement_models
125            ]
126
127            for affected_snapshot_name in affected_snapshot_names:
128                affected_snapshot = snapshots_by_name[affected_snapshot_name]
129
130                # Don't clear intervals for a dev snapshot if it shares the same physical version with prod.
131                # Otherwise, prod will be affected by what should be a dev operation
132                if affected_snapshot.name_version in prod_name_versions:
133                    continue
134
135                clear_request = snapshot_intervals_to_clear.get(affected_snapshot.snapshot_id)
136                if not clear_request:
137                    clear_request = SnapshotIntervalClearRequest(
138                        snapshot=affected_snapshot.id_and_version, interval=interval
139                    )
140                    snapshot_intervals_to_clear[affected_snapshot.snapshot_id] = clear_request
141
142                clear_request.environment_names |= set([env.name])
143
144    # snapshot_intervals_to_clear now contains the entire hierarchy of affected snapshots based
145    # on building the DAG for each environment and including downstream snapshots
146    # but, what if there are affected snapshots that arent part of any environment?
147    unique_snapshot_names = set(snapshot_id.name for snapshot_id in snapshot_intervals_to_clear)
148
149    current_ts = current_ts or now_timestamp()
150    all_matching_non_prod_snapshots = {
151        s.snapshot_id: s
152        for s in state_reader.get_snapshots_by_names(
153            snapshot_names=unique_snapshot_names, current_ts=current_ts, exclude_expired=True
154        )
155        # Don't clear intervals for a snapshot if it shares the same physical version with prod.
156        # Otherwise, prod will be affected by what should be a dev operation
157        if s.name_version not in prod_name_versions
158    }
159
160    # identify the ones that we havent picked up yet, which are the ones that dont exist in any environment
161    if remaining_snapshot_ids := set(all_matching_non_prod_snapshots).difference(
162        snapshot_intervals_to_clear
163    ):
164        # these snapshot id's exist in isolation and may be related to a downstream dependency of the :prod_restatements,
165        # rather than directly related, so we can't simply look up the interval to clear based on :prod_restatements.
166        # To figure out the interval that should be cleared, we can match to the existing list based on name
167        # and conservatively take the widest interval that shows up
168        snapshot_name_to_widest_interval: t.Dict[str, Interval] = {}
169        for s_id, clear_request in snapshot_intervals_to_clear.items():
170            current_start, current_end = snapshot_name_to_widest_interval.get(
171                s_id.name, clear_request.interval
172            )
173            next_start, next_end = clear_request.interval
174
175            next_start = min(current_start, next_start)
176            next_end = max(current_end, next_end)
177
178            snapshot_name_to_widest_interval[s_id.name] = (next_start, next_end)
179
180        for remaining_snapshot_id in remaining_snapshot_ids:
181            remaining_snapshot = all_matching_non_prod_snapshots[remaining_snapshot_id]
182            snapshot_intervals_to_clear[remaining_snapshot_id] = SnapshotIntervalClearRequest(
183                snapshot=remaining_snapshot,
184                interval=snapshot_name_to_widest_interval[remaining_snapshot_id.name],
185            )
186
187    # for any affected full_history_restatement_only snapshots, we need to widen the intervals being restated to
188    # include the whole time range for that snapshot. This requires a call to state to load the full snapshot record,
189    # so we only do it if necessary
190    full_history_restatement_snapshot_ids = [
191        # FIXME: full_history_restatement_only is just one indicator that the snapshot can only be fully refreshed, the other one is Model.depends_on_self
192        # however, to figure out depends_on_self, we have to render all the model queries which, alongside having to fetch full snapshots from state,
193        # is problematic in secure environments that are deliberately isolated from arbitrary user code (since rendering a query may require user macros to be present)
194        # So for now, these are not considered
195        s_id
196        for s_id, s in snapshot_intervals_to_clear.items()
197        if s.snapshot.full_history_restatement_only
198    ]
199    if full_history_restatement_snapshot_ids:
200        # only load full snapshot records that we havent already loaded
201        additional_snapshots = state_reader.get_snapshots(
202            [
203                s.snapshot_id
204                for s in full_history_restatement_snapshot_ids
205                if s.snapshot_id not in loaded_snapshots
206            ]
207        )
208
209        all_snapshots = loaded_snapshots | additional_snapshots
210
211        for full_snapshot_id in full_history_restatement_snapshot_ids:
212            full_snapshot = all_snapshots[full_snapshot_id]
213            intervals_to_clear = snapshot_intervals_to_clear[full_snapshot_id]
214
215            original_start, original_end = intervals_to_clear.interval
216
217            # get_removal_interval() widens intervals if necessary
218            new_interval = full_snapshot.get_removal_interval(
219                start=original_start, end=original_end
220            )
221
222            intervals_to_clear.interval = new_interval
223
224    return snapshot_intervals_to_clear

Given a map of snapshot names + intervals to restate in prod:

- Look up matching snapshots (matched on name only, regardless of version, so that all versions are found)
- For each match, also match downstream snapshots in each dev environment, filtering out models that have restatement disabled
- Return a mapping of every affected snapshot to the interval that needs to be cleared for it

The goal here is to produce a list of intervals to invalidate across all dev snapshots so that a subsequent plan or cadence run in those environments causes the intervals to be repopulated.