1import copy
2from collections import defaultdict
3from contextlib import contextmanager
4from functools import cached_property, partial
5
6from plain import models
7from plain.exceptions import FieldDoesNotExist
8from plain.models.fields import NOT_PROVIDED
9from plain.models.fields.related import RECURSIVE_RELATIONSHIP_CONSTANT
10from plain.models.migrations.utils import field_is_referenced, get_references
11from plain.models.options import DEFAULT_NAMES
12from plain.models.registry import ModelsRegistry
13from plain.models.registry import models_registry as global_models
14from plain.packages import packages_registry
15from plain.utils.module_loading import import_string
16
17from .exceptions import InvalidBasesError
18from .utils import resolve_relation
19
20
def _get_package_label_and_model_name(model, package_label=""):
    """
    Return a ``(package_label, model_name)`` tuple for ``model``.

    ``model`` is either a string reference or an object exposing ``_meta``.
    A dotted string is split on its first dot; an undotted string is paired
    with the ``package_label`` fallback. For non-string input the values
    are read from ``model._meta``.
    """
    if not isinstance(model, str):
        opts = model._meta
        return opts.package_label, opts.model_name

    label, dot, remainder = model.partition(".")
    if dot:
        return (label, remainder)
    return (package_label, model)
27
28
def _get_related_models(m):
    """
    Return all models that have a direct relationship to the given model.

    The result is a list holding the direct subclasses of ``m`` that are
    ``models.Model`` subclasses, followed by the ``related_model`` of every
    relational field reported by ``m._meta.get_fields(include_hidden=True)``.
    Relations whose target is ``None`` or still an unresolved string
    reference are skipped. The list may contain duplicates.
    """
    related_models = [
        subclass
        for subclass in m.__subclasses__()
        if issubclass(subclass, models.Model)
    ]
    related_models.extend(
        f.related_model
        for f in m._meta.get_fields(include_hidden=True)
        if f.is_relation
        and f.related_model is not None
        and not isinstance(f.related_model, str)
    )
    return related_models
46
47
def get_related_models_tuples(model):
    """
    Return the set of (package_label, model_name) tuples for all models
    directly related to the given model.
    """
    related_keys = set()
    for related in _get_related_models(model):
        opts = related._meta
        related_keys.add((opts.package_label, opts.model_name))
    return related_keys
57
58
def get_related_models_recursive(model):
    """
    Return the (package_label, model_name) tuples of all models that have a
    direct or indirect relationship to the given model.

    Relationships are either defined by explicit relational fields, like
    ForeignKey or ManyToManyField, or by inheriting from another
    model (a superclass is related to its subclasses, but not vice versa).

    The given model itself is never part of the returned set.
    """
    visited = set()
    # Breadth-first walk: `pending` grows while `cursor` advances through it.
    pending = _get_related_models(model)
    cursor = 0
    while cursor < len(pending):
        candidate = pending[cursor]
        cursor += 1
        key = (candidate._meta.package_label, candidate._meta.model_name)
        if key not in visited:
            visited.add(key)
            pending.extend(_get_related_models(candidate))
    own_key = (model._meta.package_label, model._meta.model_name)
    return visited - {own_key}
80
81
class ProjectState:
    """
    Represent the entire project's overall state. This is the item that is
    passed around - do it here rather than at the package level so that
    cross-package FKs/etc. resolve properly.

    Model states live in ``self.models`` keyed by
    ``(package_label, lowercased model name)``.
    """

    def __init__(self, models=None, real_packages=None):
        """
        Create a state from an optional ``models`` mapping and an optional
        ``real_packages`` set (which must be a ``set`` when given).
        """
        self.models = models or {}
        # Packages to include from main registry, usually unmigrated ones
        if real_packages is None:
            real_packages = set()
        else:
            assert isinstance(real_packages, set)
        self.real_packages = real_packages
        self.is_delayed = False
        # {remote_model_key: {model_key: {field_name: field}}}
        self._relations = None

    @property
    def relations(self):
        """
        Return the relation map, building it on first access.

        The map has the shape
        ``{remote_model_key: {model_key: {field_name: field}}}``.
        """
        if self._relations is None:
            self.resolve_fields_and_relations()
        return self._relations

    def add_model(self, model_state):
        """
        Store ``model_state`` under its ``(package_label, name_lower)`` key.

        The relation map and the rendered registry are only updated if they
        have already been built.
        """
        model_key = model_state.package_label, model_state.name_lower
        self.models[model_key] = model_state
        if self._relations is not None:
            self.resolve_model_relations(model_key)
        if "models_registry" in self.__dict__:  # hasattr would cache the property
            self.reload_model(*model_key)

    def remove_model(self, package_label, model_name):
        """
        Delete a model state and purge it from the relation map and the
        rendered registry when those have been built.

        Raises KeyError if the model is not part of this state.
        """
        model_key = package_label, model_name
        del self.models[model_key]
        if self._relations is not None:
            self._relations.pop(model_key, None)
            # Call list() since _relations can change size during iteration.
            for related_model_key, model_relations in list(self._relations.items()):
                model_relations.pop(model_key, None)
                if not model_relations:
                    del self._relations[related_model_key]
        if "models_registry" in self.__dict__:  # hasattr would cache the property
            self.models_registry.unregister_model(*model_key)
            # Need to do this explicitly since unregister_model() doesn't clear
            # the cache automatically (#24513)
            self.models_registry.clear_cache()

    def rename_model(self, package_label, old_name, new_name):
        """
        Rename a model: register a renamed clone, repoint every field that
        references the old model, then remove the old model state.
        """
        # Add a new model.
        old_name_lower = old_name.lower()
        new_name_lower = new_name.lower()
        renamed_model = self.models[package_label, old_name_lower].clone()
        renamed_model.name = new_name
        self.models[package_label, new_name_lower] = renamed_model
        # Repoint all fields pointing to the old model to the new one.
        old_model_tuple = (package_label, old_name_lower)
        new_remote_model = f"{package_label}.{new_name}"
        to_reload = set()
        for model_state, name, field, reference in get_references(
            self, old_model_tuple
        ):
            changed_field = None
            if reference.to:
                changed_field = field.clone()
                changed_field.remote_field.model = new_remote_model
            if reference.through:
                if changed_field is None:
                    changed_field = field.clone()
                changed_field.remote_field.through = new_remote_model
            if changed_field:
                model_state.fields[name] = changed_field
                to_reload.add((model_state.package_label, model_state.name_lower))
        if self._relations is not None:
            old_name_key = package_label, old_name_lower
            new_name_key = package_label, new_name_lower
            if old_name_key in self._relations:
                self._relations[new_name_key] = self._relations.pop(old_name_key)
            for model_relations in self._relations.values():
                if old_name_key in model_relations:
                    model_relations[new_name_key] = model_relations.pop(old_name_key)
        # Reload models related to old model before removing the old model.
        self.reload_models(to_reload, delay=True)
        # Remove the old model.
        self.remove_model(package_label, old_name_lower)
        self.reload_model(package_label, new_name_lower, delay=True)

    def alter_model_options(self, package_label, model_name, options, option_keys=None):
        """
        Merge ``options`` into the model's options.

        Any key listed in ``option_keys`` that is absent from ``options`` is
        removed from the model's options.
        """
        model_state = self.models[package_label, model_name]
        # Build a new dict instead of mutating the existing one in place.
        model_state.options = {**model_state.options, **options}
        if option_keys:
            for key in option_keys:
                if key not in options:
                    model_state.options.pop(key, False)
        self.reload_model(package_label, model_name, delay=True)

    def alter_model_managers(self, package_label, model_name, managers):
        """Replace the model's managers with a list copy of ``managers``."""
        model_state = self.models[package_label, model_name]
        model_state.managers = list(managers)
        self.reload_model(package_label, model_name, delay=True)

    def _append_option(self, package_label, model_name, option_name, obj):
        """Append ``obj`` to a list-valued option by assigning a new list."""
        model_state = self.models[package_label, model_name]
        model_state.options[option_name] = [*model_state.options[option_name], obj]
        self.reload_model(package_label, model_name, delay=True)

    def _remove_option(self, package_label, model_name, option_name, obj_name):
        """Drop every entry named ``obj_name`` from a list-valued option."""
        model_state = self.models[package_label, model_name]
        objs = model_state.options[option_name]
        model_state.options[option_name] = [obj for obj in objs if obj.name != obj_name]
        self.reload_model(package_label, model_name, delay=True)

    def add_index(self, package_label, model_name, index):
        """Append ``index`` to the model's ``indexes`` option."""
        self._append_option(package_label, model_name, "indexes", index)

    def remove_index(self, package_label, model_name, index_name):
        """Remove the index named ``index_name`` from the model's options."""
        self._remove_option(package_label, model_name, "indexes", index_name)

    def rename_index(self, package_label, model_name, old_index_name, new_index_name):
        """
        Rename an index by swapping in a renamed clone; the original index
        object is left untouched.
        """
        model_state = self.models[package_label, model_name]
        objs = model_state.options["indexes"]

        new_indexes = []
        for obj in objs:
            if obj.name == old_index_name:
                obj = obj.clone()
                obj.name = new_index_name
            new_indexes.append(obj)

        model_state.options["indexes"] = new_indexes
        self.reload_model(package_label, model_name, delay=True)

    def add_constraint(self, package_label, model_name, constraint):
        """Append ``constraint`` to the model's ``constraints`` option."""
        self._append_option(package_label, model_name, "constraints", constraint)

    def remove_constraint(self, package_label, model_name, constraint_name):
        """Remove the constraint named ``constraint_name`` from the model."""
        self._remove_option(package_label, model_name, "constraints", constraint_name)

    def add_field(self, package_label, model_name, name, field, preserve_default):
        """
        Add ``field`` to the model state under ``name``.

        When ``preserve_default`` is false, a clone of the field with its
        default reset to ``NOT_PROVIDED`` is stored instead.
        """
        # If preserve default is off, don't use the default for future state.
        if not preserve_default:
            field = field.clone()
            field.default = NOT_PROVIDED
        model_key = package_label, model_name
        self.models[model_key].fields[name] = field
        if self._relations is not None:
            self.resolve_model_field_relations(model_key, name, field)
        # Delay rendering of relationships if it's not a relational field.
        delay = not field.is_relation
        self.reload_model(*model_key, delay=delay)

    def remove_field(self, package_label, model_name, name):
        """
        Remove the field ``name`` from the model state.

        Raises KeyError if the model or the field does not exist.
        """
        model_key = package_label, model_name
        model_state = self.models[model_key]
        old_field = model_state.fields.pop(name)
        if self._relations is not None:
            # The field is already gone from the model state, so this call
            # removes its entries from the relation map.
            self.resolve_model_field_relations(model_key, name, old_field)
        # Delay rendering of relationships if it's not a relational field.
        delay = not old_field.is_relation
        self.reload_model(*model_key, delay=delay)

    def alter_field(self, package_label, model_name, name, field, preserve_default):
        """
        Replace the field ``name`` on the model state with ``field``.

        When ``preserve_default`` is false, a clone of the field with its
        default reset to ``NOT_PROVIDED`` is stored instead.
        """
        if not preserve_default:
            field = field.clone()
            field.default = NOT_PROVIDED
        model_key = package_label, model_name
        fields = self.models[model_key].fields
        if self._relations is not None:
            # Pop the old field first so its relations are removed before
            # the new field's relations are recorded.
            old_field = fields.pop(name)
            if old_field.is_relation:
                self.resolve_model_field_relations(model_key, name, old_field)
            fields[name] = field
            if field.is_relation:
                self.resolve_model_field_relations(model_key, name, field)
        else:
            fields[name] = field
        # TODO: investigate if old relational fields must be reloaded or if
        # it's sufficient if the new field is (#27737).
        # Delay rendering of relationships if it's not a relational field and
        # not referenced by a foreign key.
        delay = not field.is_relation and not field_is_referenced(
            self, model_key, (name, field)
        )
        self.reload_model(*model_key, delay=delay)

    def rename_field(self, package_label, model_name, old_name, new_name):
        """
        Rename a field and fix up ``from_fields``/``to_fields`` references.

        Raises FieldDoesNotExist if the model has no field named
        ``old_name``.
        """
        model_key = package_label, model_name
        model_state = self.models[model_key]
        # Rename the field.
        fields = model_state.fields
        try:
            found = fields.pop(old_name)
        except KeyError:
            raise FieldDoesNotExist(
                f"{package_label}.{model_name} has no field named '{old_name}'"
            ) from None
        fields[new_name] = found
        for field in fields.values():
            # Fix from_fields to refer to the new field.
            from_fields = getattr(field, "from_fields", None)
            if from_fields:
                field.from_fields = tuple(
                    [
                        new_name if from_field_name == old_name else from_field_name
                        for from_field_name in from_fields
                    ]
                )

        # Fix to_fields to refer to the new field.
        delay = True
        references = get_references(self, model_key, (old_name, found))
        for *_, field, reference in references:
            delay = False
            if reference.to:
                remote_field, to_fields = reference.to
                if getattr(remote_field, "field_name", None) == old_name:
                    remote_field.field_name = new_name
                if to_fields:
                    field.to_fields = tuple(
                        [
                            new_name if to_field_name == old_name else to_field_name
                            for to_field_name in to_fields
                        ]
                    )
        if self._relations is not None:
            old_name_lower = old_name.lower()
            new_name_lower = new_name.lower()
            for to_model in self._relations.values():
                # Use .get() so a mere lookup doesn't insert an empty entry
                # for model_key into the defaultdict-backed relation map.
                model_fields = to_model.get(model_key)
                if model_fields and old_name_lower in model_fields:
                    field = model_fields.pop(old_name_lower)
                    field.name = new_name_lower
                    model_fields[new_name_lower] = field
        self.reload_model(*model_key, delay=delay)

    def _find_reload_model(self, package_label, model_name, delay=False):
        """
        Return the set of ``(package_label, model_name)`` keys to re-render
        when the given model changes.

        With ``delay`` true only directly related models are collected and
        ``is_delayed`` is set; otherwise relations are followed recursively.
        The model itself is always included.
        """
        if delay:
            self.is_delayed = True

        related_models = set()

        try:
            old_model = self.models_registry.get_model(package_label, model_name)
        except LookupError:
            pass
        else:
            # Get all relations to and from the old model before reloading,
            # as _meta.models_registry may change
            if delay:
                related_models = get_related_models_tuples(old_model)
            else:
                related_models = get_related_models_recursive(old_model)

        # Get all outgoing references from the model to be rendered
        model_state = self.models[(package_label, model_name)]
        # Directly related models are the models pointed to by ForeignKeys and ManyToManyFields.
        direct_related_models = set()
        for field in model_state.fields.values():
            if field.is_relation:
                if field.remote_field.model == RECURSIVE_RELATIONSHIP_CONSTANT:
                    continue
                rel_package_label, rel_model_name = _get_package_label_and_model_name(
                    field.related_model, package_label
                )
                direct_related_models.add((rel_package_label, rel_model_name.lower()))

        # For all direct related models recursively get all related models.
        related_models.update(direct_related_models)
        for rel_package_label, rel_model_name in direct_related_models:
            try:
                rel_model = self.models_registry.get_model(
                    rel_package_label, rel_model_name
                )
            except LookupError:
                pass
            else:
                if delay:
                    related_models.update(get_related_models_tuples(rel_model))
                else:
                    related_models.update(get_related_models_recursive(rel_model))

        # Include the model itself
        related_models.add((package_label, model_name))

        return related_models

    def reload_model(self, package_label, model_name, delay=False):
        """
        Re-render the model and its related models.

        Does nothing unless ``models_registry`` has already been built.
        """
        if "models_registry" in self.__dict__:  # hasattr would cache the property
            related_models = self._find_reload_model(package_label, model_name, delay)
            self._reload(related_models)

    def reload_models(self, models, delay=True):
        """
        Re-render several models, given as ``(package_label, model_name)``
        pairs, together with their related models in a single pass.

        Does nothing unless ``models_registry`` has already been built.
        """
        if "models_registry" in self.__dict__:  # hasattr would cache the property
            related_models = set()
            for package_label, model_name in models:
                related_models.update(
                    self._find_reload_model(package_label, model_name, delay)
                )
            self._reload(related_models)

    def _reload(self, related_models):
        """Unregister and then re-render every model key in ``related_models``."""
        # Unregister all related models
        with self.models_registry.bulk_update():
            for rel_package_label, rel_model_name in related_models:
                self.models_registry.unregister_model(rel_package_label, rel_model_name)

        states_to_be_rendered = []
        # Gather all models states of those models that will be rerendered.
        # This includes:
        # 1. All related models of unmigrated packages
        for model_state in self.models_registry.real_models:
            if (model_state.package_label, model_state.name_lower) in related_models:
                states_to_be_rendered.append(model_state)

        # 2. All related models of migrated packages
        for rel_package_label, rel_model_name in related_models:
            try:
                model_state = self.models[rel_package_label, rel_model_name]
            except KeyError:
                pass
            else:
                states_to_be_rendered.append(model_state)

        # Render all models
        self.models_registry.render_multiple(states_to_be_rendered)

    def update_model_field_relation(
        self,
        model,
        model_key,
        field_name,
        field,
        concretes,
    ):
        """
        Record or drop one field's entry in the relation map.

        ``model`` is the remote model reference, resolved relative to
        ``model_key``. The entry is added when ``field_name`` currently
        exists on the model state and removed otherwise.
        """
        remote_model_key = resolve_relation(model, *model_key)
        if (
            remote_model_key[0] not in self.real_packages
            and remote_model_key in concretes
        ):
            remote_model_key = concretes[remote_model_key]
        relations_to_remote_model = self._relations[remote_model_key]
        if field_name in self.models[model_key].fields:
            # The assert holds because it's a new relation, or an altered
            # relation, in which case references have been removed by
            # alter_field().
            assert field_name not in relations_to_remote_model[model_key]
            relations_to_remote_model[model_key][field_name] = field
        else:
            del relations_to_remote_model[model_key][field_name]
            if not relations_to_remote_model[model_key]:
                del relations_to_remote_model[model_key]

    def resolve_model_field_relations(
        self,
        model_key,
        field_name,
        field,
        concretes=None,
    ):
        """
        Update the relation map for ``field``: once for its remote model
        and, if the remote field has a truthy ``through``, once more for
        that model. Fields without a ``remote_field`` are ignored.
        """
        remote_field = field.remote_field
        if not remote_field:
            return
        if concretes is None:
            concretes = self._get_concrete_models_mapping()

        self.update_model_field_relation(
            remote_field.model,
            model_key,
            field_name,
            field,
            concretes,
        )

        through = getattr(remote_field, "through", None)
        if not through:
            return
        self.update_model_field_relation(
            through, model_key, field_name, field, concretes
        )

    def resolve_model_relations(self, model_key, concretes=None):
        """Update the relation map for every field of the given model."""
        if concretes is None:
            concretes = self._get_concrete_models_mapping()

        model_state = self.models[model_key]
        for field_name, field in model_state.fields.items():
            self.resolve_model_field_relations(model_key, field_name, field, concretes)

    def resolve_fields_and_relations(self):
        """
        Set ``name`` on every field of every model state and rebuild the
        relation map from scratch.
        """
        # Resolve fields.
        for model_state in self.models.values():
            for field_name, field in model_state.fields.items():
                field.name = field_name
        # Resolve relations.
        # {remote_model_key: {model_key: {field_name: field}}}
        self._relations = defaultdict(partial(defaultdict, dict))
        concretes = self._get_concrete_models_mapping()

        for model_key in concretes:
            self.resolve_model_relations(model_key, concretes)

    def _get_concrete_models_mapping(self):
        """Return a dict mapping every model key in this state to itself."""
        return {model_key: model_key for model_key in self.models}

    def clone(self):
        """
        Return an exact copy of this ProjectState.

        Model states are cloned individually; the ``real_packages`` set is
        shared with the copy. An already built ``models_registry`` is
        cloned as well.
        """
        new_state = ProjectState(
            models={k: v.clone() for k, v in self.models.items()},
            real_packages=self.real_packages,
        )
        if "models_registry" in self.__dict__:
            new_state.models_registry = self.models_registry.clone()
        new_state.is_delayed = self.is_delayed
        return new_state

    def clear_delayed_models_cache(self):
        """
        Discard the cached ``models_registry`` if a reload was delayed, so
        that it is rebuilt on next access.
        """
        if self.is_delayed and "models_registry" in self.__dict__:
            del self.__dict__["models_registry"]

    @cached_property
    def models_registry(self):
        """Registry rendered from this state; built once and then cached."""
        return StateModelsRegistry(self.real_packages, self.models)

    @classmethod
    def from_models_registry(cls, models_registry):
        """Take a models registry and return a ProjectState matching it."""
        app_models = {}
        for model in models_registry.get_models():
            model_state = ModelState.from_model(model)
            app_models[(model_state.package_label, model_state.name_lower)] = (
                model_state
            )
        return cls(app_models)

    def __eq__(self, other):
        """Two states are equal when their models and real packages match."""
        if not isinstance(other, ProjectState):
            # Let Python try the reflected comparison instead of failing
            # with AttributeError on unrelated types.
            return NotImplemented
        return self.models == other.models and self.real_packages == other.real_packages
525
526
class StateModelsRegistry(ModelsRegistry):
    """
    ModelsRegistry subclass used by migration state to better handle
    dynamic model additions and removals.
    """

    def __init__(self, real_packages, models):
        """
        Render the given model states plus relation-free states of every
        model belonging to ``real_packages``.

        Raises ValueError if lazy references remain unresolved afterwards.
        """
        # Every model of a real package is included in the render. The
        # original model classes aren't reused; instead fresh states are
        # built with relational fields excluded, since FKs/M2Ms from real
        # packages just mess things up with partial states (due to lack of
        # dependencies).
        self.real_models = [
            ModelState.from_model(real_model, exclude_rels=True)
            for label in real_packages
            for real_model in global_models.get_models(package_label=label)
        ]

        super().__init__()

        self.render_multiple([*models.values(), *self.real_models])

        self.ready = True

        # There shouldn't be any operations pending at this point.
        from plain.models.preflight import _check_lazy_references

        errors = _check_lazy_references(self, packages_registry)
        if errors:
            raise ValueError("\n".join(error.msg for error in errors))

    @contextmanager
    def bulk_update(self):
        """
        Context manager that marks the registry as not ready for the
        duration of the block, then restores the flag and clears the cache
        once, rather than clearing each model's cache for each change.
        """
        was_ready = self.ready
        self.ready = False
        try:
            yield
        finally:
            self.ready = was_ready
            self.clear_cache()

    def render_multiple(self, model_states):
        """
        Render all ``model_states`` into this registry.

        States whose bases can't be resolved yet are retried on the next
        pass. Raises InvalidBasesError once a full pass renders nothing,
        meaning there's a base dependency loop or a missing base.
        """
        if not model_states:
            return
        # Prevent that all model caches are expired for each render.
        with self.bulk_update():
            pending = model_states
            while pending:
                deferred = []
                for state in pending:
                    try:
                        state.render(self)
                    except InvalidBasesError:
                        deferred.append(state)
                # No progress during this pass: give up.
                if len(deferred) == len(pending):
                    raise InvalidBasesError(
                        f"Cannot resolve bases for {deferred!r}\nThis can happen if you are "
                        "inheriting models from an app with migrations (e.g. "
                        "contrib.auth)\n in an app with no migrations"
                    )
                pending = deferred

    def clone(self):
        """Return a clone of this registry."""
        duplicate = StateModelsRegistry([], {})
        duplicate.all_models = copy.deepcopy(self.all_models)

        # The real model states are shared rather than cloned, since
        # they'll never change.
        duplicate.real_models = self.real_models
        return duplicate

    def register_model(self, package_label, model):
        """
        Store ``model`` under its package label and model name, run its
        pending operations, and clear the cache.
        """
        model_name = model._meta.model_name
        self.all_models[package_label][model_name] = model
        self.do_pending_operations(model)
        self.clear_cache()

    def unregister_model(self, package_label, model_name):
        """Remove a model from the registry; a missing model is ignored."""
        try:
            del self.all_models[package_label][model_name]
        except KeyError:
            pass
612
613
class ModelState:
    """
    Represent a Plain Model. Don't use the actual Model class as it's not
    designed to have its options changed - instead, mutate this one and then
    render it into a Model as required.

    Note that while you are allowed to mutate .fields, you are not allowed
    to mutate the Field instances inside there themselves - you must instead
    assign new ones, as these are not detached during a clone.
    """

    def __init__(
        self, package_label, name, fields, options=None, bases=None, managers=None
    ):
        """
        Build a model state.

        ``fields`` is anything ``dict()`` accepts (a mapping or an iterable
        of ``(name, field)`` pairs). ``options`` is copied, and gains
        ``indexes`` and ``constraints`` entries if it lacks them. ``bases``
        defaults to ``(models.Model,)`` and ``managers`` to an empty list.

        Raises ValueError if a field is already bound to a model, if a
        relation refers to a model class instead of a string, or if an
        index has no name.
        """
        self.package_label = package_label
        self.name = name
        self.fields = dict(fields)
        # Copy so the setdefault() calls below never modify the caller's dict.
        self.options = dict(options) if options else {}
        self.options.setdefault("indexes", [])
        self.options.setdefault("constraints", [])
        self.bases = bases or (models.Model,)
        self.managers = managers or []
        for field_name, field in self.fields.items():
            # Sanity-check that fields are NOT already bound to a model.
            if hasattr(field, "model"):
                raise ValueError(
                    f'ModelState.fields cannot be bound to a model - "{field_name}" is.'
                )
            # Sanity-check that relation fields are NOT referring to a model class.
            if field.is_relation and hasattr(field.related_model, "_meta"):
                raise ValueError(
                    f'ModelState.fields cannot refer to a model class - "{field_name}.to" does. '
                    "Use a string reference instead."
                )
            if field.many_to_many and hasattr(field.remote_field.through, "_meta"):
                raise ValueError(
                    f'ModelState.fields cannot refer to a model class - "{field_name}.through" '
                    "does. Use a string reference instead."
                )
        # Sanity-check that indexes have their name set.
        for index in self.options["indexes"]:
            if not index.name:
                raise ValueError(
                    "Indexes passed to ModelState require a name attribute. "
                    f"{index!r} doesn't have one."
                )

    @cached_property
    def name_lower(self):
        """Lowercased model name, cached after the first access."""
        return self.name.lower()

    def get_field(self, field_name):
        """Return the field named ``field_name``; raise KeyError if absent."""
        return self.fields[field_name]

    @classmethod
    def from_model(cls, model, exclude_rels=False):
        """
        Given a model, return a ModelState representing it.

        With ``exclude_rels`` true, local fields that have a ``remote_field``
        and all many-to-many fields are left out.

        Raises TypeError if a field cannot be cloned.
        """
        # Deconstruct the fields
        fields = []
        for field in model._meta.local_fields:
            if getattr(field, "remote_field", None) and exclude_rels:
                continue
            name = field.name
            try:
                fields.append((name, field.clone()))
            except TypeError as e:
                raise TypeError(
                    f"Couldn't reconstruct field {name} on {model._meta.label}: {e}"
                ) from e
        if not exclude_rels:
            for field in model._meta.local_many_to_many:
                name = field.name
                try:
                    fields.append((name, field.clone()))
                except TypeError as e:
                    raise TypeError(
                        f"Couldn't reconstruct m2m field {name} on {model._meta.object_name}: {e}"
                    ) from e
        # Extract the options
        options = {}
        for name in DEFAULT_NAMES:
            # Ignore some special options
            if name in ["models_registry", "package_label"]:
                continue
            elif name in model._meta.original_attrs:
                if name == "indexes":
                    indexes = [idx.clone() for idx in model._meta.indexes]
                    for index in indexes:
                        if not index.name:
                            index.set_name_with_model(model)
                    options["indexes"] = indexes
                elif name == "constraints":
                    options["constraints"] = [
                        con.clone() for con in model._meta.constraints
                    ]
                else:
                    options[name] = model._meta.original_attrs[name]

        # Take the model's direct bases, ordered by their index in the MRO.
        flattened_bases = sorted(set(model.__bases__), key=model.__mro__.index)

        # Make our record: bases that have a _meta are stored by label.
        bases = tuple(
            (base._meta.label_lower if hasattr(base, "_meta") else base)
            for base in flattened_bases
        )
        # Ensure at least one base inherits from models.Model
        if not any(
            (isinstance(base, str) or issubclass(base, models.Model)) for base in bases
        ):
            bases = (models.Model,)

        managers = []
        manager_names = set()
        default_manager_shim = None
        for manager in model._meta.managers:
            if manager.name in manager_names:
                # Skip overridden managers.
                continue
            elif manager.use_in_migrations:
                # Copy managers usable in migrations.
                new_manager = copy.copy(manager)
                new_manager._set_creation_counter()
            elif manager is model._base_manager or manager is model._default_manager:
                # Shim custom managers used as default and base managers.
                new_manager = models.Manager()
                new_manager.model = manager.model
                new_manager.name = manager.name
                if manager is model._default_manager:
                    default_manager_shim = new_manager
            else:
                continue
            manager_names.add(manager.name)
            managers.append((manager.name, new_manager))

        # Ignore a shimmed default manager called objects if it's the only one.
        if managers == [("objects", default_manager_shim)]:
            managers = []

        # Construct the new ModelState
        return cls(
            model._meta.package_label,
            model._meta.object_name,
            fields,
            options,
            bases,
            managers,
        )

    def construct_managers(self):
        """
        Deep-clone the managers using deconstruction.

        Yield ``(name, manager)`` pairs ordered by creation counter.
        """
        # Sort all managers by their creation counter
        sorted_managers = sorted(self.managers, key=lambda v: v[1].creation_counter)
        for mgr_name, manager in sorted_managers:
            as_manager, manager_path, qs_path, args, kwargs = manager.deconstruct()
            if as_manager:
                qs_class = import_string(qs_path)
                yield mgr_name, qs_class.as_manager()
            else:
                manager_class = import_string(manager_path)
                yield mgr_name, manager_class(*args, **kwargs)

    def clone(self):
        """
        Return an exact copy of this ModelState.

        The fields dict, options dict and managers list are shallow-copied;
        the objects they contain are shared with the copy.
        """
        return self.__class__(
            package_label=self.package_label,
            name=self.name,
            fields=dict(self.fields),
            # Since options are shallow-copied here, operations such as
            # AddIndex must replace their option (e.g 'indexes') rather
            # than mutating it.
            options=dict(self.options),
            bases=self.bases,
            managers=list(self.managers),
        )

    def render(self, models_registry):
        """
        Create a Model class from our current state in the given registry.

        Raises InvalidBasesError if a string base cannot be looked up in
        ``models_registry``.
        """
        # First, make a Meta object
        meta_contents = {
            "package_label": self.package_label,
            "models_registry": models_registry,
            **self.options,
        }
        meta = type("Meta", (), meta_contents)
        # Then, work out our bases
        try:
            bases = tuple(
                (models_registry.get_model(base) if isinstance(base, str) else base)
                for base in self.bases
            )
        except LookupError as exc:
            raise InvalidBasesError(
                f"Cannot resolve one or more bases from {self.bases!r}"
            ) from exc
        # Clone fields for the body, add other bits.
        body = {name: field.clone() for name, field in self.fields.items()}
        body["Meta"] = meta
        body["__module__"] = "__fake__"

        # Restore managers
        body.update(self.construct_managers())
        # Then, make a Model object
        model_class = type(self.name, bases, body)
        from plain.models import register_model

        # Register it to the models_registry associated with the model meta
        # (could probably do this directly right here too...)
        register_model(model_class)

        return model_class

    def get_index_by_name(self, name):
        """Return the index called ``name``; raise ValueError if none matches."""
        for index in self.options["indexes"]:
            if index.name == name:
                return index
        raise ValueError(f"No index named {name} on model {self.name}")

    def get_constraint_by_name(self, name):
        """Return the constraint called ``name``; raise ValueError if none matches."""
        for constraint in self.options["constraints"]:
            if constraint.name == name:
                return constraint
        raise ValueError(f"No constraint named {name} on model {self.name}")

    def __repr__(self):
        return f"<{self.__class__.__name__}: '{self.package_label}.{self.name}'>"

    def __eq__(self, other):
        """
        Compare package label, name, fields, options, bases and managers.

        Fields are compared by name and by their deconstructed form minus
        its first element.
        """
        if not isinstance(other, ModelState):
            # Let Python try the reflected comparison instead of failing
            # with AttributeError on unrelated types.
            return NotImplemented
        return (
            (self.package_label == other.package_label)
            and (self.name == other.name)
            and (len(self.fields) == len(other.fields))
            and all(
                k1 == k2 and f1.deconstruct()[1:] == f2.deconstruct()[1:]
                for (k1, f1), (k2, f2) in zip(
                    sorted(self.fields.items()),
                    sorted(other.fields.items()),
                )
            )
            and (self.options == other.options)
            and (self.bases == other.bases)
            and (self.managers == other.managers)
        )