Plain is headed towards 1.0! Subscribe for development updates →

Worker

Process background jobs with a database-driven worker.

from plain.worker import Job
from plain.mail import send_mail

# Create a new job class
class WelcomeUserJob(Job):
    """Background job that emails a welcome message to a single user."""

    def __init__(self, user):
        # `user` is interpolated into the message body and must expose
        # an `email` attribute (both are read in `run`).
        self.user = user

    def run(self):
        """Send the welcome email to the user's address."""
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            # Placeholder sender on a reserved example domain;
            # replace with your application's real from-address.
            from_email="welcome@example.com",
            recipient_list=[self.user.email],
        )


# Instantiate a job and send it to the worker.
# NOTE: `User` is not imported in this snippet; it is assumed to be the
# application's user model. The job is queued here and executed later by
# the separately-run worker process.
user = User.objects.get(pk=1)
WelcomeUserJob(user).run_in_worker()

The worker process is run separately using `plain worker run`.

Staff

Job history

Scheduled jobs

Monitoring

  1import gc
  2import logging
  3import multiprocessing
  4import os
  5import time
  6from concurrent.futures import Future, ProcessPoolExecutor
  7from functools import partial
  8
  9from plain import models
 10from plain.models import transaction
 11from plain.runtime import settings
 12from plain.signals import request_finished, request_started
 13from plain.utils import timezone
 14from plain.utils.module_loading import import_string
 15
 16from .models import Job, JobRequest, JobResult, JobResultStatuses
 17
 18logger = logging.getLogger("plain.worker")
 19
 20
 21class Worker:
 22    def __init__(
 23        self,
 24        queues,
 25        jobs_schedule=None,
 26        max_processes=None,
 27        max_jobs_per_process=None,
 28        max_pending_per_process=10,
 29        stats_every=None,
 30    ):
 31        if jobs_schedule is None:
 32            jobs_schedule = []
 33
 34        self.executor = ProcessPoolExecutor(
 35            max_workers=max_processes,
 36            max_tasks_per_child=max_jobs_per_process,
 37            mp_context=multiprocessing.get_context("spawn"),
 38        )
 39
 40        self.queues = queues
 41
 42        # Filter the jobs schedule to those that are in the same queue as this worker
 43        self.jobs_schedule = [x for x in jobs_schedule if x[0].get_queue() in queues]
 44
 45        # How often to log the stats (in seconds)
 46        self.stats_every = stats_every
 47
 48        self.max_processes = self.executor._max_workers
 49        self.max_jobs_per_process = max_jobs_per_process
 50        self.max_pending_per_process = max_pending_per_process
 51
 52        self._is_shutting_down = False
 53
 54    def run(self):
 55        logger.info(
 56            "⬣ Starting Plain worker\n    Queues: %s\n    Jobs schedule: %s\n    Stats every: %s seconds\n    Max processes: %s\n    Max jobs per process: %s\n    Max pending per process: %s\n    PID: %s",
 57            ", ".join(self.queues),
 58            "\n                   ".join(str(x) for x in self.jobs_schedule),
 59            self.stats_every,
 60            self.max_processes,
 61            self.max_jobs_per_process,
 62            self.max_pending_per_process,
 63            os.getpid(),
 64        )
 65
 66        while not self._is_shutting_down:
 67            try:
 68                self.maybe_log_stats()
 69                self.maybe_check_job_results()
 70                self.maybe_schedule_jobs()
 71            except Exception as e:
 72                # Log the issue, but don't stop the worker
 73                # (these tasks are kind of ancilarry to the main job processing)
 74                logger.exception(e)
 75
 76            if len(self.executor._pending_work_items) >= (
 77                self.max_processes * self.max_pending_per_process
 78            ):
 79                # We don't want to convert too many JobRequests to Jobs,
 80                # because anything not started yet will be cancelled on deploy etc.
 81                # It's easier to leave them in the JobRequest db queue as long as possible.
 82                time.sleep(0.1)
 83                continue
 84
 85            with transaction.atomic():
 86                job_request = (
 87                    JobRequest.objects.select_for_update(skip_locked=True)
 88                    .filter(
 89                        queue__in=self.queues,
 90                    )
 91                    .filter(
 92                        models.Q(start_at__isnull=True)
 93                        | models.Q(start_at__lte=timezone.now())
 94                    )
 95                    .order_by("priority", "-start_at", "-created_at")
 96                    .first()
 97                )
 98                if not job_request:
 99                    # Potentially no jobs to process (who knows for how long)
100                    # but sleep for a second to give the CPU and DB a break
101                    time.sleep(1)
102                    continue
103
104                logger.info(
105                    'Preparing to execute job job_class=%s job_request_uuid=%s job_priority=%s job_source="%s" job_queues="%s"',
106                    job_request.job_class,
107                    job_request.uuid,
108                    job_request.priority,
109                    job_request.source,
110                    job_request.queue,
111                )
112
113                job = job_request.convert_to_job()
114
115            job_uuid = str(job.uuid)  # Make a str copy
116
117            # Release these now
118            del job_request
119            del job
120
121            future = self.executor.submit(process_job, job_uuid)
122            future.add_done_callback(partial(future_finished_callback, job_uuid))
123
124            # Do a quick sleep regardless to see if it
125            # gives processes a chance to start up
126            time.sleep(0.1)
127
128    def shutdown(self):
129        if self._is_shutting_down:
130            # Already shutting down somewhere else
131            return
132
133        logger.info("Job worker shutdown started")
134        self._is_shutting_down = True
135        self.executor.shutdown(wait=True, cancel_futures=True)
136        logger.info("Job worker shutdown complete")
137
138    def maybe_log_stats(self):
139        if not self.stats_every:
140            return
141
142        now = time.time()
143
144        if not hasattr(self, "_stats_logged_at"):
145            self._stats_logged_at = now
146
147        if now - self._stats_logged_at > self.stats_every:
148            self._stats_logged_at = now
149            self.log_stats()
150
151    def maybe_check_job_results(self):
152        now = time.time()
153
154        if not hasattr(self, "_job_results_checked_at"):
155            self._job_results_checked_at = now
156
157        check_every = 60  # Only need to check once a minute
158
159        if now - self._job_results_checked_at > check_every:
160            self._job_results_checked_at = now
161            self.rescue_job_results()
162
163    def maybe_schedule_jobs(self):
164        if not self.jobs_schedule:
165            return
166
167        now = time.time()
168
169        if not hasattr(self, "_jobs_schedule_checked_at"):
170            self._jobs_schedule_checked_at = now
171
172        check_every = 60  # Only need to check once every 60 seconds
173
174        if now - self._jobs_schedule_checked_at > check_every:
175            for job, schedule in self.jobs_schedule:
176                next_start_at = schedule.next()
177
178                # Leverage the unique_key to prevent duplicate scheduled
179                # jobs with the same start time (also works if unique_key == "")
180                schedule_unique_key = (
181                    f"{job.get_unique_key()}:scheduled:{int(next_start_at.timestamp())}"
182                )
183
184                # Drawback here is if scheduled job is running, and detected by unique_key
185                # so it doesn't schedule the next one? Maybe an ok downside... prevents
186                # overlapping executions...?
187                result = job.run_in_worker(
188                    delay=next_start_at,
189                    unique_key=schedule_unique_key,
190                )
191                # Results are a list if it found scheduled/running jobs...
192                if not isinstance(result, list):
193                    logger.info(
194                        'Scheduling job job_class=%s job_queue="%s" job_start_at="%s" job_schedule="%s" job_unique_key="%s"',
195                        result.job_class,
196                        result.queue,
197                        result.start_at,
198                        schedule,
199                        result.unique_key,
200                    )
201
202            self._jobs_schedule_checked_at = now
203
204    def log_stats(self):
205        try:
206            num_proccesses = len(self.executor._processes)
207        except (AttributeError, TypeError):
208            # Depending on shutdown timing and internal behavior, this might not work
209            num_proccesses = 0
210
211        jobs_requested = JobRequest.objects.filter(queue__in=self.queues).count()
212        jobs_processing = Job.objects.filter(queue__in=self.queues).count()
213
214        logger.info(
215            'Job worker stats worker_processes=%s worker_queues="%s" jobs_requested=%s jobs_processing=%s worker_max_processes=%s worker_max_jobs_per_process=%s',
216            num_proccesses,
217            ",".join(self.queues),
218            jobs_requested,
219            jobs_processing,
220            self.max_processes,
221            self.max_jobs_per_process,
222        )
223
224    def rescue_job_results(self):
225        """Find any lost or failed jobs on this worker's queues and handle them."""
226        # TODO return results and log them if there are any?
227        Job.objects.filter(queue__in=self.queues).mark_lost_jobs()
228        JobResult.objects.filter(queue__in=self.queues).retry_failed_jobs()
229
230
def future_finished_callback(job_uuid: str, future: Future):
    """Done-callback for a submitted job future.

    Args:
        job_uuid: UUID (as a str) of the job the future was submitted for.
        future: The completed or cancelled future.

    A cancelled future has its job converted to a CANCELLED result. A future
    that finished with an exception is logged with its traceback instead of
    being reported as a normal finish.
    """
    if future.cancelled():
        logger.warning("Job cancelled job_uuid=%s", job_uuid)
        job = Job.objects.get(uuid=job_uuid)
        job.convert_to_result(status=JobResultStatuses.CANCELLED)
        return

    # Safe to call without blocking: the future is already done, and the
    # cancelled case (which would raise here) returned above.
    exception = future.exception()
    if exception is not None:
        logger.error(
            "Job process failed job_uuid=%s",
            job_uuid,
            exc_info=exception,
        )
        return

    logger.debug("Job finished job_uuid=%s", job_uuid)
238
239
def process_job(job_uuid):
    """Load the job with the given uuid and run it through the worker middleware.

    Sends ``request_started`` before loading the job and always sends
    ``request_finished`` (followed by a garbage collection) afterwards.
    Any ``Exception`` raised along the way is logged rather than propagated.
    """
    try:
        pid = os.getpid()

        request_started.send(sender=None)

        job = Job.objects.get(uuid=job_uuid)

        logger.info(
            'Executing job worker_pid=%s job_class=%s job_request_uuid=%s job_priority=%s job_source="%s" job_queue="%s"',
            pid,
            job.job_class,
            job.job_request_uuid,
            job.priority,
            job.source,
            job.queue,
        )

        # Innermost handler: actually run the job
        def run_job(job):
            return job.run()

        # Wrap the handler in each configured middleware, iterating in
        # reverse so the first listed middleware ends up outermost
        handler = run_job
        for dotted_path in reversed(settings.WORKER_MIDDLEWARE):
            handler = import_string(dotted_path)(handler)

        result = handler(job)

        # Drop the job reference as soon as it is no longer needed
        del job

        elapsed_seconds = (result.ended_at - result.started_at).total_seconds()

        logger.info(
            'Completed job worker_pid=%s job_class=%s job_uuid=%s job_request_uuid=%s job_result_uuid=%s job_priority=%s job_source="%s" job_queue="%s" job_duration=%s',
            pid,
            result.job_class,
            result.job_uuid,
            result.job_request_uuid,
            result.uuid,
            result.priority,
            result.source,
            result.queue,
            elapsed_seconds,
        )

        del result
    except Exception as exc:
        # Exceptions raised inside the worker process don't seem to be
        # caught/shown anywhere as configured, so at least log them here.
        # (A job should catch its own user-code errors, so this is for library errors)
        logger.exception(exc)
    finally:
        request_finished.send(sender=None)
        gc.collect()
260
261        for middleware_path in reversed(settings.WORKER_MIDDLEWARE):
262            middleware_class = import_string(middleware_path)
263            middleware_instance = middleware_class(middleware_chain)
264            middleware_chain = middleware_instance
265
266        job_result = middleware_chain(job)
267
268        # Release it now
269        del job
270
271        duration = job_result.ended_at - job_result.started_at
272        duration = duration.total_seconds()
273
274        logger.info(
275            'Completed job worker_pid=%s job_class=%s job_uuid=%s job_request_uuid=%s job_result_uuid=%s job_priority=%s job_source="%s" job_queue="%s" job_duration=%s',
276            worker_pid,
277            job_result.job_class,
278            job_result.job_uuid,
279            job_result.job_request_uuid,
280            job_result.uuid,
281            job_result.priority,
282            job_result.source,
283            job_result.queue,
284            duration,
285        )
286
287        del job_result
288    except Exception as e:
289        # Raising exceptions inside the worker process doesn't
290        # seem to be caught/shown anywhere as configured.
291        # So we at least log it out here.
292        # (A job should catch it's own user-code errors, so this is for library errors)
293        logger.exception(e)
294    finally:
295        request_finished.send(sender=None)
296        gc.collect()