Plain is headed towards 1.0! Subscribe for development updates →

Worker

Process background jobs with a database-driven worker.

from plain.worker import Job
from plain.mail import send_mail

# Create a new job class
class WelcomeUserJob(Job):
    def __init__(self, user):
        self.user = user

    def run(self):
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            from_email="[email protected]",
            recipient_list=[self.user.email],
        )


# Instantiate a job and send it to the worker
user = User.objects.get(pk=1)
WelcomeUserJob(user).run_in_worker()

The worker process is run separately using plain worker run.

Admin

Job history

Scheduled jobs

Monitoring

  1from datetime import timedelta
  2
  3from plain import models
  4from plain.admin.cards import Card
  5from plain.admin.views import (
  6    AdminModelDetailView,
  7    AdminModelListView,
  8    AdminViewset,
  9    register_viewset,
 10)
 11from plain.http import ResponseRedirect
 12from plain.runtime import settings
 13
 14from .models import Job, JobRequest, JobResult
 15
 16
 17def _td_format(td_object):
 18    seconds = int(td_object.total_seconds())
 19    periods = [
 20        ("year", 60 * 60 * 24 * 365),
 21        ("month", 60 * 60 * 24 * 30),
 22        ("day", 60 * 60 * 24),
 23        ("hour", 60 * 60),
 24        ("minute", 60),
 25        ("second", 1),
 26    ]
 27
 28    strings = []
 29    for period_name, period_seconds in periods:
 30        if seconds > period_seconds:
 31            period_value, seconds = divmod(seconds, period_seconds)
 32            has_s = "s" if period_value > 1 else ""
 33            strings.append(f"{period_value} {period_name}{has_s}")
 34
 35    return ", ".join(strings)
 36
 37
 38class SuccessfulJobsCard(Card):
 39    title = "Successful Jobs"
 40    text = "View"
 41
 42    def get_number(self):
 43        return JobResult.objects.successful().count()
 44
 45    def get_link(self):
 46        return JobResultViewset.ListView.get_view_url() + "?display=Successful"
 47
 48
 49class ErroredJobsCard(Card):
 50    title = "Errored Jobs"
 51    text = "View"
 52
 53    def get_number(self):
 54        return JobResult.objects.errored().count()
 55
 56    def get_link(self):
 57        return JobResultViewset.ListView.get_view_url() + "?display=Errored"
 58
 59
 60class LostJobsCard(Card):
 61    title = "Lost Jobs"
 62    text = "View"  # TODO make not required - just an icon?
 63
 64    def get_description(self):
 65        delta = timedelta(seconds=settings.WORKER_JOBS_LOST_AFTER)
 66        return f"Jobs are considered lost after {_td_format(delta)}"
 67
 68    def get_number(self):
 69        return JobResult.objects.lost().count()
 70
 71    def get_link(self):
 72        return JobResultViewset.ListView.get_view_url() + "?display=Lost"
 73
 74
 75class RetriedJobsCard(Card):
 76    title = "Retried Jobs"
 77    text = "View"  # TODO make not required - just an icon?
 78
 79    def get_number(self):
 80        return JobResult.objects.retried().count()
 81
 82    def get_link(self):
 83        return JobResultViewset.ListView.get_view_url() + "?display=Retried"
 84
 85
 86class WaitingJobsCard(Card):
 87    title = "Waiting Jobs"
 88
 89    def get_number(self):
 90        return Job.objects.waiting().count()
 91
 92
 93class RunningJobsCard(Card):
 94    title = "Running Jobs"
 95
 96    def get_number(self):
 97        return Job.objects.running().count()
 98
 99
100@register_viewset
101class JobRequestViewset(AdminViewset):
102    class ListView(AdminModelListView):
103        nav_section = "Worker"
104        model = JobRequest
105        title = "Job requests"
106        fields = ["id", "job_class", "priority", "created_at", "start_at", "unique_key"]
107        actions = ["Delete"]
108
109        def perform_action(self, action: str, target_pks: list):
110            if action == "Delete":
111                JobRequest.objects.filter(pk__in=target_pks).delete()
112
113    class DetailView(AdminModelDetailView):
114        model = JobRequest
115        title = "Job Request"
116
117
118@register_viewset
119class JobViewset(AdminViewset):
120    class ListView(AdminModelListView):
121        nav_section = "Worker"
122        model = Job
123        fields = [
124            "id",
125            "job_class",
126            "priority",
127            "created_at",
128            "started_at",
129            "unique_key",
130        ]
131        actions = ["Delete"]
132        cards = [
133            WaitingJobsCard,
134            RunningJobsCard,
135        ]
136
137        def perform_action(self, action: str, target_pks: list):
138            if action == "Delete":
139                Job.objects.filter(pk__in=target_pks).delete()
140
141    class DetailView(AdminModelDetailView):
142        model = Job
143
144
145@register_viewset
146class JobResultViewset(AdminViewset):
147    class ListView(AdminModelListView):
148        nav_section = "Worker"
149        model = JobResult
150        title = "Job results"
151        fields = [
152            "id",
153            "job_class",
154            "priority",
155            "created_at",
156            "status",
157            "retried",
158            "is_retry",
159        ]
160        search_fields = [
161            "uuid",
162            "job_uuid",
163            "job_request_uuid",
164            "job_class",
165        ]
166        cards = [
167            SuccessfulJobsCard,
168            ErroredJobsCard,
169            LostJobsCard,
170            RetriedJobsCard,
171        ]
172        filters = [
173            "Successful",
174            "Errored",
175            "Cancelled",
176            "Lost",
177            "Retried",
178        ]
179        actions = [
180            "Retry",
181        ]
182        allow_global_search = False
183
184        def get_description(self):
185            delta = timedelta(seconds=settings.WORKER_JOBS_CLEARABLE_AFTER)
186            return f"Jobs are cleared after {_td_format(delta)}"
187
188        def get_initial_queryset(self):
189            queryset = super().get_initial_queryset()
190            queryset = queryset.annotate(
191                retried=models.Case(
192                    models.When(retry_job_request_uuid__isnull=False, then=True),
193                    default=False,
194                    output_field=models.BooleanField(),
195                ),
196                is_retry=models.Case(
197                    models.When(retry_attempt__gt=0, then=True),
198                    default=False,
199                    output_field=models.BooleanField(),
200                ),
201            )
202            if self.display == "Successful":
203                return queryset.successful()
204            if self.display == "Errored":
205                return queryset.errored()
206            if self.display == "Cancelled":
207                return queryset.cancelled()
208            if self.display == "Lost":
209                return queryset.lost()
210            if self.display == "Retried":
211                return queryset.retried()
212            return queryset
213
214        def get_fields(self):
215            fields = super().get_fields()
216            if self.display == "Retried":
217                fields.append("retries")
218                fields.append("retry_attempt")
219            return fields
220
221        def perform_action(self, action: str, target_pks: list):
222            if action == "Retry":
223                for result in JobResult.objects.filter(pk__in=target_pks):
224                    result.retry_job(delay=0)
225            else:
226                raise ValueError("Invalid action")
227
228    class DetailView(AdminModelDetailView):
229        model = JobResult
230        title = "Job result"
231
232        def post(self):
233            self.load_object()
234            self.object.retry_job(delay=0)
235            return ResponseRedirect(".")