1import copy
2from collections import defaultdict
3from contextlib import contextmanager
4from functools import cached_property, partial
5
6from plain import models
7from plain.exceptions import FieldDoesNotExist
8from plain.models.fields import NOT_PROVIDED
9from plain.models.fields.related import RECURSIVE_RELATIONSHIP_CONSTANT
10from plain.models.migrations.utils import field_is_referenced, get_references
11from plain.models.options import DEFAULT_NAMES
12from plain.models.registry import ModelsRegistry
13from plain.models.registry import models_registry as global_models
14from plain.packages import packages_registry
15
16from .exceptions import InvalidBasesError
17from .utils import resolve_relation
18
19
20def _get_package_label_and_model_name(model, package_label=""):
21 if isinstance(model, str):
22 split = model.split(".", 1)
23 return tuple(split) if len(split) == 2 else (package_label, split[0])
24 else:
25 return model._meta.package_label, model._meta.model_name
26
27
28def _get_related_models(m):
29 """Return all models that have a direct relationship to the given model."""
30 related_models = [
31 subclass
32 for subclass in m.__subclasses__()
33 if issubclass(subclass, models.Model)
34 ]
35 related_fields_models = set()
36 for f in m._meta.get_fields(include_hidden=True):
37 if (
38 f.is_relation
39 and f.related_model is not None
40 and not isinstance(f.related_model, str)
41 ):
42 related_fields_models.add(f.model)
43 related_models.append(f.related_model)
44 return related_models
45
46
def get_related_models_tuples(model):
    """
    Return the set of (package_label, model_name) tuples identifying every
    model directly related to the given model.
    """
    related_keys = set()
    for rel_mod in _get_related_models(model):
        rel_meta = rel_mod._meta
        related_keys.add((rel_meta.package_label, rel_meta.model_name))
    return related_keys
56
57
def get_related_models_recursive(model):
    """
    Return the (package_label, model_name) keys of all models that have a
    direct or indirect relationship to the given model. The given model's
    own key is never part of the result.

    Relationships are either defined by explicit relational fields, like
    ForeignKey or ManyToManyField, or by inheriting from another
    model (a superclass is related to its subclasses, but not vice versa).
    """
    visited = set()
    pending = _get_related_models(model)
    position = 0
    # ``pending`` grows while it is walked, so advance through it by index
    # (first in, first out) until no unexplored entries remain.
    while position < len(pending):
        candidate = pending[position]
        position += 1
        candidate_meta = candidate._meta
        key = (candidate_meta.package_label, candidate_meta.model_name)
        if key not in visited:
            visited.add(key)
            pending.extend(_get_related_models(candidate))
    own_meta = model._meta
    return visited - {(own_meta.package_label, own_meta.model_name)}
79
80
class ProjectState:
    """
    Represent the entire project's overall state. This is the item that is
    passed around - do it here rather than at the package level so that
    cross-package FKs/etc. resolve properly.

    ``models`` maps ``(package_label, lowercase_model_name)`` keys to model
    state objects. The relation map and the rendered ``models_registry`` are
    both built lazily and, once built, are kept in sync by the mutators below.
    """

    def __init__(self, models=None, real_packages=None):
        self.models = models or {}
        # Packages to include from main registry, usually unmigrated ones
        if real_packages is None:
            real_packages = set()
        else:
            assert isinstance(real_packages, set)
        self.real_packages = real_packages
        self.is_delayed = False
        # {remote_model_key: {model_key: {field_name: field}}}
        self._relations = None

    @property
    def relations(self):
        """Return the relation map, resolving it on first access."""
        if self._relations is None:
            self.resolve_fields_and_relations()
        return self._relations

    def add_model(self, model_state):
        """Store ``model_state`` and update relations/registry if already built."""
        model_key = model_state.package_label, model_state.name_lower
        self.models[model_key] = model_state
        if self._relations is not None:
            self.resolve_model_relations(model_key)
        if "models_registry" in self.__dict__:  # hasattr would cache the property
            self.reload_model(*model_key)

    def remove_model(self, package_label, model_name):
        """Delete a model state along with every relation entry mentioning it."""
        model_key = package_label, model_name
        del self.models[model_key]
        if self._relations is not None:
            self._relations.pop(model_key, None)
            # Call list() since _relations can change size during iteration.
            for related_model_key, model_relations in list(self._relations.items()):
                model_relations.pop(model_key, None)
                if not model_relations:
                    del self._relations[related_model_key]
        if "models_registry" in self.__dict__:  # hasattr would cache the property
            self.models_registry.unregister_model(*model_key)
            # Need to do this explicitly since unregister_model() doesn't clear
            # the cache automatically (#24513)
            self.models_registry.clear_cache()

    def rename_model(self, package_label, old_name, new_name):
        """
        Rename a model and repoint every field that references the old name.

        NOTE(review): when ``old_name`` and ``new_name`` differ only by case
        both share one key in ``self.models``, so the final ``remove_model``
        call deletes the renamed state - confirm callers never do this.
        """
        # Add a new model.
        old_name_lower = old_name.lower()
        new_name_lower = new_name.lower()
        renamed_model = self.models[package_label, old_name_lower].clone()
        renamed_model.name = new_name
        self.models[package_label, new_name_lower] = renamed_model
        # Repoint all fields pointing to the old model to the new one.
        old_model_tuple = (package_label, old_name_lower)
        new_remote_model = f"{package_label}.{new_name}"
        to_reload = set()
        for model_state, name, field, reference in get_references(
            self, old_model_tuple
        ):
            changed_field = None
            if reference.to:
                changed_field = field.clone()
                changed_field.remote_field.model = new_remote_model
            if reference.through:
                if changed_field is None:
                    changed_field = field.clone()
                changed_field.remote_field.through = new_remote_model
            if changed_field:
                model_state.fields[name] = changed_field
                to_reload.add((model_state.package_label, model_state.name_lower))
        if self._relations is not None:
            old_name_key = package_label, old_name_lower
            new_name_key = package_label, new_name_lower
            if old_name_key in self._relations:
                self._relations[new_name_key] = self._relations.pop(old_name_key)
            for model_relations in self._relations.values():
                if old_name_key in model_relations:
                    model_relations[new_name_key] = model_relations.pop(old_name_key)
        # Reload models related to old model before removing the old model.
        self.reload_models(to_reload, delay=True)
        # Remove the old model.
        self.remove_model(package_label, old_name_lower)
        self.reload_model(package_label, new_name_lower, delay=True)

    def alter_model_options(self, package_label, model_name, options, option_keys=None):
        """
        Merge ``options`` into the model's options. Any key listed in
        ``option_keys`` that is absent from ``options`` is removed.
        """
        model_state = self.models[package_label, model_name]
        # Build a new dict rather than mutating: clones share the options dict.
        model_state.options = {**model_state.options, **options}
        if option_keys:
            for key in option_keys:
                if key not in options:
                    model_state.options.pop(key, False)
        self.reload_model(package_label, model_name, delay=True)

    def _append_option(self, package_label, model_name, option_name, obj):
        """Replace the list option ``option_name`` with a copy that ends in ``obj``."""
        model_state = self.models[package_label, model_name]
        model_state.options[option_name] = [*model_state.options[option_name], obj]
        self.reload_model(package_label, model_name, delay=True)

    def _remove_option(self, package_label, model_name, option_name, obj_name):
        """Replace the list option ``option_name`` with a copy lacking ``obj_name``."""
        model_state = self.models[package_label, model_name]
        objs = model_state.options[option_name]
        model_state.options[option_name] = [obj for obj in objs if obj.name != obj_name]
        self.reload_model(package_label, model_name, delay=True)

    def add_index(self, package_label, model_name, index):
        """Append ``index`` to the model's ``indexes`` option."""
        self._append_option(package_label, model_name, "indexes", index)

    def remove_index(self, package_label, model_name, index_name):
        """Remove the index called ``index_name`` from the model's ``indexes``."""
        self._remove_option(package_label, model_name, "indexes", index_name)

    def rename_index(self, package_label, model_name, old_index_name, new_index_name):
        """Rename an index by swapping in a renamed clone of it."""
        model_state = self.models[package_label, model_name]
        objs = model_state.options["indexes"]

        new_indexes = []
        for obj in objs:
            if obj.name == old_index_name:
                # Clone before renaming so the shared original stays untouched.
                obj = obj.clone()
                obj.name = new_index_name
            new_indexes.append(obj)

        model_state.options["indexes"] = new_indexes
        self.reload_model(package_label, model_name, delay=True)

    def add_constraint(self, package_label, model_name, constraint):
        """Append ``constraint`` to the model's ``constraints`` option."""
        self._append_option(package_label, model_name, "constraints", constraint)

    def remove_constraint(self, package_label, model_name, constraint_name):
        """Remove the constraint called ``constraint_name`` from the model."""
        self._remove_option(package_label, model_name, "constraints", constraint_name)

    def add_field(self, package_label, model_name, name, field, preserve_default):
        """Add ``field`` under ``name`` to the given model state."""
        # If preserve default is off, don't use the default for future state.
        if not preserve_default:
            field = field.clone()
            field.default = NOT_PROVIDED
        model_key = package_label, model_name
        self.models[model_key].fields[name] = field
        if self._relations is not None:
            self.resolve_model_field_relations(model_key, name, field)
        # Delay rendering of relationships if it's not a relational field.
        delay = not field.is_relation
        self.reload_model(*model_key, delay=delay)

    def remove_field(self, package_label, model_name, name):
        """Remove the field called ``name`` from the given model state."""
        model_key = package_label, model_name
        model_state = self.models[model_key]
        old_field = model_state.fields.pop(name)
        if self._relations is not None:
            # The field is gone from the state by now, so this call deletes
            # its entries from the relation map.
            self.resolve_model_field_relations(model_key, name, old_field)
        # Delay rendering of relationships if it's not a relational field.
        delay = not old_field.is_relation
        self.reload_model(*model_key, delay=delay)

    def alter_field(self, package_label, model_name, name, field, preserve_default):
        """Replace the field called ``name`` with ``field``."""
        if not preserve_default:
            field = field.clone()
            field.default = NOT_PROVIDED
        model_key = package_label, model_name
        fields = self.models[model_key].fields
        if self._relations is not None:
            # Drop the old field's relations before registering the new ones.
            old_field = fields.pop(name)
            if old_field.is_relation:
                self.resolve_model_field_relations(model_key, name, old_field)
            fields[name] = field
            if field.is_relation:
                self.resolve_model_field_relations(model_key, name, field)
        else:
            fields[name] = field
        # TODO: investigate if old relational fields must be reloaded or if
        # it's sufficient if the new field is (#27737).
        # Delay rendering of relationships if it's not a relational field and
        # not referenced by a foreign key.
        delay = not field.is_relation and not field_is_referenced(
            self, model_key, (name, field)
        )
        self.reload_model(*model_key, delay=delay)

    def rename_field(self, package_label, model_name, old_name, new_name):
        """
        Rename a field on the given model state.

        Raises FieldDoesNotExist if the model has no field named ``old_name``.
        """
        model_key = package_label, model_name
        model_state = self.models[model_key]
        # Rename the field.
        fields = model_state.fields
        try:
            found = fields.pop(old_name)
        except KeyError:
            raise FieldDoesNotExist(
                f"{package_label}.{model_name} has no field named '{old_name}'"
            ) from None
        fields[new_name] = found
        # Check if there are any references to this field. Probe for a first
        # element instead of using truthiness: if get_references() returns a
        # lazy iterator (e.g. a generator), bool() on it is always True and
        # rendering would never be delayed.
        has_references = any(
            True for _ in get_references(self, model_key, (old_name, found))
        )
        delay = not has_references
        if self._relations is not None:
            old_name_lower = old_name.lower()
            new_name_lower = new_name.lower()
            for to_model in self._relations.values():
                # .get() avoids inserting an empty entry for model_key into
                # every remote model's defaultdict.
                fields_from_model = to_model.get(model_key)
                if fields_from_model and old_name_lower in fields_from_model:
                    field = fields_from_model.pop(old_name_lower)
                    field.name = new_name_lower
                    fields_from_model[new_name_lower] = field
        self.reload_model(*model_key, delay=delay)

    def _find_reload_model(self, package_label, model_name, delay=False):
        """
        Return the set of model keys to re-render when the given model changes.

        With ``delay=True`` only directly related models are collected and the
        state is flagged as delayed; otherwise related models are collected
        recursively. The given model's own key is always included.
        """
        if delay:
            self.is_delayed = True

        related_models = set()

        try:
            old_model = self.models_registry.get_model(package_label, model_name)
        except LookupError:
            pass
        else:
            # Get all relations to and from the old model before reloading,
            # as _meta.models_registry may change
            if delay:
                related_models = get_related_models_tuples(old_model)
            else:
                related_models = get_related_models_recursive(old_model)

        # Get all outgoing references from the model to be rendered
        model_state = self.models[(package_label, model_name)]
        # Directly related models are the models pointed to by ForeignKeys and ManyToManyFields.
        direct_related_models = set()
        for field in model_state.fields.values():
            if field.is_relation:
                if field.remote_field.model == RECURSIVE_RELATIONSHIP_CONSTANT:
                    continue
                rel_package_label, rel_model_name = _get_package_label_and_model_name(
                    field.related_model, package_label
                )
                direct_related_models.add((rel_package_label, rel_model_name.lower()))

        # For all direct related models recursively get all related models.
        related_models.update(direct_related_models)
        for rel_package_label, rel_model_name in direct_related_models:
            try:
                rel_model = self.models_registry.get_model(
                    rel_package_label, rel_model_name
                )
            except LookupError:
                pass
            else:
                if delay:
                    related_models.update(get_related_models_tuples(rel_model))
                else:
                    related_models.update(get_related_models_recursive(rel_model))

        # Include the model itself
        related_models.add((package_label, model_name))

        return related_models

    def reload_model(self, package_label, model_name, delay=False):
        """Re-render the model and its related models, if a registry exists."""
        if "models_registry" in self.__dict__:  # hasattr would cache the property
            related_models = self._find_reload_model(package_label, model_name, delay)
            self._reload(related_models)

    def reload_models(self, models, delay=True):
        """Re-render several models (given as keys) in a single pass."""
        if "models_registry" in self.__dict__:  # hasattr would cache the property
            related_models = set()
            for package_label, model_name in models:
                related_models.update(
                    self._find_reload_model(package_label, model_name, delay)
                )
            self._reload(related_models)

    def _reload(self, related_models):
        """Unregister the given model keys, then render them again."""
        # Unregister all related models
        with self.models_registry.bulk_update():
            for rel_package_label, rel_model_name in related_models:
                self.models_registry.unregister_model(rel_package_label, rel_model_name)

        states_to_be_rendered = []
        # Gather all models states of those models that will be rerendered.
        # This includes:
        # 1. All related models of unmigrated packages
        for model_state in self.models_registry.real_models:
            if (model_state.package_label, model_state.name_lower) in related_models:
                states_to_be_rendered.append(model_state)

        # 2. All related models of migrated packages
        for rel_package_label, rel_model_name in related_models:
            try:
                model_state = self.models[rel_package_label, rel_model_name]
            except KeyError:
                pass
            else:
                states_to_be_rendered.append(model_state)

        # Render all models
        self.models_registry.render_multiple(states_to_be_rendered)

    def update_model_field_relation(
        self,
        model,
        model_key,
        field_name,
        field,
        concretes,
    ):
        """
        Add or remove one relation-map entry for ``field``.

        The entry is added when ``field_name`` is still present on the model
        state and removed otherwise. ``model`` is the relation target, which
        is resolved to a key relative to ``model_key``.
        """
        remote_model_key = resolve_relation(model, *model_key)
        if (
            remote_model_key[0] not in self.real_packages
            and remote_model_key in concretes
        ):
            remote_model_key = concretes[remote_model_key]
        relations_to_remote_model = self._relations[remote_model_key]
        if field_name in self.models[model_key].fields:
            # The assert holds because it's a new relation, or an altered
            # relation, in which case references have been removed by
            # alter_field().
            assert field_name not in relations_to_remote_model[model_key]
            relations_to_remote_model[model_key][field_name] = field
        else:
            del relations_to_remote_model[model_key][field_name]
            if not relations_to_remote_model[model_key]:
                del relations_to_remote_model[model_key]

    def resolve_model_field_relations(
        self,
        model_key,
        field_name,
        field,
        concretes=None,
    ):
        """
        Update the relation map for ``field``: once for its remote model and,
        if it has one, once more for its ``through`` model. Fields without a
        ``remote_field`` are ignored.
        """
        remote_field = field.remote_field
        if not remote_field:
            return
        if concretes is None:
            concretes = self._get_concrete_models_mapping()

        self.update_model_field_relation(
            remote_field.model,
            model_key,
            field_name,
            field,
            concretes,
        )

        through = getattr(remote_field, "through", None)
        if not through:
            return
        self.update_model_field_relation(
            through, model_key, field_name, field, concretes
        )

    def resolve_model_relations(self, model_key, concretes=None):
        """Update the relation map for every field of the given model."""
        if concretes is None:
            concretes = self._get_concrete_models_mapping()

        model_state = self.models[model_key]
        for field_name, field in model_state.fields.items():
            self.resolve_model_field_relations(model_key, field_name, field, concretes)

    def resolve_fields_and_relations(self):
        """Assign each field its name and rebuild the relation map from scratch."""
        # Resolve fields.
        for model_state in self.models.values():
            for field_name, field in model_state.fields.items():
                field.name = field_name
        # Resolve relations.
        # {remote_model_key: {model_key: {field_name: field}}}
        self._relations = defaultdict(partial(defaultdict, dict))
        concretes = self._get_concrete_models_mapping()

        for model_key in concretes:
            self.resolve_model_relations(model_key, concretes)

    def _get_concrete_models_mapping(self):
        """Return a mapping of every model key to itself."""
        return {model_key: model_key for model_key in self.models}

    def clone(self):
        """Return an exact copy of this ProjectState."""
        new_state = ProjectState(
            models={k: v.clone() for k, v in self.models.items()},
            real_packages=self.real_packages,
        )
        if "models_registry" in self.__dict__:
            new_state.models_registry = self.models_registry.clone()
        new_state.is_delayed = self.is_delayed
        return new_state

    def clear_delayed_models_cache(self):
        """Discard the rendered registry if any reload was delayed."""
        if self.is_delayed and "models_registry" in self.__dict__:
            del self.__dict__["models_registry"]

    @cached_property
    def models_registry(self):
        """Registry of rendered models, built from this state on first access."""
        return StateModelsRegistry(self.real_packages, self.models)

    @classmethod
    def from_models_registry(cls, models_registry):
        """Take a models registry and return a ProjectState matching it."""
        app_models = {}
        for model in models_registry.get_models():
            model_state = ModelState.from_model(model)
            app_models[(model_state.package_label, model_state.name_lower)] = (
                model_state
            )
        return cls(app_models)

    def __eq__(self, other):
        """Two states are equal when their models and real packages match."""
        if not isinstance(other, ProjectState):
            # Let Python try the reflected comparison instead of failing with
            # AttributeError on unrelated objects.
            return NotImplemented
        return self.models == other.models and self.real_packages == other.real_packages
495
496
class StateModelsRegistry(ModelsRegistry):
    """
    Models registry used for migration state. It is populated by rendering
    model states and is designed to cope with models being added and removed
    dynamically.
    """

    def __init__(self, real_packages, models):
        # Every model of each package in real_packages takes part in the
        # render. The original model classes are not reused because some of
        # their variables refer to the global registry; fresh ModelStates are
        # built instead. FKs/M2Ms from real packages are left out as they just
        # mess things up with partial states (due to lack of dependencies).
        self.real_models = [
            ModelState.from_model(real_model, exclude_rels=True)
            for label in real_packages
            for real_model in global_models.get_models(package_label=label)
        ]

        super().__init__()

        self.render_multiple([*models.values(), *self.real_models])

        self.ready = True

        # There shouldn't be any operations pending at this point.
        from plain.models.preflight import _check_lazy_references

        errors = _check_lazy_references(self, packages_registry)
        if errors:
            raise ValueError("\n".join(error.msg for error in errors))

    @contextmanager
    def bulk_update(self):
        """
        Context manager that marks the registry as not ready for the duration
        of the block, then restores the previous flag and clears the cache
        once, rather than clearing caches after every individual change.
        """
        previous_ready = self.ready
        self.ready = False
        try:
            yield
        finally:
            self.ready = previous_ready
            self.clear_cache()

    def render_multiple(self, model_states):
        """
        Render all of ``model_states`` into this registry.

        States whose bases cannot be resolved yet are retried on the next
        pass. InvalidBasesError is raised when a full pass renders nothing,
        which means there is a base dependency loop or a missing base.
        """
        if not model_states:
            return
        # Prevent that all model caches are expired for each render.
        with self.bulk_update():
            pending = model_states
            while pending:
                deferred = []
                for state in pending:
                    try:
                        state.render(self)
                    except InvalidBasesError:
                        deferred.append(state)
                # No progress during this pass: give up.
                if len(deferred) == len(pending):
                    raise InvalidBasesError(
                        f"Cannot resolve bases for {deferred!r}\nThis can happen if you are "
                        "inheriting models from an app with migrations (e.g. "
                        "contrib.auth)\n in an app with no migrations"
                    )
                pending = deferred

    def clone(self):
        """Return a clone of this registry."""
        duplicate = StateModelsRegistry([], {})
        duplicate.all_models = copy.deepcopy(self.all_models)

        # No need to actually clone them, they'll never change
        duplicate.real_models = self.real_models
        return duplicate

    def register_model(self, package_label, model):
        """Store ``model``, run its pending operations and clear the cache."""
        package_models = self.all_models[package_label]
        package_models[model._meta.model_name] = model
        self.do_pending_operations(model)
        self.clear_cache()

    def unregister_model(self, package_label, model_name):
        """Forget a model; an unknown package or model is silently ignored."""
        try:
            package_models = self.all_models[package_label]
            del package_models[model_name]
        except KeyError:
            pass
582
583
class ModelState:
    """
    Represent a Plain Model. Don't use the actual Model class as it's not
    designed to have its options changed - instead, mutate this one and then
    render it into a Model as required.

    Note that while you are allowed to mutate .fields, you are not allowed
    to mutate the Field instances inside there themselves - you must instead
    assign new ones, as these are not detached during a clone.
    """

    def __init__(self, package_label, name, fields, options=None, bases=None):
        """
        ``fields`` is anything ``dict()`` accepts (a mapping or an iterable of
        ``(name, field)`` pairs). ``options`` always ends up with ``indexes``
        and ``constraints`` keys. ``bases`` defaults to ``(models.Model,)``.

        Raises ValueError if a field is already bound to a model, if a
        relation refers to a model class instead of a string, or if an index
        has no name.
        """
        self.package_label = package_label
        self.name = name
        self.fields = dict(fields)
        self.options = options or {}
        self.options.setdefault("indexes", [])
        self.options.setdefault("constraints", [])
        self.bases = bases or (models.Model,)
        for field_name, field in self.fields.items():
            # Sanity-check that fields are NOT already bound to a model.
            if hasattr(field, "model"):
                raise ValueError(
                    f'ModelState.fields cannot be bound to a model - "{field_name}" is.'
                )
            # Sanity-check that relation fields are NOT referring to a model class.
            if field.is_relation and hasattr(field.related_model, "_meta"):
                raise ValueError(
                    f'ModelState.fields cannot refer to a model class - "{field_name}.to" does. '
                    "Use a string reference instead."
                )
            if field.many_to_many and hasattr(field.remote_field.through, "_meta"):
                raise ValueError(
                    f'ModelState.fields cannot refer to a model class - "{field_name}.through" '
                    "does. Use a string reference instead."
                )
        # Sanity-check that indexes have their name set.
        for index in self.options["indexes"]:
            if not index.name:
                raise ValueError(
                    "Indexes passed to ModelState require a name attribute. "
                    f"{index!r} doesn't have one."
                )

    @cached_property
    def name_lower(self):
        """Lowercased model name, computed once per instance."""
        return self.name.lower()

    def get_field(self, field_name):
        """Return the field called ``field_name``; raises KeyError if absent."""
        return self.fields[field_name]

    @classmethod
    def from_model(cls, model, exclude_rels=False):
        """
        Given a model, return a ModelState representing it.

        With ``exclude_rels=True`` local fields that have a ``remote_field``
        and all many-to-many fields are left out. Raises TypeError if a field
        cannot be cloned.
        """
        # Deconstruct the fields
        fields = []
        for field in model._meta.local_fields:
            if getattr(field, "remote_field", None) and exclude_rels:
                continue
            name = field.name
            try:
                fields.append((name, field.clone()))
            except TypeError as e:
                raise TypeError(
                    f"Couldn't reconstruct field {name} on {model._meta.label}: {e}"
                ) from e
        if not exclude_rels:
            for field in model._meta.local_many_to_many:
                name = field.name
                try:
                    fields.append((name, field.clone()))
                except TypeError as e:
                    raise TypeError(
                        f"Couldn't reconstruct m2m field {name} on {model._meta.object_name}: {e}"
                    ) from e
        # Extract the options
        options = {}
        for option_name in DEFAULT_NAMES:
            # Ignore some special options
            if option_name in ["models_registry", "package_label"]:
                continue
            if option_name not in model._meta.original_attrs:
                continue
            if option_name == "indexes":
                indexes = [idx.clone() for idx in model._meta.indexes]
                for index in indexes:
                    if not index.name:
                        index.set_name_with_model(model)
                options["indexes"] = indexes
            elif option_name == "constraints":
                options["constraints"] = [
                    con.clone() for con in model._meta.constraints
                ]
            else:
                options[option_name] = model._meta.original_attrs[option_name]

        # Only the direct bases are recorded, de-duplicated and ordered by
        # their position in the MRO.
        flattened_bases = sorted(set(model.__bases__), key=model.__mro__.index)

        # Make our record: model bases become string labels, anything else
        # is kept as the class itself.
        bases = tuple(
            (base._meta.label_lower if hasattr(base, "_meta") else base)
            for base in flattened_bases
        )
        # Ensure at least one base inherits from models.Model
        if not any(
            (isinstance(base, str) or issubclass(base, models.Model)) for base in bases
        ):
            bases = (models.Model,)

        # Construct the new ModelState
        return cls(
            model._meta.package_label,
            model._meta.object_name,
            fields,
            options,
            bases,
        )

    def clone(self):
        """Return an exact copy of this ModelState."""
        return self.__class__(
            package_label=self.package_label,
            name=self.name,
            fields=dict(self.fields),
            # Since options are shallow-copied here, operations such as
            # AddIndex must replace their option (e.g 'indexes') rather
            # than mutating it.
            options=dict(self.options),
            bases=self.bases,
        )

    def render(self, models_registry):
        """
        Create a Model class from our current state, register it and return it.

        Raises InvalidBasesError if a string base cannot be looked up in
        ``models_registry``.
        """
        # First, make a Meta object
        meta_contents = {
            "package_label": self.package_label,
            "models_registry": models_registry,
            **self.options,
        }
        meta = type("Meta", (), meta_contents)
        # Then, work out our bases
        try:
            bases = tuple(
                (models_registry.get_model(base) if isinstance(base, str) else base)
                for base in self.bases
            )
        except LookupError:
            raise InvalidBasesError(
                f"Cannot resolve one or more bases from {self.bases!r}"
            )
        # Clone fields for the body, add other bits.
        body = {name: field.clone() for name, field in self.fields.items()}
        body["Meta"] = meta
        body["__module__"] = "__fake__"

        # Then, make a Model class; it is registered explicitly just below.
        model_class = type(self.name, bases, body)
        from plain.models import register_model

        # Register it to the models_registry associated with the model meta
        # (could probably do this directly right here too...)
        register_model(model_class)

        return model_class

    def get_index_by_name(self, name):
        """Return the index called ``name``; raises ValueError if there is none."""
        for index in self.options["indexes"]:
            if index.name == name:
                return index
        raise ValueError(f"No index named {name} on model {self.name}")

    def get_constraint_by_name(self, name):
        """Return the constraint called ``name``; raises ValueError if there is none."""
        for constraint in self.options["constraints"]:
            if constraint.name == name:
                return constraint
        raise ValueError(f"No constraint named {name} on model {self.name}")

    def __repr__(self):
        return f"<{self.__class__.__name__}: '{self.package_label}.{self.name}'>"

    def __eq__(self, other):
        """
        Compare label, name, fields (by name and deconstructed form, ignoring
        the first deconstruct() element), options and bases.
        """
        if not isinstance(other, ModelState):
            # Let Python try the reflected comparison instead of failing with
            # AttributeError on unrelated objects.
            return NotImplemented
        return (
            (self.package_label == other.package_label)
            and (self.name == other.name)
            and (len(self.fields) == len(other.fields))
            and all(
                k1 == k2 and f1.deconstruct()[1:] == f2.deconstruct()[1:]
                for (k1, f1), (k2, f2) in zip(
                    sorted(self.fields.items()),
                    sorted(other.fields.items()),
                )
            )
            and (self.options == other.options)
            and (self.bases == other.bases)
        )