Plain is headed towards 1.0! Subscribe for development updates →

Worker

Process background jobs with a database-driven worker.

from plain.worker import Job
from plain.mail import send_mail

# Create a new job class
class WelcomeUserJob(Job):
    # Example job: e-mails a welcome message to the user it was created with.

    def __init__(self, user):
        # Keep the user on the instance so run() can read it later.
        self.user = user

    def run(self):
        # The greeting interpolates the user object itself (its string form),
        # while the recipient is the user's `email` attribute.
        greeting = f"Hello from Plain, {self.user}"
        recipients = [self.user.email]
        # NOTE(review): the sender address below looks like an e-mail
        # obfuscation placeholder from the published docs — substitute a real
        # address before using this example.
        send_mail(
            subject="Welcome!",
            message=greeting,
            from_email="[email protected]",
            recipient_list=recipients,
        )


# Instantiate a job and send it to the worker
# NOTE(review): `User` is not imported in this snippet — presumably the
# application's own user model; confirm against the surrounding docs.
user = User.objects.get(pk=1)
# run_in_worker() is called instead of run(); the worker process itself is
# started separately.
WelcomeUserJob(user).run_in_worker()

The worker process is run separately using `plain worker run`.

Staff

Job history

Scheduled jobs

Monitoring

  1import datetime
  2import logging
  3import signal
  4
  5import click
  6
  7from plain.runtime import settings
  8from plain.utils import timezone
  9
 10from .jobs import load_job
 11from .models import Job, JobRequest, JobResult
 12from .scheduling import load_schedule
 13from .workers import Worker
 14
 15logger = logging.getLogger("plain.worker")
 16
 17
 18@click.group()
 19def cli():
 20    pass
 21
 22
 23@cli.command()
 24@click.option(
 25    "queues",
 26    "--queue",
 27    default=["default"],
 28    multiple=True,
 29    type=str,
 30    help="Queue to process",
 31)
 32@click.option(
 33    "--max-processes",
 34    "max_processes",
 35    default=None,
 36    type=int,
 37    envvar="PLAIN_JOBS_MAX_PROCESSES",
 38)
 39@click.option(
 40    "--max-jobs-per-process",
 41    "max_jobs_per_process",
 42    default=None,
 43    type=int,
 44    envvar="PLAIN_JOBS_MAX_JOBS_PER_PROCESS",
 45)
 46@click.option(
 47    "--max-pending-per-process",
 48    "max_pending_per_process",
 49    default=10,
 50    type=int,
 51    envvar="PLAIN_JOBS_MAX_PENDING_PER_PROCESS",
 52)
 53@click.option(
 54    "--stats-every",
 55    "stats_every",
 56    default=60,
 57    type=int,
 58    envvar="PLAIN_JOBS_STATS_EVERY",
 59)
 60def run(
 61    queues, max_processes, max_jobs_per_process, max_pending_per_process, stats_every
 62):
 63    jobs_schedule = load_schedule(settings.WORKER_JOBS_SCHEDULE)
 64
 65    worker = Worker(
 66        queues=queues,
 67        jobs_schedule=jobs_schedule,
 68        max_processes=max_processes,
 69        max_jobs_per_process=max_jobs_per_process,
 70        max_pending_per_process=max_pending_per_process,
 71        stats_every=stats_every,
 72    )
 73
 74    def _shutdown(signalnum, _):
 75        logger.info("Job worker shutdown signal received signalnum=%s", signalnum)
 76        worker.shutdown()
 77
 78    # Allow the worker to be stopped gracefully on SIGTERM
 79    signal.signal(signal.SIGTERM, _shutdown)
 80    signal.signal(signal.SIGINT, _shutdown)
 81
 82    # Start processing jobs
 83    worker.run()
 84
 85
 86@cli.command()
 87def clear_completed():
 88    """Clear all completed job results in all queues."""
 89    cutoff = timezone.now() - datetime.timedelta(
 90        seconds=settings.WORKER_JOBS_CLEARABLE_AFTER
 91    )
 92    click.echo(f"Clearing job results created before {cutoff}")
 93    results = JobResult.objects.filter(created_at__lt=cutoff).delete()
 94    click.echo(f"Deleted {results[0]} jobs")
 95
 96
 97@cli.command()
 98def stats():
 99    """Stats across all queues."""
100    pending = JobRequest.objects.count()
101    processing = Job.objects.count()
102
103    successful = JobResult.objects.successful().count()
104    errored = JobResult.objects.errored().count()
105    lost = JobResult.objects.lost().count()
106
107    click.secho(f"Pending: {pending}", bold=True)
108    click.secho(f"Processing: {processing}", bold=True)
109    click.secho(f"Successful: {successful}", bold=True, fg="green")
110    click.secho(f"Errored: {errored}", bold=True, fg="red")
111    click.secho(f"Lost: {lost}", bold=True, fg="yellow")
112
113
@cli.command()
def purge_processing():
    """Delete all running and pending jobs regardless of queue."""
    # Destructive operation: require an explicit yes before touching any rows.
    confirmed = click.confirm(
        "Are you sure you want to clear all running and pending jobs? This will delete all current Jobs and JobRequests"
    )
    if not confirmed:
        return

    # JobRequest rows are removed first, then Job rows; each count is reported
    # right after its delete.
    request_count = JobRequest.objects.all().delete()[0]
    click.echo(f"Deleted {request_count} job requests")

    job_count = Job.objects.all().delete()[0]
    click.echo(f"Deleted {job_count} jobs")
127
128
@cli.command()
@click.argument("job_class", type=str)
def run_job(job_class):
    """Run a job class directly (and not using a worker)."""
    # The job is loaded with empty positional and keyword arguments, so only
    # job classes that can be constructed without arguments work here.
    job = load_job(job_class, {"args": [], "kwargs": {}})
    click.secho("Loaded job: ", bold=True, nl=False)
    # Use click.echo rather than print so all output of this CLI goes through
    # click, like the other commands in this module; click.echo converts
    # non-string objects to their string form.
    click.echo(job)
    # Execute synchronously in the current process.
    job.run()