Plain is headed towards 1.0! Subscribe for development updates →

Worker

Process background jobs with a database-driven worker.

from plain.worker import Job
from plain.mail import send_mail

# Create a new job class
class WelcomeUserJob(Job):
    def __init__(self, user):
        self.user = user

    def run(self):
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            from_email="[email protected]",
            recipient_list=[self.user.email],
        )


# Instantiate a job and send it to the worker
user = User.objects.get(pk=1)
WelcomeUserJob(user).run_in_worker()

The worker process is run separately using plain worker run.

Admin

Job history

Scheduled jobs

Monitoring

  1import datetime
  2import logging
  3import signal
  4
  5import click
  6
  7from plain.cli import register_cli
  8from plain.runtime import settings
  9from plain.utils import timezone
 10
 11from .models import Job, JobRequest, JobResult
 12from .registry import jobs_registry
 13from .scheduling import load_schedule
 14from .workers import Worker
 15
 16logger = logging.getLogger("plain.worker")
 17
 18
 19@register_cli("worker")
 20@click.group()
 21def cli():
 22    pass
 23
 24
 25@cli.command()
 26@click.option(
 27    "queues",
 28    "--queue",
 29    default=["default"],
 30    multiple=True,
 31    type=str,
 32    help="Queue to process",
 33)
 34@click.option(
 35    "--max-processes",
 36    "max_processes",
 37    default=None,
 38    type=int,
 39    envvar="PLAIN_WORKER_MAX_PROCESSES",
 40)
 41@click.option(
 42    "--max-jobs-per-process",
 43    "max_jobs_per_process",
 44    default=None,
 45    type=int,
 46    envvar="PLAIN_WORKER_MAX_JOBS_PER_PROCESS",
 47)
 48@click.option(
 49    "--max-pending-per-process",
 50    "max_pending_per_process",
 51    default=10,
 52    type=int,
 53    envvar="PLAIN_WORKER_MAX_PENDING_PER_PROCESS",
 54)
 55@click.option(
 56    "--stats-every",
 57    "stats_every",
 58    default=60,
 59    type=int,
 60    envvar="PLAIN_WORKER_STATS_EVERY",
 61)
 62def run(
 63    queues, max_processes, max_jobs_per_process, max_pending_per_process, stats_every
 64):
 65    jobs_schedule = load_schedule(settings.WORKER_JOBS_SCHEDULE)
 66
 67    worker = Worker(
 68        queues=queues,
 69        jobs_schedule=jobs_schedule,
 70        max_processes=max_processes,
 71        max_jobs_per_process=max_jobs_per_process,
 72        max_pending_per_process=max_pending_per_process,
 73        stats_every=stats_every,
 74    )
 75
 76    def _shutdown(signalnum, _):
 77        logger.info("Job worker shutdown signal received signalnum=%s", signalnum)
 78        worker.shutdown()
 79
 80    # Allow the worker to be stopped gracefully on SIGTERM
 81    signal.signal(signal.SIGTERM, _shutdown)
 82    signal.signal(signal.SIGINT, _shutdown)
 83
 84    # Start processing jobs
 85    worker.run()
 86
 87
 88@cli.command()
 89def clear_completed():
 90    """Clear all completed job results in all queues."""
 91    cutoff = timezone.now() - datetime.timedelta(
 92        seconds=settings.WORKER_JOBS_CLEARABLE_AFTER
 93    )
 94    click.echo(f"Clearing job results created before {cutoff}")
 95    results = JobResult.objects.filter(created_at__lt=cutoff).delete()
 96    click.echo(f"Deleted {results[0]} jobs")
 97
 98
 99@cli.command()
100def stats():
101    """Stats across all queues."""
102    pending = JobRequest.objects.count()
103    processing = Job.objects.count()
104
105    successful = JobResult.objects.successful().count()
106    errored = JobResult.objects.errored().count()
107    lost = JobResult.objects.lost().count()
108
109    click.secho(f"Pending: {pending}", bold=True)
110    click.secho(f"Processing: {processing}", bold=True)
111    click.secho(f"Successful: {successful}", bold=True, fg="green")
112    click.secho(f"Errored: {errored}", bold=True, fg="red")
113    click.secho(f"Lost: {lost}", bold=True, fg="yellow")
114
115
116@cli.command()
117def purge_processing():
118    """Delete all running and pending jobs regardless of queue."""
119    if not click.confirm(
120        "Are you sure you want to clear all running and pending jobs? This will delete all current Jobs and JobRequests"
121    ):
122        return
123
124    deleted = JobRequest.objects.all().delete()[0]
125    click.echo(f"Deleted {deleted} job requests")
126
127    deleted = Job.objects.all().delete()[0]
128    click.echo(f"Deleted {deleted} jobs")
129
130
131@cli.command()
132@click.argument("job_class_name", type=str)
133def run_job(job_class_name):
134    """Run a job class directly (and not using a worker)."""
135    job = jobs_registry.load_job(job_class_name, {"args": [], "kwargs": {}})
136    click.secho("Loaded job: ", bold=True, nl=False)
137    print(job)
138    job.run()
139
140
141@cli.command()
142def registered_jobs():
143    """List all registered jobs."""
144    for name, job_class in jobs_registry.jobs.items():
145        click.echo(f"{click.style(name, fg='blue')}: {job_class}")