Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3import datetime
  4import subprocess
  5from typing import Any
  6
  7from plain.utils import timezone
  8
  9from .jobs import Job
 10from .registry import jobs_registry, register_job
 11
 12_MONTH_NAMES = {
 13    "JAN": 1,
 14    "FEB": 2,
 15    "MAR": 3,
 16    "APR": 4,
 17    "MAY": 5,
 18    "JUN": 6,
 19    "JUL": 7,
 20    "AUG": 8,
 21    "SEP": 9,
 22    "OCT": 10,
 23    "NOV": 11,
 24    "DEC": 12,
 25}
 26_DAY_NAMES = {
 27    "MON": 0,
 28    "TUE": 1,
 29    "WED": 2,
 30    "THU": 3,
 31    "FRI": 4,
 32    "SAT": 5,
 33    "SUN": 6,
 34}
 35
 36
 37class _ScheduleComponent:
 38    def __init__(self, values: list[int], raw: str | int = "") -> None:
 39        self.values = sorted(values)
 40        self._raw = raw
 41
 42    def __str__(self) -> str:
 43        if self._raw:
 44            return str(self._raw)
 45        return ",".join(str(v) for v in self.values)
 46
 47    def __eq__(self, other: Any) -> bool:
 48        return self.values == other.values
 49
 50    @classmethod
 51    def parse(
 52        cls,
 53        value: int | str,
 54        min_allowed: int,
 55        max_allowed: int,
 56        str_conversions: dict[str, int] | None = None,
 57    ) -> _ScheduleComponent:
 58        if str_conversions is None:
 59            str_conversions = {}
 60
 61        if isinstance(value, int):
 62            if value < min_allowed or value > max_allowed:
 63                raise ValueError(
 64                    f"Schedule component should be between {min_allowed} and {max_allowed}"
 65                )
 66            return cls([value], raw=value)
 67
 68        if not isinstance(value, str):
 69            raise ValueError("Schedule component should be an int or str")
 70
 71        # First split any subcomponents and re-parse them
 72        if "," in value:
 73            return cls(
 74                sum(
 75                    (
 76                        cls.parse(
 77                            sub_value, min_allowed, max_allowed, str_conversions
 78                        ).values
 79                        for sub_value in value.split(",")
 80                    ),
 81                    [],
 82                ),
 83                raw=value,
 84            )
 85
 86        if value == "*":
 87            return cls(list(range(min_allowed, max_allowed + 1)), raw=value)
 88
 89        def _convert(value: str) -> int:
 90            result = str_conversions.get(value.upper(), value)
 91            return int(result)
 92
 93        if "/" in value:
 94            values, step = value.split("/")
 95            values = cls.parse(values, min_allowed, max_allowed, str_conversions)
 96            return cls([v for v in values.values if v % int(step) == 0], raw=value)
 97
 98        if "-" in value:
 99            start, end = value.split("-")
100            return cls(list(range(_convert(start), _convert(end) + 1)), raw=value)
101
102        return cls([_convert(value)], raw=value)
103
104
105class Schedule:
106    def __init__(
107        self,
108        *,
109        minute: int | str = "*",
110        hour: int | str = "*",
111        day_of_month: int | str = "*",
112        month: int | str = "*",
113        day_of_week: int | str = "*",
114        raw: str = "",
115    ) -> None:
116        self.minute = _ScheduleComponent.parse(minute, min_allowed=0, max_allowed=59)
117        self.hour = _ScheduleComponent.parse(hour, min_allowed=0, max_allowed=23)
118        self.day_of_month = _ScheduleComponent.parse(
119            day_of_month, min_allowed=1, max_allowed=31
120        )
121        self.month = _ScheduleComponent.parse(
122            month,
123            min_allowed=1,
124            max_allowed=12,
125            str_conversions=_MONTH_NAMES,
126        )
127        self.day_of_week = _ScheduleComponent.parse(
128            day_of_week,
129            min_allowed=0,
130            max_allowed=6,
131            str_conversions=_DAY_NAMES,
132        )
133        self._raw = raw
134
135    def __str__(self) -> str:
136        if self._raw:
137            return self._raw
138        return f"{self.minute} {self.hour} {self.day_of_month} {self.month} {self.day_of_week}"
139
140    def __repr__(self) -> str:
141        return f"<Schedule {self}>"
142
143    @classmethod
144    def from_cron(cls, cron: str) -> Schedule:
145        raw = cron
146
147        if cron == "@yearly" or cron == "@annually":
148            cron = "0 0 1 1 *"
149        elif cron == "@monthly":
150            cron = "0 0 1 * *"
151        elif cron == "@weekly":
152            cron = "0 0 * * 0"
153        elif cron == "@daily" or cron == "@midnight":
154            cron = "0 0 * * *"
155        elif cron == "@hourly":
156            cron = "0 * * * *"
157
158        minute, hour, day_of_month, month, day_of_week = cron.split()
159
160        return cls(
161            minute=minute,
162            hour=hour,
163            day_of_month=day_of_month,
164            month=month,
165            day_of_week=day_of_week,
166            raw=raw,
167        )
168
169    def next(self, now: datetime.datetime | None = None) -> datetime.datetime:
170        """
171        Find the next datetime that matches the schedule after the given datetime.
172        """
173        dt = now or timezone.localtime()  # Use the defined plain timezone by default
174
175        # We only care about minutes, so immediately jump to the next minute
176        dt += datetime.timedelta(minutes=1)
177        dt = dt.replace(second=0, microsecond=0)
178
179        def _go_to_next_day(v: datetime.datetime) -> datetime.datetime:
180            v = v + datetime.timedelta(days=1)
181            return v.replace(
182                hour=self.hour.values[0],
183                minute=self.minute.values[0],
184            )
185
186        # If we don't find a value in the next 500 days,
187        # then the schedule is probably never going to match (i.e. Feb 31)
188        max_future = dt + datetime.timedelta(days=500)
189
190        while True:
191            is_valid_day = (
192                dt.month in self.month.values
193                and dt.day in self.day_of_month.values
194                and dt.weekday() in self.day_of_week.values
195            )
196            if is_valid_day:
197                # We're on a valid day, now find the next valid hour and minute
198                for hour in self.hour.values:
199                    if hour < dt.hour:
200                        continue
201                    for minute in self.minute.values:
202                        if hour == dt.hour and minute < dt.minute:
203                            continue
204                        candidate_datetime = dt.replace(hour=hour, minute=minute)
205                        if candidate_datetime >= dt:
206                            return candidate_datetime
207                # If no valid time is found today, reset to the first valid minute and hour of the next day
208                dt = _go_to_next_day(dt)
209            else:
210                # Increment the day until a valid month/day/weekday combination is found
211                dt = _go_to_next_day(dt)
212
213            if dt > max_future:
214                raise ValueError("No valid schedule match found in the next 500 days")
215
216
217@register_job
218class ScheduledCommand(Job):
219    def __init__(self, command: str) -> None:
220        self.command = command
221
222    def __repr__(self) -> str:
223        return f"<ScheduledCommand: {self.command}>"
224
225    def run(self) -> None:
226        subprocess.run(self.command, shell=True, check=True)
227
228    def get_unique_key(self) -> str:
229        # The ScheduledCommand can be used for different commands,
230        # so we need the unique_key to separate them in the scheduling uniqueness logic
231        return self.command
232
233
234def load_schedule(
235    schedules: list[tuple[str | Job, str | Schedule]],
236) -> list[tuple[Job, Schedule]]:
237    jobs_schedule: list[tuple[Job, Schedule]] = []
238
239    for job, schedule in schedules:
240        if isinstance(job, str):
241            if job.startswith("cmd:"):
242                job = ScheduledCommand(job[4:])
243            else:
244                job = jobs_registry.load_job(job, {"args": [], "kwargs": {}})
245
246        if isinstance(schedule, str):
247            schedule = Schedule.from_cron(schedule)
248
249        jobs_schedule.append((job, schedule))
250
251    return jobs_schedule