Worker
Process background jobs with a database-driven worker.
from plain.worker import Job
from plain.mail import send_mail
# Define a job by subclassing Job
class WelcomeUserJob(Job):
    """Job that sends a welcome email to one user."""

    def __init__(self, user):
        # Keep the user around so run() can address the email.
        self.user = user

    def run(self):
        """Send the welcome email to the stored user's address."""
        greeting = f"Hello from Plain, {self.user}"
        recipients = [self.user.email]
        send_mail(
            subject="Welcome!",
            message=greeting,
            from_email="[email protected]",
            recipient_list=recipients,
        )
# Instantiate a job for an existing user and send it to the worker.
# NOTE(review): `User` is not imported in this snippet; presumably it is the
# application's user model -- confirm before copying.
user = User.objects.get(pk=1)
WelcomeUserJob(user).run_in_worker()
The worker process is run separately using `plain worker run`.
Staff
Job history
Scheduled jobs
Monitoring
1import datetime
2import logging
3import signal
4
5import click
6
7from plain.runtime import settings
8from plain.utils import timezone
9
10from .jobs import load_job
11from .models import Job, JobRequest, JobResult
12from .scheduling import load_schedule
13from .workers import Worker
14
15logger = logging.getLogger("plain.worker")
16
17
@click.group()
def cli():
    """Run and manage the background job worker."""
21
22
@cli.command()
@click.option(
    "queues",
    "--queue",
    default=["default"],
    multiple=True,
    type=str,
    help="Queue to process",
)
@click.option(
    "--max-processes",
    "max_processes",
    default=None,
    type=int,
    envvar="PLAIN_JOBS_MAX_PROCESSES",
)
@click.option(
    "--max-jobs-per-process",
    "max_jobs_per_process",
    default=None,
    type=int,
    envvar="PLAIN_JOBS_MAX_JOBS_PER_PROCESS",
)
@click.option(
    "--max-pending-per-process",
    "max_pending_per_process",
    default=10,
    type=int,
    envvar="PLAIN_JOBS_MAX_PENDING_PER_PROCESS",
)
@click.option(
    "--stats-every",
    "stats_every",
    default=60,
    type=int,
    envvar="PLAIN_JOBS_STATS_EVERY",
)
def run(
    queues, max_processes, max_jobs_per_process, max_pending_per_process, stats_every
):
    """Start a worker that processes jobs from the selected queue(s)."""
    # The schedule comes from settings; every CLI option is handed to Worker
    # unchanged, so the limits are interpreted by Worker itself.
    jobs_schedule = load_schedule(settings.WORKER_JOBS_SCHEDULE)

    worker = Worker(
        queues=queues,
        jobs_schedule=jobs_schedule,
        max_processes=max_processes,
        max_jobs_per_process=max_jobs_per_process,
        max_pending_per_process=max_pending_per_process,
        stats_every=stats_every,
    )

    def _shutdown(signalnum, _frame):
        logger.info("Job worker shutdown signal received signalnum=%s", signalnum)
        worker.shutdown()

    # Route both SIGTERM and SIGINT to worker.shutdown() instead of the
    # default signal behavior, so the worker decides how to stop.
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, _shutdown)

    # Start processing jobs
    worker.run()
84
85
@cli.command()
def clear_completed():
    """Clear old job results in all queues.

    Deletes results created more than WORKER_JOBS_CLEARABLE_AFTER seconds ago.
    """
    # Only results created before this moment are removed; newer ones are kept.
    cutoff = timezone.now() - datetime.timedelta(
        seconds=settings.WORKER_JOBS_CLEARABLE_AFTER
    )
    click.echo(f"Clearing job results created before {cutoff}")

    # The first element of delete()'s return value is reported as the count.
    deleted = JobResult.objects.filter(created_at__lt=cutoff).delete()[0]
    click.echo(f"Deleted {deleted} jobs")
95
96
@cli.command()
def stats():
    """Stats across all queues."""
    # Gather every count before printing anything: JobRequest rows are
    # reported as pending, Job rows as processing, and JobResult rows are
    # split by outcome. The third item holds any extra styling for that row.
    rows = [
        ("Pending", JobRequest.objects.count(), {}),
        ("Processing", Job.objects.count(), {}),
        ("Successful", JobResult.objects.successful().count(), {"fg": "green"}),
        ("Errored", JobResult.objects.errored().count(), {"fg": "red"}),
        ("Lost", JobResult.objects.lost().count(), {"fg": "yellow"}),
    ]

    for label, count, style in rows:
        click.secho(f"{label}: {count}", bold=True, **style)
112
113
@cli.command()
def purge_processing():
    """Delete all running and pending jobs regardless of queue."""
    prompt = (
        "Are you sure you want to clear all running and pending jobs? "
        "This will delete all current Jobs and JobRequests"
    )

    # Nothing is touched unless the user explicitly confirms.
    if click.confirm(prompt):
        # Job requests are removed first, then jobs; each count is reported
        # right after its own deletion.
        for model, label in ((JobRequest, "job requests"), (Job, "jobs")):
            removed = model.objects.all().delete()[0]
            click.echo(f"Deleted {removed} {label}")
127
128
@cli.command()
@click.argument("job_class", type=str)
def run_job(job_class):
    """Run a job class directly (and not using a worker)."""
    # The job is loaded with empty args and kwargs, so presumably only job
    # classes that need no constructor arguments can be run this way.
    job = load_job(job_class, {"args": [], "kwargs": {}})

    # Print the label without a newline so the job appears on the same line;
    # click.echo keeps output handling consistent with the other commands.
    click.secho("Loaded job: ", bold=True, nl=False)
    click.echo(job)

    # Runs in this process.
    job.run()