Plain is headed towards 1.0! Subscribe for development updates →

Worker

Process background jobs with a database-driven worker.

from plain.worker import Job
from plain.mail import send_mail

# Create a new job class
class WelcomeUserJob(Job):
    """Background job that emails a welcome message to a single user."""

    def __init__(self, user):
        # Keep the user so run() can address the email when the job executes.
        self.user = user

    def run(self):
        """Send the welcome email to the stored user's email address."""
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            # Placeholder sender on a reserved example domain; replace with
            # your project's real "from" address.
            from_email="hello@example.com",
            recipient_list=[self.user.email],
        )


# Instantiate the job with a user and hand it to the worker via run_in_worker()
# NOTE(review): `User` is not imported in this snippet — presumably the
# project's user model; confirm before copying.
user = User.objects.get(pk=1)
WelcomeUserJob(user).run_in_worker()

The worker process is run separately using `plain worker run`.

Staff

Job history

Scheduled jobs

Monitoring

  1from datetime import timedelta
  2
  3from plain import models
  4from plain.http import ResponseRedirect
  5from plain.runtime import settings
  6from plain.staff.cards import Card
  7from plain.staff.dates import DatetimeRangeAliases
  8from plain.staff.views import (
  9    StaffModelDetailView,
 10    StaffModelListView,
 11    StaffModelViewset,
 12    register_viewset,
 13)
 14
 15from .models import Job, JobRequest, JobResult
 16
 17
 18def _td_format(td_object):
 19    seconds = int(td_object.total_seconds())
 20    periods = [
 21        ("year", 60 * 60 * 24 * 365),
 22        ("month", 60 * 60 * 24 * 30),
 23        ("day", 60 * 60 * 24),
 24        ("hour", 60 * 60),
 25        ("minute", 60),
 26        ("second", 1),
 27    ]
 28
 29    strings = []
 30    for period_name, period_seconds in periods:
 31        if seconds > period_seconds:
 32            period_value, seconds = divmod(seconds, period_seconds)
 33            has_s = "s" if period_value > 1 else ""
 34            strings.append(f"{period_value} {period_name}{has_s}")
 35
 36    return ", ".join(strings)
 37
 38
 39class SuccessfulJobsCard(Card):
 40    title = "Successful Jobs"
 41    text = "View"
 42
 43    def get_number(self):
 44        return (
 45            JobResult.objects.successful()
 46            .filter(created_at__range=self.datetime_range.as_tuple())
 47            .count()
 48        )
 49
 50    def get_link(self):
 51        return JobResultViewset.ListView.get_absolute_url() + "?filter=Successful"
 52
 53
 54class ErroredJobsCard(Card):
 55    title = "Errored Jobs"
 56    text = "View"
 57
 58    def get_number(self):
 59        return (
 60            JobResult.objects.errored()
 61            .filter(created_at__range=self.datetime_range.as_tuple())
 62            .count()
 63        )
 64
 65    def get_link(self):
 66        return JobResultViewset.ListView.get_absolute_url() + "?filter=Errored"
 67
 68
 69class LostJobsCard(Card):
 70    title = "Lost Jobs"
 71    text = "View"  # TODO make not required - just an icon?
 72
 73    def get_description(self):
 74        delta = timedelta(seconds=settings.WORKER_JOBS_LOST_AFTER)
 75        return f"Jobs are considered lost after {_td_format(delta)}"
 76
 77    def get_number(self):
 78        return (
 79            JobResult.objects.lost()
 80            .filter(created_at__range=self.datetime_range.as_tuple())
 81            .count()
 82        )
 83
 84    def get_link(self):
 85        return JobResultViewset.ListView.get_absolute_url() + "?filter=Lost"
 86
 87
 88class RetriedJobsCard(Card):
 89    title = "Retried Jobs"
 90    text = "View"  # TODO make not required - just an icon?
 91
 92    def get_number(self):
 93        return (
 94            JobResult.objects.retried()
 95            .filter(created_at__range=self.datetime_range.as_tuple())
 96            .count()
 97        )
 98
 99    def get_link(self):
100        return JobResultViewset.ListView.get_absolute_url() + "?filter=Retried"
101
102
class WaitingJobsCard(Card):
    """Card showing the count of jobs returned by ``Job.objects.waiting()``."""

    title = "Waiting Jobs"

    def get_number(self):
        """Return how many jobs are in the waiting queryset."""
        waiting_jobs = Job.objects.waiting()
        return waiting_jobs.count()
108
109
class RunningJobsCard(Card):
    """Card showing the count of jobs returned by ``Job.objects.running()``."""

    title = "Running Jobs"

    def get_number(self):
        """Return how many jobs are in the running queryset."""
        running_jobs = Job.objects.running()
        return running_jobs.count()
115
116
@register_viewset
class JobRequestViewset(StaffModelViewset):
    """Staff viewset for listing, inspecting and deleting ``JobRequest`` rows."""

    class ListView(StaffModelListView):
        nav_section = "Worker"
        model = JobRequest
        title = "Job requests"
        fields = ["id", "job_class", "priority", "created_at", "start_at", "unique_key"]
        actions = ["Delete"]

        def perform_action(self, action: str, target_pks: list) -> None:
            """Apply a bulk action to the selected job requests.

            Args:
                action: Name of the action to run; only "Delete" is supported.
                target_pks: Primary keys of the job requests to act on.

            Raises:
                ValueError: If ``action`` is not a supported action.
            """
            if action == "Delete":
                JobRequest.objects.filter(pk__in=target_pks).delete()
            else:
                # Fail loudly on unknown actions, matching JobResultViewset,
                # instead of silently doing nothing.
                raise ValueError("Invalid action")

    class DetailView(StaffModelDetailView):
        model = JobRequest
        title = "Job Request"
133
134
@register_viewset
class JobViewset(StaffModelViewset):
    """Staff viewset for listing, inspecting and deleting ``Job`` rows."""

    class ListView(StaffModelListView):
        nav_section = "Worker"
        model = Job
        fields = [
            "id",
            "job_class",
            "priority",
            "created_at",
            "started_at",
            "unique_key",
        ]
        actions = ["Delete"]
        cards = [
            WaitingJobsCard,
            RunningJobsCard,
        ]

        def perform_action(self, action: str, target_pks: list) -> None:
            """Apply a bulk action to the selected jobs.

            Args:
                action: Name of the action to run; only "Delete" is supported.
                target_pks: Primary keys of the jobs to act on.

            Raises:
                ValueError: If ``action`` is not a supported action.
            """
            if action == "Delete":
                Job.objects.filter(pk__in=target_pks).delete()
            else:
                # Fail loudly on unknown actions, matching JobResultViewset,
                # instead of silently doing nothing.
                raise ValueError("Invalid action")

    class DetailView(StaffModelDetailView):
        model = Job
160
161
@register_viewset
class JobResultViewset(StaffModelViewset):
    """Staff viewset for browsing ``JobResult`` rows and retrying them."""

    class ListView(StaffModelListView):
        nav_section = "Worker"
        model = JobResult
        title = "Job results"
        fields = [
            "id",
            "job_class",
            "priority",
            "created_at",
            "status",
            "retried",
            "is_retry",
        ]
        search_fields = [
            "uuid",
            "job_uuid",
            "job_request_uuid",
            "job_class",
        ]
        cards = [
            SuccessfulJobsCard,
            ErroredJobsCard,
            LostJobsCard,
            RetriedJobsCard,
        ]
        filters = [
            "Successful",
            "Errored",
            "Cancelled",
            "Lost",
            "Retried",
        ]
        actions = [
            "Retry",
        ]
        allow_global_search = False
        default_datetime_range = DatetimeRangeAliases.LAST_30_DAYS

        def get_description(self):
            """Describe when job results are cleared.

            The threshold is read from ``settings.WORKER_JOBS_CLEARABLE_AFTER``
            and interpreted as a number of seconds.
            """
            delta = timedelta(seconds=settings.WORKER_JOBS_CLEARABLE_AFTER)
            return f"Jobs are cleared after {_td_format(delta)}"

        def get_initial_queryset(self):
            """Return the annotated queryset, narrowed by the active filter.

            Two boolean annotations back the "retried" and "is_retry" columns:
            ``retried`` is true when ``retry_job_request_uuid`` is set, and
            ``is_retry`` is true when ``retry_attempt`` is greater than zero.
            """
            queryset = super().get_initial_queryset()
            queryset = queryset.annotate(
                retried=models.Case(
                    models.When(retry_job_request_uuid__isnull=False, then=True),
                    default=False,
                    output_field=models.BooleanField(),
                ),
                is_retry=models.Case(
                    models.When(retry_attempt__gt=0, then=True),
                    default=False,
                    output_field=models.BooleanField(),
                ),
            )
            if self.filter == "Successful":
                return queryset.successful()
            if self.filter == "Errored":
                return queryset.errored()
            if self.filter == "Cancelled":
                return queryset.cancelled()
            if self.filter == "Lost":
                return queryset.lost()
            if self.filter == "Retried":
                return queryset.retried()
            return queryset

        def get_fields(self):
            """Return the list columns, adding retry columns for "Retried".

            The parent's result is copied before it is extended so the extra
            columns can never accumulate on a list shared with the class.
            """
            fields = list(super().get_fields())
            if self.filter == "Retried":
                fields.extend(["retries", "retry_attempt"])
            return fields

        def perform_action(self, action: str, target_pks: list):
            """Apply a bulk action to the selected job results.

            Args:
                action: Name of the action to run; only "Retry" is supported.
                target_pks: Primary keys of the job results to act on.

            Raises:
                ValueError: If ``action`` is not a supported action.
            """
            if action == "Retry":
                for result in JobResult.objects.filter(pk__in=target_pks):
                    result.retry_job(delay=0)
            else:
                raise ValueError("Invalid action")

    class DetailView(StaffModelDetailView):
        model = JobResult
        title = "Job result"

        def post(self):
            """Retry the displayed job result with no delay, then redirect to "."."""
            self.load_object()
            self.object.retry_job(delay=0)
            return ResponseRedirect(".")