from __future__ import annotations

import gc
import logging
import multiprocessing
import os
import time
from concurrent.futures import Future, ProcessPoolExecutor
from functools import partial
from typing import Any

from plain import models
from plain.models import transaction
from plain.runtime import settings
from plain.signals import request_finished, request_started
from plain.utils import timezone
from plain.utils.module_loading import import_string

from .models import JobProcess, JobRequest, JobResult, JobResultStatuses
from .registry import jobs_registry

logger = logging.getLogger("plain.jobs")


class Worker:
    def __init__(
        self,
        queues: list[str],
        jobs_schedule: list[Any] | None = None,
        max_processes: int | None = None,
        max_jobs_per_process: int | None = None,
        max_pending_per_process: int = 10,
        stats_every: int | None = None,
    ) -> None:
        if jobs_schedule is None:
            jobs_schedule = []

        self.executor = ProcessPoolExecutor(
            max_workers=max_processes,
            max_tasks_per_child=max_jobs_per_process,
            mp_context=multiprocessing.get_context("spawn"),
        )

        self.queues = queues

        # Filter the jobs schedule down to jobs whose queue is handled by this worker
        self.jobs_schedule = [x for x in jobs_schedule if x[0].get_queue() in queues]
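        # Illustrative shape of jobs_schedule (names hypothetical, not part of
        # this module): each entry is a (job, schedule) pair where the schedule
        # exposes .next() returning the next datetime the job should run, e.g.
        #   jobs_schedule = [(NightlyReportJob(), Schedule.from_cron("0 0 * * *"))]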

        # How often to log the stats (in seconds)
        self.stats_every = stats_every

        self.max_processes = self.executor._max_workers
        self.max_jobs_per_process = max_jobs_per_process
        self.max_pending_per_process = max_pending_per_process

        self._is_shutting_down = False

    def run(self) -> None:
        logger.info(
            "⬣ Starting Plain worker\n    Registered jobs: %s\n    Queues: %s\n    Jobs schedule: %s\n    Stats every: %s seconds\n    Max processes: %s\n    Max jobs per process: %s\n    Max pending per process: %s\n    PID: %s",
            "\n                     ".join(
                f"{name}: {cls}" for name, cls in jobs_registry.jobs.items()
            ),
            ", ".join(self.queues),
            "\n                   ".join(str(x) for x in self.jobs_schedule),
            self.stats_every,
            self.max_processes,
            self.max_jobs_per_process,
            self.max_pending_per_process,
            os.getpid(),
        )

        while not self._is_shutting_down:
            try:
                self.maybe_log_stats()
                self.maybe_check_job_results()
                self.maybe_schedule_jobs()
            except Exception as e:
                # Log the issue, but don't stop the worker
                # (these tasks are kind of ancillary to the main job processing)
                logger.exception(e)

            if len(self.executor._pending_work_items) >= (
                self.max_processes * self.max_pending_per_process
            ):
                # We don't want to convert too many JobRequests to JobProcesses,
                # because anything not started yet will be cancelled on deploy etc.
                # It's easier to leave them in the JobRequest db queue as long as possible.
                time.sleep(0.1)
                continue

            with transaction.atomic():
                # Claim the next available JobRequest with skip_locked, so
                # multiple workers can poll the same queues without blocking
                # each other
                job_request = (
                    JobRequest.query.select_for_update(skip_locked=True)
                    .filter(
                        queue__in=self.queues,
                    )
                    .filter(
                        models.Q(start_at__isnull=True)
                        | models.Q(start_at__lte=timezone.now())
                    )
                    .order_by("priority", "-start_at", "-created_at")
                    .first()
                )
                if not job_request:
                    # Potentially no jobs to process (who knows for how long),
                    # but sleep for a second to give the CPU and DB a break
                    time.sleep(1)
                    continue

                logger.info(
                    'Preparing to execute job job_class=%s job_request_uuid=%s job_priority=%s job_source="%s" job_queue="%s"',
                    job_request.job_class,
                    job_request.uuid,
                    job_request.priority,
                    job_request.source,
                    job_request.queue,
                )

                job = job_request.convert_to_job_process()

            job_process_uuid = str(job.uuid)  # Make a str copy

            # Release these references now
            del job_request
            del job

            future = self.executor.submit(process_job, job_process_uuid)
            future.add_done_callback(
                partial(future_finished_callback, job_process_uuid)
            )

            # Do a quick sleep regardless, to give processes a chance to start up
            time.sleep(0.1)

    def shutdown(self) -> None:
        if self._is_shutting_down:
            # Already shutting down somewhere else
            return

        logger.info("Job worker shutdown started")
        self._is_shutting_down = True
        self.executor.shutdown(wait=True, cancel_futures=True)
        logger.info("Job worker shutdown complete")

    def maybe_log_stats(self) -> None:
        if not self.stats_every:
            return

        now = time.time()

        if not hasattr(self, "_stats_logged_at"):
            self._stats_logged_at = now

        if now - self._stats_logged_at > self.stats_every:
            self._stats_logged_at = now
            self.log_stats()

    def maybe_check_job_results(self) -> None:
        now = time.time()

        if not hasattr(self, "_job_results_checked_at"):
            self._job_results_checked_at = now

        check_every = 60  # Only need to check once a minute

        if now - self._job_results_checked_at > check_every:
            self._job_results_checked_at = now
            self.rescue_job_results()

    def maybe_schedule_jobs(self) -> None:
        if not self.jobs_schedule:
            return

        now = time.time()

        if not hasattr(self, "_jobs_schedule_checked_at"):
            self._jobs_schedule_checked_at = now

        check_every = 60  # Only need to check once every 60 seconds

        if now - self._jobs_schedule_checked_at > check_every:
            for job, schedule in self.jobs_schedule:
                next_start_at = schedule.next()

                # Leverage the unique_key to prevent duplicate scheduled
                # jobs with the same start time (also works if unique_key == "")
                schedule_unique_key = (
                    f"{job.get_unique_key()}:scheduled:{int(next_start_at.timestamp())}"
                )

                # Drawback: if the scheduled job is still running and detected
                # via unique_key, the next occurrence won't be scheduled. That
                # may be an acceptable trade-off, since it prevents overlapping
                # executions.
                result = job.run_in_worker(
                    delay=next_start_at,
                    unique_key=schedule_unique_key,
                )
                # run_in_worker returns a list if it found existing
                # scheduled/running jobs instead of creating a new one
                if not isinstance(result, list):
                    logger.info(
                        'Scheduling job job_class=%s job_queue="%s" job_start_at="%s" job_schedule="%s" job_unique_key="%s"',
                        result.job_class,
                        result.queue,
                        result.start_at,
                        schedule,
                        result.unique_key,
                    )

            self._jobs_schedule_checked_at = now

    def log_stats(self) -> None:
        try:
            num_processes = len(self.executor._processes)
        except (AttributeError, TypeError):
            # Depending on shutdown timing and internal behavior, this might not work
            num_processes = 0

        jobs_requested = JobRequest.query.filter(queue__in=self.queues).count()
        jobs_processing = JobProcess.query.filter(queue__in=self.queues).count()

        logger.info(
            'Job worker stats worker_processes=%s worker_queues="%s" jobs_requested=%s jobs_processing=%s worker_max_processes=%s worker_max_jobs_per_process=%s',
            num_processes,
            ",".join(self.queues),
            jobs_requested,
            jobs_processing,
            self.max_processes,
            self.max_jobs_per_process,
        )

    def rescue_job_results(self) -> None:
        """Find any lost or failed jobs on this worker's queues and handle them."""
        # TODO return results and log them if there are any?
        JobProcess.query.filter(queue__in=self.queues).mark_lost_jobs()
        JobResult.query.filter(queue__in=self.queues).retry_failed_jobs()


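# A minimal usage sketch (hypothetical wiring, not part of this module; the
# actual CLI entrypoint lives elsewhere in the package): run a worker on the
# "default" queue and shut it down cleanly on Ctrl-C.
#
#   worker = Worker(queues=["default"], stats_every=60)
#   try:
#       worker.run()
#   except KeyboardInterrupt:
#       worker.shutdown()

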
def future_finished_callback(job_process_uuid: str, future: Future) -> None:
    if future.cancelled():
        logger.warning("Job cancelled job_process_uuid=%s", job_process_uuid)
        try:
            job = JobProcess.query.get(uuid=job_process_uuid)
            job.convert_to_result(status=JobResultStatuses.CANCELLED)
        except JobProcess.DoesNotExist:
            # Job may have already been cleaned up
            pass
    elif exception := future.exception():
        # Process pool may have been killed...
        logger.warning(
            "Job failed job_process_uuid=%s",
            job_process_uuid,
            exc_info=exception,
        )
        try:
            job = JobProcess.query.get(uuid=job_process_uuid)
            job.convert_to_result(status=JobResultStatuses.CANCELLED)
        except JobProcess.DoesNotExist:
            # Job may have already been cleaned up
            pass
    else:
        logger.debug("Job finished job_process_uuid=%s", job_process_uuid)


def process_job(job_process_uuid: str) -> None:
    try:
        worker_pid = os.getpid()

        request_started.send(sender=None)

        job_process = JobProcess.query.get(uuid=job_process_uuid)

        logger.info(
            'Executing job worker_pid=%s job_class=%s job_request_uuid=%s job_priority=%s job_source="%s" job_queue="%s"',
            worker_pid,
            job_process.job_class,
            job_process.job_request_uuid,
            job_process.priority,
            job_process.source,
            job_process.queue,
        )

        # Wrap job.run() in the configured JOBS_MIDDLEWARE, so the first
        # entry in settings becomes the outermost layer
        # (see the illustrative middleware sketch at the end of this file)
        def middleware_chain(job: JobProcess) -> JobResult:
            return job.run()

        for middleware_path in reversed(settings.JOBS_MIDDLEWARE):
            middleware_class = import_string(middleware_path)
            middleware_instance = middleware_class(middleware_chain)
            middleware_chain = middleware_instance

        job_result = middleware_chain(job_process)

        # Release the reference now
        del job_process

        duration = job_result.ended_at - job_result.started_at  # type: ignore[unsupported-operator]
        duration = duration.total_seconds()

        logger.info(
            'Completed job worker_pid=%s job_class=%s job_process_uuid=%s job_request_uuid=%s job_result_uuid=%s job_priority=%s job_source="%s" job_queue="%s" job_duration=%s',
            worker_pid,
            job_result.job_class,
            job_result.job_process_uuid,
            job_result.job_request_uuid,
            job_result.uuid,
            job_result.priority,
            job_result.source,
            job_result.queue,
            duration,
        )

        del job_result
    except Exception as e:
        # Raising exceptions inside the worker process doesn't
        # seem to be caught/shown anywhere as configured.
        # So we at least log it out here.
        # (A job should catch its own user-code errors, so this is for library errors)
        logger.exception(e)
    finally:
        request_finished.send(sender=None)
        gc.collect()
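

# A sketch of a compatible middleware class (hypothetical, not shipped with
# this package): each dotted path in settings.JOBS_MIDDLEWARE is imported,
# instantiated with the next callable in the chain, and then called with the
# JobProcess, returning a JobResult.
class ExampleTimingJobMiddleware:
    def __init__(self, get_result):
        self.get_result = get_result

    def __call__(self, job_process: JobProcess) -> JobResult:
        # Time the rest of the chain (and ultimately job.run())
        start = time.monotonic()
        try:
            return self.get_result(job_process)
        finally:
            logger.debug(
                "Job middleware timing job_class=%s seconds=%.3f",
                job_process.job_class,
                time.monotonic() - start,
            )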