sqlmesh.utils.cron
"""Helpers for working out the cadence of cron expressions and walking their ticks."""
from __future__ import annotations

import typing as t
from datetime import datetime, timedelta
from functools import lru_cache

from croniter import croniter
from sqlglot.helper import first

from sqlmesh.utils.date import TimeLike, now, to_datetime


@lru_cache(maxsize=None)
def interval_seconds(cron: str) -> int:
    """Return the fixed spacing, in seconds, between ticks of a cron expression.

    Six consecutive ticks are sampled and the five gaps between them are
    compared. Results are memoized per cron string.

    Args:
        cron: The cron string.

    Returns:
        The gap in seconds when every sampled gap is identical, otherwise 0.
    """
    # NOTE(review): the first tick comes from croniter's default start time
    # (presumably "now" - confirm), and only five gaps are inspected, so a
    # schedule such as "0 9-17 * * *" may look evenly spaced depending on when
    # this first runs.
    gaps: t.Set[timedelta] = set()
    tick = to_datetime(croniter(cron).get_next() * 1000)

    for _ in range(5):
        following = to_datetime(croniter(cron, tick).get_next() * 1000)
        gaps.add(following - tick)
        tick = following

        # A second distinct gap means the schedule is not evenly spaced.
        if len(gaps) != 1:
            return 0

    return int(first(gaps).total_seconds())


class CroniterCache:
    """Stateful cursor that steps forwards or backwards through a cron schedule.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, cron: str, time: t.Optional[TimeLike] = None):
        """Create a cursor positioned at ``time``.

        Args:
            cron: The cron string.
            time: Starting position; ``now()`` is used when omitted.
        """
        # The cron expression being walked.
        self.cron = cron
        # Current cursor position; every get_next/get_prev call overwrites it.
        start = time if time is not None else now()
        self.curr: datetime = to_datetime(start)
        # Fixed tick spacing in seconds, or 0 when the spacing is not constant.
        self.interval_seconds = interval_seconds(self.cron)

    def get_next(self, estimate: bool = False) -> datetime:
        """Advance the cursor one tick and return the new position.

        Args:
            estimate: If True and the cron has a fixed spacing, add that
                spacing to the cursor instead of asking croniter.

        Returns:
            The updated cursor position.
        """
        if not (estimate and self.interval_seconds):
            exact = croniter(self.cron, self.curr).get_next()
            self.curr = to_datetime(exact * 1000)
            return self.curr

        self.curr = self.curr + timedelta(seconds=self.interval_seconds)
        return self.curr

    def get_prev(self, estimate: bool = False) -> datetime:
        """Move the cursor back one tick and return the new position.

        Args:
            estimate: If True and the cron has a fixed spacing, subtract that
                spacing from the cursor instead of asking croniter.

        Returns:
            The updated cursor position.
        """
        if not (estimate and self.interval_seconds):
            exact = croniter(self.cron, self.curr).get_prev()
            self.curr = to_datetime(exact * 1000)
            return self.curr

        self.curr = self.curr - timedelta(seconds=self.interval_seconds)
        return self.curr
@lru_cache(maxsize=None)
def
interval_seconds(cron: str) -> int:
@lru_cache(maxsize=None)
def interval_seconds(cron: str) -> int:
    """Return the fixed spacing, in seconds, between ticks of a cron expression.

    Six consecutive ticks are sampled and the five gaps between them are
    compared. Results are memoized per cron string.

    Args:
        cron: The cron string.

    Returns:
        The gap in seconds when every sampled gap is identical, otherwise 0.
    """
    # NOTE(review): the first tick comes from croniter's default start time
    # (presumably "now" - confirm), and only five gaps are inspected, so a
    # schedule such as "0 9-17 * * *" may look evenly spaced depending on when
    # this first runs.
    gaps: t.Set[timedelta] = set()
    tick = to_datetime(croniter(cron).get_next() * 1000)

    for _ in range(5):
        following = to_datetime(croniter(cron, tick).get_next() * 1000)
        gaps.add(following - tick)
        tick = following

        # A second distinct gap means the schedule is not evenly spaced.
        if len(gaps) != 1:
            return 0

    return int(first(gaps).total_seconds())
Computes the interval, in seconds, between consecutive runs of a cron statement, provided that interval is constant.
Arguments:
- cron: The cron string.
Returns:
The number of seconds between consecutive cron runs if that spacing is stable, otherwise 0.
class
CroniterCache:
class CroniterCache:
    """Stateful cursor that steps forwards or backwards through a cron schedule.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, cron: str, time: t.Optional[TimeLike] = None):
        """Create a cursor positioned at ``time``.

        Args:
            cron: The cron string.
            time: Starting position; ``now()`` is used when omitted.
        """
        # The cron expression being walked.
        self.cron = cron
        # Current cursor position; every get_next/get_prev call overwrites it.
        start = time if time is not None else now()
        self.curr: datetime = to_datetime(start)
        # Value returned by interval_seconds() for this cron; a falsy value
        # (0) makes get_next/get_prev always defer to croniter.
        self.interval_seconds = interval_seconds(self.cron)

    def get_next(self, estimate: bool = False) -> datetime:
        """Advance the cursor one tick and return the new position.

        Args:
            estimate: If True and ``self.interval_seconds`` is non-zero, add
                that many seconds to the cursor instead of asking croniter.

        Returns:
            The updated cursor position.
        """
        if not (estimate and self.interval_seconds):
            exact = croniter(self.cron, self.curr).get_next()
            self.curr = to_datetime(exact * 1000)
            return self.curr

        self.curr = self.curr + timedelta(seconds=self.interval_seconds)
        return self.curr

    def get_prev(self, estimate: bool = False) -> datetime:
        """Move the cursor back one tick and return the new position.

        Args:
            estimate: If True and ``self.interval_seconds`` is non-zero,
                subtract that many seconds from the cursor instead of asking
                croniter.

        Returns:
            The updated cursor position.
        """
        if not (estimate and self.interval_seconds):
            exact = croniter(self.cron, self.curr).get_prev()
            self.curr = to_datetime(exact * 1000)
            return self.curr

        self.curr = self.curr - timedelta(seconds=self.interval_seconds)
        return self.curr
CroniterCache( cron: str, time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None)