Edit on GitHub

sqlmesh.core.signal

 1from __future__ import annotations
 2
 3import typing as t
 4from sqlmesh.utils import UniqueKeyDict, registry_decorator
 5from sqlmesh.utils.errors import MissingSourceError
 6
 7if t.TYPE_CHECKING:
 8    from sqlmesh.core.context import ExecutionContext
 9    from sqlmesh.core.snapshot.definition import Snapshot
10    from sqlmesh.utils.date import DatetimeRanges
11    from sqlmesh.core.snapshot.definition import DeployabilityIndex
12
13
class signal(registry_decorator):
    """Specifies a function that determines which of the scheduled intervals are ready.

    Whenever SQLMesh is about to execute a batch of intervals, for example the range
    between `a` and `d`, the `batch` parameter holds every individual interval that
    makes up this batch, i.e.: `[a,b),[b,c),[c,d)`.

    The function can answer in one of three ways:

    - `True`: the whole batch is ready.
    - `False`: none of the batch's intervals are ready.
    - a list of intervals (a batch): exactly these intervals are ready.

    A returned batch is expected to be a subset of the `batch` parameter,
    e.g.: `[a,b),[b,c)`. It may contain gaps, e.g.: `[a,b),[c,d)`, but the bounds
    of the individual intervals must not be altered.

    This interface lets an implementation check whole batches of intervals without
    having to compute the individual intervals itself.

    Args:
        batch: the list of intervals that are missing and scheduled to run.

    Returns:
        `True` if all intervals are ready, `False` if none are ready, or a list of
        intervals naming exactly the ones that are ready.
    """
40
41
# Registry type for signals: a `UniqueKeyDict` from string keys to `signal` entries.
SignalRegistry = UniqueKeyDict[str, signal]
43
44
45@signal()
def freshness(
    batch: DatetimeRanges,
    snapshot: Snapshot,
    context: ExecutionContext,
) -> bool:
    """Implements model freshness as a signal.

    The model is considered fresh (`True` is returned) when any of these holds:

    - the run is a restatement, or the engine adapter cannot report table
      last-modified timestamps;
    - the snapshot has no last-altered timestamp for the relevant
      (deployable or dev) version;
    - `context.parent_intervals` is non-empty, i.e., at least one upstream
      SQLMesh model has intervals to compute;
    - at least one external dependency was modified after the snapshot's
      last-altered timestamp.

    Args:
        batch: the list of intervals that are missing and scheduled to run.
            It is not inspected here; the answer applies to the whole batch.
        snapshot: the snapshot whose freshness is being checked.
        context: the execution context providing the engine adapter, the known
            snapshots, the deployability index and the parent intervals.

    Returns:
        `True` if the model is considered fresh, `False` otherwise.

    Raises:
        MissingSourceError: if the adapter returns a different number of
            last-modified timestamps than there are external dependencies.
    """
    adapter = context.engine_adapter
    if context.is_restatement or not adapter.SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS:
        return True

    deployability_index = context.deployability_index
    if not deployability_index:
        # The module-level import of DeployabilityIndex is guarded by
        # `t.TYPE_CHECKING`, so the name does not exist at runtime. Import it
        # lazily here, only when the fallback is actually needed.
        from sqlmesh.core.snapshot.definition import DeployabilityIndex

        deployability_index = DeployabilityIndex.all_deployable()

    last_altered_ts = (
        snapshot.last_altered_ts
        if deployability_index.is_deployable(snapshot)
        else snapshot.dev_last_altered_ts
    )

    if not last_altered_ts:
        return True

    parent_snapshots = {context.snapshots[p.name] for p in snapshot.parents}

    # Every dependency that is not a non-external parent snapshot is treated as external.
    upstream_parent_snapshots = {p for p in parent_snapshots if not p.is_external}
    external_parents = snapshot.node.depends_on - {p.name for p in upstream_parent_snapshots}

    if context.parent_intervals:
        # At least one upstream SQLMesh model has intervals to compute (i.e., is fresh),
        # so the current model is considered fresh too
        return True

    if external_parents:
        external_last_altered_timestamps = adapter.get_table_last_modified_ts(
            list(external_parents)
        )

        if len(external_last_altered_timestamps) != len(external_parents):
            raise MissingSourceError(
                f"Expected {len(external_parents)} sources to be present, but got {len(external_last_altered_timestamps)}."
            )

        # Finding new data means that the upstream dependencies have been altered
        # since the last time the model was evaluated
        return any(
            external_last_altered_ts > last_altered_ts
            for external_last_altered_ts in external_last_altered_timestamps
        )

    return False
class signal(sqlmesh.utils.registry_decorator):
class signal(registry_decorator):
    """Specifies a function that determines which of the scheduled intervals are ready.

    Whenever SQLMesh is about to execute a batch of intervals, for example the range
    between `a` and `d`, the `batch` parameter holds every individual interval that
    makes up this batch, i.e.: `[a,b),[b,c),[c,d)`.

    The function can answer in one of three ways:

    - `True`: the whole batch is ready.
    - `False`: none of the batch's intervals are ready.
    - a list of intervals (a batch): exactly these intervals are ready.

    A returned batch is expected to be a subset of the `batch` parameter,
    e.g.: `[a,b),[b,c)`. It may contain gaps, e.g.: `[a,b),[c,d)`, but the bounds
    of the individual intervals must not be altered.

    This interface lets an implementation check whole batches of intervals without
    having to compute the individual intervals itself.

    Args:
        batch: the list of intervals that are missing and scheduled to run.

    Returns:
        `True` if all intervals are ready, `False` if none are ready, or a list of
        intervals naming exactly the ones that are ready.
    """

Specifies a function that determines which intervals are ready from a list of scheduled intervals.

When SQLMesh wishes to execute a batch of intervals, say between a and d, then the batch parameter will contain each individual interval within this batch, i.e.: [a,b),[b,c),[c,d).

This function may return True to indicate that the whole batch is ready, False to indicate none of the batch's intervals are ready, or a list of intervals (a batch) to indicate exactly which ones are ready.

When returning a batch, the function is expected to return a subset of the batch parameter, e.g.: [a,b),[b,c). Note that it may return gaps, e.g.: [a,b),[c,d), but it may not alter the bounds of any of the intervals.

The interface allows an implementation to check batches of intervals without having to actually compute individual intervals itself.

Arguments:
  • batch: the list of intervals that are missing and scheduled to run.
Returns:

Either True to indicate all intervals are ready, False to indicate none are ready or a list of intervals to indicate exactly which ones are ready.

SignalRegistry = sqlmesh.utils.UniqueKeyDict[str, signal]
@signal()
def freshness( batch: List[Tuple[datetime.datetime, datetime.datetime]], snapshot: sqlmesh.core.snapshot.definition.Snapshot, context: sqlmesh.core.context.ExecutionContext) -> bool:
46@signal()
def freshness(
    batch: DatetimeRanges,
    snapshot: Snapshot,
    context: ExecutionContext,
) -> bool:
    """Implements model freshness as a signal.

    The model is considered fresh (`True` is returned) when any of these holds:

    - the run is a restatement, or the engine adapter cannot report table
      last-modified timestamps;
    - the snapshot has no last-altered timestamp for the relevant
      (deployable or dev) version;
    - `context.parent_intervals` is non-empty, i.e., at least one upstream
      SQLMesh model has intervals to compute;
    - at least one external dependency was modified after the snapshot's
      last-altered timestamp.

    Args:
        batch: the list of intervals that are missing and scheduled to run.
            It is not inspected here; the answer applies to the whole batch.
        snapshot: the snapshot whose freshness is being checked.
        context: the execution context providing the engine adapter, the known
            snapshots, the deployability index and the parent intervals.

    Returns:
        `True` if the model is considered fresh, `False` otherwise.

    Raises:
        MissingSourceError: if the adapter returns a different number of
            last-modified timestamps than there are external dependencies.
    """
    adapter = context.engine_adapter
    if context.is_restatement or not adapter.SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS:
        return True

    deployability_index = context.deployability_index
    if not deployability_index:
        # The module-level import of DeployabilityIndex is guarded by
        # `t.TYPE_CHECKING`, so the name does not exist at runtime. Import it
        # lazily here, only when the fallback is actually needed.
        from sqlmesh.core.snapshot.definition import DeployabilityIndex

        deployability_index = DeployabilityIndex.all_deployable()

    last_altered_ts = (
        snapshot.last_altered_ts
        if deployability_index.is_deployable(snapshot)
        else snapshot.dev_last_altered_ts
    )

    if not last_altered_ts:
        return True

    parent_snapshots = {context.snapshots[p.name] for p in snapshot.parents}

    # Every dependency that is not a non-external parent snapshot is treated as external.
    upstream_parent_snapshots = {p for p in parent_snapshots if not p.is_external}
    external_parents = snapshot.node.depends_on - {p.name for p in upstream_parent_snapshots}

    if context.parent_intervals:
        # At least one upstream SQLMesh model has intervals to compute (i.e., is fresh),
        # so the current model is considered fresh too
        return True

    if external_parents:
        external_last_altered_timestamps = adapter.get_table_last_modified_ts(
            list(external_parents)
        )

        if len(external_last_altered_timestamps) != len(external_parents):
            raise MissingSourceError(
                f"Expected {len(external_parents)} sources to be present, but got {len(external_last_altered_timestamps)}."
            )

        # Finding new data means that the upstream dependencies have been altered
        # since the last time the model was evaluated
        return any(
            external_last_altered_ts > last_altered_ts
            for external_last_altered_ts in external_last_altered_timestamps
        )

    return False

Implements model freshness as a signal, i.e., it considers this model to be fresh if either of the following holds:

  • Any upstream SQLMesh model has available intervals to compute, i.e., is fresh
  • Any upstream external model has been altered since the last time the model was evaluated