1from plain.models import Model, models_registry
2
3
class JobParameters:
    """
    Converts job call arguments to and from a JSON-friendly structure.

    Model instances are swapped for their ``gid://`` string form on the way
    out, and ``gid://`` strings are swapped back for model instances on the
    way in. Only top-level positional and keyword values are inspected.
    """

    @staticmethod
    def to_json(args, kwargs):
        """
        Build ``{"args": [...], "kwargs": {...}}`` from call arguments.

        Any value that is a ``Model`` instance is replaced by the string
        produced by ``ModelInstanceParameter.from_instance``; every other
        value is passed through untouched.
        """
        return {
            "args": [
                ModelInstanceParameter.from_instance(item)
                if isinstance(item, Model)
                else item
                for item in args
            ],
            "kwargs": {
                name: (
                    ModelInstanceParameter.from_instance(item)
                    if isinstance(item, Model)
                    else item
                )
                for name, item in kwargs.items()
            },
        }

    @staticmethod
    def from_json(data):
        """
        Rebuild ``(args, kwargs)`` from the structure made by ``to_json``.

        Values recognised by ``ModelInstanceParameter.is_gid`` are resolved
        through ``ModelInstanceParameter.to_instance``; everything else is
        returned as stored.
        """
        positional = [
            ModelInstanceParameter.to_instance(item)
            if ModelInstanceParameter.is_gid(item)
            else item
            for item in data["args"]
        ]
        named = {
            name: (
                ModelInstanceParameter.to_instance(item)
                if ModelInstanceParameter.is_gid(item)
                else item
            )
            for name, item in data["kwargs"].items()
        }
        return positional, named
40
41
class ModelInstanceParameter:
    """
    A string representation of a model instance,
    so we can convert a single parameter (model instance itself)
    into a string that can be serialized and stored in the database.

    The format is ``gid://<package_label>/<model_name>/<pk>``.
    """

    # Marker that identifies a serialized model reference.
    _GID_PREFIX = "gid://"

    @staticmethod
    def from_instance(instance):
        """
        Return the ``gid://`` string for ``instance``.

        Reads ``instance._meta.package_label``, ``instance._meta.model_name``
        and ``instance.pk``.
        """
        return f"gid://{instance._meta.package_label}/{instance._meta.model_name}/{instance.pk}"

    @staticmethod
    def to_instance(s):
        """
        Resolve a ``gid://`` string back into a model instance.

        The model class is looked up with ``models_registry.get_model`` and
        the instance is fetched with ``objects.get(pk=...)``; whatever those
        calls raise propagates to the caller.

        Raises:
            ValueError: if ``s`` is not a string starting with ``gid://`` or
                does not contain package, model and pk segments.
        """
        if not ModelInstanceParameter.is_gid(s):
            raise ValueError("Invalid ModelInstanceParameter string")

        # Split at most twice so that a primary key which itself contains
        # "/" is kept intact as the final segment.
        parts = s[len(ModelInstanceParameter._GID_PREFIX) :].split("/", 2)
        if len(parts) != 3:
            raise ValueError(
                "Invalid ModelInstanceParameter string "
                "(expected gid://<package>/<model>/<pk>)"
            )

        package, model_name, pk = parts
        model_class = models_registry.get_model(package, model_name)
        return model_class.objects.get(pk=pk)

    @staticmethod
    def is_gid(x):
        """Return True only when ``x`` is a string starting with ``gid://``."""
        return isinstance(x, str) and x.startswith(
            ModelInstanceParameter._GID_PREFIX
        )
66
67
class JobsRegistry:
    """
    Maps job names (and optional aliases) to job classes.

    ``ready`` starts as False and is never changed inside this class;
    presumably external startup code flips it once all jobs are registered
    (confirm against the caller). ``load_job`` refuses to run until then.
    """

    def __init__(self):
        self.ready = False
        self.jobs = {}

    def register_job(self, job_class, alias=""):
        """Store ``job_class`` under its dotted name and, if given, ``alias``."""
        keys = [self.get_job_class_name(job_class)]
        if alias:
            keys.append(alias)

        for key in keys:
            self.jobs[key] = job_class

    def get_job_class_name(self, job_class):
        """Return ``"<module>.<qualname>"`` for ``job_class``."""
        return "{}.{}".format(job_class.__module__, job_class.__qualname__)

    def get_job_class(self, name: str):
        """Return the class registered as ``name``; raises KeyError if absent."""
        return self.jobs[name]

    def load_job(self, job_class_name: str, parameters):
        """
        Instantiate the job registered as ``job_class_name``.

        ``parameters`` is decoded with ``JobParameters.from_json`` and the
        resulting args/kwargs are passed to the job class constructor.

        Raises:
            RuntimeError: if the registry has not been marked ready.
        """
        if not self.ready:
            raise RuntimeError("Jobs registry is not ready yet")

        # Resolve the class first so an unknown name fails before any
        # parameter decoding takes place.
        cls = self.get_job_class(job_class_name)
        positional, named = JobParameters.from_json(parameters)
        return cls(*positional, **named)
93
94
# Module-level registry instance; the `register_job` decorator in this module
# records job classes on it.
jobs_registry = JobsRegistry()
96
97
def register_job(job_class=None, *, alias=""):
    """
    Class decorator that adds a job class to ``jobs_registry``.

    Works bare (``@register_job``) or called (``@register_job(alias="x")``).
    In both forms the decorated class is returned unchanged.
    """

    def decorate(cls):
        jobs_registry.register_job(cls, alias=alias)
        return cls

    # Called with parentheses: no class yet, so hand back the decorator.
    # Used bare: the class is already here, so register it right away.
    return decorate if job_class is None else decorate(job_class)