v0.146.0
  1from __future__ import annotations
  2
  3import datetime
  4import time
  5import traceback
  6from typing import TYPE_CHECKING, Any, Self
  7from uuid import UUID
  8
  9from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
 10    MESSAGING_CONSUMER_GROUP_NAME,
 11    MESSAGING_MESSAGE_ID,
 12    MESSAGING_OPERATION_NAME,
 13)
 14from opentelemetry.trace import Link, SpanContext, SpanKind, TraceFlags
 15
 16from plain import postgres
 17from plain.logs import get_framework_logger
 18from plain.postgres import transaction, types
 19from plain.postgres.expressions import F
 20from plain.runtime import settings
 21from plain.utils import timezone
 22
 23from .exceptions import DeferJob
 24from .otel import (
 25    operation_duration_histogram,
 26    process_metric_attributes,
 27    queue_wait_duration_histogram,
 28    record_consumed,
 29    record_span_error,
 30    tracer,
 31)
 32from .registry import jobs_registry
 33
 34if TYPE_CHECKING:
 35    from .jobs import Job
 36
 37__all__ = [
 38    "JobRequest",
 39    "JobProcess",
 40    "JobResult",
 41    "JobResultStatuses",
 42    "WorkerHeartbeat",
 43]
 44
 45logger = get_framework_logger()
 46
 47
 48class JobRequestQuerySet(postgres.QuerySet["JobRequest"]):
 49    def ready_to_run(self) -> Self:
 50        """JobRequests with no scheduling constraint or whose `start_at` is past."""
 51        return self.filter(
 52            postgres.Q(start_at__isnull=True) | postgres.Q(start_at__lte=timezone.now())
 53        )
 54
 55    def scheduled(self) -> Self:
 56        """JobRequests scheduled to start in the future."""
 57        return self.filter(start_at__gt=timezone.now())
 58
 59
 60@postgres.register_model
 61class JobRequest(postgres.Model):
 62    """
 63    Keep all pending job requests in a single table.
 64    """
 65
 66    created_at: datetime.datetime = types.DateTimeField(create_now=True)
 67    uuid: UUID = types.UUIDField(generate=True)
 68
 69    job_class: str = types.TextField(max_length=255)
 70    parameters: dict[str, Any] | None = types.JSONField(required=False, allow_null=True)
 71    priority: int = types.SmallIntegerField(default=0)
 72    source: str = types.TextField(required=False)
 73    queue: str = types.TextField(default="default", max_length=255)
 74
 75    retries: int = types.SmallIntegerField(default=0)
 76    retry_attempt: int = types.SmallIntegerField(default=0)
 77
 78    concurrency_key: str = types.TextField(max_length=255, required=False)
 79
 80    start_at: datetime.datetime | None = types.DateTimeField(
 81        required=False, allow_null=True
 82    )
 83
 84    # OpenTelemetry trace context
 85    trace_id: str | None = types.TextField(
 86        max_length=34, required=False, allow_null=True
 87    )
 88    span_id: str | None = types.TextField(
 89        max_length=18, required=False, allow_null=True
 90    )
 91
 92    # expires_at = postgres.DateTimeField(required=False, allow_null=True)
 93
 94    query: JobRequestQuerySet = JobRequestQuerySet()
 95
 96    model_options = postgres.Options(
 97        ordering=["-priority", "-created_at"],
 98        indexes=[
 99            postgres.Index(
100                name="plainjobs_jobrequest_priority_idx", fields=["priority"]
101            ),
102            postgres.Index(
103                name="plainjobs_jobrequest_created_at_idx", fields=["created_at"]
104            ),
105            postgres.Index(name="plainjobs_jobrequest_queue_idx", fields=["queue"]),
106            postgres.Index(
107                name="plainjobs_jobrequest_start_at_idx", fields=["start_at"]
108            ),
109            postgres.Index(
110                name="plainjobs_jobrequest_concurrency_key_idx",
111                fields=["concurrency_key"],
112            ),
113            postgres.Index(
114                name="plainjobs_jobrequest_trace_id_idx", fields=["trace_id"]
115            ),
116            # Used for job grouping queries
117            postgres.Index(
118                name="job_request_concurrency_key",
119                fields=["job_class", "concurrency_key"],
120            ),
121        ],
122        constraints=[
123            postgres.UniqueConstraint(
124                fields=["uuid"], name="plainjobs_jobrequest_unique_uuid"
125            ),
126        ],
127    )
128
129    def __str__(self) -> str:
130        return f"{self.job_class} [{self.uuid}]"
131
132    def convert_to_job_process(self, *, worker_id: UUID) -> JobProcess:
133        """
134        JobRequests are the pending jobs that are waiting to be executed.
135        We immediately convert them to JobProcess when they are picked up.
136
137        worker_id stamps ownership: rescue_stale_workers uses it to find
138        which jobs belonged to a worker whose heartbeat went stale. Required —
139        every JobProcess has an owning worker, and the NOT NULL column
140        constraint is what stops pre-heartbeat workers from inserting
141        unrescuable rows during a rolling upgrade.
142        """
143        with transaction.atomic():
144            result = JobProcess.query.create(
145                job_request_uuid=self.uuid,
146                requested_at=self.created_at,
147                job_class=self.job_class,
148                parameters=self.parameters,
149                priority=self.priority,
150                source=self.source,
151                queue=self.queue,
152                retries=self.retries,
153                retry_attempt=self.retry_attempt,
154                concurrency_key=self.concurrency_key,
155                trace_id=self.trace_id,
156                span_id=self.span_id,
157                worker_id=worker_id,
158            )
159
160            # Delete the pending JobRequest now
161            self.delete()
162
163        return result
164
165
class JobQuerySet(postgres.QuerySet["JobProcess"]):
    """Queryset helpers that split JobProcesses on whether `started_at` is set."""

    def _by_started(self, started: bool) -> Self:
        # started_at stays NULL until JobProcess.run() stamps it.
        return self.filter(started_at__isnull=not started)

    def running(self) -> Self:
        """JobProcesses that have begun executing."""
        return self._by_started(True)

    def waiting(self) -> Self:
        """JobProcesses claimed by a worker but not yet started."""
        return self._by_started(False)
172
173
@postgres.register_model
class JobProcess(postgres.Model):
    """
    All active jobs are stored in this table.

    A row is created when a worker claims a JobRequest
    (JobRequest.convert_to_job_process). It is removed when the job finishes:
    convert_to_result and defer each replace it with a JobResult, and
    revert_to_job_request puts it back in the request queue.
    """

    uuid: UUID = types.UUIDField(generate=True)
    created_at: datetime.datetime = types.DateTimeField(create_now=True)
    # Stamped by run() when execution actually begins; NULL while waiting.
    started_at: datetime.datetime | None = types.DateTimeField(
        required=False, allow_null=True
    )

    # From the JobRequest
    job_request_uuid: UUID = types.UUIDField()
    requested_at: datetime.datetime | None = types.DateTimeField(
        required=False, allow_null=True
    )
    job_class: str = types.TextField(max_length=255)
    parameters: dict[str, Any] | None = types.JSONField(required=False, allow_null=True)
    priority: int = types.SmallIntegerField(default=0)
    source: str = types.TextField(required=False)
    queue: str = types.TextField(default="default", max_length=255)
    retries: int = types.SmallIntegerField(default=0)
    retry_attempt: int = types.SmallIntegerField(default=0)
    concurrency_key: str = types.TextField(max_length=255, required=False)

    # OpenTelemetry trace context
    trace_id: str | None = types.TextField(
        max_length=34, required=False, allow_null=True
    )
    span_id: str | None = types.TextField(
        max_length=18, required=False, allow_null=True
    )

    # Owning worker. rescue_stale_workers matches this against
    # WorkerHeartbeat.worker_id to find jobs orphaned by a dead worker.
    worker_id: UUID = types.UUIDField()

    query: JobQuerySet = JobQuerySet()

    model_options = postgres.Options(
        ordering=["-created_at"],
        indexes=[
            postgres.Index(
                name="plainjobs_jobprocess_created_at_idx", fields=["created_at"]
            ),
            postgres.Index(name="plainjobs_jobprocess_queue_idx", fields=["queue"]),
            postgres.Index(
                name="plainjobs_jobprocess_concurrency_key_idx",
                fields=["concurrency_key"],
            ),
            postgres.Index(
                name="plainjobs_jobprocess_started_at_idx", fields=["started_at"]
            ),
            postgres.Index(
                name="plainjobs_jobprocess_job_request_uuid_idx",
                fields=["job_request_uuid"],
            ),
            postgres.Index(
                name="plainjobs_jobprocess_trace_id_idx", fields=["trace_id"]
            ),
            postgres.Index(
                name="plainjobs_jobprocess_worker_id_idx", fields=["worker_id"]
            ),
            # Used for job grouping queries
            postgres.Index(
                name="job_concurrency_key",
                fields=["job_class", "concurrency_key"],
            ),
        ],
        constraints=[
            postgres.UniqueConstraint(
                fields=["uuid"], name="plainjobs_job_unique_uuid"
            ),
        ],
    )

    def revert_to_job_request(self) -> JobRequest:
        """Undo convert_to_job_process — put the job back in the request queue.

        The JobRequest is recreated under the original request uuid, and this
        JobProcess is deleted in the same transaction.
        """
        with transaction.atomic():
            job_request = JobRequest.query.create(
                uuid=self.job_request_uuid,
                job_class=self.job_class,
                parameters=self.parameters,
                priority=self.priority,
                source=self.source,
                queue=self.queue,
                retries=self.retries,
                retry_attempt=self.retry_attempt,
                concurrency_key=self.concurrency_key,
                trace_id=self.trace_id,
                span_id=self.span_id,
            )
            self.delete()
        return job_request

    def run(self) -> JobResult:
        """
        Execute the job inside an OTel consumer span and record its outcome.

        Stamps `started_at`, records the queue wait when `requested_at` is
        known, then loads the Job class from the registry and runs it. The
        outcome becomes a JobResult:

        - SUCCESSFUL when `job.run()` returns normally;
        - DEFERRED when it raises DeferJob (see defer());
        - ERRORED for any other exception, with the full formatted exception
          (traceback plus the final "ExcType: message" line) stored in
          `JobResult.error`.

        The processing duration is recorded in `finally`, so it is emitted
        even if bookkeeping itself raises.
        """
        # Link the consumer span back to the producer's span, if the request
        # carried a trace context.
        links: list[Link] = []
        if self.trace_id and self.span_id:
            try:
                links.append(
                    Link(
                        SpanContext(
                            trace_id=int(self.trace_id, 16),
                            span_id=int(self.span_id, 16),
                            is_remote=True,
                            trace_flags=TraceFlags(TraceFlags.SAMPLED),
                        )
                    )
                )
            except (ValueError, TypeError):
                logger.warning(
                    "Invalid trace context for job",
                    extra={"job_uuid": self.uuid},
                )

        metric_attributes: dict[str, Any] = process_metric_attributes(
            self.queue, self.job_class
        )
        start_time = time.perf_counter()
        try:
            with tracer.start_as_current_span(
                f"process {self.queue}",
                kind=SpanKind.CONSUMER,
                attributes={
                    **metric_attributes,
                    MESSAGING_OPERATION_NAME: "process",
                    MESSAGING_MESSAGE_ID: str(self.uuid),
                    MESSAGING_CONSUMER_GROUP_NAME: self.queue,
                },
                links=links,
            ) as span:
                # This is how we know it has been picked up
                self.started_at = timezone.now()
                self.save(update_fields=["started_at"])

                if self.requested_at:
                    queue_wait = (self.started_at - self.requested_at).total_seconds()
                    queue_wait_duration_histogram.record(queue_wait, metric_attributes)

                try:
                    job = jobs_registry.load_job(self.job_class, self.parameters)
                    job.job_process = self

                    try:
                        job.run()
                    except DeferJob as e:
                        # Job deferred - not an error, log at INFO level
                        logger.info(
                            "Job deferred",
                            extra={
                                "delay": e.delay,
                                "increment_retries": e.increment_retries,
                                "job_class": self.job_class,
                                "job_process_uuid": self.uuid,
                            },
                        )
                        result = self.defer(job=job, defer_exception=e)
                        if result.retry_job_request_uuid is None:
                            # Re-enqueue was blocked by should_enqueue() —
                            # either the default uniqueness rule (a peer
                            # exists) or a user override (rate limit, custom
                            # rule). Same treatment as the initial-enqueue
                            # path's `job.enqueue.skipped`: not an error,
                            # just visibility on the consumer span.
                            span.set_attribute("plain.jobs.defer.skipped", True)
                        return result

                    return self.convert_to_result(status=JobResultStatuses.SUCCESSFUL)

                except Exception as e:
                    # Note: if a rescuer already wrote JobResult(LOST) for this
                    # row (heartbeat went stale during a long job, then the job
                    # actually finished), the convert_to_result below trips the
                    # unique constraint on job_process_uuid and produces a
                    # second log line. Rare; correct outcome; not worth
                    # pre-checking on every successful job.
                    logger.exception(e)
                    error_type = record_span_error(span, e, metric_attributes)
                    return self.convert_to_result(
                        status=JobResultStatuses.ERRORED,
                        # format_exception (not format_tb) keeps the final
                        # "ExcType: message" line and any chained causes;
                        # format_tb alone would store only the stack frames.
                        error="".join(traceback.format_exception(e)),
                        error_type=error_type,
                    )
        finally:
            duration = time.perf_counter() - start_time
            operation_duration_histogram.record(duration, metric_attributes)

    def defer(self, *, job: Job, defer_exception: DeferJob) -> JobResult:
        """Defer this job by re-enqueueing it for later execution.

        Atomically deletes the JobProcess, re-enqueues the job, and creates
        a JobResult. The concurrency slot is released before re-enqueue so
        the new request's own `should_enqueue()` check can pass.

        If `should_enqueue()` blocks the re-enqueue, the framework honors
        that signal — same convention as `run_in_worker()` and `retry_job()`,
        which both return `None` silently in the same situation. The
        JobResult is still `DEFERRED` but `retry_job_request_uuid` is
        `None`, the error message records that the re-enqueue was skipped,
        and the caller stamps `plain.jobs.defer.skipped=True` on the
        consumer span so this case is queryable in APM without surfacing
        as an exception.
        """
        # Calculate new retry_attempt based on increment_retries
        retry_attempt = (
            self.retry_attempt + 1
            if defer_exception.increment_retries
            else self.retry_attempt
        )

        with transaction.atomic():
            # 1. Save JobProcess state and delete (releases concurrency slot)
            job_process_uuid = self.uuid
            job_request_uuid = self.job_request_uuid
            requested_at = self.requested_at
            started_at = self.started_at
            self.delete()

            # 2. Re-enqueue job (concurrency check can now pass)
            new_job_request = job.run_in_worker(
                queue=self.queue,
                delay=defer_exception.delay,
                priority=self.priority,
                retries=self.retries,
                retry_attempt=retry_attempt,
                concurrency_key=self.concurrency_key,
            )

            if new_job_request is None:
                error = (
                    f"Deferred for {defer_exception.delay} seconds "
                    f"(re-enqueue skipped: should_enqueue() returned False "
                    f"for concurrency_key '{self.concurrency_key}')"
                )
                retry_job_request_uuid = None
            else:
                error = f"Deferred for {defer_exception.delay} seconds"
                retry_job_request_uuid = new_job_request.uuid

            # 3. Create JobResult (linking to new request if one was created)
            result = JobResult.query.create(
                ended_at=timezone.now(),
                error=error,
                status=JobResultStatuses.DEFERRED,
                retry_job_request_uuid=retry_job_request_uuid,
                # From the JobProcess
                job_process_uuid=job_process_uuid,
                started_at=started_at,
                # From the JobRequest
                job_request_uuid=job_request_uuid,
                requested_at=requested_at,
                job_class=self.job_class,
                parameters=self.parameters,
                priority=self.priority,
                source=self.source,
                queue=self.queue,
                retries=self.retries,
                retry_attempt=self.retry_attempt,
                concurrency_key=self.concurrency_key,
                trace_id=self.trace_id,
                span_id=self.span_id,
            )

        # Counter ticks for the DEFERRED outcome too — defer() bypasses
        # convert_to_result, so without this the deferred path would not
        # show up in the consumed counter.
        record_consumed(result)
        return result

    def convert_to_result(
        self,
        *,
        status: str,
        error: str = "",
        error_type: str | None = None,
        fire_hook: bool = True,
    ) -> JobResult:
        """
        Convert this JobProcess to a JobResult.

        error_type, when supplied, is the OTel-style exception name (matching
        the spec's `error.type` attribute). It rides along to the consumed
        counter so dashboards can group ERRORED jobs by exception class. Only
        the live exception-driven paths supply it — rescue (LOST) and direct
        cancellations have no exception object to derive it from.

        fire_hook controls whether on_aborted dispatches synchronously. The
        rescue path passes fire_hook=False so it can dispatch hooks AFTER its
        outer transaction commits — otherwise a hook DB error would mark the
        connection for rollback even though dispatch_aborted_hook catches the
        exception, poisoning the rescue commit.
        """
        with transaction.atomic():
            result = JobResult.query.create(
                ended_at=timezone.now(),
                error=error,
                status=status,
                # From the JobProcess
                job_process_uuid=self.uuid,
                started_at=self.started_at,
                # From the JobRequest
                job_request_uuid=self.job_request_uuid,
                requested_at=self.requested_at,
                job_class=self.job_class,
                parameters=self.parameters,
                priority=self.priority,
                source=self.source,
                queue=self.queue,
                retries=self.retries,
                retry_attempt=self.retry_attempt,
                concurrency_key=self.concurrency_key,
                trace_id=self.trace_id,
                span_id=self.span_id,
            )

            # Delete the JobProcess now
            self.delete()

        # Counter ticks for every terminal status — the live SUCCESSFUL/ERRORED
        # paths plus the LOST/CANCELLED paths that don't flow through
        # JobProcess.run()'s finally. The outcome attribute lets dashboards
        # split throughput by final status; error_type is forwarded for ERRORED
        # jobs caught by the live path.
        record_consumed(result, error_type=error_type)

        # Fire Job.on_aborted outside the atomic block so a raise in user code
        # can't roll back the framework's bookkeeping. Only for terminal
        # statuses run() couldn't observe.
        if fire_hook and status in (
            JobResultStatuses.LOST,
            JobResultStatuses.CANCELLED,
        ):
            result.dispatch_aborted_hook()

        return result

    def as_json(self) -> dict[str, str | int | dict | None]:
        """A JSON-compatible representation to make it easier to reference in Sentry or logging"""
        return {
            "uuid": str(self.uuid),
            "created_at": self.created_at.isoformat(),
            "started_at": self.started_at.isoformat() if self.started_at else None,
            "job_request_uuid": str(self.job_request_uuid),
            "job_class": self.job_class,
            "parameters": self.parameters,
            "priority": self.priority,
            "source": self.source,
            "queue": self.queue,
            "retries": self.retries,
            "retry_attempt": self.retry_attempt,
            "concurrency_key": self.concurrency_key,
            "trace_id": self.trace_id,
            "span_id": self.span_id,
        }
527
528
class JobResultQuerySet(postgres.QuerySet["JobResult"]):
    """Queryset helpers that slice JobResults by outcome and retry state."""

    def _with_status(self, status: str) -> Self:
        # Single-status filter shared by the per-outcome helpers below.
        return self.filter(status=status)

    def successful(self) -> Self:
        return self._with_status(JobResultStatuses.SUCCESSFUL)

    def cancelled(self) -> Self:
        return self._with_status(JobResultStatuses.CANCELLED)

    def lost(self) -> Self:
        return self._with_status(JobResultStatuses.LOST)

    def errored(self) -> Self:
        return self._with_status(JobResultStatuses.ERRORED)

    def retried(self) -> Self:
        """Results that spawned a retry request or were themselves a retry."""
        spawned_retry = postgres.Q(retry_job_request_uuid__isnull=False)
        was_retry = postgres.Q(retry_attempt__gt=0)
        return self.filter(spawned_retry | was_retry)

    def failed(self) -> Self:
        """Results whose status is ERRORED, LOST, or CANCELLED."""
        failure_statuses = [
            JobResultStatuses.ERRORED,
            JobResultStatuses.LOST,
            JobResultStatuses.CANCELLED,
        ]
        return self.filter(status__in=failure_statuses)

    def retryable(self) -> Self:
        """Failed results with no retry request yet and retries remaining."""
        return self.failed().filter(
            retry_job_request_uuid__isnull=True,
            retries__gt=0,
            retry_attempt__lt=F("retries"),
        )

    def retry_failed_jobs(self) -> None:
        """Call retry_job() on every retryable result.

        A result whose retry raises has its retry_attempt bumped instead, so
        a permanently broken job (e.g. its class was deleted) eventually stops
        matching retryable() rather than being retried forever.
        """
        for failed_result in self.retryable():
            try:
                failed_result.retry_job()
            except Exception:
                logger.exception(
                    "Failed to retry job, incrementing retry_attempt",
                    extra={"result": str(failed_result)},
                )
                failed_result.retry_attempt += 1
                failed_result.save(update_fields=["retry_attempt"])
578
579
class JobResultStatuses(postgres.TextChoices):
    """Outcome recorded on a JobResult, as (stored value, display label)."""

    # Ran to completion without raising.
    SUCCESSFUL = "SUCCESSFUL", "Successful"
    # The job threw an error.
    ERRORED = "ERRORED", "Errored"
    # Interrupted by shutdown/deploy.
    CANCELLED = "CANCELLED", "Cancelled"
    # Intentionally rescheduled via DeferJob; a new JobRequest may run it again.
    DEFERRED = "DEFERRED", "Deferred"
    # Process lost, lost in transit, or otherwise never finished.
    LOST = "LOST", "Lost"
589
590
@postgres.register_model
class JobResult(postgres.Model):
    """
    The recorded outcome of a single JobProcess run.

    Rows are written when a JobProcess reaches a terminal state (see
    JobResultStatuses); the unique constraint on `job_process_uuid` allows at
    most one result per JobProcess. Failed results can be re-enqueued with
    retry_job().
    """

    uuid: UUID = types.UUIDField(generate=True)
    created_at: datetime.datetime = types.DateTimeField(create_now=True)

    # From the Job
    job_process_uuid: UUID = types.UUIDField()
    started_at: datetime.datetime | None = types.DateTimeField(
        required=False, allow_null=True
    )
    ended_at: datetime.datetime | None = types.DateTimeField(
        required=False, allow_null=True
    )
    error: str = types.TextField(required=False)
    status: str = types.TextField(
        max_length=20,
        choices=JobResultStatuses.choices,
    )

    # From the JobRequest
    job_request_uuid: UUID = types.UUIDField()
    requested_at: datetime.datetime | None = types.DateTimeField(
        required=False, allow_null=True
    )
    job_class: str = types.TextField(max_length=255)
    parameters: dict[str, Any] | None = types.JSONField(required=False, allow_null=True)
    priority: int = types.SmallIntegerField(default=0)
    source: str = types.TextField(required=False)
    queue: str = types.TextField(default="default", max_length=255)
    retries: int = types.SmallIntegerField(default=0)
    retry_attempt: int = types.SmallIntegerField(default=0)
    concurrency_key: str = types.TextField(max_length=255, required=False)

    # Retries: uuid of the JobRequest created by retry_job(), if any.
    retry_job_request_uuid: UUID | None = types.UUIDField(
        required=False, allow_null=True
    )

    # OpenTelemetry trace context
    trace_id: str | None = types.TextField(
        max_length=34, required=False, allow_null=True
    )
    span_id: str | None = types.TextField(
        max_length=18, required=False, allow_null=True
    )

    query: JobResultQuerySet = JobResultQuerySet()

    model_options = postgres.Options(
        ordering=["-created_at"],
        indexes=[
            postgres.Index(
                name="plainjobs_jobresult_created_at_idx", fields=["created_at"]
            ),
            postgres.Index(
                name="plainjobs_jobresult_started_at_idx", fields=["started_at"]
            ),
            postgres.Index(
                name="plainjobs_jobresult_ended_at_idx", fields=["ended_at"]
            ),
            postgres.Index(name="plainjobs_jobresult_status_idx", fields=["status"]),
            postgres.Index(
                name="plainjobs_jobresult_job_request_uuid_idx",
                fields=["job_request_uuid"],
            ),
            postgres.Index(
                name="plainjobs_jobresult_job_class_idx", fields=["job_class"]
            ),
            postgres.Index(name="plainjobs_jobresult_queue_idx", fields=["queue"]),
            postgres.Index(
                name="plainjobs_jobresult_trace_id_idx", fields=["trace_id"]
            ),
        ],
        constraints=[
            postgres.UniqueConstraint(
                fields=["uuid"], name="plainjobs_jobresult_unique_uuid"
            ),
            # One JobProcess produces exactly one JobResult. Guards the
            # rescue-vs-late-finish race: if our heartbeat goes stale during a
            # DB outage, a peer rescuer creates JobResult(LOST) for our
            # JobProcess. When our subprocess eventually finishes and calls
            # convert_to_result on the now-deleted JobProcess, the second
            # insert hits this constraint and is swallowed by process_job's
            # outer except — instead of silently producing two divergent
            # results for the same run.
            postgres.UniqueConstraint(
                fields=["job_process_uuid"],
                name="plainjobs_jobresult_unique_job_process_uuid",
            ),
        ],
    )

    def dispatch_aborted_hook(self) -> None:
        """
        Call the Job class's on_aborted hook with this result.

        Best-effort: a failure to load the Job class or an exception raised
        by the hook is logged and suppressed, so user code or a stale
        registration can never block JobProcess → JobResult bookkeeping.
        """
        try:
            job = jobs_registry.load_job(self.job_class, self.parameters)
        except Exception:
            logger.exception(
                "Failed to load job for on_aborted hook",
                extra={"job_class": self.job_class},
            )
        else:
            try:
                job.on_aborted(self)
            except Exception:
                logger.exception(
                    "Job.on_aborted raised",
                    extra={"job_class": self.job_class},
                )

    def retry_job(self, delay: int | None = None) -> JobRequest | None:
        """
        Re-enqueue this result's job as a new attempt.

        The new request keeps this result's queue, priority, retries and
        concurrency_key, with retry_attempt incremented by one. `delay`
        overrides the job's own calculate_retry_delay(). Returns the new
        JobRequest and stores its uuid in `retry_job_request_uuid`, or
        returns None (leaving this row untouched) when run_in_worker()
        returns None, e.g. because should_enqueue() blocked it.
        """
        next_attempt = self.retry_attempt + 1
        job = jobs_registry.load_job(self.job_class, self.parameters)

        retry_delay = (
            job.calculate_retry_delay(next_attempt) if delay is None else delay
        )

        with transaction.atomic():
            new_request = job.run_in_worker(
                # Pass most of what we know through so it stays consistent
                queue=self.queue,
                delay=retry_delay,
                priority=self.priority,
                retries=self.retries,
                retry_attempt=next_attempt,
                concurrency_key=self.concurrency_key,
            )
            if not new_request:
                return None

            self.retry_job_request_uuid = new_request.uuid
            self.save(update_fields=["retry_job_request_uuid"])

        return new_request
737
738
@postgres.register_model
class WorkerHeartbeat(postgres.Model):
    """
    Liveness row owned by one running worker process.

    A worker inserts its row at startup, refreshes `last_heartbeat_at` on a
    timer, and deletes the row on a clean shutdown. rescue_stale_workers
    treats rows older than JOBS_HEARTBEAT_TIMEOUT as dead workers and
    rescues whatever JobProcess rows they still own.
    """

    worker_id: UUID = types.UUIDField()
    hostname: str = types.TextField(max_length=255)
    pid: int = types.IntegerField()
    queues: list[str] = types.JSONField()
    started_at: datetime.datetime = types.DateTimeField(create_now=True)
    last_heartbeat_at: datetime.datetime = types.DateTimeField()

    model_options = postgres.Options(
        ordering=["-last_heartbeat_at"],
        indexes=[
            postgres.Index(
                name="plainjobs_workerheartbeat_last_heartbeat_at_idx",
                fields=["last_heartbeat_at"],
            ),
        ],
        constraints=[
            # The unique constraint provides the worker_id lookup index.
            postgres.UniqueConstraint(
                fields=["worker_id"],
                name="plainjobs_workerheartbeat_unique_worker_id",
            ),
        ],
    )

    def __str__(self) -> str:
        location = f"{self.hostname}:{self.pid}"
        return f"WorkerHeartbeat({self.worker_id} on {location})"
776
777
def heartbeat_cutoff() -> datetime.datetime:
    """Return the moment before which a WorkerHeartbeat counts as stale.

    This is the only place the threshold is computed. Rescue, the admin
    display and the OTel gauges all call it, so they always agree on which
    workers are alive.
    """
    timeout = datetime.timedelta(seconds=settings.JOBS_HEARTBEAT_TIMEOUT)
    return timezone.now() - timeout
785
786
def rescue_stale_workers() -> list[JobResult]:
    """
    Convert in-flight JobProcess rows from dead workers to JobResult(LOST).

    A worker is dead when its WorkerHeartbeat is older than
    JOBS_HEARTBEAT_TIMEOUT. Detection is heartbeat-based, not time-based, so
    a long-running legitimate job is safe as long as its worker keeps
    heartbeating.

    Returns the JobResults whose on_aborted hook still needs to fire. The
    caller dispatches them, interleaving heartbeat ticks if needed — a slow
    or large batch of hooks would otherwise starve the calling worker's
    heartbeat and trigger false-positive rescue from a peer.

    This is a free function (not a queryset method) because rescue is
    inherently global: filtering would let one rescuer claim a dead heartbeat
    without converting all of that worker's jobs, stranding the rest forever.
    """
    cutoff = heartbeat_cutoff()
    # Snapshot the stale heartbeats up front. Each one is re-checked and
    # claimed inside its own transaction below, so this list is only a
    # candidate set.
    dead_workers = list(WorkerHeartbeat.query.filter(last_heartbeat_at__lt=cutoff))

    pending_hooks: list[JobResult] = []
    for worker in dead_workers:
        # Per-worker rescue is atomic: the heartbeat delete (claim) and every
        # JobProcess→JobResult conversion either all commit, or all roll
        # back. Without this, a mid-loop failure would leave the heartbeat
        # deleted but some JobProcesses still stamped with the dead worker_id
        # — stranded forever with no heartbeat to match.
        #
        # on_aborted hooks are deferred: dispatching them inside the atomic
        # block would let a hook's DB error mark the connection for rollback
        # (even though dispatch_aborted_hook swallows the exception),
        # aborting the rescue commit.
        worker_hooks: list[JobResult] = []
        try:
            with transaction.atomic():
                # Atomic claim. If another rescuer also saw this dead
                # heartbeat, only one of us deletes a row. The loser sees 0
                # affected and skips. Re-applying `last_heartbeat_at__lt`
                # also skips a worker that heartbeated again after the
                # snapshot above.
                deleted = WorkerHeartbeat.query.filter(
                    worker_id=worker.worker_id,
                    last_heartbeat_at__lt=cutoff,
                ).delete()
                # If delete() returns a Django-style (total, per_model) tuple,
                # even (0, {}) is truthy, and testing it directly would never
                # skip. Unwrap the row count first; a bare int passes through.
                claimed = deleted[0] if isinstance(deleted, tuple) else deleted
                if not claimed:
                    continue

                # list() materializes the queryset before the loop body
                # starts deleting rows, so iteration can't skip entries.
                for job in list(JobProcess.query.filter(worker_id=worker.worker_id)):
                    result = job.convert_to_result(
                        status=JobResultStatuses.LOST, fire_hook=False
                    )
                    worker_hooks.append(result)
        except Exception:
            # One dead worker's failure shouldn't abort rescue of others. The
            # next rescue tick will retry this worker (heartbeat was rolled
            # back, so it's still discoverable).
            logger.exception(
                "Failed to rescue jobs for dead worker",
                extra={"worker_id": str(worker.worker_id)},
            )
            continue

        # Rescue committed. Hand hooks back for the caller to dispatch.
        pending_hooks.extend(worker_hooks)

    return pending_hooks
853    return pending_hooks