1from __future__ import annotations
2
3import datetime
4import logging
5import signal
6from typing import Any
7
8import click
9
10from plain.cli import register_cli
11from plain.runtime import settings
12from plain.utils import timezone
13
14from .models import JobProcess, JobRequest, JobResult
15from .registry import jobs_registry
16from .scheduling import load_schedule
17from .workers import Worker
18
19logger = logging.getLogger("plain.jobs")
20
21
22@register_cli("jobs")
23@click.group()
24def cli() -> None:
25 pass
26
27
28@cli.command()
29@click.option(
30 "queues",
31 "--queue",
32 default=["default"],
33 multiple=True,
34 type=str,
35 help="Queue to process",
36)
37@click.option(
38 "--max-processes",
39 "max_processes",
40 default=None,
41 type=int,
42 envvar="PLAIN_JOBS_WORKER_MAX_PROCESSES",
43)
44@click.option(
45 "--max-jobs-per-process",
46 "max_jobs_per_process",
47 default=None,
48 type=int,
49 envvar="PLAIN_JOBS_WORKER_MAX_JOBS_PER_PROCESS",
50)
51@click.option(
52 "--max-pending-per-process",
53 "max_pending_per_process",
54 default=10,
55 type=int,
56 envvar="PLAIN_JOBS_WORKER_MAX_PENDING_PER_PROCESS",
57)
58@click.option(
59 "--stats-every",
60 "stats_every",
61 default=60,
62 type=int,
63 envvar="PLAIN_JOBS_WORKER_STATS_EVERY",
64)
65def worker(
66 queues: tuple[str, ...],
67 max_processes: int | None,
68 max_jobs_per_process: int | None,
69 max_pending_per_process: int,
70 stats_every: int,
71) -> None:
72 """Run the job worker."""
73 jobs_schedule = load_schedule(settings.JOBS_SCHEDULE)
74
75 worker = Worker(
76 queues=list(queues),
77 jobs_schedule=jobs_schedule,
78 max_processes=max_processes,
79 max_jobs_per_process=max_jobs_per_process,
80 max_pending_per_process=max_pending_per_process,
81 stats_every=stats_every,
82 )
83
84 def _shutdown(signalnum: int, _: Any) -> None:
85 logger.info("Job worker shutdown signal received signalnum=%s", signalnum)
86 worker.shutdown()
87
88 # Allow the worker to be stopped gracefully on SIGTERM
89 signal.signal(signal.SIGTERM, _shutdown)
90 signal.signal(signal.SIGINT, _shutdown)
91
92 # Start processing jobs
93 worker.run()
94
95
96@cli.command()
97def clear() -> None:
98 """Clear all completed job results in all queues."""
99 cutoff = timezone.now() - datetime.timedelta(
100 seconds=settings.JOBS_RESULTS_RETENTION
101 )
102 click.echo(f"Clearing job results created before {cutoff}")
103 results = JobResult.query.filter(created_at__lt=cutoff).delete()
104 click.echo(f"Deleted {results[0]} jobs")
105
106
107@cli.command()
108def stats() -> None:
109 """Stats across all queues."""
110 pending = JobRequest.query.count()
111 processing = JobProcess.query.count()
112
113 successful = JobResult.query.successful().count()
114 errored = JobResult.query.errored().count()
115 lost = JobResult.query.lost().count()
116
117 click.secho(f"Pending: {pending}", bold=True)
118 click.secho(f"Processing: {processing}", bold=True)
119 click.secho(f"Successful: {successful}", bold=True, fg="green")
120 click.secho(f"Errored: {errored}", bold=True, fg="red")
121 click.secho(f"Lost: {lost}", bold=True, fg="yellow")
122
123
124@cli.command()
125def purge() -> None:
126 """Delete all running and pending jobs regardless of queue."""
127 if not click.confirm(
128 "Are you sure you want to clear all running and pending jobs? This will delete all current Jobs and JobRequests"
129 ):
130 return
131
132 deleted = JobRequest.query.all().delete()[0]
133 click.echo(f"Deleted {deleted} job requests")
134
135 deleted = JobProcess.query.all().delete()[0]
136 click.echo(f"Deleted {deleted} jobs")
137
138
139@cli.command()
140@click.argument("job_class_name", type=str)
141def run(job_class_name: str) -> None:
142 """Run a job class directly (and not using a worker)."""
143 job = jobs_registry.load_job(job_class_name, {"args": [], "kwargs": {}})
144 click.secho("Loaded job: ", bold=True, nl=False)
145 print(job)
146 job.run()
147
148
149@cli.command("list")
150def list_jobs() -> None:
151 """List all registered jobs."""
152 for name, job_class in jobs_registry.jobs.items():
153 click.echo(f"{click.style(name, fg='blue')}: {job_class}")