1from __future__ import annotations
2
3import collections.abc
4import copy
5import enum
6from collections.abc import Callable, Sequence
7from functools import cached_property
8from typing import (
9 TYPE_CHECKING,
10 Any,
11 Self,
12 cast,
13 overload,
14)
15
16from plain import exceptions, validators
17from plain.postgres.constants import LOOKUP_SEP
18from plain.postgres.dialect import quote_name
19from plain.postgres.enums import ChoicesMeta
20from plain.postgres.query_utils import RegisterLookupMixin
21from plain.preflight import PreflightResult
22from plain.utils.datastructures import DictWrapper
23from plain.utils.functional import Promise
24from plain.utils.itercompat import is_iterable
25
26from ..registry import models_registry
27
28if TYPE_CHECKING:
29 from plain.postgres.base import Model
30 from plain.postgres.connection import DatabaseConnection
31 from plain.postgres.expressions import Col, Func
32 from plain.postgres.fields.reverse_related import ForeignObjectRel
33 from plain.postgres.sql.compiler import SQLCompiler
34
35
36class Empty:
37 pass
38
39
40class NOT_PROVIDED:
41 pass
42
43
44class DatabaseDefault:
45 """Sentinel assigned to a field attribute when the column's persistent
46 DEFAULT (e.g. `create_now=True`, `generate=True`) should produce the
47 value on the next INSERT. The INSERT compiler recognizes the sentinel
48 and emits `DEFAULT` in the VALUES clause; RETURNING then populates the
49 real value back onto the instance."""
50
51 def __repr__(self) -> str:
52 return "<DatabaseDefault>"
53
54 def __str__(self) -> str:
55 return "<DatabaseDefault>"
56
57 def __reduce__(self) -> tuple[Any, tuple]:
58 # Pickling/unpickling must round-trip to the same singleton instance;
59 # downstream code uses `value is DATABASE_DEFAULT` identity checks.
60 return (_get_database_default_singleton, ())
61
62
63DATABASE_DEFAULT = DatabaseDefault()
64
65
66def _get_database_default_singleton() -> DatabaseDefault:
67 return DATABASE_DEFAULT
68
69
70# The values to use for "blank" in SelectFields. Will be appended to the start
71# of most "choices" lists.
72BLANK_CHOICE_DASH = [("", "---------")]
73
74
75def _load_field(
76 package_label: str, model_name: str, field_name: str
77) -> Field[Any] | ForeignObjectRel:
78 return models_registry.get_model(package_label, model_name)._model_meta.get_field(
79 field_name
80 )
81
82
83# A guide to Field parameters:
84#
85# * name: The name of the field specified in the model.
86# * attname: The attribute to use on the model object. This is the same as
87# "name", except in the case of ForeignKeys, where "_id" is
88# appended.
89# * column: The database column for this field. This is the same as
90# "attname".
91#
92# Code that introspects values, or does other dynamic things, should use
93# attname.
94
95
96def _empty(of_cls: type) -> Empty:
97 new = Empty()
98 new.__class__ = of_cls
99 return new
100
101
102class Field[T](RegisterLookupMixin):
103 """Base class for all field types"""
104
105 # SQL type for this field (e.g. "text", "integer", "boolean").
106 # String templates are interpolated against db_type_parameters().
107 # Subclasses that need dynamic logic override db_type() instead.
108 db_type_sql: str | None = None
109 # Column definition suffix (e.g. "GENERATED BY DEFAULT AS IDENTITY").
110 db_type_suffix_sql: str | None = None
111 # SQL type for Cast() expressions. Falls back to db_type() if None.
112 cast_db_type_sql: str | None = None
113
114 # Instance attributes set during field lifecycle
115 # Set by __init__
116 name: str | None
117 # Set by set_attributes_from_name (called by contribute_to_class)
118 attname: str
119 column: str
120 concrete: bool
121 # Set by contribute_to_class
122 model: type[Model]
123
124 # Designates whether empty strings fundamentally are allowed at the
125 # database level.
126 empty_strings_allowed = True
127 empty_values = list(validators.EMPTY_VALUES)
128
129 default_validators = [] # Default set of validators
130 unique_error_message = "A %(model_name)s with this %(field_label)s already exists."
131
132 # Kwargs that don't affect the column definition; the schema editor
133 # ignores these when deciding whether an ALTER is needed. Subclasses
134 # that introduce additional non-db kwargs extend this tuple.
135 non_migration_attrs: tuple[str, ...] = ()
136
137 def __init__(self) -> None:
138 self.name = None # Set by set_attributes_from_name
139 self.primary_key = False
140 self.auto_created = False
141
142 def __str__(self) -> str:
143 """
144 Return "package_label.model_label.field_name" for fields attached to
145 models.
146 """
147 if not hasattr(self, "model"):
148 return super().__str__()
149 model = self.model
150 return f"{model.model_options.label}.{self.name}"
151
152 def __repr__(self) -> str:
153 """Display the module, class, and name of the field."""
154 path = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
155 name = getattr(self, "name", None)
156 if name is not None:
157 return f"<{path}: {name}>"
158 return f"<{path}>"
159
160 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
161 return [*self._check_field_name()]
162
163 def _check_field_name(self) -> list[PreflightResult]:
164 """
165 Check if field name is valid, i.e. 1) does not end with an
166 underscore, 2) does not contain "__" and 3) is not "id".
167 """
168 assert self.name is not None, "Field name must be set before checking"
169 if self.name.endswith("_"):
170 return [
171 PreflightResult(
172 fix="Field names must not end with an underscore.",
173 obj=self,
174 id="fields.name_ends_with_underscore",
175 )
176 ]
177 elif LOOKUP_SEP in self.name:
178 return [
179 PreflightResult(
180 fix=f'Field names must not contain "{LOOKUP_SEP}".',
181 obj=self,
182 id="fields.name_contains_lookup_separator",
183 )
184 ]
185 elif self.name == "id":
186 return [
187 PreflightResult(
188 fix="'id' is a reserved word that cannot be used as a field name.",
189 obj=self,
190 id="fields.reserved_field_name_id",
191 )
192 ]
193 else:
194 return []
195
196 def get_col(self, alias: str | None, output_field: Field | None = None) -> Col:
197 if alias == self.model.model_options.db_table and (
198 output_field is None or output_field == self
199 ):
200 return self.cached_col
201 from plain.postgres.expressions import Col
202
203 return Col(alias, self, output_field)
204
205 @cached_property
206 def cached_col(self) -> Col:
207 from plain.postgres.expressions import Col
208
209 return Col(self.model.model_options.db_table, self)
210
211 def select_format(
212 self, compiler: SQLCompiler, sql: str, params: Any
213 ) -> tuple[str, Any]:
214 """
215 Custom format for select clauses.
216 """
217 return sql, params
218
219 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
220 """
221 Return enough information to recreate the field as a 4-tuple:
222
223 * The name of the field on the model, if contribute_to_class() has
224 been run.
225 * The import path of the field, including the class, e.g.
226 plain.postgres.IntegerField. This should be the most portable
227 version, so less specific may be better.
228 * A list of positional arguments.
229 * A dict of keyword arguments.
230
231 Note that the positional or keyword arguments must contain values of
232 the following types (including inner values of collection types):
233
234 * None, bool, str, int, float, complex, set, frozenset, list, tuple,
235 dict
236 * UUID
237 * datetime.datetime (naive), datetime.date
238 * top-level classes, top-level functions - will be referenced by their
239 full import path
240 * Storage instances - these have their own deconstruct() method
241
242 This is because the values here must be serialized into a text format
243 (possibly new Python code, possibly JSON) and these are the only types
244 with encoding handlers defined.
245
246 There's no need to return the exact way the field was instantiated this
247 time, just ensure that the resulting field is the same - prefer keyword
248 arguments over positional ones, and omit parameters with their default
249 values.
250 """
251 keywords: dict[str, Any] = {}
252 path = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
253 # Shorten `plain.postgres.fields.<submod>.X` to `plain.postgres.X`
254 # when the class is re-exported at the top-level `plain.postgres`
255 # namespace. The real submodule (`plain.postgres.fields.text`) is
256 # importable but the shortened form is what migration files use.
257 if path.startswith("plain.postgres.fields."):
258 import plain.postgres as _postgres_root
259
260 cls_name = self.__class__.__qualname__
261 if getattr(_postgres_root, cls_name, None) is self.__class__:
262 path = f"plain.postgres.{cls_name}"
263 # Note: self.name can be None during migration state rendering when fields are cloned
264 return (self.name, path, [], keywords)
265
266 def clone(self) -> Self:
267 """
268 Uses deconstruct() to clone a new copy of this Field.
269 Will not preserve any class attachments/attribute names.
270 """
271 name, path, args, kwargs = self.deconstruct()
272 return self.__class__(*args, **kwargs)
273
274 def __deepcopy__(self, memodict: dict[int, Any]) -> Self:
275 # We don't have to deepcopy very much here, since most things are not
276 # intended to be altered after initial creation.
277 obj = copy.copy(self)
278 memodict[id(self)] = obj
279 return obj
280
281 def __copy__(self) -> Self:
282 # We need to avoid hitting __reduce__, so define this
283 # slightly weird copy construct.
284 obj = Empty()
285 obj.__class__ = self.__class__
286 obj.__dict__ = self.__dict__.copy()
287 return cast(Self, obj)
288
289 def __reduce__(
290 self,
291 ) -> (
292 tuple[Callable[..., Any], tuple[Any, ...], dict[str, Any]]
293 | tuple[Callable[..., Field[Any] | ForeignObjectRel], tuple[str, str, str]]
294 ):
295 """
296 Pickling should return the model._model_meta.fields instance of the field,
297 not a new copy of that field. So, use the app registry to load the
298 model and then the field back.
299 """
300 model = getattr(self, "model", None)
301 if model is None:
302 # Fields are sometimes used without attaching them to models (for
303 # example in aggregation). In this case give back a plain field
304 # instance. The code below will create a new empty instance of
305 # class self.__class__, then update its dict with self.__dict__
306 # values - so, this is very close to normal pickle.
307 state = self.__dict__.copy()
308 return _empty, (self.__class__,), state
309 assert self.name is not None
310 options = model.model_options
311 return _load_field, (
312 options.package_label,
313 options.object_name,
314 self.name,
315 )
316
317 def to_python(self, value: Any) -> T | None:
318 """
319 Convert the input value into the expected Python data type, raising
320 plain.exceptions.ValidationError if the data can't be converted.
321 Return the converted value. Subclasses should override this.
322 """
323 return value
324
325 def db_type_parameters(self) -> DictWrapper:
326 return DictWrapper(self.__dict__, quote_name, "qn_")
327
328 def db_type(self) -> str | None:
329 """Return the database column data type for this field."""
330 if self.db_type_sql is None:
331 return None
332 return self.db_type_sql % self.db_type_parameters()
333
334 def unqualified_db_type(self) -> str | None:
335 # Uninterpolated so that parameter-only changes (e.g. `max_length`)
336 # aren't flagged as data type changes by ALTER FIELD.
337 if self.db_type_sql is not None:
338 return self.db_type_sql
339 return self.db_type()
340
341 def rel_db_type(self) -> str | None:
342 """
343 Return the data type that a related field pointing to this field should
344 use. For example, this method is called by ForeignKeyField to determine its data type.
345 """
346 return self.db_type()
347
348 def cast_db_type(self) -> str | None:
349 """Return the data type to use in the Cast() function."""
350 if self.cast_db_type_sql is not None:
351 return self.cast_db_type_sql % self.db_type_parameters()
352 return self.db_type()
353
354 def db_type_suffix(self) -> str | None:
355 return self.db_type_suffix_sql
356
357 def get_db_converters(
358 self, connection: DatabaseConnection
359 ) -> list[Callable[..., Any]]:
360 if from_db_value := getattr(self, "from_db_value", None):
361 return [from_db_value]
362 return []
363
364 @property
365 def db_returning(self) -> bool:
366 """True when the column produces a value that RETURNING should fetch
367 back (DB-expression default, identity column, etc.)."""
368 return self.has_db_default()
369
370 @property
371 def auto_fills_on_save(self) -> bool:
372 """True when pre_save populates the field's value at save time
373 (e.g. DateTimeField(update_now=True)). full_clean skips these so the
374 None-before-save doesn't fail nullability/required checks."""
375 return False
376
377 def set_attributes_from_name(self, name: str) -> None:
378 self.name = self.name or name
379 self.attname = self.get_attname()
380 self.column = self.attname
381 self.concrete = self.column is not None
382
383 def contribute_to_class(self, cls: type[Model], name: str) -> None:
384 """
385 Register the field with the model class it belongs to.
386
387 Field now acts as its own descriptor - it stays on the class and handles
388 __get__/__set__/__delete__ directly.
389 """
390 self.set_attributes_from_name(name)
391 self.model = cls
392 cls._model_meta.add_field(self)
393
394 # Field is its own descriptor; make sure it is set on the class so
395 # attribute access hits __get__/__set__.
396 if self.column:
397 setattr(cls, self.attname, self)
398
399 # Descriptor protocol implementation
400 @overload
401 def __get__(self, instance: None, owner: type[Model]) -> Self: ...
402
403 @overload
404 def __get__(self, instance: Model, owner: type[Model]) -> T: ...
405
406 def __get__(self, instance: Model | None, owner: type[Model]) -> Self | T:
407 """
408 Descriptor __get__ for attribute access.
409
410 Class access (User.email) returns the Field descriptor itself.
411 Instance access (user.email) returns the field value from instance.__dict__,
412 with lazy loading support if the value is not yet loaded.
413 """
414 # Class access - return the Field descriptor
415 if instance is None:
416 return self
417
418 # If field hasn't been contributed to a class yet (e.g., used standalone
419 # as an output_field in aggregates), just return self
420 if not hasattr(self, "attname"):
421 return self
422
423 # Instance access - get value from instance dict
424 data = instance.__dict__
425 field_name = self.attname
426
427 # If value not in dict, lazy load from database
428 if field_name not in data:
429 # Deferred field - load it from the database
430 instance.refresh_from_db(fields=[field_name])
431
432 return cast(T, data.get(field_name))
433
434 def __set__(self, instance: Model, value: Any) -> None:
435 """
436 Descriptor __set__ for attribute assignment.
437
438 Validates and converts the value using to_python(), then stores it
439 in instance.__dict__[attname].
440 """
441 # Safety check: ensure field has been properly initialized
442 if not hasattr(self, "attname"):
443 raise AttributeError(
444 f"Field {self.__class__.__name__} has not been initialized properly. "
445 f"The field's contribute_to_class() has not been called yet. "
446 f"This usually means the field is being used before it was added to a model class."
447 )
448
449 # Convert/validate the value. The DATABASE_DEFAULT sentinel is stored
450 # as-is so the INSERT compiler can emit `DEFAULT` in the VALUES clause.
451 if value is not None and value is not DATABASE_DEFAULT:
452 value = self.to_python(value)
453
454 # Store in instance dict
455 instance.__dict__[self.attname] = value
456
457 def __delete__(self, instance: Model) -> None:
458 """
459 Descriptor __delete__ for attribute deletion.
460
461 Removes the value from instance.__dict__.
462 """
463 try:
464 del instance.__dict__[self.attname]
465 except KeyError:
466 raise AttributeError(
467 f"{instance.__class__.__name__!r} object has no attribute {self.attname!r}"
468 )
469
470 def get_attname(self) -> str:
471 assert self.name is not None # Field name must be set
472 return self.name
473
474 def pre_save(self, model_instance: Model, add: bool) -> T | None:
475 """Return field's value just before saving."""
476 return getattr(model_instance, self.attname)
477
478 def get_prep_value(self, value: Any) -> Any:
479 """Perform preliminary non-db specific value checks and conversions."""
480 if isinstance(value, Promise):
481 value = value._proxy____cast()
482 return value
483
484 def get_db_prep_value(
485 self, value: Any, connection: DatabaseConnection, prepared: bool = False
486 ) -> Any:
487 """
488 Return field's value prepared for interacting with the database backend.
489
490 Used by the default implementations of get_db_prep_save().
491 """
492 if not prepared:
493 value = self.get_prep_value(value)
494 return value
495
496 def get_db_prep_save(self, value: Any, connection: DatabaseConnection) -> Any:
497 """Return field's value prepared for saving into a database."""
498 if hasattr(value, "as_sql"):
499 return value
500 return self.get_db_prep_value(value, connection=connection, prepared=False)
501
502 # Empty-value fallback used by ColumnField.get_default for the
503 # not-null + not-required + empty_strings_allowed case (Python-side
504 # Model() construction). BinaryField overrides with b"".
505 _default_empty_value: Any = ""
506
507 def has_default(self) -> bool:
508 return False
509
510 def has_persistent_literal_default(self) -> bool:
511 return False
512
513 def has_persistent_column_default(self) -> bool:
514 """Whether the column carries a DEFAULT clause that must be preserved
515 (DB-expression default or literal ``default=X``)."""
516 return self.db_returning or self.has_persistent_literal_default()
517
518 def get_db_default_expression(self) -> Func | None:
519 # Return the expression Postgres should evaluate on INSERT (e.g.
520 # Now(), GenRandomUUID()). Subclasses opt in via type-scoped kwargs
521 # (DateTimeField(create_now=True), UUIDField(generate=True)).
522 return None
523
524 def has_db_default(self) -> bool:
525 return self.get_db_default_expression() is not None
526
527 def save_form_data(self, instance: Model, data: Any) -> None:
528 assert self.name is not None
529 setattr(instance, self.name, data)
530
531 def value_from_object(self, obj: Model) -> T | None:
532 """Return the value of this field in the given model instance."""
533 return getattr(obj, self.attname)
534
535
536class ColumnField[T](Field[T]):
537 """Base for fields backed by a column value (required/allow_null/validators)."""
538
539 non_migration_attrs = (
540 *Field.non_migration_attrs,
541 "required",
542 "validators",
543 "allow_null",
544 )
545
546 def __init__(
547 self,
548 *,
549 required: bool = True,
550 allow_null: bool = False,
551 validators: Sequence[Callable[..., Any]] = (),
552 ):
553 self.required = required
554 self.allow_null = allow_null
555 self._validators = list(validators)
556 super().__init__()
557
558 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
559 return [
560 *super().preflight(**kwargs),
561 *self._check_null_allowed_for_primary_keys(),
562 *self._check_validators(),
563 ]
564
565 def _check_null_allowed_for_primary_keys(self) -> list[PreflightResult]:
566 if self.primary_key and self.allow_null:
567 return [
568 PreflightResult(
569 fix="Primary keys must not have allow_null=True. "
570 "Set allow_null=False on the field, or "
571 "remove primary_key=True argument.",
572 obj=self,
573 id="fields.primary_key_allows_null",
574 )
575 ]
576 return []
577
578 def _check_validators(self) -> list[PreflightResult]:
579 errors = []
580 for i, validator in enumerate(self.validators):
581 if not callable(validator):
582 errors.append(
583 PreflightResult(
584 fix=(
585 "All 'validators' must be callable. "
586 f"validators[{i}] ({repr(validator)}) isn't a function or "
587 "instance of a validator class."
588 ),
589 obj=self,
590 id="fields.invalid_validator",
591 )
592 )
593 return errors
594
595 @cached_property
596 def validators(self) -> list[Callable[..., Any]]:
597 # Some validators can't be created at field initialization time;
598 # subclasses (e.g. IntegerField) override to add range validators.
599 return [*self.default_validators, *self._validators]
600
601 def run_validators(self, value: Any) -> None:
602 if value in self.empty_values:
603 return
604 errors = []
605 for v in self.validators:
606 try:
607 v(value)
608 except exceptions.ValidationError as e:
609 errors.extend(e.error_list)
610 if errors:
611 raise exceptions.ValidationError(errors)
612
613 def validate(self, value: Any, model_instance: Model) -> None:
614 if value is None and not self.allow_null:
615 raise exceptions.ValidationError(
616 "This field cannot be null.", code="allow_null"
617 )
618 if self.required and value in self.empty_values:
619 raise exceptions.ValidationError("This field is required.", code="required")
620
621 def clean(self, value: Any, model_instance: Model) -> T | None:
622 value = self.to_python(value)
623 self.validate(value, model_instance)
624 self.run_validators(value)
625 return value
626
627 def get_default(self) -> Any:
628 # Python-side default used when constructing model instances without
629 # an explicit value. Independent of `required` (validate() catches
630 # required-but-empty separately). DefaultableField overrides to
631 # honor a user-provided default.
632 if not self.empty_strings_allowed or self.allow_null:
633 return None
634 return self._default_empty_value
635
636 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
637 name, path, args, kwargs = super().deconstruct()
638 if self.required is not True:
639 kwargs["required"] = self.required
640 if self.allow_null is not False:
641 kwargs["allow_null"] = self.allow_null
642 if list(self._validators):
643 kwargs["validators"] = list(self._validators)
644 return name, path, args, kwargs
645
646
647class DefaultableField[T](ColumnField[T]):
648 """Base for column-backed fields that accept a user-provided ``default``."""
649
650 non_migration_attrs = (*ColumnField.non_migration_attrs, "default")
651
652 def __init__(
653 self,
654 *,
655 default: Any = NOT_PROVIDED,
656 required: bool = True,
657 allow_null: bool = False,
658 validators: Sequence[Callable[..., Any]] = (),
659 ):
660 if default is not NOT_PROVIDED and callable(default):
661 raise TypeError(
662 f"{type(self).__name__}(default=...) must be a static literal. "
663 f"For empty collections pass default={{}} or default=[]; for "
664 f"per-row generation use a DB-side expression "
665 f"(create_now=True, generate=True, RandomStringField)."
666 )
667 if default is not NOT_PROVIDED and isinstance(default, str) and "\\" in default:
668 # psycopg quotes backslash-bearing strings with `E'...'` escape
669 # syntax, but pg_get_expr returns the stored DEFAULT as a standard
670 # `'...'` literal — the two forms don't compare lexically, so
671 # convergence would flag spurious drift on every sync. Reject at
672 # declaration time rather than ship a sync that never converges.
673 raise ValueError(
674 f"{type(self).__name__}(default=...) must not contain a backslash."
675 )
676 self.default = default
677 super().__init__(
678 required=required,
679 allow_null=allow_null,
680 validators=validators,
681 )
682
683 def has_default(self) -> bool:
684 return self.default is not NOT_PROVIDED
685
686 def has_persistent_literal_default(self) -> bool:
687 # `None` would emit `DEFAULT NULL` which is a no-op we don't want to
688 # track as drift.
689 return self.has_default() and self.default is not None
690
691 def get_default(self) -> Any:
692 if not self.has_default():
693 return super().get_default()
694 # Deep-copy so mutable literals (default=[] / default={}) don't leak
695 # shared state across instances.
696 return copy.deepcopy(self.default)
697
698 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
699 name, path, args, kwargs = super().deconstruct()
700 if self.default is not NOT_PROVIDED:
701 kwargs["default"] = self.default
702 return name, path, args, kwargs
703
704
705class ChoicesField[T](DefaultableField[T]):
706 """Base for fields that accept a ``choices=`` parameter."""
707
708 non_migration_attrs = (*DefaultableField.non_migration_attrs, "choices")
709
710 def __init__(
711 self,
712 *,
713 choices: Any = None,
714 required: bool = True,
715 allow_null: bool = False,
716 default: Any = NOT_PROVIDED,
717 validators: Sequence[Callable[..., Any]] = (),
718 ):
719 if isinstance(choices, ChoicesMeta):
720 choices = choices.choices
721 elif isinstance(choices, enum.EnumMeta):
722 choices = [(member.value, member.name) for member in choices]
723 if isinstance(choices, collections.abc.Iterator):
724 choices = list(choices)
725 self.choices = choices
726 super().__init__(
727 required=required,
728 allow_null=allow_null,
729 default=default,
730 validators=validators,
731 )
732
733 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
734 return [*super().preflight(**kwargs), *self._check_choices()]
735
736 @classmethod
737 def _choices_is_value(cls, value: Any) -> bool:
738 return isinstance(value, str | Promise) or not is_iterable(value)
739
740 def _max_length_for_choices_check(self) -> int | None:
741 return None
742
743 def _check_choices(self) -> list[PreflightResult]:
744 if not self.choices:
745 return []
746
747 if not is_iterable(self.choices) or isinstance(self.choices, str):
748 return [
749 PreflightResult(
750 fix="'choices' must be an iterable (e.g., a list or tuple).",
751 obj=self,
752 id="fields.choices_not_iterable",
753 )
754 ]
755
756 max_length = self._max_length_for_choices_check()
757 choice_max_length = 0
758 # Expect [group_name, [value, display]]
759 for choices_group in self.choices:
760 try:
761 group_name, group_choices = choices_group
762 except (TypeError, ValueError):
763 # Containing non-pairs
764 break
765 try:
766 if not all(
767 self._choices_is_value(value) and self._choices_is_value(human_name)
768 for value, human_name in group_choices
769 ):
770 break
771 if max_length is not None and group_choices:
772 choice_max_length = max(
773 [
774 choice_max_length,
775 *(
776 len(value)
777 for value, _ in group_choices
778 if isinstance(value, str)
779 ),
780 ]
781 )
782 except (TypeError, ValueError):
783 # No groups, choices in the form [value, display]
784 value, human_name = group_name, group_choices
785 if not self._choices_is_value(value) or not self._choices_is_value(
786 human_name
787 ):
788 break
789 if max_length is not None and isinstance(value, str):
790 choice_max_length = max(choice_max_length, len(value))
791
792 # Special case: choices=['ab']
793 if isinstance(choices_group, str):
794 break
795 else:
796 if max_length is not None and choice_max_length > max_length:
797 return [
798 PreflightResult(
799 fix="'max_length' is too small to fit the longest value " # noqa: UP031
800 "in 'choices' (%d characters)." % choice_max_length,
801 obj=self,
802 id="fields.max_length_too_small_for_choices",
803 ),
804 ]
805 return []
806
807 return [
808 PreflightResult(
809 fix="'choices' must be an iterable containing "
810 "(actual value, human readable name) tuples.",
811 obj=self,
812 id="fields.choices_invalid_format",
813 )
814 ]
815
816 def validate(self, value: Any, model_instance: Model) -> None:
817 if self.choices is not None and value not in self.empty_values:
818 for option_key, option_value in self.choices:
819 if isinstance(option_value, list | tuple):
820 # This is an optgroup, so look inside the group for
821 # options.
822 for optgroup_key, optgroup_value in option_value:
823 if value == optgroup_key:
824 return
825 elif value == option_key:
826 return
827 raise exceptions.ValidationError(
828 "Value %(value)r is not a valid choice.",
829 code="invalid_choice",
830 params={"value": value},
831 )
832 super().validate(value, model_instance)
833
834 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
835 name, path, args, kwargs = super().deconstruct()
836 if self.choices is not None:
837 choices = self.choices
838 if isinstance(choices, collections.abc.Iterable):
839 choices = list(choices)
840 kwargs["choices"] = choices
841 return name, path, args, kwargs
842
843 def get_choices(
844 self,
845 include_blank: bool = True,
846 blank_choice: list[tuple[str, str]] = BLANK_CHOICE_DASH,
847 ) -> list[tuple[Any, str]]:
848 """Return choices with an optional blank choice included."""
849 choices = list(self.choices) if self.choices is not None else []
850 if include_blank:
851 blank_defined = any(choice in ("", None) for choice, _ in self.flatchoices)
852 if not blank_defined:
853 choices = blank_choice + choices
854 return choices
855
856 @cached_property
857 def flatchoices(self) -> list[tuple[Any, Any]]:
858 """Flattened version of choices tuple (choices are fixed at init)."""
859 if self.choices is None:
860 return []
861 flat = []
862 for choice, value in self.choices:
863 if isinstance(value, list | tuple):
864 flat.extend(value)
865 else:
866 flat.append((choice, value))
867 return flat