1from __future__ import annotations
2
3import collections.abc
4import copy
5import datetime
6import decimal
7import enum
8import operator
9import uuid
10import warnings
11from base64 import b64decode, b64encode
12from collections.abc import Callable, Sequence
13from functools import cached_property
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Generic,
18 Protocol,
19 Self,
20 TypedDict,
21 TypeVar,
22 cast,
23 overload,
24)
25
26from plain import exceptions, validators
27from plain.models.constants import LOOKUP_SEP
28from plain.models.db import db_connection
29from plain.models.enums import ChoicesMeta
30from plain.models.query_utils import RegisterLookupMixin
31from plain.preflight import PreflightResult
32from plain.utils import timezone
33from plain.utils.datastructures import DictWrapper
34from plain.utils.dateparse import (
35 parse_date,
36 parse_datetime,
37 parse_duration,
38 parse_time,
39)
40from plain.utils.duration import duration_microseconds, duration_string
41from plain.utils.functional import Promise
42from plain.utils.ipv6 import clean_ipv6_address
43from plain.utils.itercompat import is_iterable
44
45from ..registry import models_registry
46
47if TYPE_CHECKING:
48 from plain.models.backends.base.base import BaseDatabaseWrapper
49 from plain.models.base import Model
50 from plain.models.expressions import Col
51 from plain.models.fields.reverse_related import ForeignObjectRel
52 from plain.models.sql.compiler import SQLCompiler
53
54
55class DbParameters(TypedDict, total=False):
56 """Return type for Field.db_parameters()."""
57
58 type: str | None
59 check: str | None
60
61
62__all__ = [
63 "BLANK_CHOICE_DASH",
64 "DbParameters",
65 "PrimaryKeyField",
66 "BigIntegerField",
67 "BinaryField",
68 "BooleanField",
69 "CharField",
70 "DateField",
71 "DateTimeField",
72 "DecimalField",
73 "DurationField",
74 "EmailField",
75 "Empty",
76 "Field",
77 "FloatField",
78 "GenericIPAddressField",
79 "IntegerField",
80 "NOT_PROVIDED",
81 "PositiveBigIntegerField",
82 "PositiveIntegerField",
83 "PositiveSmallIntegerField",
84 "SmallIntegerField",
85 "TextField",
86 "TimeField",
87 "URLField",
88 "UUIDField",
89]
90
91
92class Empty:
93 pass
94
95
96class NOT_PROVIDED:
97 pass
98
99
100# The values to use for "blank" in SelectFields. Will be appended to the start
101# of most "choices" lists.
102BLANK_CHOICE_DASH = [("", "---------")]
103
104
105def _load_field(
106 package_label: str, model_name: str, field_name: str
107) -> Field[Any] | ForeignObjectRel:
108 return models_registry.get_model(package_label, model_name)._model_meta.get_field(
109 field_name
110 )
111
112
113# A guide to Field parameters:
114#
115# * name: The name of the field specified in the model.
116# * attname: The attribute to use on the model object. This is the same as
117# "name", except in the case of ForeignKeys, where "_id" is
118# appended.
119# * column: The database column for this field. This is the same as
120# "attname".
121#
122# Code that introspects values, or does other dynamic things, should use
123# attname.
124
125
126def _empty(of_cls: type) -> Empty:
127 new = Empty()
128 new.__class__ = of_cls
129 return new
130
131
132def return_None() -> None:
133 return None
134
135
136# TypeVar for Field's generic type parameter
137T = TypeVar("T")
138
139
140class Field(RegisterLookupMixin, Generic[T]):
141 """Base class for all field types"""
142
143 # Instance attributes set during field lifecycle
144 # Set by __init__
145 name: str | None
146 max_length: int | None
147 # Set by set_attributes_from_name (called by contribute_to_class)
148 attname: str
149 column: str
150 concrete: bool
151 # Set by contribute_to_class
152 model: type[Model]
153
154 # Designates whether empty strings fundamentally are allowed at the
155 # database level.
156 empty_strings_allowed = True
157 empty_values = list(validators.EMPTY_VALUES)
158
159 default_validators = [] # Default set of validators
160 default_error_messages = {
161 "invalid_choice": "Value %(value)r is not a valid choice.",
162 "allow_null": "This field cannot be null.",
163 "required": "This field is be required.",
164 "unique": "A %(model_name)s with this %(field_label)s already exists.",
165 }
166
167 # Attributes that don't affect a column definition.
168 # These attributes are ignored when altering the field.
169 non_db_attrs = (
170 "required",
171 "choices",
172 "error_messages",
173 "limit_choices_to",
174 # Database-level options are not supported, see #21961.
175 "on_delete",
176 "related_query_name",
177 "validators",
178 )
179
180 # Generic field type description, usually overridden by subclasses
181 def _description(self) -> str:
182 return f"Field of type: {self.__class__.__name__}"
183
184 description = property(_description)
185
186 def __init__(
187 self,
188 *,
189 max_length: int | None = None,
190 required: bool = True,
191 allow_null: bool = False,
192 default: Any = NOT_PROVIDED,
193 choices: Any = None,
194 validators: Sequence[Callable[..., Any]] = (),
195 error_messages: dict[str, str] | None = None,
196 ):
197 self.name = None # Set by set_attributes_from_name
198 self.max_length = max_length
199 self.required, self.allow_null = required, allow_null
200 self.default = default
201 if isinstance(choices, ChoicesMeta):
202 choices = choices.choices
203 elif isinstance(choices, enum.EnumMeta):
204 choices = [(member.value, member.name) for member in choices]
205 if isinstance(choices, collections.abc.Iterator):
206 choices = list(choices)
207 self.choices = choices
208
209 self.primary_key = False
210 self.auto_created = False
211
212 self._validators = list(validators) # Store for deconstruction later
213
214 self._error_messages = error_messages # Store for deconstruction later
215
216 def __str__(self) -> str:
217 """
218 Return "package_label.model_label.field_name" for fields attached to
219 models.
220 """
221 if not hasattr(self, "model"):
222 return super().__str__()
223 model = self.model
224 return f"{model.model_options.label}.{self.name}"
225
226 def __repr__(self) -> str:
227 """Display the module, class, and name of the field."""
228 path = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
229 name = getattr(self, "name", None)
230 if name is not None:
231 return f"<{path}: {name}>"
232 return f"<{path}>"
233
234 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
235 return [
236 *self._check_field_name(),
237 *self._check_choices(),
238 *self._check_null_allowed_for_primary_keys(),
239 *self._check_backend_specific_checks(),
240 *self._check_validators(),
241 ]
242
243 def _check_field_name(self) -> list[PreflightResult]:
244 """
245 Check if field name is valid, i.e. 1) does not end with an
246 underscore, 2) does not contain "__" and 3) is not "id".
247 """
248 assert self.name is not None, "Field name must be set before checking"
249 if self.name.endswith("_"):
250 return [
251 PreflightResult(
252 fix="Field names must not end with an underscore.",
253 obj=self,
254 id="fields.name_ends_with_underscore",
255 )
256 ]
257 elif LOOKUP_SEP in self.name:
258 return [
259 PreflightResult(
260 fix=f'Field names must not contain "{LOOKUP_SEP}".',
261 obj=self,
262 id="fields.name_contains_lookup_separator",
263 )
264 ]
265 elif self.name == "id":
266 return [
267 PreflightResult(
268 fix="'id' is a reserved word that cannot be used as a field name.",
269 obj=self,
270 id="fields.reserved_field_name_id",
271 )
272 ]
273 else:
274 return []
275
276 @classmethod
277 def _choices_is_value(cls, value: Any) -> bool:
278 return isinstance(value, str | Promise) or not is_iterable(value)
279
280 def _check_choices(self) -> list[PreflightResult]:
281 if not self.choices:
282 return []
283
284 if not is_iterable(self.choices) or isinstance(self.choices, str):
285 return [
286 PreflightResult(
287 fix="'choices' must be an iterable (e.g., a list or tuple).",
288 obj=self,
289 id="fields.choices_not_iterable",
290 )
291 ]
292
293 choice_max_length = 0
294 # Expect [group_name, [value, display]]
295 for choices_group in self.choices:
296 try:
297 group_name, group_choices = choices_group
298 except (TypeError, ValueError):
299 # Containing non-pairs
300 break
301 try:
302 if not all(
303 self._choices_is_value(value) and self._choices_is_value(human_name)
304 for value, human_name in group_choices
305 ):
306 break
307 if self.max_length is not None and group_choices:
308 choice_max_length = max(
309 [
310 choice_max_length,
311 *(
312 len(value)
313 for value, _ in group_choices
314 if isinstance(value, str)
315 ),
316 ]
317 )
318 except (TypeError, ValueError):
319 # No groups, choices in the form [value, display]
320 value, human_name = group_name, group_choices
321 if not self._choices_is_value(value) or not self._choices_is_value(
322 human_name
323 ):
324 break
325 if self.max_length is not None and isinstance(value, str):
326 choice_max_length = max(choice_max_length, len(value))
327
328 # Special case: choices=['ab']
329 if isinstance(choices_group, str):
330 break
331 else:
332 if self.max_length is not None and choice_max_length > self.max_length:
333 return [
334 PreflightResult(
335 fix="'max_length' is too small to fit the longest value " # noqa: UP031
336 "in 'choices' (%d characters)." % choice_max_length,
337 obj=self,
338 id="fields.max_length_too_small_for_choices",
339 ),
340 ]
341 return []
342
343 return [
344 PreflightResult(
345 fix="'choices' must be an iterable containing "
346 "(actual value, human readable name) tuples.",
347 obj=self,
348 id="fields.choices_invalid_format",
349 )
350 ]
351
352 def _check_null_allowed_for_primary_keys(self) -> list[PreflightResult]:
353 if self.primary_key and self.allow_null:
354 # We cannot reliably check this for backends like Oracle which
355 # consider NULL and '' to be equal (and thus set up
356 # character-based fields a little differently).
357 return [
358 PreflightResult(
359 fix="Primary keys must not have allow_null=True. "
360 "Set allow_null=False on the field, or "
361 "remove primary_key=True argument.",
362 obj=self,
363 id="fields.primary_key_allows_null",
364 )
365 ]
366 else:
367 return []
368
369 def _check_backend_specific_checks(self) -> list[PreflightResult]:
370 errors = []
371 errors.extend(db_connection.validation.check_field(self))
372 return errors
373
374 def _check_validators(self) -> list[PreflightResult]:
375 errors = []
376 for i, validator in enumerate(self.validators):
377 if not callable(validator):
378 errors.append(
379 PreflightResult(
380 fix=(
381 "All 'validators' must be callable. "
382 f"validators[{i}] ({repr(validator)}) isn't a function or "
383 "instance of a validator class."
384 ),
385 obj=self,
386 id="fields.invalid_validator",
387 )
388 )
389 return errors
390
391 def get_col(self, alias: str | None, output_field: Field | None = None) -> Col:
392 if alias == self.model.model_options.db_table and (
393 output_field is None or output_field == self
394 ):
395 return self.cached_col
396 from plain.models.expressions import Col
397
398 return Col(alias, self, output_field)
399
400 @cached_property
401 def cached_col(self) -> Col:
402 from plain.models.expressions import Col
403
404 return Col(self.model.model_options.db_table, self)
405
406 def select_format(
407 self, compiler: SQLCompiler, sql: str, params: Any
408 ) -> tuple[str, Any]:
409 """
410 Custom format for select clauses. For example, GIS columns need to be
411 selected as AsText(table.col) on MySQL as the table.col data can't be
412 used by Plain.
413 """
414 return sql, params
415
416 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
417 """
418 Return enough information to recreate the field as a 4-tuple:
419
420 * The name of the field on the model, if contribute_to_class() has
421 been run.
422 * The import path of the field, including the class, e.g.
423 plain.models.IntegerField. This should be the most portable
424 version, so less specific may be better.
425 * A list of positional arguments.
426 * A dict of keyword arguments.
427
428 Note that the positional or keyword arguments must contain values of
429 the following types (including inner values of collection types):
430
431 * None, bool, str, int, float, complex, set, frozenset, list, tuple,
432 dict
433 * UUID
434 * datetime.datetime (naive), datetime.date
435 * top-level classes, top-level functions - will be referenced by their
436 full import path
437 * Storage instances - these have their own deconstruct() method
438
439 This is because the values here must be serialized into a text format
440 (possibly new Python code, possibly JSON) and these are the only types
441 with encoding handlers defined.
442
443 There's no need to return the exact way the field was instantiated this
444 time, just ensure that the resulting field is the same - prefer keyword
445 arguments over positional ones, and omit parameters with their default
446 values.
447 """
448 # Short-form way of fetching all the default parameters
449 keywords = {}
450 possibles = {
451 "max_length": None,
452 "required": True,
453 "allow_null": False,
454 "default": NOT_PROVIDED,
455 "choices": None,
456 "validators": [],
457 "error_messages": None,
458 }
459 attr_overrides = {
460 "error_messages": "_error_messages",
461 "validators": "_validators",
462 }
463 equals_comparison = {"choices", "validators"}
464 for name, default in possibles.items():
465 value = getattr(self, attr_overrides.get(name, name))
466 # Unroll anything iterable for choices into a concrete list
467 if name == "choices" and isinstance(value, collections.abc.Iterable):
468 value = list(value)
469 # Do correct kind of comparison
470 if name in equals_comparison:
471 if value != default:
472 keywords[name] = value
473 else:
474 if value is not default:
475 keywords[name] = value
476 # Work out path - we shorten it for known Plain core fields
477 path = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
478 if path.startswith("plain.models.fields.related"):
479 path = path.replace("plain.models.fields.related", "plain.models")
480 elif path.startswith("plain.models.fields.json"):
481 path = path.replace("plain.models.fields.json", "plain.models")
482 elif path.startswith("plain.models.fields.proxy"):
483 path = path.replace("plain.models.fields.proxy", "plain.models")
484 elif path.startswith("plain.models.fields.timezones"):
485 path = path.replace("plain.models.fields.timezones", "plain.models")
486 elif path.startswith("plain.models.fields"):
487 path = path.replace("plain.models.fields", "plain.models")
488 # Return basic info - other fields should override this.
489 # Note: self.name can be None during migration state rendering when fields are cloned
490 return (self.name, path, [], keywords)
491
492 def clone(self) -> Self:
493 """
494 Uses deconstruct() to clone a new copy of this Field.
495 Will not preserve any class attachments/attribute names.
496 """
497 name, path, args, kwargs = self.deconstruct()
498 return self.__class__(*args, **kwargs)
499
500 def __deepcopy__(self, memodict: dict[int, Any]) -> Self:
501 # We don't have to deepcopy very much here, since most things are not
502 # intended to be altered after initial creation.
503 obj = copy.copy(self)
504 memodict[id(self)] = obj
505 return obj
506
507 def __copy__(self) -> Self:
508 # We need to avoid hitting __reduce__, so define this
509 # slightly weird copy construct.
510 obj = Empty()
511 obj.__class__ = self.__class__
512 obj.__dict__ = self.__dict__.copy()
513 return cast(Self, obj)
514
515 def __reduce__(
516 self,
517 ) -> (
518 tuple[Callable[..., Any], tuple[Any, ...], dict[str, Any]]
519 | tuple[Callable[..., Field[Any] | ForeignObjectRel], tuple[str, str, str]]
520 ):
521 """
522 Pickling should return the model._model_meta.fields instance of the field,
523 not a new copy of that field. So, use the app registry to load the
524 model and then the field back.
525 """
526 model = getattr(self, "model", None)
527 if model is None:
528 # Fields are sometimes used without attaching them to models (for
529 # example in aggregation). In this case give back a plain field
530 # instance. The code below will create a new empty instance of
531 # class self.__class__, then update its dict with self.__dict__
532 # values - so, this is very close to normal pickle.
533 state = self.__dict__.copy()
534 # The _get_default cached_property can't be pickled due to lambda
535 # usage.
536 state.pop("_get_default", None)
537 return _empty, (self.__class__,), state
538 assert self.name is not None
539 options = model.model_options
540 return _load_field, (
541 options.package_label,
542 options.object_name,
543 self.name,
544 )
545
546 def get_id_value_on_save(self, instance: Model) -> T | None:
547 """
548 Hook to generate new primary key values on save. This method is called when
549 saving instances with no primary key value set. If this method returns
550 something else than None, then the returned value is used when saving
551 the new instance.
552 """
553 if self.default:
554 return self.get_default()
555 return None
556
557 def to_python(self, value: Any) -> T | None:
558 """
559 Convert the input value into the expected Python data type, raising
560 plain.exceptions.ValidationError if the data can't be converted.
561 Return the converted value. Subclasses should override this.
562 """
563 return value
564
565 @cached_property
566 def error_messages(self) -> dict[str, str]:
567 messages = {}
568 for c in reversed(self.__class__.__mro__):
569 messages.update(getattr(c, "default_error_messages", {}))
570 messages.update(self._error_messages or {})
571 return messages
572
573 @cached_property
574 def validators(self) -> list[Callable[..., Any]]:
575 """
576 Some validators can't be created at field initialization time.
577 This method provides a way to delay their creation until required.
578 """
579 return [*self.default_validators, *self._validators]
580
581 def run_validators(self, value: Any) -> None:
582 if value in self.empty_values:
583 return
584
585 errors = []
586 for v in self.validators:
587 try:
588 v(value)
589 except exceptions.ValidationError as e:
590 if hasattr(e, "code") and e.code in self.error_messages:
591 e.message = self.error_messages[e.code]
592 errors.extend(e.error_list)
593
594 if errors:
595 raise exceptions.ValidationError(errors)
596
597 def validate(self, value: Any, model_instance: Model) -> None:
598 """
599 Validate value and raise ValidationError if necessary. Subclasses
600 should override this to provide validation logic.
601 """
602
603 if self.choices is not None and value not in self.empty_values:
604 for option_key, option_value in self.choices:
605 if isinstance(option_value, list | tuple):
606 # This is an optgroup, so look inside the group for
607 # options.
608 for optgroup_key, optgroup_value in option_value:
609 if value == optgroup_key:
610 return
611 elif value == option_key:
612 return
613 raise exceptions.ValidationError(
614 self.error_messages["invalid_choice"],
615 code="invalid_choice",
616 params={"value": value},
617 )
618
619 if value is None and not self.allow_null:
620 raise exceptions.ValidationError(
621 self.error_messages["allow_null"], code="allow_null"
622 )
623
624 if self.required and value in self.empty_values:
625 raise exceptions.ValidationError(
626 self.error_messages["required"], code="required"
627 )
628
629 def clean(self, value: Any, model_instance: Model) -> T | None:
630 """
631 Convert the value's type and run validation. Validation errors
632 from to_python() and validate() are propagated. Return the correct
633 value if no error is raised.
634 """
635 value = self.to_python(value)
636 self.validate(value, model_instance)
637 self.run_validators(value)
638 return value
639
640 def db_type_parameters(self, connection: BaseDatabaseWrapper) -> DictWrapper:
641 return DictWrapper(self.__dict__, connection.ops.quote_name, "qn_")
642
643 def db_check(self, connection: BaseDatabaseWrapper) -> str | None:
644 """
645 Return the database column check constraint for this field, for the
646 provided connection. Works the same way as db_type() for the case that
647 get_internal_type() does not map to a preexisting model field.
648 """
649 data = self.db_type_parameters(connection)
650 try:
651 return (
652 connection.data_type_check_constraints[self.get_internal_type()] % data
653 )
654 except KeyError:
655 return None
656
657 def db_type(self, connection: BaseDatabaseWrapper) -> str | None:
658 """
659 Return the database column data type for this field, for the provided
660 connection.
661 """
662 # The default implementation of this method looks at the
663 # backend-specific data_types dictionary, looking up the field by its
664 # "internal type".
665 #
666 # A Field class can implement the get_internal_type() method to specify
667 # which *preexisting* Plain Field class it's most similar to -- i.e.,
668 # a custom field might be represented by a TEXT column type, which is
669 # the same as the TextField Plain field type, which means the custom
670 # field's get_internal_type() returns 'TextField'.
671 #
672 # But the limitation of the get_internal_type() / data_types approach
673 # is that it cannot handle database column types that aren't already
674 # mapped to one of the built-in Plain field types. In this case, you
675 # can implement db_type() instead of get_internal_type() to specify
676 # exactly which wacky database column type you want to use.
677 data = self.db_type_parameters(connection)
678 try:
679 column_type = connection.data_types[self.get_internal_type()]
680 except KeyError:
681 return None
682 else:
683 # column_type is either a single-parameter function or a string.
684 if callable(column_type):
685 return column_type(data)
686 return column_type % data
687
688 def rel_db_type(self, connection: BaseDatabaseWrapper) -> str | None:
689 """
690 Return the data type that a related field pointing to this field should
691 use. For example, this method is called by ForeignKeyField to determine its data type.
692 """
693 return self.db_type(connection)
694
695 def cast_db_type(self, connection: BaseDatabaseWrapper) -> str | None:
696 """Return the data type to use in the Cast() function."""
697 db_type = connection.ops.cast_data_types.get(self.get_internal_type())
698 if db_type:
699 return db_type % self.db_type_parameters(connection)
700 return self.db_type(connection)
701
702 def db_parameters(self, connection: BaseDatabaseWrapper) -> DbParameters:
703 """
704 Extension of db_type(), providing a range of different return values
705 (type, checks). This will look at db_type(), allowing custom model
706 fields to override it.
707 """
708 type_string = self.db_type(connection)
709 check_string = self.db_check(connection)
710 return {
711 "type": type_string,
712 "check": check_string,
713 }
714
715 def db_type_suffix(self, connection: BaseDatabaseWrapper) -> str | None:
716 return connection.data_types_suffix.get(self.get_internal_type())
717
718 def get_db_converters(
719 self, connection: BaseDatabaseWrapper
720 ) -> list[Callable[..., Any]]:
721 if from_db_value := getattr(self, "from_db_value", None):
722 return [from_db_value]
723 return []
724
725 @property
726 def db_returning(self) -> bool:
727 """
728 Private API intended only to be used by Plain itself. Currently only
729 the PostgreSQL backend supports returning multiple fields on a model.
730 """
731 return False
732
733 def set_attributes_from_name(self, name: str) -> None:
734 self.name = self.name or name
735 self.attname = self.get_attname()
736 self.column = self.attname
737 self.concrete = self.column is not None
738
739 def contribute_to_class(self, cls: type[Model], name: str) -> None:
740 """
741 Register the field with the model class it belongs to.
742
743 Field now acts as its own descriptor - it stays on the class and handles
744 __get__/__set__/__delete__ directly.
745 """
746 self.set_attributes_from_name(name)
747 self.model = cls
748 cls._model_meta.add_field(self)
749
750 # Field is now a descriptor itself - ensure it's set on the class
751 # This is important for inherited fields that get deepcopied in Meta.__get__
752 if self.column:
753 setattr(cls, self.attname, self)
754
755 # Descriptor protocol implementation
756 @overload
757 def __get__(self, instance: None, owner: type[Model]) -> Self: ...
758
759 @overload
760 def __get__(self, instance: Model, owner: type[Model]) -> T: ...
761
762 def __get__(self, instance: Model | None, owner: type[Model]) -> Self | T:
763 """
764 Descriptor __get__ for attribute access.
765
766 Class access (User.email) returns the Field descriptor itself.
767 Instance access (user.email) returns the field value from instance.__dict__,
768 with lazy loading support if the value is not yet loaded.
769 """
770 # Class access - return the Field descriptor
771 if instance is None:
772 return self
773
774 # If field hasn't been contributed to a class yet (e.g., used standalone
775 # as an output_field in aggregates), just return self
776 if not hasattr(self, "attname"):
777 return self
778
779 # Instance access - get value from instance dict
780 data = instance.__dict__
781 field_name = self.attname
782
783 # If value not in dict, lazy load from database
784 if field_name not in data:
785 # Deferred field - load it from the database
786 instance.refresh_from_db(fields=[field_name])
787
788 return cast(T, data.get(field_name))
789
790 def __set__(self, instance: Model, value: Any) -> None:
791 """
792 Descriptor __set__ for attribute assignment.
793
794 Validates and converts the value using to_python(), then stores it
795 in instance.__dict__[attname].
796 """
797 # Safety check: ensure field has been properly initialized
798 if not hasattr(self, "attname"):
799 raise AttributeError(
800 f"Field {self.__class__.__name__} has not been initialized properly. "
801 f"The field's contribute_to_class() has not been called yet. "
802 f"This usually means the field is being used before it was added to a model class."
803 )
804
805 # Convert/validate the value
806 if value is not None:
807 value = self.to_python(value)
808
809 # Store in instance dict
810 instance.__dict__[self.attname] = value
811
812 def __delete__(self, instance: Model) -> None:
813 """
814 Descriptor __delete__ for attribute deletion.
815
816 Removes the value from instance.__dict__.
817 """
818 try:
819 del instance.__dict__[self.attname]
820 except KeyError:
821 raise AttributeError(
822 f"{instance.__class__.__name__!r} object has no attribute {self.attname!r}"
823 )
824
825 def get_attname(self) -> str:
826 assert self.name is not None # Field name must be set
827 return self.name
828
829 def get_internal_type(self) -> str:
830 return self.__class__.__name__
831
832 def pre_save(self, model_instance: Model, add: bool) -> T | None:
833 """Return field's value just before saving."""
834 return getattr(model_instance, self.attname)
835
836 def get_prep_value(self, value: Any) -> Any:
837 """Perform preliminary non-db specific value checks and conversions."""
838 if isinstance(value, Promise):
839 value = value._proxy____cast()
840 return value
841
842 def get_db_prep_value(
843 self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
844 ) -> Any:
845 """
846 Return field's value prepared for interacting with the database backend.
847
848 Used by the default implementations of get_db_prep_save().
849 """
850 if not prepared:
851 value = self.get_prep_value(value)
852 return value
853
854 def get_db_prep_save(self, value: Any, connection: BaseDatabaseWrapper) -> Any:
855 """Return field's value prepared for saving into a database."""
856 if hasattr(value, "as_sql"):
857 return value
858 return self.get_db_prep_value(value, connection=connection, prepared=False)
859
860 def has_default(self) -> bool:
861 """Return a boolean of whether this field has a default value."""
862 return self.default is not NOT_PROVIDED
863
864 def get_default(self) -> T | None:
865 """Return the default value for this field."""
866 return self._get_default()
867
868 @cached_property
869 def _get_default(self) -> Callable[[], Any]:
870 if self.has_default():
871 if callable(self.default):
872 return self.default
873 return lambda: self.default
874
875 if not self.empty_strings_allowed or self.allow_null:
876 return return_None
877 return str # return empty string
878
879 def get_limit_choices_to(self) -> Any:
880 """
881 Return ``limit_choices_to`` for this model field.
882 Overridden by related fields (ForeignKey, etc.).
883 """
884 raise NotImplementedError(
885 "get_limit_choices_to() should only be called on related fields"
886 )
887
888 def get_choices(
889 self,
890 include_blank: bool = True,
891 blank_choice: list[tuple[str, str]] = BLANK_CHOICE_DASH,
892 limit_choices_to: Any = None,
893 ordering: tuple[str, ...] = (),
894 ) -> list[tuple[Any, str]]:
895 """
896 Return choices with a default blank choices included, for use
897 as <select> choices for this field.
898 """
899 if self.choices is not None:
900 choices = list(self.choices)
901 if include_blank:
902 blank_defined = any(
903 choice in ("", None) for choice, _ in self.flatchoices
904 )
905 if not blank_defined:
906 choices = blank_choice + choices
907 return choices
908 remote_field = getattr(self, "remote_field", None)
909 if remote_field is None or getattr(remote_field, "model", None) is None:
910 return blank_choice if include_blank else []
911 rel_model = remote_field.model
912 limit_choices_to = limit_choices_to or self.get_limit_choices_to()
913 related_field_name = (
914 remote_field.get_related_field().attname
915 if hasattr(remote_field, "get_related_field")
916 else "id"
917 )
918 choice_func = operator.attrgetter(related_field_name)
919 qs = rel_model.query.complex_filter(limit_choices_to)
920 if ordering:
921 qs = qs.order_by(*ordering)
922 return (blank_choice if include_blank else []) + [
923 (choice_func(x), str(x)) for x in qs
924 ]
925
926 def value_to_string(self, obj: Model) -> str:
927 """
928 Return a string value of this field from the passed obj.
929 This is used by the serialization framework.
930 """
931 return str(self.value_from_object(obj))
932
933 def _get_flatchoices(self) -> list[tuple[Any, Any]]:
934 """Flattened version of choices tuple."""
935 if self.choices is None:
936 return []
937 flat = []
938 for choice, value in self.choices:
939 if isinstance(value, list | tuple):
940 flat.extend(value)
941 else:
942 flat.append((choice, value))
943 return flat
944
945 flatchoices = property(_get_flatchoices)
946
947 def save_form_data(self, instance: Model, data: Any) -> None:
948 assert self.name is not None
949 setattr(instance, self.name, data)
950
951 def value_from_object(self, obj: Model) -> T | None:
952 """Return the value of this field in the given model instance."""
953 return getattr(obj, self.attname)
954
955
956class BooleanField(Field[bool]):
957 empty_strings_allowed = False
958 default_error_messages = {
959 "invalid": '"%(value)s" value must be either True or False.',
960 "invalid_nullable": '"%(value)s" value must be either True, False, or None.',
961 }
962 description = "Boolean (Either True or False)"
963
964 def get_internal_type(self) -> str:
965 return "BooleanField"
966
967 def to_python(self, value: Any) -> bool | None:
968 if self.allow_null and value in self.empty_values:
969 return None
970 if value in (True, False):
971 # 1/0 are equal to True/False. bool() converts former to latter.
972 return bool(value)
973 if value in ("t", "True", "1"):
974 return True
975 if value in ("f", "False", "0"):
976 return False
977 raise exceptions.ValidationError(
978 self.error_messages["invalid_nullable" if self.allow_null else "invalid"],
979 code="invalid",
980 params={"value": value},
981 )
982
983 def get_prep_value(self, value: Any) -> Any:
984 value = super().get_prep_value(value)
985 if value is None:
986 return None
987 return self.to_python(value)
988
989
990class CharField(Field[str]):
991 def __init__(self, **kwargs: Any):
992 super().__init__(**kwargs)
993 if self.max_length is not None:
994 self.validators.append(validators.MaxLengthValidator(self.max_length))
995
996 @property
997 def description(self) -> str:
998 if self.max_length is not None:
999 return "String (up to %(max_length)s)"
1000 else:
1001 return "String (unlimited)"
1002
1003 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
1004 return [
1005 *super().preflight(**kwargs),
1006 *self._check_max_length_attribute(),
1007 ]
1008
1009 def _check_max_length_attribute(self, **kwargs: Any) -> list[PreflightResult]:
1010 if self.max_length is None:
1011 if (
1012 db_connection.features.supports_unlimited_charfield
1013 or "supports_unlimited_charfield"
1014 in self.model.model_options.required_db_features
1015 ):
1016 return []
1017 return [
1018 PreflightResult(
1019 fix="CharFields must define a 'max_length' attribute.",
1020 obj=self,
1021 id="fields.charfield_missing_max_length",
1022 )
1023 ]
1024 elif (
1025 not isinstance(self.max_length, int)
1026 or isinstance(self.max_length, bool)
1027 or self.max_length <= 0
1028 ):
1029 return [
1030 PreflightResult(
1031 fix="'max_length' must be a positive integer.",
1032 obj=self,
1033 id="fields.charfield_invalid_max_length",
1034 )
1035 ]
1036 else:
1037 return []
1038
1039 def cast_db_type(self, connection: BaseDatabaseWrapper) -> str | None:
1040 if self.max_length is None:
1041 return connection.ops.cast_char_field_without_max_length
1042 return super().cast_db_type(connection)
1043
1044 def get_internal_type(self) -> str:
1045 return "CharField"
1046
1047 def to_python(self, value: Any) -> str | None:
1048 if isinstance(value, str) or value is None:
1049 return value
1050 return str(value)
1051
1052 def get_prep_value(self, value: Any) -> Any:
1053 value = super().get_prep_value(value)
1054 return self.to_python(value)
1055
1056
1057def _to_naive(value: datetime.datetime) -> datetime.datetime:
1058 if timezone.is_aware(value):
1059 value = timezone.make_naive(value, datetime.UTC)
1060 return value
1061
1062
1063def _get_naive_now() -> datetime.datetime:
1064 return _to_naive(timezone.now())
1065
1066
1067class DateTimeCheckMixin(Field):
1068 auto_now: bool
1069 auto_now_add: bool
1070
1071 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
1072 return [
1073 *super().preflight(**kwargs),
1074 *self._check_mutually_exclusive_options(),
1075 *self._check_fix_default_value(),
1076 ]
1077
1078 def _check_mutually_exclusive_options(self) -> list[PreflightResult]:
1079 # auto_now, auto_now_add, and default are mutually exclusive
1080 # options. The use of more than one of these options together
1081 # will trigger an Error
1082 mutually_exclusive_options = [
1083 self.auto_now_add,
1084 self.auto_now,
1085 self.has_default(),
1086 ]
1087 enabled_options = [
1088 option not in (None, False) for option in mutually_exclusive_options
1089 ].count(True)
1090 if enabled_options > 1:
1091 return [
1092 PreflightResult(
1093 fix="The options auto_now, auto_now_add, and default "
1094 "are mutually exclusive. Only one of these options "
1095 "may be present.",
1096 obj=self,
1097 id="fields.datetime_auto_options_mutually_exclusive",
1098 )
1099 ]
1100 else:
1101 return []
1102
1103 def _check_fix_default_value(self) -> list[PreflightResult]:
1104 return []
1105
1106 # Concrete subclasses use this in their implementations of
1107 # _check_fix_default_value().
1108 def _check_if_value_fixed(
1109 self,
1110 value: datetime.date | datetime.datetime,
1111 now: datetime.datetime | None = None,
1112 ) -> list[PreflightResult]:
1113 """
1114 Check if the given value appears to have been provided as a "fixed"
1115 time value, and include a warning in the returned list if it does. The
1116 value argument must be a date object or aware/naive datetime object. If
1117 now is provided, it must be a naive datetime object.
1118 """
1119 if now is None:
1120 now = _get_naive_now()
1121 offset = datetime.timedelta(seconds=10)
1122 lower = now - offset
1123 upper = now + offset
1124 if isinstance(value, datetime.datetime):
1125 value = _to_naive(value)
1126 else:
1127 assert isinstance(value, datetime.date)
1128 lower = lower.date()
1129 upper = upper.date()
1130 if lower <= value <= upper:
1131 return [
1132 PreflightResult(
1133 fix="Fixed default value provided. "
1134 "It seems you set a fixed date / time / datetime "
1135 "value as default for this field. This may not be "
1136 "what you want. If you want to have the current date "
1137 "as default, use `plain.utils.timezone.now`",
1138 obj=self,
1139 id="fields.datetime_naive_default_value",
1140 warning=True,
1141 )
1142 ]
1143 return []
1144
1145
1146class DateField(DateTimeCheckMixin, Field[datetime.date]):
1147 empty_strings_allowed = False
1148 default_error_messages = {
1149 "invalid": '"%(value)s" value has an invalid date format. It must be in YYYY-MM-DD format.',
1150 "invalid_date": '"%(value)s" value has the correct format (YYYY-MM-DD) but it is an invalid date.',
1151 }
1152 description = "Date (without time)"
1153
1154 def __init__(
1155 self, *, auto_now: bool = False, auto_now_add: bool = False, **kwargs: Any
1156 ):
1157 self.auto_now, self.auto_now_add = auto_now, auto_now_add
1158 if auto_now or auto_now_add:
1159 kwargs["required"] = False
1160 super().__init__(**kwargs)
1161
1162 def _check_fix_default_value(self) -> list[PreflightResult]:
1163 """
1164 Warn that using an actual date or datetime value is probably wrong;
1165 it's only evaluated on server startup.
1166 """
1167 if not self.has_default():
1168 return []
1169
1170 value = self.default
1171 if isinstance(value, datetime.datetime):
1172 value = _to_naive(value).date()
1173 elif isinstance(value, datetime.date):
1174 pass
1175 else:
1176 # No explicit date / datetime value -- no checks necessary
1177 return []
1178 # At this point, value is a date object.
1179 return self._check_if_value_fixed(value)
1180
1181 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
1182 name, path, args, kwargs = super().deconstruct()
1183 if self.auto_now:
1184 kwargs["auto_now"] = True
1185 if self.auto_now_add:
1186 kwargs["auto_now_add"] = True
1187 if self.auto_now or self.auto_now_add:
1188 del kwargs["required"]
1189 return name, path, args, kwargs
1190
1191 def get_internal_type(self) -> str:
1192 return "DateField"
1193
1194 def to_python(self, value: Any) -> datetime.date | None:
1195 if value is None:
1196 return value
1197 if isinstance(value, datetime.datetime):
1198 if timezone.is_aware(value):
1199 # Convert aware datetimes to the default time zone
1200 # before casting them to dates (#17742).
1201 default_timezone = timezone.get_default_timezone()
1202 value = timezone.make_naive(value, default_timezone)
1203 return value.date()
1204 if isinstance(value, datetime.date):
1205 return value
1206
1207 try:
1208 parsed = parse_date(value)
1209 if parsed is not None:
1210 return parsed
1211 except ValueError:
1212 raise exceptions.ValidationError(
1213 self.error_messages["invalid_date"],
1214 code="invalid_date",
1215 params={"value": value},
1216 )
1217
1218 raise exceptions.ValidationError(
1219 self.error_messages["invalid"],
1220 code="invalid",
1221 params={"value": value},
1222 )
1223
1224 def pre_save(self, model_instance: Model, add: bool) -> datetime.date | None:
1225 if self.auto_now or (self.auto_now_add and add):
1226 value = datetime.date.today()
1227 setattr(model_instance, self.attname, value)
1228 return value
1229 else:
1230 return super().pre_save(model_instance, add)
1231
1232 def get_prep_value(self, value: Any) -> Any:
1233 value = super().get_prep_value(value)
1234 return self.to_python(value)
1235
1236 def get_db_prep_value(
1237 self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
1238 ) -> Any:
1239 # Casts dates into the format expected by the backend
1240 if not prepared:
1241 value = self.get_prep_value(value)
1242 return connection.ops.adapt_datefield_value(value)
1243
1244 def value_to_string(self, obj: Model) -> str:
1245 val = self.value_from_object(obj)
1246 return "" if val is None else val.isoformat()
1247
1248
1249class DateTimeField(DateField):
1250 empty_strings_allowed = False
1251 default_error_messages = {
1252 "invalid": '"%(value)s" value has an invalid format. It must be in YYYY-MM-DD HH:MM[:ss[.uuuuuu]][TZ] format.',
1253 "invalid_date": '"%(value)s" value has the correct format (YYYY-MM-DD) but it is an invalid date.',
1254 "invalid_datetime": '"%(value)s" value has the correct format (YYYY-MM-DD HH:MM[:ss[.uuuuuu]][TZ]) but it is an invalid date/time.',
1255 }
1256 description = "Date (with time)"
1257
1258 # __init__ is inherited from DateField
1259
1260 def _check_fix_default_value(self) -> list[PreflightResult]:
1261 """
1262 Warn that using an actual date or datetime value is probably wrong;
1263 it's only evaluated on server startup.
1264 """
1265 if not self.has_default():
1266 return []
1267
1268 value = self.default
1269 if isinstance(value, datetime.datetime | datetime.date):
1270 return self._check_if_value_fixed(value)
1271 # No explicit date / datetime value -- no checks necessary.
1272 return []
1273
1274 def get_internal_type(self) -> str:
1275 return "DateTimeField"
1276
1277 def to_python(self, value: Any) -> datetime.datetime | None:
1278 if value is None:
1279 return value
1280 if isinstance(value, datetime.datetime):
1281 return value
1282 if isinstance(value, datetime.date):
1283 value = datetime.datetime(value.year, value.month, value.day)
1284
1285 # For backwards compatibility, interpret naive datetimes in
1286 # local time. This won't work during DST change, but we can't
1287 # do much about it, so we let the exceptions percolate up the
1288 # call stack.
1289 warnings.warn(
1290 f"DateTimeField {self.model.__name__}.{self.name} received a naive datetime "
1291 f"({value}) while time zone support is active.",
1292 RuntimeWarning,
1293 )
1294 default_timezone = timezone.get_default_timezone()
1295 value = timezone.make_aware(value, default_timezone)
1296
1297 return value
1298
1299 try:
1300 parsed = parse_datetime(value)
1301 if parsed is not None:
1302 return parsed
1303 except ValueError:
1304 raise exceptions.ValidationError(
1305 self.error_messages["invalid_datetime"],
1306 code="invalid_datetime",
1307 params={"value": value},
1308 )
1309
1310 try:
1311 parsed = parse_date(value)
1312 if parsed is not None:
1313 return datetime.datetime(parsed.year, parsed.month, parsed.day)
1314 except ValueError:
1315 raise exceptions.ValidationError(
1316 self.error_messages["invalid_date"],
1317 code="invalid_date",
1318 params={"value": value},
1319 )
1320
1321 raise exceptions.ValidationError(
1322 self.error_messages["invalid"],
1323 code="invalid",
1324 params={"value": value},
1325 )
1326
1327 def pre_save(self, model_instance: Model, add: bool) -> datetime.datetime | None:
1328 if self.auto_now or (self.auto_now_add and add):
1329 value = timezone.now()
1330 setattr(model_instance, self.attname, value)
1331 return value
1332 else:
1333 return getattr(model_instance, self.attname)
1334
1335 def get_prep_value(self, value: Any) -> Any:
1336 value = super().get_prep_value(value)
1337 value = self.to_python(value)
1338 if value is not None and timezone.is_naive(value):
1339 # For backwards compatibility, interpret naive datetimes in local
1340 # time. This won't work during DST change, but we can't do much
1341 # about it, so we let the exceptions percolate up the call stack.
1342 try:
1343 name = f"{self.model.__name__}.{self.name}"
1344 except AttributeError:
1345 name = "(unbound)"
1346 warnings.warn(
1347 f"DateTimeField {name} received a naive datetime ({value})"
1348 " while time zone support is active.",
1349 RuntimeWarning,
1350 )
1351 default_timezone = timezone.get_default_timezone()
1352 value = timezone.make_aware(value, default_timezone)
1353 return value
1354
1355 def get_db_prep_value(
1356 self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
1357 ) -> Any:
1358 # Casts datetimes into the format expected by the backend
1359 if not prepared:
1360 value = self.get_prep_value(value)
1361 return connection.ops.adapt_datetimefield_value(value)
1362
1363 def value_to_string(self, obj: Model) -> str:
1364 val = self.value_from_object(obj)
1365 return "" if val is None else val.isoformat()
1366
1367
1368class DecimalField(Field[decimal.Decimal]):
1369 empty_strings_allowed = False
1370 default_error_messages = {
1371 "invalid": '"%(value)s" value must be a decimal number.',
1372 }
1373 description = "Decimal number"
1374
1375 def __init__(
1376 self,
1377 *,
1378 max_digits: int | None = None,
1379 decimal_places: int | None = None,
1380 **kwargs: Any,
1381 ):
1382 self.max_digits, self.decimal_places = max_digits, decimal_places
1383 super().__init__(**kwargs)
1384
1385 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
1386 errors = super().preflight(**kwargs)
1387
1388 digits_errors = [
1389 *self._check_decimal_places(),
1390 *self._check_max_digits(),
1391 ]
1392 if not digits_errors:
1393 errors.extend(self._check_decimal_places_and_max_digits())
1394 else:
1395 errors.extend(digits_errors)
1396 return errors
1397
1398 def _check_decimal_places(self) -> list[PreflightResult]:
1399 if self.decimal_places is None:
1400 return [
1401 PreflightResult(
1402 fix="DecimalFields must define a 'decimal_places' attribute.",
1403 obj=self,
1404 id="fields.decimalfield_missing_decimal_places",
1405 )
1406 ]
1407 try:
1408 decimal_places = int(self.decimal_places)
1409 if decimal_places < 0:
1410 raise ValueError()
1411 except ValueError:
1412 return [
1413 PreflightResult(
1414 fix="'decimal_places' must be a non-negative integer.",
1415 obj=self,
1416 id="fields.decimalfield_invalid_decimal_places",
1417 )
1418 ]
1419 else:
1420 return []
1421
1422 def _check_max_digits(self) -> list[PreflightResult]:
1423 if self.max_digits is None:
1424 return [
1425 PreflightResult(
1426 fix="DecimalFields must define a 'max_digits' attribute.",
1427 obj=self,
1428 id="fields.decimalfield_missing_max_digits",
1429 )
1430 ]
1431 try:
1432 max_digits = int(self.max_digits)
1433 if max_digits <= 0:
1434 raise ValueError()
1435 except ValueError:
1436 return [
1437 PreflightResult(
1438 fix="'max_digits' must be a positive integer.",
1439 obj=self,
1440 id="fields.decimalfield_invalid_max_digits",
1441 )
1442 ]
1443 else:
1444 return []
1445
1446 def _check_decimal_places_and_max_digits(self) -> list[PreflightResult]:
1447 if self.decimal_places is None or self.max_digits is None:
1448 return []
1449 if self.decimal_places > self.max_digits:
1450 return [
1451 PreflightResult(
1452 fix="'max_digits' must be greater or equal to 'decimal_places'.",
1453 obj=self,
1454 id="fields.decimalfield_decimal_places_exceeds_max_digits",
1455 )
1456 ]
1457 return []
1458
1459 @cached_property
1460 def validators(self) -> list[Callable[..., Any]]:
1461 return super().validators + [
1462 validators.DecimalValidator(self.max_digits, self.decimal_places)
1463 ]
1464
1465 @cached_property
1466 def context(self) -> decimal.Context:
1467 return decimal.Context(prec=self.max_digits)
1468
1469 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
1470 name, path, args, kwargs = super().deconstruct()
1471 if self.max_digits is not None:
1472 kwargs["max_digits"] = self.max_digits
1473 if self.decimal_places is not None:
1474 kwargs["decimal_places"] = self.decimal_places
1475 return name, path, args, kwargs
1476
1477 def get_internal_type(self) -> str:
1478 return "DecimalField"
1479
1480 def to_python(self, value: Any) -> decimal.Decimal | None:
1481 if value is None:
1482 return value
1483 try:
1484 if isinstance(value, float):
1485 decimal_value = self.context.create_decimal_from_float(value)
1486 else:
1487 decimal_value = decimal.Decimal(value)
1488 except (decimal.InvalidOperation, TypeError, ValueError):
1489 raise exceptions.ValidationError(
1490 self.error_messages["invalid"],
1491 code="invalid",
1492 params={"value": value},
1493 )
1494 if not decimal_value.is_finite():
1495 raise exceptions.ValidationError(
1496 self.error_messages["invalid"],
1497 code="invalid",
1498 params={"value": value},
1499 )
1500 return decimal_value
1501
1502 def get_db_prep_value(
1503 self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
1504 ) -> Any:
1505 if not prepared:
1506 value = self.get_prep_value(value)
1507 if hasattr(value, "as_sql"):
1508 return value
1509 return connection.ops.adapt_decimalfield_value(
1510 value, self.max_digits, self.decimal_places
1511 )
1512
1513 def get_prep_value(self, value: Any) -> Any:
1514 value = super().get_prep_value(value)
1515 return self.to_python(value)
1516
1517
1518class DurationField(Field[datetime.timedelta]):
1519 """
1520 Store timedelta objects.
1521
1522 Use interval on PostgreSQL, INTERVAL DAY TO SECOND on Oracle, and bigint
1523 of microseconds on other databases.
1524 """
1525
1526 empty_strings_allowed = False
1527 default_error_messages = {
1528 "invalid": '"%(value)s" value has an invalid format. It must be in [DD] [[HH:]MM:]ss[.uuuuuu] format.',
1529 }
1530 description = "Duration"
1531
1532 def get_internal_type(self) -> str:
1533 return "DurationField"
1534
1535 def to_python(self, value: Any) -> datetime.timedelta | None:
1536 if value is None:
1537 return value
1538 if isinstance(value, datetime.timedelta):
1539 return value
1540 try:
1541 parsed = parse_duration(value)
1542 except ValueError:
1543 pass
1544 else:
1545 if parsed is not None:
1546 return parsed
1547
1548 raise exceptions.ValidationError(
1549 self.error_messages["invalid"],
1550 code="invalid",
1551 params={"value": value},
1552 )
1553
1554 def get_db_prep_value(
1555 self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
1556 ) -> Any:
1557 if connection.features.has_native_duration_field:
1558 return value
1559 if value is None:
1560 return None
1561 return duration_microseconds(value)
1562
1563 def get_db_converters(
1564 self, connection: BaseDatabaseWrapper
1565 ) -> list[Callable[..., Any]]:
1566 converters = []
1567 if not connection.features.has_native_duration_field:
1568 converters.append(connection.ops.convert_durationfield_value)
1569 return converters + super().get_db_converters(connection)
1570
1571 def value_to_string(self, obj: Model) -> str:
1572 val = self.value_from_object(obj)
1573 return "" if val is None else duration_string(val)
1574
1575
1576class EmailField(CharField):
1577 default_validators = [validators.validate_email]
1578 description = "Email address"
1579
1580 def __init__(self, **kwargs: Any):
1581 # max_length=254 to be compliant with RFCs 3696 and 5321
1582 kwargs.setdefault("max_length", 254)
1583 super().__init__(**kwargs)
1584
1585 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
1586 name, path, args, kwargs = super().deconstruct()
1587 # We do not exclude max_length if it matches default as we want to change
1588 # the default in future.
1589 return name, path, args, kwargs
1590
1591
1592class FloatField(Field[float]):
1593 empty_strings_allowed = False
1594 default_error_messages = {
1595 "invalid": '"%(value)s" value must be a float.',
1596 }
1597 description = "Floating point number"
1598
1599 def get_prep_value(self, value: Any) -> Any:
1600 value = super().get_prep_value(value)
1601 if value is None:
1602 return None
1603 try:
1604 return float(value)
1605 except (TypeError, ValueError) as e:
1606 raise e.__class__(
1607 f"Field '{self.name}' expected a number but got {value!r}.",
1608 ) from e
1609
1610 def get_internal_type(self) -> str:
1611 return "FloatField"
1612
1613 def to_python(self, value: Any) -> float | None:
1614 if value is None:
1615 return value
1616 try:
1617 return float(value)
1618 except (TypeError, ValueError):
1619 raise exceptions.ValidationError(
1620 self.error_messages["invalid"],
1621 code="invalid",
1622 params={"value": value},
1623 )
1624
1625
1626class IntegerField(Field[int]):
1627 empty_strings_allowed = False
1628 default_error_messages = {
1629 "invalid": '"%(value)s" value must be an integer.',
1630 }
1631 description = "Integer"
1632
1633 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
1634 return [
1635 *super().preflight(**kwargs),
1636 *self._check_max_length_warning(),
1637 ]
1638
1639 def _check_max_length_warning(self) -> list[PreflightResult]:
1640 if self.max_length is not None:
1641 return [
1642 PreflightResult(
1643 fix=f"'max_length' is ignored when used with {self.__class__.__name__}. Remove 'max_length' from field.",
1644 obj=self,
1645 id="fields.max_length_ignored",
1646 warning=True,
1647 )
1648 ]
1649 return []
1650
1651 @cached_property
1652 def validators(self) -> list[Callable[..., Any]]:
1653 # These validators can't be added at field initialization time since
1654 # they're based on values retrieved from the database connection.
1655 validators_ = super().validators
1656 internal_type = self.get_internal_type()
1657 min_value, max_value = db_connection.ops.integer_field_range(internal_type)
1658 if min_value is not None and not any(
1659 (
1660 isinstance(validator, validators.MinValueValidator)
1661 and (
1662 validator.limit_value()
1663 if callable(validator.limit_value)
1664 else validator.limit_value
1665 )
1666 >= min_value
1667 )
1668 for validator in validators_
1669 ):
1670 validators_.append(validators.MinValueValidator(min_value))
1671 if max_value is not None and not any(
1672 (
1673 isinstance(validator, validators.MaxValueValidator)
1674 and (
1675 validator.limit_value()
1676 if callable(validator.limit_value)
1677 else validator.limit_value
1678 )
1679 <= max_value
1680 )
1681 for validator in validators_
1682 ):
1683 validators_.append(validators.MaxValueValidator(max_value))
1684 return validators_
1685
1686 def get_prep_value(self, value: Any) -> Any:
1687 value = super().get_prep_value(value)
1688 if value is None:
1689 return None
1690 try:
1691 return int(value)
1692 except (TypeError, ValueError) as e:
1693 raise e.__class__(
1694 f"Field '{self.name}' expected a number but got {value!r}.",
1695 ) from e
1696
1697 def get_db_prep_value(
1698 self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
1699 ) -> Any:
1700 value = super().get_db_prep_value(value, connection, prepared)
1701 return connection.ops.adapt_integerfield_value(value, self.get_internal_type())
1702
1703 def get_internal_type(self) -> str:
1704 return "IntegerField"
1705
1706 def to_python(self, value: Any) -> int | None:
1707 if value is None:
1708 return value
1709 try:
1710 return int(value)
1711 except (TypeError, ValueError):
1712 raise exceptions.ValidationError(
1713 self.error_messages["invalid"],
1714 code="invalid",
1715 params={"value": value},
1716 )
1717
1718
1719class BigIntegerField(IntegerField):
1720 description = "Big (8 byte) integer"
1721
1722 def get_internal_type(self) -> str:
1723 return "BigIntegerField"
1724
1725
1726class SmallIntegerField(IntegerField):
1727 description = "Small integer"
1728
1729 def get_internal_type(self) -> str:
1730 return "SmallIntegerField"
1731
1732
1733class GenericIPAddressField(Field[str]):
1734 empty_strings_allowed = False
1735 description = "IP address"
1736 default_error_messages = {}
1737
1738 def __init__(
1739 self,
1740 *,
1741 protocol: str = "both",
1742 unpack_ipv4: bool = False,
1743 **kwargs: Any,
1744 ):
1745 self.unpack_ipv4 = unpack_ipv4
1746 self.protocol = protocol
1747 (
1748 self.default_validators,
1749 invalid_error_message,
1750 ) = validators.ip_address_validators(protocol, unpack_ipv4)
1751 self.default_error_messages["invalid"] = invalid_error_message
1752 kwargs["max_length"] = 39
1753 super().__init__(**kwargs)
1754
1755 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
1756 return [
1757 *super().preflight(**kwargs),
1758 *self._check_required_and_null_values(),
1759 ]
1760
1761 def _check_required_and_null_values(self) -> list[PreflightResult]:
1762 if not getattr(self, "allow_null", False) and not getattr(
1763 self, "required", True
1764 ):
1765 return [
1766 PreflightResult(
1767 fix="GenericIPAddressFields cannot have required=False if allow_null=False, "
1768 "as blank values are stored as nulls.",
1769 obj=self,
1770 id="fields.generic_ip_field_null_blank_config",
1771 )
1772 ]
1773 return []
1774
1775 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
1776 name, path, args, kwargs = super().deconstruct()
1777 if self.unpack_ipv4 is not False:
1778 kwargs["unpack_ipv4"] = self.unpack_ipv4
1779 if self.protocol != "both":
1780 kwargs["protocol"] = self.protocol
1781 if kwargs.get("max_length") == 39:
1782 del kwargs["max_length"]
1783 return name, path, args, kwargs
1784
1785 def get_internal_type(self) -> str:
1786 return "GenericIPAddressField"
1787
1788 def to_python(self, value: Any) -> str | None:
1789 if value is None:
1790 return None
1791 if not isinstance(value, str):
1792 value = str(value)
1793 value = value.strip()
1794 if ":" in value:
1795 return clean_ipv6_address(
1796 value, self.unpack_ipv4, self.error_messages["invalid"]
1797 )
1798 return value
1799
1800 def get_db_prep_value(
1801 self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
1802 ) -> Any:
1803 if not prepared:
1804 value = self.get_prep_value(value)
1805 return connection.ops.adapt_ipaddressfield_value(value)
1806
1807 def get_prep_value(self, value: Any) -> Any:
1808 value = super().get_prep_value(value)
1809 if value is None:
1810 return None
1811 if value and ":" in value:
1812 try:
1813 return clean_ipv6_address(value, self.unpack_ipv4)
1814 except exceptions.ValidationError:
1815 pass
1816 return str(value)
1817
1818
1819class _HasDbType(Protocol):
1820 """Protocol for objects that have a db_type method and integer_field_class."""
1821
1822 integer_field_class: type[IntegerField]
1823
1824 def db_type(self, connection: BaseDatabaseWrapper) -> str | None: ...
1825
1826
1827class PositiveIntegerRelDbTypeMixin(IntegerField):
1828 integer_field_class: type[IntegerField]
1829
1830 def __init_subclass__(cls, **kwargs: Any) -> None:
1831 super().__init_subclass__(**kwargs)
1832 if not hasattr(cls, "integer_field_class"):
1833 # Find the first IntegerField parent in the MRO
1834 integer_parent = next(
1835 (
1836 parent
1837 for parent in cls.__mro__[1:]
1838 if issubclass(parent, IntegerField)
1839 ),
1840 None,
1841 )
1842 if integer_parent is not None:
1843 cls.integer_field_class = integer_parent
1844
1845 def rel_db_type(self: _HasDbType, connection: BaseDatabaseWrapper) -> str | None:
1846 """
1847 Return the data type that a related field pointing to this field should
1848 use. In most cases, a foreign key pointing to a positive integer
1849 primary key will have an integer column data type but some databases
1850 (e.g. MySQL) have an unsigned integer type. In that case
1851 (related_fields_match_type=True), the primary key should return its
1852 db_type.
1853 """
1854 if connection.features.related_fields_match_type:
1855 return self.db_type(connection)
1856 else:
1857 return self.integer_field_class().db_type(connection=connection)
1858
1859
1860class PositiveBigIntegerField(PositiveIntegerRelDbTypeMixin, BigIntegerField):
1861 description = "Positive big integer"
1862
1863 def get_internal_type(self) -> str:
1864 return "PositiveBigIntegerField"
1865
1866
1867class PositiveIntegerField(PositiveIntegerRelDbTypeMixin, IntegerField):
1868 description = "Positive integer"
1869
1870 def get_internal_type(self) -> str:
1871 return "PositiveIntegerField"
1872
1873
1874class PositiveSmallIntegerField(PositiveIntegerRelDbTypeMixin, SmallIntegerField):
1875 description = "Positive small integer"
1876
1877 def get_internal_type(self) -> str:
1878 return "PositiveSmallIntegerField"
1879
1880
1881class TextField(Field[str]):
1882 description = "Text"
1883
1884 def get_internal_type(self) -> str:
1885 return "TextField"
1886
1887 def to_python(self, value: Any) -> str | None:
1888 if isinstance(value, str) or value is None:
1889 return value
1890 return str(value)
1891
1892 def get_prep_value(self, value: Any) -> Any:
1893 value = super().get_prep_value(value)
1894 return self.to_python(value)
1895
1896
1897class TimeField(DateTimeCheckMixin, Field[datetime.time]):
1898 empty_strings_allowed = False
1899 default_error_messages = {
1900 "invalid": '"%(value)s" value has an invalid format. It must be in HH:MM[:ss[.uuuuuu]] format.',
1901 "invalid_time": '"%(value)s" value has the correct format (HH:MM[:ss[.uuuuuu]]) but it is an invalid time.',
1902 }
1903 description = "Time"
1904
1905 def __init__(
1906 self, *, auto_now: bool = False, auto_now_add: bool = False, **kwargs: Any
1907 ):
1908 self.auto_now, self.auto_now_add = auto_now, auto_now_add
1909 if auto_now or auto_now_add:
1910 kwargs["required"] = False
1911 super().__init__(**kwargs)
1912
1913 def _check_fix_default_value(self) -> list[PreflightResult]:
1914 """
1915 Warn that using an actual date or datetime value is probably wrong;
1916 it's only evaluated on server startup.
1917 """
1918 if not self.has_default():
1919 return []
1920
1921 value = self.default
1922 if isinstance(value, datetime.datetime):
1923 now = None
1924 elif isinstance(value, datetime.time):
1925 now = _get_naive_now()
1926 # This will not use the right date in the race condition where now
1927 # is just before the date change and value is just past 0:00.
1928 value = datetime.datetime.combine(now.date(), value)
1929 else:
1930 # No explicit time / datetime value -- no checks necessary
1931 return []
1932 # At this point, value is a datetime object.
1933 return self._check_if_value_fixed(value, now=now)
1934
1935 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
1936 name, path, args, kwargs = super().deconstruct()
1937 if self.auto_now is not False:
1938 kwargs["auto_now"] = self.auto_now
1939 if self.auto_now_add is not False:
1940 kwargs["auto_now_add"] = self.auto_now_add
1941 if self.auto_now or self.auto_now_add:
1942 del kwargs["required"]
1943 return name, path, args, kwargs
1944
1945 def get_internal_type(self) -> str:
1946 return "TimeField"
1947
1948 def to_python(self, value: Any) -> datetime.time | None:
1949 if value is None:
1950 return None
1951 if isinstance(value, datetime.time):
1952 return value
1953 if isinstance(value, datetime.datetime):
1954 # Not usually a good idea to pass in a datetime here (it loses
1955 # information), but this can be a side-effect of interacting with a
1956 # database backend (e.g. Oracle), so we'll be accommodating.
1957 return value.time()
1958
1959 try:
1960 parsed = parse_time(value)
1961 if parsed is not None:
1962 return parsed
1963 except ValueError:
1964 raise exceptions.ValidationError(
1965 self.error_messages["invalid_time"],
1966 code="invalid_time",
1967 params={"value": value},
1968 )
1969
1970 raise exceptions.ValidationError(
1971 self.error_messages["invalid"],
1972 code="invalid",
1973 params={"value": value},
1974 )
1975
1976 def pre_save(self, model_instance: Model, add: bool) -> datetime.time | None:
1977 if self.auto_now or (self.auto_now_add and add):
1978 value = datetime.datetime.now().time()
1979 setattr(model_instance, self.attname, value)
1980 return value
1981 else:
1982 return super().pre_save(model_instance, add)
1983
1984 def get_prep_value(self, value: Any) -> Any:
1985 value = super().get_prep_value(value)
1986 return self.to_python(value)
1987
1988 def get_db_prep_value(
1989 self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
1990 ) -> Any:
1991 # Casts times into the format expected by the backend
1992 if not prepared:
1993 value = self.get_prep_value(value)
1994 return connection.ops.adapt_timefield_value(value)
1995
1996 def value_to_string(self, obj: Model) -> str:
1997 val = self.value_from_object(obj)
1998 return "" if val is None else val.isoformat()
1999
2000
2001class URLField(CharField):
2002 default_validators = [validators.URLValidator()]
2003 description = "URL"
2004
2005 def __init__(self, **kwargs: Any):
2006 kwargs.setdefault("max_length", 200)
2007 super().__init__(**kwargs)
2008
2009 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
2010 name, path, args, kwargs = super().deconstruct()
2011 if kwargs.get("max_length") == 200:
2012 del kwargs["max_length"]
2013 return name, path, args, kwargs
2014
2015
2016class BinaryField(Field[bytes | memoryview]):
2017 description = "Raw binary data"
2018 empty_values = [None, b""]
2019
2020 def __init__(self, **kwargs: Any):
2021 super().__init__(**kwargs)
2022 if self.max_length is not None:
2023 self.validators.append(validators.MaxLengthValidator(self.max_length))
2024
2025 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
2026 return [*super().preflight(**kwargs), *self._check_str_default_value()]
2027
2028 def _check_str_default_value(self) -> list[PreflightResult]:
2029 if self.has_default() and isinstance(self.default, str):
2030 return [
2031 PreflightResult(
2032 fix="BinaryField's default cannot be a string. Use bytes "
2033 "content instead.",
2034 obj=self,
2035 id="fields.filefield_upload_to_not_callable",
2036 )
2037 ]
2038 return []
2039
2040 def get_internal_type(self) -> str:
2041 return "BinaryField"
2042
2043 def get_placeholder(
2044 self, value: Any, compiler: SQLCompiler, connection: BaseDatabaseWrapper
2045 ) -> Any:
2046 return connection.ops.binary_placeholder_sql(value)
2047
2048 def get_default(self) -> bytes | memoryview | None:
2049 if self.has_default() and not callable(self.default):
2050 return self.default
2051 default = super().get_default()
2052 if default == "":
2053 return b""
2054 return default
2055
2056 def get_db_prep_value(
2057 self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
2058 ) -> Any:
2059 value = super().get_db_prep_value(value, connection, prepared)
2060 if value is not None:
2061 return connection.Database.Binary(value)
2062 return value
2063
2064 def value_to_string(self, obj: Model) -> str:
2065 """Binary data is serialized as base64"""
2066 val = self.value_from_object(obj)
2067 if val is None:
2068 return ""
2069 return b64encode(val).decode("ascii")
2070
2071 def to_python(self, value: Any) -> bytes | memoryview | None:
2072 # If it's a string, it should be base64-encoded data
2073 if isinstance(value, str):
2074 return memoryview(b64decode(value.encode("ascii")))
2075 return value
2076
2077
2078class UUIDField(Field[uuid.UUID]):
2079 default_error_messages = {
2080 "invalid": '"%(value)s" is not a valid UUID.',
2081 }
2082 description = "Universally unique identifier"
2083 empty_strings_allowed = False
2084
2085 def __init__(self, **kwargs: Any):
2086 kwargs["max_length"] = 32
2087 super().__init__(**kwargs)
2088
2089 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
2090 name, path, args, kwargs = super().deconstruct()
2091 del kwargs["max_length"]
2092 return name, path, args, kwargs
2093
2094 def get_internal_type(self) -> str:
2095 return "UUIDField"
2096
2097 def get_prep_value(self, value: Any) -> Any:
2098 value = super().get_prep_value(value)
2099 return self.to_python(value)
2100
2101 def get_db_prep_value(
2102 self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
2103 ) -> str | uuid.UUID | None:
2104 if value is None:
2105 return None
2106 if not isinstance(value, uuid.UUID):
2107 value = self.to_python(value)
2108 if value is None:
2109 return None
2110
2111 if connection.features.has_native_uuid_field:
2112 return value
2113 return value.hex
2114
2115 def to_python(self, value: Any) -> uuid.UUID | None:
2116 if value is not None and not isinstance(value, uuid.UUID):
2117 input_form = "int" if isinstance(value, int) else "hex"
2118 try:
2119 return uuid.UUID(**{input_form: value})
2120 except (AttributeError, ValueError):
2121 raise exceptions.ValidationError(
2122 self.error_messages["invalid"],
2123 code="invalid",
2124 params={"value": value},
2125 )
2126 return value
2127
2128
2129class PrimaryKeyField(BigIntegerField):
2130 db_returning = True
2131
2132 def __init__(self):
2133 super().__init__(required=False)
2134 self.primary_key = True
2135 self.auto_created = True
2136
2137 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
2138 errors = super().preflight(**kwargs)
2139 # Remove the reserved_field_name_id error for 'id' field name since PrimaryKeyField is allowed to use it
2140 errors = [e for e in errors if e.id != "fields.reserved_field_name_id"]
2141 return errors
2142
2143 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
2144 # PrimaryKeyField takes no parameters, so we return an empty kwargs dict
2145 return (
2146 self.name,
2147 "plain.models.PrimaryKeyField",
2148 cast(list[Any], []),
2149 cast(dict[str, Any], {}),
2150 )
2151
2152 def validate(self, value: Any, model_instance: Model) -> None:
2153 pass
2154
2155 def get_db_prep_value(
2156 self, value: Any, connection: BaseDatabaseWrapper, prepared: bool = False
2157 ) -> Any:
2158 if not prepared:
2159 value = self.get_prep_value(value)
2160 value = connection.ops.validate_autopk_value(value)
2161 return value
2162
2163 def get_internal_type(self) -> str:
2164 return "PrimaryKeyField"
2165
2166 def rel_db_type(self, connection: BaseDatabaseWrapper) -> str | None:
2167 return BigIntegerField().db_type(connection=connection)