Plain is headed towards 1.0! Subscribe for development updates →

 1from __future__ import annotations
 2
 3from collections.abc import Callable
 4from typing import TYPE_CHECKING, Any, TypeVar
 5
 6from .parameters import JobParameters
 7
 8if TYPE_CHECKING:
 9    from .jobs import Job
10
11T = TypeVar("T", bound=type["Job"])
12
13
14class JobsRegistry:
15    def __init__(self) -> None:
16        self.jobs: dict[str, type[Job]] = {}
17        self.ready = False
18
19    def register_job(self, job_class: type[Job], alias: str = "") -> None:
20        name = self.get_job_class_name(job_class)
21        self.jobs[name] = job_class
22
23        if alias:
24            self.jobs[alias] = job_class
25
26    def get_job_class_name(self, job_class: type[Job]) -> str:
27        return f"{job_class.__module__}.{job_class.__qualname__}"
28
29    def get_job_class(self, name: str) -> type[Job]:
30        return self.jobs[name]
31
32    def load_job(self, job_class_name: str, parameters: dict[str, Any]) -> Job:
33        if not self.ready:
34            raise RuntimeError("Jobs registry is not ready yet")
35
36        job_class = self.get_job_class(job_class_name)
37        args, kwargs = JobParameters.from_json(parameters)
38        return job_class(*args, **kwargs)
39
40
41jobs_registry = JobsRegistry()
42
43
44def register_job(
45    job_class: T | None = None, *, alias: str = ""
46) -> T | Callable[[T], T]:
47    """
48    A decorator that registers a job class in the jobs registry with an optional alias.
49    Can be used both with and without parentheses.
50    """
51    if job_class is None:
52
53        def wrapper(cls: T) -> T:
54            jobs_registry.register_job(cls, alias=alias)
55            return cls
56
57        return wrapper
58    else:
59        jobs_registry.register_job(job_class, alias=alias)
60        return job_class