Plain is headed towards 1.0! Subscribe for development updates →

  1from plain.models import Model, models_registry
  2
  3
  4class JobParameters:
  5    @staticmethod
  6    def to_json(args, kwargs):
  7        serialized_args = []
  8        for arg in args:
  9            if isinstance(arg, Model):
 10                serialized_args.append(ModelInstanceParameter.from_instance(arg))
 11            else:
 12                serialized_args.append(arg)
 13
 14        serialized_kwargs = {}
 15        for key, value in kwargs.items():
 16            if isinstance(value, Model):
 17                serialized_kwargs[key] = ModelInstanceParameter.from_instance(value)
 18            else:
 19                serialized_kwargs[key] = value
 20
 21        return {"args": serialized_args, "kwargs": serialized_kwargs}
 22
 23    @staticmethod
 24    def from_json(data):
 25        args = []
 26        for arg in data["args"]:
 27            if ModelInstanceParameter.is_gid(arg):
 28                args.append(ModelInstanceParameter.to_instance(arg))
 29            else:
 30                args.append(arg)
 31
 32        kwargs = {}
 33        for key, value in data["kwargs"].items():
 34            if ModelInstanceParameter.is_gid(value):
 35                kwargs[key] = ModelInstanceParameter.to_instance(value)
 36            else:
 37                kwargs[key] = value
 38
 39        return args, kwargs
 40
 41
 42class ModelInstanceParameter:
 43    """
 44    A string representation of a model instance,
 45    so we can convert a single parameter (model instance itself)
 46    into a string that can be serialized and stored in the database.
 47    """
 48
 49    @staticmethod
 50    def from_instance(instance):
 51        return f"gid://{instance._meta.package_label}/{instance._meta.model_name}/{instance.pk}"
 52
 53    @staticmethod
 54    def to_instance(s):
 55        if not s.startswith("gid://"):
 56            raise ValueError("Invalid ModelInstanceParameter string")
 57        package, model, pk = s[6:].split("/")
 58        model = models_registry.get_model(package, model)
 59        return model.objects.get(pk=pk)
 60
 61    @staticmethod
 62    def is_gid(x):
 63        if not isinstance(x, str):
 64            return False
 65        return x.startswith("gid://")
 66
 67
 68class JobsRegistry:
 69    def __init__(self):
 70        self.jobs = {}
 71        self.ready = False
 72
 73    def register_job(self, job_class, alias=""):
 74        name = self.get_job_class_name(job_class)
 75        self.jobs[name] = job_class
 76
 77        if alias:
 78            self.jobs[alias] = job_class
 79
 80    def get_job_class_name(self, job_class):
 81        return f"{job_class.__module__}.{job_class.__qualname__}"
 82
 83    def get_job_class(self, name: str):
 84        return self.jobs[name]
 85
 86    def load_job(self, job_class_name: str, parameters):
 87        if not self.ready:
 88            raise RuntimeError("Jobs registry is not ready yet")
 89
 90        job_class = self.get_job_class(job_class_name)
 91        args, kwargs = JobParameters.from_json(parameters)
 92        return job_class(*args, **kwargs)
 93
 94
 95jobs_registry = JobsRegistry()
 96
 97
 98def register_job(job_class=None, *, alias=""):
 99    """
100    A decorator that registers a job class in the jobs registry with an optional alias.
101    Can be used both with and without parentheses.
102    """
103    if job_class is None:
104
105        def wrapper(cls):
106            jobs_registry.register_job(cls, alias=alias)
107            return cls
108
109        return wrapper
110    else:
111        jobs_registry.register_job(job_class, alias=alias)
112        return job_class