1from __future__ import annotations
2
3import datetime
4import time
5import traceback
6from typing import TYPE_CHECKING, Any, Self
7from uuid import UUID
8
9from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
10 MESSAGING_CONSUMER_GROUP_NAME,
11 MESSAGING_MESSAGE_ID,
12 MESSAGING_OPERATION_NAME,
13)
14from opentelemetry.trace import Link, SpanContext, SpanKind, TraceFlags
15
16from plain import postgres
17from plain.logs import get_framework_logger
18from plain.postgres import transaction, types
19from plain.postgres.expressions import F
20from plain.runtime import settings
21from plain.utils import timezone
22
23from .exceptions import DeferJob
24from .otel import (
25 operation_duration_histogram,
26 process_metric_attributes,
27 queue_wait_duration_histogram,
28 record_consumed,
29 record_span_error,
30 tracer,
31)
32from .registry import jobs_registry
33
34if TYPE_CHECKING:
35 from .jobs import Job
36
37__all__ = [
38 "JobRequest",
39 "JobProcess",
40 "JobResult",
41 "JobResultStatuses",
42 "WorkerHeartbeat",
43]
44
45logger = get_framework_logger()
46
47
48class JobRequestQuerySet(postgres.QuerySet["JobRequest"]):
49 def ready_to_run(self) -> Self:
50 """JobRequests with no scheduling constraint or whose `start_at` is past."""
51 return self.filter(
52 postgres.Q(start_at__isnull=True) | postgres.Q(start_at__lte=timezone.now())
53 )
54
55 def scheduled(self) -> Self:
56 """JobRequests scheduled to start in the future."""
57 return self.filter(start_at__gt=timezone.now())
58
59
60@postgres.register_model
61class JobRequest(postgres.Model):
62 """
63 Keep all pending job requests in a single table.
64 """
65
66 created_at: datetime.datetime = types.DateTimeField(create_now=True)
67 uuid: UUID = types.UUIDField(generate=True)
68
69 job_class: str = types.TextField(max_length=255)
70 parameters: dict[str, Any] | None = types.JSONField(required=False, allow_null=True)
71 priority: int = types.SmallIntegerField(default=0)
72 source: str = types.TextField(required=False)
73 queue: str = types.TextField(default="default", max_length=255)
74
75 retries: int = types.SmallIntegerField(default=0)
76 retry_attempt: int = types.SmallIntegerField(default=0)
77
78 concurrency_key: str = types.TextField(max_length=255, required=False)
79
80 start_at: datetime.datetime | None = types.DateTimeField(
81 required=False, allow_null=True
82 )
83
84 # OpenTelemetry trace context
85 trace_id: str | None = types.TextField(
86 max_length=34, required=False, allow_null=True
87 )
88 span_id: str | None = types.TextField(
89 max_length=18, required=False, allow_null=True
90 )
91
92 # expires_at = postgres.DateTimeField(required=False, allow_null=True)
93
94 query: JobRequestQuerySet = JobRequestQuerySet()
95
96 model_options = postgres.Options(
97 ordering=["-priority", "-created_at"],
98 indexes=[
99 postgres.Index(
100 name="plainjobs_jobrequest_priority_idx", fields=["priority"]
101 ),
102 postgres.Index(
103 name="plainjobs_jobrequest_created_at_idx", fields=["created_at"]
104 ),
105 postgres.Index(name="plainjobs_jobrequest_queue_idx", fields=["queue"]),
106 postgres.Index(
107 name="plainjobs_jobrequest_start_at_idx", fields=["start_at"]
108 ),
109 postgres.Index(
110 name="plainjobs_jobrequest_concurrency_key_idx",
111 fields=["concurrency_key"],
112 ),
113 postgres.Index(
114 name="plainjobs_jobrequest_trace_id_idx", fields=["trace_id"]
115 ),
116 # Used for job grouping queries
117 postgres.Index(
118 name="job_request_concurrency_key",
119 fields=["job_class", "concurrency_key"],
120 ),
121 ],
122 constraints=[
123 postgres.UniqueConstraint(
124 fields=["uuid"], name="plainjobs_jobrequest_unique_uuid"
125 ),
126 ],
127 )
128
129 def __str__(self) -> str:
130 return f"{self.job_class} [{self.uuid}]"
131
132 def convert_to_job_process(self, *, worker_id: UUID) -> JobProcess:
133 """
134 JobRequests are the pending jobs that are waiting to be executed.
135 We immediately convert them to JobProcess when they are picked up.
136
137 worker_id stamps ownership: rescue_stale_workers uses it to find
138 which jobs belonged to a worker whose heartbeat went stale. Required —
139 every JobProcess has an owning worker, and the NOT NULL column
140 constraint is what stops pre-heartbeat workers from inserting
141 unrescuable rows during a rolling upgrade.
142 """
143 with transaction.atomic():
144 result = JobProcess.query.create(
145 job_request_uuid=self.uuid,
146 requested_at=self.created_at,
147 job_class=self.job_class,
148 parameters=self.parameters,
149 priority=self.priority,
150 source=self.source,
151 queue=self.queue,
152 retries=self.retries,
153 retry_attempt=self.retry_attempt,
154 concurrency_key=self.concurrency_key,
155 trace_id=self.trace_id,
156 span_id=self.span_id,
157 worker_id=worker_id,
158 )
159
160 # Delete the pending JobRequest now
161 self.delete()
162
163 return result
164
165
166class JobQuerySet(postgres.QuerySet["JobProcess"]):
167 def running(self) -> Self:
168 return self.filter(started_at__isnull=False)
169
170 def waiting(self) -> Self:
171 return self.filter(started_at__isnull=True)
172
173
174@postgres.register_model
175class JobProcess(postgres.Model):
176 """
177 All active jobs are stored in this table.
178 """
179
180 uuid: UUID = types.UUIDField(generate=True)
181 created_at: datetime.datetime = types.DateTimeField(create_now=True)
182 started_at: datetime.datetime | None = types.DateTimeField(
183 required=False, allow_null=True
184 )
185
186 # From the JobRequest
187 job_request_uuid: UUID = types.UUIDField()
188 requested_at: datetime.datetime | None = types.DateTimeField(
189 required=False, allow_null=True
190 )
191 job_class: str = types.TextField(max_length=255)
192 parameters: dict[str, Any] | None = types.JSONField(required=False, allow_null=True)
193 priority: int = types.SmallIntegerField(default=0)
194 source: str = types.TextField(required=False)
195 queue: str = types.TextField(default="default", max_length=255)
196 retries: int = types.SmallIntegerField(default=0)
197 retry_attempt: int = types.SmallIntegerField(default=0)
198 concurrency_key: str = types.TextField(max_length=255, required=False)
199
200 # OpenTelemetry trace context
201 trace_id: str | None = types.TextField(
202 max_length=34, required=False, allow_null=True
203 )
204 span_id: str | None = types.TextField(
205 max_length=18, required=False, allow_null=True
206 )
207
208 worker_id: UUID = types.UUIDField()
209
210 query: JobQuerySet = JobQuerySet()
211
212 model_options = postgres.Options(
213 ordering=["-created_at"],
214 indexes=[
215 postgres.Index(
216 name="plainjobs_jobprocess_created_at_idx", fields=["created_at"]
217 ),
218 postgres.Index(name="plainjobs_jobprocess_queue_idx", fields=["queue"]),
219 postgres.Index(
220 name="plainjobs_jobprocess_concurrency_key_idx",
221 fields=["concurrency_key"],
222 ),
223 postgres.Index(
224 name="plainjobs_jobprocess_started_at_idx", fields=["started_at"]
225 ),
226 postgres.Index(
227 name="plainjobs_jobprocess_job_request_uuid_idx",
228 fields=["job_request_uuid"],
229 ),
230 postgres.Index(
231 name="plainjobs_jobprocess_trace_id_idx", fields=["trace_id"]
232 ),
233 postgres.Index(
234 name="plainjobs_jobprocess_worker_id_idx", fields=["worker_id"]
235 ),
236 # Used for job grouping queries
237 postgres.Index(
238 name="job_concurrency_key",
239 fields=["job_class", "concurrency_key"],
240 ),
241 ],
242 constraints=[
243 postgres.UniqueConstraint(
244 fields=["uuid"], name="plainjobs_job_unique_uuid"
245 ),
246 ],
247 )
248
249 def revert_to_job_request(self) -> JobRequest:
250 """Undo convert_to_job_process — put the job back in the request queue."""
251 with transaction.atomic():
252 job_request = JobRequest.query.create(
253 uuid=self.job_request_uuid,
254 job_class=self.job_class,
255 parameters=self.parameters,
256 priority=self.priority,
257 source=self.source,
258 queue=self.queue,
259 retries=self.retries,
260 retry_attempt=self.retry_attempt,
261 concurrency_key=self.concurrency_key,
262 trace_id=self.trace_id,
263 span_id=self.span_id,
264 )
265 self.delete()
266 return job_request
267
268 def run(self) -> JobResult:
269 links = []
270 if self.trace_id and self.span_id:
271 try:
272 links.append(
273 Link(
274 SpanContext(
275 trace_id=int(self.trace_id, 16),
276 span_id=int(self.span_id, 16),
277 is_remote=True,
278 trace_flags=TraceFlags(TraceFlags.SAMPLED),
279 )
280 )
281 )
282 except (ValueError, TypeError):
283 logger.warning(
284 "Invalid trace context for job",
285 extra={"job_uuid": self.uuid},
286 )
287
288 metric_attributes: dict[str, Any] = process_metric_attributes(
289 self.queue, self.job_class
290 )
291 start_time = time.perf_counter()
292 try:
293 with tracer.start_as_current_span(
294 f"process {self.queue}",
295 kind=SpanKind.CONSUMER,
296 attributes={
297 **metric_attributes,
298 MESSAGING_OPERATION_NAME: "process",
299 MESSAGING_MESSAGE_ID: str(self.uuid),
300 MESSAGING_CONSUMER_GROUP_NAME: self.queue,
301 },
302 links=links,
303 ) as span:
304 # This is how we know it has been picked up
305 self.started_at = timezone.now()
306 self.save(update_fields=["started_at"])
307
308 if self.requested_at:
309 queue_wait = (self.started_at - self.requested_at).total_seconds()
310 queue_wait_duration_histogram.record(queue_wait, metric_attributes)
311
312 try:
313 job = jobs_registry.load_job(self.job_class, self.parameters)
314 job.job_process = self
315
316 try:
317 job.run()
318 except DeferJob as e:
319 # Job deferred - not an error, log at INFO level
320 logger.info(
321 "Job deferred",
322 extra={
323 "delay": e.delay,
324 "increment_retries": e.increment_retries,
325 "job_class": self.job_class,
326 "job_process_uuid": self.uuid,
327 },
328 )
329 result = self.defer(job=job, defer_exception=e)
330 if result.retry_job_request_uuid is None:
331 # Re-enqueue was blocked by should_enqueue() —
332 # either the default uniqueness rule (a peer
333 # exists) or a user override (rate limit, custom
334 # rule). Same treatment as the initial-enqueue
335 # path's `job.enqueue.skipped`: not an error,
336 # just visibility on the consumer span.
337 span.set_attribute("plain.jobs.defer.skipped", True)
338 return result
339
340 return self.convert_to_result(status=JobResultStatuses.SUCCESSFUL)
341
342 except Exception as e:
343 # Note: if a rescuer already wrote JobResult(LOST) for this
344 # row (heartbeat went stale during a long job, then the job
345 # actually finished), the convert_to_result below trips the
346 # unique constraint on job_process_uuid and produces a
347 # second log line. Rare; correct outcome; not worth
348 # pre-checking on every successful job.
349 logger.exception(e)
350 error_type = record_span_error(span, e, metric_attributes)
351 return self.convert_to_result(
352 status=JobResultStatuses.ERRORED,
353 error="".join(traceback.format_tb(e.__traceback__)),
354 error_type=error_type,
355 )
356 finally:
357 duration = time.perf_counter() - start_time
358 operation_duration_histogram.record(duration, metric_attributes)
359
360 def defer(self, *, job: Job, defer_exception: DeferJob) -> JobResult:
361 """Defer this job by re-enqueueing it for later execution.
362
363 Atomically deletes the JobProcess, re-enqueues the job, and creates
364 a JobResult. The concurrency slot is released before re-enqueue so
365 the new request's own `should_enqueue()` check can pass.
366
367 If `should_enqueue()` blocks the re-enqueue, the framework honors
368 that signal — same convention as `run_in_worker()` and `retry_job()`,
369 which both return `None` silently in the same situation. The
370 JobResult is still `DEFERRED` but `retry_job_request_uuid` is
371 `None`, the error message records that the re-enqueue was skipped,
372 and the caller stamps `plain.jobs.defer.skipped=True` on the
373 consumer span so this case is queryable in APM without surfacing
374 as an exception.
375 """
376 # Calculate new retry_attempt based on increment_retries
377 retry_attempt = (
378 self.retry_attempt + 1
379 if defer_exception.increment_retries
380 else self.retry_attempt
381 )
382
383 with transaction.atomic():
384 # 1. Save JobProcess state and delete (releases concurrency slot)
385 job_process_uuid = self.uuid
386 job_request_uuid = self.job_request_uuid
387 requested_at = self.requested_at
388 started_at = self.started_at
389 self.delete()
390
391 # 2. Re-enqueue job (concurrency check can now pass)
392 new_job_request = job.run_in_worker(
393 queue=self.queue,
394 delay=defer_exception.delay,
395 priority=self.priority,
396 retries=self.retries,
397 retry_attempt=retry_attempt,
398 concurrency_key=self.concurrency_key,
399 )
400
401 if new_job_request is None:
402 error = (
403 f"Deferred for {defer_exception.delay} seconds "
404 f"(re-enqueue skipped: should_enqueue() returned False "
405 f"for concurrency_key '{self.concurrency_key}')"
406 )
407 retry_job_request_uuid = None
408 else:
409 error = f"Deferred for {defer_exception.delay} seconds"
410 retry_job_request_uuid = new_job_request.uuid
411
412 # 3. Create JobResult (linking to new request if one was created)
413 result = JobResult.query.create(
414 ended_at=timezone.now(),
415 error=error,
416 status=JobResultStatuses.DEFERRED,
417 retry_job_request_uuid=retry_job_request_uuid,
418 # From the JobProcess
419 job_process_uuid=job_process_uuid,
420 started_at=started_at,
421 # From the JobRequest
422 job_request_uuid=job_request_uuid,
423 requested_at=requested_at,
424 job_class=self.job_class,
425 parameters=self.parameters,
426 priority=self.priority,
427 source=self.source,
428 queue=self.queue,
429 retries=self.retries,
430 retry_attempt=self.retry_attempt,
431 concurrency_key=self.concurrency_key,
432 trace_id=self.trace_id,
433 span_id=self.span_id,
434 )
435
436 # Counter ticks for the DEFERRED outcome too — defer() bypasses
437 # convert_to_result, so without this the deferred path would not
438 # show up in the consumed counter.
439 record_consumed(result)
440 return result
441
442 def convert_to_result(
443 self,
444 *,
445 status: str,
446 error: str = "",
447 error_type: str | None = None,
448 fire_hook: bool = True,
449 ) -> JobResult:
450 """
451 Convert this JobProcess to a JobResult.
452
453 error_type, when supplied, is the OTel-style exception name (matching
454 the spec's `error.type` attribute). It rides along to the consumed
455 counter so dashboards can group ERRORED jobs by exception class. Only
456 the live exception-driven paths supply it — rescue (LOST) and direct
457 cancellations have no exception object to derive it from.
458
459 fire_hook controls whether on_aborted dispatches synchronously. The
460 rescue path passes fire_hook=False so it can dispatch hooks AFTER its
461 outer transaction commits — otherwise a hook DB error would mark the
462 connection for rollback even though dispatch_aborted_hook catches the
463 exception, poisoning the rescue commit.
464 """
465 with transaction.atomic():
466 result = JobResult.query.create(
467 ended_at=timezone.now(),
468 error=error,
469 status=status,
470 # From the JobProcess
471 job_process_uuid=self.uuid,
472 started_at=self.started_at,
473 # From the JobRequest
474 job_request_uuid=self.job_request_uuid,
475 requested_at=self.requested_at,
476 job_class=self.job_class,
477 parameters=self.parameters,
478 priority=self.priority,
479 source=self.source,
480 queue=self.queue,
481 retries=self.retries,
482 retry_attempt=self.retry_attempt,
483 concurrency_key=self.concurrency_key,
484 trace_id=self.trace_id,
485 span_id=self.span_id,
486 )
487
488 # Delete the JobProcess now
489 self.delete()
490
491 # Counter ticks for every terminal status — the live SUCCESSFUL/ERRORED
492 # paths plus the LOST/CANCELLED paths that don't flow through
493 # JobProcess.run()'s finally. The outcome attribute lets dashboards
494 # split throughput by final status; error_type is forwarded for ERRORED
495 # jobs caught by the live path.
496 record_consumed(result, error_type=error_type)
497
498 # Fire Job.on_aborted outside the atomic block so a raise in user code
499 # can't roll back the framework's bookkeeping. Only for terminal
500 # statuses run() couldn't observe.
501 if fire_hook and status in (
502 JobResultStatuses.LOST,
503 JobResultStatuses.CANCELLED,
504 ):
505 result.dispatch_aborted_hook()
506
507 return result
508
509 def as_json(self) -> dict[str, str | int | dict | None]:
510 """A JSON-compatible representation to make it easier to reference in Sentry or logging"""
511 return {
512 "uuid": str(self.uuid),
513 "created_at": self.created_at.isoformat(),
514 "started_at": self.started_at.isoformat() if self.started_at else None,
515 "job_request_uuid": str(self.job_request_uuid),
516 "job_class": self.job_class,
517 "parameters": self.parameters,
518 "priority": self.priority,
519 "source": self.source,
520 "queue": self.queue,
521 "retries": self.retries,
522 "retry_attempt": self.retry_attempt,
523 "concurrency_key": self.concurrency_key,
524 "trace_id": self.trace_id,
525 "span_id": self.span_id,
526 }
527
528
529class JobResultQuerySet(postgres.QuerySet["JobResult"]):
530 def successful(self) -> Self:
531 return self.filter(status=JobResultStatuses.SUCCESSFUL)
532
533 def cancelled(self) -> Self:
534 return self.filter(status=JobResultStatuses.CANCELLED)
535
536 def lost(self) -> Self:
537 return self.filter(status=JobResultStatuses.LOST)
538
539 def errored(self) -> Self:
540 return self.filter(status=JobResultStatuses.ERRORED)
541
542 def retried(self) -> Self:
543 return self.filter(
544 postgres.Q(retry_job_request_uuid__isnull=False)
545 | postgres.Q(retry_attempt__gt=0)
546 )
547
548 def failed(self) -> Self:
549 return self.filter(
550 status__in=[
551 JobResultStatuses.ERRORED,
552 JobResultStatuses.LOST,
553 JobResultStatuses.CANCELLED,
554 ]
555 )
556
557 def retryable(self) -> Self:
558 return self.failed().filter(
559 retry_job_request_uuid__isnull=True,
560 retries__gt=0,
561 retry_attempt__lt=F("retries"),
562 )
563
564 def retry_failed_jobs(self) -> None:
565 for result in self.retryable():
566 try:
567 result.retry_job()
568 except Exception:
569 # If something went wrong (like a job class being deleted)
570 # then we immediately increment the retry_attempt on the existing obj
571 # so it won't retry forever.
572 logger.exception(
573 "Failed to retry job, incrementing retry_attempt",
574 extra={"result": str(result)},
575 )
576 result.retry_attempt += 1
577 result.save(update_fields=["retry_attempt"])
578
579
580class JobResultStatuses(postgres.TextChoices):
581 SUCCESSFUL = "SUCCESSFUL", "Successful"
582 ERRORED = "ERRORED", "Errored" # Threw an error
583 CANCELLED = "CANCELLED", "Cancelled" # Interrupted by shutdown/deploy
584 DEFERRED = "DEFERRED", "Deferred" # Intentionally rescheduled (will run again)
585 LOST = (
586 "LOST",
587 "Lost",
588 ) # Either process lost, lost in transit, or otherwise never finished
589
590
591@postgres.register_model
592class JobResult(postgres.Model):
593 """
594 All in-process and completed jobs are stored in this table.
595 """
596
597 uuid: UUID = types.UUIDField(generate=True)
598 created_at: datetime.datetime = types.DateTimeField(create_now=True)
599
600 # From the Job
601 job_process_uuid: UUID = types.UUIDField()
602 started_at: datetime.datetime | None = types.DateTimeField(
603 required=False, allow_null=True
604 )
605 ended_at: datetime.datetime | None = types.DateTimeField(
606 required=False, allow_null=True
607 )
608 error: str = types.TextField(required=False)
609 status: str = types.TextField(
610 max_length=20,
611 choices=JobResultStatuses.choices,
612 )
613
614 # From the JobRequest
615 job_request_uuid: UUID = types.UUIDField()
616 requested_at: datetime.datetime | None = types.DateTimeField(
617 required=False, allow_null=True
618 )
619 job_class: str = types.TextField(max_length=255)
620 parameters: dict[str, Any] | None = types.JSONField(required=False, allow_null=True)
621 priority: int = types.SmallIntegerField(default=0)
622 source: str = types.TextField(required=False)
623 queue: str = types.TextField(default="default", max_length=255)
624 retries: int = types.SmallIntegerField(default=0)
625 retry_attempt: int = types.SmallIntegerField(default=0)
626 concurrency_key: str = types.TextField(max_length=255, required=False)
627
628 # Retries
629 retry_job_request_uuid: UUID | None = types.UUIDField(
630 required=False, allow_null=True
631 )
632
633 # OpenTelemetry trace context
634 trace_id: str | None = types.TextField(
635 max_length=34, required=False, allow_null=True
636 )
637 span_id: str | None = types.TextField(
638 max_length=18, required=False, allow_null=True
639 )
640
641 query: JobResultQuerySet = JobResultQuerySet()
642
643 model_options = postgres.Options(
644 ordering=["-created_at"],
645 indexes=[
646 postgres.Index(
647 name="plainjobs_jobresult_created_at_idx", fields=["created_at"]
648 ),
649 postgres.Index(
650 name="plainjobs_jobresult_started_at_idx", fields=["started_at"]
651 ),
652 postgres.Index(
653 name="plainjobs_jobresult_ended_at_idx", fields=["ended_at"]
654 ),
655 postgres.Index(name="plainjobs_jobresult_status_idx", fields=["status"]),
656 postgres.Index(
657 name="plainjobs_jobresult_job_request_uuid_idx",
658 fields=["job_request_uuid"],
659 ),
660 postgres.Index(
661 name="plainjobs_jobresult_job_class_idx", fields=["job_class"]
662 ),
663 postgres.Index(name="plainjobs_jobresult_queue_idx", fields=["queue"]),
664 postgres.Index(
665 name="plainjobs_jobresult_trace_id_idx", fields=["trace_id"]
666 ),
667 ],
668 constraints=[
669 postgres.UniqueConstraint(
670 fields=["uuid"], name="plainjobs_jobresult_unique_uuid"
671 ),
672 # One JobProcess produces exactly one JobResult. Guards the
673 # rescue-vs-late-finish race: if our heartbeat goes stale during a
674 # DB outage, a peer rescuer creates JobResult(LOST) for our
675 # JobProcess. When our subprocess eventually finishes and calls
676 # convert_to_result on the now-deleted JobProcess, the second
677 # insert hits this constraint and is swallowed by process_job's
678 # outer except — instead of silently producing two divergent
679 # results for the same run.
680 postgres.UniqueConstraint(
681 fields=["job_process_uuid"],
682 name="plainjobs_jobresult_unique_job_process_uuid",
683 ),
684 ],
685 )
686
687 def dispatch_aborted_hook(self) -> None:
688 """
689 Load the Job class and call its on_aborted hook with this result.
690
691 Errors loading the class or running the hook are logged but suppressed
692 so JobProcess → JobResult bookkeeping is never blocked by user code or
693 stale registrations.
694 """
695 try:
696 job = jobs_registry.load_job(self.job_class, self.parameters)
697 except Exception:
698 logger.exception(
699 "Failed to load job for on_aborted hook",
700 extra={"job_class": self.job_class},
701 )
702 return
703
704 try:
705 job.on_aborted(self)
706 except Exception:
707 logger.exception(
708 "Job.on_aborted raised",
709 extra={"job_class": self.job_class},
710 )
711
712 def retry_job(self, delay: int | None = None) -> JobRequest | None:
713 retry_attempt = self.retry_attempt + 1
714 job = jobs_registry.load_job(self.job_class, self.parameters)
715
716 if delay is None:
717 retry_delay = job.calculate_retry_delay(retry_attempt)
718 else:
719 retry_delay = delay
720
721 with transaction.atomic():
722 result = job.run_in_worker(
723 # Pass most of what we know through so it stays consistent
724 queue=self.queue,
725 delay=retry_delay,
726 priority=self.priority,
727 retries=self.retries,
728 retry_attempt=retry_attempt,
729 concurrency_key=self.concurrency_key,
730 )
731 if result:
732 self.retry_job_request_uuid = result.uuid
733 self.save(update_fields=["retry_job_request_uuid"])
734 return result
735
736 return None
737
738
739@postgres.register_model
740class WorkerHeartbeat(postgres.Model):
741 """
742 A live registration row written by each worker process while it's running.
743
744 Workers create a row at startup, bump `last_heartbeat_at` periodically, and
745 delete it on clean shutdown. `rescue_stale_workers` finds rows whose
746 heartbeat is older than `JOBS_HEARTBEAT_TIMEOUT` and rescues their
747 in-flight jobs.
748 """
749
750 worker_id: UUID = types.UUIDField()
751 hostname: str = types.TextField(max_length=255)
752 pid: int = types.IntegerField()
753 queues: list[str] = types.JSONField()
754 started_at: datetime.datetime = types.DateTimeField(create_now=True)
755 last_heartbeat_at: datetime.datetime = types.DateTimeField()
756
757 model_options = postgres.Options(
758 ordering=["-last_heartbeat_at"],
759 indexes=[
760 postgres.Index(
761 name="plainjobs_workerheartbeat_last_heartbeat_at_idx",
762 fields=["last_heartbeat_at"],
763 ),
764 ],
765 constraints=[
766 # The unique constraint provides the worker_id lookup index.
767 postgres.UniqueConstraint(
768 fields=["worker_id"],
769 name="plainjobs_workerheartbeat_unique_worker_id",
770 ),
771 ],
772 )
773
774 def __str__(self) -> str:
775 return f"WorkerHeartbeat({self.worker_id} on {self.hostname}:{self.pid})"
776
777
778def heartbeat_cutoff() -> datetime.datetime:
779 """The timestamp before which a WorkerHeartbeat is considered stale.
780
781 Single source of truth — rescue, admin display, and OTel gauges all
782 consult this so they agree on which workers are alive.
783 """
784 return timezone.now() - datetime.timedelta(seconds=settings.JOBS_HEARTBEAT_TIMEOUT)
785
786
787def rescue_stale_workers() -> list[JobResult]:
788 """
789 Convert in-flight JobProcess rows from dead workers to JobResult(LOST).
790
791 A worker is dead when its WorkerHeartbeat is older than
792 JOBS_HEARTBEAT_TIMEOUT. Detection is heartbeat-based, not time-based, so
793 a long-running legitimate job is safe as long as its worker keeps
794 heartbeating.
795
796 Returns the JobResults whose on_aborted hook still needs to fire. The
797 caller dispatches them, interleaving heartbeat ticks if needed — a slow
798 or large batch of hooks would otherwise starve the calling worker's
799 heartbeat and trigger false-positive rescue from a peer.
800
801 This is a free function (not a queryset method) because rescue is
802 inherently global: filtering would let one rescuer claim a dead heartbeat
803 without converting all of that worker's jobs, stranding the rest forever.
804 """
805 cutoff = heartbeat_cutoff()
806 dead_workers = WorkerHeartbeat.query.filter(last_heartbeat_at__lt=cutoff)
807
808 pending_hooks: list[JobResult] = []
809 for worker in dead_workers:
810 # Per-worker rescue is atomic: the heartbeat delete (claim) and every
811 # JobProcess→JobResult conversion either all commit, or all roll
812 # back. Without this, a mid-loop failure would leave the heartbeat
813 # deleted but some JobProcesses still stamped with the dead worker_id
814 # — stranded forever with no heartbeat to match.
815 #
816 # on_aborted hooks are deferred: dispatching them inside the atomic
817 # block would let a hook's DB error mark the connection for rollback
818 # (even though dispatch_aborted_hook swallows the exception),
819 # aborting the rescue commit.
820 worker_hooks: list[JobResult] = []
821 try:
822 with transaction.atomic():
823 # Atomic claim. If another rescuer also saw this dead
824 # heartbeat, only one of us deletes a row. The loser sees 0
825 # affected and skips.
826 claimed = WorkerHeartbeat.query.filter(
827 worker_id=worker.worker_id,
828 last_heartbeat_at__lt=cutoff,
829 ).delete()
830 if not claimed:
831 continue
832
833 # list() materializes the queryset before the loop body
834 # starts deleting rows, so iteration can't skip entries.
835 for job in list(JobProcess.query.filter(worker_id=worker.worker_id)):
836 result = job.convert_to_result(
837 status=JobResultStatuses.LOST, fire_hook=False
838 )
839 worker_hooks.append(result)
840 except Exception:
841 # One dead worker's failure shouldn't abort rescue of others. The
842 # next rescue tick will retry this worker (heartbeat was rolled
843 # back, so it's still discoverable).
844 logger.exception(
845 "Failed to rescue jobs for dead worker",
846 extra={"worker_id": str(worker.worker_id)},
847 )
848 continue
849
850 # Rescue committed. Hand hooks back for the caller to dispatch.
851 pending_hooks.extend(worker_hooks)
852
853 return pending_hooks