
Worker

Process background jobs with a database-driven worker.

from plain.worker import Job
from plain.mail import send_mail

# Create a new job class
class WelcomeUserJob(Job):
    def __init__(self, user):
        self.user = user

    def run(self):
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            from_email="[email protected]",
            recipient_list=[self.user.email],
        )


# Instantiate a job and send it to the worker
# (User here stands in for your app's own user model)
user = User.objects.get(pk=1)
WelcomeUserJob(user).run_in_worker()

The worker runs as a separate process, started with plain worker run.
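Beyond the defaults, run_in_worker accepts scheduling options. Judging from what retry_job passes through in the models below, it takes queue, delay, priority, retries, and retry_attempt keyword arguments. A sketch (the values are illustrative, and the delay units are an assumption):

# Sketch: queue/priority/retry options for run_in_worker.
# Keyword arguments inferred from the retry_job() call in the
# worker's models below; the specific values are illustrative.
WelcomeUserJob(user).run_in_worker(
    queue="emails",  # model default is "default"
    priority=10,  # model default is 0
    retries=3,  # how many times a failed run may be retried
    delay=60,  # assumption: seconds before the job becomes eligible to start
)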

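Under the hood, the worker is driven by three database models: JobRequest holds pending jobs, Job holds jobs that have been picked up, and JobResult keeps the history of finished (or lost) jobs. The worker's models module is reproduced below.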

import datetime
import logging
import traceback
import uuid

from plain import models
from plain.models import transaction
from plain.runtime import settings
from plain.utils import timezone

from .jobs import load_job

logger = logging.getLogger("plain.worker")


class JobRequest(models.Model):
    """
    Keep all pending job requests in a single table.
    """

    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)

    job_class = models.CharField(max_length=255, db_index=True)
    parameters = models.JSONField(blank=True, null=True)
    priority = models.IntegerField(default=0, db_index=True)
    source = models.TextField(blank=True)
    queue = models.CharField(default="default", max_length=255, db_index=True)

    retries = models.IntegerField(default=0)
    retry_attempt = models.IntegerField(default=0)

    unique_key = models.CharField(max_length=255, blank=True, db_index=True)

    start_at = models.DateTimeField(blank=True, null=True, db_index=True)

    # context
    # expires_at = models.DateTimeField(blank=True, null=True)

    class Meta:
        ordering = ["priority", "-created_at"]
        indexes = [
            # Used to dedupe unique in-process jobs
            models.Index(
                name="job_request_class_unique_key", fields=["job_class", "unique_key"]
            ),
        ]
        # The job_class and unique_key should be unique at the db-level,
        # but only if unique_key is not ""
        constraints = [
            models.UniqueConstraint(
                fields=["job_class", "unique_key"],
                condition=models.Q(unique_key__gt="", retry_attempt=0),
                name="unique_job_class_unique_key",
            )
        ]

    def __str__(self):
        return f"{self.job_class} [{self.uuid}]"

    def convert_to_job(self):
        """
        JobRequests are the pending jobs that are waiting to be executed.
        We immediately convert them to Jobs when they are picked up.
        """
        with transaction.atomic():
            result = Job.objects.create(
                job_request_uuid=self.uuid,
                job_class=self.job_class,
                parameters=self.parameters,
                priority=self.priority,
                source=self.source,
                queue=self.queue,
                retries=self.retries,
                retry_attempt=self.retry_attempt,
                unique_key=self.unique_key,
            )

            # Delete the pending JobRequest now
            self.delete()

        return result


class JobQuerySet(models.QuerySet):
    def running(self):
        return self.filter(started_at__isnull=False)

    def waiting(self):
        return self.filter(started_at__isnull=True)

    def mark_lost_jobs(self):
        # Lost jobs are jobs that have been pending for too long
        # and are probably never going to be picked up by a worker process.
        # In theory we could save a timeout per-job and mark them timed-out more quickly,
        # but if a job is still running, we can't actually send a signal to cancel it...
        now = timezone.now()
        cutoff = now - datetime.timedelta(seconds=settings.WORKER_JOBS_LOST_AFTER)
        lost_jobs = self.filter(
            created_at__lt=cutoff
        )  # Doesn't matter whether it started or not -- it shouldn't take this long.

        # Note that this saves them as results,
        # but lost jobs are only retried if they have retries remaining!
        for job in lost_jobs:
            job.convert_to_result(status=JobResultStatuses.LOST)


class Job(models.Model):
    """
    All active jobs are stored in this table.
    """

    uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    started_at = models.DateTimeField(blank=True, null=True, db_index=True)

    # From the JobRequest
    job_request_uuid = models.UUIDField(db_index=True)
    job_class = models.CharField(max_length=255, db_index=True)
    parameters = models.JSONField(blank=True, null=True)
    priority = models.IntegerField(default=0, db_index=True)
    source = models.TextField(blank=True)
    queue = models.CharField(default="default", max_length=255, db_index=True)
    retries = models.IntegerField(default=0)
    retry_attempt = models.IntegerField(default=0)
    unique_key = models.CharField(max_length=255, blank=True, db_index=True)

    objects = JobQuerySet.as_manager()

    class Meta:
        ordering = ["-created_at"]
        indexes = [
            # Used to dedupe unique in-process jobs
            models.Index(
                name="job_class_unique_key", fields=["job_class", "unique_key"]
            ),
        ]

    def run(self):
        # This is how we know it has been picked up
        self.started_at = timezone.now()
        self.save(update_fields=["started_at"])

        try:
            job = load_job(self.job_class, self.parameters)
            job.run()
            status = JobResultStatuses.SUCCESSFUL
            error = ""
        except Exception as e:
            status = JobResultStatuses.ERRORED
            error = "".join(traceback.format_tb(e.__traceback__))
            logger.exception(e)

        return self.convert_to_result(status=status, error=error)

    def convert_to_result(self, *, status, error=""):
        """
        Convert this Job to a JobResult.
        """
        with transaction.atomic():
            result = JobResult.objects.create(
                ended_at=timezone.now(),
                error=error,
                status=status,
                # From the Job
                job_uuid=self.uuid,
                started_at=self.started_at,
                # From the JobRequest
                job_request_uuid=self.job_request_uuid,
                job_class=self.job_class,
                parameters=self.parameters,
                priority=self.priority,
                source=self.source,
                queue=self.queue,
                retries=self.retries,
                retry_attempt=self.retry_attempt,
                unique_key=self.unique_key,
            )

            # Delete the Job now
            self.delete()

        return result

    def as_json(self):
        """A JSON-compatible representation to make it easier to reference in Sentry or logging"""
        return {
            "uuid": str(self.uuid),
            "created_at": self.created_at.isoformat(),
            "started_at": self.started_at.isoformat() if self.started_at else None,
            "job_request_uuid": str(self.job_request_uuid),
            "job_class": self.job_class,
            "parameters": self.parameters,
            "priority": self.priority,
            "source": self.source,
            "queue": self.queue,
            "retries": self.retries,
            "retry_attempt": self.retry_attempt,
            "unique_key": self.unique_key,
        }


class JobResultQuerySet(models.QuerySet):
    def successful(self):
        return self.filter(status=JobResultStatuses.SUCCESSFUL)

    def cancelled(self):
        return self.filter(status=JobResultStatuses.CANCELLED)

    def lost(self):
        return self.filter(status=JobResultStatuses.LOST)

    def errored(self):
        return self.filter(status=JobResultStatuses.ERRORED)

    def retried(self):
        return self.filter(
            models.Q(retry_job_request_uuid__isnull=False)
            | models.Q(retry_attempt__gt=0)
        )

    def failed(self):
        return self.filter(
            status__in=[
                JobResultStatuses.ERRORED,
                JobResultStatuses.LOST,
                JobResultStatuses.CANCELLED,
            ]
        )

    def retryable(self):
        return self.failed().filter(
            retry_job_request_uuid__isnull=True,
            retries__gt=0,
            retry_attempt__lt=models.F("retries"),
        )

    def retry_failed_jobs(self):
        for result in self.retryable():
            result.retry_job()


class JobResultStatuses(models.TextChoices):
    SUCCESSFUL = "SUCCESSFUL", "Successful"
    ERRORED = "ERRORED", "Errored"  # Threw an error
    CANCELLED = "CANCELLED", "Cancelled"  # Cancelled (probably by deploy)
    LOST = (
        "LOST",
        "Lost",
    )  # Either process lost, lost in transit, or otherwise never finished


class JobResult(models.Model):
    """
    All completed (or lost/cancelled) jobs are stored in this table.
    """

    uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)

    # From the Job
    job_uuid = models.UUIDField(db_index=True)
    started_at = models.DateTimeField(blank=True, null=True, db_index=True)
    ended_at = models.DateTimeField(blank=True, null=True, db_index=True)
    error = models.TextField(blank=True)
    status = models.CharField(
        max_length=20,
        choices=JobResultStatuses.choices,
        db_index=True,
    )

    # From the JobRequest
    job_request_uuid = models.UUIDField(db_index=True)
    job_class = models.CharField(max_length=255, db_index=True)
    parameters = models.JSONField(blank=True, null=True)
    priority = models.IntegerField(default=0, db_index=True)
    source = models.TextField(blank=True)
    queue = models.CharField(default="default", max_length=255, db_index=True)
    retries = models.IntegerField(default=0)
    retry_attempt = models.IntegerField(default=0)
    unique_key = models.CharField(max_length=255, blank=True, db_index=True)

    # Retries
    retry_job_request_uuid = models.UUIDField(blank=True, null=True)

    objects = JobResultQuerySet.as_manager()

    class Meta:
        ordering = ["-created_at"]

    def retry_job(self, delay: int | None = None):
        retry_attempt = self.retry_attempt + 1

        try:
            job = load_job(self.job_class, self.parameters)
            class_delay = job.get_retry_delay(retry_attempt)
        except Exception as e:
            # If this fails at all (loading a model instance from a str, the class
            # not existing, a user code error), then we just continue without a delay.
            # The job request itself can handle the failure like normal.
            logger.exception(e)
            class_delay = None

        retry_delay = delay or class_delay

        with transaction.atomic():
            result = job.run_in_worker(
                # Pass most of what we know through so it stays consistent
                queue=self.queue,
                delay=retry_delay,
                priority=self.priority,
                retries=self.retries,
                retry_attempt=retry_attempt,
                # Unique key could be passed also?
            )

            # It's possible this could return a list of pending
            # jobs, so we need to check if we actually created a new job
            if isinstance(result, JobRequest):
                # We need to know the retry request for this result
                self.retry_job_request_uuid = result.uuid
                self.save(update_fields=["retry_job_request_uuid"])
            else:
                # What to do in this situation? The retry logic will keep
                # running until it successfully retries or this result is deleted.
                pass

        return result
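
To make the flow concrete, here is a minimal sketch of the lifecycle these models implement, assuming they are importable from plain.worker.models (the real worker process handles the scheduling, locking, and start_at checks):

# A rough sketch of the JobRequest -> Job -> JobResult lifecycle.
# Assumes the models live at plain.worker.models; the actual
# scheduling and locking happen inside the worker process.
from plain.worker.models import Job, JobRequest, JobResult

# A pending JobRequest gets picked up and converted to a Job...
job_request = JobRequest.objects.filter(queue="default").first()
if job_request:
    job = job_request.convert_to_job()  # deletes the JobRequest, creates a Job

    # ...which runs and is converted to a JobResult.
    result = job.run()  # deletes the Job, creates a JobResult
    print(result.status)  # "SUCCESSFUL", "ERRORED", etc.

# Periodic maintenance the worker also performs:
Job.objects.mark_lost_jobs()  # convert stuck jobs to LOST results
JobResult.objects.retry_failed_jobs()  # requeue retryable failures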