1import datetime
2import subprocess
3
4from plain.utils import timezone
5
6from .jobs import Job
7from .registry import jobs_registry, register_job
8
9_MONTH_NAMES = {
10 "JAN": 1,
11 "FEB": 2,
12 "MAR": 3,
13 "APR": 4,
14 "MAY": 5,
15 "JUN": 6,
16 "JUL": 7,
17 "AUG": 8,
18 "SEP": 9,
19 "OCT": 10,
20 "NOV": 11,
21 "DEC": 12,
22}
23_DAY_NAMES = {
24 "MON": 0,
25 "TUE": 1,
26 "WED": 2,
27 "THU": 3,
28 "FRI": 4,
29 "SAT": 5,
30 "SUN": 6,
31}
32
33
34class _ScheduleComponent:
35 def __init__(self, values, raw=""):
36 self.values = sorted(values)
37 self._raw = raw
38
39 def __str__(self):
40 if self._raw:
41 return self._raw
42 return ",".join(str(v) for v in self.values)
43
44 def __eq__(self, other):
45 return self.values == other.values
46
47 @classmethod
48 def parse(cls, value, min_allowed, max_allowed, str_conversions=None):
49 if str_conversions is None:
50 str_conversions = {}
51
52 if isinstance(value, int):
53 if value < min_allowed or value > max_allowed:
54 raise ValueError(
55 f"Schedule component should be between {min_allowed} and {max_allowed}"
56 )
57 return cls([value], raw=value)
58
59 if not isinstance(value, str):
60 raise ValueError("Schedule component should be an int or str")
61
62 # First split any subcomponents and re-parse them
63 if "," in value:
64 return cls(
65 sum(
66 (
67 cls.parse(
68 sub_value, min_allowed, max_allowed, str_conversions
69 ).values
70 for sub_value in value.split(",")
71 ),
72 [],
73 ),
74 raw=value,
75 )
76
77 if value == "*":
78 return cls(list(range(min_allowed, max_allowed + 1)), raw=value)
79
80 def _convert(value):
81 result = str_conversions.get(value.upper(), value)
82 return int(result)
83
84 if "/" in value:
85 values, step = value.split("/")
86 values = cls.parse(values, min_allowed, max_allowed, str_conversions)
87 return cls([v for v in values.values if v % int(step) == 0], raw=value)
88
89 if "-" in value:
90 start, end = value.split("-")
91 return cls(list(range(_convert(start), _convert(end) + 1)), raw=value)
92
93 return cls([_convert(value)], raw=value)
94
95
96class Schedule:
97 def __init__(
98 self,
99 *,
100 minute="*",
101 hour="*",
102 day_of_month="*",
103 month="*",
104 day_of_week="*",
105 raw="",
106 ):
107 self.minute = _ScheduleComponent.parse(minute, min_allowed=0, max_allowed=59)
108 self.hour = _ScheduleComponent.parse(hour, min_allowed=0, max_allowed=23)
109 self.day_of_month = _ScheduleComponent.parse(
110 day_of_month, min_allowed=1, max_allowed=31
111 )
112 self.month = _ScheduleComponent.parse(
113 month,
114 min_allowed=1,
115 max_allowed=12,
116 str_conversions=_MONTH_NAMES,
117 )
118 self.day_of_week = _ScheduleComponent.parse(
119 day_of_week,
120 min_allowed=0,
121 max_allowed=6,
122 str_conversions=_DAY_NAMES,
123 )
124 self._raw = raw
125
126 def __str__(self):
127 if self._raw:
128 return self._raw
129 return f"{self.minute} {self.hour} {self.day_of_month} {self.month} {self.day_of_week}"
130
131 def __repr__(self) -> str:
132 return f"<Schedule {self}>"
133
134 @classmethod
135 def from_cron(cls, cron):
136 raw = cron
137
138 if cron == "@yearly" or cron == "@annually":
139 cron = "0 0 1 1 *"
140 elif cron == "@monthly":
141 cron = "0 0 1 * *"
142 elif cron == "@weekly":
143 cron = "0 0 * * 0"
144 elif cron == "@daily" or cron == "@midnight":
145 cron = "0 0 * * *"
146 elif cron == "@hourly":
147 cron = "0 * * * *"
148
149 minute, hour, day_of_month, month, day_of_week = cron.split()
150
151 return cls(
152 minute=minute,
153 hour=hour,
154 day_of_month=day_of_month,
155 month=month,
156 day_of_week=day_of_week,
157 raw=raw,
158 )
159
160 def next(self, now=None):
161 """
162 Find the next datetime that matches the schedule after the given datetime.
163 """
164 dt = now or timezone.localtime() # Use the defined plain timezone by default
165
166 # We only care about minutes, so immediately jump to the next minute
167 dt += datetime.timedelta(minutes=1)
168 dt = dt.replace(second=0, microsecond=0)
169
170 def _go_to_next_day(v):
171 v = v + datetime.timedelta(days=1)
172 return v.replace(
173 hour=self.hour.values[0],
174 minute=self.minute.values[0],
175 )
176
177 # If we don't find a value in the next 500 days,
178 # then the schedule is probably never going to match (i.e. Feb 31)
179 max_future = dt + datetime.timedelta(days=500)
180
181 while True:
182 is_valid_day = (
183 dt.month in self.month.values
184 and dt.day in self.day_of_month.values
185 and dt.weekday() in self.day_of_week.values
186 )
187 if is_valid_day:
188 # We're on a valid day, now find the next valid hour and minute
189 for hour in self.hour.values:
190 if hour < dt.hour:
191 continue
192 for minute in self.minute.values:
193 if hour == dt.hour and minute < dt.minute:
194 continue
195 candidate_datetime = dt.replace(hour=hour, minute=minute)
196 if candidate_datetime >= dt:
197 return candidate_datetime
198 # If no valid time is found today, reset to the first valid minute and hour of the next day
199 dt = _go_to_next_day(dt)
200 else:
201 # Increment the day until a valid month/day/weekday combination is found
202 dt = _go_to_next_day(dt)
203
204 if dt > max_future:
205 raise ValueError("No valid schedule match found in the next 500 days")
206
207
208@register_job
209class ScheduledCommand(Job):
210 def __init__(self, command):
211 self.command = command
212
213 def __repr__(self) -> str:
214 return f"<ScheduledCommand: {self.command}>"
215
216 def run(self):
217 subprocess.run(self.command, shell=True, check=True)
218
219 def get_unique_key(self) -> str:
220 # The ScheduledCommand can be used for different commands,
221 # so we need the unique_key to separate them in the scheduling uniqueness logic
222 return self.command
223
224
225def load_schedule(schedules):
226 jobs_schedule = []
227
228 for job, schedule in schedules:
229 if isinstance(job, str):
230 if job.startswith("cmd:"):
231 job = ScheduledCommand(job[4:])
232 else:
233 job = jobs_registry.load_job(job, {"args": [], "kwargs": {}})
234
235 if isinstance(schedule, str):
236 schedule = Schedule.from_cron(schedule)
237
238 jobs_schedule.append((job, schedule))
239
240 return jobs_schedule