v0.146.0
  1from __future__ import annotations
  2
  3import datetime
  4import signal
  5from typing import Any
  6
  7import click
  8
  9from plain.cli import SettingOption, register_cli
 10from plain.logs import get_framework_logger
 11from plain.runtime import settings
 12from plain.utils import timezone
 13
 14from .models import JobProcess, JobRequest, JobResult
 15from .registry import jobs_registry
 16from .scheduling import load_schedule
 17from .workers import Worker
 18
 19logger = get_framework_logger()
 20
 21
 22@register_cli("jobs")
 23@click.group()
 24def cli() -> None:
 25    """Background job management"""
 26
 27
 28@cli.command()
 29@click.option(
 30    "queues",
 31    "--queue",
 32    default=["default"],
 33    multiple=True,
 34    type=str,
 35    help="Queue to process",
 36)
 37@click.option(
 38    "--max-processes",
 39    "max_processes",
 40    type=int,
 41    cls=SettingOption,
 42    setting="JOBS_WORKER_MAX_PROCESSES",
 43)
 44@click.option(
 45    "--max-jobs-per-process",
 46    "max_jobs_per_process",
 47    type=int,
 48    cls=SettingOption,
 49    setting="JOBS_WORKER_MAX_JOBS_PER_PROCESS",
 50)
 51@click.option(
 52    "--max-pending-per-process",
 53    "max_pending_per_process",
 54    type=int,
 55    cls=SettingOption,
 56    setting="JOBS_WORKER_MAX_PENDING_PER_PROCESS",
 57)
 58@click.option(
 59    "--stats-every",
 60    "stats_every",
 61    type=int,
 62    cls=SettingOption,
 63    setting="JOBS_WORKER_STATS_EVERY",
 64)
 65@click.option(
 66    "--reload",
 67    is_flag=True,
 68    help="Watch files and auto-reload worker on changes",
 69)
 70def worker(
 71    queues: tuple[str, ...],
 72    max_processes: int | None,
 73    max_jobs_per_process: int | None,
 74    max_pending_per_process: int,
 75    stats_every: int,
 76    reload: bool,
 77) -> None:
 78    """Run the job worker"""
 79    jobs_schedule = load_schedule(settings.JOBS_SCHEDULE)
 80
 81    worker_kwargs = {
 82        "queues": list(queues),
 83        "jobs_schedule": jobs_schedule,
 84        "max_processes": max_processes,
 85        "max_jobs_per_process": max_jobs_per_process,
 86        "max_pending_per_process": max_pending_per_process,
 87        "stats_every": stats_every,
 88    }
 89
 90    if reload:
 91        _run_with_reload(worker_kwargs)
 92    else:
 93        _run_once(worker_kwargs)
 94
 95
 96def _run_with_reload(worker_kwargs: dict[str, Any]) -> None:
 97    from plain.internal.reloader import Reloader
 98
 99    should_restart = {"value": True}
100    current_worker: dict[str, Worker | None] = {"instance": None}
101
102    def file_changed(filename: str) -> None:
103        if current_worker["instance"]:
104            current_worker["instance"].shutdown()
105
106    def signal_shutdown(signalnum: int, _: Any) -> None:
107        should_restart["value"] = False
108        if current_worker["instance"]:
109            current_worker["instance"].shutdown()
110
111    signal.signal(signal.SIGTERM, signal_shutdown)
112    signal.signal(signal.SIGINT, signal_shutdown)
113
114    reloader = Reloader(callback=file_changed, watch_html=False)
115    reloader.start()
116
117    while should_restart["value"]:
118        w = Worker(**worker_kwargs)
119        current_worker["instance"] = w
120        w.run()
121
122
def _run_once(worker_kwargs: dict[str, Any]) -> None:
    """Create one worker and run it until it stops.

    SIGTERM and SIGINT are routed to a handler that logs the signal number
    and calls the worker's ``shutdown()``.

    Args:
        worker_kwargs: Keyword arguments passed unchanged to ``Worker(...)``.
    """
    job_worker = Worker(**worker_kwargs)

    def handle_stop_signal(signalnum: int, _: Any) -> None:
        # The second argument is the interrupted stack frame; it is unused.
        logger.info(
            "Job worker shutdown signal received",
            extra={"signalnum": signalnum},
        )
        job_worker.shutdown()

    # Same handler for both signals, installed before the worker starts.
    for stop_signal in (signal.SIGTERM, signal.SIGINT):
        signal.signal(stop_signal, handle_stop_signal)

    job_worker.run()
137
138
@cli.command()
def clear() -> None:
    """Clear completed job results"""
    # Results created earlier than (now - JOBS_RESULTS_RETENTION seconds)
    # are deleted.
    now = timezone.now()
    retention = datetime.timedelta(seconds=settings.JOBS_RESULTS_RETENTION)
    cutoff = now - retention
    click.echo(f"Clearing job results created before {cutoff}")

    expired_results = JobResult.query.filter(created_at__lt=cutoff)
    # Whatever ``delete()`` returns is echoed as-is in the summary line.
    count = expired_results.delete()
    click.echo(f"Deleted {count} jobs")
148
149
@cli.command()
def stats() -> None:
    """Show job queue statistics"""
    # Run every count query before printing anything.
    pending = JobRequest.query.count()
    processing = JobProcess.query.count()
    successful = JobResult.query.successful().count()
    errored = JobResult.query.errored().count()
    lost = JobResult.query.lost().count()

    # (text, foreground colour) pairs in display order; ``None`` leaves the
    # terminal's default colour in place.
    report = (
        (f"Pending: {pending}", None),
        (f"Processing: {processing}", None),
        (f"Successful: {successful}", "green"),
        (f"Errored: {errored}", "red"),
        (f"Lost: {lost}", "yellow"),
    )
    for text, colour in report:
        click.secho(text, bold=True, fg=colour)
165
166
@cli.command()
@click.option("--yes", "-y", is_flag=True, help="Skip confirmation prompt.")
def purge(yes: bool) -> None:
    """Delete all pending and running jobs"""
    # Only prompt when --yes/-y was not given; declining aborts without
    # deleting anything.
    if not yes:
        confirmed = click.confirm(
            "Are you sure you want to clear all running and pending jobs? This will delete all current Jobs and JobRequests"
        )
        if not confirmed:
            return

    # Requests are removed first, then the process rows, each with its own
    # summary line.
    request_rows = JobRequest.query.all()
    deleted = request_rows.delete()
    click.echo(f"Deleted {deleted} job requests")

    process_rows = JobProcess.query.all()
    deleted = process_rows.delete()
    click.echo(f"Deleted {deleted} jobs")
181
182
@cli.command()
@click.argument("job_class_name", type=str)
def run(job_class_name: str) -> None:
    """Run a job directly without a worker"""
    # The job is loaded with empty positional and keyword arguments.
    no_parameters = {"args": [], "kwargs": {}}
    loaded_job = jobs_registry.load_job(job_class_name, no_parameters)

    # Bold label without a trailing newline, then the job itself on the
    # same line.
    click.secho("Loaded job: ", bold=True, nl=False)
    print(loaded_job)

    # Runs in this process, in the foreground.
    loaded_job.run()
191
192
@cli.command("list")
def list_jobs() -> None:
    """List all registered jobs"""
    for name, job_class in jobs_registry.jobs.items():
        # Name in bold, no newline yet, so the description can follow it.
        click.secho(name, bold=True, nl=False)

        # A missing or whitespace-only class docstring counts as no
        # description.
        description = (job_class.__doc__ or "").strip()
        if not description:
            click.echo("")  # just terminate the line
            continue

        click.secho(f": {description}", dim=True)
202            click.echo("")