Worker
Process background jobs with a database-driven worker.
from plain.worker import Job
from plain.mail import send_mail
# Create a new job class
class WelcomeUserJob(Job):
def __init__(self, user):
self.user = user
def run(self):
send_mail(
subject="Welcome!",
message=f"Hello from Plain, {self.user}",
from_email="[email protected]",
recipient_list=[self.user.email],
)
# Instantiate a job and send it to the worker
user = User.objects.get(pk=1)
WelcomeUserJob(user).run_in_worker()
The worker process is run separately using plain worker run
.
Admin
Job history
Scheduled jobs
Monitoring
1import datetime
2import logging
3import signal
4
5import click
6
7from plain.cli import register_cli
8from plain.runtime import settings
9from plain.utils import timezone
10
11from .models import Job, JobRequest, JobResult
12from .registry import jobs_registry
13from .scheduling import load_schedule
14from .workers import Worker
15
16logger = logging.getLogger("plain.worker")
17
18
19@register_cli("worker")
20@click.group()
21def cli():
22 pass
23
24
25@cli.command()
26@click.option(
27 "queues",
28 "--queue",
29 default=["default"],
30 multiple=True,
31 type=str,
32 help="Queue to process",
33)
34@click.option(
35 "--max-processes",
36 "max_processes",
37 default=None,
38 type=int,
39 envvar="PLAIN_WORKER_MAX_PROCESSES",
40)
41@click.option(
42 "--max-jobs-per-process",
43 "max_jobs_per_process",
44 default=None,
45 type=int,
46 envvar="PLAIN_WORKER_MAX_JOBS_PER_PROCESS",
47)
48@click.option(
49 "--max-pending-per-process",
50 "max_pending_per_process",
51 default=10,
52 type=int,
53 envvar="PLAIN_WORKER_MAX_PENDING_PER_PROCESS",
54)
55@click.option(
56 "--stats-every",
57 "stats_every",
58 default=60,
59 type=int,
60 envvar="PLAIN_WORKER_STATS_EVERY",
61)
62def run(
63 queues, max_processes, max_jobs_per_process, max_pending_per_process, stats_every
64):
65 jobs_schedule = load_schedule(settings.WORKER_JOBS_SCHEDULE)
66
67 worker = Worker(
68 queues=queues,
69 jobs_schedule=jobs_schedule,
70 max_processes=max_processes,
71 max_jobs_per_process=max_jobs_per_process,
72 max_pending_per_process=max_pending_per_process,
73 stats_every=stats_every,
74 )
75
76 def _shutdown(signalnum, _):
77 logger.info("Job worker shutdown signal received signalnum=%s", signalnum)
78 worker.shutdown()
79
80 # Allow the worker to be stopped gracefully on SIGTERM
81 signal.signal(signal.SIGTERM, _shutdown)
82 signal.signal(signal.SIGINT, _shutdown)
83
84 # Start processing jobs
85 worker.run()
86
87
88@cli.command()
89def clear_completed():
90 """Clear all completed job results in all queues."""
91 cutoff = timezone.now() - datetime.timedelta(
92 seconds=settings.WORKER_JOBS_CLEARABLE_AFTER
93 )
94 click.echo(f"Clearing job results created before {cutoff}")
95 results = JobResult.objects.filter(created_at__lt=cutoff).delete()
96 click.echo(f"Deleted {results[0]} jobs")
97
98
99@cli.command()
100def stats():
101 """Stats across all queues."""
102 pending = JobRequest.objects.count()
103 processing = Job.objects.count()
104
105 successful = JobResult.objects.successful().count()
106 errored = JobResult.objects.errored().count()
107 lost = JobResult.objects.lost().count()
108
109 click.secho(f"Pending: {pending}", bold=True)
110 click.secho(f"Processing: {processing}", bold=True)
111 click.secho(f"Successful: {successful}", bold=True, fg="green")
112 click.secho(f"Errored: {errored}", bold=True, fg="red")
113 click.secho(f"Lost: {lost}", bold=True, fg="yellow")
114
115
116@cli.command()
117def purge_processing():
118 """Delete all running and pending jobs regardless of queue."""
119 if not click.confirm(
120 "Are you sure you want to clear all running and pending jobs? This will delete all current Jobs and JobRequests"
121 ):
122 return
123
124 deleted = JobRequest.objects.all().delete()[0]
125 click.echo(f"Deleted {deleted} job requests")
126
127 deleted = Job.objects.all().delete()[0]
128 click.echo(f"Deleted {deleted} jobs")
129
130
131@cli.command()
132@click.argument("job_class_name", type=str)
133def run_job(job_class_name):
134 """Run a job class directly (and not using a worker)."""
135 job = jobs_registry.load_job(job_class_name, {"args": [], "kwargs": {}})
136 click.secho("Loaded job: ", bold=True, nl=False)
137 print(job)
138 job.run()
139
140
141@cli.command()
142def registered_jobs():
143 """List all registered jobs."""
144 for name, job_class in jobs_registry.jobs.items():
145 click.echo(f"{click.style(name, fg='blue')}: {job_class}")