1from __future__ import annotations
2
3import datetime
4from typing import Any
5
6from plain.models import Model, models_registry
7
8
class JobParameter:
    """Common interface for converting one kind of job argument to and from a string.

    The base implementation handles nothing: ``serialize`` and ``deserialize``
    both return ``None``. Subclasses override them and set ``STR_PREFIX`` to
    the marker that tags their serialized strings.
    """

    # Marker that starts this type's serialized strings; None on the base class.
    STR_PREFIX: str | None = None

    @classmethod
    def serialize(cls, value: Any) -> str | None:
        """Return the string form of ``value``, or None if this type does not apply."""
        return None

    @classmethod
    def deserialize(cls, data: Any) -> Any:
        """Return the value decoded from ``data``, or None if this type does not apply."""
        return None

    @classmethod
    def _extract_string_value(cls, data: Any) -> str | None:
        """Return the part of ``data`` that follows ``STR_PREFIX``.

        Returns None when ``data`` is not a string, when no prefix is
        configured, when ``data`` does not start with the prefix, or when
        nothing follows the prefix.
        """
        prefix = cls.STR_PREFIX
        if not prefix or not isinstance(data, str):
            return None
        if not data.startswith(prefix):
            return None
        # An empty remainder (data is exactly the prefix) counts as invalid.
        remainder = data[len(prefix) :]
        return remainder or None
32
33
class ModelParameter(JobParameter):
    """Handle Plain model instances using the ``__plain://model/`` string format.

    Serialized form: ``__plain://model/<package_label>/<model_name>/<id>``.
    """

    STR_PREFIX = "__plain://model/"

    @classmethod
    def serialize(cls, value: Any) -> str | None:
        """Return the reference string for a ``Model`` instance, else None."""
        if not isinstance(value, Model):
            return None
        options = value.model_options
        return (
            f"{cls.STR_PREFIX}{options.package_label}/{options.model_name}/{value.id}"
        )

    @classmethod
    def deserialize(cls, data: Any) -> Model | None:
        """Look up the model instance that ``data`` refers to.

        Returns None when ``data`` is not a well-formed reference (wrong
        prefix, or not exactly three non-empty ``/``-separated parts) or when
        the registry/database lookup fails for any reason.
        """
        value_part = cls._extract_string_value(data)
        if not value_part:
            return None

        parts = value_part.split("/")
        if len(parts) != 3 or not all(parts):
            return None
        package, model_name, obj_id = parts

        try:
            model = models_registry.get_model(package, model_name)
            return model.query.get(id=obj_id)
        except Exception:
            # Deliberately best-effort: a failed lookup must not raise here.
            # Record the reason instead of discarding it so that an
            # unresolved reference can be diagnosed.
            import logging

            logging.getLogger(__name__).debug(
                "Could not resolve model reference %r", data, exc_info=True
            )
            return None
57
58
class DateParameter(JobParameter):
    """Convert ``datetime.date`` values (but not datetimes) to and from ISO strings."""

    STR_PREFIX = "__plain://date/"

    @classmethod
    def serialize(cls, value: Any) -> str | None:
        """Return the prefixed ISO date for a plain ``date``, else None."""
        # datetime is a subclass of date, so it has to be ruled out explicitly.
        if isinstance(value, datetime.datetime):
            return None
        if not isinstance(value, datetime.date):
            return None
        return f"{cls.STR_PREFIX}{value.isoformat()}"

    @classmethod
    def deserialize(cls, data: Any) -> datetime.date | None:
        """Parse a prefixed ISO date string; return None if it is not one."""
        iso_text = cls._extract_string_value(data)
        if not iso_text:
            return None
        try:
            return datetime.date.fromisoformat(iso_text)
        except ValueError:
            # Right prefix but not a valid ISO date.
            return None
80
81
class DateTimeParameter(JobParameter):
    """Convert ``datetime.datetime`` values to and from ISO strings."""

    STR_PREFIX = "__plain://datetime/"

    @classmethod
    def serialize(cls, value: Any) -> str | None:
        """Return the prefixed ISO timestamp for a ``datetime``, else None."""
        if not isinstance(value, datetime.datetime):
            return None
        return f"{cls.STR_PREFIX}{value.isoformat()}"

    @classmethod
    def deserialize(cls, data: Any) -> datetime.datetime | None:
        """Parse a prefixed ISO timestamp string; return None if it is not one."""
        iso_text = cls._extract_string_value(data)
        if not iso_text:
            return None
        try:
            return datetime.datetime.fromisoformat(iso_text)
        except ValueError:
            # Right prefix but not a valid ISO timestamp.
            return None
101
102
class LegacyModelParameter(JobParameter):
    """Legacy model parameter handling for backwards compatibility.

    Only decodes existing ``gid://<package>/<model>/<id>`` strings; it never
    produces them.
    """

    STR_PREFIX = "gid://"

    @classmethod
    def serialize(cls, value: Any) -> str | None:
        """Always return None: new values are never written in the legacy format."""
        return None

    @classmethod
    def deserialize(cls, data: Any) -> Model | None:
        """Look up the model instance that a legacy ``gid://`` string refers to.

        Returns None when ``data`` lacks the prefix, does not split into
        exactly three ``/``-separated parts, or when the registry/database
        lookup fails for any reason.
        """
        value_part = cls._extract_string_value(data)
        if not value_part:
            return None

        parts = value_part.split("/")
        if len(parts) != 3:
            return None
        package, model_name, obj_id = parts

        try:
            model_class = models_registry.get_model(package, model_name)
            return model_class.query.get(id=obj_id)
        except Exception:
            # Deliberately best-effort: a failed lookup must not raise here.
            # Record the reason instead of discarding it so that an
            # unresolved reference can be diagnosed.
            import logging

            logging.getLogger(__name__).debug(
                "Could not resolve legacy model reference %r", data, exc_info=True
            )
            return None
123
124
# Parameter handlers, tried in list order; the first one that returns a
# non-None result wins, so more specific types belong earlier.
# - DateTimeParameter is listed ahead of DateParameter because datetime is a
#   subclass of date.
# - LegacyModelParameter goes last: it only deserializes and never serializes.
PARAMETER_TYPES = [ModelParameter, DateTimeParameter, DateParameter, LegacyModelParameter]
135
136
class JobParameters:
    """
    Entry point for turning job call arguments into JSON-friendly data and back.
    Each value is offered to the handlers in ``PARAMETER_TYPES`` in order.
    """

    @staticmethod
    def to_json(args: tuple[Any, ...], kwargs: dict[str, Any]) -> dict[str, Any]:
        """Return ``{"args": [...], "kwargs": {...}}`` with every value serialized."""
        encode = JobParameters._serialize_value
        return {
            "args": [encode(item) for item in args],
            "kwargs": {name: encode(item) for name, item in kwargs.items()},
        }

    @staticmethod
    def _serialize_value(value: Any) -> Any:
        """Return the first non-None handler result for ``value``, else ``value`` itself."""
        for handler in PARAMETER_TYPES:
            if (encoded := handler.serialize(value)) is not None:
                return encoded

        # No handler claimed the value, so it passes through untouched.
        return value

    @staticmethod
    def from_json(data: dict[str, Any]) -> tuple[tuple[Any, ...], dict[str, Any]]:
        """Rebuild ``(args, kwargs)`` from the mapping produced by ``to_json``."""
        decode = JobParameters._deserialize_value
        positional = tuple(decode(item) for item in data["args"])
        keyword = {name: decode(item) for name, item in data["kwargs"].items()}
        return positional, keyword

    @staticmethod
    def _deserialize_value(value: Any) -> Any:
        """Return the first non-None handler result for ``value``, else ``value`` itself."""
        for handler in PARAMETER_TYPES:
            if (decoded := handler.deserialize(value)) is not None:
                return decoded

        # No handler claimed the value, so it passes through untouched.
        return value