Worker
Process background jobs with a database-driven worker.
from plain.worker import Job
from plain.mail import send_mail


# Create a new job class
class WelcomeUserJob(Job):
    def __init__(self, user):
        self.user = user

    def run(self):
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            from_email="[email protected]",
            recipient_list=[self.user.email],
        )


# Instantiate a job and send it to the worker
user = User.objects.get(pk=1)  # User is your app's user model
WelcomeUserJob(user).run_in_worker()
The worker process is run separately using the plain worker run command.
The worker package also covers Staff integration, job history, scheduled jobs, and monitoring.
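The source below shows the Worker class that does this work: it polls the JobRequest table for jobs that are due, claims each one inside a database transaction, and executes it in a subprocess pool. A rough sketch of using it directly, with an assumed import path and queue name for illustration (normally you only run plain worker run):

# Hypothetical direct usage; the import path and queue name are assumptions
from plain.worker.workers import Worker

worker = Worker(queues=["default"])
worker.run()  # blocks, polling for JobRequests and executing them in subprocesses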
import gc
import logging
import multiprocessing
import os
import time
from concurrent.futures import Future, ProcessPoolExecutor
from functools import partial

from plain import models
from plain.models import transaction
from plain.runtime import settings
from plain.signals import request_finished, request_started
from plain.utils import timezone
from plain.utils.module_loading import import_string

from .models import Job, JobRequest, JobResult, JobResultStatuses

logger = logging.getLogger("plain.worker")


class Worker:
    def __init__(
        self,
        queues,
        jobs_schedule=None,
        max_processes=None,
        max_jobs_per_process=None,
        max_pending_per_process=10,
        stats_every=None,
    ):
        if jobs_schedule is None:
            jobs_schedule = []

        self.executor = ProcessPoolExecutor(
            max_workers=max_processes,
            max_tasks_per_child=max_jobs_per_process,
            mp_context=multiprocessing.get_context("spawn"),
        )

        self.queues = queues

        # Filter the jobs schedule to those that are in the same queue as this worker
        self.jobs_schedule = [x for x in jobs_schedule if x[0].get_queue() in queues]

        # How often to log the stats (in seconds)
        self.stats_every = stats_every

        self.max_processes = self.executor._max_workers
        self.max_jobs_per_process = max_jobs_per_process
        self.max_pending_per_process = max_pending_per_process

        self._is_shutting_down = False

    def run(self):
        logger.info(
            "⬣ Starting Plain worker\n Queues: %s\n Jobs schedule: %s\n Stats every: %s seconds\n Max processes: %s\n Max jobs per process: %s\n Max pending per process: %s\n PID: %s",
            ", ".join(self.queues),
            "\n ".join(str(x) for x in self.jobs_schedule),
            self.stats_every,
            self.max_processes,
            self.max_jobs_per_process,
            self.max_pending_per_process,
            os.getpid(),
        )

        while not self._is_shutting_down:
            try:
                self.maybe_log_stats()
                self.maybe_check_job_results()
                self.maybe_schedule_jobs()
            except Exception as e:
                # Log the issue, but don't stop the worker
                # (these tasks are ancillary to the main job processing)
                logger.exception(e)

            if len(self.executor._pending_work_items) >= (
                self.max_processes * self.max_pending_per_process
            ):
                # We don't want to convert too many JobRequests to Jobs,
                # because anything not started yet will be cancelled on deploy etc.
                # It's easier to leave them in the JobRequest db queue as long as possible.
                time.sleep(0.1)
                continue

            with transaction.atomic():
                job_request = (
                    JobRequest.objects.select_for_update(skip_locked=True)
                    .filter(
                        queue__in=self.queues,
                    )
                    .filter(
                        models.Q(start_at__isnull=True)
                        | models.Q(start_at__lte=timezone.now())
                    )
                    .order_by("priority", "-start_at", "-created_at")
                    .first()
                )
                if not job_request:
                    # Potentially no jobs to process (who knows for how long)
                    # but sleep for a second to give the CPU and DB a break
                    time.sleep(1)
                    continue

                logger.info(
                    'Preparing to execute job job_class=%s job_request_uuid=%s job_priority=%s job_source="%s" job_queues="%s"',
                    job_request.job_class,
                    job_request.uuid,
                    job_request.priority,
                    job_request.source,
                    job_request.queue,
                )

                job = job_request.convert_to_job()

            job_uuid = str(job.uuid)  # Make a str copy

            # Release these now
            del job_request
            del job

            future = self.executor.submit(process_job, job_uuid)
            future.add_done_callback(partial(future_finished_callback, job_uuid))

            # Do a quick sleep regardless to see if it
            # gives processes a chance to start up
            time.sleep(0.1)

    def shutdown(self):
        if self._is_shutting_down:
            # Already shutting down somewhere else
            return

        logger.info("Job worker shutdown started")
        self._is_shutting_down = True
        self.executor.shutdown(wait=True, cancel_futures=True)
        logger.info("Job worker shutdown complete")

    def maybe_log_stats(self):
        if not self.stats_every:
            return

        now = time.time()

        if not hasattr(self, "_stats_logged_at"):
            self._stats_logged_at = now

        if now - self._stats_logged_at > self.stats_every:
            self._stats_logged_at = now
            self.log_stats()

    def maybe_check_job_results(self):
        now = time.time()

        if not hasattr(self, "_job_results_checked_at"):
            self._job_results_checked_at = now

        check_every = 60  # Only need to check once a minute

        if now - self._job_results_checked_at > check_every:
            self._job_results_checked_at = now
            self.rescue_job_results()

    def maybe_schedule_jobs(self):
        if not self.jobs_schedule:
            return

        now = time.time()

        if not hasattr(self, "_jobs_schedule_checked_at"):
            self._jobs_schedule_checked_at = now

        check_every = 60  # Only need to check once every 60 seconds

        if now - self._jobs_schedule_checked_at > check_every:
            for job, schedule in self.jobs_schedule:
                next_start_at = schedule.next()

                # Leverage the unique_key to prevent duplicate scheduled
                # jobs with the same start time (also works if unique_key == "")
                schedule_unique_key = (
                    f"{job.get_unique_key()}:scheduled:{int(next_start_at.timestamp())}"
                )

                # Drawback here is if scheduled job is running, and detected by unique_key
                # so it doesn't schedule the next one? Maybe an ok downside... prevents
                # overlapping executions...?
                result = job.run_in_worker(
                    delay=next_start_at,
                    unique_key=schedule_unique_key,
                )
                # Results are a list if it found scheduled/running jobs...
                if not isinstance(result, list):
                    logger.info(
                        'Scheduling job job_class=%s job_queue="%s" job_start_at="%s" job_schedule="%s" job_unique_key="%s"',
                        result.job_class,
                        result.queue,
                        result.start_at,
                        schedule,
                        result.unique_key,
                    )

            self._jobs_schedule_checked_at = now

    def log_stats(self):
        try:
            num_processes = len(self.executor._processes)
        except (AttributeError, TypeError):
            # Depending on shutdown timing and internal behavior, this might not work
            num_processes = 0

        jobs_requested = JobRequest.objects.filter(queue__in=self.queues).count()
        jobs_processing = Job.objects.filter(queue__in=self.queues).count()

        logger.info(
            'Job worker stats worker_processes=%s worker_queues="%s" jobs_requested=%s jobs_processing=%s worker_max_processes=%s worker_max_jobs_per_process=%s',
            num_processes,
            ",".join(self.queues),
            jobs_requested,
            jobs_processing,
            self.max_processes,
            self.max_jobs_per_process,
        )

    def rescue_job_results(self):
        """Find any lost or failed jobs on this worker's queues and handle them."""
        # TODO return results and log them if there are any?
        Job.objects.filter(queue__in=self.queues).mark_lost_jobs()
        JobResult.objects.filter(queue__in=self.queues).retry_failed_jobs()


def future_finished_callback(job_uuid: str, future: Future):
    if future.cancelled():
        logger.warning("Job cancelled job_uuid=%s", job_uuid)
        job = Job.objects.get(uuid=job_uuid)
        job.convert_to_result(status=JobResultStatuses.CANCELLED)
    else:
        logger.debug("Job finished job_uuid=%s", job_uuid)


def process_job(job_uuid):
    try:
        worker_pid = os.getpid()

        request_started.send(sender=None)

        job = Job.objects.get(uuid=job_uuid)

        logger.info(
            'Executing job worker_pid=%s job_class=%s job_request_uuid=%s job_priority=%s job_source="%s" job_queue="%s"',
            worker_pid,
            job.job_class,
            job.job_request_uuid,
            job.priority,
            job.source,
            job.queue,
        )

        def middleware_chain(job):
            return job.run()

        for middleware_path in reversed(settings.WORKER_MIDDLEWARE):
            middleware_class = import_string(middleware_path)
            middleware_instance = middleware_class(middleware_chain)
            middleware_chain = middleware_instance

        job_result = middleware_chain(job)

        # Release it now
        del job

        duration = job_result.ended_at - job_result.started_at
        duration = duration.total_seconds()

        logger.info(
            'Completed job worker_pid=%s job_class=%s job_uuid=%s job_request_uuid=%s job_result_uuid=%s job_priority=%s job_source="%s" job_queue="%s" job_duration=%s',
            worker_pid,
            job_result.job_class,
            job_result.job_uuid,
            job_result.job_request_uuid,
            job_result.uuid,
            job_result.priority,
            job_result.source,
            job_result.queue,
            duration,
        )

        del job_result
    except Exception as e:
        # Raising exceptions inside the worker process doesn't
        # seem to be caught/shown anywhere as configured.
        # So we at least log it out here.
        # (A job should catch its own user-code errors, so this is for library errors)
        logger.exception(e)
    finally:
        request_finished.send(sender=None)
        gc.collect()
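process_job() wraps each job in a middleware chain built from settings.WORKER_MIDDLEWARE: every entry is imported, instantiated with the next callable in the chain, and the outermost callable is invoked with the Job and returns the JobResult. A minimal sketch of a middleware compatible with that chain (the class name and the logging it does are illustrative assumptions, not part of the package):

import logging

logger = logging.getLogger(__name__)


class LoggingWorkerMiddleware:
    # Hypothetical middleware: wraps the rest of the chain, which ultimately calls job.run()
    def __init__(self, run_job):
        self.run_job = run_job

    def __call__(self, job):
        logger.info("Job starting job_class=%s", job.job_class)
        job_result = self.run_job(job)  # call through to the next middleware / job.run()
        logger.info("Job finished job_class=%s", job.job_class)
        return job_result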