v0.146.0
  1from __future__ import annotations
  2
  3import datetime
  4import subprocess
  5from typing import Any
  6
  7from plain.utils import timezone
  8
  9from .jobs import Job
 10from .registry import jobs_registry, register_job
 11
# Public API of this module; load_schedule and the underscore helpers are not re-exported by star imports.
__all__ = ["Schedule", "ScheduledCommand"]
 13
 14_MONTH_NAMES = {
 15    "JAN": 1,
 16    "FEB": 2,
 17    "MAR": 3,
 18    "APR": 4,
 19    "MAY": 5,
 20    "JUN": 6,
 21    "JUL": 7,
 22    "AUG": 8,
 23    "SEP": 9,
 24    "OCT": 10,
 25    "NOV": 11,
 26    "DEC": 12,
 27}
 28_DAY_NAMES = {
 29    "MON": 0,
 30    "TUE": 1,
 31    "WED": 2,
 32    "THU": 3,
 33    "FRI": 4,
 34    "SAT": 5,
 35    "SUN": 6,
 36}
 37
 38
 39class _ScheduleComponent:
 40    def __init__(self, values: list[int], raw: str | int = "") -> None:
 41        self.values = sorted(values)
 42        self._raw = raw
 43
 44    def __str__(self) -> str:
 45        if self._raw:
 46            return str(self._raw)
 47        return ",".join(str(v) for v in self.values)
 48
 49    def __eq__(self, other: Any) -> bool:
 50        return self.values == other.values
 51
 52    @classmethod
 53    def parse(
 54        cls,
 55        value: int | str,
 56        min_allowed: int,
 57        max_allowed: int,
 58        str_conversions: dict[str, int] | None = None,
 59    ) -> _ScheduleComponent:
 60        if str_conversions is None:
 61            str_conversions = {}
 62
 63        if isinstance(value, int):
 64            if value < min_allowed or value > max_allowed:
 65                raise ValueError(
 66                    f"Schedule component should be between {min_allowed} and {max_allowed}"
 67                )
 68            return cls([value], raw=value)
 69
 70        if not isinstance(value, str):
 71            raise ValueError("Schedule component should be an int or str")
 72
 73        # First split any subcomponents and re-parse them
 74        if "," in value:
 75            return cls(
 76                sum(
 77                    (
 78                        cls.parse(
 79                            sub_value, min_allowed, max_allowed, str_conversions
 80                        ).values
 81                        for sub_value in value.split(",")
 82                    ),
 83                    [],
 84                ),
 85                raw=value,
 86            )
 87
 88        if value == "*":
 89            return cls(list(range(min_allowed, max_allowed + 1)), raw=value)
 90
 91        def _convert(value: str) -> int:
 92            result = str_conversions.get(value.upper(), value)
 93            return int(result)
 94
 95        if "/" in value:
 96            values, step = value.split("/")
 97            values = cls.parse(values, min_allowed, max_allowed, str_conversions)
 98            return cls([v for v in values.values if v % int(step) == 0], raw=value)
 99
100        if "-" in value:
101            start, end = value.split("-")
102            return cls(list(range(_convert(start), _convert(end) + 1)), raw=value)
103
104        return cls([_convert(value)], raw=value)
105
106
class Schedule:
    """
    A cron-like schedule made of five components: minute, hour, day of month,
    month and day of week.

    Each component accepts an int or a cron-style string (``*``, ``5``,
    ``1-5``, ``*/15``, ``1,15``); month and day-of-week strings may also use
    three-letter names (``JAN``, ``MON``).

    Day-of-week numbers follow Python's ``datetime.weekday()`` convention
    (0=Monday ... 6=Sunday), not classic cron's 0=Sunday. A day matches only
    when month, day of month AND day of week all match.
    """

    # Cron "nickname" macros understood by from_cron(), expanded to five fields.
    # NOTE: "@weekly" expands to day_of_week 0, which this class treats as
    # Monday (datetime.weekday() numbering), not Sunday as in classic cron.
    _CRON_MACROS = {
        "@yearly": "0 0 1 1 *",
        "@annually": "0 0 1 1 *",
        "@monthly": "0 0 1 * *",
        "@weekly": "0 0 * * 0",
        "@daily": "0 0 * * *",
        "@midnight": "0 0 * * *",
        "@hourly": "0 * * * *",
    }

    def __init__(
        self,
        *,
        minute: int | str = "*",
        hour: int | str = "*",
        day_of_month: int | str = "*",
        month: int | str = "*",
        day_of_week: int | str = "*",
        raw: str = "",
    ) -> None:
        self.minute = _ScheduleComponent.parse(minute, min_allowed=0, max_allowed=59)
        self.hour = _ScheduleComponent.parse(hour, min_allowed=0, max_allowed=23)
        self.day_of_month = _ScheduleComponent.parse(
            day_of_month, min_allowed=1, max_allowed=31
        )
        self.month = _ScheduleComponent.parse(
            month,
            min_allowed=1,
            max_allowed=12,
            str_conversions=_MONTH_NAMES,
        )
        self.day_of_week = _ScheduleComponent.parse(
            day_of_week,
            min_allowed=0,
            max_allowed=6,
            str_conversions=_DAY_NAMES,
        )
        # Original cron text (if built via from_cron), preferred by __str__.
        self._raw = raw

    def __str__(self) -> str:
        if self._raw:
            return self._raw
        return f"{self.minute} {self.hour} {self.day_of_month} {self.month} {self.day_of_week}"

    def __repr__(self) -> str:
        return f"<Schedule {self}>"

    @classmethod
    def from_cron(cls, cron: str) -> Schedule:
        """
        Build a Schedule from a five-field cron expression
        ("minute hour day_of_month month day_of_week") or one of the
        ``@yearly``/``@annually``/``@monthly``/``@weekly``/``@daily``/
        ``@midnight``/``@hourly`` macros.

        Raises ValueError if the expression does not have exactly five fields
        (including unsupported macros) or if any field is invalid.
        """
        raw = cron
        expression = cls._CRON_MACROS.get(cron.strip(), cron)

        fields = expression.split()
        if len(fields) != 5:
            raise ValueError(
                "Cron expression should have 5 fields "
                f"(minute hour day_of_month month day_of_week), got {len(fields)}: {cron!r}"
            )
        minute, hour, day_of_month, month, day_of_week = fields

        return cls(
            minute=minute,
            hour=hour,
            day_of_month=day_of_month,
            month=month,
            day_of_week=day_of_week,
            raw=raw,
        )

    def next(self, now: datetime.datetime | None = None) -> datetime.datetime:
        """
        Find the next datetime that matches the schedule after the given datetime.

        The result is minute-aligned (seconds and microseconds are zero) and
        strictly later than ``now``. When ``now`` is None, the current local
        time from ``timezone.localtime()`` is used.

        Raises ValueError if no match is found within 500 days of ``now``
        (e.g. day_of_month=31 with month=2).
        """
        # Use the defined plain timezone by default
        dt = now if now is not None else timezone.localtime()

        # We only care about minutes, so immediately jump to the next minute
        dt += datetime.timedelta(minutes=1)
        dt = dt.replace(second=0, microsecond=0)

        def _go_to_next_day(v: datetime.datetime) -> datetime.datetime:
            # Advance one day and rewind to the earliest allowed hour/minute.
            v = v + datetime.timedelta(days=1)
            return v.replace(
                hour=self.hour.values[0],
                minute=self.minute.values[0],
            )

        # If we don't find a value in the next 500 days,
        # then the schedule is probably never going to match (i.e. Feb 31)
        max_future = dt + datetime.timedelta(days=500)

        while True:
            is_valid_day = (
                dt.month in self.month.values
                and dt.day in self.day_of_month.values
                and dt.weekday() in self.day_of_week.values
            )
            if is_valid_day:
                # We're on a valid day, now find the next valid hour and minute
                for hour in self.hour.values:
                    if hour < dt.hour:
                        continue
                    for minute in self.minute.values:
                        if hour == dt.hour and minute < dt.minute:
                            continue
                        candidate_datetime = dt.replace(hour=hour, minute=minute)
                        if candidate_datetime >= dt:
                            return candidate_datetime
                # If no valid time is found today, reset to the first valid minute and hour of the next day
                dt = _go_to_next_day(dt)
            else:
                # Increment the day until a valid month/day/weekday combination is found
                dt = _go_to_next_day(dt)

            if dt > max_future:
                raise ValueError("No valid schedule match found in the next 500 days")
217
218
@register_job
class ScheduledCommand(Job):
    """Job that executes a shell command line whenever its schedule fires."""

    def __init__(self, command: str) -> None:
        # Full command line, handed verbatim to the shell by run(); it should
        # therefore come from trusted configuration only.
        self.command = command

    def __repr__(self) -> str:
        return "<ScheduledCommand: {}>".format(self.command)

    def run(self) -> None:
        """Run the command through the shell; a non-zero exit raises CalledProcessError."""
        subprocess.run(args=self.command, check=True, shell=True)

    def default_concurrency_key(self) -> str:
        """Key concurrency on the command text so distinct commands stay independent."""
        key = self.command
        return key
236
237
def load_schedule(
    schedules: list[tuple[str | Job, str | Schedule]],
) -> list[tuple[Job, Schedule]]:
    """
    Normalize (job, schedule) pairs into (Job, Schedule) instances.

    A string job prefixed with ``cmd:`` becomes a ScheduledCommand for the
    rest of the string; any other string job is loaded from the jobs registry
    with no args/kwargs. A string schedule is parsed with Schedule.from_cron.
    Non-string values are passed through unchanged. Pairs are processed in
    order, resolving the job before the schedule.
    """

    def _resolve_job(spec: str | Job) -> Job:
        if not isinstance(spec, str):
            return spec
        command_prefix = "cmd:"
        if spec.startswith(command_prefix):
            return ScheduledCommand(spec[len(command_prefix):])
        return jobs_registry.load_job(spec, {"args": [], "kwargs": {}})

    def _resolve_schedule(spec: str | Schedule) -> Schedule:
        return Schedule.from_cron(spec) if isinstance(spec, str) else spec

    return [
        (_resolve_job(job), _resolve_schedule(schedule))
        for job, schedule in schedules
    ]