Worker
Process background jobs with a database-driven worker.
from plain.worker import Job
from plain.mail import send_mail
# Create a new job class
class WelcomeUserJob(Job):
    """Example job that emails a welcome message to a single user."""

    def __init__(self, user):
        # The user is kept on the job so run() can read it later.
        self.user = user

    def run(self):
        """Send the welcome email to the stored user's address."""
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            # Placeholder sender: the address in the published example was
            # lost to email obfuscation. Replace it with your app's sender.
            from_email="welcome@example.com",
            recipient_list=[self.user.email],
        )
# Look up a user, build the job for them, and hand it off to the worker
user = User.objects.get(pk=1)
welcome_job = WelcomeUserJob(user)
welcome_job.run_in_worker()
The worker process is run separately using `plain worker run`.
Staff
Job history
Scheduled jobs
Monitoring
1import datetime
2import subprocess
3
4from plain.utils import timezone
5
6from .jobs import Job, load_job
7
# Three-letter month abbreviations mapped to their month number (JAN=1 ... DEC=12).
_MONTH_NAMES = {
    abbreviation: number
    for number, abbreviation in enumerate(
        (
            "JAN",
            "FEB",
            "MAR",
            "APR",
            "MAY",
            "JUN",
            "JUL",
            "AUG",
            "SEP",
            "OCT",
            "NOV",
            "DEC",
        ),
        start=1,
    )
}
# Three-letter weekday abbreviations mapped to a day number (MON=0 ... SUN=6).
_DAY_NAMES = {
    abbreviation: number
    for number, abbreviation in enumerate(
        ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN")
    )
}
31
32
class _ScheduleComponent:
    """One field of a schedule, expanded to the integers it matches.

    ``values`` holds the matching integers in ascending order. ``_raw`` keeps
    the spelling the component was created from so ``str()`` can echo it back.
    """

    def __init__(self, values, raw=""):
        self.values = sorted(values)
        self._raw = raw

    def __str__(self):
        if self._raw:
            # raw may have been given as an int; __str__ must return a str.
            return str(self._raw)
        return ",".join(str(v) for v in self.values)

    def __eq__(self, other):
        # Let Python fall back to its default comparison for foreign types
        # instead of failing on a missing ``values`` attribute.
        if not isinstance(other, _ScheduleComponent):
            return NotImplemented
        return self.values == other.values

    @classmethod
    def parse(cls, value, min_allowed, max_allowed, str_conversions=None):
        """Build a component from an int or a cron-style string.

        Supported string forms are ``*``, a single value, ``a-b`` ranges,
        ``<base>/<step>`` steps and comma-separated lists of those. Names
        found in ``str_conversions`` (looked up upper-cased) are replaced by
        their number before conversion.

        Args:
            value: The int or string to parse.
            min_allowed: Smallest acceptable number (inclusive).
            max_allowed: Largest acceptable number (inclusive).
            str_conversions: Optional mapping of upper-case names to numbers.

        Returns:
            A new component holding every matching number.

        Raises:
            ValueError: If the value is not an int or str, cannot be converted
                to numbers, lies outside the allowed bounds, is a reversed
                range, or has a step smaller than 1.
        """
        if str_conversions is None:
            str_conversions = {}

        def _check_bounds(number):
            if number < min_allowed or number > max_allowed:
                raise ValueError(
                    f"Schedule component should be between {min_allowed} and {max_allowed}"
                )
            return number

        def _convert(token):
            # Names such as "JAN" become numbers; everything else must be an int.
            return _check_bounds(int(str_conversions.get(token.upper(), token)))

        if isinstance(value, int):
            return cls([_check_bounds(value)], raw=str(value))

        if not isinstance(value, str):
            raise ValueError("Schedule component should be an int or str")

        # First split any subcomponents and re-parse them
        if "," in value:
            expanded = []
            for sub_value in value.split(","):
                expanded.extend(
                    cls.parse(sub_value, min_allowed, max_allowed, str_conversions).values
                )
            return cls(expanded, raw=value)

        if value == "*":
            return cls(range(min_allowed, max_allowed + 1), raw=value)

        if "/" in value:
            base, step = value.split("/")
            step = int(step)
            if step < 1:
                raise ValueError("Schedule component step should be a positive integer")
            base_values = cls.parse(
                base, min_allowed, max_allowed, str_conversions
            ).values
            # Steps count from the first value of the base, as in cron:
            # "1-10/3" is 1,4,7,10 and "*/15" for minutes is 0,15,30,45.
            return cls(base_values[::step], raw=value)

        if "-" in value:
            start, end = value.split("-")
            start, end = _convert(start), _convert(end)
            if start > end:
                raise ValueError(
                    "Schedule component range start should not be greater than its end"
                )
            return cls(range(start, end + 1), raw=value)

        return cls([_convert(value)], raw=value)
93
94
class Schedule:
    """A cron-like schedule made of five fields.

    Every field accepts an int or a cron-style string and is parsed with
    ``_ScheduleComponent.parse``. Month and day-of-week also accept the names
    in ``_MONTH_NAMES`` and ``_DAY_NAMES``.

    Day-of-week numbers are compared with ``datetime.weekday()``, so 0 is
    Monday and 6 is Sunday.
    NOTE(review): classic crontab treats 0 as Sunday -- confirm this
    difference is intended (it also affects the ``@weekly`` alias).
    """

    def __init__(
        self,
        *,
        minute="*",
        hour="*",
        day_of_month="*",
        month="*",
        day_of_week="*",
        raw="",
    ):
        """Parse each field; ``raw`` is the text ``str()`` returns when non-empty."""
        self.minute = _ScheduleComponent.parse(minute, min_allowed=0, max_allowed=59)
        self.hour = _ScheduleComponent.parse(hour, min_allowed=0, max_allowed=23)
        self.day_of_month = _ScheduleComponent.parse(
            day_of_month, min_allowed=1, max_allowed=31
        )
        self.month = _ScheduleComponent.parse(
            month,
            min_allowed=1,
            max_allowed=12,
            str_conversions=_MONTH_NAMES,
        )
        self.day_of_week = _ScheduleComponent.parse(
            day_of_week,
            min_allowed=0,
            max_allowed=6,
            str_conversions=_DAY_NAMES,
        )
        self._raw = raw

    def __str__(self):
        if self._raw:
            return self._raw
        return f"{self.minute} {self.hour} {self.day_of_month} {self.month} {self.day_of_week}"

    def __repr__(self) -> str:
        return f"<Schedule {self}>"

    @classmethod
    def from_cron(cls, cron):
        """Create a schedule from a five-field cron string or an ``@`` alias.

        Supported aliases: ``@yearly``/``@annually``, ``@monthly``,
        ``@weekly``, ``@daily``/``@midnight`` and ``@hourly``. The text that
        was passed in is kept as the schedule's string form.

        Raises:
            ValueError: If the expression does not have exactly five
                whitespace-separated fields, or a field cannot be parsed.
        """
        raw = cron

        if cron == "@yearly" or cron == "@annually":
            cron = "0 0 1 1 *"
        elif cron == "@monthly":
            cron = "0 0 1 * *"
        elif cron == "@weekly":
            cron = "0 0 * * 0"
        elif cron == "@daily" or cron == "@midnight":
            cron = "0 0 * * *"
        elif cron == "@hourly":
            cron = "0 * * * *"

        fields = cron.split()
        if len(fields) != 5:
            # Explicit message instead of an opaque tuple-unpacking error.
            raise ValueError(
                f"Cron expression should have 5 fields, got {len(fields)}: {raw!r}"
            )
        minute, hour, day_of_month, month, day_of_week = fields

        return cls(
            minute=minute,
            hour=hour,
            day_of_month=day_of_month,
            month=month,
            day_of_week=day_of_week,
            raw=raw,
        )

    def next(self, now=None):
        """
        Find the next datetime that matches the schedule after the given datetime.

        The result has minute resolution and is strictly later than ``now``.
        When ``now`` is not given, ``timezone.localtime()`` is used.

        Raises:
            ValueError: If the hour or minute field matches nothing, or no
                match exists within the next 500 days.
        """
        if not self.hour.values or not self.minute.values:
            # Without this guard the day rollover below fails with IndexError.
            raise ValueError("Schedule has no valid hour or minute values")

        dt = now or timezone.localtime()  # Use the defined plain timezone by default

        # We only care about minutes, so immediately jump to the next minute
        dt += datetime.timedelta(minutes=1)
        dt = dt.replace(second=0, microsecond=0)

        def _go_to_next_day(v):
            # Start the following day at the earliest scheduled time of day.
            v = v + datetime.timedelta(days=1)
            return v.replace(
                hour=self.hour.values[0],
                minute=self.minute.values[0],
            )

        # If we don't find a value in the next 500 days,
        # then the schedule is probably never going to match (i.e. Feb 31)
        max_future = dt + datetime.timedelta(days=500)

        while True:
            is_valid_day = (
                dt.month in self.month.values
                and dt.day in self.day_of_month.values
                and dt.weekday() in self.day_of_week.values
            )
            if is_valid_day:
                # We're on a valid day, now find the next valid hour and minute
                for hour in self.hour.values:
                    if hour < dt.hour:
                        continue
                    for minute in self.minute.values:
                        if hour == dt.hour and minute < dt.minute:
                            continue
                        candidate_datetime = dt.replace(hour=hour, minute=minute)
                        if candidate_datetime >= dt:
                            return candidate_datetime

            # Either the day does not match or no time is left today:
            # move on to the first scheduled time of the next day.
            dt = _go_to_next_day(dt)

            if dt > max_future:
                raise ValueError("No valid schedule match found in the next 500 days")
205
206
class ScheduledCommand(Job):
    """Job that runs a command line through the shell."""

    def __init__(self, command):
        # The command string exactly as it will be handed to the shell.
        self.command = command

    def get_unique_key(self) -> str:
        """Return the command itself as this job's unique key."""
        # The ScheduledCommand can be used for different commands,
        # so the key has to tell them apart in the scheduling uniqueness logic
        return self.command

    def run(self):
        """Execute the command; a non-zero exit raises CalledProcessError."""
        subprocess.run(self.command, check=True, shell=True)

    def __repr__(self) -> str:
        return f"<ScheduledCommand: {self.command}>"
221
222
def load_schedule(schedules):
    """Turn (job, schedule) pairs into ready-to-use objects.

    A string job starting with ``cmd:`` becomes a ``ScheduledCommand`` for the
    text after the prefix; any other string job is loaded with ``load_job``
    using empty args and kwargs. A string schedule is parsed with
    ``Schedule.from_cron``. Non-string jobs and schedules are passed through
    unchanged.

    Returns:
        A list of ``(job, schedule)`` tuples in the order given.
    """

    def _resolve_job(spec):
        if not isinstance(spec, str):
            return spec
        if spec.startswith("cmd:"):
            return ScheduledCommand(spec[4:])
        return load_job(spec, {"args": [], "kwargs": {}})

    def _resolve_schedule(spec):
        return Schedule.from_cron(spec) if isinstance(spec, str) else spec

    return [
        (_resolve_job(job), _resolve_schedule(schedule))
        for job, schedule in schedules
    ]