from __future__ import annotations

import datetime
import logging
import traceback
import uuid
from typing import Self

from opentelemetry import trace
from opentelemetry.semconv._incubating.attributes.code_attributes import (
    CODE_NAMESPACE,
)
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
    MESSAGING_CONSUMER_GROUP_NAME,
    MESSAGING_DESTINATION_NAME,
    MESSAGING_MESSAGE_ID,
    MESSAGING_OPERATION_NAME,
    MESSAGING_OPERATION_TYPE,
    MESSAGING_SYSTEM,
    MessagingOperationTypeValues,
)
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
from opentelemetry.trace import Link, SpanContext, SpanKind

from plain import models
from plain.models import transaction
from plain.runtime import settings
from plain.utils import timezone

from .registry import jobs_registry

logger = logging.getLogger("plain.jobs")
tracer = trace.get_tracer("plain.jobs")


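# Job lifecycle: a pending JobRequest is converted to a JobProcess when a
# worker picks it up, and the JobProcess is converted to a JobResult when it
# finishes. Each conversion deletes the previous row, so a job lives in
# exactly one of these three tables at any given time.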
@models.register_model
class JobRequest(models.Model):
    """
    Keep all pending job requests in a single table.
    """

    created_at = models.DateTimeField(auto_now_add=True)
    uuid = models.UUIDField(default=uuid.uuid4)

    job_class = models.CharField(max_length=255)
    parameters = models.JSONField(required=False, allow_null=True)
    priority = models.IntegerField(default=0)
    source = models.TextField(required=False)
    queue = models.CharField(default="default", max_length=255)

    retries = models.IntegerField(default=0)
    retry_attempt = models.IntegerField(default=0)

    unique_key = models.CharField(max_length=255, required=False)

    start_at = models.DateTimeField(required=False, allow_null=True)

    # OpenTelemetry trace context
    trace_id = models.CharField(max_length=34, required=False, allow_null=True)
    span_id = models.CharField(max_length=18, required=False, allow_null=True)
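    # (Stored as hex strings; JobProcess.run() parses them back with int(value, 16).)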

    # expires_at = models.DateTimeField(required=False, allow_null=True)

    model_options = models.Options(
        ordering=["priority", "-created_at"],
        indexes=[
            models.Index(fields=["priority"]),
            models.Index(fields=["created_at"]),
            models.Index(fields=["queue"]),
            models.Index(fields=["start_at"]),
            models.Index(fields=["unique_key"]),
            models.Index(fields=["job_class"]),
            models.Index(fields=["trace_id"]),
            # Used to dedupe unique in-process jobs
            models.Index(
                name="job_request_class_unique_key", fields=["job_class", "unique_key"]
            ),
        ],
        # The job_class and unique_key should be unique at the db-level,
        # but only if unique_key is not "" and this is the first attempt (retry_attempt=0)
        constraints=[
            models.UniqueConstraint(
                fields=["job_class", "unique_key"],
                condition=models.Q(unique_key__gt="", retry_attempt=0),
                name="plainjobs_jobrequest_unique_job_class_key",
            ),
            models.UniqueConstraint(
                fields=["uuid"], name="plainjobs_jobrequest_unique_uuid"
            ),
        ],
    )

    def __str__(self) -> str:
        return f"{self.job_class} [{self.uuid}]"

    def convert_to_job_process(self) -> JobProcess:
        """
        JobRequests are the pending jobs that are waiting to be executed.
        We immediately convert them to JobProcess when they are picked up.
        """
        with transaction.atomic():
            result = JobProcess.query.create(
                job_request_uuid=self.uuid,
                job_class=self.job_class,
                parameters=self.parameters,
                priority=self.priority,
                source=self.source,
                queue=self.queue,
                retries=self.retries,
                retry_attempt=self.retry_attempt,
                unique_key=self.unique_key,
                trace_id=self.trace_id,
                span_id=self.span_id,
            )

            # Delete the pending JobRequest now
            self.delete()

        return result


class JobQuerySet(models.QuerySet["JobProcess"]):
    def running(self) -> Self:
        return self.filter(started_at__isnull=False)

    def waiting(self) -> Self:
        return self.filter(started_at__isnull=True)
    def mark_lost_jobs(self) -> None:
        # Lost jobs are jobs that have been pending for too long and are
        # probably never going to be picked up by a worker process.
        # In theory we could save a per-job timeout and mark them timed-out more quickly,
        # but if they're still running, we can't actually send a signal to cancel them...
        now = timezone.now()
        cutoff = now - datetime.timedelta(seconds=settings.JOBS_TIMEOUT)
        lost_jobs = self.filter(
            created_at__lt=cutoff
        )  # Doesn't matter whether it started or not -- it shouldn't take this long.

        # Note that this saves them as results,
        # but lost jobs are only retried if they have retries remaining!
        for job in lost_jobs:
            job.convert_to_result(status=JobResultStatuses.LOST)


@models.register_model
class JobProcess(models.Model):
    """
    All active jobs are stored in this table.
    """

    uuid = models.UUIDField(default=uuid.uuid4)
    created_at = models.DateTimeField(auto_now_add=True)
    started_at = models.DateTimeField(required=False, allow_null=True)

    # From the JobRequest
    job_request_uuid = models.UUIDField()
    job_class = models.CharField(max_length=255)
    parameters = models.JSONField(required=False, allow_null=True)
    priority = models.IntegerField(default=0)
    source = models.TextField(required=False)
    queue = models.CharField(default="default", max_length=255)
    retries = models.IntegerField(default=0)
    retry_attempt = models.IntegerField(default=0)
    unique_key = models.CharField(max_length=255, required=False)

    # OpenTelemetry trace context
    trace_id = models.CharField(max_length=34, required=False, allow_null=True)
    span_id = models.CharField(max_length=18, required=False, allow_null=True)

    query = JobQuerySet()

    model_options = models.Options(
        ordering=["-created_at"],
        indexes=[
            models.Index(fields=["created_at"]),
            models.Index(fields=["queue"]),
            models.Index(fields=["unique_key"]),
            models.Index(fields=["started_at"]),
            models.Index(fields=["job_class"]),
            models.Index(fields=["job_request_uuid"]),
            models.Index(fields=["trace_id"]),
            # Used to dedupe unique in-process jobs
            models.Index(
                name="job_class_unique_key", fields=["job_class", "unique_key"]
            ),
        ],
        constraints=[
            models.UniqueConstraint(fields=["uuid"], name="plainjobs_job_unique_uuid"),
        ],
    )

    def run(self) -> JobResult:
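        # The enqueueing trace context is attached as a span link rather than
        # as a parent -- a common OpenTelemetry pattern for async message
        # processing, so the job's span lives in its own trace while staying
        # discoverable from the producer's.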
        links = []
        if self.trace_id and self.span_id:
            try:
                links.append(
                    Link(
                        SpanContext(
                            trace_id=int(self.trace_id, 16),
                            span_id=int(self.span_id, 16),
                            is_remote=True,
                        )
                    )
                )
            except (ValueError, TypeError):
                logger.warning("Invalid trace context for job %s", self.uuid)

        with (
            tracer.start_as_current_span(
                f"run {self.job_class}",
                kind=SpanKind.CONSUMER,
                attributes={
                    MESSAGING_SYSTEM: "plain.jobs",
                    MESSAGING_OPERATION_TYPE: MessagingOperationTypeValues.PROCESS.value,
                    MESSAGING_OPERATION_NAME: "run",
                    MESSAGING_MESSAGE_ID: str(self.uuid),
                    MESSAGING_DESTINATION_NAME: self.queue,
                    MESSAGING_CONSUMER_GROUP_NAME: self.queue,  # Workers consume from specific queues
                    CODE_NAMESPACE: self.job_class,
                },
                links=links,
            ) as span
        ):
            # This is how we know it has been picked up
            self.started_at = timezone.now()
            self.save(update_fields=["started_at"])

            try:
                job = jobs_registry.load_job(self.job_class, self.parameters)
                job.run()
                status = JobResultStatuses.SUCCESSFUL
                error = ""
                span.set_status(trace.StatusCode.OK)
            except Exception as e:
                status = JobResultStatuses.ERRORED
                error = "".join(traceback.format_tb(e.__traceback__))
                logger.exception(e)
                span.record_exception(e)
                span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
                span.set_attribute(ERROR_TYPE, type(e).__name__)

            return self.convert_to_result(status=status, error=error)

    def convert_to_result(self, *, status: str, error: str = "") -> JobResult:
        """
        Convert this JobProcess to a JobResult.
        """
        with transaction.atomic():
            result = JobResult.query.create(
                ended_at=timezone.now(),
                error=error,
                status=status,
                # From the JobProcess
                job_process_uuid=self.uuid,
                started_at=self.started_at,
                # From the JobRequest
                job_request_uuid=self.job_request_uuid,
                job_class=self.job_class,
                parameters=self.parameters,
                priority=self.priority,
                source=self.source,
                queue=self.queue,
                retries=self.retries,
                retry_attempt=self.retry_attempt,
                unique_key=self.unique_key,
                trace_id=self.trace_id,
                span_id=self.span_id,
            )

            # Delete the JobProcess now
            self.delete()

        return result

    def as_json(self) -> dict[str, str | int | dict | None]:
        """A JSON-compatible representation to make it easier to reference in Sentry or logging"""
        return {
            "uuid": str(self.uuid),
            "created_at": self.created_at.isoformat(),
            "started_at": self.started_at.isoformat() if self.started_at else None,
            "job_request_uuid": str(self.job_request_uuid),
            "job_class": self.job_class,
            "parameters": self.parameters,
            "priority": self.priority,
            "source": self.source,
            "queue": self.queue,
            "retries": self.retries,
            "retry_attempt": self.retry_attempt,
            "unique_key": self.unique_key,
            "trace_id": self.trace_id,
            "span_id": self.span_id,
        }


class JobResultQuerySet(models.QuerySet["JobResult"]):
    def successful(self) -> Self:
        return self.filter(status=JobResultStatuses.SUCCESSFUL)

    def cancelled(self) -> Self:
        return self.filter(status=JobResultStatuses.CANCELLED)

    def lost(self) -> Self:
        return self.filter(status=JobResultStatuses.LOST)

    def errored(self) -> Self:
        return self.filter(status=JobResultStatuses.ERRORED)

    def retried(self) -> Self:
        return self.filter(
            models.Q(retry_job_request_uuid__isnull=False)
            | models.Q(retry_attempt__gt=0)
        )

    def failed(self) -> Self:
        return self.filter(
            status__in=[
                JobResultStatuses.ERRORED,
                JobResultStatuses.LOST,
                JobResultStatuses.CANCELLED,
            ]
        )

    def retryable(self) -> Self:
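        # Failed, never handed off to a retry, and with retry attempts remaining.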
        return self.failed().filter(
            retry_job_request_uuid__isnull=True,
            retries__gt=0,
            retry_attempt__lt=models.F("retries"),
        )

    def retry_failed_jobs(self) -> None:
        for result in self.retryable():
            try:
                result.retry_job()
            except Exception:
                # If something went wrong (like a job class being deleted),
                # immediately increment the retry_attempt on the existing result
                # so it won't retry forever.
                logger.exception(
                    "Failed to retry job (incrementing retry_attempt): %s", result
                )
                result.retry_attempt += 1
                result.save(update_fields=["retry_attempt"])


class JobResultStatuses(models.TextChoices):
    SUCCESSFUL = "SUCCESSFUL", "Successful"
    ERRORED = "ERRORED", "Errored"  # Threw an error
    CANCELLED = "CANCELLED", "Cancelled"  # Cancelled (probably by deploy)
    LOST = (
        "LOST",
        "Lost",
    )  # Either the process was lost, the job was lost in transit, or it otherwise never finished


@models.register_model
class JobResult(models.Model):
358 """
359 All in-process and completed jobs are stored in this table.
360 """

    uuid = models.UUIDField(default=uuid.uuid4)
    created_at = models.DateTimeField(auto_now_add=True)

    # From the JobProcess
    job_process_uuid = models.UUIDField()
    started_at = models.DateTimeField(required=False, allow_null=True)
    ended_at = models.DateTimeField(required=False, allow_null=True)
    error = models.TextField(required=False)
    status = models.CharField(
        max_length=20,
        choices=JobResultStatuses.choices,
    )

    # From the JobRequest
    job_request_uuid = models.UUIDField()
    job_class = models.CharField(max_length=255)
    parameters = models.JSONField(required=False, allow_null=True)
    priority = models.IntegerField(default=0)
    source = models.TextField(required=False)
    queue = models.CharField(default="default", max_length=255)
    retries = models.IntegerField(default=0)
    retry_attempt = models.IntegerField(default=0)
    unique_key = models.CharField(max_length=255, required=False)

    # Retries
    retry_job_request_uuid = models.UUIDField(required=False, allow_null=True)
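    # (Set by retry_job() once a retry has been enqueued; see also
    # JobResultQuerySet.retryable(), which excludes results that have this set.)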

    # OpenTelemetry trace context
    trace_id = models.CharField(max_length=34, required=False, allow_null=True)
    span_id = models.CharField(max_length=18, required=False, allow_null=True)

    query = JobResultQuerySet()

    model_options = models.Options(
        ordering=["-created_at"],
        indexes=[
            models.Index(fields=["created_at"]),
            models.Index(fields=["job_process_uuid"]),
            models.Index(fields=["started_at"]),
            models.Index(fields=["ended_at"]),
            models.Index(fields=["status"]),
            models.Index(fields=["job_request_uuid"]),
            models.Index(fields=["job_class"]),
            models.Index(fields=["queue"]),
            models.Index(fields=["trace_id"]),
        ],
        constraints=[
            models.UniqueConstraint(
                fields=["uuid"], name="plainjobs_jobresult_unique_uuid"
            ),
        ],
    )

    def retry_job(self, delay: int | None = None) -> JobRequest:
        retry_attempt = self.retry_attempt + 1
        job = jobs_registry.load_job(self.job_class, self.parameters)
        # Check for None explicitly so a caller-provided delay of 0 is honored
        retry_delay = delay if delay is not None else job.get_retry_delay(retry_attempt)

        with transaction.atomic():
            result = job.run_in_worker(
                # Pass most of what we know through so it stays consistent
                queue=self.queue,
                delay=retry_delay,
                priority=self.priority,
                retries=self.retries,
                retry_attempt=retry_attempt,
                # Unique key could be passed also?
            )

            # TODO it is actually possible that result is a list
            # of pending jobs, which would need to be handled...
            # Right now it will throw an exception which could be caught by retry_failed_jobs.

            self.retry_job_request_uuid = result.uuid  # type: ignore
            self.save(update_fields=["retry_job_request_uuid"])

        return result  # type: ignore