Plain is headed towards 1.0! Subscribe for development updates →

Worker

Process background jobs with a database-driven worker.

from plain.worker import Job
from plain.mail import send_mail

# Create a new job class
class WelcomeUserJob(Job):
    """Example job that emails a welcome message to one user."""

    def __init__(self, user):
        # Keep the user so run() can address the email to them.
        self.user = user

    def run(self):
        """Send the welcome email to the stored user's address."""
        greeting = f"Hello from Plain, {self.user}"
        # NOTE(review): the sender address below looks obfuscated by the
        # page this example was copied from; confirm the intended value.
        send_mail(
            subject="Welcome!",
            message=greeting,
            from_email="[email protected]",
            recipient_list=[self.user.email],
        )


# Instantiate a job and send it to the worker.
# NOTE: `User` is not imported in this snippet; import your own user model first.
user = User.objects.get(pk=1)
# run_in_worker() records the job for the separate worker process
# rather than calling run() here.
WelcomeUserJob(user).run_in_worker()

The worker process runs separately from your application; start it with the `plain worker run` command.

Staff

Job history

Scheduled jobs

Monitoring

  1import datetime
  2import inspect
  3import logging
  4from importlib import import_module
  5
  6from plain.models import IntegrityError, Model
  7from plain.utils import timezone
  8
  9logger = logging.getLogger(__name__)
 10
 11
 12def load_job(job_class_path, parameters):
 13    module_path, class_name = job_class_path.rsplit(".", 1)
 14    module = import_module(module_path)
 15    job_class = getattr(module, class_name)
 16    args, kwargs = JobParameters.from_json(parameters)
 17    return job_class(*args, **kwargs)
 18
 19
 20class JobParameters:
 21    @staticmethod
 22    def to_json(args, kwargs):
 23        serialized_args = []
 24        for arg in args:
 25            if isinstance(arg, Model):
 26                serialized_args.append(ModelInstanceParameter.from_instance(arg))
 27            else:
 28                serialized_args.append(arg)
 29
 30        serialized_kwargs = {}
 31        for key, value in kwargs.items():
 32            if isinstance(value, Model):
 33                serialized_kwargs[key] = ModelInstanceParameter.from_instance(value)
 34            else:
 35                serialized_kwargs[key] = value
 36
 37        return {"args": serialized_args, "kwargs": serialized_kwargs}
 38
 39    @staticmethod
 40    def from_json(data):
 41        args = []
 42        for arg in data["args"]:
 43            if ModelInstanceParameter.is_gid(arg):
 44                args.append(ModelInstanceParameter.to_instance(arg))
 45            else:
 46                args.append(arg)
 47
 48        kwargs = {}
 49        for key, value in data["kwargs"].items():
 50            if ModelInstanceParameter.is_gid(value):
 51                kwargs[key] = ModelInstanceParameter.to_instance(value)
 52            else:
 53                kwargs[key] = value
 54
 55        return args, kwargs
 56
 57
 58class ModelInstanceParameter:
 59    """
 60    A string representation of a model instance,
 61    so we can convert a single parameter (model instance itself)
 62    into a string that can be serialized and stored in the database.
 63    """
 64
 65    @staticmethod
 66    def from_instance(instance):
 67        return f"gid://{instance._meta.package_label}/{instance._meta.model_name}/{instance.pk}"
 68
 69    @staticmethod
 70    def to_instance(s):
 71        if not s.startswith("gid://"):
 72            raise ValueError("Invalid ModelInstanceParameter string")
 73        package, model, pk = s[6:].split("/")
 74        from plain.packages import packages
 75
 76        model = packages.get_model(package, model)
 77        return model.objects.get(pk=pk)
 78
 79    @staticmethod
 80    def is_gid(x):
 81        if not isinstance(x, str):
 82            return False
 83        return x.startswith("gid://")
 84
 85
 86class JobType(type):
 87    """
 88    Metaclass allows us to capture the original args/kwargs
 89    used to instantiate the job, so we can store them in the database
 90    when we schedule the job.
 91    """
 92
 93    def __call__(self, *args, **kwargs):
 94        instance = super().__call__(*args, **kwargs)
 95        instance._init_args = args
 96        instance._init_kwargs = kwargs
 97        return instance
 98
 99
100class Job(metaclass=JobType):
101    def run(self):
102        raise NotImplementedError
103
104    def run_in_worker(
105        self,
106        *,
107        queue: str | None = None,
108        delay: int | datetime.timedelta | datetime.datetime | None = None,
109        priority: int | None = None,
110        retries: int | None = None,
111        retry_attempt: int = 0,
112        unique_key: str | None = None,
113    ):
114        from .models import JobRequest
115
116        try:
117            # Try to automatically annotate the source of the job
118            caller = inspect.stack()[1]
119            source = f"{caller.filename}:{caller.lineno}"
120        except (IndexError, AttributeError):
121            source = ""
122
123        parameters = JobParameters.to_json(self._init_args, self._init_kwargs)
124
125        if queue is None:
126            queue = self.get_queue()
127
128        if priority is None:
129            priority = self.get_priority()
130
131        if retries is None:
132            retries = self.get_retries()
133
134        if delay is None:
135            start_at = None
136        elif isinstance(delay, int):
137            start_at = timezone.now() + datetime.timedelta(seconds=delay)
138        elif isinstance(delay, datetime.timedelta):
139            start_at = timezone.now() + delay
140        elif isinstance(delay, datetime.datetime):
141            start_at = delay
142        else:
143            raise ValueError(f"Invalid delay: {delay}")
144
145        if unique_key is None:
146            unique_key = self.get_unique_key()
147
148        if unique_key:
149            # Only need to look at in progress jobs
150            # if we also have a unique key.
151            # Otherwise it's up to the user to use _in_progress()
152            if running := self._in_progress(unique_key):
153                return running
154
155        try:
156            job_request = JobRequest(
157                job_class=self._job_class_str(),
158                parameters=parameters,
159                start_at=start_at,
160                source=source,
161                queue=queue,
162                priority=priority,
163                retries=retries,
164                retry_attempt=retry_attempt,
165                unique_key=unique_key,
166            )
167            job_request.save(
168                clean_and_validate=False
169            )  # So IntegrityError is raised on unique instead of potentially confusing ValidationError...
170            return job_request
171        except IntegrityError as e:
172            logger.warning("Job already in progress: %s", e)
173            # Try to return the _in_progress list again
174            return self._in_progress(unique_key)
175
176    def _job_class_str(self):
177        return f"{self.__module__}.{self.__class__.__name__}"
178
179    def _in_progress(self, unique_key):
180        """Get all JobRequests and Jobs that are currently in progress, regardless of queue."""
181        from .models import Job, JobRequest
182
183        job_class = self._job_class_str()
184
185        job_requests = JobRequest.objects.filter(
186            job_class=job_class,
187            unique_key=unique_key,
188        )
189
190        jobs = Job.objects.filter(
191            job_class=job_class,
192            unique_key=unique_key,
193        )
194
195        return list(job_requests) + list(jobs)
196
197    def get_unique_key(self) -> str:
198        """
199        A unique key to prevent duplicate jobs from being queued.
200        Enabled by returning a non-empty string.
201
202        Note that this is not a "once and only once" guarantee, but rather
203        an "at least once" guarantee. Jobs should still be idempotent in case
204        multiple instances are queued in a race condition.
205        """
206        return ""
207
208    def get_queue(self) -> str:
209        return "default"
210
211    def get_priority(self) -> int:
212        return 0
213
214    def get_retries(self) -> int:
215        return 0
216
217    def get_retry_delay(self, attempt: int) -> int:
218        """
219        Calcluate a delay in seconds before the next retry attempt.
220
221        On the first retry, attempt will be 1.
222        """
223        return 0