1from __future__ import annotations
2
3import datetime
4import signal
5from typing import Any
6
7import click
8
9from plain.cli import SettingOption, register_cli
10from plain.logs import get_framework_logger
11from plain.runtime import settings
12from plain.utils import timezone
13
14from .models import JobProcess, JobRequest, JobResult
15from .registry import jobs_registry
16from .scheduling import load_schedule
17from .workers import Worker
18
# Module-level logger obtained from plain.logs; in this file it is used by
# _run_once to record that a shutdown signal was received.
logger = get_framework_logger()
20
21
# Root click group for the job commands. It is registered with Plain's CLI
# under the name "jobs" via register_cli, and every subcommand in this module
# attaches to it with @cli.command().
@register_cli("jobs")
@click.group()
def cli() -> None:
    """Background job management"""
    # Intentionally empty: the group only acts as a container for subcommands.
    # The docstring above is the help text click shows for the group.
26
27
@cli.command()
@click.option(
    "queues",
    "--queue",
    default=["default"],
    multiple=True,
    type=str,
    help="Queue to process",
)
@click.option(
    "--max-processes",
    "max_processes",
    type=int,
    cls=SettingOption,
    setting="JOBS_WORKER_MAX_PROCESSES",
)
@click.option(
    "--max-jobs-per-process",
    "max_jobs_per_process",
    type=int,
    cls=SettingOption,
    setting="JOBS_WORKER_MAX_JOBS_PER_PROCESS",
)
@click.option(
    "--max-pending-per-process",
    "max_pending_per_process",
    type=int,
    cls=SettingOption,
    setting="JOBS_WORKER_MAX_PENDING_PER_PROCESS",
)
@click.option(
    "--stats-every",
    "stats_every",
    type=int,
    cls=SettingOption,
    setting="JOBS_WORKER_STATS_EVERY",
)
@click.option(
    "--reload",
    is_flag=True,
    help="Watch files and auto-reload worker on changes",
)
def worker(
    queues: tuple[str, ...],
    max_processes: int | None,
    max_jobs_per_process: int | None,
    max_pending_per_process: int,
    stats_every: int,
    reload: bool,
) -> None:
    """Run the job worker"""
    # The schedule is loaded once here and handed to every Worker instance
    # that gets built from these kwargs.
    schedule = load_schedule(settings.JOBS_SCHEDULE)

    # Keyword arguments forwarded verbatim to Worker(...). click delivers
    # --queue values as a tuple, so it is converted to a list first.
    worker_kwargs: dict[str, Any] = dict(
        queues=list(queues),
        jobs_schedule=schedule,
        max_processes=max_processes,
        max_jobs_per_process=max_jobs_per_process,
        max_pending_per_process=max_pending_per_process,
        stats_every=stats_every,
    )

    # --reload selects the restart-on-file-change runner; otherwise the worker
    # runs a single time.
    runner = _run_with_reload if reload else _run_once
    runner(worker_kwargs)
94
95
def _run_with_reload(worker_kwargs: dict[str, Any]) -> None:
    """Run workers in a loop, replacing the worker when watched files change.

    A new ``Worker(**worker_kwargs)`` is created each time the previous
    worker's ``run()`` returns. The loop ends once SIGTERM or SIGINT has been
    received.
    """
    from plain.internal.reloader import Reloader

    keep_running = True
    active_worker: Worker | None = None

    def stop_active_worker() -> None:
        # Nothing to stop until the loop below has created its first worker.
        if active_worker:
            active_worker.shutdown()

    def file_changed(filename: str) -> None:
        # Only the current worker is shut down; keep_running stays True so the
        # loop below builds a replacement. (Presumes run() returns after
        # shutdown() is called.)
        stop_active_worker()

    def signal_shutdown(signalnum: int, _: Any) -> None:
        nonlocal keep_running
        # Clear the flag before stopping the worker so the loop does not
        # start another one afterwards.
        keep_running = False
        stop_active_worker()

    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, signal_shutdown)

    watcher = Reloader(callback=file_changed, watch_html=False)
    watcher.start()

    while keep_running:
        active_worker = Worker(**worker_kwargs)
        active_worker.run()
121
122
def _run_once(worker_kwargs: dict[str, Any]) -> None:
    """Build one Worker from ``worker_kwargs`` and run it.

    SIGTERM and SIGINT are routed to a handler that logs the signal number
    and then calls the worker's ``shutdown()``.
    """
    job_worker = Worker(**worker_kwargs)

    def handle_stop_signal(signalnum: int, _: Any) -> None:
        logger.info(
            "Job worker shutdown signal received",
            extra={"signalnum": signalnum},
        )
        job_worker.shutdown()

    # Handlers are installed before run() is entered.
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, handle_stop_signal)

    job_worker.run()
137
138
@cli.command()
def clear() -> None:
    """Clear completed job results"""
    # Results created before (now - JOBS_RESULTS_RETENTION seconds) are removed.
    now = timezone.now()
    retention = datetime.timedelta(seconds=settings.JOBS_RESULTS_RETENTION)
    cutoff = now - retention

    click.echo(f"Clearing job results created before {cutoff}")

    expired_results = JobResult.query.filter(created_at__lt=cutoff)
    count = expired_results.delete()
    click.echo(f"Deleted {count} jobs")
148
149
@cli.command()
def stats() -> None:
    """Show job queue statistics"""
    # Each row is (label, count, foreground colour). All counts are gathered
    # before anything is printed; a colour of None leaves the text uncoloured.
    rows = [
        ("Pending", JobRequest.query.count(), None),
        ("Processing", JobProcess.query.count(), None),
        ("Successful", JobResult.query.successful().count(), "green"),
        ("Errored", JobResult.query.errored().count(), "red"),
        ("Lost", JobResult.query.lost().count(), "yellow"),
    ]

    for label, total, color in rows:
        click.secho(f"{label}: {total}", bold=True, fg=color)
165
166
@cli.command()
@click.option("--yes", "-y", is_flag=True, help="Skip confirmation prompt.")
def purge(yes: bool) -> None:
    """Delete all pending and running jobs"""
    prompt = "Are you sure you want to clear all running and pending jobs? This will delete all current Jobs and JobRequests"

    # --yes short-circuits the prompt; otherwise the user has to confirm.
    if not (yes or click.confirm(prompt)):
        return

    removed_requests = JobRequest.query.all().delete()
    click.echo(f"Deleted {removed_requests} job requests")

    removed_processes = JobProcess.query.all().delete()
    click.echo(f"Deleted {removed_processes} jobs")
181
182
@cli.command()
@click.argument("job_class_name", type=str)
def run(job_class_name: str) -> None:
    """Run a job directly without a worker"""
    # The job is loaded with no positional or keyword arguments.
    empty_parameters = {"args": [], "kwargs": {}}
    loaded_job = jobs_registry.load_job(job_class_name, empty_parameters)

    click.secho("Loaded job: ", bold=True, nl=False)
    print(loaded_job)

    loaded_job.run()
191
192
@cli.command("list")
def list_jobs() -> None:
    """List all registered jobs"""
    for name, job_class in jobs_registry.jobs.items():
        # The name is printed without a newline so the description, if there
        # is one, continues on the same line.
        click.secho(name, bold=True, nl=False)

        summary = (job_class.__doc__ or "").strip()
        if not summary:
            # No docstring (or a blank one): just end the line.
            click.echo("")
            continue

        click.secho(f": {summary}", dim=True)