1from __future__ import annotations
2
3from datetime import timedelta
4
5from plain import postgres
6from plain.admin.cards import Card, TrendCard
7from plain.admin.views import (
8 AdminModelDetailView,
9 AdminModelListView,
10 AdminViewset,
11 register_viewset,
12)
13from plain.http import RedirectResponse
14from plain.postgres.expressions import Case, When
15from plain.runtime import settings
16
17from .models import (
18 JobProcess,
19 JobRequest,
20 JobResult,
21 JobResultQuerySet,
22 WorkerHeartbeat,
23 heartbeat_cutoff,
24)
25
26
27def _td_format(td_object: timedelta) -> str:
28 seconds = int(td_object.total_seconds())
29 periods = [
30 ("year", 60 * 60 * 24 * 365),
31 ("month", 60 * 60 * 24 * 30),
32 ("day", 60 * 60 * 24),
33 ("hour", 60 * 60),
34 ("minute", 60),
35 ("second", 1),
36 ]
37
38 strings = []
39 for period_name, period_seconds in periods:
40 if seconds > period_seconds:
41 period_value, seconds = divmod(seconds, period_seconds)
42 has_s = "s" if period_value > 1 else ""
43 strings.append(f"{period_value} {period_name}{has_s}")
44
45 return ", ".join(strings)
46
47
class JobResultsTrendCard(TrendCard):
    """Full-size trend card declared over ``JobResult``.

    Uses ``created_at`` as the datetime field and ``status`` as the group
    field; how those settings are charted is defined by ``TrendCard``.
    """

    # What is charted.
    model = JobResult
    datetime_field = "created_at"
    group_field = "status"

    # How the card is presented.
    title = "Results trend"
    size = TrendCard.Sizes.FULL

    # Display label for each ``status`` value.
    group_labels = {
        "SUCCESSFUL": "Successful",
        "ERRORED": "Errored",
        "CANCELLED": "Cancelled",
        "DEFERRED": "Deferred",
        "LOST": "Lost",
    }

    # Color (CSS custom property) for each ``status`` value.
    group_colors = {
        "SUCCESSFUL": "var(--success)",
        "ERRORED": "var(--danger)",
        "CANCELLED": "var(--muted-foreground)",
        "DEFERRED": "var(--info)",
        "LOST": "var(--warning)",
    }
68
69
class SuccessfulJobsCard(Card):
    """Card reporting the number of job results in the ``successful()`` set."""

    title = "Successful"
    text = "View"

    def get_metric(self) -> int:
        """Return the count of ``JobResult.query.successful()``."""
        successful_results = JobResult.query.successful()
        return successful_results.count()

    def get_link(self) -> str:
        """Return the results list URL with ``display=Successful`` appended."""
        list_url = JobResultViewset.ListView.get_view_url()
        return list_url + "?display=Successful"
79
80
class ErroredJobsCard(Card):
    """Card reporting the number of job results in the ``errored()`` set."""

    title = "Errored"
    text = "View"

    def get_metric(self) -> int:
        """Return the count of ``JobResult.query.errored()``."""
        errored_results = JobResult.query.errored()
        return errored_results.count()

    def get_link(self) -> str:
        """Return the results list URL with ``display=Errored`` appended."""
        list_url = JobResultViewset.ListView.get_view_url()
        return list_url + "?display=Errored"
90
91
class LostJobsCard(Card):
    """Card reporting the number of job results in the ``lost()`` set."""

    title = "Lost"
    text = "View"  # TODO make not required - just an icon?

    def get_description(self) -> str:
        """Explain the "lost" rule, quoting ``JOBS_HEARTBEAT_TIMEOUT``.

        The setting is interpreted as a number of seconds and rendered with
        ``_td_format``.
        """
        timeout = timedelta(seconds=settings.JOBS_HEARTBEAT_TIMEOUT)
        threshold = _td_format(timeout)
        return (
            "Jobs are considered lost when their worker stops heartbeating "
            f"for more than {threshold}"
        )

    def get_metric(self) -> int:
        """Return the count of ``JobResult.query.lost()``."""
        lost_results = JobResult.query.lost()
        return lost_results.count()

    def get_link(self) -> str:
        """Return the results list URL with ``display=Lost`` appended."""
        list_url = JobResultViewset.ListView.get_view_url()
        return list_url + "?display=Lost"
108
109
class RetriedJobsCard(Card):
    """Card reporting the number of job results in the ``retried()`` set."""

    title = "Retried"
    text = "View"  # TODO make not required - just an icon?

    def get_metric(self) -> int:
        """Return the count of ``JobResult.query.retried()``."""
        retried_results = JobResult.query.retried()
        return retried_results.count()

    def get_link(self) -> str:
        """Return the results list URL with ``display=Retried`` appended."""
        list_url = JobResultViewset.ListView.get_view_url()
        return list_url + "?display=Retried"
119
120
class WaitingJobsCard(Card):
    """Card reporting the number of job processes in the ``waiting()`` set."""

    title = "Waiting"

    def get_metric(self) -> int:
        """Return the count of ``JobProcess.query.waiting()``."""
        waiting_processes = JobProcess.query.waiting()
        return waiting_processes.count()
126
127
class RunningJobsCard(Card):
    """Card reporting the number of job processes in the ``running()`` set."""

    title = "Running"

    def get_metric(self) -> int:
        """Return the count of ``JobProcess.query.running()``."""
        running_processes = JobProcess.query.running()
        return running_processes.count()
133
134
class ActiveWorkersCard(Card):
    """Card counting workers whose last heartbeat is at or after the cutoff."""

    title = "Active workers"
    text = "View"

    def get_description(self) -> str:
        """Describe the "active" window, quoting ``JOBS_HEARTBEAT_TIMEOUT``.

        The setting is interpreted as a number of seconds and rendered with
        ``_td_format``.
        """
        timeout = timedelta(seconds=settings.JOBS_HEARTBEAT_TIMEOUT)
        window = _td_format(timeout)
        return f"Workers whose heartbeat is within the last {window}."

    def get_metric(self) -> int:
        """Count heartbeats with ``last_heartbeat_at >= heartbeat_cutoff()``."""
        active_workers = WorkerHeartbeat.query.filter(
            last_heartbeat_at__gte=heartbeat_cutoff()
        )
        return active_workers.count()

    def get_link(self) -> str:
        """Return the URL of the worker list view (no display preselected)."""
        list_view = WorkerHeartbeatViewset.ListView
        return list_view.get_view_url()
150
151
class StaleWorkersCard(Card):
    """Card counting workers whose last heartbeat is older than the cutoff."""

    title = "Stale workers"
    text = "View"

    def get_description(self) -> str:
        """Describe the "stale" rule, quoting ``JOBS_HEARTBEAT_TIMEOUT``.

        The setting is interpreted as a number of seconds and rendered with
        ``_td_format``.
        """
        timeout = timedelta(seconds=settings.JOBS_HEARTBEAT_TIMEOUT)
        threshold = _td_format(timeout)
        return (
            f"Workers whose heartbeat is older than {threshold}. "
            "Their in-flight jobs are about to be rescued as Lost."
        )

    def get_metric(self) -> int:
        """Count heartbeats with ``last_heartbeat_at < heartbeat_cutoff()``."""
        stale_workers = WorkerHeartbeat.query.filter(
            last_heartbeat_at__lt=heartbeat_cutoff()
        )
        return stale_workers.count()

    def get_link(self) -> str:
        """Return the worker list URL with ``display=Stale`` appended."""
        list_url = WorkerHeartbeatViewset.ListView.get_view_url()
        return list_url + "?display=Stale"
170
171
@register_viewset
class JobRequestViewset(AdminViewset):
    """Admin list and detail views for ``JobRequest``."""

    class ListView(AdminModelListView):
        """List of job requests with a bulk ``Delete`` action."""

        model = JobRequest
        title = "Requests"
        description = "Jobs waiting to be picked up by a worker."

        # Navigation placement.
        nav_section = "Jobs"
        nav_icon = "inbox"

        # Columns and default ordering.
        fields = [
            "id",
            "job_class",
            "priority",
            "created_at",
            "start_at",
            "concurrency_key",
        ]
        queryset_order = ["-priority", "-start_at", "-created_at"]

        actions = ["Delete"]

        def perform_action(self, action: str, target_ids: list[int]) -> None:
            """Delete the selected requests when ``action`` is ``"Delete"``.

            Any other action name is ignored.
            """
            if action != "Delete":
                return
            selected = JobRequest.query.filter(id__in=target_ids)
            selected.delete()

    class DetailView(AdminModelDetailView):
        """Detail page for a single job request."""

        model = JobRequest
        title = "Request"
198
199
@register_viewset
class JobProcessViewset(AdminViewset):
    """Admin list and detail views for ``JobProcess``."""

    class ListView(AdminModelListView):
        """List of job processes with job/worker cards and a bulk ``Delete``."""

        model = JobProcess
        title = "Processes"
        description = "Jobs currently being processed by a worker."

        # Navigation placement.
        nav_section = "Jobs"
        nav_icon = "gear"

        # Columns shown in the table.
        fields = [
            "id",
            "job_class",
            "priority",
            "created_at",
            "started_at",
            "concurrency_key",
        ]

        # Summary cards attached to this list view.
        cards = [
            WaitingJobsCard,
            RunningJobsCard,
            ActiveWorkersCard,
            StaleWorkersCard,
        ]

        actions = ["Delete"]

        def perform_action(self, action: str, target_ids: list[int]) -> None:
            """Delete the selected processes when ``action`` is ``"Delete"``.

            Any other action name is ignored.
            """
            if action != "Delete":
                return
            selected = JobProcess.query.filter(id__in=target_ids)
            selected.delete()

    class DetailView(AdminModelDetailView):
        """Detail page for a single job process."""

        model = JobProcess
        title = "Process"
231
232
@register_viewset
class JobResultViewset(AdminViewset):
    """Admin list and detail views for ``JobResult``."""

    class ListView(AdminModelListView):
        """List of job results with status filters, cards and a bulk ``Retry``."""

        model = JobResult
        title = "Results"
        description = "Completed jobs with their success/failure status."

        # Navigation placement.
        nav_section = "Jobs"
        nav_icon = "clipboard-check"

        # ``retried`` and ``is_retry`` are not model columns here; they are
        # annotations added in ``get_initial_queryset``.
        fields = [
            "id",
            "job_class",
            "priority",
            "created_at",
            "status",
            "retried",
            "is_retry",
        ]
        field_templates = {
            "status": "jobs/values/job_status.html",
        }
        search_fields = [
            "uuid",
            "job_process_uuid",
            "job_request_uuid",
            "job_class",
        ]
        cards = [
            JobResultsTrendCard,
            SuccessfulJobsCard,
            ErroredJobsCard,
            LostJobsCard,
            RetriedJobsCard,
        ]
        filters = [
            "Successful",
            "Errored",
            "Cancelled",
            "Lost",
            "Retried",
        ]
        actions = [
            "Retry",
        ]

        def get_initial_queryset(self) -> JobResultQuerySet:
            """Return the base queryset annotated with ``retried``/``is_retry``.

            ``retried`` is true when ``retry_job_request_uuid`` is not null;
            ``is_retry`` is true when ``retry_attempt`` is greater than zero.
            """
            base: JobResultQuerySet = super().get_initial_queryset()  # ty: ignore[invalid-assignment]
            was_retried = Case(
                When(retry_job_request_uuid__isnull=False, then=True),
                default=False,
                output_field=postgres.BooleanField(),
            )
            came_from_retry = Case(
                When(retry_attempt__gt=0, then=True),
                default=False,
                output_field=postgres.BooleanField(),
            )
            return base.annotate(retried=was_retried, is_retry=came_from_retry)

        def filter_queryset(self, queryset: JobResultQuerySet) -> JobResultQuerySet:
            """Narrow ``queryset`` according to the selected filter.

            Each known filter name maps to the queryset method of the same
            meaning; any other value returns ``queryset`` unchanged.
            """
            scope_methods = {
                "Successful": "successful",
                "Errored": "errored",
                "Cancelled": "cancelled",
                "Lost": "lost",
                "Retried": "retried",
            }
            method_name = scope_methods.get(self.filter)
            if method_name is None:
                return queryset
            return getattr(queryset, method_name)()

        def get_fields(self) -> list[str]:
            """Return the columns, adding retry details for the Retried filter."""
            columns = super().get_fields()
            if self.filter == "Retried":
                columns.extend(["retries", "retry_attempt"])
            return columns

        def perform_action(self, action: str, target_ids: list[int]) -> None:
            """Retry every selected result immediately (``delay=0``).

            Raises:
                ValueError: If ``action`` is anything other than ``"Retry"``.
            """
            if action != "Retry":
                raise ValueError("Invalid action")
            for result in JobResult.query.filter(id__in=target_ids):
                result.retry_job(delay=0)

    class DetailView(AdminModelDetailView):
        """Detail page for a single job result; a POST retries the job."""

        model = JobResult
        title = "Result"

        def post(self) -> RedirectResponse:
            """Retry this result's job with no delay, then redirect to ``"."``."""
            self.object.retry_job(delay=0)
            return RedirectResponse(".")
326
327
@register_viewset
class WorkerHeartbeatViewset(AdminViewset):
    """Admin list and detail views for ``WorkerHeartbeat``."""

    class ListView(AdminModelListView):
        """List of worker heartbeats with Active/Stale filters."""

        model = WorkerHeartbeat
        title = "Workers"
        description = (
            "Live worker processes. Each row is refreshed while its worker is "
            "running and deleted on clean shutdown."
        )

        # Navigation placement.
        nav_section = "Jobs"
        nav_icon = "heart-pulse"

        # ``stale`` is an annotation added in ``get_initial_queryset``.
        fields = [
            "worker_id",
            "hostname",
            "pid",
            "queues",
            "started_at",
            "last_heartbeat_at",
            "stale",
        ]
        search_fields = [
            "worker_id",
            "hostname",
        ]
        filters = [
            "Active",
            "Stale",
        ]
        queryset_order = ["-last_heartbeat_at"]

        def get_initial_queryset(self) -> postgres.QuerySet[WorkerHeartbeat]:
            """Return the base queryset annotated with a boolean ``stale``.

            ``stale`` is true when ``last_heartbeat_at`` is earlier than
            ``heartbeat_cutoff()``.
            """
            base = super().get_initial_queryset()
            is_stale = Case(
                When(last_heartbeat_at__lt=heartbeat_cutoff(), then=True),
                default=False,
                output_field=postgres.BooleanField(),
            )
            return base.annotate(stale=is_stale)

        def filter_queryset(
            self, queryset: postgres.QuerySet[WorkerHeartbeat]
        ) -> postgres.QuerySet[WorkerHeartbeat]:
            """Narrow ``queryset`` to active or stale workers.

            ``Active`` keeps rows at or after the heartbeat cutoff, ``Stale``
            keeps rows before it; any other filter value returns ``queryset``
            unchanged.
            """
            cutoff = heartbeat_cutoff()
            lookups_by_filter = {
                "Active": {"last_heartbeat_at__gte": cutoff},
                "Stale": {"last_heartbeat_at__lt": cutoff},
            }
            lookup = lookups_by_filter.get(self.filter)
            if lookup is None:
                return queryset
            return queryset.filter(**lookup)

    class DetailView(AdminModelDetailView):
        """Detail page for a single worker heartbeat."""

        model = WorkerHeartbeat
        title = "Worker"