Worker
Process background jobs with a database-driven worker.
from plain.worker import Job
from plain.mail import send_mail
# Create a new job class
class WelcomeUserJob(Job):
def __init__(self, user):
self.user = user
def run(self):
send_mail(
subject="Welcome!",
message=f"Hello from Plain, {self.user}",
from_email="[email protected]",
recipient_list=[self.user.email],
)
# Instantiate a job and send it to the worker
user = User.objects.get(pk=1)
WelcomeUserJob(user).run_in_worker()
The worker process is run separately using `plain worker run`.
Staff
Job history
Scheduled jobs
Monitoring
1import datetime
2import logging
3import traceback
4import uuid
5
6from plain import models
7from plain.models import transaction
8from plain.runtime import settings
9from plain.utils import timezone
10
11from .jobs import load_job
12
13logger = logging.getLogger("plain.worker")
14
15
class JobRequest(models.Model):
    """
    Keep all pending job requests in a single table.

    A JobRequest is the queued form of a job: it is created when a job is
    sent to the worker and deleted as soon as a worker picks it up and
    converts it into a Job (see `convert_to_job`).
    """

    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)

    # Import path of the job class plus the serialized parameters needed
    # to re-instantiate it in the worker process (see load_job).
    job_class = models.CharField(max_length=255, db_index=True)
    parameters = models.JSONField(blank=True, null=True)
    priority = models.IntegerField(default=0, db_index=True)
    source = models.TextField(blank=True)
    queue = models.CharField(default="default", max_length=255, db_index=True)

    # Total retries allowed, and which attempt this request represents
    # (0 for the original submission).
    retries = models.IntegerField(default=0)
    retry_attempt = models.IntegerField(default=0)

    # Optional key used to dedupe in-process jobs (see the unique
    # constraint below; "" disables deduping for this request).
    unique_key = models.CharField(max_length=255, blank=True, db_index=True)

    # Earliest time the job should be picked up; None means immediately.
    start_at = models.DateTimeField(blank=True, null=True, db_index=True)

    # context
    # expires_at = models.DateTimeField(blank=True, null=True)

    class Meta:
        ordering = ["priority", "-created_at"]
        indexes = [
            # Used to dedupe unique in-process jobs
            models.Index(
                name="job_request_class_unique_key", fields=["job_class", "unique_key"]
            ),
        ]
        # The job_class and unique_key should be unique at the db-level,
        # but only if unique_key is not "" (and only for the original
        # submission, not retries -- hence the retry_attempt=0 condition).
        constraints = [
            models.UniqueConstraint(
                fields=["job_class", "unique_key"],
                condition=models.Q(unique_key__gt="", retry_attempt=0),
                name="unique_job_class_unique_key",
            )
        ]

    def __str__(self):
        return f"{self.job_class} [{self.uuid}]"

    def convert_to_job(self):
        """
        JobRequests are the pending jobs that are waiting to be executed.
        We immediately convert them to Jobs when they are picked up,
        copying every queued field across and deleting this request.
        """
        # Atomic so a request can never exist in both tables (or neither).
        with transaction.atomic():
            result = Job.objects.create(
                job_request_uuid=self.uuid,
                job_class=self.job_class,
                parameters=self.parameters,
                priority=self.priority,
                source=self.source,
                queue=self.queue,
                retries=self.retries,
                retry_attempt=self.retry_attempt,
                unique_key=self.unique_key,
            )

            # Delete the pending JobRequest now
            self.delete()

        return result
83
84
class JobQuerySet(models.QuerySet):
    """Query helpers for active jobs (picked up or still waiting)."""

    def running(self):
        """Jobs that a worker has already started executing."""
        return self.filter(started_at__isnull=False)

    def waiting(self):
        """Jobs that no worker has picked up yet."""
        return self.filter(started_at__isnull=True)

    def mark_lost_jobs(self):
        """Convert jobs that have been around too long into LOST results.

        Lost jobs are jobs that have been pending for too long and are
        probably never going to get picked up by a worker process. In
        theory we could save a timeout per-job and mark them timed-out
        more quickly, but if they're still running, we can't actually
        send a signal to cancel it...
        """
        threshold = timezone.now() - datetime.timedelta(
            seconds=settings.WORKER_JOBS_LOST_AFTER
        )
        # Whether it started or not doesn't matter -- nothing should take
        # this long. Note that this will save it in the results, but lost
        # jobs are only retried if they have a retry!
        for lost_job in self.filter(created_at__lt=threshold):
            lost_job.convert_to_result(status=JobResultStatuses.LOST)
107
108
class Job(models.Model):
    """
    All active jobs are stored in this table.

    A row is created from a JobRequest when a worker picks it up
    (JobRequest.convert_to_job) and deleted again when the job finishes
    and is converted to a JobResult (convert_to_result).
    """

    uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    # Set when a worker begins executing the job; None means still waiting.
    started_at = models.DateTimeField(blank=True, null=True, db_index=True)

    # From the JobRequest
    job_request_uuid = models.UUIDField(db_index=True)
    job_class = models.CharField(max_length=255, db_index=True)
    parameters = models.JSONField(blank=True, null=True)
    priority = models.IntegerField(default=0, db_index=True)
    source = models.TextField(blank=True)
    queue = models.CharField(default="default", max_length=255, db_index=True)
    retries = models.IntegerField(default=0)
    retry_attempt = models.IntegerField(default=0)
    unique_key = models.CharField(max_length=255, blank=True, db_index=True)

    objects = JobQuerySet.as_manager()

    class Meta:
        ordering = ["-created_at"]
        indexes = [
            # Used to dedupe unique in-process jobs
            models.Index(
                name="job_class_unique_key", fields=["job_class", "unique_key"]
            ),
        ]

    def run(self):
        """
        Execute the job and convert it to a JobResult.

        Always returns the JobResult; success or failure is recorded in
        the result's status and error fields rather than raised.
        """
        # This is how we know it has been picked up
        self.started_at = timezone.now()
        self.save(update_fields=["started_at"])

        try:
            job = load_job(self.job_class, self.parameters)
            job.run()
            status = JobResultStatuses.SUCCESSFUL
            error = ""
        except Exception as e:
            status = JobResultStatuses.ERRORED
            # Capture the full exception -- type and message included, not
            # just the traceback frames -- so the stored error is
            # diagnosable from the JobResult alone.
            error = "".join(traceback.format_exception(type(e), e, e.__traceback__))
            logger.exception(e)

        return self.convert_to_result(status=status, error=error)

    def convert_to_result(self, *, status, error=""):
        """
        Convert this Job to a JobResult.

        Runs atomically so the job can never exist in both tables
        (or neither) at once.
        """
        with transaction.atomic():
            result = JobResult.objects.create(
                ended_at=timezone.now(),
                error=error,
                status=status,
                # From the Job
                job_uuid=self.uuid,
                started_at=self.started_at,
                # From the JobRequest
                job_request_uuid=self.job_request_uuid,
                job_class=self.job_class,
                parameters=self.parameters,
                priority=self.priority,
                source=self.source,
                queue=self.queue,
                retries=self.retries,
                retry_attempt=self.retry_attempt,
                unique_key=self.unique_key,
            )

            # Delete the Job now
            self.delete()

        return result

    def as_json(self):
        """A JSON-compatible representation to make it easier to reference in Sentry or logging"""
        return {
            "uuid": str(self.uuid),
            "created_at": self.created_at.isoformat(),
            "started_at": self.started_at.isoformat() if self.started_at else None,
            "job_request_uuid": str(self.job_request_uuid),
            "job_class": self.job_class,
            "parameters": self.parameters,
            "priority": self.priority,
            "source": self.source,
            "queue": self.queue,
            "retries": self.retries,
            "retry_attempt": self.retry_attempt,
            "unique_key": self.unique_key,
        }
202
203
class JobResultQuerySet(models.QuerySet):
    """Status- and retry-based filters over completed JobResults."""

    def successful(self):
        """Results whose job finished without error."""
        return self.filter(status=JobResultStatuses.SUCCESSFUL)

    def cancelled(self):
        """Results whose job was cancelled before it finished."""
        return self.filter(status=JobResultStatuses.CANCELLED)

    def lost(self):
        """Results whose job was never observed finishing."""
        return self.filter(status=JobResultStatuses.LOST)

    def errored(self):
        """Results whose job raised an exception."""
        return self.filter(status=JobResultStatuses.ERRORED)

    def retried(self):
        """Results that scheduled a retry, or are themselves a retry."""
        scheduled_a_retry = models.Q(retry_job_request_uuid__isnull=False)
        is_a_retry = models.Q(retry_attempt__gt=0)
        return self.filter(scheduled_a_retry | is_a_retry)

    def failed(self):
        """Results in any unsuccessful terminal state."""
        failed_statuses = [
            JobResultStatuses.ERRORED,
            JobResultStatuses.LOST,
            JobResultStatuses.CANCELLED,
        ]
        return self.filter(status__in=failed_statuses)

    def retryable(self):
        """Failed results with attempts remaining and no retry scheduled yet."""
        return self.failed().filter(
            retry_job_request_uuid__isnull=True,
            retries__gt=0,
            retry_attempt__lt=models.F("retries"),
        )

    def retry_failed_jobs(self):
        """Schedule a retry for every currently retryable result."""
        for failed_result in self.retryable():
            failed_result.retry_job()
242
243
class JobResultStatuses(models.TextChoices):
    """Terminal states a job can end in."""

    SUCCESSFUL = "SUCCESSFUL", "Successful"
    # Threw an error while running
    ERRORED = "ERRORED", "Errored"
    # Cancelled (probably by deploy)
    CANCELLED = "CANCELLED", "Cancelled"
    # Either process lost, lost in transit, or otherwise never finished
    LOST = "LOST", "Lost"
252
253
class JobResult(models.Model):
    """
    All in-process and completed jobs are stored in this table.

    A row is created from a Job when it finishes (Job.convert_to_result)
    and carries a denormalized copy of everything from the original
    JobRequest so results survive after the request and job are deleted.
    """

    uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)

    # From the Job
    job_uuid = models.UUIDField(db_index=True)
    started_at = models.DateTimeField(blank=True, null=True, db_index=True)
    ended_at = models.DateTimeField(blank=True, null=True, db_index=True)
    # Formatted traceback text when the job errored; "" otherwise.
    error = models.TextField(blank=True)
    status = models.CharField(
        max_length=20,
        choices=JobResultStatuses.choices,
        db_index=True,
    )

    # From the JobRequest
    job_request_uuid = models.UUIDField(db_index=True)
    job_class = models.CharField(max_length=255, db_index=True)
    parameters = models.JSONField(blank=True, null=True)
    priority = models.IntegerField(default=0, db_index=True)
    source = models.TextField(blank=True)
    queue = models.CharField(default="default", max_length=255, db_index=True)
    retries = models.IntegerField(default=0)
    retry_attempt = models.IntegerField(default=0)
    unique_key = models.CharField(max_length=255, blank=True, db_index=True)

    # Retries: set once a retry JobRequest has been scheduled for this
    # result, so it is no longer considered retryable.
    retry_job_request_uuid = models.UUIDField(blank=True, null=True)

    objects = JobResultQuerySet.as_manager()

    class Meta:
        ordering = ["-created_at"]

    def retry_job(self, delay: int | None = None):
        """
        Queue a new JobRequest that re-runs this failed job.

        `delay` (seconds) overrides the job class's own retry delay when
        given. Returns whatever run_in_worker produced, or None when the
        job class could not be loaded at all.
        """
        retry_attempt = self.retry_attempt + 1

        job = None
        try:
            job = load_job(self.job_class, self.parameters)
            class_delay = job.get_retry_delay(retry_attempt)
        except Exception as e:
            # If this fails at all (loading model instance from str, class not existing, user code error)
            # then we just continue without a delay. The job request itself can handle the failure like normal.
            logger.exception(e)
            class_delay = None

        if job is None:
            # load_job itself failed, so there is no instance to re-enqueue.
            # (Previously this fell through to job.run_in_worker and raised
            # NameError.) Leave retry_job_request_uuid unset so the result
            # stays retryable, e.g. once a deploy fixes the job class.
            return None

        # An explicit delay (including 0) wins over the class's delay.
        retry_delay = delay if delay is not None else class_delay

        with transaction.atomic():
            result = job.run_in_worker(
                # Pass most of what we know through so it stays consistent
                queue=self.queue,
                delay=retry_delay,
                priority=self.priority,
                retries=self.retries,
                retry_attempt=retry_attempt,
                # Unique key could be passed also?
            )

            # It's possible this could return a list of pending
            # jobs, so we need to check if we actually created a new job
            if isinstance(result, JobRequest):
                # We need to know the retry request for this result
                self.retry_job_request_uuid = result.uuid
                self.save(update_fields=["retry_job_request_uuid"])
            else:
                # What to do in this situation? Will continue to run the retry
                # logic until it successfully retries or it is deleted.
                pass

        return result