Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3from datetime import timedelta
  4from typing import Any
  5
  6from plain import models
  7from plain.admin.cards import Card
  8from plain.admin.views import (
  9    AdminModelDetailView,
 10    AdminModelListView,
 11    AdminViewset,
 12    register_viewset,
 13)
 14from plain.http import ResponseRedirect
 15from plain.runtime import settings
 16
 17from .models import JobProcess, JobRequest, JobResult
 18
 19
 20def _td_format(td_object: timedelta) -> str:
 21    seconds = int(td_object.total_seconds())
 22    periods = [
 23        ("year", 60 * 60 * 24 * 365),
 24        ("month", 60 * 60 * 24 * 30),
 25        ("day", 60 * 60 * 24),
 26        ("hour", 60 * 60),
 27        ("minute", 60),
 28        ("second", 1),
 29    ]
 30
 31    strings = []
 32    for period_name, period_seconds in periods:
 33        if seconds > period_seconds:
 34            period_value, seconds = divmod(seconds, period_seconds)
 35            has_s = "s" if period_value > 1 else ""
 36            strings.append(f"{period_value} {period_name}{has_s}")
 37
 38    return ", ".join(strings)
 39
 40
 41class SuccessfulJobsCard(Card):
 42    title = "Successful"
 43    text = "View"
 44
 45    def get_number(self) -> int:
 46        return JobResult.query.successful().count()
 47
 48    def get_link(self) -> str:
 49        return JobResultViewset.ListView.get_view_url() + "?display=Successful"
 50
 51
 52class ErroredJobsCard(Card):
 53    title = "Errored"
 54    text = "View"
 55
 56    def get_number(self) -> int:
 57        return JobResult.query.errored().count()
 58
 59    def get_link(self) -> str:
 60        return JobResultViewset.ListView.get_view_url() + "?display=Errored"
 61
 62
 63class LostJobsCard(Card):
 64    title = "Lost"
 65    text = "View"  # TODO make not required - just an icon?
 66
 67    def get_description(self) -> str:
 68        delta = timedelta(seconds=settings.JOBS_TIMEOUT)
 69        return f"Jobs are considered lost after {_td_format(delta)}"
 70
 71    def get_number(self) -> int:
 72        return JobResult.query.lost().count()
 73
 74    def get_link(self) -> str:
 75        return JobResultViewset.ListView.get_view_url() + "?display=Lost"
 76
 77
 78class RetriedJobsCard(Card):
 79    title = "Retried"
 80    text = "View"  # TODO make not required - just an icon?
 81
 82    def get_number(self) -> int:
 83        return JobResult.query.retried().count()
 84
 85    def get_link(self) -> str:
 86        return JobResultViewset.ListView.get_view_url() + "?display=Retried"
 87
 88
 89class WaitingJobsCard(Card):
 90    title = "Waiting"
 91
 92    def get_number(self) -> int:
 93        return JobProcess.query.waiting().count()
 94
 95
 96class RunningJobsCard(Card):
 97    title = "Running"
 98
 99    def get_number(self) -> int:
100        return JobProcess.query.running().count()
101
102
@register_viewset
class JobRequestViewset(AdminViewset):
    """Admin list and detail views for ``JobRequest`` records."""

    class ListView(AdminModelListView):
        """List of job requests under the "Jobs" nav section, with bulk delete."""

        nav_section = "Jobs"
        nav_icon = "gear"
        model = JobRequest
        title = "Requests"
        fields = [
            "id",
            "job_class",
            "priority",
            "created_at",
            "start_at",
            "unique_key",
        ]
        actions = ["Delete"]

        def perform_action(self, action: str, target_ids: list[int]) -> None:
            """Delete the selected requests; any other action is ignored."""
            if action != "Delete":
                return
            selected = JobRequest.query.filter(id__in=target_ids)
            selected.delete()

    class DetailView(AdminModelDetailView):
        """Detail page for a single job request."""

        model = JobRequest
        title = "Request"
119        title = "Request"
120
121
@register_viewset
class JobProcessViewset(AdminViewset):
    """Admin list and detail views for ``JobProcess`` records."""

    class ListView(AdminModelListView):
        """List of job processes with waiting/running cards and bulk delete."""

        nav_section = "Jobs"
        nav_icon = "gear"
        model = JobProcess
        title = "Processes"
        fields = [
            "id",
            "job_class",
            "priority",
            "created_at",
            "started_at",
            "unique_key",
        ]
        actions = ["Delete"]
        cards = [WaitingJobsCard, RunningJobsCard]

        def perform_action(self, action: str, target_ids: list[int]) -> None:
            """Delete the selected processes; any other action is ignored."""
            if action != "Delete":
                return
            selected = JobProcess.query.filter(id__in=target_ids)
            selected.delete()

    class DetailView(AdminModelDetailView):
        """Detail page for a single job process."""

        model = JobProcess
        title = "Process"
150
151
@register_viewset
class JobResultViewset(AdminViewset):
    """Admin list and detail views for ``JobResult`` records."""

    class ListView(AdminModelListView):
        """List of job results with status cards, display filters and a retry action."""

        nav_section = "Jobs"
        nav_icon = "gear"
        model = JobResult
        title = "Results"
        fields = [
            "id",
            "job_class",
            "priority",
            "created_at",
            "status",
            "retried",
            "is_retry",
        ]
        search_fields = [
            "uuid",
            "job_process_uuid",
            "job_request_uuid",
            "job_class",
        ]
        cards = [
            SuccessfulJobsCard,
            ErroredJobsCard,
            LostJobsCard,
            RetriedJobsCard,
        ]
        filters = [
            "Successful",
            "Errored",
            "Cancelled",
            "Lost",
            "Retried",
        ]
        actions = ["Retry"]
        allow_global_search = False

        def get_initial_queryset(self) -> Any:
            """Annotate results with retry flags and narrow them by ``self.display``.

            ``retried`` is true when ``retry_job_request_uuid`` is set, and
            ``is_retry`` is true when ``retry_attempt`` is greater than zero.
            """
            base = super().get_initial_queryset()

            was_retried = models.Case(
                models.When(retry_job_request_uuid__isnull=False, then=True),
                default=False,
                output_field=models.BooleanField(),
            )
            is_a_retry = models.Case(
                models.When(retry_attempt__gt=0, then=True),
                default=False,
                output_field=models.BooleanField(),
            )
            annotated = base.annotate(retried=was_retried, is_retry=is_a_retry)

            # Maps each display filter to the queryset method that applies it.
            scope_by_display = {
                "Successful": "successful",
                "Errored": "errored",
                "Cancelled": "cancelled",
                "Lost": "lost",
                "Retried": "retried",
            }
            scope_name = scope_by_display.get(self.display)
            if scope_name is None:
                return annotated
            return getattr(annotated, scope_name)()

        def get_fields(self) -> list[str]:
            """Return the list columns, adding retry columns for the "Retried" display."""
            columns = super().get_fields()
            if self.display == "Retried":
                columns.extend(["retries", "retry_attempt"])
            return columns

        def perform_action(self, action: str, target_ids: list[int]) -> None:
            """Retry each selected result with no delay.

            Raises:
                ValueError: If ``action`` is anything other than ``"Retry"``.
            """
            if action != "Retry":
                raise ValueError("Invalid action")
            for job_result in JobResult.query.filter(id__in=target_ids):
                job_result.retry_job(delay=0)

    class DetailView(AdminModelDetailView):
        """Detail page for a single job result; a POST retries the job."""

        model = JobResult
        title = "Result"

        def post(self) -> ResponseRedirect:
            """Retry this result's job with no delay, then redirect to the same page."""
            self.object.retry_job(delay=0)
            return ResponseRedirect(".")