1from __future__ import annotations
2
3from datetime import timedelta
4from typing import Any
5
6from plain import models
7from plain.admin.cards import Card
8from plain.admin.views import (
9 AdminModelDetailView,
10 AdminModelListView,
11 AdminViewset,
12 register_viewset,
13)
14from plain.http import ResponseRedirect
15from plain.runtime import settings
16
17from .models import JobProcess, JobRequest, JobResult
18
19
def _td_format(td_object: timedelta) -> str:
    """Render a timedelta as a human-readable, comma-separated duration.

    The duration is truncated to whole seconds and then split greedily into
    years (365 days), months (30 days), days, hours, minutes and seconds.
    Units with a zero count are left out, and a unit name is pluralized when
    its count is greater than one.

    Args:
        td_object: The duration to format.

    Returns:
        A string such as ``"1 day, 2 hours"``. When no unit applies (the
        duration truncates to zero seconds or is negative) the result is
        ``"0 seconds"`` rather than an empty string.
    """
    seconds = int(td_object.total_seconds())
    periods = [
        ("year", 60 * 60 * 24 * 365),
        ("month", 60 * 60 * 24 * 30),
        ("day", 60 * 60 * 24),
        ("hour", 60 * 60),
        ("minute", 60),
        ("second", 1),
    ]

    strings = []
    for period_name, period_seconds in periods:
        # ``>=`` (not ``>``) so exact multiples roll up into the larger unit:
        # 60 seconds is "1 minute", and a single second is not dropped.
        if seconds >= period_seconds:
            period_value, seconds = divmod(seconds, period_seconds)
            has_s = "s" if period_value > 1 else ""
            strings.append(f"{period_value} {period_name}{has_s}")

    if not strings:
        # Nothing matched; avoid returning an empty string to callers that
        # embed the result in a sentence.
        return "0 seconds"

    return ", ".join(strings)
39
40
class SuccessfulJobsCard(Card):
    """Card showing the number of successful job results."""

    title = "Successful"
    text = "View"

    def get_number(self) -> int:
        """Return the count of job results in the ``successful()`` query."""
        successful_results = JobResult.query.successful()
        return successful_results.count()

    def get_link(self) -> str:
        """Return the results list URL with ``display=Successful`` applied."""
        results_url = JobResultViewset.ListView.get_view_url()
        return results_url + "?display=Successful"
50
51
class ErroredJobsCard(Card):
    """Card showing the number of errored job results."""

    title = "Errored"
    text = "View"

    def get_number(self) -> int:
        """Return the count of job results in the ``errored()`` query."""
        errored_results = JobResult.query.errored()
        return errored_results.count()

    def get_link(self) -> str:
        """Return the results list URL with ``display=Errored`` applied."""
        results_url = JobResultViewset.ListView.get_view_url()
        return results_url + "?display=Errored"
61
62
class LostJobsCard(Card):
    """Card showing the number of lost job results and the lost threshold."""

    title = "Lost"
    text = "View"  # TODO make not required - just an icon?

    def get_description(self) -> str:
        """Describe the ``JOBS_TIMEOUT`` setting as a readable duration."""
        timeout = timedelta(seconds=settings.JOBS_TIMEOUT)
        readable_timeout = _td_format(timeout)
        return f"Jobs are considered lost after {readable_timeout}"

    def get_number(self) -> int:
        """Return the count of job results in the ``lost()`` query."""
        lost_results = JobResult.query.lost()
        return lost_results.count()

    def get_link(self) -> str:
        """Return the results list URL with ``display=Lost`` applied."""
        results_url = JobResultViewset.ListView.get_view_url()
        return results_url + "?display=Lost"
76
77
class RetriedJobsCard(Card):
    """Card showing the number of retried job results."""

    title = "Retried"
    text = "View"  # TODO make not required - just an icon?

    def get_number(self) -> int:
        """Return the count of job results in the ``retried()`` query."""
        retried_results = JobResult.query.retried()
        return retried_results.count()

    def get_link(self) -> str:
        """Return the results list URL with ``display=Retried`` applied."""
        results_url = JobResultViewset.ListView.get_view_url()
        return results_url + "?display=Retried"
87
88
class WaitingJobsCard(Card):
    """Card showing the number of waiting job processes."""

    title = "Waiting"

    def get_number(self) -> int:
        """Return the count of job processes in the ``waiting()`` query."""
        waiting_processes = JobProcess.query.waiting()
        return waiting_processes.count()
94
95
class RunningJobsCard(Card):
    """Card showing the number of running job processes."""

    title = "Running"

    def get_number(self) -> int:
        """Return the count of job processes in the ``running()`` query."""
        running_processes = JobProcess.query.running()
        return running_processes.count()
101
102
@register_viewset
class JobRequestViewset(AdminViewset):
    """Admin viewset with list and detail views for ``JobRequest``."""

    class ListView(AdminModelListView):
        """List of job requests with a bulk "Delete" action."""

        nav_section = "Jobs"
        nav_icon = "gear"
        model = JobRequest
        title = "Requests"
        fields = ["id", "job_class", "priority", "created_at", "start_at", "unique_key"]
        actions = ["Delete"]

        def perform_action(self, action: str, target_ids: list[int]) -> None:
            """Apply a bulk action to the selected job requests.

            Args:
                action: Name of the action; only ``"Delete"`` is supported.
                target_ids: Ids of the ``JobRequest`` rows to act on.

            Raises:
                ValueError: If ``action`` is not a supported action.
            """
            # Reject unknown actions explicitly instead of silently doing
            # nothing, matching the results list view's behavior.
            if action != "Delete":
                raise ValueError("Invalid action")

            JobRequest.query.filter(id__in=target_ids).delete()

    class DetailView(AdminModelDetailView):
        """Detail page for a single job request."""

        model = JobRequest
        title = "Request"
120
121
@register_viewset
class JobProcessViewset(AdminViewset):
    """Admin viewset with list and detail views for ``JobProcess``."""

    class ListView(AdminModelListView):
        """List of job processes with waiting/running cards and bulk delete."""

        nav_section = "Jobs"
        nav_icon = "gear"
        model = JobProcess
        title = "Processes"
        fields = [
            "id",
            "job_class",
            "priority",
            "created_at",
            "started_at",
            "unique_key",
        ]
        actions = ["Delete"]
        cards = [
            WaitingJobsCard,
            RunningJobsCard,
        ]

        def perform_action(self, action: str, target_ids: list[int]) -> None:
            """Apply a bulk action to the selected job processes.

            Args:
                action: Name of the action; only ``"Delete"`` is supported.
                target_ids: Ids of the ``JobProcess`` rows to act on.

            Raises:
                ValueError: If ``action`` is not a supported action.
            """
            # Reject unknown actions explicitly instead of silently doing
            # nothing, matching the results list view's behavior.
            if action != "Delete":
                raise ValueError("Invalid action")

            JobProcess.query.filter(id__in=target_ids).delete()

    class DetailView(AdminModelDetailView):
        """Detail page for a single job process."""

        model = JobProcess
        title = "Process"
150
151
152@register_viewset
class JobResultViewset(AdminViewset):
    """Admin viewset with list and detail views for ``JobResult``."""

    class ListView(AdminModelListView):
        """List of job results with status filters, summary cards and retry."""

        nav_section = "Jobs"
        nav_icon = "gear"
        model = JobResult
        title = "Results"
        fields = [
            "id",
            "job_class",
            "priority",
            "created_at",
            "status",
            "retried",
            "is_retry",
        ]
        search_fields = [
            "uuid",
            "job_process_uuid",
            "job_request_uuid",
            "job_class",
        ]
        cards = [
            SuccessfulJobsCard,
            ErroredJobsCard,
            LostJobsCard,
            RetriedJobsCard,
        ]
        filters = [
            "Successful",
            "Errored",
            "Cancelled",
            "Lost",
            "Retried",
        ]
        actions = [
            "Retry",
        ]
        allow_global_search = False

        def get_initial_queryset(self) -> Any:
            """Return the base queryset, annotated and narrowed by display.

            Two boolean annotations back the ``retried`` and ``is_retry``
            columns:

            * ``retried`` is true when ``retry_job_request_uuid`` is set.
            * ``is_retry`` is true when ``retry_attempt`` is greater than 0.

            When ``self.display`` matches one of the filter names, the
            corresponding queryset method is applied; otherwise the
            annotated queryset is returned unfiltered.
            """
            queryset = super().get_initial_queryset()
            queryset = queryset.annotate(
                retried=models.Case(
                    models.When(retry_job_request_uuid__isnull=False, then=True),
                    default=False,
                    output_field=models.BooleanField(),
                ),
                is_retry=models.Case(
                    models.When(retry_attempt__gt=0, then=True),
                    default=False,
                    output_field=models.BooleanField(),
                ),
            )
            if self.display == "Successful":
                return queryset.successful()
            if self.display == "Errored":
                return queryset.errored()
            if self.display == "Cancelled":
                return queryset.cancelled()
            if self.display == "Lost":
                return queryset.lost()
            if self.display == "Retried":
                return queryset.retried()
            return queryset

        def get_fields(self) -> list[str]:
            """Return the list columns, adding retry columns for "Retried".

            Returns:
                A new list of field names. For the "Retried" display,
                ``"retries"`` and ``"retry_attempt"`` are appended.
            """
            # Copy before extending so the extra columns can never leak into
            # a list shared with the parent class or the class attribute.
            fields = list(super().get_fields())
            if self.display == "Retried":
                fields.extend(["retries", "retry_attempt"])
            return fields

        def perform_action(self, action: str, target_ids: list[int]) -> None:
            """Apply a bulk action to the selected job results.

            Args:
                action: Name of the action; only ``"Retry"`` is supported.
                target_ids: Ids of the ``JobResult`` rows to act on.

            Raises:
                ValueError: If ``action`` is not a supported action.
            """
            if action != "Retry":
                raise ValueError("Invalid action")

            for result in JobResult.query.filter(id__in=target_ids):
                result.retry_job(delay=0)

    class DetailView(AdminModelDetailView):
        """Detail page for a single job result; POST retries the job."""

        model = JobResult
        title = "Result"

        def post(self) -> ResponseRedirect:
            """Retry this result's job with ``delay=0`` and redirect to ``"."``."""
            self.object.retry_job(delay=0)
            return ResponseRedirect(".")