Plain is headed towards 1.0! Subscribe for development updates →

  1import datetime
  2import inspect
  3import logging
  4
  5from plain.models import IntegrityError
  6from plain.utils import timezone
  7
  8from .registry import JobParameters, jobs_registry
  9
 10logger = logging.getLogger(__name__)
 11
 12
 13class JobType(type):
 14    """
 15    Metaclass allows us to capture the original args/kwargs
 16    used to instantiate the job, so we can store them in the database
 17    when we schedule the job.
 18    """
 19
 20    def __call__(self, *args, **kwargs):
 21        instance = super().__call__(*args, **kwargs)
 22        instance._init_args = args
 23        instance._init_kwargs = kwargs
 24        return instance
 25
 26
 27class Job(metaclass=JobType):
 28    def run(self):
 29        raise NotImplementedError
 30
 31    def run_in_worker(
 32        self,
 33        *,
 34        queue: str | None = None,
 35        delay: int | datetime.timedelta | datetime.datetime | None = None,
 36        priority: int | None = None,
 37        retries: int | None = None,
 38        retry_attempt: int = 0,
 39        unique_key: str | None = None,
 40    ):
 41        from .models import JobRequest
 42
 43        try:
 44            # Try to automatically annotate the source of the job
 45            caller = inspect.stack()[1]
 46            source = f"{caller.filename}:{caller.lineno}"
 47        except (IndexError, AttributeError):
 48            source = ""
 49
 50        parameters = JobParameters.to_json(self._init_args, self._init_kwargs)
 51
 52        if queue is None:
 53            queue = self.get_queue()
 54
 55        if priority is None:
 56            priority = self.get_priority()
 57
 58        if retries is None:
 59            retries = self.get_retries()
 60
 61        if delay is None:
 62            start_at = None
 63        elif isinstance(delay, int):
 64            start_at = timezone.now() + datetime.timedelta(seconds=delay)
 65        elif isinstance(delay, datetime.timedelta):
 66            start_at = timezone.now() + delay
 67        elif isinstance(delay, datetime.datetime):
 68            start_at = delay
 69        else:
 70            raise ValueError(f"Invalid delay: {delay}")
 71
 72        if unique_key is None:
 73            unique_key = self.get_unique_key()
 74
 75        if unique_key:
 76            # Only need to look at in progress jobs
 77            # if we also have a unique key.
 78            # Otherwise it's up to the user to use _in_progress()
 79            if running := self._in_progress(unique_key):
 80                return running
 81
 82        try:
 83            job_request = JobRequest(
 84                job_class=jobs_registry.get_job_class_name(self.__class__),
 85                parameters=parameters,
 86                start_at=start_at,
 87                source=source,
 88                queue=queue,
 89                priority=priority,
 90                retries=retries,
 91                retry_attempt=retry_attempt,
 92                unique_key=unique_key,
 93            )
 94            job_request.save(
 95                clean_and_validate=False
 96            )  # So IntegrityError is raised on unique instead of potentially confusing ValidationError...
 97            return job_request
 98        except IntegrityError as e:
 99            logger.warning("Job already in progress: %s", e)
100            # Try to return the _in_progress list again
101            return self._in_progress(unique_key)
102
103    def _in_progress(self, unique_key):
104        """Get all JobRequests and Jobs that are currently in progress, regardless of queue."""
105        from .models import Job, JobRequest
106
107        job_class_name = jobs_registry.get_job_class_name(self.__class__)
108
109        job_requests = JobRequest.objects.filter(
110            job_class=job_class_name,
111            unique_key=unique_key,
112        )
113
114        jobs = Job.objects.filter(
115            job_class=job_class_name,
116            unique_key=unique_key,
117        )
118
119        return list(job_requests) + list(jobs)
120
121    def get_unique_key(self) -> str:
122        """
123        A unique key to prevent duplicate jobs from being queued.
124        Enabled by returning a non-empty string.
125
126        Note that this is not a "once and only once" guarantee, but rather
127        an "at least once" guarantee. Jobs should still be idempotent in case
128        multiple instances are queued in a race condition.
129        """
130        return ""
131
132    def get_queue(self) -> str:
133        return "default"
134
135    def get_priority(self) -> int:
136        return 0
137
138    def get_retries(self) -> int:
139        return 0
140
141    def get_retry_delay(self, attempt: int) -> int:
142        """
143        Calculate a delay in seconds before the next retry attempt.
144
145        On the first retry, attempt will be 1.
146        """
147        return 0