Worker
Process background jobs with a database-driven worker.
from plain.worker import Job
from plain.mail import send_mail
# Create a new job class
class WelcomeUserJob(Job):
def __init__(self, user):
self.user = user
def run(self):
send_mail(
subject="Welcome!",
message=f"Hello from Plain, {self.user}",
from_email="[email protected]",
recipient_list=[self.user.email],
)
# Instantiate a job and send it to the worker
user = User.objects.get(pk=1)
WelcomeUserJob(user).run_in_worker()
The worker process is run separately using plain worker run
.
Admin
Job history
Scheduled jobs
Monitoring
1from datetime import timedelta
2
3from plain import models
4from plain.admin.cards import Card
5from plain.admin.views import (
6 AdminModelDetailView,
7 AdminModelListView,
8 AdminViewset,
9 register_viewset,
10)
11from plain.http import ResponseRedirect
12from plain.runtime import settings
13
14from .models import Job, JobRequest, JobResult
15
16
17def _td_format(td_object):
18 seconds = int(td_object.total_seconds())
19 periods = [
20 ("year", 60 * 60 * 24 * 365),
21 ("month", 60 * 60 * 24 * 30),
22 ("day", 60 * 60 * 24),
23 ("hour", 60 * 60),
24 ("minute", 60),
25 ("second", 1),
26 ]
27
28 strings = []
29 for period_name, period_seconds in periods:
30 if seconds > period_seconds:
31 period_value, seconds = divmod(seconds, period_seconds)
32 has_s = "s" if period_value > 1 else ""
33 strings.append(f"{period_value} {period_name}{has_s}")
34
35 return ", ".join(strings)
36
37
38class SuccessfulJobsCard(Card):
39 title = "Successful Jobs"
40 text = "View"
41
42 def get_number(self):
43 return JobResult.objects.successful().count()
44
45 def get_link(self):
46 return JobResultViewset.ListView.get_view_url() + "?display=Successful"
47
48
49class ErroredJobsCard(Card):
50 title = "Errored Jobs"
51 text = "View"
52
53 def get_number(self):
54 return JobResult.objects.errored().count()
55
56 def get_link(self):
57 return JobResultViewset.ListView.get_view_url() + "?display=Errored"
58
59
60class LostJobsCard(Card):
61 title = "Lost Jobs"
62 text = "View" # TODO make not required - just an icon?
63
64 def get_description(self):
65 delta = timedelta(seconds=settings.WORKER_JOBS_LOST_AFTER)
66 return f"Jobs are considered lost after {_td_format(delta)}"
67
68 def get_number(self):
69 return JobResult.objects.lost().count()
70
71 def get_link(self):
72 return JobResultViewset.ListView.get_view_url() + "?display=Lost"
73
74
75class RetriedJobsCard(Card):
76 title = "Retried Jobs"
77 text = "View" # TODO make not required - just an icon?
78
79 def get_number(self):
80 return JobResult.objects.retried().count()
81
82 def get_link(self):
83 return JobResultViewset.ListView.get_view_url() + "?display=Retried"
84
85
86class WaitingJobsCard(Card):
87 title = "Waiting Jobs"
88
89 def get_number(self):
90 return Job.objects.waiting().count()
91
92
93class RunningJobsCard(Card):
94 title = "Running Jobs"
95
96 def get_number(self):
97 return Job.objects.running().count()
98
99
100@register_viewset
101class JobRequestViewset(AdminViewset):
102 class ListView(AdminModelListView):
103 nav_section = "Worker"
104 model = JobRequest
105 title = "Job requests"
106 fields = ["id", "job_class", "priority", "created_at", "start_at", "unique_key"]
107 actions = ["Delete"]
108
109 def perform_action(self, action: str, target_pks: list):
110 if action == "Delete":
111 JobRequest.objects.filter(pk__in=target_pks).delete()
112
113 class DetailView(AdminModelDetailView):
114 model = JobRequest
115 title = "Job Request"
116
117
118@register_viewset
119class JobViewset(AdminViewset):
120 class ListView(AdminModelListView):
121 nav_section = "Worker"
122 model = Job
123 fields = [
124 "id",
125 "job_class",
126 "priority",
127 "created_at",
128 "started_at",
129 "unique_key",
130 ]
131 actions = ["Delete"]
132 cards = [
133 WaitingJobsCard,
134 RunningJobsCard,
135 ]
136
137 def perform_action(self, action: str, target_pks: list):
138 if action == "Delete":
139 Job.objects.filter(pk__in=target_pks).delete()
140
141 class DetailView(AdminModelDetailView):
142 model = Job
143
144
145@register_viewset
146class JobResultViewset(AdminViewset):
147 class ListView(AdminModelListView):
148 nav_section = "Worker"
149 model = JobResult
150 title = "Job results"
151 fields = [
152 "id",
153 "job_class",
154 "priority",
155 "created_at",
156 "status",
157 "retried",
158 "is_retry",
159 ]
160 search_fields = [
161 "uuid",
162 "job_uuid",
163 "job_request_uuid",
164 "job_class",
165 ]
166 cards = [
167 SuccessfulJobsCard,
168 ErroredJobsCard,
169 LostJobsCard,
170 RetriedJobsCard,
171 ]
172 filters = [
173 "Successful",
174 "Errored",
175 "Cancelled",
176 "Lost",
177 "Retried",
178 ]
179 actions = [
180 "Retry",
181 ]
182 allow_global_search = False
183
184 def get_description(self):
185 delta = timedelta(seconds=settings.WORKER_JOBS_CLEARABLE_AFTER)
186 return f"Jobs are cleared after {_td_format(delta)}"
187
188 def get_initial_queryset(self):
189 queryset = super().get_initial_queryset()
190 queryset = queryset.annotate(
191 retried=models.Case(
192 models.When(retry_job_request_uuid__isnull=False, then=True),
193 default=False,
194 output_field=models.BooleanField(),
195 ),
196 is_retry=models.Case(
197 models.When(retry_attempt__gt=0, then=True),
198 default=False,
199 output_field=models.BooleanField(),
200 ),
201 )
202 if self.display == "Successful":
203 return queryset.successful()
204 if self.display == "Errored":
205 return queryset.errored()
206 if self.display == "Cancelled":
207 return queryset.cancelled()
208 if self.display == "Lost":
209 return queryset.lost()
210 if self.display == "Retried":
211 return queryset.retried()
212 return queryset
213
214 def get_fields(self):
215 fields = super().get_fields()
216 if self.display == "Retried":
217 fields.append("retries")
218 fields.append("retry_attempt")
219 return fields
220
221 def perform_action(self, action: str, target_pks: list):
222 if action == "Retry":
223 for result in JobResult.objects.filter(pk__in=target_pks):
224 result.retry_job(delay=0)
225 else:
226 raise ValueError("Invalid action")
227
228 class DetailView(AdminModelDetailView):
229 model = JobResult
230 title = "Job result"
231
232 def post(self):
233 self.load_object()
234 self.object.retry_job(delay=0)
235 return ResponseRedirect(".")