sqlmesh.core.signal
from __future__ import annotations

import typing as t
from sqlmesh.utils import UniqueKeyDict, registry_decorator
from sqlmesh.utils.errors import MissingSourceError

if t.TYPE_CHECKING:
    from sqlmesh.core.context import ExecutionContext
    from sqlmesh.core.snapshot.definition import Snapshot
    from sqlmesh.utils.date import DatetimeRanges
    from sqlmesh.core.snapshot.definition import DeployabilityIndex


class signal(registry_decorator):
    """Specifies a function which determines which intervals are ready from a list of scheduled intervals.

    When SQLMesh wishes to execute a batch of intervals, say between `a` and `d`, then
    the `batch` parameter will contain each individual interval within this batch,
    i.e.: `[a,b),[b,c),[c,d)`.

    This function may return `True` to indicate that the whole batch is ready,
    `False` to indicate none of the batch's intervals are ready, or a list of
    intervals (a batch) to indicate exactly which ones are ready.

    When returning a batch, the function is expected to return a subset of
    the `batch` parameter, e.g.: `[a,b),[b,c)`. Note that it may return
    gaps, e.g.: `[a,b),[c,d)`, but it may not alter the bounds of any of the
    intervals.

    The interface allows an implementation to check batches of intervals without
    having to actually compute individual intervals itself.

    Args:
        batch: the list of intervals that are missing and scheduled to run.

    Returns:
        Either `True` to indicate all intervals are ready, `False` to indicate none are
        ready or a list of intervals to indicate exactly which ones are ready.
    """


SignalRegistry = UniqueKeyDict[str, signal]


@signal()
def freshness(
    batch: DatetimeRanges,
    snapshot: Snapshot,
    context: ExecutionContext,
) -> bool:
    """Implements model freshness as a signal.

    The model is considered fresh (i.e. the batch is ready) if any of these hold:
      - The run is a restatement, or the engine adapter does not support
        reading table last-modified timestamps.
      - The snapshot has no last-altered timestamp recorded.
      - Any upstream SQLMesh model has available intervals to compute, i.e. is fresh.
      - Any upstream external model has been altered since the last time the
        model was evaluated.

    Args:
        batch: the list of intervals that are missing and scheduled to run.
            It is not inspected here; the result applies to the whole batch.
        snapshot: the snapshot whose freshness is being checked.
        context: the execution context providing the engine adapter, the
            snapshots mapping, the deployability index and the parent intervals.

    Returns:
        `True` if the model is considered fresh, `False` otherwise.

    Raises:
        MissingSourceError: if the adapter returns a different number of
            last-modified timestamps than there are external parents.
    """
    adapter = context.engine_adapter
    if context.is_restatement or not adapter.SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS:
        return True

    deployability_index = context.deployability_index
    if not deployability_index:
        # The module-level import of DeployabilityIndex only exists under
        # TYPE_CHECKING, so the name must be imported here to exist at runtime.
        from sqlmesh.core.snapshot.definition import DeployabilityIndex

        deployability_index = DeployabilityIndex.all_deployable()

    last_altered_ts = (
        snapshot.last_altered_ts
        if deployability_index.is_deployable(snapshot)
        else snapshot.dev_last_altered_ts
    )

    if not last_altered_ts:
        return True

    parent_snapshots = {context.snapshots[p.name] for p in snapshot.parents}

    upstream_parent_snapshots = {p for p in parent_snapshots if not p.is_external}
    external_parents = snapshot.node.depends_on - {p.name for p in upstream_parent_snapshots}

    if context.parent_intervals:
        # At least one upstream sqlmesh model has intervals to compute (i.e. is fresh),
        # so the current model is considered fresh too
        return True

    if external_parents:
        external_last_altered_timestamps = adapter.get_table_last_modified_ts(
            list(external_parents)
        )

        if len(external_last_altered_timestamps) != len(external_parents):
            raise MissingSourceError(
                f"Expected {len(external_parents)} sources to be present, but got {len(external_last_altered_timestamps)}."
            )

        # Finding new data means that the upstream dependencies have been altered
        # since the last time the model was evaluated
        return any(
            external_last_altered_ts > last_altered_ts
            for external_last_altered_ts in external_last_altered_timestamps
        )

    return False
class signal(registry_decorator):
    """Specifies a function that determines which of the scheduled intervals are ready.

    When SQLMesh wishes to execute a batch of intervals, say between `a` and `d`,
    the `batch` parameter holds every individual interval within that batch,
    i.e.: `[a,b),[b,c),[c,d)`.

    The function may answer in one of three ways:
      - `True`: the whole batch is ready.
      - `False`: none of the batch's intervals are ready.
      - a list of intervals (a batch): exactly those intervals are ready.

    A returned batch is expected to be a subset of the `batch` parameter,
    e.g.: `[a,b),[b,c)`. It may contain gaps, e.g.: `[a,b),[c,d)`, but it
    may not alter the bounds of any of the intervals.

    The interface allows an implementation to check batches of intervals
    without having to actually compute individual intervals itself.

    Args:
        batch: the list of intervals that are missing and scheduled to run.

    Returns:
        Either `True` to indicate all intervals are ready, `False` to indicate
        none are ready, or a list of intervals to indicate exactly which ones
        are ready.
    """
Specifies a function which determines which intervals are ready from a list of scheduled intervals.
When SQLMesh wishes to execute a batch of intervals, say between a and d, then
the batch parameter will contain each individual interval within this batch,
i.e.: [a,b),[b,c),[c,d).
This function may return True to indicate that the whole batch is ready,
False to indicate none of the batch's intervals are ready, or a list of
intervals (a batch) to indicate exactly which ones are ready.
When returning a batch, the function is expected to return a subset of
the batch parameter, e.g.: [a,b),[b,c). Note that it may return
gaps, e.g.: [a,b),[c,d), but it may not alter the bounds of any of the
intervals.
The interface allows an implementation to check batches of intervals without having to actually compute individual intervals itself.
Arguments:
- batch: the list of intervals that are missing and scheduled to run.
Returns:
Either True to indicate all intervals are ready, False to indicate none are ready, or a list of intervals to indicate exactly which ones are ready.
@signal()
def freshness(
    batch: DatetimeRanges,
    snapshot: Snapshot,
    context: ExecutionContext,
) -> bool:
    """Implements model freshness as a signal.

    The model is considered fresh (i.e. the batch is ready) if any of these hold:
      - The run is a restatement, or the engine adapter does not support
        reading table last-modified timestamps.
      - The snapshot has no last-altered timestamp recorded.
      - Any upstream SQLMesh model has available intervals to compute, i.e. is fresh.
      - Any upstream external model has been altered since the last time the
        model was evaluated.

    Args:
        batch: the list of intervals that are missing and scheduled to run.
            It is not inspected here; the result applies to the whole batch.
        snapshot: the snapshot whose freshness is being checked.
        context: the execution context providing the engine adapter, the
            snapshots mapping, the deployability index and the parent intervals.

    Returns:
        `True` if the model is considered fresh, `False` otherwise.

    Raises:
        MissingSourceError: if the adapter returns a different number of
            last-modified timestamps than there are external parents.
    """
    adapter = context.engine_adapter
    if context.is_restatement or not adapter.SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS:
        return True

    deployability_index = context.deployability_index
    if not deployability_index:
        # DeployabilityIndex must be imported here so the name exists at
        # runtime; an import guarded by TYPE_CHECKING is not executed.
        from sqlmesh.core.snapshot.definition import DeployabilityIndex

        deployability_index = DeployabilityIndex.all_deployable()

    last_altered_ts = (
        snapshot.last_altered_ts
        if deployability_index.is_deployable(snapshot)
        else snapshot.dev_last_altered_ts
    )

    if not last_altered_ts:
        return True

    parent_snapshots = {context.snapshots[p.name] for p in snapshot.parents}

    upstream_parent_snapshots = {p for p in parent_snapshots if not p.is_external}
    external_parents = snapshot.node.depends_on - {p.name for p in upstream_parent_snapshots}

    if context.parent_intervals:
        # At least one upstream sqlmesh model has intervals to compute (i.e. is fresh),
        # so the current model is considered fresh too
        return True

    if external_parents:
        external_last_altered_timestamps = adapter.get_table_last_modified_ts(
            list(external_parents)
        )

        if len(external_last_altered_timestamps) != len(external_parents):
            raise MissingSourceError(
                f"Expected {len(external_parents)} sources to be present, but got {len(external_last_altered_timestamps)}."
            )

        # Finding new data means that the upstream dependencies have been altered
        # since the last time the model was evaluated
        return any(
            external_last_altered_ts > last_altered_ts
            for external_last_altered_ts in external_last_altered_timestamps
        )

    return False
Implements model freshness as a signal, i.e., it considers this model to be fresh if:
- Any upstream SQLMesh model has available intervals to compute, i.e., is fresh
- Any upstream external model has been altered since the last time the model was evaluated