1from __future__ import annotations
2
3import datetime
4import subprocess
5from typing import Any
6
7from plain.utils import timezone
8
9from .jobs import Job
10from .registry import jobs_registry, register_job
11
12__all__ = ["Schedule", "ScheduledCommand"]
13
# Three-letter month abbreviations mapped to their calendar number (JAN = 1).
_MONTH_NAMES = {
    name: number
    for number, name in enumerate(
        (
            "JAN",
            "FEB",
            "MAR",
            "APR",
            "MAY",
            "JUN",
            "JUL",
            "AUG",
            "SEP",
            "OCT",
            "NOV",
            "DEC",
        ),
        start=1,
    )
}
# Three-letter weekday abbreviations numbered MON = 0 ... SUN = 6, the same
# numbering that ``datetime.datetime.weekday()`` returns.
_DAY_NAMES = {
    name: number
    for number, name in enumerate(
        ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN"),
    )
}
37
38
class _ScheduleComponent:
    """A single schedule field expanded into the explicit integers it matches.

    ``values`` is the sorted, de-duplicated list of matching integers. The
    text (or int) the component was built from is remembered so that
    ``str()`` can echo what was originally written.
    """

    def __init__(self, values: list[int], raw: str | int = "") -> None:
        # De-duplicate so overlapping lists such as "1,1-3" compare equal to "1-3".
        self.values = sorted(set(values))
        self._raw = raw

    def __str__(self) -> str:
        # Prefer the original spelling; otherwise fall back to the expanded list.
        if self._raw:
            return str(self._raw)
        return ",".join(str(v) for v in self.values)

    def __eq__(self, other: Any) -> bool:
        # Components are equal when they expand to the same values, however
        # they were written. Unrelated types get NotImplemented so Python can
        # try the reflected comparison instead of raising AttributeError.
        if not isinstance(other, _ScheduleComponent):
            return NotImplemented
        return self.values == other.values

    @classmethod
    def parse(
        cls,
        value: int | str,
        min_allowed: int,
        max_allowed: int,
        str_conversions: dict[str, int] | None = None,
    ) -> _ScheduleComponent:
        """Expand one schedule field into a component.

        Supported string forms are ``*``, a single value, ``a-b`` ranges,
        ``base/step`` and comma-separated lists of any of those. Names found
        in ``str_conversions`` (matched case-insensitively) may be used in
        place of numbers.

        Args:
            value: The field as an int or a cron-like string.
            min_allowed: Smallest value the field may contain.
            max_allowed: Largest value the field may contain.
            str_conversions: Optional mapping of upper-case names to numbers.

        Returns:
            A component whose ``values`` are the matching integers.

        Raises:
            ValueError: If the value is not an int or str, cannot be parsed,
                falls outside ``min_allowed``..``max_allowed``, uses a
                non-positive step, or describes a reversed range.
        """
        if str_conversions is None:
            str_conversions = {}

        def _check_bounds(number: int) -> int:
            if number < min_allowed or number > max_allowed:
                raise ValueError(
                    f"Schedule component should be between {min_allowed} and {max_allowed}"
                )
            return number

        if isinstance(value, int):
            return cls([_check_bounds(value)], raw=value)

        if not isinstance(value, str):
            raise ValueError("Schedule component should be an int or str")

        # First split any subcomponents and re-parse them
        if "," in value:
            merged: list[int] = []
            for sub_value in value.split(","):
                merged.extend(
                    cls.parse(
                        sub_value, min_allowed, max_allowed, str_conversions
                    ).values
                )
            return cls(merged, raw=value)

        if value == "*":
            return cls(list(range(min_allowed, max_allowed + 1)), raw=value)

        def _convert(token: str) -> int:
            # Names (e.g. "MON", "JAN") are translated first; everything else
            # has to be a plain integer. String input gets the same bounds
            # check as int input.
            token = token.strip()
            try:
                number = int(str_conversions.get(token.upper(), token))
            except ValueError:
                raise ValueError(
                    f"Invalid schedule component value: {token!r}"
                ) from None
            return _check_bounds(number)

        if "/" in value:
            base_text, _, step_text = value.partition("/")
            try:
                step = int(step_text)
            except ValueError:
                raise ValueError(f"Invalid schedule step: {step_text!r}") from None
            if step < 1:
                raise ValueError("Schedule step should be a positive integer")
            base = cls.parse(base_text, min_allowed, max_allowed, str_conversions)
            # Cron steps count from the start of the range ("1-31/10" is
            # 1, 11, 21, 31), not from multiples of the step. The base is "*",
            # a range or a single value here, so its values are contiguous and
            # slicing performs exactly that walk. A single-value base keeps
            # just that value.
            return cls(base.values[::step], raw=value)

        if "-" in value:
            start_text, _, end_text = value.partition("-")
            start = _convert(start_text)
            end = _convert(end_text)
            if start > end:
                # A reversed range would expand to nothing and only fail
                # later when the schedule is evaluated.
                raise ValueError(
                    f"Schedule range start should not be greater than its end: {value!r}"
                )
            return cls(list(range(start, end + 1)), raw=value)

        return cls([_convert(value)], raw=value)
105
106
class Schedule:
    """A cron-style schedule built from five parsed components.

    Every keyword accepts an int or a cron-like string and is expanded by
    ``_ScheduleComponent.parse``. Month and weekday names are accepted for
    the ``month`` and ``day_of_week`` fields.

    Day-of-week numbers are compared against ``datetime.weekday()``, so
    0 means Monday and 6 means Sunday. A date only matches when month,
    day_of_month and day_of_week all match.
    """

    # Shorthand aliases and the five-field expressions they expand to.
    # NOTE(review): with 0 = Monday, "@weekly" fires on Monday at midnight,
    # whereas classic cron treats 0 as Sunday - confirm this is intended.
    _ALIASES = {
        "@yearly": "0 0 1 1 *",
        "@annually": "0 0 1 1 *",
        "@monthly": "0 0 1 * *",
        "@weekly": "0 0 * * 0",
        "@daily": "0 0 * * *",
        "@midnight": "0 0 * * *",
        "@hourly": "0 * * * *",
    }

    def __init__(
        self,
        *,
        minute: int | str = "*",
        hour: int | str = "*",
        day_of_month: int | str = "*",
        month: int | str = "*",
        day_of_week: int | str = "*",
        raw: str = "",
    ) -> None:
        self.minute = _ScheduleComponent.parse(minute, min_allowed=0, max_allowed=59)
        self.hour = _ScheduleComponent.parse(hour, min_allowed=0, max_allowed=23)
        self.day_of_month = _ScheduleComponent.parse(
            day_of_month, min_allowed=1, max_allowed=31
        )
        self.month = _ScheduleComponent.parse(
            month,
            min_allowed=1,
            max_allowed=12,
            str_conversions=_MONTH_NAMES,
        )
        self.day_of_week = _ScheduleComponent.parse(
            day_of_week,
            min_allowed=0,
            max_allowed=6,
            str_conversions=_DAY_NAMES,
        )
        # Original text (e.g. the cron string or alias) used by __str__.
        self._raw = raw

    def __str__(self) -> str:
        if self._raw:
            return self._raw
        return f"{self.minute} {self.hour} {self.day_of_month} {self.month} {self.day_of_week}"

    def __repr__(self) -> str:
        return f"<Schedule {self}>"

    @classmethod
    def from_cron(cls, cron: str) -> Schedule:
        """Build a schedule from a five-field cron expression or an alias.

        Recognised aliases are ``@yearly``, ``@annually``, ``@monthly``,
        ``@weekly``, ``@daily``, ``@midnight`` and ``@hourly``. Leading and
        trailing whitespace is ignored.

        Raises:
            ValueError: If the expression does not have exactly five fields,
                or if any field fails to parse.
        """
        raw = cron

        expression = cron.strip()
        expression = cls._ALIASES.get(expression, expression)

        fields = expression.split()
        if len(fields) != 5:
            # Fail with a message naming the problem rather than an opaque
            # tuple-unpacking error.
            raise ValueError(
                "Cron expression should have 5 fields "
                "(minute hour day_of_month month day_of_week), "
                f"got {len(fields)}: {cron!r}"
            )
        minute, hour, day_of_month, month, day_of_week = fields

        return cls(
            minute=minute,
            hour=hour,
            day_of_month=day_of_month,
            month=month,
            day_of_week=day_of_week,
            raw=raw,
        )

    def next(self, now: datetime.datetime | None = None) -> datetime.datetime:
        """
        Find the next datetime that matches the schedule after the given datetime.

        The search starts at the minute following ``now`` (seconds and
        microseconds are dropped) and moves forward one day at a time.

        Raises:
            ValueError: If the schedule has no hour or minute values, or if
                no match is found within roughly the next 500 days.
        """
        if not (self.hour.values and self.minute.values):
            # Without this guard the day-stepping helper below would fail
            # with an IndexError on values[0].
            raise ValueError("Schedule has no valid hour or minute values")

        dt = now or timezone.localtime()  # Use the defined plain timezone by default

        # We only care about minutes, so immediately jump to the next minute
        dt += datetime.timedelta(minutes=1)
        dt = dt.replace(second=0, microsecond=0)

        def _go_to_next_day(v: datetime.datetime) -> datetime.datetime:
            # Move to the following day at the earliest scheduled time of day.
            v = v + datetime.timedelta(days=1)
            return v.replace(
                hour=self.hour.values[0],
                minute=self.minute.values[0],
            )

        # If we don't find a value in the next 500 days,
        # then the schedule is probably never going to match (i.e. Feb 31)
        max_future = dt + datetime.timedelta(days=500)

        while True:
            is_valid_day = (
                dt.month in self.month.values
                and dt.day in self.day_of_month.values
                and dt.weekday() in self.day_of_week.values
            )
            if is_valid_day:
                # We're on a valid day, now find the next valid hour and minute
                for hour in self.hour.values:
                    if hour < dt.hour:
                        continue
                    for minute in self.minute.values:
                        if hour == dt.hour and minute < dt.minute:
                            continue
                        candidate_datetime = dt.replace(hour=hour, minute=minute)
                        if candidate_datetime >= dt:
                            return candidate_datetime

            # Either this day does not match or no scheduled time is left on
            # it: continue from the first scheduled time of the next day.
            dt = _go_to_next_day(dt)

            if dt > max_future:
                raise ValueError("No valid schedule match found in the next 500 days")
217
218
@register_job
class ScheduledCommand(Job):
    """Job that executes a shell command string when run."""

    def __init__(self, command: str) -> None:
        # Stored verbatim; it is handed to the shell unchanged in run().
        self.command = command

    def __repr__(self) -> str:
        return "<ScheduledCommand: {}>".format(self.command)

    def run(self) -> None:
        """Execute the command through the shell.

        Raises:
            subprocess.CalledProcessError: If the command exits with a
                non-zero status (``check=True``).
        """
        subprocess.run(self.command, check=True, shell=True)

    def default_concurrency_key(self) -> str:
        """Return the command string as the concurrency key."""
        # One ScheduledCommand class serves many different commands, so the
        # command text is what keeps their keys distinct.
        return self.command
236
237
def load_schedule(
    schedules: list[tuple[str | Job, str | Schedule]],
) -> list[tuple[Job, Schedule]]:
    """Resolve ``(job, schedule)`` pairs into concrete objects.

    For each pair:

    * a job string starting with ``cmd:`` becomes a ``ScheduledCommand``
      for the text after the prefix;
    * any other job string is loaded through ``jobs_registry.load_job``
      with empty args and kwargs;
    * a schedule string is parsed with ``Schedule.from_cron``;
    * non-string jobs and schedules are passed through unchanged.

    Returns:
        The resolved pairs, in the same order as the input.
    """
    resolved: list[tuple[Job, Schedule]] = []

    for job_spec, schedule_spec in schedules:
        if not isinstance(job_spec, str):
            job = job_spec
        elif job_spec.startswith("cmd:"):
            job = ScheduledCommand(job_spec[4:])
        else:
            job = jobs_registry.load_job(job_spec, {"args": [], "kwargs": {}})

        schedule = (
            Schedule.from_cron(schedule_spec)
            if isinstance(schedule_spec, str)
            else schedule_spec
        )

        resolved.append((job, schedule))

    return resolved