Edit on GitHub

sqlmesh.utils.cron

 1from __future__ import annotations
 2
 3import typing as t
 4from datetime import datetime, timedelta
 5from functools import lru_cache
 6
 7from croniter import croniter
 8from sqlglot.helper import first
 9
10from sqlmesh.utils.date import TimeLike, now, to_datetime
11
12
@lru_cache(maxsize=None)
def interval_seconds(cron: str) -> int:
    """Returns the constant spacing, in seconds, between runs of a cron expression.

    Six consecutive fire times are sampled and the five gaps between them are
    compared with one another.

    Args:
        cron: The cron string.

    Returns:
        The gap in whole seconds when every sampled gap is identical, otherwise 0.
    """
    # NOTE(review): no start time is passed to the first croniter call, so the
    # sample window presumably begins at the current time, and only five gaps
    # are inspected -- confirm this is enough for schedules with long cycles.
    observed_gaps = set()
    tick = to_datetime(croniter(cron).get_next() * 1000)

    for _ in range(5):
        following = to_datetime(croniter(cron, tick).get_next() * 1000)
        observed_gaps.add(following - tick)
        tick = following

        # Two distinct gaps already prove the schedule is irregular.
        if len(observed_gaps) > 1:
            return 0

    only_gap = first(observed_gaps)
    return int(only_gap.total_seconds())
34
35
class CroniterCache:
    """Moves a cursor datetime forwards or backwards along a cron schedule.

    The cursor is stored in ``curr`` and is updated by every call to
    ``get_next`` / ``get_prev``. ``interval_seconds`` holds the value returned
    by the module-level ``interval_seconds`` function for the cron string; when
    it is non-zero, callers may ask for an arithmetic estimate instead of
    invoking croniter.
    """

    def __init__(self, cron: str, time: t.Optional[TimeLike] = None):
        """Sets up the cursor.

        Args:
            cron: The cron string.
            time: The starting point of the cursor; ``now()`` is used when omitted.
        """
        self.cron = cron
        start = now() if time is None else time
        self.curr: datetime = to_datetime(start)
        self.interval_seconds = interval_seconds(self.cron)

    def get_next(self, estimate: bool = False) -> datetime:
        """Advances the cursor by one step and returns it.

        Args:
            estimate: When true and a non-zero interval is known, add the
                interval to the cursor instead of asking croniter.

        Returns:
            The updated cursor.
        """
        if not (estimate and self.interval_seconds):
            timestamp = croniter(self.cron, self.curr).get_next()
            self.curr = to_datetime(timestamp * 1000)
            return self.curr

        self.curr += timedelta(seconds=self.interval_seconds)
        return self.curr

    def get_prev(self, estimate: bool = False) -> datetime:
        """Moves the cursor back by one step and returns it.

        Args:
            estimate: When true and a non-zero interval is known, subtract the
                interval from the cursor instead of asking croniter.

        Returns:
            The updated cursor.
        """
        if not (estimate and self.interval_seconds):
            timestamp = croniter(self.cron, self.curr).get_prev()
            self.curr = to_datetime(timestamp * 1000)
            return self.curr

        self.curr -= timedelta(seconds=self.interval_seconds)
        return self.curr
@lru_cache(maxsize=None)
def interval_seconds(cron: str) -> int:
@lru_cache(maxsize=None)
def interval_seconds(cron: str) -> int:
    """Returns the constant spacing, in seconds, between runs of a cron expression.

    Six consecutive fire times are sampled and the five gaps between them are
    compared with one another.

    Args:
        cron: The cron string.

    Returns:
        The gap in whole seconds when every sampled gap is identical, otherwise 0.
    """
    # NOTE(review): no start time is passed to the first croniter call, so the
    # sample window presumably begins at the current time, and only five gaps
    # are inspected -- confirm this is enough for schedules with long cycles.
    observed_gaps = set()
    tick = to_datetime(croniter(cron).get_next() * 1000)

    for _ in range(5):
        following = to_datetime(croniter(cron, tick).get_next() * 1000)
        observed_gaps.add(following - tick)
        tick = following

        # Two distinct gaps already prove the schedule is irregular.
        if len(observed_gaps) > 1:
            return 0

    only_gap = first(observed_gaps)
    return int(only_gap.total_seconds())

Computes the interval, in seconds, between runs of a cron statement, provided that interval is constant.

Arguments:
  • cron: The cron string.
Returns:

The number of seconds between consecutive runs of the cron statement if that spacing is constant, otherwise 0.

class CroniterCache:
class CroniterCache:
    """Moves a cursor datetime forwards or backwards along a cron schedule.

    The cursor is stored in ``curr`` and is updated by every call to
    ``get_next`` / ``get_prev``. ``interval_seconds`` holds the value returned
    by the module-level ``interval_seconds`` function for the cron string; when
    it is non-zero, callers may ask for an arithmetic estimate instead of
    invoking croniter.
    """

    def __init__(self, cron: str, time: t.Optional[TimeLike] = None):
        """Sets up the cursor.

        Args:
            cron: The cron string.
            time: The starting point of the cursor; ``now()`` is used when omitted.
        """
        self.cron = cron
        start = now() if time is None else time
        self.curr: datetime = to_datetime(start)
        self.interval_seconds = interval_seconds(self.cron)

    def get_next(self, estimate: bool = False) -> datetime:
        """Advances the cursor by one step and returns it.

        Args:
            estimate: When true and a non-zero interval is known, add the
                interval to the cursor instead of asking croniter.

        Returns:
            The updated cursor.
        """
        if not (estimate and self.interval_seconds):
            timestamp = croniter(self.cron, self.curr).get_next()
            self.curr = to_datetime(timestamp * 1000)
            return self.curr

        self.curr += timedelta(seconds=self.interval_seconds)
        return self.curr

    def get_prev(self, estimate: bool = False) -> datetime:
        """Moves the cursor back by one step and returns it.

        Args:
            estimate: When true and a non-zero interval is known, subtract the
                interval from the cursor instead of asking croniter.

        Returns:
            The updated cursor.
        """
        if not (estimate and self.interval_seconds):
            timestamp = croniter(self.cron, self.curr).get_prev()
            self.curr = to_datetime(timestamp * 1000)
            return self.curr

        self.curr -= timedelta(seconds=self.interval_seconds)
        return self.curr
CroniterCache( cron: str, time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None)
38    def __init__(self, cron: str, time: t.Optional[TimeLike] = None):
39        self.cron = cron
40        self.curr: datetime = to_datetime(now() if time is None else time)
41        self.interval_seconds = interval_seconds(self.cron)
def get_next(self, estimate: bool = False) -> datetime.datetime:
43    def get_next(self, estimate: bool = False) -> datetime:
44        if estimate and self.interval_seconds:
45            self.curr = self.curr + timedelta(seconds=self.interval_seconds)
46        else:
47            self.curr = to_datetime(croniter(self.cron, self.curr).get_next() * 1000)
48        return self.curr
def get_prev(self, estimate: bool = False) -> datetime.datetime:
50    def get_prev(self, estimate: bool = False) -> datetime:
51        if estimate and self.interval_seconds:
52            self.curr = self.curr - timedelta(seconds=self.interval_seconds)
53        else:
54            self.curr = to_datetime(croniter(self.cron, self.curr).get_prev() * 1000)
55        return self.curr