1from __future__ import annotations
2
3from collections.abc import Callable
4from typing import TYPE_CHECKING, Any, TypeVar
5
6from .parameters import JobParameters
7
8if TYPE_CHECKING:
9 from .jobs import Job
10
11T = TypeVar("T", bound=type["Job"])
12
13
14class JobsRegistry:
15 def __init__(self) -> None:
16 self.jobs: dict[str, type[Job]] = {}
17 self.ready = False
18
19 def register_job(self, job_class: type[Job], alias: str = "") -> None:
20 name = self.get_job_class_name(job_class)
21 self.jobs[name] = job_class
22
23 if alias:
24 self.jobs[alias] = job_class
25
26 def get_job_class_name(self, job_class: type[Job]) -> str:
27 return f"{job_class.__module__}.{job_class.__qualname__}"
28
29 def get_job_class(self, name: str) -> type[Job]:
30 return self.jobs[name]
31
32 def load_job(self, job_class_name: str, parameters: dict[str, Any]) -> Job:
33 if not self.ready:
34 raise RuntimeError("Jobs registry is not ready yet")
35
36 job_class = self.get_job_class(job_class_name)
37 args, kwargs = JobParameters.from_json(parameters)
38 return job_class(*args, **kwargs)
39
40
41jobs_registry = JobsRegistry()
42
43
44def register_job(
45 job_class: T | None = None, *, alias: str = ""
46) -> T | Callable[[T], T]:
47 """
48 A decorator that registers a job class in the jobs registry with an optional alias.
49 Can be used both with and without parentheses.
50 """
51 if job_class is None:
52
53 def wrapper(cls: T) -> T:
54 jobs_registry.register_job(cls, alias=alias)
55 return cls
56
57 return wrapper
58 else:
59 jobs_registry.register_job(job_class, alias=alias)
60 return job_class