import gc
import logging
import multiprocessing
import os
import time
from concurrent.futures import Future, ProcessPoolExecutor
from functools import partial

from plain import models
from plain.models import transaction
from plain.runtime import settings
from plain.signals import request_finished, request_started
from plain.utils import timezone
from plain.utils.module_loading import import_string

from .models import Job, JobRequest, JobResult, JobResultStatuses
from .registry import jobs_registry

logger = logging.getLogger("plain.worker")


class Worker:
    def __init__(
        self,
        queues,
        jobs_schedule=None,
        max_processes=None,
        max_jobs_per_process=None,
        max_pending_per_process=10,
        stats_every=None,
    ):
        if jobs_schedule is None:
            jobs_schedule = []

        self.executor = ProcessPoolExecutor(
            max_workers=max_processes,
            max_tasks_per_child=max_jobs_per_process,
            mp_context=multiprocessing.get_context("spawn"),
        )
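        # Note: max_tasks_per_child requires the "spawn" (or "forkserver")
        # start method, and recycles each worker process after it has run
        # that many jobs. Spawning also avoids inheriting state (like open
        # database connections) from the parent process.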

        self.queues = queues

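        # Each jobs_schedule entry is a (job, schedule) pair
        # (see maybe_schedule_jobs below).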
        # Filter the jobs schedule to those that are in the same queue as this worker
        self.jobs_schedule = [x for x in jobs_schedule if x[0].get_queue() in queues]

        # How often to log the stats (in seconds)
        self.stats_every = stats_every

        self.max_processes = self.executor._max_workers
        self.max_jobs_per_process = max_jobs_per_process
        self.max_pending_per_process = max_pending_per_process

        self._is_shutting_down = False

    def run(self):
        logger.info(
            "⬣ Starting Plain worker\n    Registered jobs: %s\n    Queues: %s\n    Jobs schedule: %s\n    Stats every: %s seconds\n    Max processes: %s\n    Max jobs per process: %s\n    Max pending per process: %s\n    PID: %s",
            "\n                     ".join(
                f"{name}: {cls}" for name, cls in jobs_registry.jobs.items()
            ),
            ", ".join(self.queues),
            "\n                   ".join(str(x) for x in self.jobs_schedule),
            self.stats_every,
            self.max_processes,
            self.max_jobs_per_process,
            self.max_pending_per_process,
            os.getpid(),
        )

        while not self._is_shutting_down:
            try:
                self.maybe_log_stats()
                self.maybe_check_job_results()
                self.maybe_schedule_jobs()
            except Exception as e:
                # Log the issue, but don't stop the worker
                # (these tasks are kind of ancillary to the main job processing)
                logger.exception(e)

            if len(self.executor._pending_work_items) >= (
                self.max_processes * self.max_pending_per_process
            ):
                # We don't want to convert too many JobRequests to Jobs,
                # because anything not started yet will be cancelled on deploy etc.
                # It's easier to leave them in the JobRequest db queue as long as possible.
                time.sleep(0.1)
                continue

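            # Claim the next JobRequest inside a transaction. skip_locked lets
            # multiple workers poll the same queues concurrently without
            # blocking on (or double-claiming) rows another worker has locked.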
            with transaction.atomic():
                job_request = (
                    JobRequest.objects.select_for_update(skip_locked=True)
                    .filter(
                        queue__in=self.queues,
                    )
                    .filter(
                        models.Q(start_at__isnull=True)
                        | models.Q(start_at__lte=timezone.now())
                    )
                    .order_by("priority", "-start_at", "-created_at")
                    .first()
                )
                if not job_request:
                    # Potentially no jobs to process (who knows for how long)
                    # but sleep for a second to give the CPU and DB a break
                    time.sleep(1)
                    continue

                logger.info(
                    'Preparing to execute job job_class=%s job_request_uuid=%s job_priority=%s job_source="%s" job_queue="%s"',
                    job_request.job_class,
                    job_request.uuid,
                    job_request.priority,
                    job_request.source,
                    job_request.queue,
                )

                job = job_request.convert_to_job()

            job_uuid = str(job.uuid)  # Make a str copy

            # Release these now
            del job_request
            del job

            future = self.executor.submit(process_job, job_uuid)
            future.add_done_callback(partial(future_finished_callback, job_uuid))
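            # The done callback runs back in this parent process (not the
            # job's process) once the future completes or is cancelled.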

            # Do a quick sleep regardless, to give processes a chance to start up
            time.sleep(0.1)

    def shutdown(self):
        if self._is_shutting_down:
            # Already shutting down somewhere else
            return

        logger.info("Job worker shutdown started")
        self._is_shutting_down = True
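        # wait=True blocks until already-running jobs finish, while
        # cancel_futures=True drops anything still pending -- those futures
        # get recorded as CANCELLED by future_finished_callback.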
        self.executor.shutdown(wait=True, cancel_futures=True)
        logger.info("Job worker shutdown complete")

    def maybe_log_stats(self):
        if not self.stats_every:
            return

        now = time.time()

        if not hasattr(self, "_stats_logged_at"):
            self._stats_logged_at = now

        if now - self._stats_logged_at > self.stats_every:
            self._stats_logged_at = now
            self.log_stats()

    def maybe_check_job_results(self):
        now = time.time()

        if not hasattr(self, "_job_results_checked_at"):
            self._job_results_checked_at = now

        check_every = 60  # Only need to check once a minute

        if now - self._job_results_checked_at > check_every:
            self._job_results_checked_at = now
            self.rescue_job_results()

    def maybe_schedule_jobs(self):
        if not self.jobs_schedule:
            return

        now = time.time()

        if not hasattr(self, "_jobs_schedule_checked_at"):
            self._jobs_schedule_checked_at = now

        check_every = 60  # Only need to check once every 60 seconds

        if now - self._jobs_schedule_checked_at > check_every:
            for job, schedule in self.jobs_schedule:
                next_start_at = schedule.next()

                # Leverage the unique_key to prevent duplicate scheduled
                # jobs with the same start time (also works if unique_key == "")
                schedule_unique_key = (
                    f"{job.get_unique_key()}:scheduled:{int(next_start_at.timestamp())}"
                )

                # Drawback: if the scheduled job is still running and detected
                # via unique_key, the next run won't be scheduled. Arguably an
                # acceptable trade-off, since it prevents overlapping executions.
                result = job.run_in_worker(
                    delay=next_start_at,
                    unique_key=schedule_unique_key,
                )
                # run_in_worker returns a list if it found existing scheduled/running jobs
                if not isinstance(result, list):
                    logger.info(
                        'Scheduling job job_class=%s job_queue="%s" job_start_at="%s" job_schedule="%s" job_unique_key="%s"',
                        result.job_class,
                        result.queue,
                        result.start_at,
                        schedule,
                        result.unique_key,
                    )

            self._jobs_schedule_checked_at = now

    def log_stats(self):
        try:
            num_processes = len(self.executor._processes)
        except (AttributeError, TypeError):
            # Depending on shutdown timing and internal behavior, this might not work
            num_processes = 0

        jobs_requested = JobRequest.objects.filter(queue__in=self.queues).count()
        jobs_processing = Job.objects.filter(queue__in=self.queues).count()

        logger.info(
            'Job worker stats worker_processes=%s worker_queues="%s" jobs_requested=%s jobs_processing=%s worker_max_processes=%s worker_max_jobs_per_process=%s',
            num_processes,
            ",".join(self.queues),
            jobs_requested,
            jobs_processing,
            self.max_processes,
            self.max_jobs_per_process,
        )

    def rescue_job_results(self):
        """Find any lost or failed jobs on this worker's queues and handle them."""
        # TODO return results and log them if there are any?
        Job.objects.filter(queue__in=self.queues).mark_lost_jobs()
        JobResult.objects.filter(queue__in=self.queues).retry_failed_jobs()


def future_finished_callback(job_uuid: str, future: Future):
    if future.cancelled():
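        # A future can only be cancelled before it starts running (e.g. pending
        # jobs dropped by shutdown(cancel_futures=True)), so the Job row still
        # exists and can be converted to a cancelled result.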
        logger.warning("Job cancelled job_uuid=%s", job_uuid)
        job = Job.objects.get(uuid=job_uuid)
        job.convert_to_result(status=JobResultStatuses.CANCELLED)
    else:
        logger.debug("Job finished job_uuid=%s", job_uuid)


def process_job(job_uuid):
    try:
        worker_pid = os.getpid()

        request_started.send(sender=None)
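        # Sending the request_started/request_finished signals lets any
        # per-request setup and teardown (like database connection handling)
        # also run around each job.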

        job = Job.objects.get(uuid=job_uuid)

        logger.info(
            'Executing job worker_pid=%s job_class=%s job_request_uuid=%s job_priority=%s job_source="%s" job_queue="%s"',
            worker_pid,
            job.job_class,
            job.job_request_uuid,
            job.priority,
            job.source,
            job.queue,
        )

        def middleware_chain(job):
            return job.run()

        for middleware_path in reversed(settings.WORKER_MIDDLEWARE):
            middleware_class = import_string(middleware_path)
            middleware_instance = middleware_class(middleware_chain)
            middleware_chain = middleware_instance
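        # Iterating in reverse makes the first entry in WORKER_MIDDLEWARE the
        # outermost wrapper: each middleware is constructed with the next
        # callable in the chain and is expected to call it with the job.
        # A hypothetical pass-through middleware would look like:
        #
        #     class NoopWorkerMiddleware:
        #         def __init__(self, get_result):
        #             self.get_result = get_result
        #
        #         def __call__(self, job):
        #             return self.get_result(job)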

        job_result = middleware_chain(job)

        # Release it now
        del job

        duration = job_result.ended_at - job_result.started_at
        duration = duration.total_seconds()

        logger.info(
            'Completed job worker_pid=%s job_class=%s job_uuid=%s job_request_uuid=%s job_result_uuid=%s job_priority=%s job_source="%s" job_queue="%s" job_duration=%s',
            worker_pid,
            job_result.job_class,
            job_result.job_uuid,
            job_result.job_request_uuid,
            job_result.uuid,
            job_result.priority,
            job_result.source,
            job_result.queue,
            duration,
        )

        del job_result
    except Exception as e:
        # Raising exceptions inside the worker process doesn't
        # seem to be caught/shown anywhere as configured.
        # So we at least log it out here.
        # (A job should catch its own user-code errors, so this is for library errors)
        logger.exception(e)
    finally:
        request_finished.send(sender=None)
        gc.collect()
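

# A minimal sketch of how this Worker might be driven directly, assuming a
# single "default" queue (in practice a CLI entrypoint would normally handle
# construction and signal wiring):
#
#     if __name__ == "__main__":
#         worker = Worker(queues=["default"], stats_every=60)
#         try:
#             worker.run()
#         except KeyboardInterrupt:
#             worker.shutdown()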