Worker

Process background jobs with a database-driven worker.

from plain.worker import Job
from plain.mail import send_mail

# Create a new job class
class WelcomeUserJob(Job):
    def __init__(self, user):
        self.user = user

    def run(self):
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            from_email="[email protected]",
            recipient_list=[self.user.email],
        )


# Instantiate a job and send it to the worker
user = User.objects.get(pk=1)
WelcomeUserJob(user).run_in_worker()

The worker process is run separately using plain worker run.
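
Jobs don't have to run immediately. A minimal sketch of deferring and de-duplicating a job, assuming the delay and unique_key parameters that appear in the run_in_worker() calls in the worker source further down:

from datetime import timedelta

from plain.utils import timezone

# Send the welcome email an hour from now, and set a unique key so the same
# job isn't queued twice for this user while one is still pending.
# (delay and unique_key are assumptions based on the worker source below.)
WelcomeUserJob(user).run_in_worker(
    delay=timezone.now() + timedelta(hours=1),
    unique_key=f"welcome-user-{user.pk}",
)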

Admin

Job history

Scheduled jobs

Monitoring

import gc
import logging
import multiprocessing
import os
import time
from concurrent.futures import Future, ProcessPoolExecutor
from functools import partial

from plain import models
from plain.models import transaction
from plain.runtime import settings
from plain.signals import request_finished, request_started
from plain.utils import timezone
from plain.utils.module_loading import import_string

from .models import Job, JobRequest, JobResult, JobResultStatuses
from .registry import jobs_registry

logger = logging.getLogger("plain.worker")


class Worker:
    def __init__(
        self,
        queues,
        jobs_schedule=None,
        max_processes=None,
        max_jobs_per_process=None,
        max_pending_per_process=10,
        stats_every=None,
    ):
        if jobs_schedule is None:
            jobs_schedule = []

        self.executor = ProcessPoolExecutor(
            max_workers=max_processes,
            max_tasks_per_child=max_jobs_per_process,
            mp_context=multiprocessing.get_context("spawn"),
        )

        self.queues = queues

        # Filter the jobs schedule to those that are in the same queue as this worker
        self.jobs_schedule = [x for x in jobs_schedule if x[0].get_queue() in queues]

        # How often to log the stats (in seconds)
        self.stats_every = stats_every

        self.max_processes = self.executor._max_workers
        self.max_jobs_per_process = max_jobs_per_process
        self.max_pending_per_process = max_pending_per_process

        self._is_shutting_down = False

    def run(self):
        logger.info(
            "⬣ Starting Plain worker\n    Registered jobs: %s\n    Queues: %s\n    Jobs schedule: %s\n    Stats every: %s seconds\n    Max processes: %s\n    Max jobs per process: %s\n    Max pending per process: %s\n    PID: %s",
            "\n                     ".join(
                f"{name}: {cls}" for name, cls in jobs_registry.jobs.items()
            ),
            ", ".join(self.queues),
            "\n                   ".join(str(x) for x in self.jobs_schedule),
            self.stats_every,
            self.max_processes,
            self.max_jobs_per_process,
            self.max_pending_per_process,
            os.getpid(),
        )

        while not self._is_shutting_down:
            try:
                self.maybe_log_stats()
                self.maybe_check_job_results()
                self.maybe_schedule_jobs()
            except Exception as e:
                # Log the issue, but don't stop the worker
                # (these tasks are kind of ancillary to the main job processing)
                logger.exception(e)

            if len(self.executor._pending_work_items) >= (
                self.max_processes * self.max_pending_per_process
            ):
                # We don't want to convert too many JobRequests to Jobs,
                # because anything not started yet will be cancelled on deploy etc.
                # It's easier to leave them in the JobRequest db queue as long as possible.
                time.sleep(0.1)
                continue

            with transaction.atomic():
                job_request = (
                    JobRequest.objects.select_for_update(skip_locked=True)
                    .filter(
                        queue__in=self.queues,
                    )
                    .filter(
                        models.Q(start_at__isnull=True)
                        | models.Q(start_at__lte=timezone.now())
                    )
                    .order_by("priority", "-start_at", "-created_at")
                    .first()
                )
                if not job_request:
                    # Potentially no jobs to process (who knows for how long)
                    # but sleep for a second to give the CPU and DB a break
                    time.sleep(1)
                    continue

                logger.info(
                    'Preparing to execute job job_class=%s job_request_uuid=%s job_priority=%s job_source="%s" job_queues="%s"',
                    job_request.job_class,
                    job_request.uuid,
                    job_request.priority,
                    job_request.source,
                    job_request.queue,
                )

                job = job_request.convert_to_job()

            job_uuid = str(job.uuid)  # Make a str copy

            # Release these now
            del job_request
            del job

            future = self.executor.submit(process_job, job_uuid)
            future.add_done_callback(partial(future_finished_callback, job_uuid))

            # Do a quick sleep regardless to see if it
            # gives processes a chance to start up
            time.sleep(0.1)

    def shutdown(self):
        if self._is_shutting_down:
            # Already shutting down somewhere else
            return

        logger.info("Job worker shutdown started")
        self._is_shutting_down = True
        self.executor.shutdown(wait=True, cancel_futures=True)
        logger.info("Job worker shutdown complete")

    def maybe_log_stats(self):
        if not self.stats_every:
            return

        now = time.time()

        if not hasattr(self, "_stats_logged_at"):
            self._stats_logged_at = now

        if now - self._stats_logged_at > self.stats_every:
            self._stats_logged_at = now
            self.log_stats()

    def maybe_check_job_results(self):
        now = time.time()

        if not hasattr(self, "_job_results_checked_at"):
            self._job_results_checked_at = now

        check_every = 60  # Only need to check once a minute

        if now - self._job_results_checked_at > check_every:
            self._job_results_checked_at = now
            self.rescue_job_results()

    def maybe_schedule_jobs(self):
        if not self.jobs_schedule:
            return

        now = time.time()

        if not hasattr(self, "_jobs_schedule_checked_at"):
            self._jobs_schedule_checked_at = now

        check_every = 60  # Only need to check once every 60 seconds

        if now - self._jobs_schedule_checked_at > check_every:
            for job, schedule in self.jobs_schedule:
                next_start_at = schedule.next()

                # Leverage the unique_key to prevent duplicate scheduled
                # jobs with the same start time (also works if unique_key == "")
                schedule_unique_key = (
                    f"{job.get_unique_key()}:scheduled:{int(next_start_at.timestamp())}"
                )

                # Drawback here is if scheduled job is running, and detected by unique_key
                # so it doesn't schedule the next one? Maybe an ok downside... prevents
                # overlapping executions...?
                result = job.run_in_worker(
                    delay=next_start_at,
                    unique_key=schedule_unique_key,
                )
                # Results are a list if it found scheduled/running jobs...
                if not isinstance(result, list):
                    logger.info(
                        'Scheduling job job_class=%s job_queue="%s" job_start_at="%s" job_schedule="%s" job_unique_key="%s"',
                        result.job_class,
                        result.queue,
                        result.start_at,
                        schedule,
                        result.unique_key,
                    )

            self._jobs_schedule_checked_at = now

    def log_stats(self):
        try:
            num_processes = len(self.executor._processes)
        except (AttributeError, TypeError):
            # Depending on shutdown timing and internal behavior, this might not work
            num_processes = 0

        jobs_requested = JobRequest.objects.filter(queue__in=self.queues).count()
        jobs_processing = Job.objects.filter(queue__in=self.queues).count()

        logger.info(
            'Job worker stats worker_processes=%s worker_queues="%s" jobs_requested=%s jobs_processing=%s worker_max_processes=%s worker_max_jobs_per_process=%s',
            num_processes,
            ",".join(self.queues),
            jobs_requested,
            jobs_processing,
            self.max_processes,
            self.max_jobs_per_process,
        )

    def rescue_job_results(self):
        """Find any lost or failed jobs on this worker's queues and handle them."""
        # TODO return results and log them if there are any?
        Job.objects.filter(queue__in=self.queues).mark_lost_jobs()
        JobResult.objects.filter(queue__in=self.queues).retry_failed_jobs()


def future_finished_callback(job_uuid: str, future: Future):
    if future.cancelled():
        logger.warning("Job cancelled job_uuid=%s", job_uuid)
        job = Job.objects.get(uuid=job_uuid)
        job.convert_to_result(status=JobResultStatuses.CANCELLED)
    else:
        logger.debug("Job finished job_uuid=%s", job_uuid)


def process_job(job_uuid):
    try:
        worker_pid = os.getpid()

        request_started.send(sender=None)

        job = Job.objects.get(uuid=job_uuid)

        logger.info(
            'Executing job worker_pid=%s job_class=%s job_request_uuid=%s job_priority=%s job_source="%s" job_queue="%s"',
            worker_pid,
            job.job_class,
            job.job_request_uuid,
            job.priority,
            job.source,
            job.queue,
        )

        def middleware_chain(job):
            return job.run()

        for middleware_path in reversed(settings.WORKER_MIDDLEWARE):
            middleware_class = import_string(middleware_path)
            middleware_instance = middleware_class(middleware_chain)
            middleware_chain = middleware_instance

        job_result = middleware_chain(job)

        # Release it now
        del job

        duration = job_result.ended_at - job_result.started_at
        duration = duration.total_seconds()

        logger.info(
            'Completed job worker_pid=%s job_class=%s job_uuid=%s job_request_uuid=%s job_result_uuid=%s job_priority=%s job_source="%s" job_queue="%s" job_duration=%s',
            worker_pid,
            job_result.job_class,
            job_result.job_uuid,
            job_result.job_request_uuid,
            job_result.uuid,
            job_result.priority,
            job_result.source,
            job_result.queue,
            duration,
        )

        del job_result
    except Exception as e:
        # Raising exceptions inside the worker process doesn't
        # seem to be caught/shown anywhere as configured.
        # So we at least log it out here.
        # (A job should catch its own user-code errors, so this is for library errors)
        logger.exception(e)
    finally:
        request_finished.send(sender=None)
        gc.collect()
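
The WORKER_MIDDLEWARE setting used in process_job() above treats each entry as an import path to a class that wraps the next callable in the chain (ultimately job.run()) and is called with the Job instance, returning its JobResult. A minimal sketch of what such a class could look like; the class name, module path, and logging are illustrative, not part of the package:

import logging

logger = logging.getLogger(__name__)


class LoggingWorkerMiddleware:
    # Hypothetical middleware: wraps the next callable in the chain and
    # logs around each job's execution.
    def __init__(self, run_job):
        self.run_job = run_job

    def __call__(self, job):
        logger.info("Starting %s", job.job_class)
        job_result = self.run_job(job)
        logger.info("Finished %s", job.job_class)
        return job_result

To enable it, its import path would be added to WORKER_MIDDLEWARE in settings (the module path here is assumed):

WORKER_MIDDLEWARE = [
    "app.worker_middleware.LoggingWorkerMiddleware",
]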