from __future__ import annotations

import gc
import logging
import multiprocessing
import os
import time
from concurrent.futures import Future, ProcessPoolExecutor
from functools import partial
from typing import Any

from plain import models
from plain.models import transaction
from plain.runtime import settings
from plain.signals import request_finished, request_started
from plain.utils import timezone
from plain.utils.module_loading import import_string

from .models import JobProcess, JobRequest, JobResult, JobResultStatuses
from .registry import jobs_registry

logger = logging.getLogger("plain.jobs")


class Worker:
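    """Polls the JobRequest queue and executes jobs in a pool of child processes.

    The worker claims pending JobRequest rows on its queues, converts each one
    to a JobProcess, and submits it to a ProcessPoolExecutor. It also
    periodically logs stats, rescues lost/failed jobs, and enqueues any
    scheduled jobs it is responsible for.

    Rough construction sketch (argument values are illustrative, not defaults):

        worker = Worker(queues=["default"], stats_every=60)
        worker.run()
    """
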
    def __init__(
        self,
        queues: list[str],
        jobs_schedule: list[Any] | None = None,
        max_processes: int | None = None,
        max_jobs_per_process: int | None = None,
        max_pending_per_process: int = 10,
        stats_every: int | None = None,
    ) -> None:
        if jobs_schedule is None:
            jobs_schedule = []

        self.executor = ProcessPoolExecutor(
            max_workers=max_processes,
            max_tasks_per_child=max_jobs_per_process,
            mp_context=multiprocessing.get_context("spawn"),
        )

        self.queues = queues

        # Filter the jobs schedule to those that are in the same queue as this worker
        self.jobs_schedule = [x for x in jobs_schedule if x[0].get_queue() in queues]

        # How often to log the stats (in seconds)
        self.stats_every = stats_every

        self.max_processes = self.executor._max_workers
        self.max_jobs_per_process = max_jobs_per_process
        self.max_pending_per_process = max_pending_per_process

        self._is_shutting_down = False

    def run(self) -> None:
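        """Run the main worker loop until shutdown() is called.

        Each iteration does the periodic housekeeping (stats, result rescue,
        scheduling), then tries to claim one pending JobRequest with
        select_for_update(skip_locked=True), converts it to a JobProcess, and
        submits it to the process pool. When the pool already has enough
        pending work, it sleeps briefly instead of claiming more requests.
        """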
        logger.info(
            "⬣ Starting Plain worker\n Registered jobs: %s\n Queues: %s\n Jobs schedule: %s\n Stats every: %s seconds\n Max processes: %s\n Max jobs per process: %s\n Max pending per process: %s\n PID: %s",
            "\n ".join(
                f"{name}: {cls}" for name, cls in jobs_registry.jobs.items()
            ),
            ", ".join(self.queues),
            "\n ".join(str(x) for x in self.jobs_schedule),
            self.stats_every,
            self.max_processes,
            self.max_jobs_per_process,
            self.max_pending_per_process,
            os.getpid(),
        )

        while not self._is_shutting_down:
            try:
                self.maybe_log_stats()
                self.maybe_check_job_results()
                self.maybe_schedule_jobs()
            except Exception as e:
                # Log the issue, but don't stop the worker
                # (these tasks are kind of ancillary to the main job processing)
                logger.exception(e)

            if len(self.executor._pending_work_items) >= (
                self.max_processes * self.max_pending_per_process
            ):
                # We don't want to convert too many JobRequests to Jobs,
                # because anything not started yet will be cancelled on deploy etc.
                # It's easier to leave them in the JobRequest db queue as long as possible.
                time.sleep(0.1)
                continue

            with transaction.atomic():
                job_request = (
                    JobRequest.query.select_for_update(skip_locked=True)
                    .filter(
                        queue__in=self.queues,
                    )
                    .filter(
                        models.Q(start_at__isnull=True)
                        | models.Q(start_at__lte=timezone.now())
                    )
                    .order_by("priority", "-start_at", "-created_at")
                    .first()
                )
                if not job_request:
                    # Potentially no jobs to process (who knows for how long)
                    # but sleep for a second to give the CPU and DB a break
                    time.sleep(1)
                    continue

                logger.info(
                    'Preparing to execute job job_class=%s job_request_uuid=%s job_priority=%s job_source="%s" job_queue="%s"',
                    job_request.job_class,
                    job_request.uuid,
                    job_request.priority,
                    job_request.source,
                    job_request.queue,
                )

                job = job_request.convert_to_job_process()

            job_process_uuid = str(job.uuid)  # Make a str copy

            # Release these now
            del job_request
            del job

            future = self.executor.submit(process_job, job_process_uuid)
            future.add_done_callback(
                partial(future_finished_callback, job_process_uuid)
            )

            # Do a quick sleep regardless to see if it
            # gives processes a chance to start up
            time.sleep(0.1)

    def shutdown(self) -> None:
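        """Stop the main loop and wait for in-flight jobs to finish.

        Futures that haven't started yet are cancelled; the done-callback then
        records them as CANCELLED results.
        """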
        if self._is_shutting_down:
            # Already shutting down somewhere else
            return

        logger.info("Job worker shutdown started")
        self._is_shutting_down = True
        self.executor.shutdown(wait=True, cancel_futures=True)
        logger.info("Job worker shutdown complete")

    def maybe_log_stats(self) -> None:
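        """Log worker stats if stats_every is set and enough time has passed."""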
        if not self.stats_every:
            return

        now = time.time()

        if not hasattr(self, "_stats_logged_at"):
            self._stats_logged_at = now

        if now - self._stats_logged_at > self.stats_every:
            self._stats_logged_at = now
            self.log_stats()

    def maybe_check_job_results(self) -> None:
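        """Rescue lost or failed job results, at most once a minute."""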
        now = time.time()

        if not hasattr(self, "_job_results_checked_at"):
            self._job_results_checked_at = now

        check_every = 60  # Only need to check once a minute

        if now - self._job_results_checked_at > check_every:
            self._job_results_checked_at = now
            self.rescue_job_results()

    def maybe_schedule_jobs(self) -> None:
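        """Enqueue the next run of each scheduled job, at most once a minute.

        The unique_key includes the next start timestamp, so re-checking the
        schedule doesn't enqueue duplicates for the same slot.
        """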
        if not self.jobs_schedule:
            return

        now = time.time()

        if not hasattr(self, "_jobs_schedule_checked_at"):
            self._jobs_schedule_checked_at = now

        check_every = 60  # Only need to check once every 60 seconds

        if now - self._jobs_schedule_checked_at > check_every:
            for job, schedule in self.jobs_schedule:
                next_start_at = schedule.next()

                # Leverage the unique_key to prevent duplicate scheduled
                # jobs with the same start time (also works if unique_key == "")
                schedule_unique_key = (
                    f"{job.get_unique_key()}:scheduled:{int(next_start_at.timestamp())}"
                )

                # Drawback: if the scheduled job is still running and detected by
                # unique_key, the next run won't be scheduled. Maybe an acceptable
                # downside, since it prevents overlapping executions.
                result = job.run_in_worker(
                    delay=next_start_at,
                    unique_key=schedule_unique_key,
                )
                # Results are a list if it found scheduled/running jobs...
                if not isinstance(result, list):
                    logger.info(
                        'Scheduling job job_class=%s job_queue="%s" job_start_at="%s" job_schedule="%s" job_unique_key="%s"',
                        result.job_class,
                        result.queue,
                        result.start_at,
                        schedule,
                        result.unique_key,
                    )

            self._jobs_schedule_checked_at = now

    def log_stats(self) -> None:
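        """Log a single stats line for this worker's queues."""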
        try:
            num_processes = len(self.executor._processes)
        except (AttributeError, TypeError):
            # Depending on shutdown timing and internal behavior, this might not work
            num_processes = 0

        jobs_requested = JobRequest.query.filter(queue__in=self.queues).count()
        jobs_processing = JobProcess.query.filter(queue__in=self.queues).count()

        logger.info(
            'Job worker stats worker_processes=%s worker_queues="%s" jobs_requested=%s jobs_processing=%s worker_max_processes=%s worker_max_jobs_per_process=%s',
            num_processes,
            ",".join(self.queues),
            jobs_requested,
            jobs_processing,
            self.max_processes,
            self.max_jobs_per_process,
        )

    def rescue_job_results(self) -> None:
        """Find any lost or failed jobs on this worker's queues and handle them."""
        # TODO return results and log them if there are any?
        JobProcess.query.filter(queue__in=self.queues).mark_lost_jobs()
        JobResult.query.filter(queue__in=self.queues).retry_failed_jobs()


def future_finished_callback(job_process_uuid: str, future: Future) -> None:
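    """Runs in the worker's main process when a submitted job future finishes.

    Cancelled or crashed futures (e.g. the pool process was killed) are
    recorded as CANCELLED results so the JobProcess doesn't linger.
    """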
    if future.cancelled():
        logger.warning("Job cancelled job_process_uuid=%s", job_process_uuid)
        try:
            job = JobProcess.query.get(uuid=job_process_uuid)
            job.convert_to_result(status=JobResultStatuses.CANCELLED)
        except JobProcess.DoesNotExist:
            # Job may have already been cleaned up
            pass
    elif exception := future.exception():
        # Process pool may have been killed...
        logger.warning(
            "Job failed job_process_uuid=%s",
            job_process_uuid,
            exc_info=exception,
        )
        try:
            job = JobProcess.query.get(uuid=job_process_uuid)
            job.convert_to_result(status=JobResultStatuses.CANCELLED)
        except JobProcess.DoesNotExist:
            # Job may have already been cleaned up
            pass
    else:
        logger.debug("Job finished job_process_uuid=%s", job_process_uuid)


def process_job(job_process_uuid: str) -> None:
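    """Execute a single JobProcess inside a pool child process.

    Sends the request_started/request_finished signals around execution, wraps
    the job in the configured JOBS_MIDDLEWARE, and logs the result and duration.
    """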
    try:
        worker_pid = os.getpid()

        request_started.send(sender=None)

        job_process = JobProcess.query.get(uuid=job_process_uuid)

        logger.info(
            'Executing job worker_pid=%s job_class=%s job_request_uuid=%s job_priority=%s job_source="%s" job_queue="%s"',
            worker_pid,
            job_process.job_class,
            job_process.job_request_uuid,
            job_process.priority,
            job_process.source,
            job_process.queue,
        )

        def middleware_chain(job: JobProcess) -> JobResult:
            return job.run()

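        # Build the middleware chain by wrapping in reverse, so the first
        # entry in settings.JOBS_MIDDLEWARE ends up as the outermost wrapper.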
        for middleware_path in reversed(settings.JOBS_MIDDLEWARE):
            middleware_class = import_string(middleware_path)
            middleware_instance = middleware_class(middleware_chain)
            middleware_chain = middleware_instance

        job_result = middleware_chain(job_process)

        # Release it now
        del job_process

        duration = job_result.ended_at - job_result.started_at  # type: ignore[unsupported-operator]
        duration = duration.total_seconds()

        logger.info(
            'Completed job worker_pid=%s job_class=%s job_process_uuid=%s job_request_uuid=%s job_result_uuid=%s job_priority=%s job_source="%s" job_queue="%s" job_duration=%s',
            worker_pid,
            job_result.job_class,
            job_result.job_process_uuid,
            job_result.job_request_uuid,
            job_result.uuid,
            job_result.priority,
            job_result.source,
            job_result.queue,
            duration,
        )

        del job_result
    except Exception as e:
        # Raising exceptions inside the worker process doesn't
        # seem to be caught/shown anywhere as configured.
        # So we at least log it out here.
        # (A job should catch its own user-code errors, so this is for library errors)
        logger.exception(e)
    finally:
        request_finished.send(sender=None)
        gc.collect()