Plain is headed towards 1.0! Subscribe for development updates →

Worker

Process background jobs with a database-driven worker.

from plain.worker import Job
from plain.mail import send_mail

# Create a new job class
class WelcomeUserJob(Job):
    """Example job that emails a welcome message to one user."""

    def __init__(self, user):
        # Kept on the instance so run() can read it later.
        self.user = user

    def run(self):
        """Send the welcome email to the stored user's email address."""
        greeting = f"Hello from Plain, {self.user}"
        recipients = [self.user.email]
        # NOTE(review): the from_email value below looks like an obfuscated
        # placeholder rather than a real address — confirm the intended sender.
        send_mail(
            subject="Welcome!",
            message=greeting,
            from_email="[email protected]",
            recipient_list=recipients,
        )


# Instantiate a job and send it to the worker
# NOTE(review): `User` is not imported in this snippet — presumably the
# application's user model; confirm before copying the example.
user = User.objects.get(pk=1)
# run_in_worker() hands the job off instead of calling run() directly here.
WelcomeUserJob(user).run_in_worker()

The worker process is run separately with the `plain worker run` command.

Staff

Job history

Scheduled jobs

Monitoring

  1import datetime
  2import subprocess
  3
  4from plain.utils import timezone
  5
  6from .jobs import Job, load_job
  7
  8_MONTH_NAMES = {
  9    "JAN": 1,
 10    "FEB": 2,
 11    "MAR": 3,
 12    "APR": 4,
 13    "MAY": 5,
 14    "JUN": 6,
 15    "JUL": 7,
 16    "AUG": 8,
 17    "SEP": 9,
 18    "OCT": 10,
 19    "NOV": 11,
 20    "DEC": 12,
 21}
 22_DAY_NAMES = {
 23    "MON": 0,
 24    "TUE": 1,
 25    "WED": 2,
 26    "THU": 3,
 27    "FRI": 4,
 28    "SAT": 5,
 29    "SUN": 6,
 30}
 31
 32
 33class _ScheduleComponent:
 34    def __init__(self, values, raw=""):
 35        self.values = sorted(values)
 36        self._raw = raw
 37
 38    def __str__(self):
 39        if self._raw:
 40            return self._raw
 41        return ",".join(str(v) for v in self.values)
 42
 43    def __eq__(self, other):
 44        return self.values == other.values
 45
 46    @classmethod
 47    def parse(cls, value, min_allowed, max_allowed, str_conversions=None):
 48        if str_conversions is None:
 49            str_conversions = {}
 50
 51        if isinstance(value, int):
 52            if value < min_allowed or value > max_allowed:
 53                raise ValueError(
 54                    f"Schedule component should be between {min_allowed} and {max_allowed}"
 55                )
 56            return cls([value], raw=value)
 57
 58        if not isinstance(value, str):
 59            raise ValueError("Schedule component should be an int or str")
 60
 61        # First split any subcomponents and re-parse them
 62        if "," in value:
 63            return cls(
 64                sum(
 65                    (
 66                        cls.parse(
 67                            sub_value, min_allowed, max_allowed, str_conversions
 68                        ).values
 69                        for sub_value in value.split(",")
 70                    ),
 71                    [],
 72                ),
 73                raw=value,
 74            )
 75
 76        if value == "*":
 77            return cls(list(range(min_allowed, max_allowed + 1)), raw=value)
 78
 79        def _convert(value):
 80            result = str_conversions.get(value.upper(), value)
 81            return int(result)
 82
 83        if "/" in value:
 84            values, step = value.split("/")
 85            values = cls.parse(values, min_allowed, max_allowed, str_conversions)
 86            return cls([v for v in values.values if v % int(step) == 0], raw=value)
 87
 88        if "-" in value:
 89            start, end = value.split("-")
 90            return cls(list(range(_convert(start), _convert(end) + 1)), raw=value)
 91
 92        return cls([_convert(value)], raw=value)
 93
 94
 95class Schedule:
 96    def __init__(
 97        self,
 98        *,
 99        minute="*",
100        hour="*",
101        day_of_month="*",
102        month="*",
103        day_of_week="*",
104        raw="",
105    ):
106        self.minute = _ScheduleComponent.parse(minute, min_allowed=0, max_allowed=59)
107        self.hour = _ScheduleComponent.parse(hour, min_allowed=0, max_allowed=23)
108        self.day_of_month = _ScheduleComponent.parse(
109            day_of_month, min_allowed=1, max_allowed=31
110        )
111        self.month = _ScheduleComponent.parse(
112            month,
113            min_allowed=1,
114            max_allowed=12,
115            str_conversions=_MONTH_NAMES,
116        )
117        self.day_of_week = _ScheduleComponent.parse(
118            day_of_week,
119            min_allowed=0,
120            max_allowed=6,
121            str_conversions=_DAY_NAMES,
122        )
123        self._raw = raw
124
125    def __str__(self):
126        if self._raw:
127            return self._raw
128        return f"{self.minute} {self.hour} {self.day_of_month} {self.month} {self.day_of_week}"
129
130    def __repr__(self) -> str:
131        return f"<Schedule {self}>"
132
133    @classmethod
134    def from_cron(cls, cron):
135        raw = cron
136
137        if cron == "@yearly" or cron == "@annually":
138            cron = "0 0 1 1 *"
139        elif cron == "@monthly":
140            cron = "0 0 1 * *"
141        elif cron == "@weekly":
142            cron = "0 0 * * 0"
143        elif cron == "@daily" or cron == "@midnight":
144            cron = "0 0 * * *"
145        elif cron == "@hourly":
146            cron = "0 * * * *"
147
148        minute, hour, day_of_month, month, day_of_week = cron.split()
149
150        return cls(
151            minute=minute,
152            hour=hour,
153            day_of_month=day_of_month,
154            month=month,
155            day_of_week=day_of_week,
156            raw=raw,
157        )
158
159    def next(self, now=None):
160        """
161        Find the next datetime that matches the schedule after the given datetime.
162        """
163        dt = now or timezone.localtime()  # Use the defined plain timezone by default
164
165        # We only care about minutes, so immediately jump to the next minute
166        dt += datetime.timedelta(minutes=1)
167        dt = dt.replace(second=0, microsecond=0)
168
169        def _go_to_next_day(v):
170            v = v + datetime.timedelta(days=1)
171            return v.replace(
172                hour=self.hour.values[0],
173                minute=self.minute.values[0],
174            )
175
176        # If we don't find a value in the next 500 days,
177        # then the schedule is probably never going to match (i.e. Feb 31)
178        max_future = dt + datetime.timedelta(days=500)
179
180        while True:
181            is_valid_day = (
182                dt.month in self.month.values
183                and dt.day in self.day_of_month.values
184                and dt.weekday() in self.day_of_week.values
185            )
186            if is_valid_day:
187                # We're on a valid day, now find the next valid hour and minute
188                for hour in self.hour.values:
189                    if hour < dt.hour:
190                        continue
191                    for minute in self.minute.values:
192                        if hour == dt.hour and minute < dt.minute:
193                            continue
194                        candidate_datetime = dt.replace(hour=hour, minute=minute)
195                        if candidate_datetime >= dt:
196                            return candidate_datetime
197                # If no valid time is found today, reset to the first valid minute and hour of the next day
198                dt = _go_to_next_day(dt)
199            else:
200                # Increment the day until a valid month/day/weekday combination is found
201                dt = _go_to_next_day(dt)
202
203            if dt > max_future:
204                raise ValueError("No valid schedule match found in the next 500 days")
205
206
class ScheduledCommand(Job):
    """Job that runs a command string through the shell."""

    def __init__(self, command):
        self.command = command

    def __repr__(self) -> str:
        return "<ScheduledCommand: {}>".format(self.command)

    def run(self):
        # check=True turns a non-zero exit status into CalledProcessError
        subprocess.run(self.command, check=True, shell=True)

    def get_unique_key(self) -> str:
        # One ScheduledCommand class serves many different commands, so the
        # command string itself is the key that tells them apart in the
        # scheduling uniqueness logic
        unique_key = self.command
        return unique_key
221
222
def load_schedule(schedules):
    """Turn ``(job, schedule)`` pairs into a list of resolved pairs.

    A string job starting with ``cmd:`` becomes a ScheduledCommand for the
    rest of the string; any other string job is passed to load_job with empty
    args and kwargs. A string schedule is parsed with Schedule.from_cron.
    Non-string jobs and schedules are passed through unchanged.
    """

    def _resolve_job(entry):
        if not isinstance(entry, str):
            return entry
        if entry.startswith("cmd:"):
            return ScheduledCommand(entry[4:])
        return load_job(entry, {"args": [], "kwargs": {}})

    def _resolve_schedule(entry):
        if isinstance(entry, str):
            return Schedule.from_cron(entry)
        return entry

    return [
        (_resolve_job(job), _resolve_schedule(schedule)) for job, schedule in schedules
    ]