1from __future__ import annotations
2
3import datetime
4import subprocess
5from typing import Any
6
7from plain.utils import timezone
8
9from .jobs import Job
10from .registry import jobs_registry, register_job
11
# Three-letter month abbreviations mapped to their calendar number (JAN=1 ... DEC=12).
_MONTH_NAMES = {
    name: number
    for number, name in enumerate(
        (
            "JAN",
            "FEB",
            "MAR",
            "APR",
            "MAY",
            "JUN",
            "JUL",
            "AUG",
            "SEP",
            "OCT",
            "NOV",
            "DEC",
        ),
        start=1,
    )
}
# Three-letter weekday abbreviations numbered from MON=0 to SUN=6, i.e. the
# same numbering that datetime.weekday() produces.
_DAY_NAMES = {
    name: number
    for number, name in enumerate(
        ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN"),
    )
}
35
36
class _ScheduleComponent:
    """One field of a schedule, expanded to the integers it matches.

    ``values`` is the sorted, de-duplicated list of matching integers.  The
    expression the component was built from is kept only so ``str()`` can
    echo it back.
    """

    def __init__(self, values: list[int], raw: str | int = "") -> None:
        # De-duplicate so that e.g. "1,1" and "1" compare equal.
        self.values = sorted(set(values))
        self._raw = raw

    def __str__(self) -> str:
        if self._raw:
            return str(self._raw)
        return ",".join(str(v) for v in self.values)

    def __eq__(self, other: Any) -> bool:
        # Defer to the other operand instead of failing with AttributeError
        # when compared against an unrelated type.
        if not isinstance(other, _ScheduleComponent):
            return NotImplemented
        return self.values == other.values

    @classmethod
    def parse(
        cls,
        value: int | str,
        min_allowed: int,
        max_allowed: int,
        str_conversions: dict[str, int] | None = None,
    ) -> _ScheduleComponent:
        """Expand a single schedule field into a component.

        Supported string forms are ``*``, a single value, ``a-b`` ranges,
        ``base/step`` steps and comma-separated lists of any of those.

        Args:
            value: An int, or a string expression as described above.
            min_allowed: Smallest value the field accepts (inclusive).
            max_allowed: Largest value the field accepts (inclusive).
            str_conversions: Optional upper-case name -> int aliases
                (names are matched case-insensitively).

        Raises:
            ValueError: If the value is not an int or str, cannot be parsed,
                lies outside ``min_allowed``..``max_allowed``, is a reversed
                range, or uses a step smaller than 1.
        """
        if str_conversions is None:
            str_conversions = {}

        out_of_range_message = (
            f"Schedule component should be between {min_allowed} and {max_allowed}"
        )

        if isinstance(value, int):
            if value < min_allowed or value > max_allowed:
                raise ValueError(out_of_range_message)
            return cls([value], raw=value)

        if not isinstance(value, str):
            raise ValueError("Schedule component should be an int or str")

        # First split any subcomponents and re-parse them
        if "," in value:
            merged: list[int] = []
            for sub_value in value.split(","):
                merged.extend(
                    cls.parse(
                        sub_value, min_allowed, max_allowed, str_conversions
                    ).values
                )
            return cls(merged, raw=value)

        if value == "*":
            return cls(list(range(min_allowed, max_allowed + 1)), raw=value)

        def _convert(token: str) -> int:
            # Resolve a name alias (if any), then apply the same bounds
            # check that int inputs receive.
            number = int(str_conversions.get(token.upper(), token))
            if number < min_allowed or number > max_allowed:
                raise ValueError(out_of_range_message)
            return number

        if "/" in value:
            base, step_text = value.split("/")
            step = int(step_text)
            if step < 1:
                raise ValueError("Schedule step should be a positive integer")
            base_values = cls.parse(
                base, min_allowed, max_allowed, str_conversions
            ).values
            # Steps count from the first value of the base (cron semantics):
            # "1-10/3" -> 1,4,7,10 rather than the multiples of 3.
            return cls(base_values[::step], raw=value)

        if "-" in value:
            start_text, end_text = value.split("-")
            start = _convert(start_text)
            end = _convert(end_text)
            if start > end:
                raise ValueError(
                    f"Schedule range start should not be greater than its end: {value}"
                )
            return cls(list(range(start, end + 1)), raw=value)

        return cls([_convert(value)], raw=value)
103
104
class Schedule:
    """A cron-style schedule made of five components.

    Every keyword accepts an int or a string expression understood by
    ``_ScheduleComponent.parse``.  ``month`` additionally accepts the names in
    ``_MONTH_NAMES`` and ``day_of_week`` the names in ``_DAY_NAMES``.
    Weekdays are numbered 0-6 starting at MON=0, which is the numbering
    ``datetime.weekday()`` uses and what :meth:`next` compares against.
    """

    # Cron "@" shortcuts and the five-field expression each one stands for.
    # NOTE(review): with MON=0 the "@weekly" expression fires on Mondays,
    # whereas traditional cron runs @weekly on Sundays - confirm intended.
    _CRON_ALIASES = {
        "@yearly": "0 0 1 1 *",
        "@annually": "0 0 1 1 *",
        "@monthly": "0 0 1 * *",
        "@weekly": "0 0 * * 0",
        "@daily": "0 0 * * *",
        "@midnight": "0 0 * * *",
        "@hourly": "0 * * * *",
    }

    def __init__(
        self,
        *,
        minute: int | str = "*",
        hour: int | str = "*",
        day_of_month: int | str = "*",
        month: int | str = "*",
        day_of_week: int | str = "*",
        raw: str = "",
    ) -> None:
        """Parse each field into a component.

        ``raw`` is the original expression, if any; when set it is what
        ``str()`` returns.
        """
        self.minute = _ScheduleComponent.parse(minute, min_allowed=0, max_allowed=59)
        self.hour = _ScheduleComponent.parse(hour, min_allowed=0, max_allowed=23)
        self.day_of_month = _ScheduleComponent.parse(
            day_of_month, min_allowed=1, max_allowed=31
        )
        self.month = _ScheduleComponent.parse(
            month,
            min_allowed=1,
            max_allowed=12,
            str_conversions=_MONTH_NAMES,
        )
        self.day_of_week = _ScheduleComponent.parse(
            day_of_week,
            min_allowed=0,
            max_allowed=6,
            str_conversions=_DAY_NAMES,
        )
        self._raw = raw

    def __str__(self) -> str:
        if self._raw:
            return self._raw
        return f"{self.minute} {self.hour} {self.day_of_month} {self.month} {self.day_of_week}"

    def __repr__(self) -> str:
        return f"<Schedule {self}>"

    @classmethod
    def from_cron(cls, cron: str) -> Schedule:
        """Build a schedule from a five-field cron expression or "@" alias.

        The fields are, in order: minute, hour, day of month, month and day
        of week, separated by whitespace.  The original string is kept as
        the schedule's display form.

        Raises:
            ValueError: If the expression does not have exactly five fields
                (this includes unknown "@" aliases) or a field is invalid.
        """
        # Strip before the alias lookup so "@daily\n" is treated the same
        # way whitespace around a five-field expression already is.
        expression = cls._CRON_ALIASES.get(cron.strip(), cron)

        fields = expression.split()
        if len(fields) != 5:
            raise ValueError(
                "Cron expression should have 5 fields "
                "(minute hour day_of_month month day_of_week), "
                f"got {len(fields)}: {cron!r}"
            )
        minute, hour, day_of_month, month, day_of_week = fields

        return cls(
            minute=minute,
            hour=hour,
            day_of_month=day_of_month,
            month=month,
            day_of_week=day_of_week,
            raw=cron,
        )

    def next(self, now: datetime.datetime | None = None) -> datetime.datetime:
        """
        Find the next datetime that matches the schedule after the given datetime.

        ``now`` defaults to ``timezone.localtime()``.  The result is always at
        least one minute later than ``now`` and has zero seconds and
        microseconds.  A day matches only when its month, day of month and
        weekday are all allowed by the schedule.

        Raises:
            ValueError: If nothing matches within 500 days of the start point.
        """
        dt = now or timezone.localtime()  # Use the defined plain timezone by default

        # We only care about minutes, so immediately jump to the next minute
        dt += datetime.timedelta(minutes=1)
        dt = dt.replace(second=0, microsecond=0)

        def _go_to_next_day(v: datetime.datetime) -> datetime.datetime:
            # Land on the earliest allowed time of the following day.
            v = v + datetime.timedelta(days=1)
            return v.replace(
                hour=self.hour.values[0],
                minute=self.minute.values[0],
            )

        # If we don't find a value in the next 500 days,
        # then the schedule is probably never going to match (i.e. Feb 31)
        max_future = dt + datetime.timedelta(days=500)

        while True:
            is_valid_day = (
                dt.month in self.month.values
                and dt.day in self.day_of_month.values
                and dt.weekday() in self.day_of_week.values
            )
            if is_valid_day:
                # Component values are sorted, so the first hour/minute pair
                # that is not before dt is the earliest match on this day.
                for hour in self.hour.values:
                    if hour < dt.hour:
                        continue
                    for minute in self.minute.values:
                        if hour == dt.hour and minute < dt.minute:
                            continue
                        return dt.replace(hour=hour, minute=minute)

            # Either the day does not match or it has no remaining valid
            # time: continue from the first valid time of the next day.
            dt = _go_to_next_day(dt)

            if dt > max_future:
                raise ValueError("No valid schedule match found in the next 500 days")
215
216
@register_job
class ScheduledCommand(Job):
    """Job whose work is to execute a single command line through the shell."""

    def __init__(self, command: str) -> None:
        # The command line is stored verbatim and handed to the shell in run().
        self.command = command

    def get_unique_key(self) -> str:
        """Return the command itself as the uniqueness key."""
        # One ScheduledCommand class serves many different commands, so the
        # scheduling uniqueness logic needs the command to tell them apart.
        return self.command

    def run(self) -> None:
        """Execute the command; a non-zero exit status raises CalledProcessError."""
        subprocess.run(self.command, check=True, shell=True)

    def __repr__(self) -> str:
        return f"<ScheduledCommand: {self.command}>"
232
233
def load_schedule(
    schedules: list[tuple[str | Job, str | Schedule]],
) -> list[tuple[Job, Schedule]]:
    """Turn (job, schedule) specifications into ready-to-use pairs.

    A job given as a string is either ``"cmd:<command>"``, which becomes a
    ``ScheduledCommand`` for the text after the prefix, or a name passed to
    ``jobs_registry.load_job`` with empty args and kwargs.  A schedule given
    as a string is parsed with ``Schedule.from_cron``.  Values that are not
    strings are passed through untouched.  Input order is preserved.
    """
    resolved: list[tuple[Job, Schedule]] = []

    for job_spec, schedule_spec in schedules:
        if not isinstance(job_spec, str):
            loaded_job = job_spec
        elif job_spec.startswith("cmd:"):
            # Everything after the 4-character "cmd:" prefix is the command.
            loaded_job = ScheduledCommand(job_spec[4:])
        else:
            loaded_job = jobs_registry.load_job(job_spec, {"args": [], "kwargs": {}})

        if isinstance(schedule_spec, str):
            loaded_schedule = Schedule.from_cron(schedule_spec)
        else:
            loaded_schedule = schedule_spec

        resolved.append((loaded_job, loaded_schedule))

    return resolved