v0.146.0
  1from __future__ import annotations
  2
  3import datetime
  4import gc
  5import multiprocessing
  6import os
  7import socket
  8import threading
  9import time
 10import traceback
 11import uuid
 12from concurrent.futures import Future, ProcessPoolExecutor, wait
 13from concurrent.futures.process import BrokenProcessPool
 14from functools import partial
 15from typing import TYPE_CHECKING, Any
 16
 17from opentelemetry import trace
 18from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
 19
 20from plain.logs import get_framework_logger
 21from plain.postgres import transaction
 22from plain.postgres.db import return_database_connection
 23from plain.runtime import settings
 24from plain.utils import timezone
 25from plain.utils.module_loading import import_string
 26from plain.utils.os import get_cpu_count
 27from plain.utils.otel import format_exception_type
 28
 29from .otel import WorkerMetrics, tracer
 30from .registry import jobs_registry
 31
 32if TYPE_CHECKING:
 33    from .models import JobResult
 34
 35# Models are NOT imported at the top of this file!
 36# See comment on _worker_process_initializer() for explanation.
 37
# Module-level logger obtained from plain.logs.get_framework_logger(); used by
# the Worker class and the job callbacks defined below.
logger = get_framework_logger()
 39
 40
 41def _worker_process_initializer() -> None:
 42    """Initialize Plain framework in worker process before processing jobs.
 43
 44    Why this is needed:
 45    - We use multiprocessing with 'spawn' context (not 'fork')
 46    - Spawn creates fresh Python processes, not forked copies
 47    - When a spawned process starts, it re-imports this module BEFORE the initializer runs
 48    - If we imported models at the top of this file, model registration would
 49      happen before plain.runtime.setup(), causing PackageRegistryNotReady errors
 50
 51    Solution:
 52    - This initializer runs plain.runtime.setup() FIRST in each worker process
 53    - All model imports happen lazily inside functions (after setup completes)
 54    - This ensures packages registry is ready before any models are accessed
 55
 56    Execution order in spawned worker:
 57    1. Re-import workers.py (but models NOT imported yet - lazy!)
 58    2. Run this initializer → plain.runtime.setup()
 59    3. Execute process_job() → NOW it's safe to import models
 60    """
 61    from plain.runtime import setup
 62
 63    # Each spawned worker process needs to set up Plain
 64    # (spawn context creates fresh processes, not forks)
 65    setup()
 66
 67
 68class Worker:
 69    def __init__(
 70        self,
 71        queues: list[str],
 72        jobs_schedule: list[Any] | None = None,
 73        max_processes: int | None = None,
 74        max_jobs_per_process: int | None = None,
 75        max_pending_per_process: int = 10,
 76        stats_every: int | None = None,
 77    ) -> None:
 78        if jobs_schedule is None:
 79            jobs_schedule = []
 80
 81        if max_processes is None:
 82            max_processes = get_cpu_count()
 83
 84        self.executor = ProcessPoolExecutor(
 85            max_workers=max_processes,
 86            max_tasks_per_child=max_jobs_per_process,
 87            mp_context=multiprocessing.get_context("spawn"),
 88            initializer=_worker_process_initializer,
 89        )
 90
 91        self.queues = queues
 92
 93        # Filter the jobs schedule to those that are in the same queue as this worker
 94        self.jobs_schedule = [
 95            x for x in jobs_schedule if x[0].default_queue() in queues
 96        ]
 97
 98        # How often to log the stats (in seconds)
 99        self.stats_every = stats_every
100
101        self.max_processes = self.executor._max_workers  # ty: ignore[unresolved-attribute]
102        self.max_jobs_per_process = max_jobs_per_process
103        self.max_pending_per_process = max_pending_per_process
104
105        self._is_shutting_down = False
106
107        self.worker_id = uuid.uuid4()
108        self._hostname = socket.gethostname()
109        self._pid = os.getpid()
110        self._heartbeat_at = 0.0
111        # We refuse to claim JobRequests until a WorkerHeartbeat row exists
112        # for our worker_id. Otherwise an unregistered worker could stamp
113        # JobProcess rows with a worker_id that has no heartbeat row to ever
114        # match — rescue would never find them.
115        self._heartbeat_registered = False
116
117        # Track our own in-flight futures so the shutdown drain doesn't depend
118        # on ProcessPoolExecutor's private _pending_work_items dict (which
119        # isn't guaranteed to drain cleanly when cancel_futures=True is used).
120        # The mapping (Future → JobProcess.uuid) also lets _rescue_own_orphans
121        # reconcile DB rows against currently-tracked futures. Done callbacks
122        # fire from the executor's management thread, so all access goes
123        # through _inflight_lock to keep iteration safe.
124        self._inflight_futures: dict[Future, str] = {}
125        self._inflight_lock = threading.Lock()
126
127        self.metrics = WorkerMetrics(self)
128
129    def run(self) -> None:
130        logger.info(
131            "Starting Plain worker",
132            extra={
133                "registered_jobs": list(jobs_registry.jobs.keys()),
134                "queues": list(self.queues),
135                "jobs_schedule": [str(x) for x in self.jobs_schedule],
136                "stats_every": self.stats_every,
137                "max_processes": self.max_processes,
138                "max_jobs_per_process": self.max_jobs_per_process,
139                "max_pending_per_process": self.max_pending_per_process,
140                "pid": self._pid,
141                "worker_id": str(self.worker_id),
142            },
143        )
144
145        self.register_heartbeat()
146        self._run_loop()
147        self._drain_with_heartbeat()
148        # Only reached on clean exit. On error/interrupt, control unwinds past
149        # this and the heartbeat row is left to go stale — rescue then picks
150        # up our in-flight jobs as LOST. Deleting the row here on error would
151        # lie about being alive and strand any JobProcess rows still stamped
152        # with this worker_id.
153        self.deregister_heartbeat()
154
155    def _discard_inflight(self, future: Future) -> None:
156        with self._inflight_lock:
157            self._inflight_futures.pop(future, None)
158
159    def _drain_with_heartbeat(self) -> None:
160        """Wait for in-flight jobs to finish while keeping our heartbeat alive.
161
162        Without continuing to heartbeat during drain, a long-running job
163        (e.g. multi-minute LLM turns) would let our row go stale and trigger
164        false-positive LOST conversions from another worker's rescue tick.
165
166        Drain is unbounded — a stuck job will block here until the platform
167        sends SIGKILL (Heroku's grace period, k8s terminationGracePeriod, etc.).
168        At that point the heartbeat goes stale and rescue picks up the orphans.
169        """
170        self.executor.shutdown(wait=False, cancel_futures=True)
171        while True:
172            # Snapshot under the lock — done callbacks mutate the set from
173            # the executor's management thread.
174            with self._inflight_lock:
175                snapshot = list(self._inflight_futures.keys())
176            if not snapshot:
177                break
178            # Sleep up to 1s, waking early if any future completes.
179            wait(snapshot, timeout=1)
180            try:
181                self.maybe_heartbeat()
182            except Exception as e:
183                logger.exception(e)
184        logger.info("Job worker shutdown complete")
185
186    def _run_loop(self) -> None:
187        # Lazy import - see _worker_process_initializer() comment for why
188        from .models import JobRequest
189
190        while not self._is_shutting_down:
191            # Return last tick's pooled connection so the next checkout
192            # re-validates it. Holding one connection for the worker's whole
193            # life means a server-side close (PG restart, failover) wedges
194            # the loop reusing a dead connection on every tick.
195            return_database_connection()
196
197            with tracer.start_as_current_span(
198                "worker loop", kind=trace.SpanKind.CONSUMER
199            ) as span:
200                try:
201                    self.maybe_heartbeat()
202                    self.maybe_log_stats()
203                    self.maybe_check_job_results()
204                    self.maybe_schedule_jobs()
205                except Exception as e:
206                    # The catch is inside the span, so the SDK's auto-record
207                    # on context exit won't fire — stamp the canonical
208                    # failure signal explicitly. Log and continue: these
209                    # tasks are ancillary to the main job processing.
210                    span.record_exception(e)
211                    span.set_status(trace.StatusCode.ERROR)
212                    span.set_attribute(ERROR_TYPE, format_exception_type(e))
213                    logger.exception(e)
214
215            # Re-check shutdown after maintenance — a signal may have arrived
216            # between the loop condition and now. Don't pick up new work.
217            if self._is_shutting_down:
218                break
219
220            if not self._heartbeat_registered:
221                time.sleep(1)
222                continue
223
224            if len(self._inflight_futures) >= (
225                self.max_processes * self.max_pending_per_process
226            ):
227                # We don't want to convert too many JobRequests to Jobs,
228                # because anything not started yet will be cancelled on deploy etc.
229                # It's easier to leave them in the JobRequest db queue as long as possible.
230                time.sleep(0.5)
231                continue
232
233            with transaction.atomic():
234                job_request = (
235                    JobRequest.query.ready_to_run()
236                    .filter(queue__in=self.queues)
237                    .select_for_update(skip_locked=True)
238                    .order_by("-priority", "-start_at", "-created_at")
239                    .first()
240                )
241                if not job_request:
242                    # Potentially no jobs to process (who knows for how long)
243                    # but sleep for a second to give the CPU and DB a break
244                    time.sleep(1)
245                    continue
246
247                logger.debug(
248                    "Preparing to execute job",
249                    extra={
250                        "job_class": job_request.job_class,
251                        "job_request_uuid": job_request.uuid,
252                        "job_priority": job_request.priority,
253                        "job_source": job_request.source,
254                        "job_queue": job_request.queue,
255                    },
256                )
257
258                job = job_request.convert_to_job_process(worker_id=self.worker_id)
259
260            # Signal may have fired during the DB queries above. Don't submit
261            # new work past shutdown — revert the JobProcess back to a
262            # JobRequest so the next worker generation picks it up.
263            if self._is_shutting_down:
264                job.revert_to_job_request()
265                break
266
267            job_process_uuid = str(job.uuid)  # Make a str copy
268
269            try:
270                future = self.executor.submit(process_job, job_process_uuid)
271                with self._inflight_lock:
272                    self._inflight_futures[future] = job_process_uuid
273                future.add_done_callback(
274                    partial(future_finished_callback, job_process_uuid)
275                )
276                future.add_done_callback(self._discard_inflight)
277            except (BrokenProcessPool, RuntimeError):
278                # BrokenProcessPool: child OOM, segfault, or other crash.
279                # RuntimeError: executor already shut down (shutdown race).
280                # Either way, the job was already converted from JobRequest
281                # to JobProcess, so re-enqueue it before exiting.
282                logger.warning(
283                    "Process pool broken, re-enqueuing job",
284                    extra={"job_process_uuid": job_process_uuid},
285                )
286                job.revert_to_job_request()
287                break
288
289    def shutdown(self) -> None:
290        if self._is_shutting_down:
291            # Already shutting down somewhere else
292            return
293
294        logger.info("Job worker shutdown requested")
295        # Just flip the flag — drain happens in _drain_with_heartbeat() so
296        # heartbeats keep firing during it. Blocking the signal handler with
297        # executor.shutdown(wait=True) here would let our row go stale.
298        self._is_shutting_down = True
299
300    def maybe_log_stats(self) -> None:
301        if not self.stats_every:
302            return
303
304        now = time.time()
305
306        if not hasattr(self, "_stats_logged_at"):
307            self._stats_logged_at = now
308
309        if now - self._stats_logged_at > self.stats_every:
310            self._stats_logged_at = now
311            self.log_stats()
312
313    def maybe_check_job_results(self) -> None:
314        now = time.time()
315
316        if not hasattr(self, "_job_results_checked_at"):
317            self._job_results_checked_at = now
318
319        check_every = 60  # Only need to check once a minute
320
321        if now - self._job_results_checked_at > check_every:
322            self._job_results_checked_at = now
323            self.rescue_job_results()
324
325    def _create_heartbeat_row(self) -> None:
326        # Lazy import - see _worker_process_initializer() comment for why
327        from .models import WorkerHeartbeat
328
329        WorkerHeartbeat.query.create(
330            worker_id=self.worker_id,
331            hostname=self._hostname,
332            pid=self._pid,
333            queues=list(self.queues),
334            last_heartbeat_at=timezone.now(),
335        )
336
337    def _refresh_heartbeat(self) -> None:
338        # Lazy import - see _worker_process_initializer() comment for why
339        from .models import WorkerHeartbeat
340
341        updated = WorkerHeartbeat.query.filter(worker_id=self.worker_id).update(
342            last_heartbeat_at=timezone.now()
343        )
344        if not updated:
345            # Row was deleted — registration failed earlier, or another
346            # rescuer claimed us as dead. Recreate so we're discoverable.
347            logger.warning(
348                "Worker heartbeat row missing, re-registering",
349                extra={"worker_id": str(self.worker_id)},
350            )
351            self._create_heartbeat_row()
352
353    def register_heartbeat(self) -> None:
354        try:
355            self._create_heartbeat_row()
356            self._heartbeat_at = time.time()
357            self._heartbeat_registered = True
358        except Exception as e:
359            # Registration failure is non-fatal — maybe_heartbeat will retry.
360            # Until it succeeds, _heartbeat_registered stays False and the run
361            # loop won't claim work.
362            logger.exception(e)
363            logger.warning(
364                "Worker heartbeat registration failed; worker will not claim "
365                "jobs until a heartbeat row is created",
366                extra={"worker_id": str(self.worker_id)},
367            )
368
369    def maybe_heartbeat(self) -> None:
370        now = time.time()
371        if (
372            self._heartbeat_registered
373            and now - self._heartbeat_at < settings.JOBS_HEARTBEAT_INTERVAL
374        ):
375            return
376
377        try:
378            self._refresh_heartbeat()
379            self._heartbeat_at = now
380            self._heartbeat_registered = True
381        except Exception as e:
382            # We don't know if the row exists (the update may have returned 0
383            # and the recreate may have raised). Mark unregistered so the run
384            # loop stops claiming work until the next tick succeeds. If the
385            # DB is unreachable for long enough, our row goes stale and
386            # rescue marks our jobs LOST — that's the intended behavior.
387            self._heartbeat_registered = False
388            logger.exception(e)
389
390    def deregister_heartbeat(self) -> None:
391        # Lazy import - see _worker_process_initializer() comment for why
392        from .models import JobProcess, WorkerHeartbeat
393
394        try:
395            # If any JobProcess rows still reference this worker_id, a
396            # bookkeeping error during drain (e.g. future_finished_callback's
397            # own convert_to_result raised) left them stranded. Don't delete
398            # the heartbeat — let it go stale so rescue can pick them up.
399            if JobProcess.query.filter(worker_id=self.worker_id).exists():
400                logger.warning(
401                    "Worker has remaining JobProcess rows at shutdown; "
402                    "leaving heartbeat for rescue to claim",
403                    extra={"worker_id": str(self.worker_id)},
404                )
405                return
406
407            WorkerHeartbeat.query.filter(worker_id=self.worker_id).delete()
408        except Exception as e:
409            # Best effort. A leftover row will be reclaimed by rescue when its
410            # heartbeat goes stale.
411            logger.exception(e)
412
413    def maybe_schedule_jobs(self) -> None:
414        if not self.jobs_schedule:
415            return
416
417        now = time.time()
418
419        if not hasattr(self, "_jobs_schedule_checked_at"):
420            self._jobs_schedule_checked_at = now
421
422        check_every = 60  # Only need to check once every 60 seconds
423
424        if now - self._jobs_schedule_checked_at > check_every:
425            for job, schedule in self.jobs_schedule:
426                next_start_at = schedule.next()
427
428                # Leverage the concurrency_key to group scheduled jobs
429                # with the same start time
430                schedule_concurrency_key = f"{job.default_concurrency_key()}:scheduled:{int(next_start_at.timestamp())}"
431
432                # Job's should_enqueue hook can control scheduling behavior
433                result = job.run_in_worker(
434                    delay=next_start_at,
435                    concurrency_key=schedule_concurrency_key,
436                )
437                # Result is None if should_enqueue returned False
438                if result:
439                    logger.info(
440                        "Scheduling job",
441                        extra={
442                            "job_class": result.job_class,
443                            "job_queue": result.queue,
444                            "job_start_at": result.start_at,
445                            "job_schedule": schedule,
446                            "concurrency_key": result.concurrency_key,
447                        },
448                    )
449
450            self._jobs_schedule_checked_at = now
451
452    def log_stats(self) -> None:
453        # Lazy import - see _worker_process_initializer() comment for why
454        from .models import JobProcess, JobRequest
455
456        try:
457            num_proccesses = len(self.executor._processes)
458        except (AttributeError, TypeError):
459            # Depending on shutdown timing and internal behavior, this might not work
460            num_proccesses = 0
461
462        jobs_requested = JobRequest.query.filter(queue__in=self.queues).count()
463        jobs_processing = JobProcess.query.filter(queue__in=self.queues).count()
464
465        logger.info(
466            "Job worker stats",
467            extra={
468                "worker_processes": num_proccesses,
469                "worker_queues": ",".join(self.queues),
470                "jobs_requested": jobs_requested,
471                "jobs_processing": jobs_processing,
472                "worker_max_processes": self.max_processes,
473                "worker_max_jobs_per_process": self.max_jobs_per_process,
474            },
475        )
476
477    def rescue_job_results(self) -> None:
478        """Find any lost or failed jobs on this worker's queues and handle them.
479
480        Hooks are dispatched with maybe_heartbeat() interleaved so a slow or
481        large batch of on_aborted hooks can't starve our heartbeat and trigger
482        a false-positive LOST from a peer's rescue tick.
483        """
484        # Lazy import - see _worker_process_initializer() comment for why
485        from .models import JobResult, rescue_stale_workers
486
487        # rescue_stale_workers is global, not queue-scoped — a dead worker's
488        # heartbeat going stale is a global signal, and partial conversion
489        # would strand jobs.
490        global_hooks = rescue_stale_workers()
491        own_hooks = self._rescue_own_orphans()
492        self._dispatch_aborted_hooks(global_hooks + own_hooks)
493        JobResult.query.filter(queue__in=self.queues).retry_failed_jobs()
494
495    def _dispatch_aborted_hooks(self, results: list[JobResult]) -> None:
496        for result in results:
497            self.maybe_heartbeat()
498            result.dispatch_aborted_hook()
499
500    def _rescue_own_orphans(self) -> list[JobResult]:
501        """Convert any of our own JobProcess rows that aren't tracked by a
502        live future to LOST.
503
504        These shouldn't normally exist. The path that creates them: a future
505        completes, future_finished_callback runs, but convert_to_result raises
506        (transient DB error, peer-rescuer constraint conflict, etc.). The
507        exception escapes the callback into concurrent.futures (which logs and
508        moves on), _discard_inflight still fires from the second callback, and
509        the row is left in the DB with no path back — our heartbeat is fresh
510        so rescue_stale_workers won't see it.
511
512        Age threshold avoids racing with newly-claimed rows that haven't been
513        added to _inflight_futures yet (microsecond window between
514        convert_to_job_process and the dict insert in _run_loop).
515
516        Returns JobResults whose on_aborted hook the caller should dispatch.
517        """
518        from .models import JobProcess, JobResultStatuses
519
520        with self._inflight_lock:
521            inflight_uuids = list(self._inflight_futures.values())
522
523        cutoff = timezone.now() - datetime.timedelta(
524            seconds=settings.JOBS_HEARTBEAT_TIMEOUT
525        )
526        stranded = JobProcess.query.filter(
527            worker_id=self.worker_id,
528            created_at__lt=cutoff,
529        ).exclude(uuid__in=inflight_uuids)
530
531        pending_hooks: list[JobResult] = []
532        for orphan in list(stranded):
533            try:
534                result = orphan.convert_to_result(
535                    status=JobResultStatuses.LOST,
536                    error="JobProcess stranded — done-callback failed during conversion",
537                    fire_hook=False,
538                )
539            except Exception:
540                logger.exception(
541                    "Failed to rescue own orphan JobProcess",
542                    extra={"job_process_uuid": str(orphan.uuid)},
543                )
544                continue
545            pending_hooks.append(result)
546        return pending_hooks
547
548
def future_finished_callback(job_process_uuid: str, future: Future) -> None:
    """Done callback that reconciles a JobProcess row with its finished future.

    - Cancelled future: the row is converted to CANCELLED.
    - Future carrying an exception: the row is converted to LOST when
      ``started_at`` is set, otherwise to ERRORED, with the formatted
      traceback as the error.
    - Clean completion: if the row still exists it is converted to ERRORED.

    In the first two cases a row that no longer exists is ignored.
    """
    # Lazy import - see _worker_process_initializer() comment for why
    from .models import JobProcess, JobResultStatuses

    if future.cancelled():
        logger.warning("Job cancelled", extra={"job_process_uuid": job_process_uuid})
        try:
            cancelled_job = JobProcess.query.get(uuid=job_process_uuid)
            cancelled_job.convert_to_result(status=JobResultStatuses.CANCELLED)
        except JobProcess.DoesNotExist:
            # The row may already have been cleaned up.
            pass
        return

    exception = future.exception()
    if exception:
        # Either the process pool was killed (OOM/segfault), or process_job
        # raised past its outer except (e.g. import failure).
        logger.warning(
            "Job failed",
            extra={"job_process_uuid": job_process_uuid},
            exc_info=exception,
        )
        try:
            failed_job = JobProcess.query.get(uuid=job_process_uuid)
            # started_at set: run() was executing when the process died, so
            # user code may have state it expected to tear down — use LOST so
            # on_aborted fires. started_at unset: run() never executed
            # (import failure, etc.), so ERRORED without a hook is right.
            was_running = failed_job.started_at is not None
            failed_job.convert_to_result(
                status=(
                    JobResultStatuses.LOST
                    if was_running
                    else JobResultStatuses.ERRORED
                ),
                error="".join(traceback.format_exception(exception)),
            )
        except JobProcess.DoesNotExist:
            # The row may already have been cleaned up.
            pass
        return

    logger.debug("Job finished", extra={"job_process_uuid": job_process_uuid})
    # Orphan check. process_job's outer except-Exception swallows failures
    # that escape job.run() (middleware crash, OTel error, DB blip during
    # convert_to_result, etc.). The future then completes cleanly although
    # the JobProcess row was never converted, and since the parent worker is
    # still heartbeating, rescue_stale_workers does not treat it as orphaned.
    leftover = JobProcess.query.filter(uuid=job_process_uuid).first()
    if leftover is None:
        return

    logger.warning(
        "Job future completed but JobProcess survived; converting to ERRORED",
        extra={"job_process_uuid": job_process_uuid},
    )
    try:
        leftover.convert_to_result(
            status=JobResultStatuses.ERRORED,
            error="Job future completed without recording a result",
        )
    except Exception:
        # A peer rescuer may already have created a JobResult(LOST) for this
        # row, which trips the unique constraint on
        # JobResult.job_process_uuid. The row is accounted for either way, so
        # log instead of letting the exception escape into the executor's
        # done-callback machinery.
        logger.exception(
            "Failed to convert orphan JobProcess to ERRORED",
            extra={"job_process_uuid": job_process_uuid},
        )
616
617
def process_job(job_process_uuid: str) -> None:
    """Load the JobProcess with this uuid and run it through the middleware.

    This is the callable the worker submits to the process pool. Exceptions
    raised inside the try block are logged and swallowed; the database
    connection is returned and a garbage collection is run in every case.
    """
    # Lazy import - see _worker_process_initializer() comment for why
    from .models import JobProcess

    try:
        worker_pid = os.getpid()

        try:
            job_process = JobProcess.query.get(uuid=job_process_uuid)
        except Exception as e:
            # If the lookup itself fails, the CONSUMER span inside
            # JobProcess.run() is never reached. Emit one here so the failure
            # has an entry span in OTel (a psycopg transient on this read
            # would otherwise leave only a CLIENT span, which entry-span
            # filtering excludes).
            with tracer.start_as_current_span(
                "process job",
                kind=trace.SpanKind.CONSUMER,
            ) as span:
                span.record_exception(e)
                span.set_status(trace.StatusCode.ERROR)
                span.set_attribute(ERROR_TYPE, format_exception_type(e))
            raise

        logger.info(
            "Executing job",
            extra={
                "worker_pid": worker_pid,
                "job_class": job_process.job_class,
                "job_request_uuid": job_process.job_request_uuid,
                "job_priority": job_process.priority,
                "job_source": job_process.source,
                "job_queue": job_process.queue,
            },
        )

        def middleware_chain(job: JobProcess) -> JobResult:
            return job.run()

        # Wrap from the last configured middleware to the first, so the
        # first one ends up outermost.
        handler = middleware_chain
        for middleware_path in reversed(settings.JOBS_MIDDLEWARE):
            middleware = import_string(middleware_path)(handler)
            handler = middleware.process_job

        job_result = handler(job_process)

        assert job_result.ended_at is not None
        assert job_result.started_at is not None
        duration = (job_result.ended_at - job_result.started_at).total_seconds()

        queue_time = (
            (job_result.started_at - job_result.requested_at).total_seconds()
            if job_result.requested_at and job_result.started_at
            else None
        )

        logger.info(
            "Completed job",
            extra={
                "worker_pid": worker_pid,
                "job_class": job_result.job_class,
                "job_process_uuid": job_result.job_process_uuid,
                "job_request_uuid": job_result.job_request_uuid,
                "job_result_uuid": job_result.uuid,
                "job_priority": job_result.priority,
                "job_source": job_result.source,
                "job_queue": job_result.queue,
                "job_duration": duration,
                "job_queue_time": queue_time,
            },
        )
    except Exception as e:
        # Exceptions raised inside the worker process don't seem to be
        # caught or shown anywhere as configured, so at least log them here.
        # (A job should catch its own user-code errors; this is for library
        # errors.)
        logger.exception(e)
    finally:
        return_database_connection()
        gc.collect()