Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3import datetime
  4import logging
  5import signal
  6from typing import Any
  7
  8import click
  9
 10from plain.cli import register_cli
 11from plain.runtime import settings
 12from plain.utils import timezone
 13
 14from .models import JobProcess, JobRequest, JobResult
 15from .registry import jobs_registry
 16from .scheduling import load_schedule
 17from .workers import Worker
 18
# Module-level logger under the "plain.jobs" name; the worker command's
# signal handler logs shutdown requests through it.
logger = logging.getLogger("plain.jobs")
 20
 21
# Root click group for the jobs commands. The commands in this module attach
# to it with @cli.command(); the group body itself does nothing.
# NOTE(review): register_cli("jobs") presumably exposes this group as the
# `jobs` subcommand of the Plain CLI -- confirm against plain.cli.
# Documented with comments rather than a docstring on purpose: click uses a
# group's docstring as its --help text, so adding one would change CLI output.
@register_cli("jobs")
@click.group()
def cli() -> None:
    pass
 26
 27
 28@cli.command()
 29@click.option(
 30    "queues",
 31    "--queue",
 32    default=["default"],
 33    multiple=True,
 34    type=str,
 35    help="Queue to process",
 36)
 37@click.option(
 38    "--max-processes",
 39    "max_processes",
 40    default=None,
 41    type=int,
 42    envvar="PLAIN_JOBS_WORKER_MAX_PROCESSES",
 43)
 44@click.option(
 45    "--max-jobs-per-process",
 46    "max_jobs_per_process",
 47    default=None,
 48    type=int,
 49    envvar="PLAIN_JOBS_WORKER_MAX_JOBS_PER_PROCESS",
 50)
 51@click.option(
 52    "--max-pending-per-process",
 53    "max_pending_per_process",
 54    default=10,
 55    type=int,
 56    envvar="PLAIN_JOBS_WORKER_MAX_PENDING_PER_PROCESS",
 57)
 58@click.option(
 59    "--stats-every",
 60    "stats_every",
 61    default=60,
 62    type=int,
 63    envvar="PLAIN_JOBS_WORKER_STATS_EVERY",
 64)
 65def worker(
 66    queues: tuple[str, ...],
 67    max_processes: int | None,
 68    max_jobs_per_process: int | None,
 69    max_pending_per_process: int,
 70    stats_every: int,
 71) -> None:
 72    """Run the job worker."""
 73    jobs_schedule = load_schedule(settings.JOBS_SCHEDULE)
 74
 75    worker = Worker(
 76        queues=list(queues),
 77        jobs_schedule=jobs_schedule,
 78        max_processes=max_processes,
 79        max_jobs_per_process=max_jobs_per_process,
 80        max_pending_per_process=max_pending_per_process,
 81        stats_every=stats_every,
 82    )
 83
 84    def _shutdown(signalnum: int, _: Any) -> None:
 85        logger.info("Job worker shutdown signal received signalnum=%s", signalnum)
 86        worker.shutdown()
 87
 88    # Allow the worker to be stopped gracefully on SIGTERM
 89    signal.signal(signal.SIGTERM, _shutdown)
 90    signal.signal(signal.SIGINT, _shutdown)
 91
 92    # Start processing jobs
 93    worker.run()
 94
 95
 96@cli.command()
 97def clear() -> None:
 98    """Clear all completed job results in all queues."""
 99    cutoff = timezone.now() - datetime.timedelta(
100        seconds=settings.JOBS_RESULTS_RETENTION
101    )
102    click.echo(f"Clearing job results created before {cutoff}")
103    results = JobResult.query.filter(created_at__lt=cutoff).delete()
104    click.echo(f"Deleted {results[0]} jobs")
105
106
@cli.command()
def stats() -> None:
    """Stats across all queues."""
    # Gather every count first (same query order as the output order), then
    # print them; each row is (label, count, foreground colour or None).
    rows = [
        ("Pending", JobRequest.query.count(), None),
        ("Processing", JobProcess.query.count(), None),
        ("Successful", JobResult.query.successful().count(), "green"),
        ("Errored", JobResult.query.errored().count(), "red"),
        ("Lost", JobResult.query.lost().count(), "yellow"),
    ]

    for label, count, colour in rows:
        # fg=None is click's default, so uncoloured rows render as plain bold.
        click.secho(f"{label}: {count}", bold=True, fg=colour)
122
123
@cli.command()
def purge() -> None:
    """Delete all running and pending jobs regardless of queue."""
    # Destructive operation: require interactive confirmation and do nothing
    # if the user declines.
    confirmed = click.confirm(
        "Are you sure you want to clear all running and pending jobs? This will delete all current Jobs and JobRequests"
    )
    if not confirmed:
        return

    # Pending requests are removed first, then the JobProcess rows; each
    # delete() result's first item is the count that gets reported.
    for model, noun in ((JobRequest, "job requests"), (JobProcess, "jobs")):
        deleted = model.query.all().delete()[0]
        click.echo(f"Deleted {deleted} {noun}")
137
138
@cli.command()
@click.argument("job_class_name", type=str)
def run(job_class_name: str) -> None:
    """Run a job class directly (and not using a worker)."""
    # The job is looked up by name in the registry and loaded with empty
    # positional and keyword arguments, then executed in this process.
    job = jobs_registry.load_job(job_class_name, {"args": [], "kwargs": {}})
    click.secho("Loaded job: ", bold=True, nl=False)
    # Use click.echo rather than print so all output from this module goes
    # through click; the text written is still str(job) plus a newline.
    click.echo(job)
    job.run()
147
148
@cli.command("list")
def list_jobs() -> None:
    """List all registered jobs."""
    # One line per registry entry: the registered name in blue, followed by
    # the job class it maps to.
    for registered_name, job_cls in jobs_registry.jobs.items():
        styled_name = click.style(registered_name, fg="blue")
        click.echo(f"{styled_name}: {job_cls}")