1from __future__ import annotations
2
3from abc import ABC, abstractmethod
4from collections.abc import Callable
5from typing import TYPE_CHECKING
6
7from plain.logs import app_logger
8
9if TYPE_CHECKING:
10 from .models import JobProcess, JobResult
11
12
13class JobMiddleware(ABC):
14 """
15 Abstract base class for job middleware.
16
17 Subclasses must implement process_job() to handle the job execution cycle.
18
19 Example:
20 class MyJobMiddleware(JobMiddleware):
21 def process_job(self, job: JobProcess) -> JobResult:
22 # Pre-processing
23 result = self.run_job(job)
24 # Post-processing
25 return result
26 """
27
28 def __init__(self, run_job: Callable[[JobProcess], JobResult]):
29 self.run_job = run_job
30
31 @abstractmethod
32 def process_job(self, job: JobProcess) -> JobResult:
33 """Process the job and return a result. Must be implemented by subclasses."""
34 ...
35
36
37class AppLoggerMiddleware(JobMiddleware):
38 def process_job(self, job: JobProcess) -> JobResult:
39 with app_logger.include_context(
40 job_request_uuid=str(job.job_request_uuid), job_process_uuid=str(job.uuid)
41 ):
42 return self.run_job(job)