Worker
Process background jobs with a database-driven worker.
from plain.worker import Job
from plain.mail import send_mail
# Create a new job class
class WelcomeUserJob(Job):
def __init__(self, user):
self.user = user
def run(self):
send_mail(
subject="Welcome!",
message=f"Hello from Plain, {self.user}",
from_email="[email protected]",
recipient_list=[self.user.email],
)
# Instantiate a job and send it to the worker
user = User.objects.get(pk=1)
WelcomeUserJob(user).run_in_worker()
The worker process is run separately using plain worker run
.
Staff
Job history
Scheduled jobs
Monitoring
1from datetime import timedelta
2
3from plain import models
4from plain.http import ResponseRedirect
5from plain.runtime import settings
6from plain.staff.cards import Card
7from plain.staff.dates import DatetimeRangeAliases
8from plain.staff.views import (
9 StaffModelDetailView,
10 StaffModelListView,
11 StaffModelViewset,
12 register_viewset,
13)
14
15from .models import Job, JobRequest, JobResult
16
17
18def _td_format(td_object):
19 seconds = int(td_object.total_seconds())
20 periods = [
21 ("year", 60 * 60 * 24 * 365),
22 ("month", 60 * 60 * 24 * 30),
23 ("day", 60 * 60 * 24),
24 ("hour", 60 * 60),
25 ("minute", 60),
26 ("second", 1),
27 ]
28
29 strings = []
30 for period_name, period_seconds in periods:
31 if seconds > period_seconds:
32 period_value, seconds = divmod(seconds, period_seconds)
33 has_s = "s" if period_value > 1 else ""
34 strings.append(f"{period_value} {period_name}{has_s}")
35
36 return ", ".join(strings)
37
38
39class SuccessfulJobsCard(Card):
40 title = "Successful Jobs"
41 text = "View"
42
43 def get_number(self):
44 return (
45 JobResult.objects.successful()
46 .filter(created_at__range=self.datetime_range.as_tuple())
47 .count()
48 )
49
50 def get_link(self):
51 return JobResultViewset.ListView.get_absolute_url() + "?filter=Successful"
52
53
54class ErroredJobsCard(Card):
55 title = "Errored Jobs"
56 text = "View"
57
58 def get_number(self):
59 return (
60 JobResult.objects.errored()
61 .filter(created_at__range=self.datetime_range.as_tuple())
62 .count()
63 )
64
65 def get_link(self):
66 return JobResultViewset.ListView.get_absolute_url() + "?filter=Errored"
67
68
69class LostJobsCard(Card):
70 title = "Lost Jobs"
71 text = "View" # TODO make not required - just an icon?
72
73 def get_description(self):
74 delta = timedelta(seconds=settings.WORKER_JOBS_LOST_AFTER)
75 return f"Jobs are considered lost after {_td_format(delta)}"
76
77 def get_number(self):
78 return (
79 JobResult.objects.lost()
80 .filter(created_at__range=self.datetime_range.as_tuple())
81 .count()
82 )
83
84 def get_link(self):
85 return JobResultViewset.ListView.get_absolute_url() + "?filter=Lost"
86
87
88class RetriedJobsCard(Card):
89 title = "Retried Jobs"
90 text = "View" # TODO make not required - just an icon?
91
92 def get_number(self):
93 return (
94 JobResult.objects.retried()
95 .filter(created_at__range=self.datetime_range.as_tuple())
96 .count()
97 )
98
99 def get_link(self):
100 return JobResultViewset.ListView.get_absolute_url() + "?filter=Retried"
101
102
103class WaitingJobsCard(Card):
104 title = "Waiting Jobs"
105
106 def get_number(self):
107 return Job.objects.waiting().count()
108
109
110class RunningJobsCard(Card):
111 title = "Running Jobs"
112
113 def get_number(self):
114 return Job.objects.running().count()
115
116
117@register_viewset
118class JobRequestViewset(StaffModelViewset):
119 class ListView(StaffModelListView):
120 nav_section = "Worker"
121 model = JobRequest
122 title = "Job requests"
123 fields = ["id", "job_class", "priority", "created_at", "start_at", "unique_key"]
124 actions = ["Delete"]
125
126 def perform_action(self, action: str, target_pks: list):
127 if action == "Delete":
128 JobRequest.objects.filter(pk__in=target_pks).delete()
129
130 class DetailView(StaffModelDetailView):
131 model = JobRequest
132 title = "Job Request"
133
134
135@register_viewset
136class JobViewset(StaffModelViewset):
137 class ListView(StaffModelListView):
138 nav_section = "Worker"
139 model = Job
140 fields = [
141 "id",
142 "job_class",
143 "priority",
144 "created_at",
145 "started_at",
146 "unique_key",
147 ]
148 actions = ["Delete"]
149 cards = [
150 WaitingJobsCard,
151 RunningJobsCard,
152 ]
153
154 def perform_action(self, action: str, target_pks: list):
155 if action == "Delete":
156 Job.objects.filter(pk__in=target_pks).delete()
157
158 class DetailView(StaffModelDetailView):
159 model = Job
160
161
162@register_viewset
163class JobResultViewset(StaffModelViewset):
164 class ListView(StaffModelListView):
165 nav_section = "Worker"
166 model = JobResult
167 title = "Job results"
168 fields = [
169 "id",
170 "job_class",
171 "priority",
172 "created_at",
173 "status",
174 "retried",
175 "is_retry",
176 ]
177 search_fields = [
178 "uuid",
179 "job_uuid",
180 "job_request_uuid",
181 "job_class",
182 ]
183 cards = [
184 SuccessfulJobsCard,
185 ErroredJobsCard,
186 LostJobsCard,
187 RetriedJobsCard,
188 ]
189 filters = [
190 "Successful",
191 "Errored",
192 "Cancelled",
193 "Lost",
194 "Retried",
195 ]
196 actions = [
197 "Retry",
198 ]
199 allow_global_search = False
200 default_datetime_range = DatetimeRangeAliases.LAST_30_DAYS
201
202 def get_description(self):
203 delta = timedelta(seconds=settings.WORKER_JOBS_CLEARABLE_AFTER)
204 return f"Jobs are cleared after {_td_format(delta)}"
205
206 def get_initial_queryset(self):
207 queryset = super().get_initial_queryset()
208 queryset = queryset.annotate(
209 retried=models.Case(
210 models.When(retry_job_request_uuid__isnull=False, then=True),
211 default=False,
212 output_field=models.BooleanField(),
213 ),
214 is_retry=models.Case(
215 models.When(retry_attempt__gt=0, then=True),
216 default=False,
217 output_field=models.BooleanField(),
218 ),
219 )
220 if self.filter == "Successful":
221 return queryset.successful()
222 if self.filter == "Errored":
223 return queryset.errored()
224 if self.filter == "Cancelled":
225 return queryset.cancelled()
226 if self.filter == "Lost":
227 return queryset.lost()
228 if self.filter == "Retried":
229 return queryset.retried()
230 return queryset
231
232 def get_fields(self):
233 fields = super().get_fields()
234 if self.filter == "Retried":
235 fields.append("retries")
236 fields.append("retry_attempt")
237 return fields
238
239 def perform_action(self, action: str, target_pks: list):
240 if action == "Retry":
241 for result in JobResult.objects.filter(pk__in=target_pks):
242 result.retry_job(delay=0)
243 else:
244 raise ValueError("Invalid action")
245
246 class DetailView(StaffModelDetailView):
247 model = JobResult
248 title = "Job result"
249
250 def post(self):
251 self.load_object()
252 self.object.retry_job(delay=0)
253 return ResponseRedirect(".")