v0.146.0
  1from __future__ import annotations
  2
  3from datetime import timedelta
  4
  5from plain import postgres
  6from plain.admin.cards import Card, TrendCard
  7from plain.admin.views import (
  8    AdminModelDetailView,
  9    AdminModelListView,
 10    AdminViewset,
 11    register_viewset,
 12)
 13from plain.http import RedirectResponse
 14from plain.postgres.expressions import Case, When
 15from plain.runtime import settings
 16
 17from .models import (
 18    JobProcess,
 19    JobRequest,
 20    JobResult,
 21    JobResultQuerySet,
 22    WorkerHeartbeat,
 23    heartbeat_cutoff,
 24)
 25
 26
 27def _td_format(td_object: timedelta) -> str:
 28    seconds = int(td_object.total_seconds())
 29    periods = [
 30        ("year", 60 * 60 * 24 * 365),
 31        ("month", 60 * 60 * 24 * 30),
 32        ("day", 60 * 60 * 24),
 33        ("hour", 60 * 60),
 34        ("minute", 60),
 35        ("second", 1),
 36    ]
 37
 38    strings = []
 39    for period_name, period_seconds in periods:
 40        if seconds > period_seconds:
 41            period_value, seconds = divmod(seconds, period_seconds)
 42            has_s = "s" if period_value > 1 else ""
 43            strings.append(f"{period_value} {period_name}{has_s}")
 44
 45    return ", ".join(strings)
 46
 47
 48class JobResultsTrendCard(TrendCard):
 49    title = "Results trend"
 50    model = JobResult
 51    datetime_field = "created_at"
 52    size = TrendCard.Sizes.FULL
 53    group_field = "status"
 54    group_labels = {
 55        "SUCCESSFUL": "Successful",
 56        "ERRORED": "Errored",
 57        "CANCELLED": "Cancelled",
 58        "DEFERRED": "Deferred",
 59        "LOST": "Lost",
 60    }
 61    group_colors = {
 62        "SUCCESSFUL": "var(--success)",
 63        "ERRORED": "var(--danger)",
 64        "CANCELLED": "var(--muted-foreground)",
 65        "DEFERRED": "var(--info)",
 66        "LOST": "var(--warning)",
 67    }
 68
 69
 70class SuccessfulJobsCard(Card):
 71    title = "Successful"
 72    text = "View"
 73
 74    def get_metric(self) -> int:
 75        return JobResult.query.successful().count()
 76
 77    def get_link(self) -> str:
 78        return JobResultViewset.ListView.get_view_url() + "?display=Successful"
 79
 80
 81class ErroredJobsCard(Card):
 82    title = "Errored"
 83    text = "View"
 84
 85    def get_metric(self) -> int:
 86        return JobResult.query.errored().count()
 87
 88    def get_link(self) -> str:
 89        return JobResultViewset.ListView.get_view_url() + "?display=Errored"
 90
 91
 92class LostJobsCard(Card):
 93    title = "Lost"
 94    text = "View"  # TODO make not required - just an icon?
 95
 96    def get_description(self) -> str:
 97        delta = timedelta(seconds=settings.JOBS_HEARTBEAT_TIMEOUT)
 98        return (
 99            f"Jobs are considered lost when their worker stops heartbeating "
100            f"for more than {_td_format(delta)}"
101        )
102
103    def get_metric(self) -> int:
104        return JobResult.query.lost().count()
105
106    def get_link(self) -> str:
107        return JobResultViewset.ListView.get_view_url() + "?display=Lost"
108
109
class RetriedJobsCard(Card):
    """Card showing the count of retried job results, with a link to the results list."""

    title = "Retried"
    text = "View"  # TODO make not required - just an icon?

    def get_link(self) -> str:
        """Return the results list URL with ``?display=Retried`` appended.

        NOTE(review): the results list view declares ``filters`` and reads
        ``self.filter`` while this link sends ``display=`` -- confirm the
        query parameter name the admin list view expects.
        """
        results_url = JobResultViewset.ListView.get_view_url()
        return f"{results_url}?display=Retried"

    def get_metric(self) -> int:
        """Return the number of rows in ``JobResult.query.retried()``."""
        retried_results = JobResult.query.retried()
        return retried_results.count()
119
120
class WaitingJobsCard(Card):
    """Card showing how many ``JobProcess`` rows match ``waiting()``."""

    title = "Waiting"

    def get_metric(self) -> int:
        """Return the number of rows in ``JobProcess.query.waiting()``."""
        waiting_processes = JobProcess.query.waiting()
        return waiting_processes.count()
126
127
class RunningJobsCard(Card):
    """Card showing how many ``JobProcess`` rows match ``running()``."""

    title = "Running"

    def get_metric(self) -> int:
        """Return the number of rows in ``JobProcess.query.running()``."""
        running_processes = JobProcess.query.running()
        return running_processes.count()
133
134
class ActiveWorkersCard(Card):
    """Card counting workers whose last heartbeat is at or after the cutoff."""

    title = "Active workers"
    text = "View"

    def get_link(self) -> str:
        """Return the URL of the worker heartbeat list view (no query string)."""
        workers_url = WorkerHeartbeatViewset.ListView.get_view_url()
        return workers_url

    def get_metric(self) -> int:
        """Count ``WorkerHeartbeat`` rows with ``last_heartbeat_at >= heartbeat_cutoff()``."""
        cutoff = heartbeat_cutoff()
        live_workers = WorkerHeartbeat.query.filter(last_heartbeat_at__gte=cutoff)
        return live_workers.count()

    def get_description(self) -> str:
        """Describe the card, quoting ``JOBS_HEARTBEAT_TIMEOUT`` (seconds) in readable form."""
        heartbeat_timeout = timedelta(seconds=settings.JOBS_HEARTBEAT_TIMEOUT)
        readable_timeout = _td_format(heartbeat_timeout)
        return f"Workers whose heartbeat is within the last {readable_timeout}."
150
151
class StaleWorkersCard(Card):
    """Card counting workers whose last heartbeat is older than the cutoff."""

    title = "Stale workers"
    text = "View"

    def get_link(self) -> str:
        """Return the worker list URL with ``?display=Stale`` appended.

        NOTE(review): the worker list view declares ``filters`` and reads
        ``self.filter`` while this link sends ``display=`` -- confirm the
        query parameter name the admin list view expects.
        """
        workers_url = WorkerHeartbeatViewset.ListView.get_view_url()
        return f"{workers_url}?display=Stale"

    def get_metric(self) -> int:
        """Count ``WorkerHeartbeat`` rows with ``last_heartbeat_at < heartbeat_cutoff()``."""
        cutoff = heartbeat_cutoff()
        stale_workers = WorkerHeartbeat.query.filter(last_heartbeat_at__lt=cutoff)
        return stale_workers.count()

    def get_description(self) -> str:
        """Describe the card, quoting ``JOBS_HEARTBEAT_TIMEOUT`` (seconds) in readable form."""
        heartbeat_timeout = timedelta(seconds=settings.JOBS_HEARTBEAT_TIMEOUT)
        readable_timeout = _td_format(heartbeat_timeout)
        return (
            f"Workers whose heartbeat is older than {readable_timeout}. "
            "Their in-flight jobs are about to be rescued as Lost."
        )
170
171
@register_viewset
class JobRequestViewset(AdminViewset):
    """Admin viewset exposing list and detail views for ``JobRequest``."""

    class ListView(AdminModelListView):
        """List of job requests with a bulk "Delete" action."""

        nav_section = "Jobs"
        nav_icon = "inbox"
        model = JobRequest
        title = "Requests"
        description = "Jobs waiting to be picked up by a worker."
        fields = [
            "id",
            "job_class",
            "priority",
            "created_at",
            "start_at",
            "concurrency_key",
        ]
        actions = ["Delete"]
        queryset_order = ["-priority", "-start_at", "-created_at"]

        def perform_action(self, action: str, target_ids: list[int]) -> None:
            """Run a bulk action against the selected job requests.

            Args:
                action: Name of the action; only ``"Delete"`` is supported.
                target_ids: IDs of the ``JobRequest`` rows to act on.

            Raises:
                ValueError: If ``action`` is not a supported action.
            """
            if action == "Delete":
                JobRequest.query.filter(id__in=target_ids).delete()
            else:
                # Fail loudly on an unknown action instead of silently doing
                # nothing, the same way JobResultViewset.ListView does.
                raise ValueError("Invalid action")

    class DetailView(AdminModelDetailView):
        """Detail page for a single job request."""

        model = JobRequest
        title = "Request"
198
199
@register_viewset
class JobProcessViewset(AdminViewset):
    """Admin viewset exposing list and detail views for ``JobProcess``."""

    class ListView(AdminModelListView):
        """List of job processes with job/worker summary cards and a bulk "Delete" action."""

        nav_section = "Jobs"
        nav_icon = "gear"
        model = JobProcess
        title = "Processes"
        description = "Jobs currently being processed by a worker."
        fields = [
            "id",
            "job_class",
            "priority",
            "created_at",
            "started_at",
            "concurrency_key",
        ]
        actions = ["Delete"]
        cards = [
            WaitingJobsCard,
            RunningJobsCard,
            ActiveWorkersCard,
            StaleWorkersCard,
        ]

        def perform_action(self, action: str, target_ids: list[int]) -> None:
            """Run a bulk action against the selected job processes.

            Args:
                action: Name of the action; only ``"Delete"`` is supported.
                target_ids: IDs of the ``JobProcess`` rows to act on.

            Raises:
                ValueError: If ``action`` is not a supported action.
            """
            if action == "Delete":
                JobProcess.query.filter(id__in=target_ids).delete()
            else:
                # Fail loudly on an unknown action instead of silently doing
                # nothing, the same way JobResultViewset.ListView does.
                raise ValueError("Invalid action")

    class DetailView(AdminModelDetailView):
        """Detail page for a single job process."""

        model = JobProcess
        title = "Process"
231
232
@register_viewset
class JobResultViewset(AdminViewset):
    """Admin viewset exposing list and detail views for ``JobResult``."""

    class ListView(AdminModelListView):
        """List of job results with summary cards, status filters and a "Retry" action."""

        model = JobResult
        title = "Results"
        description = "Completed jobs with their success/failure status."
        nav_section = "Jobs"
        nav_icon = "clipboard-check"

        # ``retried`` and ``is_retry`` are not model fields here; they are the
        # annotations added in ``get_initial_queryset``.
        fields = [
            "id",
            "job_class",
            "priority",
            "created_at",
            "status",
            "retried",
            "is_retry",
        ]
        field_templates = {"status": "jobs/values/job_status.html"}
        search_fields = [
            "uuid",
            "job_process_uuid",
            "job_request_uuid",
            "job_class",
        ]
        cards = [
            JobResultsTrendCard,
            SuccessfulJobsCard,
            ErroredJobsCard,
            LostJobsCard,
            RetriedJobsCard,
        ]
        filters = ["Successful", "Errored", "Cancelled", "Lost", "Retried"]
        actions = ["Retry"]

        def get_initial_queryset(self) -> JobResultQuerySet:
            """Return the base queryset annotated with ``retried`` and ``is_retry`` booleans.

            ``retried`` is true when ``retry_job_request_uuid`` is not null;
            ``is_retry`` is true when ``retry_attempt`` is greater than zero.
            """
            base: JobResultQuerySet = super().get_initial_queryset()  # ty: ignore[invalid-assignment]
            was_retried = Case(
                When(retry_job_request_uuid__isnull=False, then=True),
                default=False,
                output_field=postgres.BooleanField(),
            )
            is_retry_attempt = Case(
                When(retry_attempt__gt=0, then=True),
                default=False,
                output_field=postgres.BooleanField(),
            )
            return base.annotate(retried=was_retried, is_retry=is_retry_attempt)

        def filter_queryset(self, queryset: JobResultQuerySet) -> JobResultQuerySet:
            """Narrow ``queryset`` according to ``self.filter``.

            Each known filter name maps to the queryset method of the same
            (lower-cased) name; any other value leaves the queryset untouched.
            """
            narrowers = {
                "Successful": queryset.successful,
                "Errored": queryset.errored,
                "Cancelled": queryset.cancelled,
                "Lost": queryset.lost,
                "Retried": queryset.retried,
            }
            narrow = narrowers.get(self.filter)
            if narrow is None:
                return queryset
            return narrow()

        def get_fields(self) -> list[str]:
            """Return the list columns, adding retry columns under the "Retried" filter."""
            columns = super().get_fields()
            if self.filter == "Retried":
                columns.extend(["retries", "retry_attempt"])
            return columns

        def perform_action(self, action: str, target_ids: list[int]) -> None:
            """Retry every selected job result immediately (``delay=0``).

            Raises:
                ValueError: If ``action`` is anything other than ``"Retry"``.
            """
            if action != "Retry":
                raise ValueError("Invalid action")

            selected_results = JobResult.query.filter(id__in=target_ids)
            for job_result in selected_results:
                job_result.retry_job(delay=0)

    class DetailView(AdminModelDetailView):
        """Detail page for a single job result; a POST retries the job."""

        model = JobResult
        title = "Result"

        def post(self) -> RedirectResponse:
            """Retry this result's job immediately, then redirect to ``"."``."""
            job_result = self.object
            job_result.retry_job(delay=0)
            return RedirectResponse(".")
326
327
@register_viewset
class WorkerHeartbeatViewset(AdminViewset):
    """Admin viewset exposing list and detail views for ``WorkerHeartbeat``."""

    class ListView(AdminModelListView):
        """List of worker heartbeat rows with "Active" / "Stale" filters."""

        model = WorkerHeartbeat
        title = "Workers"
        description = (
            "Live worker processes. Each row is refreshed while its worker is "
            "running and deleted on clean shutdown."
        )
        nav_section = "Jobs"
        nav_icon = "heart-pulse"

        # ``stale`` is the annotation added in ``get_initial_queryset``.
        fields = [
            "worker_id",
            "hostname",
            "pid",
            "queues",
            "started_at",
            "last_heartbeat_at",
            "stale",
        ]
        search_fields = ["worker_id", "hostname"]
        filters = ["Active", "Stale"]
        queryset_order = ["-last_heartbeat_at"]

        def get_initial_queryset(self) -> postgres.QuerySet[WorkerHeartbeat]:
            """Return the base queryset annotated with a boolean ``stale`` column.

            ``stale`` is true when ``last_heartbeat_at`` is earlier than
            ``heartbeat_cutoff()``.
            """
            base = super().get_initial_queryset()
            is_stale = Case(
                When(last_heartbeat_at__lt=heartbeat_cutoff(), then=True),
                default=False,
                output_field=postgres.BooleanField(),
            )
            return base.annotate(stale=is_stale)

        def filter_queryset(
            self, queryset: postgres.QuerySet[WorkerHeartbeat]
        ) -> postgres.QuerySet[WorkerHeartbeat]:
            """Narrow ``queryset`` to active or stale workers according to ``self.filter``.

            "Active" keeps rows at or after the heartbeat cutoff, "Stale" keeps
            rows before it; any other value returns the queryset unchanged.
            """
            threshold = heartbeat_cutoff()
            selected = self.filter
            if selected == "Active":
                lookup = {"last_heartbeat_at__gte": threshold}
            elif selected == "Stale":
                lookup = {"last_heartbeat_at__lt": threshold}
            else:
                return queryset
            return queryset.filter(**lookup)

    class DetailView(AdminModelDetailView):
        """Detail page for a single worker heartbeat row."""

        model = WorkerHeartbeat
        title = "Worker"
380        title = "Worker"