1from __future__ import annotations
2
3import datetime
4import gc
5import multiprocessing
6import os
7import socket
8import threading
9import time
10import traceback
11import uuid
12from concurrent.futures import Future, ProcessPoolExecutor, wait
13from concurrent.futures.process import BrokenProcessPool
14from functools import partial
15from typing import TYPE_CHECKING, Any
16
17from opentelemetry import trace
18from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
19
20from plain.logs import get_framework_logger
21from plain.postgres import transaction
22from plain.postgres.db import return_database_connection
23from plain.runtime import settings
24from plain.utils import timezone
25from plain.utils.module_loading import import_string
26from plain.utils.os import get_cpu_count
27from plain.utils.otel import format_exception_type
28
29from .otel import WorkerMetrics, tracer
30from .registry import jobs_registry
31
32if TYPE_CHECKING:
33 from .models import JobResult
34
35# Models are NOT imported at the top of this file!
36# See comment on _worker_process_initializer() for explanation.
37
38logger = get_framework_logger()
39
40
def _worker_process_initializer() -> None:
    """Bootstrap the Plain runtime inside a freshly spawned pool process.

    Background:
    - The pool uses the multiprocessing 'spawn' context, so every child is a
      brand-new Python process rather than a forked copy of the parent.
    - A spawned child re-imports this module BEFORE this initializer runs.
    - Importing models at module level would therefore register models
      before plain.runtime.setup() has run, raising PackageRegistryNotReady.

    What this does about it:
    - Calls plain.runtime.setup() first thing in each worker process.
    - Relies on every model import in this module being lazy (done inside
      functions), so models are only touched once setup has completed.

    Resulting order of events in a spawned worker:
    1. This module is re-imported (no models imported yet).
    2. This initializer runs plain.runtime.setup().
    3. process_job() executes and can safely import models.
    """
    # Function-scope import: the runtime entry point is only needed here,
    # once per spawned process (spawn creates fresh processes, not forks).
    from plain.runtime import setup as setup_plain_runtime

    setup_plain_runtime()
66
67
class Worker:
    """Parent-process job worker that claims JobRequests and runs them in a pool.

    The worker owns a spawn-context ProcessPoolExecutor. Each pass of the run
    loop performs periodic maintenance (heartbeat, stats, rescue, scheduling),
    then claims at most one ready JobRequest for its queues, converts it to a
    JobProcess stamped with this worker's ``worker_id``, and submits it to the
    pool.

    Liveness is advertised through a WorkerHeartbeat row keyed by
    ``worker_id``. The worker does not claim work until that row exists, and
    keeps refreshing it while draining in-flight jobs at shutdown.
    """

    def __init__(
        self,
        queues: list[str],
        jobs_schedule: list[Any] | None = None,
        max_processes: int | None = None,
        max_jobs_per_process: int | None = None,
        max_pending_per_process: int = 10,
        stats_every: int | None = None,
    ) -> None:
        """Create the process pool and the worker's bookkeeping state.

        Args:
            queues: Queue names this worker pulls JobRequests from.
            jobs_schedule: ``(job, schedule)`` pairs; only entries whose
                job's ``default_queue()`` is in ``queues`` are kept.
            max_processes: Pool size; defaults to ``get_cpu_count()``.
            max_jobs_per_process: Passed to the executor as
                ``max_tasks_per_child``.
            max_pending_per_process: Multiplied by ``max_processes`` to cap
                how many submitted-but-unfinished jobs are tracked at once.
            stats_every: Seconds between stats log lines; falsy disables them.
        """
        if jobs_schedule is None:
            jobs_schedule = []

        if max_processes is None:
            max_processes = get_cpu_count()

        self.executor = ProcessPoolExecutor(
            max_workers=max_processes,
            max_tasks_per_child=max_jobs_per_process,
            mp_context=multiprocessing.get_context("spawn"),
            initializer=_worker_process_initializer,
        )

        self.queues = queues

        # Filter the jobs schedule to those that are in the same queue as this worker
        self.jobs_schedule = [
            x for x in jobs_schedule if x[0].default_queue() in queues
        ]

        # How often to log the stats (in seconds)
        self.stats_every = stats_every

        self.max_processes = self.executor._max_workers  # ty: ignore[unresolved-attribute]
        self.max_jobs_per_process = max_jobs_per_process
        self.max_pending_per_process = max_pending_per_process

        self._is_shutting_down = False

        self.worker_id = uuid.uuid4()
        self._hostname = socket.gethostname()
        self._pid = os.getpid()
        self._heartbeat_at = 0.0
        # We refuse to claim JobRequests until a WorkerHeartbeat row exists
        # for our worker_id. Otherwise an unregistered worker could stamp
        # JobProcess rows with a worker_id that has no heartbeat row to ever
        # match — rescue would never find them.
        self._heartbeat_registered = False

        # Track our own in-flight futures so the shutdown drain doesn't depend
        # on ProcessPoolExecutor's private _pending_work_items dict (which
        # isn't guaranteed to drain cleanly when cancel_futures=True is used).
        # The mapping (Future → JobProcess.uuid) also lets _rescue_own_orphans
        # reconcile DB rows against currently-tracked futures. Done callbacks
        # fire from the executor's management thread, so all access goes
        # through _inflight_lock to keep iteration safe.
        self._inflight_futures: dict[Future, str] = {}
        self._inflight_lock = threading.Lock()

        self.metrics = WorkerMetrics(self)

    def run(self) -> None:
        """Register the heartbeat, run the claim loop, then drain and deregister."""
        logger.info(
            "Starting Plain worker",
            extra={
                "registered_jobs": list(jobs_registry.jobs.keys()),
                "queues": list(self.queues),
                "jobs_schedule": [str(x) for x in self.jobs_schedule],
                "stats_every": self.stats_every,
                "max_processes": self.max_processes,
                "max_jobs_per_process": self.max_jobs_per_process,
                "max_pending_per_process": self.max_pending_per_process,
                "pid": self._pid,
                "worker_id": str(self.worker_id),
            },
        )

        self.register_heartbeat()
        self._run_loop()
        self._drain_with_heartbeat()
        # Only reached on clean exit. On error/interrupt, control unwinds past
        # this and the heartbeat row is left to go stale — rescue then picks
        # up our in-flight jobs as LOST. Deleting the row here on error would
        # lie about being alive and strand any JobProcess rows still stamped
        # with this worker_id.
        self.deregister_heartbeat()

    def _discard_inflight(self, future: Future) -> None:
        """Done-callback: stop tracking ``future`` (no-op if it is not tracked)."""
        with self._inflight_lock:
            self._inflight_futures.pop(future, None)

    def _drain_with_heartbeat(self) -> None:
        """Wait for in-flight jobs to finish while keeping our heartbeat alive.

        Without continuing to heartbeat during drain, a long-running job
        (e.g. multi-minute LLM turns) would let our row go stale and trigger
        false-positive LOST conversions from another worker's rescue tick.

        Drain is unbounded — a stuck job will block here until the platform
        sends SIGKILL (Heroku's grace period, k8s terminationGracePeriod, etc.).
        At that point the heartbeat goes stale and rescue picks up the orphans.
        """
        self.executor.shutdown(wait=False, cancel_futures=True)
        while True:
            # Snapshot under the lock — done callbacks mutate the mapping from
            # the executor's management thread.
            with self._inflight_lock:
                snapshot = list(self._inflight_futures.keys())
            if not snapshot:
                break
            # Block for up to 1s. wait() defaults to return_when=ALL_COMPLETED,
            # so it returns early only once every snapshotted future is done;
            # otherwise the timeout paces the heartbeat checks below.
            wait(snapshot, timeout=1)
            try:
                self.maybe_heartbeat()
            except Exception as e:
                logger.exception(e)
        logger.info("Job worker shutdown complete")

    def _run_loop(self) -> None:
        """Claim ready JobRequests and submit them to the pool until shutdown.

        Each iteration runs the maintenance tasks, then (if a heartbeat row is
        registered and the in-flight cap is not reached) claims one JobRequest
        with ``SELECT ... FOR UPDATE SKIP LOCKED`` and converts it to a
        JobProcess inside a transaction. The job is submitted to the executor
        only after that transaction block has exited.

        The loop ends when shutdown is requested or when submitting to the
        executor raises BrokenProcessPool/RuntimeError; in the latter case the
        already-converted job is reverted to a JobRequest first.
        """
        # Lazy import - see _worker_process_initializer() comment for why
        from .models import JobRequest

        while not self._is_shutting_down:
            # Return last tick's pooled connection so the next checkout
            # re-validates it. Holding one connection for the worker's whole
            # life means a server-side close (PG restart, failover) wedges
            # the loop reusing a dead connection on every tick.
            return_database_connection()

            with tracer.start_as_current_span(
                "worker loop", kind=trace.SpanKind.CONSUMER
            ) as span:
                try:
                    self.maybe_heartbeat()
                    self.maybe_log_stats()
                    self.maybe_check_job_results()
                    self.maybe_schedule_jobs()
                except Exception as e:
                    # The catch is inside the span, so the SDK's auto-record
                    # on context exit won't fire — stamp the canonical
                    # failure signal explicitly. Log and continue: these
                    # tasks are ancillary to the main job processing.
                    span.record_exception(e)
                    span.set_status(trace.StatusCode.ERROR)
                    span.set_attribute(ERROR_TYPE, format_exception_type(e))
                    logger.exception(e)

            # Re-check shutdown after maintenance — a signal may have arrived
            # between the loop condition and now. Don't pick up new work.
            if self._is_shutting_down:
                break

            if not self._heartbeat_registered:
                time.sleep(1)
                continue

            # Done callbacks mutate the mapping from the executor's management
            # thread, so even this size check goes through the lock.
            with self._inflight_lock:
                inflight_count = len(self._inflight_futures)

            if inflight_count >= (self.max_processes * self.max_pending_per_process):
                # We don't want to convert too many JobRequests to Jobs,
                # because anything not started yet will be cancelled on deploy etc.
                # It's easier to leave them in the JobRequest db queue as long as possible.
                time.sleep(0.5)
                continue

            job = None
            with transaction.atomic():
                job_request = (
                    JobRequest.query.ready_to_run()
                    .filter(queue__in=self.queues)
                    .select_for_update(skip_locked=True)
                    .order_by("-priority", "-start_at", "-created_at")
                    .first()
                )
                if job_request:
                    logger.debug(
                        "Preparing to execute job",
                        extra={
                            "job_class": job_request.job_class,
                            "job_request_uuid": job_request.uuid,
                            "job_priority": job_request.priority,
                            "job_source": job_request.source,
                            "job_queue": job_request.queue,
                        },
                    )

                    job = job_request.convert_to_job_process(
                        worker_id=self.worker_id
                    )

            if job is None:
                # Potentially no jobs to process (who knows for how long)
                # but sleep for a second to give the CPU and DB a break.
                # The sleep happens after the atomic block has exited so an
                # idle poll doesn't hold a transaction open while waiting.
                time.sleep(1)
                continue

            # Signal may have fired during the DB queries above. Don't submit
            # new work past shutdown — revert the JobProcess back to a
            # JobRequest so the next worker generation picks it up.
            if self._is_shutting_down:
                job.revert_to_job_request()
                break

            job_process_uuid = str(job.uuid)  # Make a str copy

            try:
                future = self.executor.submit(process_job, job_process_uuid)
                with self._inflight_lock:
                    self._inflight_futures[future] = job_process_uuid
                future.add_done_callback(
                    partial(future_finished_callback, job_process_uuid)
                )
                future.add_done_callback(self._discard_inflight)
            except (BrokenProcessPool, RuntimeError):
                # BrokenProcessPool: child OOM, segfault, or other crash.
                # RuntimeError: executor already shut down (shutdown race).
                # Either way, the job was already converted from JobRequest
                # to JobProcess, so re-enqueue it before exiting.
                logger.warning(
                    "Process pool broken, re-enqueuing job",
                    extra={"job_process_uuid": job_process_uuid},
                )
                job.revert_to_job_request()
                break

    def shutdown(self) -> None:
        """Request shutdown by setting the flag the run loop checks; idempotent."""
        if self._is_shutting_down:
            # Already shutting down somewhere else
            return

        logger.info("Job worker shutdown requested")
        # Just flip the flag — drain happens in _drain_with_heartbeat() so
        # heartbeats keep firing during it. Blocking the signal handler with
        # executor.shutdown(wait=True) here would let our row go stale.
        self._is_shutting_down = True

    def maybe_log_stats(self) -> None:
        """Log stats if ``stats_every`` seconds have passed since the last log.

        The first call only records the current time, so the first stats line
        appears one interval after the loop starts.
        """
        if not self.stats_every:
            return

        now = time.time()

        if not hasattr(self, "_stats_logged_at"):
            self._stats_logged_at = now

        if now - self._stats_logged_at > self.stats_every:
            self._stats_logged_at = now
            self.log_stats()

    def maybe_check_job_results(self) -> None:
        """Run rescue_job_results() at most once every 60 seconds.

        The first call only records the current time; no rescue runs then.
        """
        now = time.time()

        if not hasattr(self, "_job_results_checked_at"):
            self._job_results_checked_at = now

        check_every = 60  # Only need to check once a minute

        if now - self._job_results_checked_at > check_every:
            self._job_results_checked_at = now
            self.rescue_job_results()

    def _create_heartbeat_row(self) -> None:
        """Insert this worker's WorkerHeartbeat row with the current time."""
        # Lazy import - see _worker_process_initializer() comment for why
        from .models import WorkerHeartbeat

        WorkerHeartbeat.query.create(
            worker_id=self.worker_id,
            hostname=self._hostname,
            pid=self._pid,
            queues=list(self.queues),
            last_heartbeat_at=timezone.now(),
        )

    def _refresh_heartbeat(self) -> None:
        """Bump ``last_heartbeat_at`` on our row, recreating the row if missing."""
        # Lazy import - see _worker_process_initializer() comment for why
        from .models import WorkerHeartbeat

        updated = WorkerHeartbeat.query.filter(worker_id=self.worker_id).update(
            last_heartbeat_at=timezone.now()
        )
        if not updated:
            # Row was deleted — registration failed earlier, or another
            # rescuer claimed us as dead. Recreate so we're discoverable.
            logger.warning(
                "Worker heartbeat row missing, re-registering",
                extra={"worker_id": str(self.worker_id)},
            )
            self._create_heartbeat_row()

    def register_heartbeat(self) -> None:
        """Create the heartbeat row; on failure, log and leave the worker unregistered."""
        try:
            self._create_heartbeat_row()
            self._heartbeat_at = time.time()
            self._heartbeat_registered = True
        except Exception as e:
            # Registration failure is non-fatal — maybe_heartbeat will retry.
            # Until it succeeds, _heartbeat_registered stays False and the run
            # loop won't claim work.
            logger.exception(e)
            logger.warning(
                "Worker heartbeat registration failed; worker will not claim "
                "jobs until a heartbeat row is created",
                extra={"worker_id": str(self.worker_id)},
            )

    def maybe_heartbeat(self) -> None:
        """Refresh the heartbeat when due, or immediately when unregistered.

        Never raises: a failed refresh is logged and marks the worker as
        unregistered so the run loop stops claiming work.
        """
        now = time.time()
        if (
            self._heartbeat_registered
            and now - self._heartbeat_at < settings.JOBS_HEARTBEAT_INTERVAL
        ):
            return

        try:
            self._refresh_heartbeat()
            self._heartbeat_at = now
            self._heartbeat_registered = True
        except Exception as e:
            # We don't know if the row exists (the update may have returned 0
            # and the recreate may have raised). Mark unregistered so the run
            # loop stops claiming work until the next tick succeeds. If the
            # DB is unreachable for long enough, our row goes stale and
            # rescue marks our jobs LOST — that's the intended behavior.
            self._heartbeat_registered = False
            logger.exception(e)

    def deregister_heartbeat(self) -> None:
        """Delete our heartbeat row unless JobProcess rows still reference us.

        Best effort: any exception is logged and swallowed.
        """
        # Lazy import - see _worker_process_initializer() comment for why
        from .models import JobProcess, WorkerHeartbeat

        try:
            # If any JobProcess rows still reference this worker_id, a
            # bookkeeping error during drain (e.g. future_finished_callback's
            # own convert_to_result raised) left them stranded. Don't delete
            # the heartbeat — let it go stale so rescue can pick them up.
            if JobProcess.query.filter(worker_id=self.worker_id).exists():
                logger.warning(
                    "Worker has remaining JobProcess rows at shutdown; "
                    "leaving heartbeat for rescue to claim",
                    extra={"worker_id": str(self.worker_id)},
                )
                return

            WorkerHeartbeat.query.filter(worker_id=self.worker_id).delete()
        except Exception as e:
            # Best effort. A leftover row will be reclaimed by rescue when its
            # heartbeat goes stale.
            logger.exception(e)

    def maybe_schedule_jobs(self) -> None:
        """Enqueue the next occurrence of each scheduled job, at most once a minute.

        The first call only records the current time. The check timestamp is
        advanced only after the whole schedule has been processed.
        """
        if not self.jobs_schedule:
            return

        now = time.time()

        if not hasattr(self, "_jobs_schedule_checked_at"):
            self._jobs_schedule_checked_at = now

        check_every = 60  # Only need to check once every 60 seconds

        if now - self._jobs_schedule_checked_at > check_every:
            for job, schedule in self.jobs_schedule:
                next_start_at = schedule.next()

                # Leverage the concurrency_key to group scheduled jobs
                # with the same start time
                schedule_concurrency_key = f"{job.default_concurrency_key()}:scheduled:{int(next_start_at.timestamp())}"

                # Job's should_enqueue hook can control scheduling behavior
                result = job.run_in_worker(
                    delay=next_start_at,
                    concurrency_key=schedule_concurrency_key,
                )
                # Result is None if should_enqueue returned False
                if result:
                    logger.info(
                        "Scheduling job",
                        extra={
                            "job_class": result.job_class,
                            "job_queue": result.queue,
                            "job_start_at": result.start_at,
                            "job_schedule": schedule,
                            "concurrency_key": result.concurrency_key,
                        },
                    )

            self._jobs_schedule_checked_at = now

    def log_stats(self) -> None:
        """Log pool size and JobRequest/JobProcess counts for this worker's queues."""
        # Lazy import - see _worker_process_initializer() comment for why
        from .models import JobProcess, JobRequest

        try:
            num_processes = len(self.executor._processes)
        except (AttributeError, TypeError):
            # Depending on shutdown timing and internal behavior, this might not work
            num_processes = 0

        jobs_requested = JobRequest.query.filter(queue__in=self.queues).count()
        jobs_processing = JobProcess.query.filter(queue__in=self.queues).count()

        logger.info(
            "Job worker stats",
            extra={
                "worker_processes": num_processes,
                "worker_queues": ",".join(self.queues),
                "jobs_requested": jobs_requested,
                "jobs_processing": jobs_processing,
                "worker_max_processes": self.max_processes,
                "worker_max_jobs_per_process": self.max_jobs_per_process,
            },
        )

    def rescue_job_results(self) -> None:
        """Find any lost or failed jobs on this worker's queues and handle them.

        Hooks are dispatched with maybe_heartbeat() interleaved so a slow or
        large batch of on_aborted hooks can't starve our heartbeat and trigger
        a false-positive LOST from a peer's rescue tick.
        """
        # Lazy import - see _worker_process_initializer() comment for why
        from .models import JobResult, rescue_stale_workers

        # rescue_stale_workers is global, not queue-scoped — a dead worker's
        # heartbeat going stale is a global signal, and partial conversion
        # would strand jobs.
        global_hooks = rescue_stale_workers()
        own_hooks = self._rescue_own_orphans()
        self._dispatch_aborted_hooks(global_hooks + own_hooks)
        JobResult.query.filter(queue__in=self.queues).retry_failed_jobs()

    def _dispatch_aborted_hooks(self, results: list[JobResult]) -> None:
        """Call dispatch_aborted_hook() on each result, heartbeating before each."""
        for result in results:
            self.maybe_heartbeat()
            result.dispatch_aborted_hook()

    def _rescue_own_orphans(self) -> list[JobResult]:
        """Convert any of our own JobProcess rows that aren't tracked by a
        live future to LOST.

        These shouldn't normally exist. The path that creates them: a future
        completes, future_finished_callback runs, but convert_to_result raises
        (transient DB error, peer-rescuer constraint conflict, etc.). The
        exception escapes the callback into concurrent.futures (which logs and
        moves on), _discard_inflight still fires from the second callback, and
        the row is left in the DB with no path back — our heartbeat is fresh
        so rescue_stale_workers won't see it.

        Age threshold avoids racing with newly-claimed rows that haven't been
        added to _inflight_futures yet (microsecond window between
        convert_to_job_process and the dict insert in _run_loop).

        Returns JobResults whose on_aborted hook the caller should dispatch.
        """
        from .models import JobProcess, JobResultStatuses

        with self._inflight_lock:
            inflight_uuids = list(self._inflight_futures.values())

        cutoff = timezone.now() - datetime.timedelta(
            seconds=settings.JOBS_HEARTBEAT_TIMEOUT
        )
        stranded = JobProcess.query.filter(
            worker_id=self.worker_id,
            created_at__lt=cutoff,
        ).exclude(uuid__in=inflight_uuids)

        pending_hooks: list[JobResult] = []
        # Materialize the query before iterating; rows are converted as we go.
        for orphan in list(stranded):
            try:
                result = orphan.convert_to_result(
                    status=JobResultStatuses.LOST,
                    error="JobProcess stranded — done-callback failed during conversion",
                    fire_hook=False,
                )
            except Exception:
                logger.exception(
                    "Failed to rescue own orphan JobProcess",
                    extra={"job_process_uuid": str(orphan.uuid)},
                )
                continue
            pending_hooks.append(result)
        return pending_hooks
547
548
def future_finished_callback(job_process_uuid: str, future: Future) -> None:
    """Reconcile a finished future with its JobProcess row.

    Three outcomes are handled, in this order:
    - cancelled future: the row (if it still exists) becomes CANCELLED;
    - future carrying an exception: the row becomes LOST when ``started_at``
      is set, otherwise ERRORED, with the formatted traceback as the error;
    - clean completion: if the row still exists it is converted to ERRORED.

    Only the clean-completion branch guards convert_to_result with a broad
    except; in the other two branches only JobProcess.DoesNotExist is
    swallowed.
    """
    # Lazy import - see _worker_process_initializer() comment for why
    from .models import JobProcess, JobResultStatuses

    if future.cancelled():
        logger.warning("Job cancelled", extra={"job_process_uuid": job_process_uuid})
        try:
            cancelled_job = JobProcess.query.get(uuid=job_process_uuid)
            cancelled_job.convert_to_result(status=JobResultStatuses.CANCELLED)
        except JobProcess.DoesNotExist:
            # Job may have already been cleaned up
            pass
        return

    exception = future.exception()
    if exception:
        # Process pool may have been killed (OOM/segfault), or process_job
        # itself raised past its outer except (e.g. import failure).
        logger.warning(
            "Job failed",
            extra={"job_process_uuid": job_process_uuid},
            exc_info=exception,
        )
        try:
            failed_job = JobProcess.query.get(uuid=job_process_uuid)
            # If started_at is set, run() was actively executing when the
            # process died — user code may have set up state it expected to
            # tear down. Use LOST so on_aborted fires. If started_at is
            # unset, run() never got to execute (import failure, etc.), so
            # ERRORED with no hook is correct.
            status = (
                JobResultStatuses.ERRORED
                if failed_job.started_at is None
                else JobResultStatuses.LOST
            )
            formatted_error = "".join(traceback.format_exception(exception))
            failed_job.convert_to_result(status=status, error=formatted_error)
        except JobProcess.DoesNotExist:
            # Job may have already been cleaned up
            pass
        return

    logger.debug("Job finished", extra={"job_process_uuid": job_process_uuid})
    # Orphan check: process_job's outer except-Exception swallows any
    # failure that escapes job.run() (middleware crash, OTel error, DB
    # blip during convert_to_result, etc.). The future completes cleanly
    # but the JobProcess row was never converted, and since our parent
    # is still heartbeating, rescue_stale_workers won't see it as orphaned.
    survivor = JobProcess.query.filter(uuid=job_process_uuid).first()
    if survivor is None:
        return

    logger.warning(
        "Job future completed but JobProcess survived; converting to ERRORED",
        extra={"job_process_uuid": job_process_uuid},
    )
    try:
        survivor.convert_to_result(
            status=JobResultStatuses.ERRORED,
            error="Job future completed without recording a result",
        )
    except Exception:
        # A peer rescuer may have already created a JobResult(LOST) for
        # this row, in which case the unique constraint on
        # JobResult.job_process_uuid trips. Either way, the row is now
        # accounted for — log and move on rather than letting the
        # exception escape into the executor's done-callback machinery.
        logger.exception(
            "Failed to convert orphan JobProcess to ERRORED",
            extra={"job_process_uuid": job_process_uuid},
        )
616
617
def process_job(job_process_uuid: str) -> None:
    """Run one JobProcess, identified by uuid, through the middleware chain.

    This is the callable submitted to the process pool. It loads the
    JobProcess row, wraps ``job.run()`` in the classes listed in
    ``settings.JOBS_MIDDLEWARE``, executes the chain, and logs timing for the
    resulting JobResult.

    Any Exception is logged and swallowed rather than propagated. The
    database connection is always returned and a garbage collection is
    always triggered before the function exits.
    """
    # Lazy import - see _worker_process_initializer() comment for why
    from .models import JobProcess

    try:
        pid = os.getpid()

        try:
            job_process = JobProcess.query.get(uuid=job_process_uuid)
        except Exception as lookup_error:
            # The CONSUMER span inside JobProcess.run() is never reached if
            # the lookup itself fails. Emit one here so the failure has an
            # entry-span home in OTel (e.g. a psycopg transient on this read
            # would otherwise leave only a CLIENT span, which entry-span
            # filtering excludes).
            with tracer.start_as_current_span(
                "process job",
                kind=trace.SpanKind.CONSUMER,
            ) as span:
                span.record_exception(lookup_error)
                span.set_status(trace.StatusCode.ERROR)
                span.set_attribute(ERROR_TYPE, format_exception_type(lookup_error))
            raise

        logger.info(
            "Executing job",
            extra={
                "worker_pid": pid,
                "job_class": job_process.job_class,
                "job_request_uuid": job_process.job_request_uuid,
                "job_priority": job_process.priority,
                "job_source": job_process.source,
                "job_queue": job_process.queue,
            },
        )

        def run_job(job: JobProcess) -> JobResult:
            return job.run()

        # Wrap from the last configured middleware inwards, so the first
        # entry in JOBS_MIDDLEWARE ends up as the outermost handler.
        handler = run_job
        for dotted_path in reversed(settings.JOBS_MIDDLEWARE):
            handler = import_string(dotted_path)(handler).process_job

        job_result = handler(job_process)

        assert job_result.ended_at is not None
        assert job_result.started_at is not None
        duration = (job_result.ended_at - job_result.started_at).total_seconds()

        queue_time = (
            (job_result.started_at - job_result.requested_at).total_seconds()
            if job_result.requested_at and job_result.started_at
            else None
        )

        logger.info(
            "Completed job",
            extra={
                "worker_pid": pid,
                "job_class": job_result.job_class,
                "job_process_uuid": job_result.job_process_uuid,
                "job_request_uuid": job_result.job_request_uuid,
                "job_result_uuid": job_result.uuid,
                "job_priority": job_result.priority,
                "job_source": job_result.source,
                "job_queue": job_result.queue,
                "job_duration": duration,
                "job_queue_time": queue_time,
            },
        )
    except Exception as e:
        # Raising exceptions inside the worker process doesn't
        # seem to be caught/shown anywhere as configured.
        # So we at least log it out here.
        # (A job should catch its own user-code errors, so this is for library errors)
        logger.exception(e)
    finally:
        return_database_connection()
        gc.collect()