1from __future__ import annotations
2
3import collections.abc
4import copy
5import datetime
6import decimal
7import enum
8import operator
9import uuid
10import warnings
11from base64 import b64decode, b64encode
12from collections.abc import Callable, Sequence
13from functools import cached_property
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Generic,
18 Protocol,
19 Self,
20 TypedDict,
21 TypeVar,
22 cast,
23 overload,
24)
25
26from plain import exceptions, validators
27from plain.models.constants import LOOKUP_SEP
28from plain.models.db import db_connection
29from plain.models.enums import ChoicesMeta
30from plain.models.query_utils import RegisterLookupMixin
31from plain.preflight import PreflightResult
32from plain.utils import timezone
33from plain.utils.datastructures import DictWrapper
34from plain.utils.dateparse import (
35 parse_date,
36 parse_datetime,
37 parse_duration,
38 parse_time,
39)
40from plain.utils.duration import duration_microseconds, duration_string
41from plain.utils.functional import Promise
42from plain.utils.ipv6 import clean_ipv6_address
43from plain.utils.itercompat import is_iterable
44
45from ..registry import models_registry
46
47if TYPE_CHECKING:
48 from plain.models.backends.base.base import BaseDatabaseWrapper
49 from plain.models.base import Model
50 from plain.models.expressions import Col
51 from plain.models.fields.reverse_related import ForeignObjectRel
52 from plain.models.sql.compiler import SQLCompiler
53
54
class DbParameters(TypedDict, total=False):
    """Return type for Field.db_parameters().

    Declared with ``total=False``, so every key is optional.
    """

    # Column data type string (Field.db_parameters() fills this from db_type()).
    type: str | None
    # Column check constraint (Field.db_parameters() fills this from db_check()).
    check: str | None
    # Not set by the base Field.db_parameters(); presumably supplied by fields
    # that accept a collation option -- TODO confirm against subclasses.
    collation: str | None
61
62
# Public names of this module, i.e. what ``from <module> import *`` exports.
__all__ = [
    "BLANK_CHOICE_DASH",
    "DbParameters",
    "PrimaryKeyField",
    "BigIntegerField",
    "BinaryField",
    "BooleanField",
    "CharField",
    "DateField",
    "DateTimeField",
    "DecimalField",
    "DurationField",
    "EmailField",
    "Empty",
    "Field",
    "FloatField",
    "GenericIPAddressField",
    "IntegerField",
    "NOT_PROVIDED",
    "PositiveBigIntegerField",
    "PositiveIntegerField",
    "PositiveSmallIntegerField",
    "SmallIntegerField",
    "TextField",
    "TimeField",
    "URLField",
    "UUIDField",
]
91
92
class Empty:
    """Blank placeholder class.

    Instances are created without arguments and then have their
    ``__class__`` reassigned (see ``_empty`` and ``Field.__copy__``), which
    yields an object of another class without running that class's
    ``__init__``.
    """

    pass
95
96
class NOT_PROVIDED:
    """Sentinel meaning "no default was given".

    Used as the default for ``Field(default=...)`` and compared by identity
    (``Field.has_default``), so ``None`` remains usable as a real default.
    """

    pass
99
100
# The values to use for "blank" in SelectFields. Will be prepended to most
# "choices" lists.
103BLANK_CHOICE_DASH = [("", "---------")]
104
105
def _load_field(
    package_label: str, model_name: str, field_name: str
) -> Field[Any] | ForeignObjectRel:
    """Fetch a field from a registered model.

    The model is resolved through ``models_registry`` using
    ``package_label`` and ``model_name``; the field is then looked up by
    ``field_name`` on the model's ``_model_meta``. ``Field.__reduce__``
    returns this function as the callable used to rebuild a pickled field.
    """
    model = models_registry.get_model(package_label, model_name)
    return model._model_meta.get_field(field_name)
112
113
114# A guide to Field parameters:
115#
116# * name: The name of the field specified in the model.
117# * attname: The attribute to use on the model object. This is the same as
118# "name", except in the case of ForeignKeys, where "_id" is
119# appended.
120# * db_column: The db_column specified in the model (or None).
121# * column: The database column for this field. This is the same as
122# "attname", except if db_column is specified.
123#
124# Code that introspects values, or does other dynamic things, should use
125# attname.
126
127
def _empty(of_cls: type) -> Empty:
    """Return an instance of ``of_cls`` built without calling its ``__init__``.

    A bare ``Empty`` object is created and its ``__class__`` is swapped to
    ``of_cls``. ``Field.__reduce__`` uses this as the reconstruction callable
    for fields that are not attached to a model.
    """
    blank = Empty()
    blank.__class__ = of_cls
    return blank
132
133
def return_None() -> None:
    """Return ``None``.

    A named module-level function (rather than a lambda) used by
    ``Field._get_default`` as the default factory when a field has no default.
    """
136
137
138# TypeVar for Field's generic type parameter
139T = TypeVar("T")
140
141
142class Field(RegisterLookupMixin, Generic[T]):
143 """Base class for all field types"""
144
145 # Instance attributes set during field lifecycle
146 # Set by __init__
147 name: str | None
148 max_length: int | None
149 db_column: str | None
150 # Set by set_attributes_from_name (called by contribute_to_class)
151 attname: str
152 column: str
153 concrete: bool
154 # Set by contribute_to_class
155 model: type[Model]
156
157 # Designates whether empty strings fundamentally are allowed at the
158 # database level.
159 empty_strings_allowed = True
160 empty_values = list(validators.EMPTY_VALUES)
161
162 default_validators = [] # Default set of validators
163 default_error_messages = {
164 "invalid_choice": "Value %(value)r is not a valid choice.",
165 "allow_null": "This field cannot be null.",
166 "required": "This field is be required.",
167 "unique": "A %(model_name)s with this %(field_label)s already exists.",
168 }
169
170 # Attributes that don't affect a column definition.
171 # These attributes are ignored when altering the field.
172 non_db_attrs = (
173 "required",
174 "choices",
175 "db_column",
176 "error_messages",
177 "limit_choices_to",
178 # Database-level options are not supported, see #21961.
179 "on_delete",
180 "related_query_name",
181 "validators",
182 )
183
184 # Generic field type description, usually overridden by subclasses
185 def _description(self) -> str:
186 return f"Field of type: {self.__class__.__name__}"
187
188 description = property(_description)
189
190 def __init__(
191 self,
192 *,
193 max_length: int | None = None,
194 required: bool = True,
195 allow_null: bool = False,
196 default: Any = NOT_PROVIDED,
197 choices: Any = None,
198 db_column: str | None = None,
199 validators: Sequence[Callable[..., Any]] = (),
200 error_messages: dict[str, str] | None = None,
201 db_comment: str | None = None,
202 ):
203 self.name = None # Set by set_attributes_from_name
204 self.max_length = max_length
205 self.required, self.allow_null = required, allow_null
206 self.default = default
207 if isinstance(choices, ChoicesMeta):
208 choices = choices.choices
209 elif isinstance(choices, enum.EnumMeta):
210 choices = [(member.value, member.name) for member in choices]
211 if isinstance(choices, collections.abc.Iterator):
212 choices = list(choices)
213 self.choices = choices
214 self.db_column = db_column
215 self.db_comment = db_comment
216
217 self.primary_key = False
218 self.auto_created = False
219
220 self._validators = list(validators) # Store for deconstruction later
221
222 self._error_messages = error_messages # Store for deconstruction later
223
224 def __str__(self) -> str:
225 """
226 Return "package_label.model_label.field_name" for fields attached to
227 models.
228 """
229 if not hasattr(self, "model"):
230 return super().__str__()
231 model = self.model
232 return f"{model.model_options.label}.{self.name}"
233
234 def __repr__(self) -> str:
235 """Display the module, class, and name of the field."""
236 path = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
237 name = getattr(self, "name", None)
238 if name is not None:
239 return f"<{path}: {name}>"
240 return f"<{path}>"
241
242 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
243 return [
244 *self._check_field_name(),
245 *self._check_choices(),
246 *self._check_db_comment(),
247 *self._check_null_allowed_for_primary_keys(),
248 *self._check_backend_specific_checks(),
249 *self._check_validators(),
250 ]
251
252 def _check_field_name(self) -> list[PreflightResult]:
253 """
254 Check if field name is valid, i.e. 1) does not end with an
255 underscore, 2) does not contain "__" and 3) is not "id".
256 """
257 assert self.name is not None, "Field name must be set before checking"
258 if self.name.endswith("_"):
259 return [
260 PreflightResult(
261 fix="Field names must not end with an underscore.",
262 obj=self,
263 id="fields.name_ends_with_underscore",
264 )
265 ]
266 elif LOOKUP_SEP in self.name:
267 return [
268 PreflightResult(
269 fix=f'Field names must not contain "{LOOKUP_SEP}".',
270 obj=self,
271 id="fields.name_contains_lookup_separator",
272 )
273 ]
274 elif self.name == "id":
275 return [
276 PreflightResult(
277 fix="'id' is a reserved word that cannot be used as a field name.",
278 obj=self,
279 id="fields.reserved_field_name_id",
280 )
281 ]
282 else:
283 return []
284
285 @classmethod
286 def _choices_is_value(cls, value: Any) -> bool:
287 return isinstance(value, str | Promise) or not is_iterable(value)
288
    def _check_choices(self) -> list[PreflightResult]:
        """Check that ``choices`` is well formed and fits within ``max_length``.

        Returns an empty list when there are no choices or when every entry
        is valid. Otherwise returns a single result describing the first
        problem found: ``choices`` is not an iterable (or is a string), an
        entry is not a valid pair, or the longest string value exceeds
        ``max_length``.
        """
        if not self.choices:
            return []

        if not is_iterable(self.choices) or isinstance(self.choices, str):
            return [
                PreflightResult(
                    fix="'choices' must be an iterable (e.g., a list or tuple).",
                    obj=self,
                    id="fields.choices_not_iterable",
                )
            ]

        # Length of the longest string value seen so far (only tracked when
        # max_length is set).
        choice_max_length = 0
        # Expect [group_name, [value, display]]
        # Every ``break`` below means "invalid format"; the loop's ``else``
        # branch only runs when all entries were accepted.
        for choices_group in self.choices:
            try:
                group_name, group_choices = choices_group
            except (TypeError, ValueError):
                # Containing non-pairs
                break
            try:
                # First try to read the entry as a named group whose second
                # element is an iterable of (value, human_name) pairs.
                if not all(
                    self._choices_is_value(value) and self._choices_is_value(human_name)
                    for value, human_name in group_choices
                ):
                    break
                if self.max_length is not None and group_choices:
                    choice_max_length = max(
                        [
                            choice_max_length,
                            *(
                                len(value)
                                for value, _ in group_choices
                                if isinstance(value, str)
                            ),
                        ]
                    )
            except (TypeError, ValueError):
                # No groups, choices in the form [value, display]
                value, human_name = group_name, group_choices
                if not self._choices_is_value(value) or not self._choices_is_value(
                    human_name
                ):
                    break
                if self.max_length is not None and isinstance(value, str):
                    choice_max_length = max(choice_max_length, len(value))

            # Special case: choices=['ab']
            if isinstance(choices_group, str):
                break
        else:
            # All entries were well formed; only the length limit can fail now.
            if self.max_length is not None and choice_max_length > self.max_length:
                return [
                    PreflightResult(
                        fix="'max_length' is too small to fit the longest value "  # noqa: UP031
                        "in 'choices' (%d characters)." % choice_max_length,
                        obj=self,
                        id="fields.max_length_too_small_for_choices",
                    ),
                ]
            return []

        # Reached only via one of the ``break`` statements above.
        return [
            PreflightResult(
                fix="'choices' must be an iterable containing "
                "(actual value, human readable name) tuples.",
                obj=self,
                id="fields.choices_invalid_format",
            )
        ]
360
361 def _check_db_comment(self) -> list[PreflightResult]:
362 if not self.db_comment:
363 return []
364 errors = []
365 if not (
366 db_connection.features.supports_comments
367 or "supports_comments" in self.model.model_options.required_db_features
368 ):
369 errors.append(
370 PreflightResult(
371 fix=f"{db_connection.display_name} does not support comments on "
372 f"columns (db_comment).",
373 obj=self,
374 id="fields.db_comment_unsupported",
375 warning=True,
376 )
377 )
378 return errors
379
380 def _check_null_allowed_for_primary_keys(self) -> list[PreflightResult]:
381 if self.primary_key and self.allow_null:
382 # We cannot reliably check this for backends like Oracle which
383 # consider NULL and '' to be equal (and thus set up
384 # character-based fields a little differently).
385 return [
386 PreflightResult(
387 fix="Primary keys must not have allow_null=True. "
388 "Set allow_null=False on the field, or "
389 "remove primary_key=True argument.",
390 obj=self,
391 id="fields.primary_key_allows_null",
392 )
393 ]
394 else:
395 return []
396
397 def _check_backend_specific_checks(self) -> list[PreflightResult]:
398 errors = []
399 errors.extend(db_connection.validation.check_field(self))
400 return errors
401
402 def _check_validators(self) -> list[PreflightResult]:
403 errors = []
404 for i, validator in enumerate(self.validators):
405 if not callable(validator):
406 errors.append(
407 PreflightResult(
408 fix=(
409 "All 'validators' must be callable. "
410 f"validators[{i}] ({repr(validator)}) isn't a function or "
411 "instance of a validator class."
412 ),
413 obj=self,
414 id="fields.invalid_validator",
415 )
416 )
417 return errors
418
419 def get_col(self, alias: str | None, output_field: Field | None = None) -> Col:
420 if alias == self.model.model_options.db_table and (
421 output_field is None or output_field == self
422 ):
423 return self.cached_col
424 from plain.models.expressions import Col
425
426 return Col(alias, self, output_field)
427
428 @cached_property
429 def cached_col(self) -> Col:
430 from plain.models.expressions import Col
431
432 return Col(self.model.model_options.db_table, self)
433
434 def select_format(
435 self, compiler: SQLCompiler, sql: str, params: Any
436 ) -> tuple[str, Any]:
437 """
438 Custom format for select clauses. For example, GIS columns need to be
439 selected as AsText(table.col) on MySQL as the table.col data can't be
440 used by Plain.
441 """
442 return sql, params
443
444 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
445 """
446 Return enough information to recreate the field as a 4-tuple:
447
448 * The name of the field on the model, if contribute_to_class() has
449 been run.
450 * The import path of the field, including the class, e.g.
451 plain.models.IntegerField. This should be the most portable
452 version, so less specific may be better.
453 * A list of positional arguments.
454 * A dict of keyword arguments.
455
456 Note that the positional or keyword arguments must contain values of
457 the following types (including inner values of collection types):
458
459 * None, bool, str, int, float, complex, set, frozenset, list, tuple,
460 dict
461 * UUID
462 * datetime.datetime (naive), datetime.date
463 * top-level classes, top-level functions - will be referenced by their
464 full import path
465 * Storage instances - these have their own deconstruct() method
466
467 This is because the values here must be serialized into a text format
468 (possibly new Python code, possibly JSON) and these are the only types
469 with encoding handlers defined.
470
471 There's no need to return the exact way the field was instantiated this
472 time, just ensure that the resulting field is the same - prefer keyword
473 arguments over positional ones, and omit parameters with their default
474 values.
475 """
476 # Short-form way of fetching all the default parameters
477 keywords = {}
478 possibles = {
479 "max_length": None,
480 "required": True,
481 "allow_null": False,
482 "default": NOT_PROVIDED,
483 "choices": None,
484 "db_column": None,
485 "db_comment": None,
486 "validators": [],
487 "error_messages": None,
488 }
489 attr_overrides = {
490 "error_messages": "_error_messages",
491 "validators": "_validators",
492 }
493 equals_comparison = {"choices", "validators"}
494 for name, default in possibles.items():
495 value = getattr(self, attr_overrides.get(name, name))
496 # Unroll anything iterable for choices into a concrete list
497 if name == "choices" and isinstance(value, collections.abc.Iterable):
498 value = list(value)
499 # Do correct kind of comparison
500 if name in equals_comparison:
501 if value != default:
502 keywords[name] = value
503 else:
504 if value is not default:
505 keywords[name] = value
506 # Work out path - we shorten it for known Plain core fields
507 path = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
508 if path.startswith("plain.models.fields.related"):
509 path = path.replace("plain.models.fields.related", "plain.models")
510 elif path.startswith("plain.models.fields.json"):
511 path = path.replace("plain.models.fields.json", "plain.models")
512 elif path.startswith("plain.models.fields.proxy"):
513 path = path.replace("plain.models.fields.proxy", "plain.models")
514 elif path.startswith("plain.models.fields.timezones"):
515 path = path.replace("plain.models.fields.timezones", "plain.models")
516 elif path.startswith("plain.models.fields"):
517 path = path.replace("plain.models.fields", "plain.models")
518 # Return basic info - other fields should override this.
519 # Note: self.name can be None during migration state rendering when fields are cloned
520 return (self.name, path, [], keywords)
521
522 def clone(self) -> Self:
523 """
524 Uses deconstruct() to clone a new copy of this Field.
525 Will not preserve any class attachments/attribute names.
526 """
527 name, path, args, kwargs = self.deconstruct()
528 return self.__class__(*args, **kwargs)
529
530 def __deepcopy__(self, memodict: dict[int, Any]) -> Self:
531 # We don't have to deepcopy very much here, since most things are not
532 # intended to be altered after initial creation.
533 obj = copy.copy(self)
534 memodict[id(self)] = obj
535 return obj
536
537 def __copy__(self) -> Self:
538 # We need to avoid hitting __reduce__, so define this
539 # slightly weird copy construct.
540 obj = Empty()
541 obj.__class__ = self.__class__
542 obj.__dict__ = self.__dict__.copy()
543 return cast(Self, obj)
544
545 def __reduce__(
546 self,
547 ) -> (
548 tuple[Callable[..., Any], tuple[Any, ...], dict[str, Any]]
549 | tuple[Callable[..., Field[Any] | ForeignObjectRel], tuple[str, str, str]]
550 ):
551 """
552 Pickling should return the model._model_meta.fields instance of the field,
553 not a new copy of that field. So, use the app registry to load the
554 model and then the field back.
555 """
556 model = getattr(self, "model", None)
557 if model is None:
558 # Fields are sometimes used without attaching them to models (for
559 # example in aggregation). In this case give back a plain field
560 # instance. The code below will create a new empty instance of
561 # class self.__class__, then update its dict with self.__dict__
562 # values - so, this is very close to normal pickle.
563 state = self.__dict__.copy()
564 # The _get_default cached_property can't be pickled due to lambda
565 # usage.
566 state.pop("_get_default", None)
567 return _empty, (self.__class__,), state
568 assert self.name is not None
569 options = model.model_options
570 return _load_field, (
571 options.package_label,
572 options.object_name,
573 self.name,
574 )
575
576 def get_id_value_on_save(self, instance: Model) -> T | None:
577 """
578 Hook to generate new primary key values on save. This method is called when
579 saving instances with no primary key value set. If this method returns
580 something else than None, then the returned value is used when saving
581 the new instance.
582 """
583 if self.default:
584 return self.get_default()
585 return None
586
587 def to_python(self, value: Any) -> T | None:
588 """
589 Convert the input value into the expected Python data type, raising
590 plain.exceptions.ValidationError if the data can't be converted.
591 Return the converted value. Subclasses should override this.
592 """
593 return value
594
595 @cached_property
596 def error_messages(self) -> dict[str, str]:
597 messages = {}
598 for c in reversed(self.__class__.__mro__):
599 messages.update(getattr(c, "default_error_messages", {}))
600 messages.update(self._error_messages or {})
601 return messages
602
603 @cached_property
604 def validators(self) -> list[Callable[..., Any]]:
605 """
606 Some validators can't be created at field initialization time.
607 This method provides a way to delay their creation until required.
608 """
609 return [*self.default_validators, *self._validators]
610
611 def run_validators(self, value: Any) -> None:
612 if value in self.empty_values:
613 return
614
615 errors = []
616 for v in self.validators:
617 try:
618 v(value)
619 except exceptions.ValidationError as e:
620 if hasattr(e, "code") and e.code in self.error_messages:
621 e.message = self.error_messages[e.code]
622 errors.extend(e.error_list)
623
624 if errors:
625 raise exceptions.ValidationError(errors)
626
627 def validate(self, value: Any, model_instance: Model) -> None:
628 """
629 Validate value and raise ValidationError if necessary. Subclasses
630 should override this to provide validation logic.
631 """
632
633 if self.choices is not None and value not in self.empty_values:
634 for option_key, option_value in self.choices:
635 if isinstance(option_value, list | tuple):
636 # This is an optgroup, so look inside the group for
637 # options.
638 for optgroup_key, optgroup_value in option_value:
639 if value == optgroup_key:
640 return
641 elif value == option_key:
642 return
643 raise exceptions.ValidationError(
644 self.error_messages["invalid_choice"],
645 code="invalid_choice",
646 params={"value": value},
647 )
648
649 if value is None and not self.allow_null:
650 raise exceptions.ValidationError(
651 self.error_messages["allow_null"], code="allow_null"
652 )
653
654 if self.required and value in self.empty_values:
655 raise exceptions.ValidationError(
656 self.error_messages["required"], code="required"
657 )
658
659 def clean(self, value: Any, model_instance: Model) -> T | None:
660 """
661 Convert the value's type and run validation. Validation errors
662 from to_python() and validate() are propagated. Return the correct
663 value if no error is raised.
664 """
665 value = self.to_python(value)
666 self.validate(value, model_instance)
667 self.run_validators(value)
668 return value
669
670 def db_type_parameters(self, connection: BaseDatabaseWrapper) -> DictWrapper:
671 return DictWrapper(self.__dict__, connection.ops.quote_name, "qn_")
672
673 def db_check(self, connection: BaseDatabaseWrapper) -> str | None:
674 """
675 Return the database column check constraint for this field, for the
676 provided connection. Works the same way as db_type() for the case that
677 get_internal_type() does not map to a preexisting model field.
678 """
679 data = self.db_type_parameters(connection)
680 try:
681 return (
682 connection.data_type_check_constraints[self.get_internal_type()] % data
683 )
684 except KeyError:
685 return None
686
687 def db_type(self, connection: BaseDatabaseWrapper) -> str | None:
688 """
689 Return the database column data type for this field, for the provided
690 connection.
691 """
692 # The default implementation of this method looks at the
693 # backend-specific data_types dictionary, looking up the field by its
694 # "internal type".
695 #
696 # A Field class can implement the get_internal_type() method to specify
697 # which *preexisting* Plain Field class it's most similar to -- i.e.,
698 # a custom field might be represented by a TEXT column type, which is
699 # the same as the TextField Plain field type, which means the custom
700 # field's get_internal_type() returns 'TextField'.
701 #
702 # But the limitation of the get_internal_type() / data_types approach
703 # is that it cannot handle database column types that aren't already
704 # mapped to one of the built-in Plain field types. In this case, you
705 # can implement db_type() instead of get_internal_type() to specify
706 # exactly which wacky database column type you want to use.
707 data = self.db_type_parameters(connection)
708 try:
709 column_type = connection.data_types[self.get_internal_type()]
710 except KeyError:
711 return None
712 else:
713 # column_type is either a single-parameter function or a string.
714 if callable(column_type):
715 return column_type(data)
716 return column_type % data
717
718 def rel_db_type(self, connection: BaseDatabaseWrapper) -> str | None:
719 """
720 Return the data type that a related field pointing to this field should
721 use. For example, this method is called by ForeignKeyField to determine its data type.
722 """
723 return self.db_type(connection)
724
725 def cast_db_type(self, connection: BaseDatabaseWrapper) -> str | None:
726 """Return the data type to use in the Cast() function."""
727 db_type = connection.ops.cast_data_types.get(self.get_internal_type())
728 if db_type:
729 return db_type % self.db_type_parameters(connection)
730 return self.db_type(connection)
731
732 def db_parameters(self, connection: BaseDatabaseWrapper) -> DbParameters:
733 """
734 Extension of db_type(), providing a range of different return values
735 (type, checks). This will look at db_type(), allowing custom model
736 fields to override it.
737 """
738 type_string = self.db_type(connection)
739 check_string = self.db_check(connection)
740 return {
741 "type": type_string,
742 "check": check_string,
743 }
744
745 def db_type_suffix(self, connection: BaseDatabaseWrapper) -> str | None:
746 return connection.data_types_suffix.get(self.get_internal_type())
747
748 def get_db_converters(
749 self, connection: BaseDatabaseWrapper
750 ) -> list[Callable[..., Any]]:
751 if from_db_value := getattr(self, "from_db_value", None):
752 return [from_db_value]
753 return []
754
755 @property
756 def db_returning(self) -> bool:
757 """
758 Private API intended only to be used by Plain itself. Currently only
759 the PostgreSQL backend supports returning multiple fields on a model.
760 """
761 return False
762
763 def set_attributes_from_name(self, name: str) -> None:
764 self.name = self.name or name
765 self.attname, self.column = self.get_attname_column()
766 self.concrete = self.column is not None
767
768 def contribute_to_class(self, cls: type[Model], name: str) -> None:
769 """
770 Register the field with the model class it belongs to.
771
772 Field now acts as its own descriptor - it stays on the class and handles
773 __get__/__set__/__delete__ directly.
774 """
775 self.set_attributes_from_name(name)
776 self.model = cls
777 cls._model_meta.add_field(self)
778
779 # Field is now a descriptor itself - ensure it's set on the class
780 # This is important for inherited fields that get deepcopied in Meta.__get__
781 if self.column:
782 setattr(cls, self.attname, self)
783
784 # Descriptor protocol implementation
785 @overload
786 def __get__(self, instance: None, owner: type[Model]) -> Self: ...
787
788 @overload
789 def __get__(self, instance: Model, owner: type[Model]) -> T: ...
790
791 def __get__(self, instance: Model | None, owner: type[Model]) -> Self | T:
792 """
793 Descriptor __get__ for attribute access.
794
795 Class access (User.email) returns the Field descriptor itself.
796 Instance access (user.email) returns the field value from instance.__dict__,
797 with lazy loading support if the value is not yet loaded.
798 """
799 # Class access - return the Field descriptor
800 if instance is None:
801 return self
802
803 # If field hasn't been contributed to a class yet (e.g., used standalone
804 # as an output_field in aggregates), just return self
805 if not hasattr(self, "attname"):
806 return self
807
808 # Instance access - get value from instance dict
809 data = instance.__dict__
810 field_name = self.attname
811
812 # If value not in dict, lazy load from database
813 if field_name not in data:
814 # Deferred field - load it from the database
815 instance.refresh_from_db(fields=[field_name])
816
817 return cast(T, data.get(field_name))
818
819 def __set__(self, instance: Model, value: Any) -> None:
820 """
821 Descriptor __set__ for attribute assignment.
822
823 Validates and converts the value using to_python(), then stores it
824 in instance.__dict__[attname].
825 """
826 # Safety check: ensure field has been properly initialized
827 if not hasattr(self, "attname"):
828 raise AttributeError(
829 f"Field {self.__class__.__name__} has not been initialized properly. "
830 f"The field's contribute_to_class() has not been called yet. "
831 f"This usually means the field is being used before it was added to a model class."
832 )
833
834 # Convert/validate the value
835 if value is not None:
836 value = self.to_python(value)
837
838 # Store in instance dict
839 instance.__dict__[self.attname] = value
840
841 def __delete__(self, instance: Model) -> None:
842 """
843 Descriptor __delete__ for attribute deletion.
844
845 Removes the value from instance.__dict__.
846 """
847 try:
848 del instance.__dict__[self.attname]
849 except KeyError:
850 raise AttributeError(
851 f"{instance.__class__.__name__!r} object has no attribute {self.attname!r}"
852 )
853
854 def get_attname(self) -> str:
855 assert self.name is not None # Field name must be set
856 return self.name
857
858 def get_attname_column(self) -> tuple[str, str]:
859 attname = self.get_attname()
860 column = self.db_column or attname
861 return attname, column
862
863 def get_internal_type(self) -> str:
864 return self.__class__.__name__
865
866 def pre_save(self, model_instance: Model, add: bool) -> T | None:
867 """Return field's value just before saving."""
868 return getattr(model_instance, self.attname)
869
870 def get_prep_value(self, value: Any) -> Any:
871 """Perform preliminary non-db specific value checks and conversions."""
872 if isinstance(value, Promise):
873 value = value._proxy____cast()
874 return value
875
876 def get_db_prep_value(
877 self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
878 ) -> Any:
879 """
880 Return field's value prepared for interacting with the database backend.
881
882 Used by the default implementations of get_db_prep_save().
883 """
884 if not prepared:
885 value = self.get_prep_value(value)
886 return value
887
888 def get_db_prep_save(self, value: Any, connection: BaseDatabaseWrapper) -> Any:
889 """Return field's value prepared for saving into a database."""
890 if hasattr(value, "as_sql"):
891 return value
892 return self.get_db_prep_value(value, connection=connection, prepared=False)
893
894 def has_default(self) -> bool:
895 """Return a boolean of whether this field has a default value."""
896 return self.default is not NOT_PROVIDED
897
898 def get_default(self) -> T | None:
899 """Return the default value for this field."""
900 return self._get_default()
901
902 @cached_property
903 def _get_default(self) -> Callable[[], Any]:
904 if self.has_default():
905 if callable(self.default):
906 return self.default
907 return lambda: self.default
908
909 if not self.empty_strings_allowed or self.allow_null:
910 return return_None
911 return str # return empty string
912
913 def get_limit_choices_to(self) -> Any:
914 """
915 Return ``limit_choices_to`` for this model field.
916 Overridden by related fields (ForeignKey, etc.).
917 """
918 raise NotImplementedError(
919 "get_limit_choices_to() should only be called on related fields"
920 )
921
922 def get_choices(
923 self,
924 include_blank: bool = True,
925 blank_choice: list[tuple[str, str]] = BLANK_CHOICE_DASH,
926 limit_choices_to: Any = None,
927 ordering: tuple[str, ...] = (),
928 ) -> list[tuple[Any, str]]:
929 """
930 Return choices with a default blank choices included, for use
931 as <select> choices for this field.
932 """
933 if self.choices is not None:
934 choices = list(self.choices)
935 if include_blank:
936 blank_defined = any(
937 choice in ("", None) for choice, _ in self.flatchoices
938 )
939 if not blank_defined:
940 choices = blank_choice + choices
941 return choices
942 remote_field = getattr(self, "remote_field", None)
943 if remote_field is None or getattr(remote_field, "model", None) is None:
944 return blank_choice if include_blank else []
945 rel_model = remote_field.model
946 limit_choices_to = limit_choices_to or self.get_limit_choices_to()
947 related_field_name = (
948 remote_field.get_related_field().attname
949 if hasattr(remote_field, "get_related_field")
950 else "id"
951 )
952 choice_func = operator.attrgetter(related_field_name)
953 qs = rel_model.query.complex_filter(limit_choices_to)
954 if ordering:
955 qs = qs.order_by(*ordering)
956 return (blank_choice if include_blank else []) + [
957 (choice_func(x), str(x)) for x in qs
958 ]
959
960 def value_to_string(self, obj: Model) -> str:
961 """
962 Return a string value of this field from the passed obj.
963 This is used by the serialization framework.
964 """
965 return str(self.value_from_object(obj))
966
967 def _get_flatchoices(self) -> list[tuple[Any, Any]]:
968 """Flattened version of choices tuple."""
969 if self.choices is None:
970 return []
971 flat = []
972 for choice, value in self.choices:
973 if isinstance(value, list | tuple):
974 flat.extend(value)
975 else:
976 flat.append((choice, value))
977 return flat
978
979 flatchoices = property(_get_flatchoices)
980
981 def save_form_data(self, instance: Model, data: Any) -> None:
982 assert self.name is not None
983 setattr(instance, self.name, data)
984
985 def value_from_object(self, obj: Model) -> T | None:
986 """Return the value of this field in the given model instance."""
987 return getattr(obj, self.attname)
988
989
class BooleanField(Field[bool]):
    """Field holding ``True``/``False``, or ``None`` when ``allow_null`` is set."""

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": '"%(value)s" value must be either True or False.',
        "invalid_nullable": '"%(value)s" value must be either True, False, or None.',
    }
    description = "Boolean (Either True or False)"

    def get_internal_type(self) -> str:
        """Return the fixed internal type name "BooleanField"."""
        return "BooleanField"

    def to_python(self, value: Any) -> bool | None:
        """Convert ``value`` to a bool (or None for nullable fields).

        Accepts True/False (and the equal 1/0) and the strings "t", "True",
        "1", "f", "False", "0". Empty values become None when ``allow_null``
        is set.

        Raises:
            ValidationError: with code "invalid" for any other value.
        """
        if self.allow_null and value in self.empty_values:
            return None
        if value in (True, False):
            # 1/0 are equal to True/False. bool() converts former to latter.
            return bool(value)

        string_forms = (
            (("t", "True", "1"), True),
            (("f", "False", "0"), False),
        )
        for literals, result in string_forms:
            if value in literals:
                return result

        message_key = "invalid_nullable" if self.allow_null else "invalid"
        raise exceptions.ValidationError(
            self.error_messages[message_key],
            code="invalid",
            params={"value": value},
        )

    def get_prep_value(self, value: Any) -> Any:
        """Prepare ``value`` for the database: None stays None, else to_python()."""
        prepared = super().get_prep_value(value)
        return None if prepared is None else self.to_python(prepared)
1022
1023
class CharField(Field[str]):
    """String field with an optional ``max_length`` and database collation."""

    def __init__(self, *, db_collation: str | None = None, **kwargs: Any):
        super().__init__(**kwargs)
        self.db_collation = db_collation
        limit = self.max_length
        if limit is not None:
            self.validators.append(validators.MaxLengthValidator(limit))

    @property
    def description(self) -> str:
        """Description template; differs depending on whether max_length is set."""
        if self.max_length is None:
            return "String (unlimited)"
        return "String (up to %(max_length)s)"

    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
        """Run the inherited checks, then the collation and max_length checks."""
        results = list(super().preflight(**kwargs))
        results.extend(self._check_db_collation())
        results.extend(self._check_max_length_attribute())
        return results

    def _check_max_length_attribute(self, **kwargs: Any) -> list[PreflightResult]:
        """
        Validate ``max_length``.

        A missing max_length is accepted only when the connection supports
        unlimited char fields or the model lists that feature as required.
        Otherwise it must be a positive, non-bool integer.
        """
        limit = self.max_length
        if limit is None:
            unlimited_allowed = (
                db_connection.features.supports_unlimited_charfield
                or "supports_unlimited_charfield"
                in self.model.model_options.required_db_features
            )
            if unlimited_allowed:
                return []
            return [
                PreflightResult(
                    fix="CharFields must define a 'max_length' attribute.",
                    obj=self,
                    id="fields.charfield_missing_max_length",
                )
            ]

        # bool is a subclass of int, so it is excluded explicitly.
        is_positive_int = (
            isinstance(limit, int) and not isinstance(limit, bool) and limit > 0
        )
        if is_positive_int:
            return []
        return [
            PreflightResult(
                fix="'max_length' must be a positive integer.",
                obj=self,
                id="fields.charfield_invalid_max_length",
            )
        ]

    def _check_db_collation(self) -> list[PreflightResult]:
        """Report an error when a collation is set but not supported or required."""
        collation_ok = (
            self.db_collation is None
            or "supports_collation_on_charfield"
            in self.model.model_options.required_db_features
            or db_connection.features.supports_collation_on_charfield
        )
        if collation_ok:
            return []
        return [
            PreflightResult(
                fix=f"{db_connection.display_name} does not support a database collation on "
                "CharFields.",
                obj=self,
                id="fields.db_collation_unsupported",
            ),
        ]

    def cast_db_type(self, connection: BaseDatabaseWrapper) -> str | None:
        """Use the backend's unlimited-char cast type when no max_length is set."""
        if self.max_length is not None:
            return super().cast_db_type(connection)
        return connection.ops.cast_char_field_without_max_length

    def db_parameters(self, connection: BaseDatabaseWrapper) -> DbParameters:
        """Extend the inherited parameters with this field's collation."""
        params = super().db_parameters(connection)
        params["collation"] = self.db_collation
        return params

    def get_internal_type(self) -> str:
        return "CharField"

    def to_python(self, value: Any) -> str | None:
        """Return ``None`` and strings unchanged; pass anything else through ``str()``."""
        if value is None or isinstance(value, str):
            return value
        return str(value)

    def get_prep_value(self, value: Any) -> Any:
        prepared = super().get_prep_value(value)
        return self.to_python(prepared)

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Add ``db_collation`` to the deconstructed kwargs when it is set."""
        name, path, args, kwargs = super().deconstruct()
        collation = self.db_collation
        if collation:
            kwargs["db_collation"] = collation
        return name, path, args, kwargs
1120
1121
def _to_naive(value: datetime.datetime) -> datetime.datetime:
    """Return ``value`` as a naive datetime; aware values are converted using UTC."""
    if not timezone.is_aware(value):
        return value
    return timezone.make_naive(value, datetime.UTC)
1126
1127
def _get_naive_now() -> datetime.datetime:
    """Return ``timezone.now()`` passed through ``_to_naive()``."""
    current = timezone.now()
    return _to_naive(current)
1130
1131
class DateTimeCheckMixin(Field):
    """Preflight checks for fields that take ``auto_now`` / ``auto_now_add``."""

    auto_now: bool
    auto_now_add: bool

    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
        """Run inherited checks, then the option-exclusivity and default checks."""
        results = list(super().preflight(**kwargs))
        results.extend(self._check_mutually_exclusive_options())
        results.extend(self._check_fix_default_value())
        return results

    def _check_mutually_exclusive_options(self) -> list[PreflightResult]:
        """Report an error when more than one of auto_now, auto_now_add, default is set."""
        candidates = (self.auto_now_add, self.auto_now, self.has_default())
        enabled_count = sum(
            1 for option in candidates if option not in (None, False)
        )
        if enabled_count <= 1:
            return []
        return [
            PreflightResult(
                fix="The options auto_now, auto_now_add, and default "
                "are mutually exclusive. Only one of these options "
                "may be present.",
                obj=self,
                id="fields.datetime_auto_options_mutually_exclusive",
            )
        ]

    def _check_fix_default_value(self) -> list[PreflightResult]:
        """Hook for subclasses; this base implementation reports nothing."""
        return []

    # Helper for the _check_fix_default_value() implementations in concrete
    # subclasses.
    def _check_if_value_fixed(
        self,
        value: datetime.date | datetime.datetime,
        now: datetime.datetime | None = None,
    ) -> list[PreflightResult]:
        """
        Warn when ``value`` looks like a "fixed" time captured at startup.

        ``value`` must be a date or an aware/naive datetime. ``now``, when
        given, must be a naive datetime; otherwise the current naive time is
        used. A warning is returned when ``value`` lies within ten seconds of
        ``now`` (compared by date when ``value`` is a plain date).
        """
        reference = _get_naive_now() if now is None else now
        tolerance = datetime.timedelta(seconds=10)
        lower, upper = reference - tolerance, reference + tolerance
        if isinstance(value, datetime.datetime):
            value = _to_naive(value)
        else:
            assert isinstance(value, datetime.date)
            # Plain dates are compared against date bounds.
            lower, upper = lower.date(), upper.date()
        if not (lower <= value <= upper):
            return []
        return [
            PreflightResult(
                fix="Fixed default value provided. "
                "It seems you set a fixed date / time / datetime "
                "value as default for this field. This may not be "
                "what you want. If you want to have the current date "
                "as default, use `plain.utils.timezone.now`",
                obj=self,
                id="fields.datetime_naive_default_value",
                warning=True,
            )
        ]
1209
1210
class DateField(DateTimeCheckMixin, Field[datetime.date]):
    """Field storing ``datetime.date`` values, with optional auto-population."""

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": '"%(value)s" value has an invalid date format. It must be in YYYY-MM-DD format.',
        "invalid_date": '"%(value)s" value has the correct format (YYYY-MM-DD) but it is an invalid date.',
    }
    description = "Date (without time)"

    def __init__(
        self, *, auto_now: bool = False, auto_now_add: bool = False, **kwargs: Any
    ):
        self.auto_now = auto_now
        self.auto_now_add = auto_now_add
        # Auto-populated fields are forced to required=False.
        if auto_now or auto_now_add:
            kwargs["required"] = False
        super().__init__(**kwargs)

    def _check_fix_default_value(self) -> list[PreflightResult]:
        """
        Warn about a concrete date/datetime default, which is evaluated only
        once at server startup.
        """
        if not self.has_default():
            return []

        default = self.default
        if isinstance(default, datetime.datetime):
            return self._check_if_value_fixed(_to_naive(default).date())
        if isinstance(default, datetime.date):
            return self._check_if_value_fixed(default)
        # Not an explicit date / datetime value, so nothing to check.
        return []

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Emit the auto options and drop the ``required`` they imply."""
        name, path, args, kwargs = super().deconstruct()
        for option in ("auto_now", "auto_now_add"):
            if getattr(self, option):
                kwargs[option] = True
        if self.auto_now or self.auto_now_add:
            del kwargs["required"]
        return name, path, args, kwargs

    def get_internal_type(self) -> str:
        return "DateField"

    def to_python(self, value: Any) -> datetime.date | None:
        """
        Convert ``value`` to a ``datetime.date``.

        ``None`` and dates pass through. Datetimes are reduced to their date,
        after aware ones are converted to the default time zone. Other values
        are parsed with ``parse_date()``. Raises ``ValidationError`` with code
        "invalid_date" when parsing raises ``ValueError``, or "invalid" when
        the parser returns ``None``.
        """
        if value is None:
            return value
        if isinstance(value, datetime.datetime):
            if timezone.is_aware(value):
                # Aware datetimes are moved to the default time zone before
                # being cut down to a date (#17742).
                value = timezone.make_naive(value, timezone.get_default_timezone())
            return value.date()
        if isinstance(value, datetime.date):
            return value

        try:
            parsed = parse_date(value)
        except ValueError:
            raise exceptions.ValidationError(
                self.error_messages["invalid_date"],
                code="invalid_date",
                params={"value": value},
            )
        if parsed is not None:
            return parsed

        raise exceptions.ValidationError(
            self.error_messages["invalid"],
            code="invalid",
            params={"value": value},
        )

    def pre_save(self, model_instance: Model, add: bool) -> datetime.date | None:
        """Set today's date on the instance when an auto option applies."""
        should_auto_set = self.auto_now or (self.auto_now_add and add)
        if not should_auto_set:
            return super().pre_save(model_instance, add)
        today = datetime.date.today()
        setattr(model_instance, self.attname, today)
        return today

    def get_prep_value(self, value: Any) -> Any:
        prepared = super().get_prep_value(value)
        return self.to_python(prepared)

    def get_db_prep_value(
        self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
    ) -> Any:
        """Adapt the date for the backend, preparing it first unless ``prepared``."""
        if not prepared:
            value = self.get_prep_value(value)
        return connection.ops.adapt_datefield_value(value)

    def value_to_string(self, obj: Model) -> str:
        """Return the ISO-format value, or an empty string for ``None``."""
        current = self.value_from_object(obj)
        if current is None:
            return ""
        return current.isoformat()
1312
1313
class DateTimeField(DateField):
    """
    Field storing ``datetime.datetime`` values.

    Naive datetimes reaching ``get_prep_value()``, and plain ``date`` objects
    reaching ``to_python()``, are made aware in the default time zone and a
    ``RuntimeWarning`` is emitted.
    """

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": '"%(value)s" value has an invalid format. It must be in YYYY-MM-DD HH:MM[:ss[.uuuuuu]][TZ] format.',
        "invalid_date": '"%(value)s" value has the correct format (YYYY-MM-DD) but it is an invalid date.',
        "invalid_datetime": '"%(value)s" value has the correct format (YYYY-MM-DD HH:MM[:ss[.uuuuuu]][TZ]) but it is an invalid date/time.',
    }
    description = "Date (with time)"

    # __init__ is inherited from DateField

    def _naive_warning_label(self) -> str:
        """
        Return ``"<Model>.<field>"`` for warning messages.

        Falls back to "(unbound)" when looking up the model raises
        ``AttributeError``, so that emitting a warning never fails on a field
        that is not attached to a model.
        """
        try:
            return f"{self.model.__name__}.{self.name}"
        except AttributeError:
            return "(unbound)"

    def _check_fix_default_value(self) -> list[PreflightResult]:
        """
        Warn that using an actual date or datetime value is probably wrong;
        it's only evaluated on server startup.
        """
        if not self.has_default():
            return []

        value = self.default
        if isinstance(value, datetime.datetime | datetime.date):
            return self._check_if_value_fixed(value)
        # No explicit date / datetime value -- no checks necessary.
        return []

    def get_internal_type(self) -> str:
        return "DateTimeField"

    def to_python(self, value: Any) -> datetime.datetime | None:
        """
        Convert ``value`` to a ``datetime.datetime``.

        ``None`` and datetimes pass through unchanged. A plain date becomes
        midnight of that day, made aware in the default time zone (with a
        ``RuntimeWarning``). Other values are parsed with ``parse_datetime()``
        and then ``parse_date()``. Raises ``ValidationError`` with code
        "invalid_datetime" or "invalid_date" when a parser raises
        ``ValueError``, and "invalid" when neither parser recognizes the value.
        """
        if value is None:
            return value
        if isinstance(value, datetime.datetime):
            return value
        if isinstance(value, datetime.date):
            value = datetime.datetime(value.year, value.month, value.day)

            # For backwards compatibility, interpret naive datetimes in
            # local time. This won't work during DST change, but we can't
            # do much about it, so we let the exceptions percolate up the
            # call stack.
            # The label lookup is guarded so an unbound field warns instead
            # of raising AttributeError, matching get_prep_value().
            name = self._naive_warning_label()
            warnings.warn(
                f"DateTimeField {name} received a naive datetime "
                f"({value}) while time zone support is active.",
                RuntimeWarning,
            )
            default_timezone = timezone.get_default_timezone()
            value = timezone.make_aware(value, default_timezone)

            return value

        try:
            parsed = parse_datetime(value)
            if parsed is not None:
                return parsed
        except ValueError:
            raise exceptions.ValidationError(
                self.error_messages["invalid_datetime"],
                code="invalid_datetime",
                params={"value": value},
            )

        try:
            parsed = parse_date(value)
            if parsed is not None:
                return datetime.datetime(parsed.year, parsed.month, parsed.day)
        except ValueError:
            raise exceptions.ValidationError(
                self.error_messages["invalid_date"],
                code="invalid_date",
                params={"value": value},
            )

        raise exceptions.ValidationError(
            self.error_messages["invalid"],
            code="invalid",
            params={"value": value},
        )

    def pre_save(self, model_instance: Model, add: bool) -> datetime.datetime | None:
        """Set ``timezone.now()`` on the instance when an auto option applies."""
        if self.auto_now or (self.auto_now_add and add):
            value = timezone.now()
            setattr(model_instance, self.attname, value)
            return value
        else:
            return getattr(model_instance, self.attname)

    def get_prep_value(self, value: Any) -> Any:
        """
        Prepare ``value`` for the database.

        After the inherited preparation and ``to_python()``, a naive datetime
        is made aware in the default time zone and a ``RuntimeWarning`` is
        emitted.
        """
        value = super().get_prep_value(value)
        value = self.to_python(value)
        if value is not None and timezone.is_naive(value):
            # For backwards compatibility, interpret naive datetimes in local
            # time. This won't work during DST change, but we can't do much
            # about it, so we let the exceptions percolate up the call stack.
            name = self._naive_warning_label()
            warnings.warn(
                f"DateTimeField {name} received a naive datetime ({value})"
                " while time zone support is active.",
                RuntimeWarning,
            )
            default_timezone = timezone.get_default_timezone()
            value = timezone.make_aware(value, default_timezone)
        return value

    def get_db_prep_value(
        self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
    ) -> Any:
        # Casts datetimes into the format expected by the backend
        if not prepared:
            value = self.get_prep_value(value)
        return connection.ops.adapt_datetimefield_value(value)

    def value_to_string(self, obj: Model) -> str:
        """Return the ISO-format value, or an empty string for ``None``."""
        val = self.value_from_object(obj)
        return "" if val is None else val.isoformat()
1431
1432
class DecimalField(Field[decimal.Decimal]):
    """Field storing ``decimal.Decimal`` values bounded by max_digits/decimal_places."""

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": '"%(value)s" value must be a decimal number.',
    }
    description = "Decimal number"

    def __init__(
        self,
        *,
        max_digits: int | None = None,
        decimal_places: int | None = None,
        **kwargs: Any,
    ):
        self.max_digits, self.decimal_places = max_digits, decimal_places
        super().__init__(**kwargs)

    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
        """
        Run inherited checks plus the digit-configuration checks.

        The cross-check between the two options runs only when each option
        passed its own check.
        """
        errors = super().preflight(**kwargs)

        digits_errors = [
            *self._check_decimal_places(),
            *self._check_max_digits(),
        ]
        if not digits_errors:
            errors.extend(self._check_decimal_places_and_max_digits())
        else:
            errors.extend(digits_errors)
        return errors

    def _check_decimal_places(self) -> list[PreflightResult]:
        """Require ``decimal_places`` to be set and coercible to an int >= 0."""
        if self.decimal_places is None:
            return [
                PreflightResult(
                    fix="DecimalFields must define a 'decimal_places' attribute.",
                    obj=self,
                    id="fields.decimalfield_missing_decimal_places",
                )
            ]
        try:
            decimal_places = int(self.decimal_places)
            if decimal_places < 0:
                raise ValueError()
        # int() raises TypeError for objects it cannot convert at all; that
        # is reported as an invalid setting rather than crashing preflight.
        except (TypeError, ValueError):
            return [
                PreflightResult(
                    fix="'decimal_places' must be a non-negative integer.",
                    obj=self,
                    id="fields.decimalfield_invalid_decimal_places",
                )
            ]
        else:
            return []

    def _check_max_digits(self) -> list[PreflightResult]:
        """Require ``max_digits`` to be set and coercible to an int > 0."""
        if self.max_digits is None:
            return [
                PreflightResult(
                    fix="DecimalFields must define a 'max_digits' attribute.",
                    obj=self,
                    id="fields.decimalfield_missing_max_digits",
                )
            ]
        try:
            max_digits = int(self.max_digits)
            if max_digits <= 0:
                raise ValueError()
        # See _check_decimal_places() for why TypeError is caught as well.
        except (TypeError, ValueError):
            return [
                PreflightResult(
                    fix="'max_digits' must be a positive integer.",
                    obj=self,
                    id="fields.decimalfield_invalid_max_digits",
                )
            ]
        else:
            return []

    def _check_decimal_places_and_max_digits(self) -> list[PreflightResult]:
        """
        Require ``max_digits >= decimal_places``.

        Both values are coerced with ``int()`` before comparing, the same
        coercion the individual checks apply. Comparing the raw values would
        order strings lexicographically ("2" > "10").
        """
        if self.decimal_places is None or self.max_digits is None:
            return []
        if int(self.decimal_places) > int(self.max_digits):
            return [
                PreflightResult(
                    fix="'max_digits' must be greater or equal to 'decimal_places'.",
                    obj=self,
                    id="fields.decimalfield_decimal_places_exceeds_max_digits",
                )
            ]
        return []

    @cached_property
    def validators(self) -> list[Callable[..., Any]]:
        """Inherited validators plus a ``DecimalValidator`` for this field's limits."""
        return super().validators + [
            validators.DecimalValidator(self.max_digits, self.decimal_places)
        ]

    @cached_property
    def context(self) -> decimal.Context:
        """Decimal context whose precision is ``max_digits``."""
        return decimal.Context(prec=self.max_digits)

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Add ``max_digits`` / ``decimal_places`` to the kwargs when they are set."""
        name, path, args, kwargs = super().deconstruct()
        if self.max_digits is not None:
            kwargs["max_digits"] = self.max_digits
        if self.decimal_places is not None:
            kwargs["decimal_places"] = self.decimal_places
        return name, path, args, kwargs

    def get_internal_type(self) -> str:
        return "DecimalField"

    def to_python(self, value: Any) -> decimal.Decimal | None:
        """
        Convert ``value`` to a ``Decimal``.

        Floats are converted through this field's context. Raises
        ``ValidationError`` with code "invalid" when conversion fails or the
        result is not finite (NaN or infinity).
        """
        if value is None:
            return value
        try:
            if isinstance(value, float):
                decimal_value = self.context.create_decimal_from_float(value)
            else:
                decimal_value = decimal.Decimal(value)
        except (decimal.InvalidOperation, TypeError, ValueError):
            raise exceptions.ValidationError(
                self.error_messages["invalid"],
                code="invalid",
                params={"value": value},
            )
        if not decimal_value.is_finite():
            raise exceptions.ValidationError(
                self.error_messages["invalid"],
                code="invalid",
                params={"value": value},
            )
        return decimal_value

    def get_db_prep_value(
        self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
    ) -> Any:
        """Adapt the value for the backend; objects with ``as_sql`` pass through."""
        if not prepared:
            value = self.get_prep_value(value)
        if hasattr(value, "as_sql"):
            return value
        return connection.ops.adapt_decimalfield_value(
            value, self.max_digits, self.decimal_places
        )

    def get_prep_value(self, value: Any) -> Any:
        value = super().get_prep_value(value)
        return self.to_python(value)
1581
1582
class DurationField(Field[datetime.timedelta]):
    """
    Store timedelta objects.

    Backends reporting ``has_native_duration_field`` receive the value
    unchanged. For all others the value is passed through
    ``duration_microseconds()`` on the way in and through the backend's
    ``convert_durationfield_value`` converter on the way out.
    """

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": '"%(value)s" value has an invalid format. It must be in [DD] [[HH:]MM:]ss[.uuuuuu] format.',
    }
    description = "Duration"

    def get_internal_type(self) -> str:
        return "DurationField"

    def to_python(self, value: Any) -> datetime.timedelta | None:
        """
        Convert ``value`` to a ``timedelta``.

        ``None`` and timedeltas pass through; anything else goes to
        ``parse_duration()``. Raises ``ValidationError`` with code "invalid"
        when the parser raises ``ValueError`` or returns ``None``.
        """
        if value is None or isinstance(value, datetime.timedelta):
            return value
        try:
            parsed = parse_duration(value)
        except ValueError:
            parsed = None
        if parsed is not None:
            return parsed

        raise exceptions.ValidationError(
            self.error_messages["invalid"],
            code="invalid",
            params={"value": value},
        )

    def get_db_prep_value(
        self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
    ) -> Any:
        """Return ``value`` as-is for native backends or ``None``; else microseconds."""
        if connection.features.has_native_duration_field or value is None:
            return value
        return duration_microseconds(value)

    def get_db_converters(
        self, connection: BaseDatabaseWrapper
    ) -> list[Callable[..., Any]]:
        """Put the duration converter first on backends without a native type."""
        own_converters = (
            []
            if connection.features.has_native_duration_field
            else [connection.ops.convert_durationfield_value]
        )
        return own_converters + super().get_db_converters(connection)

    def value_to_string(self, obj: Model) -> str:
        """Return ``duration_string()`` of the value, or "" for ``None``."""
        current = self.value_from_object(obj)
        if current is None:
            return ""
        return duration_string(current)
1639
1640
class EmailField(CharField):
    """CharField validated with ``validate_email``; ``max_length`` defaults to 254."""

    default_validators = [validators.validate_email]
    description = "Email address"

    def __init__(self, **kwargs: Any):
        # A max_length of 254 complies with RFCs 3696 and 5321.
        if "max_length" not in kwargs:
            kwargs["max_length"] = 254
        super().__init__(**kwargs)

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Return the inherited deconstruction unchanged."""
        field_name, import_path, positional, keyword = super().deconstruct()
        # max_length is kept even when it equals the default, because the
        # default is intended to change in future.
        return field_name, import_path, positional, keyword
1655
1656
class FloatField(Field[float]):
    """Field storing Python ``float`` values."""

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": '"%(value)s" value must be a float.',
    }
    description = "Floating point number"

    def get_prep_value(self, value: Any) -> Any:
        """
        Convert ``value`` to ``float`` for the database; ``None`` passes through.

        A failed conversion re-raises the same exception type (``TypeError``
        or ``ValueError``) with a message naming this field.
        """
        prepared = super().get_prep_value(value)
        if prepared is None:
            return None
        try:
            return float(prepared)
        except (TypeError, ValueError) as exc:
            raise type(exc)(
                f"Field '{self.name}' expected a number but got {prepared!r}.",
            ) from exc

    def get_internal_type(self) -> str:
        return "FloatField"

    def to_python(self, value: Any) -> float | None:
        """Convert ``value`` to ``float``, raising ``ValidationError`` on failure."""
        if value is None:
            return value
        try:
            converted = float(value)
        except (TypeError, ValueError):
            raise exceptions.ValidationError(
                self.error_messages["invalid"],
                code="invalid",
                params={"value": value},
            )
        return converted
1689
1690
class IntegerField(Field[int]):
    """Field storing integers, range-validated against the database backend."""

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": '"%(value)s" value must be an integer.',
    }
    description = "Integer"

    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
        """Run inherited checks, then warn about a stray ``max_length``."""
        results = list(super().preflight(**kwargs))
        results.extend(self._check_max_length_warning())
        return results

    def _check_max_length_warning(self) -> list[PreflightResult]:
        """Return a warning when ``max_length`` was supplied."""
        if self.max_length is None:
            return []
        return [
            PreflightResult(
                fix=f"'max_length' is ignored when used with {self.__class__.__name__}. Remove 'max_length' from field.",
                obj=self,
                id="fields.max_length_ignored",
                warning=True,
            )
        ]

    @cached_property
    def validators(self) -> list[Callable[..., Any]]:
        """
        Inherited validators plus min/max validators for the backend's range.

        These cannot be added at field initialization time because the range
        comes from the database connection. A bound is added only when no
        existing validator of the same kind is already at least as strict.
        """
        collected = super().validators
        internal_type = self.get_internal_type()
        lower_bound, upper_bound = db_connection.ops.integer_field_range(
            internal_type
        )

        def resolved_limit(validator: Any) -> Any:
            # limit_value may be a callable that produces the limit.
            if callable(validator.limit_value):
                return validator.limit_value()
            return validator.limit_value

        if lower_bound is not None:
            has_min = any(
                isinstance(validator, validators.MinValueValidator)
                and resolved_limit(validator) >= lower_bound
                for validator in collected
            )
            if not has_min:
                collected.append(validators.MinValueValidator(lower_bound))
        if upper_bound is not None:
            has_max = any(
                isinstance(validator, validators.MaxValueValidator)
                and resolved_limit(validator) <= upper_bound
                for validator in collected
            )
            if not has_max:
                collected.append(validators.MaxValueValidator(upper_bound))
        return collected

    def get_prep_value(self, value: Any) -> Any:
        """
        Convert ``value`` to ``int`` for the database; ``None`` passes through.

        A failed conversion re-raises the same exception type (``TypeError``
        or ``ValueError``) with a message naming this field.
        """
        prepared = super().get_prep_value(value)
        if prepared is None:
            return None
        try:
            return int(prepared)
        except (TypeError, ValueError) as exc:
            raise type(exc)(
                f"Field '{self.name}' expected a number but got {prepared!r}.",
            ) from exc

    def get_db_prep_value(
        self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
    ) -> Any:
        """Apply the inherited preparation, then the backend's integer adaptation."""
        db_value = super().get_db_prep_value(value, connection, prepared)
        return connection.ops.adapt_integerfield_value(
            db_value, self.get_internal_type()
        )

    def get_internal_type(self) -> str:
        return "IntegerField"

    def to_python(self, value: Any) -> int | None:
        """Convert ``value`` to ``int``, raising ``ValidationError`` on failure."""
        if value is None:
            return value
        try:
            converted = int(value)
        except (TypeError, ValueError):
            raise exceptions.ValidationError(
                self.error_messages["invalid"],
                code="invalid",
                params={"value": value},
            )
        return converted
1782
1783
class BigIntegerField(IntegerField):
    """IntegerField variant that reports ``BigIntegerField`` as its internal type."""

    description = "Big (8 byte) integer"

    def get_internal_type(self) -> str:
        # IntegerField passes this name to the backend's range lookup and
        # value adaptation.
        return "BigIntegerField"
1789
1790
class SmallIntegerField(IntegerField):
    """IntegerField variant that reports ``SmallIntegerField`` as its internal type."""

    description = "Small integer"

    def get_internal_type(self) -> str:
        # IntegerField passes this name to the backend's range lookup and
        # value adaptation.
        return "SmallIntegerField"
1796
1797
class GenericIPAddressField(Field[str]):
    """Field storing an IPv4 or IPv6 address as a string (max_length forced to 39)."""

    empty_strings_allowed = False
    description = "IP address"
    default_error_messages = {}

    def __init__(
        self,
        *,
        protocol: str = "both",
        unpack_ipv4: bool = False,
        **kwargs: Any,
    ):
        self.unpack_ipv4 = unpack_ipv4
        self.protocol = protocol
        (
            self.default_validators,
            invalid_error_message,
        ) = validators.ip_address_validators(protocol, unpack_ipv4)
        # NOTE(review): this writes into the class-level dict, so the
        # "invalid" message is shared by every instance and reflects the most
        # recently constructed field. Left unchanged because how the base
        # Field collects default_error_messages is not visible here; confirm
        # before moving this to per-instance state.
        self.default_error_messages["invalid"] = invalid_error_message
        kwargs["max_length"] = 39
        super().__init__(**kwargs)

    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
        """Run inherited checks, then the required/allow_null consistency check."""
        return [
            *super().preflight(**kwargs),
            *self._check_required_and_null_values(),
        ]

    def _check_required_and_null_values(self) -> list[PreflightResult]:
        """Reject ``required=False`` combined with ``allow_null=False``."""
        if not getattr(self, "allow_null", False) and not getattr(
            self, "required", True
        ):
            return [
                PreflightResult(
                    fix="GenericIPAddressFields cannot have required=False if allow_null=False, "
                    "as blank values are stored as nulls.",
                    obj=self,
                    id="fields.generic_ip_field_null_blank_config",
                )
            ]
        return []

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Emit non-default options and drop the forced ``max_length`` of 39."""
        name, path, args, kwargs = super().deconstruct()
        if self.unpack_ipv4 is not False:
            kwargs["unpack_ipv4"] = self.unpack_ipv4
        if self.protocol != "both":
            kwargs["protocol"] = self.protocol
        if kwargs.get("max_length") == 39:
            del kwargs["max_length"]
        return name, path, args, kwargs

    def get_internal_type(self) -> str:
        return "GenericIPAddressField"

    def to_python(self, value: Any) -> str | None:
        """
        Return ``value`` as a stripped string.

        Values containing ":" are passed through ``clean_ipv6_address()``
        together with this field's "invalid" error message.
        """
        if value is None:
            return None
        if not isinstance(value, str):
            value = str(value)
        value = value.strip()
        if ":" in value:
            return clean_ipv6_address(
                value, self.unpack_ipv4, self.error_messages["invalid"]
            )
        return value

    def get_db_prep_value(
        self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
    ) -> Any:
        """Adapt the address for the backend, preparing it first unless ``prepared``."""
        if not prepared:
            value = self.get_prep_value(value)
        return connection.ops.adapt_ipaddressfield_value(value)

    def get_prep_value(self, value: Any) -> Any:
        """
        Return ``value`` as a string for the database; ``None`` passes through.

        Values containing ":" are passed through ``clean_ipv6_address()``. If
        that raises ``ValidationError`` the plain string is returned instead,
        as a deliberate best-effort.
        """
        value = super().get_prep_value(value)
        if value is None:
            return None
        # Coerce before the ":" test, as to_python() does. The membership
        # test raises TypeError on non-string inputs such as ipaddress
        # objects.
        text = str(value)
        if ":" in text:
            try:
                return clean_ipv6_address(text, self.unpack_ipv4)
            except exceptions.ValidationError:
                pass
        return text
1882
1883
class _HasDbType(Protocol):
    """
    Protocol for objects that have a db_type method and integer_field_class.

    Used as the ``self`` annotation of
    ``PositiveIntegerRelDbTypeMixin.rel_db_type``.
    """

    # Field class that rel_db_type() instantiates to obtain a column type.
    integer_field_class: type[IntegerField]

    def db_type(self, connection: BaseDatabaseWrapper) -> str | None: ...
1890
1891
class PositiveIntegerRelDbTypeMixin(IntegerField):
    """
    Mixin giving positive-integer fields a ``rel_db_type()``.

    Each subclass gets an ``integer_field_class``: the signed integer field
    class whose column type related fields should use.
    """

    integer_field_class: type[IntegerField]

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Set ``integer_field_class`` on subclasses that do not already have one."""
        super().__init_subclass__(**kwargs)
        if not hasattr(cls, "integer_field_class"):
            # Find the first plain IntegerField parent in the MRO. This mixin
            # subclasses IntegerField itself, so it and anything derived from
            # it must be skipped. Otherwise the mixin would always be the
            # first match, and big/small variants would resolve to the plain
            # integer type.
            integer_parent = next(
                (
                    parent
                    for parent in cls.__mro__[1:]
                    if issubclass(parent, IntegerField)
                    and not issubclass(parent, PositiveIntegerRelDbTypeMixin)
                ),
                None,
            )
            if integer_parent is not None:
                cls.integer_field_class = integer_parent

    def rel_db_type(self: _HasDbType, connection: BaseDatabaseWrapper) -> str | None:
        """
        Return the data type that a related field pointing to this field should
        use. In most cases, a foreign key pointing to a positive integer
        primary key will have an integer column data type but some databases
        (e.g. MySQL) have an unsigned integer type. In that case
        (related_fields_match_type=True), the primary key should return its
        db_type.
        """
        if connection.features.related_fields_match_type:
            return self.db_type(connection)
        else:
            return self.integer_field_class().db_type(connection=connection)
1923
1924
class PositiveBigIntegerField(PositiveIntegerRelDbTypeMixin, BigIntegerField):
    """BigIntegerField combined with the positive-integer ``rel_db_type()`` mixin."""

    description = "Positive big integer"

    def get_internal_type(self) -> str:
        return "PositiveBigIntegerField"
1930
1931
class PositiveIntegerField(PositiveIntegerRelDbTypeMixin, IntegerField):
    """IntegerField combined with the positive-integer ``rel_db_type()`` mixin."""

    description = "Positive integer"

    def get_internal_type(self) -> str:
        return "PositiveIntegerField"
1937
1938
class PositiveSmallIntegerField(PositiveIntegerRelDbTypeMixin, SmallIntegerField):
    """SmallIntegerField combined with the positive-integer ``rel_db_type()`` mixin."""

    description = "Positive small integer"

    def get_internal_type(self) -> str:
        return "PositiveSmallIntegerField"
1944
1945
class TextField(Field[str]):
    """Text field with an optional database collation."""

    description = "Text"

    def __init__(self, *, db_collation: str | None = None, **kwargs: Any):
        super().__init__(**kwargs)
        self.db_collation = db_collation

    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
        """Run inherited checks, then the collation support check."""
        results = list(super().preflight(**kwargs))
        results.extend(self._check_db_collation())
        return results

    def _check_db_collation(self) -> list[PreflightResult]:
        """Report an error when a collation is set but not supported or required."""
        collation_ok = (
            self.db_collation is None
            or "supports_collation_on_textfield"
            in self.model.model_options.required_db_features
            or db_connection.features.supports_collation_on_textfield
        )
        if collation_ok:
            return []
        return [
            PreflightResult(
                fix=f"{db_connection.display_name} does not support a database collation on "
                "TextFields.",
                obj=self,
                id="fields.db_collation_unsupported",
            ),
        ]

    def db_parameters(self, connection: BaseDatabaseWrapper) -> DbParameters:
        """Extend the inherited parameters with this field's collation."""
        params = super().db_parameters(connection)
        params["collation"] = self.db_collation
        return params

    def get_internal_type(self) -> str:
        return "TextField"

    def to_python(self, value: Any) -> str | None:
        """Return ``None`` and strings unchanged; pass anything else through ``str()``."""
        if value is None or isinstance(value, str):
            return value
        return str(value)

    def get_prep_value(self, value: Any) -> Any:
        prepared = super().get_prep_value(value)
        return self.to_python(prepared)

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Add ``db_collation`` to the deconstructed kwargs when it is set."""
        name, path, args, kwargs = super().deconstruct()
        collation = self.db_collation
        if collation:
            kwargs["db_collation"] = collation
        return name, path, args, kwargs
1999
2000
class TimeField(DateTimeCheckMixin, Field[datetime.time]):
    """Field storing ``datetime.time`` values, optionally auto-populated on save."""

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": '"%(value)s" value has an invalid format. It must be in HH:MM[:ss[.uuuuuu]] format.',
        "invalid_time": '"%(value)s" value has the correct format (HH:MM[:ss[.uuuuuu]]) but it is an invalid time.',
    }
    description = "Time"

    def __init__(
        self, *, auto_now: bool = False, auto_now_add: bool = False, **kwargs: Any
    ):
        self.auto_now = auto_now
        self.auto_now_add = auto_now_add
        # Either auto flag forces required=False before the base initialiser runs.
        if auto_now or auto_now_add:
            kwargs["required"] = False
        super().__init__(**kwargs)

    def _check_fix_default_value(self) -> list[PreflightResult]:
        """
        Warn that using an actual time or datetime value as the default is
        probably wrong; it's only evaluated on server startup.
        """
        if not self.has_default():
            return []

        default = self.default
        if isinstance(default, datetime.datetime):
            return self._check_if_value_fixed(default, now=None)
        if isinstance(default, datetime.time):
            now = _get_naive_now()
            # This will not use the right date in the race condition where now
            # is just before the date change and the default is just past 0:00.
            combined = datetime.datetime.combine(now.date(), default)
            return self._check_if_value_fixed(combined, now=now)
        # No explicit time / datetime value -- no checks necessary
        return []

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Add the auto flags that are set and drop the implied ``required``."""
        field_name, import_path, positional, keywords = super().deconstruct()
        for option in ("auto_now", "auto_now_add"):
            setting = getattr(self, option)
            if setting is not False:
                keywords[option] = setting
        if self.auto_now or self.auto_now_add:
            # __init__ injected this entry; it is not a user-supplied option.
            del keywords["required"]
        return field_name, import_path, positional, keywords

    def get_internal_type(self) -> str:
        return "TimeField"

    def to_python(self, value: Any) -> datetime.time | None:
        """Convert ``value`` to a ``datetime.time``.

        ``None`` and ``datetime.time`` pass through, a ``datetime.datetime``
        is reduced to its time part, and anything else goes to ``parse_time``.

        Raises:
            ValidationError: code "invalid_time" when ``parse_time`` raises
                ``ValueError``; code "invalid" when it returns ``None``.
        """
        if value is None:
            return None
        if isinstance(value, datetime.datetime):
            # Not usually a good idea to pass in a datetime here (it loses
            # information), but this can be a side-effect of interacting with a
            # database backend (e.g. Oracle), so we'll be accommodating.
            return value.time()
        if isinstance(value, datetime.time):
            return value

        try:
            parsed = parse_time(value)
        except ValueError:
            raise exceptions.ValidationError(
                self.error_messages["invalid_time"],
                code="invalid_time",
                params={"value": value},
            )
        if parsed is not None:
            return parsed

        raise exceptions.ValidationError(
            self.error_messages["invalid"],
            code="invalid",
            params={"value": value},
        )

    def pre_save(self, model_instance: Model, add: bool) -> datetime.time | None:
        """Stamp the current time on the instance when an auto flag applies."""
        if not (self.auto_now or (self.auto_now_add and add)):
            return super().pre_save(model_instance, add)
        current = datetime.datetime.now().time()
        setattr(model_instance, self.attname, current)
        return current

    def get_prep_value(self, value: Any) -> Any:
        """Apply the inherited preparation, then normalise via ``to_python``."""
        return self.to_python(super().get_prep_value(value))

    def get_db_prep_value(
        self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
    ) -> Any:
        """Cast the time into the format expected by the backend."""
        to_adapt = value if prepared else self.get_prep_value(value)
        return connection.ops.adapt_timefield_value(to_adapt)

    def value_to_string(self, obj: Model) -> str:
        """Return the ISO-format string of the field value, or "" for ``None``."""
        current = self.value_from_object(obj)
        if current is None:
            return ""
        return current.isoformat()
2103
2104
class URLField(CharField):
    """CharField validated by ``URLValidator``, defaulting ``max_length`` to 200."""

    default_validators = [validators.URLValidator()]
    description = "URL"

    def __init__(self, **kwargs: Any):
        # Only supply the default when the caller did not pass max_length.
        if "max_length" not in kwargs:
            kwargs["max_length"] = 200
        super().__init__(**kwargs)

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Omit ``max_length`` from the deconstructed kwargs when it equals 200."""
        field_name, import_path, positional, keywords = super().deconstruct()
        if "max_length" in keywords and keywords["max_length"] == 200:
            del keywords["max_length"]
        return field_name, import_path, positional, keywords
2118
2119
class BinaryField(Field[bytes | memoryview]):
    """Field holding raw binary data as ``bytes`` or ``memoryview``."""

    description = "Raw binary data"
    empty_values = [None, b""]

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        # A length validator is only attached when max_length was configured.
        if self.max_length is not None:
            self.validators.append(validators.MaxLengthValidator(self.max_length))

    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
        """Run the inherited preflight checks plus the string-default check."""
        return [*super().preflight(**kwargs), *self._check_str_default_value()]

    def _check_str_default_value(self) -> list[PreflightResult]:
        """Report a problem when the field's default is a ``str``.

        Returns:
            A single-item list describing the problem, or an empty list when
            there is no default or the default is not a string.
        """
        if self.has_default() and isinstance(self.default, str):
            return [
                PreflightResult(
                    fix="BinaryField's default cannot be a string. Use bytes "
                    "content instead.",
                    obj=self,
                    # This check used to report an id that named a file-field
                    # upload_to problem; use one that describes this failure.
                    id="fields.binaryfield_str_default",
                )
            ]
        return []

    def get_internal_type(self) -> str:
        return "BinaryField"

    def get_placeholder(
        self, value: Any, compiler: SQLCompiler, connection: BaseDatabaseWrapper
    ) -> Any:
        """Delegate placeholder SQL generation to the backend operations."""
        return connection.ops.binary_placeholder_sql(value)

    def get_default(self) -> bytes | memoryview | None:
        """Return the default value for this field.

        A non-callable explicit default is returned as-is. Otherwise the
        inherited default is used, with an empty string mapped to ``b""``.
        """
        if self.has_default() and not callable(self.default):
            return self.default
        default = super().get_default()
        if default == "":
            return b""
        return default

    def get_db_prep_value(
        self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
    ) -> Any:
        """Wrap non-``None`` values in the driver's ``Binary`` type."""
        value = super().get_db_prep_value(value, connection, prepared)
        if value is not None:
            return connection.Database.Binary(value)
        return value

    def value_to_string(self, obj: Model) -> str:
        """Binary data is serialized as base64; ``None`` becomes ""."""
        val = self.value_from_object(obj)
        if val is None:
            return ""
        return b64encode(val).decode("ascii")

    def to_python(self, value: Any) -> bytes | memoryview | None:
        """Decode base64 strings into a ``memoryview``; pass other values through."""
        # If it's a string, it should be base64-encoded data
        if isinstance(value, str):
            return memoryview(b64decode(value.encode("ascii")))
        return value
2180
2181
class UUIDField(Field[uuid.UUID]):
    """Field storing ``uuid.UUID`` values."""

    default_error_messages = {
        "invalid": '"%(value)s" is not a valid UUID.',
    }
    description = "Universally unique identifier"
    empty_strings_allowed = False

    def __init__(self, **kwargs: Any):
        # max_length is always forced to 32, overriding any caller value.
        kwargs.update(max_length=32)
        super().__init__(**kwargs)

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Strip the forced ``max_length`` from the deconstructed kwargs."""
        field_name, import_path, positional, keywords = super().deconstruct()
        keywords.pop("max_length")
        return field_name, import_path, positional, keywords

    def get_internal_type(self) -> str:
        return "UUIDField"

    def get_prep_value(self, value: Any) -> Any:
        """Apply the inherited preparation, then normalise via ``to_python``."""
        return self.to_python(super().get_prep_value(value))

    def get_db_prep_value(
        self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
    ) -> str | uuid.UUID | None:
        """Return the UUID itself on backends with a native UUID type, else its hex."""
        if value is None:
            return None
        as_uuid = value if isinstance(value, uuid.UUID) else self.to_python(value)
        if as_uuid is None:
            return None

        if connection.features.has_native_uuid_field:
            return as_uuid
        return as_uuid.hex

    def to_python(self, value: Any) -> uuid.UUID | None:
        """Convert ``value`` to a ``uuid.UUID``.

        ``None`` and existing UUIDs pass through. Integers are passed to the
        constructor as ``int=``; everything else is passed as ``hex=``.

        Raises:
            ValidationError: code "invalid" when the constructor raises
                ``AttributeError`` or ``ValueError``.
        """
        if value is None or isinstance(value, uuid.UUID):
            return value
        try:
            if isinstance(value, int):
                return uuid.UUID(int=value)
            return uuid.UUID(hex=value)
        except (AttributeError, ValueError):
            raise exceptions.ValidationError(
                self.error_messages["invalid"],
                code="invalid",
                params={"value": value},
            )
2231
2232
class PrimaryKeyField(BigIntegerField):
    """Parameterless BigIntegerField flagged as an auto-created primary key."""

    db_returning = True

    def __init__(self):
        super().__init__(required=False)
        self.auto_created = True
        self.primary_key = True

    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
        """Run the inherited checks, dropping the reserved-``id`` name error."""
        # PrimaryKeyField is allowed to use the 'id' field name, so the
        # reserved_field_name_id error does not apply to it.
        return [
            result
            for result in super().preflight(**kwargs)
            if result.id != "fields.reserved_field_name_id"
        ]

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Return a deconstruction with empty args and kwargs."""
        # PrimaryKeyField takes no parameters, so both containers stay empty.
        no_args = cast(list[Any], [])
        no_kwargs = cast(dict[str, Any], {})
        return (self.name, "plain.models.PrimaryKeyField", no_args, no_kwargs)

    def validate(self, value: Any, model_instance: Model) -> None:
        """Perform no validation."""

    def get_db_prep_value(
        self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
    ) -> Any:
        """Prepare unprepared values and run them through the backend's auto-pk validation."""
        if prepared:
            return value
        prepped = self.get_prep_value(value)
        return connection.ops.validate_autopk_value(prepped)

    def get_internal_type(self) -> str:
        return "PrimaryKeyField"

    def rel_db_type(self, connection: BaseDatabaseWrapper) -> str | None:
        """Return the column type of a plain BigIntegerField on this connection."""
        return BigIntegerField().db_type(connection=connection)