# Worker
Process background jobs with a database-driven worker.
```python
from plain.worker import Job
from plain.mail import send_mail


# Create a new job class
class WelcomeUserJob(Job):
    def __init__(self, user):
        self.user = user

    def run(self):
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            from_email="[email protected]",
            recipient_list=[self.user.email],
        )


# Instantiate a job and send it to the worker
user = User.objects.get(pk=1)
WelcomeUserJob(user).run_in_worker()
```
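`run_in_worker()` also accepts `delay` and `unique_key` arguments, which the worker's own scheduler uses (see `maybe_schedule_jobs()` in the source below). A hedged sketch based on that usage, with an illustrative key format:

```python
from datetime import timedelta

from plain.utils import timezone

# Ask the worker to start the job roughly an hour from now. The unique_key
# is assumed to prevent enqueueing a duplicate while an identical job is
# still scheduled or running (the key format here is just an example).
WelcomeUserJob(user).run_in_worker(
    delay=timezone.now() + timedelta(hours=1),
    unique_key=f"welcome-user-{user.pk}",
)
```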
The worker process is run separately using `plain worker run`.
- Admin
- Job history
- Scheduled jobs
- Monitoring
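The `Worker` class below is the loop behind `plain worker run`: it claims `JobRequest` rows from the database, converts them to `Job` rows, and runs them in a `ProcessPoolExecutor`. It also accepts a `jobs_schedule` list of `(job, schedule)` pairs, where the schedule object only needs a `next()` method returning the next start datetime. A minimal sketch of that contract, assuming a hypothetical `EveryFiveMinutes` schedule class (how an app actually registers schedules may differ):

```python
from datetime import timedelta

from plain.utils import timezone


class EveryFiveMinutes:
    """Hypothetical schedule object satisfying the worker's schedule contract."""

    def next(self):
        # The worker calls schedule.next() and passes the result to
        # job.run_in_worker(delay=...), so any future datetime works here.
        return timezone.now() + timedelta(minutes=5)


# jobs_schedule entries are (job instance, schedule) pairs; the worker keeps
# only the entries whose job queue matches one of its own queues.
jobs_schedule = [(WelcomeUserJob(user), EveryFiveMinutes())]
```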
```python
import gc
import logging
import multiprocessing
import os
import time
from concurrent.futures import Future, ProcessPoolExecutor
from functools import partial

from plain import models
from plain.models import transaction
from plain.runtime import settings
from plain.signals import request_finished, request_started
from plain.utils import timezone
from plain.utils.module_loading import import_string

from .models import Job, JobRequest, JobResult, JobResultStatuses
from .registry import jobs_registry

logger = logging.getLogger("plain.worker")

class Worker:
    def __init__(
        self,
        queues,
        jobs_schedule=None,
        max_processes=None,
        max_jobs_per_process=None,
        max_pending_per_process=10,
        stats_every=None,
    ):
        if jobs_schedule is None:
            jobs_schedule = []

        self.executor = ProcessPoolExecutor(
            max_workers=max_processes,
            max_tasks_per_child=max_jobs_per_process,
            mp_context=multiprocessing.get_context("spawn"),
        )

        self.queues = queues

        # Filter the jobs schedule to those that are in the same queue as this worker
        self.jobs_schedule = [x for x in jobs_schedule if x[0].get_queue() in queues]

        # How often to log the stats (in seconds)
        self.stats_every = stats_every

        self.max_processes = self.executor._max_workers
        self.max_jobs_per_process = max_jobs_per_process
        self.max_pending_per_process = max_pending_per_process

        self._is_shutting_down = False

    def run(self):
        logger.info(
            "⬣ Starting Plain worker\n Registered jobs: %s\n Queues: %s\n Jobs schedule: %s\n Stats every: %s seconds\n Max processes: %s\n Max jobs per process: %s\n Max pending per process: %s\n PID: %s",
            "\n ".join(
                f"{name}: {cls}" for name, cls in jobs_registry.jobs.items()
            ),
            ", ".join(self.queues),
            "\n ".join(str(x) for x in self.jobs_schedule),
            self.stats_every,
            self.max_processes,
            self.max_jobs_per_process,
            self.max_pending_per_process,
            os.getpid(),
        )

        while not self._is_shutting_down:
            try:
                self.maybe_log_stats()
                self.maybe_check_job_results()
                self.maybe_schedule_jobs()
            except Exception as e:
                # Log the issue, but don't stop the worker
                # (these tasks are kind of ancillary to the main job processing)
                logger.exception(e)

            if len(self.executor._pending_work_items) >= (
                self.max_processes * self.max_pending_per_process
            ):
                # We don't want to convert too many JobRequests to Jobs,
                # because anything not started yet will be cancelled on deploy etc.
                # It's easier to leave them in the JobRequest db queue as long as possible.
                time.sleep(0.1)
                continue

            with transaction.atomic():
                job_request = (
                    JobRequest.objects.select_for_update(skip_locked=True)
                    .filter(
                        queue__in=self.queues,
                    )
                    .filter(
                        models.Q(start_at__isnull=True)
                        | models.Q(start_at__lte=timezone.now())
                    )
                    .order_by("priority", "-start_at", "-created_at")
                    .first()
                )
                if not job_request:
                    # Potentially no jobs to process (who knows for how long)
                    # but sleep for a second to give the CPU and DB a break
                    time.sleep(1)
                    continue

                logger.info(
                    'Preparing to execute job job_class=%s job_request_uuid=%s job_priority=%s job_source="%s" job_queues="%s"',
                    job_request.job_class,
                    job_request.uuid,
                    job_request.priority,
                    job_request.source,
                    job_request.queue,
                )

                job = job_request.convert_to_job()

            job_uuid = str(job.uuid)  # Make a str copy

            # Release these now
            del job_request
            del job

            future = self.executor.submit(process_job, job_uuid)
            future.add_done_callback(partial(future_finished_callback, job_uuid))

            # Do a quick sleep regardless to see if it
            # gives processes a chance to start up
            time.sleep(0.1)

    def shutdown(self):
        if self._is_shutting_down:
            # Already shutting down somewhere else
            return

        logger.info("Job worker shutdown started")
        self._is_shutting_down = True
        self.executor.shutdown(wait=True, cancel_futures=True)
        logger.info("Job worker shutdown complete")

    def maybe_log_stats(self):
        if not self.stats_every:
            return

        now = time.time()

        if not hasattr(self, "_stats_logged_at"):
            self._stats_logged_at = now

        if now - self._stats_logged_at > self.stats_every:
            self._stats_logged_at = now
            self.log_stats()

    def maybe_check_job_results(self):
        now = time.time()

        if not hasattr(self, "_job_results_checked_at"):
            self._job_results_checked_at = now

        check_every = 60  # Only need to check once a minute

        if now - self._job_results_checked_at > check_every:
            self._job_results_checked_at = now
            self.rescue_job_results()

    def maybe_schedule_jobs(self):
        if not self.jobs_schedule:
            return

        now = time.time()

        if not hasattr(self, "_jobs_schedule_checked_at"):
            self._jobs_schedule_checked_at = now

        check_every = 60  # Only need to check once every 60 seconds

        if now - self._jobs_schedule_checked_at > check_every:
            for job, schedule in self.jobs_schedule:
                next_start_at = schedule.next()

                # Leverage the unique_key to prevent duplicate scheduled
                # jobs with the same start time (also works if unique_key == "")
                schedule_unique_key = (
                    f"{job.get_unique_key()}:scheduled:{int(next_start_at.timestamp())}"
                )

                # Drawback here is if scheduled job is running, and detected by unique_key
                # so it doesn't schedule the next one? Maybe an ok downside... prevents
                # overlapping executions...?
                result = job.run_in_worker(
                    delay=next_start_at,
                    unique_key=schedule_unique_key,
                )
                # Results are a list if it found scheduled/running jobs...
                if not isinstance(result, list):
                    logger.info(
                        'Scheduling job job_class=%s job_queue="%s" job_start_at="%s" job_schedule="%s" job_unique_key="%s"',
                        result.job_class,
                        result.queue,
                        result.start_at,
                        schedule,
                        result.unique_key,
                    )

            self._jobs_schedule_checked_at = now

    def log_stats(self):
        try:
            num_proccesses = len(self.executor._processes)
        except (AttributeError, TypeError):
            # Depending on shutdown timing and internal behavior, this might not work
            num_proccesses = 0

        jobs_requested = JobRequest.objects.filter(queue__in=self.queues).count()
        jobs_processing = Job.objects.filter(queue__in=self.queues).count()

        logger.info(
            'Job worker stats worker_processes=%s worker_queues="%s" jobs_requested=%s jobs_processing=%s worker_max_processes=%s worker_max_jobs_per_process=%s',
            num_proccesses,
            ",".join(self.queues),
            jobs_requested,
            jobs_processing,
            self.max_processes,
            self.max_jobs_per_process,
        )

    def rescue_job_results(self):
        """Find any lost or failed jobs on this worker's queues and handle them."""
        # TODO return results and log them if there are any?
        Job.objects.filter(queue__in=self.queues).mark_lost_jobs()
        JobResult.objects.filter(queue__in=self.queues).retry_failed_jobs()


def future_finished_callback(job_uuid: str, future: Future):
    if future.cancelled():
        logger.warning("Job cancelled job_uuid=%s", job_uuid)
        job = Job.objects.get(uuid=job_uuid)
        job.convert_to_result(status=JobResultStatuses.CANCELLED)
    else:
        logger.debug("Job finished job_uuid=%s", job_uuid)

def process_job(job_uuid):
    try:
        worker_pid = os.getpid()

        request_started.send(sender=None)

        job = Job.objects.get(uuid=job_uuid)

        logger.info(
            'Executing job worker_pid=%s job_class=%s job_request_uuid=%s job_priority=%s job_source="%s" job_queue="%s"',
            worker_pid,
            job.job_class,
            job.job_request_uuid,
            job.priority,
            job.source,
            job.queue,
        )

        def middleware_chain(job):
            return job.run()

        for middleware_path in reversed(settings.WORKER_MIDDLEWARE):
            middleware_class = import_string(middleware_path)
            middleware_instance = middleware_class(middleware_chain)
            middleware_chain = middleware_instance

        job_result = middleware_chain(job)

        # Release it now
        del job

        duration = job_result.ended_at - job_result.started_at
        duration = duration.total_seconds()

        logger.info(
            'Completed job worker_pid=%s job_class=%s job_uuid=%s job_request_uuid=%s job_result_uuid=%s job_priority=%s job_source="%s" job_queue="%s" job_duration=%s',
            worker_pid,
            job_result.job_class,
            job_result.job_uuid,
            job_result.job_request_uuid,
            job_result.uuid,
            job_result.priority,
            job_result.source,
            job_result.queue,
            duration,
        )

        del job_result
    except Exception as e:
        # Raising exceptions inside the worker process doesn't
        # seem to be caught/shown anywhere as configured.
        # So we at least log it out here.
        # (A job should catch its own user-code errors, so this is for library errors)
        logger.exception(e)
    finally:
        request_finished.send(sender=None)
        gc.collect()
```
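As `process_job()` shows, `WORKER_MIDDLEWARE` entries are import paths to classes that wrap the job execution chain: each class is constructed with the next callable and is then called with the `Job` instance, returning the `JobResult`. A sketch of a custom middleware under those assumptions (the module path and class name are illustrative, not part of the library):

```python
import logging
import time

logger = logging.getLogger(__name__)


class TimingWorkerMiddleware:
    """Illustrative worker middleware that times each job."""

    def __init__(self, run_job):
        # run_job is the next middleware in the chain (innermost is job.run()).
        self.run_job = run_job

    def __call__(self, job):
        start = time.monotonic()
        job_result = self.run_job(job)
        logger.info(
            "Job middleware timing job_class=%s seconds=%.2f",
            job.job_class,
            time.monotonic() - start,
        )
        return job_result


# Example setting (module path is hypothetical):
# WORKER_MIDDLEWARE = ["app.worker_middleware.TimingWorkerMiddleware"]
```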