1import datetime
2import inspect
3import logging
4
5from plain.models import IntegrityError
6from plain.utils import timezone
7
8from .registry import JobParameters, jobs_registry
9
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
11
12
class JobType(type):
    """
    Metaclass that records the positional and keyword arguments used to
    instantiate a job, so they can later be serialized and stored in the
    database when the job is scheduled.
    """

    def __call__(cls, *args, **kwargs):
        # Create the instance normally, then stash the original call
        # arguments on it for later serialization.
        job = super().__call__(*args, **kwargs)
        job._init_args = args
        job._init_kwargs = kwargs
        return job
25
26
class Job(metaclass=JobType):
    """
    Base class for background jobs.

    Subclasses implement `run()`. Calling `run_in_worker()` serializes the
    constructor arguments (captured by the `JobType` metaclass) into a
    `JobRequest` row for a worker process to pick up.
    """

    def run(self):
        """Do the actual work of the job. Subclasses must override this."""
        raise NotImplementedError

    def run_in_worker(
        self,
        *,
        queue: str | None = None,
        delay: int | datetime.timedelta | datetime.datetime | None = None,
        priority: int | None = None,
        retries: int | None = None,
        retry_attempt: int = 0,
        unique_key: str | None = None,
    ):
        """
        Schedule this job to be executed by a worker.

        Any parameter left as None falls back to the matching `get_*()` hook
        on the job class. `delay` may be a number of seconds, a `timedelta`,
        or an absolute `datetime` at which the job becomes runnable.

        Returns the newly created `JobRequest`, or a list of the
        already-in-progress `JobRequest`/`Job` rows when a `unique_key`
        collision is detected.

        Raises:
            ValueError: if `delay` is not one of the supported types.
        """
        from .models import JobRequest

        try:
            # Try to automatically annotate the source of the job.
            # context=0 skips reading source-file context lines for every
            # frame, making inspect.stack() much cheaper; filename/lineno
            # are all we need for the "file:line" label.
            caller = inspect.stack(context=0)[1]
            source = f"{caller.filename}:{caller.lineno}"
        except (IndexError, AttributeError):
            source = ""

        parameters = JobParameters.to_json(self._init_args, self._init_kwargs)

        if queue is None:
            queue = self.get_queue()

        if priority is None:
            priority = self.get_priority()

        if retries is None:
            retries = self.get_retries()

        # Normalize the delay into an absolute start time
        # (None means "run as soon as possible").
        if delay is None:
            start_at = None
        elif isinstance(delay, int):
            start_at = timezone.now() + datetime.timedelta(seconds=delay)
        elif isinstance(delay, datetime.timedelta):
            start_at = timezone.now() + delay
        elif isinstance(delay, datetime.datetime):
            start_at = delay
        else:
            raise ValueError(f"Invalid delay: {delay}")

        if unique_key is None:
            unique_key = self.get_unique_key()

        if unique_key:
            # Only need to look at in progress jobs
            # if we also have a unique key.
            # Otherwise it's up to the user to use _in_progress()
            if running := self._in_progress(unique_key):
                return running

        try:
            job_request = JobRequest(
                job_class=jobs_registry.get_job_class_name(self.__class__),
                parameters=parameters,
                start_at=start_at,
                source=source,
                queue=queue,
                priority=priority,
                retries=retries,
                retry_attempt=retry_attempt,
                unique_key=unique_key,
            )
            job_request.save(
                clean_and_validate=False
            )  # So IntegrityError is raised on unique instead of potentially confusing ValidationError...
            return job_request
        except IntegrityError as e:
            # A concurrent scheduler won the race on the unique constraint;
            # fall back to returning whatever is already in progress.
            logger.warning("Job already in progress: %s", e)
            # Try to return the _in_progress list again
            return self._in_progress(unique_key)

    def _in_progress(self, unique_key):
        """Get all JobRequests and Jobs that are currently in progress, regardless of queue."""
        from .models import Job, JobRequest

        job_class_name = jobs_registry.get_job_class_name(self.__class__)

        job_requests = JobRequest.objects.filter(
            job_class=job_class_name,
            unique_key=unique_key,
        )

        jobs = Job.objects.filter(
            job_class=job_class_name,
            unique_key=unique_key,
        )

        return list(job_requests) + list(jobs)

    def get_unique_key(self) -> str:
        """
        A unique key to prevent duplicate jobs from being queued.
        Enabled by returning a non-empty string.

        Note that this is not a "once and only once" guarantee, but rather
        an "at least once" guarantee. Jobs should still be idempotent in case
        multiple instances are queued in a race condition.
        """
        return ""

    def get_queue(self) -> str:
        """Name of the queue this job should be placed on."""
        return "default"

    def get_priority(self) -> int:
        """Scheduling priority for this job (worker ordering semantics are defined elsewhere — confirm against the worker implementation)."""
        return 0

    def get_retries(self) -> int:
        """Number of times this job may be retried after a failure."""
        return 0

    def get_retry_delay(self, attempt: int) -> int:
        """
        Calculate a delay in seconds before the next retry attempt.

        On the first retry, attempt will be 1.
        """
        return 0