from __future__ import annotations

import datetime
import inspect
import logging
from typing import TYPE_CHECKING, Any

from opentelemetry import trace
from opentelemetry.semconv._incubating.attributes.code_attributes import (
    CODE_FILEPATH,
    CODE_LINENO,
)
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
    MESSAGING_DESTINATION_NAME,
    MESSAGING_MESSAGE_ID,
    MESSAGING_OPERATION_NAME,
    MESSAGING_OPERATION_TYPE,
    MESSAGING_SYSTEM,
    MessagingOperationTypeValues,
)
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
from opentelemetry.trace import SpanKind, format_span_id, format_trace_id

from plain.models import IntegrityError
from plain.utils import timezone

from .registry import JobParameters, jobs_registry

if TYPE_CHECKING:
    from .models import JobProcess, JobRequest

logger = logging.getLogger(__name__)
tracer = trace.get_tracer("plain.jobs")


class JobType(type):
    """
    Metaclass that captures the original args/kwargs used to instantiate
    a job, so they can be stored in the database when the job is scheduled.
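
    For example, instantiating a (hypothetical) subclass::

        job = SendEmailJob("hi", user_id=1)
        assert job._init_args == ("hi",)
        assert job._init_kwargs == {"user_id": 1}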
41 """
42
43 def __call__(self, *args: Any, **kwargs: Any) -> Job:
44 instance = super().__call__(*args, **kwargs)
45 instance._init_args = args
46 instance._init_kwargs = kwargs
47 return instance
48
49
class Job(metaclass=JobType):
    def run(self) -> None:
        """Execute the job. Subclasses must implement this."""
        raise NotImplementedError

    def run_in_worker(
        self,
        *,
        queue: str | None = None,
        delay: int | datetime.timedelta | datetime.datetime | None = None,
        priority: int | None = None,
        retries: int | None = None,
        retry_attempt: int = 0,
        unique_key: str | None = None,
    ) -> JobRequest | list[JobRequest | JobProcess]:
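        """
        Queue this job to be run by a worker.

        The delay can be given in seconds, as a timedelta, or as an absolute
        datetime. Returns the created JobRequest, or the list of in-progress
        JobRequest/JobProcess instances if a matching unique_key is found.

        A minimal usage sketch, assuming a hypothetical WelcomeEmailJob
        subclass::

            WelcomeEmailJob(user_id=1).run_in_worker(
                queue="emails",
                delay=60,  # start roughly one minute from now
            )
        """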
        from .models import JobRequest

        job_class_name = jobs_registry.get_job_class_name(self.__class__)

        if queue is None:
            queue = self.get_queue()

        with tracer.start_as_current_span(
            f"run_in_worker {job_class_name}",
            kind=SpanKind.PRODUCER,
            attributes={
                MESSAGING_SYSTEM: "plain.jobs",
                MESSAGING_OPERATION_TYPE: MessagingOperationTypeValues.SEND.value,
                MESSAGING_OPERATION_NAME: "run_in_worker",
                MESSAGING_DESTINATION_NAME: queue,
            },
        ) as span:
            try:
                # Try to automatically annotate the source of the job
                caller = inspect.stack()[1]
                source = f"{caller.filename}:{caller.lineno}"
                span.set_attributes(
                    {
                        CODE_FILEPATH: caller.filename,
                        CODE_LINENO: caller.lineno,
                    }
                )
            except (IndexError, AttributeError):
                source = ""

            parameters = JobParameters.to_json(self._init_args, self._init_kwargs)

            if priority is None:
                priority = self.get_priority()

            if retries is None:
                retries = self.get_retries()

            if delay is None:
                start_at = None
            elif isinstance(delay, int):
                start_at = timezone.now() + datetime.timedelta(seconds=delay)
            elif isinstance(delay, datetime.timedelta):
                start_at = timezone.now() + delay
            elif isinstance(delay, datetime.datetime):
                start_at = delay
            else:
                raise ValueError(f"Invalid delay: {delay}")

            if unique_key is None:
                unique_key = self.get_unique_key()

            if unique_key:
                # With a unique key we only need to check the in-progress jobs;
                # otherwise it's up to the caller to use _in_progress().
                if running := self._in_progress(unique_key):
                    span.set_attribute(ERROR_TYPE, "DuplicateJob")
                    return running

            # Note: is_recording() alone is not enough here, because we also
            # record spans for summaries.

            # Capture the current trace context
            current_span = trace.get_current_span()
            span_context = current_span.get_span_context()

            # Only include the trace context if the span is being recorded
            # (sampled), so jobs are only linked to traces that are actually
            # being collected.
            if current_span.is_recording() and span_context.is_valid:
                trace_id = f"0x{format_trace_id(span_context.trace_id)}"
                span_id = f"0x{format_span_id(span_context.span_id)}"
            else:
                trace_id = None
                span_id = None

            try:
                job_request = JobRequest(
                    job_class=job_class_name,
                    parameters=parameters,
                    start_at=start_at,
                    source=source,
                    queue=queue,
                    priority=priority,
                    retries=retries,
                    retry_attempt=retry_attempt,
                    unique_key=unique_key,
                    trace_id=trace_id,
                    span_id=span_id,
                )
                # Skip clean/validation so a duplicate unique_key raises an
                # IntegrityError instead of a potentially confusing ValidationError.
                job_request.save(clean_and_validate=False)

                span.set_attribute(
                    MESSAGING_MESSAGE_ID,
                    str(job_request.uuid),
                )

                # Add the job UUID to the current span for bidirectional linking
                span.set_attribute("job.uuid", str(job_request.uuid))
                span.set_status(trace.StatusCode.OK)

                return job_request
            except IntegrityError as e:
                span.set_attribute(ERROR_TYPE, "IntegrityError")
                span.set_status(trace.Status(trace.StatusCode.ERROR, "Duplicate job"))
                logger.warning("Job already in progress: %s", e)
                # A concurrent insert won the race, so return the
                # in-progress jobs instead
                return self._in_progress(unique_key)

    def _in_progress(self, unique_key: str) -> list[JobRequest | JobProcess]:
        """Get all JobRequest and JobProcess instances that are currently in progress, regardless of queue."""
        from .models import JobProcess, JobRequest

        job_class_name = jobs_registry.get_job_class_name(self.__class__)

        job_requests = JobRequest.query.filter(
            job_class=job_class_name,
            unique_key=unique_key,
        )

        jobs = JobProcess.query.filter(
            job_class=job_class_name,
            unique_key=unique_key,
        )

        return list(job_requests) + list(jobs)

    def get_unique_key(self) -> str:
        """
        A unique key to prevent duplicate jobs from being queued.
        Enabled by returning a non-empty string.

        Note that this is not a "once and only once" guarantee, but rather
        an "at least once" guarantee. Jobs should still be idempotent in case
        multiple instances are queued in a race condition.
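
        A minimal sketch of an override, assuming a hypothetical job with a
        user_id parameter::

            def get_unique_key(self) -> str:
                return f"welcome-email-{self.user_id}"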
200 """
201 return ""

    def get_queue(self) -> str:
        """The name of the queue this job should run on."""
        return "default"

    def get_priority(self) -> int:
        """
        Return the default priority for this job.

        Higher numbers run first: 10 > 5 > 0 > -5 > -10
        - Use positive numbers for high priority jobs
        - Use negative numbers for low priority jobs
        - Default is 0
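
        A minimal override sketch::

            def get_priority(self) -> int:
                return 10  # run ahead of default-priority jobs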
214 """
215 return 0
216
217 def get_retries(self) -> int:
218 return 0
219
    def get_retry_delay(self, attempt: int) -> int:
        """
        Calculate a delay in seconds before the next retry attempt.

        On the first retry, attempt will be 1.
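
        A minimal sketch of an exponential-backoff override (hypothetical
        timings)::

            def get_retry_delay(self, attempt: int) -> int:
                return 30 * 2 ** (attempt - 1)  # 30s, 60s, 120s, ...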
225 """
226 return 0