Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3import datetime
  4import inspect
  5import logging
  6from typing import TYPE_CHECKING, Any
  7
  8from opentelemetry import trace
  9from opentelemetry.semconv._incubating.attributes.code_attributes import (
 10    CODE_FILEPATH,
 11    CODE_LINENO,
 12)
 13from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
 14    MESSAGING_DESTINATION_NAME,
 15    MESSAGING_MESSAGE_ID,
 16    MESSAGING_OPERATION_NAME,
 17    MESSAGING_OPERATION_TYPE,
 18    MESSAGING_SYSTEM,
 19    MessagingOperationTypeValues,
 20)
 21from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
 22from opentelemetry.trace import SpanKind, format_span_id, format_trace_id
 23
 24from plain.models import IntegrityError
 25from plain.utils import timezone
 26
 27from .registry import JobParameters, jobs_registry
 28
 29if TYPE_CHECKING:
 30    from .models import JobProcess, JobRequest
 31
 32logger = logging.getLogger(__name__)
 33tracer = trace.get_tracer("plain.jobs")
 34
 35
 36class JobType(type):
 37    """
 38    Metaclass allows us to capture the original args/kwargs
 39    used to instantiate the job, so we can store them in the database
 40    when we schedule the job.
 41    """
 42
 43    def __call__(self, *args: Any, **kwargs: Any) -> Job:
 44        instance = super().__call__(*args, **kwargs)
 45        instance._init_args = args
 46        instance._init_kwargs = kwargs
 47        return instance
 48
 49
class Job(metaclass=JobType):
    """
    Base class for background jobs.

    Subclasses implement run() with the work to perform. Because of the
    JobType metaclass, instantiating a subclass captures the constructor
    args/kwargs so run_in_worker() can serialize them for a worker process.
    """

    def run(self) -> None:
        """Do the actual work of the job. Subclasses must override this."""
        raise NotImplementedError

    def run_in_worker(
        self,
        *,
        queue: str | None = None,
        delay: int | datetime.timedelta | datetime.datetime | None = None,
        priority: int | None = None,
        retries: int | None = None,
        retry_attempt: int = 0,
        unique_key: str | None = None,
    ) -> JobRequest | list[JobRequest | JobProcess]:
        """
        Enqueue this job to be executed by a worker.

        Args:
            queue: Destination queue name (defaults to get_queue()).
            delay: When the job may start — seconds from now (int), an
                offset from now (timedelta), or an absolute datetime.
            priority: Higher numbers run first (defaults to get_priority()).
            retries: Maximum retry count (defaults to get_retries()).
            retry_attempt: Which attempt this enqueue represents (0 = first run).
            unique_key: Deduplication key (defaults to get_unique_key()).

        Returns:
            The newly created JobRequest, or — when a job with the same
            class and unique_key is already in progress — the list of
            existing JobRequest/JobProcess rows instead of a duplicate.

        Raises:
            ValueError: If delay is not None, int, timedelta, or datetime.
        """
        from .models import JobRequest

        job_class_name = jobs_registry.get_job_class_name(self.__class__)

        if queue is None:
            queue = self.get_queue()

        with tracer.start_as_current_span(
            f"run_in_worker {job_class_name}",
            kind=SpanKind.PRODUCER,
            attributes={
                MESSAGING_SYSTEM: "plain.jobs",
                MESSAGING_OPERATION_TYPE: MessagingOperationTypeValues.SEND.value,
                MESSAGING_OPERATION_NAME: "run_in_worker",
                MESSAGING_DESTINATION_NAME: queue,
            },
        ) as span:
            try:
                # Try to automatically annotate the source of the job:
                # stack()[1] is the frame that called run_in_worker()
                # (the `with` block above does not add a frame).
                caller = inspect.stack()[1]
                source = f"{caller.filename}:{caller.lineno}"
                span.set_attributes(
                    {
                        CODE_FILEPATH: caller.filename,
                        CODE_LINENO: caller.lineno,
                    }
                )
            except (IndexError, AttributeError):
                # Stack introspection is best-effort only; fall back to
                # an empty source rather than failing the enqueue.
                source = ""

            # Serialize the constructor args captured by the JobType metaclass.
            parameters = JobParameters.to_json(self._init_args, self._init_kwargs)

            if priority is None:
                priority = self.get_priority()

            if retries is None:
                retries = self.get_retries()

            # Normalize the delay into an absolute start_at timestamp
            # (None means the job can start immediately).
            if delay is None:
                start_at = None
            elif isinstance(delay, int):
                start_at = timezone.now() + datetime.timedelta(seconds=delay)
            elif isinstance(delay, datetime.timedelta):
                start_at = timezone.now() + delay
            elif isinstance(delay, datetime.datetime):
                start_at = delay
            else:
                raise ValueError(f"Invalid delay: {delay}")

            if unique_key is None:
                unique_key = self.get_unique_key()

            if unique_key:
                # Only need to look at in progress jobs
                # if we also have a unique key.
                # Otherwise it's up to the user to use _in_progress()
                if running := self._in_progress(unique_key):
                    span.set_attribute(ERROR_TYPE, "DuplicateJob")
                    return running

            # Is recording is not enough here... because we also record for summaries!

            # Capture current trace context
            current_span = trace.get_current_span()
            span_context = current_span.get_span_context()

            # Only include trace context if the span is being recorded (sampled)
            # This ensures jobs are only linked to traces that are actually being collected
            if current_span.is_recording() and span_context.is_valid:
                trace_id = f"0x{format_trace_id(span_context.trace_id)}"
                span_id = f"0x{format_span_id(span_context.span_id)}"
            else:
                trace_id = None
                span_id = None

            try:
                job_request = JobRequest(
                    job_class=job_class_name,
                    parameters=parameters,
                    start_at=start_at,
                    source=source,
                    queue=queue,
                    priority=priority,
                    retries=retries,
                    retry_attempt=retry_attempt,
                    unique_key=unique_key,
                    trace_id=trace_id,
                    span_id=span_id,
                )
                job_request.save(
                    clean_and_validate=False
                )  # So IntegrityError is raised on unique instead of potentially confusing ValidationError...

                span.set_attribute(
                    MESSAGING_MESSAGE_ID,
                    str(job_request.uuid),
                )

                # Add job UUID to current span for bidirectional linking
                span.set_attribute("job.uuid", str(job_request.uuid))
                span.set_status(trace.StatusCode.OK)

                return job_request
            except IntegrityError as e:
                # Another enqueue won the race on the unique constraint;
                # report the duplicate instead of raising.
                span.set_attribute(ERROR_TYPE, "IntegrityError")
                span.set_status(trace.Status(trace.StatusCode.ERROR, "Duplicate job"))
                logger.warning("Job already in progress: %s", e)
                # Try to return the _in_progress list again
                return self._in_progress(unique_key)

    def _in_progress(self, unique_key: str) -> list[JobRequest | JobProcess]:
        """Get all JobRequests and JobProcess that are currently in progress, regardless of queue."""
        from .models import JobProcess, JobRequest

        job_class_name = jobs_registry.get_job_class_name(self.__class__)

        job_requests = JobRequest.query.filter(
            job_class=job_class_name,
            unique_key=unique_key,
        )

        jobs = JobProcess.query.filter(
            job_class=job_class_name,
            unique_key=unique_key,
        )

        return list(job_requests) + list(jobs)

    def get_unique_key(self) -> str:
        """
        A unique key to prevent duplicate jobs from being queued.
        Enabled by returning a non-empty string.

        Note that this is not a "once and only once" guarantee, but rather
        an "at least once" guarantee. Jobs should still be idempotent in case
        multiple instances are queued in a race condition.
        """
        return ""

    def get_queue(self) -> str:
        """Name of the queue this job is sent to by default."""
        return "default"

    def get_priority(self) -> int:
        """
        Return the default priority for this job.

        Higher numbers run first: 10 > 5 > 0 > -5 > -10
        - Use positive numbers for high priority jobs
        - Use negative numbers for low priority jobs
        - Default is 0
        """
        return 0

    def get_retries(self) -> int:
        """Maximum number of times a failed run is retried (0 = no retries)."""
        return 0

    def get_retry_delay(self, attempt: int) -> int:
        """
        Calculate a delay in seconds before the next retry attempt.

        On the first retry, attempt will be 1.
        """
        return 0