# Worker
Process background jobs with a database-driven worker.
```python
from plain.worker import Job
from plain.mail import send_mail


# Create a new job class
class WelcomeUserJob(Job):
    def __init__(self, user):
        self.user = user

    def run(self):
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            from_email="[email protected]",
            recipient_list=[self.user.email],
        )


# Instantiate a job and send it to the worker
user = User.objects.get(pk=1)
WelcomeUserJob(user).run_in_worker()
```
The worker process is run separately using `plain worker run`.
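`run_in_worker()` also accepts several optional keyword arguments that control how the job is queued (see the `Job` source below). A quick sketch with illustrative values:

```python
# All keyword arguments are optional; the values here are only examples.
WelcomeUserJob(user).run_in_worker(
    queue="emails",  # hypothetical queue name; defaults to get_queue(), i.e. "default"
    delay=60,  # an int is seconds from now; a timedelta or datetime also works
    priority=10,  # defaults to get_priority()
    retries=3,  # defaults to get_retries()
    unique_key=f"welcome-{user.pk}",  # a non-empty key prevents duplicate in-progress jobs
)
```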
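Under the hood, `run_in_worker()` serializes the job's constructor arguments and stores them in a `JobRequest` row in the database. Model instances passed as parameters are converted to `gid://` strings and re-fetched from the database when the job actually runs. Here is the source of the `Job` base class and its supporting classes: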
```python
import datetime
import inspect
import logging
from importlib import import_module

from plain.models import IntegrityError, Model
from plain.utils import timezone

logger = logging.getLogger(__name__)


def load_job(job_class_path, parameters):
    module_path, class_name = job_class_path.rsplit(".", 1)
    module = import_module(module_path)
    job_class = getattr(module, class_name)
    args, kwargs = JobParameters.from_json(parameters)
    return job_class(*args, **kwargs)


class JobParameters:
    @staticmethod
    def to_json(args, kwargs):
        serialized_args = []
        for arg in args:
            if isinstance(arg, Model):
                serialized_args.append(ModelInstanceParameter.from_instance(arg))
            else:
                serialized_args.append(arg)

        serialized_kwargs = {}
        for key, value in kwargs.items():
            if isinstance(value, Model):
                serialized_kwargs[key] = ModelInstanceParameter.from_instance(value)
            else:
                serialized_kwargs[key] = value

        return {"args": serialized_args, "kwargs": serialized_kwargs}

    @staticmethod
    def from_json(data):
        args = []
        for arg in data["args"]:
            if ModelInstanceParameter.is_gid(arg):
                args.append(ModelInstanceParameter.to_instance(arg))
            else:
                args.append(arg)

        kwargs = {}
        for key, value in data["kwargs"].items():
            if ModelInstanceParameter.is_gid(value):
                kwargs[key] = ModelInstanceParameter.to_instance(value)
            else:
                kwargs[key] = value

        return args, kwargs


class ModelInstanceParameter:
    """
    A string representation of a model instance,
    so we can convert a single parameter (model instance itself)
    into a string that can be serialized and stored in the database.
    """

    @staticmethod
    def from_instance(instance):
        return f"gid://{instance._meta.package_label}/{instance._meta.model_name}/{instance.pk}"

    @staticmethod
    def to_instance(s):
        if not s.startswith("gid://"):
            raise ValueError("Invalid ModelInstanceParameter string")
        package, model, pk = s[6:].split("/")
        from plain.packages import packages

        model = packages.get_model(package, model)
        return model.objects.get(pk=pk)

    @staticmethod
    def is_gid(x):
        if not isinstance(x, str):
            return False
        return x.startswith("gid://")


class JobType(type):
    """
    Metaclass allows us to capture the original args/kwargs
    used to instantiate the job, so we can store them in the database
    when we schedule the job.
    """

    def __call__(self, *args, **kwargs):
        instance = super().__call__(*args, **kwargs)
        instance._init_args = args
        instance._init_kwargs = kwargs
        return instance


class Job(metaclass=JobType):
    def run(self):
        raise NotImplementedError

    def run_in_worker(
        self,
        *,
        queue: str | None = None,
        delay: int | datetime.timedelta | datetime.datetime | None = None,
        priority: int | None = None,
        retries: int | None = None,
        retry_attempt: int = 0,
        unique_key: str | None = None,
    ):
        from .models import JobRequest

        try:
            # Try to automatically annotate the source of the job
            caller = inspect.stack()[1]
            source = f"{caller.filename}:{caller.lineno}"
        except (IndexError, AttributeError):
            source = ""

        parameters = JobParameters.to_json(self._init_args, self._init_kwargs)

        if queue is None:
            queue = self.get_queue()

        if priority is None:
            priority = self.get_priority()

        if retries is None:
            retries = self.get_retries()

        if delay is None:
            start_at = None
        elif isinstance(delay, int):
            start_at = timezone.now() + datetime.timedelta(seconds=delay)
        elif isinstance(delay, datetime.timedelta):
            start_at = timezone.now() + delay
        elif isinstance(delay, datetime.datetime):
            start_at = delay
        else:
            raise ValueError(f"Invalid delay: {delay}")

        if unique_key is None:
            unique_key = self.get_unique_key()

        if unique_key:
            # Only need to look at in progress jobs
            # if we also have a unique key.
            # Otherwise it's up to the user to use _in_progress()
            if running := self._in_progress(unique_key):
                return running

        try:
            job_request = JobRequest(
                job_class=self._job_class_str(),
                parameters=parameters,
                start_at=start_at,
                source=source,
                queue=queue,
                priority=priority,
                retries=retries,
                retry_attempt=retry_attempt,
                unique_key=unique_key,
            )
            job_request.save(
                clean_and_validate=False
            )  # So IntegrityError is raised on unique instead of potentially confusing ValidationError...
            return job_request
        except IntegrityError as e:
            logger.warning("Job already in progress: %s", e)
            # Try to return the _in_progress list again
            return self._in_progress(unique_key)

    def _job_class_str(self):
        return f"{self.__module__}.{self.__class__.__name__}"

    def _in_progress(self, unique_key):
        """Get all JobRequests and Jobs that are currently in progress, regardless of queue."""
        from .models import Job, JobRequest

        job_class = self._job_class_str()

        job_requests = JobRequest.objects.filter(
            job_class=job_class,
            unique_key=unique_key,
        )

        jobs = Job.objects.filter(
            job_class=job_class,
            unique_key=unique_key,
        )

        return list(job_requests) + list(jobs)

    def get_unique_key(self) -> str:
        """
        A unique key to prevent duplicate jobs from being queued.
        Enabled by returning a non-empty string.

        Note that this is not a "once and only once" guarantee, but rather
        an "at least once" guarantee. Jobs should still be idempotent in case
        multiple instances are queued in a race condition.
        """
        return ""

    def get_queue(self) -> str:
        return "default"

    def get_priority(self) -> int:
        return 0

    def get_retries(self) -> int:
        return 0

    def get_retry_delay(self, attempt: int) -> int:
        """
        Calculate a delay in seconds before the next retry attempt.

        On the first retry, attempt will be 1.
        """
        return 0
```
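The `get_*` hook methods at the bottom of the class are the intended override points for per-job defaults. A minimal sketch of a subclass that customizes them (the class name, queue name, and values are illustrative):

```python
class NightlyReportJob(Job):
    def __init__(self, account):
        self.account = account

    def run(self):
        ...  # the actual work; keep it idempotent (see the get_unique_key docstring)

    def get_queue(self) -> str:
        return "reports"  # hypothetical queue name instead of "default"

    def get_retries(self) -> int:
        return 2  # retry failed runs up to two times

    def get_retry_delay(self, attempt: int) -> int:
        return 60 * attempt  # wait 60s before the first retry, 120s before the second

    def get_unique_key(self) -> str:
        # Only one job per account can be queued or in progress at a time
        return str(self.account.pk)
```

`NightlyReportJob(account).run_in_worker()` then uses these defaults, unless the corresponding keyword arguments are passed explicitly at call time.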