1from __future__ import annotations
2
3import collections.abc
4import copy
5import datetime
6import decimal
7import enum
8import operator
9import uuid
10import warnings
11from base64 import b64decode, b64encode
12from collections.abc import Callable, Sequence
13from functools import cached_property
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Protocol,
18 Self,
19 TypedDict,
20 cast,
21 overload,
22)
23
24import psycopg
25
26from plain import exceptions, validators
27from plain.models.constants import LOOKUP_SEP
28from plain.models.enums import ChoicesMeta
29from plain.models.postgres.sql import (
30 CAST_CHAR_FIELD_WITHOUT_MAX_LENGTH,
31 CAST_DATA_TYPES,
32 DATA_TYPE_CHECK_CONSTRAINTS,
33 DATA_TYPES,
34 DATA_TYPES_SUFFIX,
35 INTEGER_FIELD_RANGES,
36 adapt_integerfield_value,
37 adapt_ipaddressfield_value,
38 quote_name,
39)
40from plain.models.query_utils import RegisterLookupMixin
41from plain.preflight import PreflightResult
42from plain.utils import timezone
43from plain.utils.datastructures import DictWrapper
44from plain.utils.dateparse import (
45 parse_date,
46 parse_datetime,
47 parse_duration,
48 parse_time,
49)
50from plain.utils.duration import duration_string
51from plain.utils.functional import Promise
52from plain.utils.ipv6 import clean_ipv6_address
53from plain.utils.itercompat import is_iterable
54
55from ..registry import models_registry
56
57if TYPE_CHECKING:
58 from plain.models.base import Model
59 from plain.models.expressions import Col
60 from plain.models.fields.reverse_related import ForeignObjectRel
61 from plain.models.postgres.connection import DatabaseConnection
62 from plain.models.sql.compiler import SQLCompiler
63
64
class DbParameters(TypedDict, total=False):
    """Return type for Field.db_parameters().

    Declared with ``total=False``, so neither key is required to be present.
    """

    # Column data type string, as produced by Field.db_type(); may be None.
    type: str | None
    # Column check-constraint string, as produced by Field.db_check(); may be
    # None.
    check: str | None
70
71
# Public names of this module (what ``from <module> import *`` exports).
__all__ = [
    "BLANK_CHOICE_DASH",
    "DbParameters",
    "PrimaryKeyField",
    "BigIntegerField",
    "BinaryField",
    "BooleanField",
    "CharField",
    "DateField",
    "DateTimeField",
    "DecimalField",
    "DurationField",
    "EmailField",
    "Empty",
    "Field",
    "FloatField",
    "GenericIPAddressField",
    "IntegerField",
    "NOT_PROVIDED",
    "PositiveBigIntegerField",
    "PositiveIntegerField",
    "PositiveSmallIntegerField",
    "SmallIntegerField",
    "TextField",
    "TimeField",
    "URLField",
    "UUIDField",
]
100
101
class Empty:
    """Bare placeholder class with no behavior of its own.

    Instances are created without arguments and then have their
    ``__class__`` (and possibly ``__dict__``) reassigned, which allows
    building an object of another class without running its ``__init__``.
    """
104
105
class NOT_PROVIDED:
    """Sentinel meaning "no default value was supplied".

    The class object itself is used as the marker and is compared by
    identity, so that ``None`` remains usable as an explicit default.
    """
108
109
# The "blank" option offered for fields that define "choices".
# Field.get_choices() prepends it to the choices list unless a blank choice
# ("" or None) is already defined there.
BLANK_CHOICE_DASH = [("", "---------")]
113
114
def _load_field(
    package_label: str, model_name: str, field_name: str
) -> Field[Any] | ForeignObjectRel:
    """Fetch a field from a registered model by name.

    The model is resolved through ``models_registry`` and the field through
    the model's ``_model_meta``. ``Field.__reduce__`` returns this function
    as the reconstruction callable for fields attached to a model.
    """
    owner = models_registry.get_model(package_label, model_name)
    return owner._model_meta.get_field(field_name)
121
122
123# A guide to Field parameters:
124#
125# * name: The name of the field specified in the model.
126# * attname: The attribute to use on the model object. This is the same as
127# "name", except in the case of ForeignKeys, where "_id" is
128# appended.
129# * column: The database column for this field. This is the same as
130# "attname".
131#
132# Code that introspects values, or does other dynamic things, should use
133# attname.
134
135
def _empty(of_cls: type) -> Empty:
    """Create an instance of ``of_cls`` without calling its ``__init__``.

    A bare ``Empty`` object is created and its ``__class__`` is swapped to
    ``of_cls``. ``Field.__reduce__`` returns this function as the
    reconstruction callable for fields that are not attached to a model.
    """
    shell = Empty()
    shell.__class__ = of_cls
    return shell
140
141
def return_None() -> None:
    """Return ``None``.

    Module-level callable that ``Field._get_default`` hands out as the
    default factory when a field's implicit default is ``None``.
    """
    return
144
145
146class Field[T](RegisterLookupMixin):
147 """Base class for all field types"""
148
149 # Instance attributes set during field lifecycle
150 # Set by __init__
151 name: str | None
152 max_length: int | None
153 # Set by set_attributes_from_name (called by contribute_to_class)
154 attname: str
155 column: str
156 concrete: bool
157 # Set by contribute_to_class
158 model: type[Model]
159
160 # Designates whether empty strings fundamentally are allowed at the
161 # database level.
162 empty_strings_allowed = True
163 empty_values = list(validators.EMPTY_VALUES)
164
165 default_validators = [] # Default set of validators
166 default_error_messages = {
167 "invalid_choice": "Value %(value)r is not a valid choice.",
168 "allow_null": "This field cannot be null.",
169 "required": "This field is be required.",
170 "unique": "A %(model_name)s with this %(field_label)s already exists.",
171 }
172
173 # Attributes that don't affect a column definition.
174 # These attributes are ignored when altering the field.
175 non_db_attrs = (
176 "required",
177 "choices",
178 "error_messages",
179 "limit_choices_to",
180 # Database-level options are not supported, see #21961.
181 "on_delete",
182 "related_query_name",
183 "validators",
184 )
185
186 # Generic field type description, usually overridden by subclasses
187 def _description(self) -> str:
188 return f"Field of type: {self.__class__.__name__}"
189
190 description = property(_description)
191
192 def __init__(
193 self,
194 *,
195 max_length: int | None = None,
196 required: bool = True,
197 allow_null: bool = False,
198 default: Any = NOT_PROVIDED,
199 choices: Any = None,
200 validators: Sequence[Callable[..., Any]] = (),
201 error_messages: dict[str, str] | None = None,
202 ):
203 self.name = None # Set by set_attributes_from_name
204 self.max_length = max_length
205 self.required, self.allow_null = required, allow_null
206 self.default = default
207 if isinstance(choices, ChoicesMeta):
208 choices = choices.choices
209 elif isinstance(choices, enum.EnumMeta):
210 choices = [(member.value, member.name) for member in choices]
211 if isinstance(choices, collections.abc.Iterator):
212 choices = list(choices)
213 self.choices = choices
214
215 self.primary_key = False
216 self.auto_created = False
217
218 self._validators = list(validators) # Store for deconstruction later
219
220 self._error_messages = error_messages # Store for deconstruction later
221
222 def __str__(self) -> str:
223 """
224 Return "package_label.model_label.field_name" for fields attached to
225 models.
226 """
227 if not hasattr(self, "model"):
228 return super().__str__()
229 model = self.model
230 return f"{model.model_options.label}.{self.name}"
231
232 def __repr__(self) -> str:
233 """Display the module, class, and name of the field."""
234 path = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
235 name = getattr(self, "name", None)
236 if name is not None:
237 return f"<{path}: {name}>"
238 return f"<{path}>"
239
240 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
241 return [
242 *self._check_field_name(),
243 *self._check_choices(),
244 *self._check_null_allowed_for_primary_keys(),
245 *self._check_validators(),
246 ]
247
248 def _check_field_name(self) -> list[PreflightResult]:
249 """
250 Check if field name is valid, i.e. 1) does not end with an
251 underscore, 2) does not contain "__" and 3) is not "id".
252 """
253 assert self.name is not None, "Field name must be set before checking"
254 if self.name.endswith("_"):
255 return [
256 PreflightResult(
257 fix="Field names must not end with an underscore.",
258 obj=self,
259 id="fields.name_ends_with_underscore",
260 )
261 ]
262 elif LOOKUP_SEP in self.name:
263 return [
264 PreflightResult(
265 fix=f'Field names must not contain "{LOOKUP_SEP}".',
266 obj=self,
267 id="fields.name_contains_lookup_separator",
268 )
269 ]
270 elif self.name == "id":
271 return [
272 PreflightResult(
273 fix="'id' is a reserved word that cannot be used as a field name.",
274 obj=self,
275 id="fields.reserved_field_name_id",
276 )
277 ]
278 else:
279 return []
280
281 @classmethod
282 def _choices_is_value(cls, value: Any) -> bool:
283 return isinstance(value, str | Promise) or not is_iterable(value)
284
285 def _check_choices(self) -> list[PreflightResult]:
286 if not self.choices:
287 return []
288
289 if not is_iterable(self.choices) or isinstance(self.choices, str):
290 return [
291 PreflightResult(
292 fix="'choices' must be an iterable (e.g., a list or tuple).",
293 obj=self,
294 id="fields.choices_not_iterable",
295 )
296 ]
297
298 choice_max_length = 0
299 # Expect [group_name, [value, display]]
300 for choices_group in self.choices:
301 try:
302 group_name, group_choices = choices_group
303 except (TypeError, ValueError):
304 # Containing non-pairs
305 break
306 try:
307 if not all(
308 self._choices_is_value(value) and self._choices_is_value(human_name)
309 for value, human_name in group_choices
310 ):
311 break
312 if self.max_length is not None and group_choices:
313 choice_max_length = max(
314 [
315 choice_max_length,
316 *(
317 len(value)
318 for value, _ in group_choices
319 if isinstance(value, str)
320 ),
321 ]
322 )
323 except (TypeError, ValueError):
324 # No groups, choices in the form [value, display]
325 value, human_name = group_name, group_choices
326 if not self._choices_is_value(value) or not self._choices_is_value(
327 human_name
328 ):
329 break
330 if self.max_length is not None and isinstance(value, str):
331 choice_max_length = max(choice_max_length, len(value))
332
333 # Special case: choices=['ab']
334 if isinstance(choices_group, str):
335 break
336 else:
337 if self.max_length is not None and choice_max_length > self.max_length:
338 return [
339 PreflightResult(
340 fix="'max_length' is too small to fit the longest value " # noqa: UP031
341 "in 'choices' (%d characters)." % choice_max_length,
342 obj=self,
343 id="fields.max_length_too_small_for_choices",
344 ),
345 ]
346 return []
347
348 return [
349 PreflightResult(
350 fix="'choices' must be an iterable containing "
351 "(actual value, human readable name) tuples.",
352 obj=self,
353 id="fields.choices_invalid_format",
354 )
355 ]
356
357 def _check_null_allowed_for_primary_keys(self) -> list[PreflightResult]:
358 if self.primary_key and self.allow_null:
359 return [
360 PreflightResult(
361 fix="Primary keys must not have allow_null=True. "
362 "Set allow_null=False on the field, or "
363 "remove primary_key=True argument.",
364 obj=self,
365 id="fields.primary_key_allows_null",
366 )
367 ]
368 else:
369 return []
370
371 def _check_validators(self) -> list[PreflightResult]:
372 errors = []
373 for i, validator in enumerate(self.validators):
374 if not callable(validator):
375 errors.append(
376 PreflightResult(
377 fix=(
378 "All 'validators' must be callable. "
379 f"validators[{i}] ({repr(validator)}) isn't a function or "
380 "instance of a validator class."
381 ),
382 obj=self,
383 id="fields.invalid_validator",
384 )
385 )
386 return errors
387
388 def get_col(self, alias: str | None, output_field: Field | None = None) -> Col:
389 if alias == self.model.model_options.db_table and (
390 output_field is None or output_field == self
391 ):
392 return self.cached_col
393 from plain.models.expressions import Col
394
395 return Col(alias, self, output_field)
396
397 @cached_property
398 def cached_col(self) -> Col:
399 from plain.models.expressions import Col
400
401 return Col(self.model.model_options.db_table, self)
402
403 def select_format(
404 self, compiler: SQLCompiler, sql: str, params: Any
405 ) -> tuple[str, Any]:
406 """
407 Custom format for select clauses.
408 """
409 return sql, params
410
411 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
412 """
413 Return enough information to recreate the field as a 4-tuple:
414
415 * The name of the field on the model, if contribute_to_class() has
416 been run.
417 * The import path of the field, including the class, e.g.
418 plain.models.IntegerField. This should be the most portable
419 version, so less specific may be better.
420 * A list of positional arguments.
421 * A dict of keyword arguments.
422
423 Note that the positional or keyword arguments must contain values of
424 the following types (including inner values of collection types):
425
426 * None, bool, str, int, float, complex, set, frozenset, list, tuple,
427 dict
428 * UUID
429 * datetime.datetime (naive), datetime.date
430 * top-level classes, top-level functions - will be referenced by their
431 full import path
432 * Storage instances - these have their own deconstruct() method
433
434 This is because the values here must be serialized into a text format
435 (possibly new Python code, possibly JSON) and these are the only types
436 with encoding handlers defined.
437
438 There's no need to return the exact way the field was instantiated this
439 time, just ensure that the resulting field is the same - prefer keyword
440 arguments over positional ones, and omit parameters with their default
441 values.
442 """
443 # Short-form way of fetching all the default parameters
444 keywords = {}
445 possibles = {
446 "max_length": None,
447 "required": True,
448 "allow_null": False,
449 "default": NOT_PROVIDED,
450 "choices": None,
451 "validators": [],
452 "error_messages": None,
453 }
454 attr_overrides = {
455 "error_messages": "_error_messages",
456 "validators": "_validators",
457 }
458 equals_comparison = {"choices", "validators"}
459 for name, default in possibles.items():
460 value = getattr(self, attr_overrides.get(name, name))
461 # Unroll anything iterable for choices into a concrete list
462 if name == "choices" and isinstance(value, collections.abc.Iterable):
463 value = list(value)
464 # Do correct kind of comparison
465 if name in equals_comparison:
466 if value != default:
467 keywords[name] = value
468 else:
469 if value is not default:
470 keywords[name] = value
471 # Work out path - we shorten it for known Plain core fields
472 path = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
473 if path.startswith("plain.models.fields.related"):
474 path = path.replace("plain.models.fields.related", "plain.models")
475 elif path.startswith("plain.models.fields.json"):
476 path = path.replace("plain.models.fields.json", "plain.models")
477 elif path.startswith("plain.models.fields.proxy"):
478 path = path.replace("plain.models.fields.proxy", "plain.models")
479 elif path.startswith("plain.models.fields.timezones"):
480 path = path.replace("plain.models.fields.timezones", "plain.models")
481 elif path.startswith("plain.models.fields"):
482 path = path.replace("plain.models.fields", "plain.models")
483 # Return basic info - other fields should override this.
484 # Note: self.name can be None during migration state rendering when fields are cloned
485 return (self.name, path, [], keywords)
486
487 def clone(self) -> Self:
488 """
489 Uses deconstruct() to clone a new copy of this Field.
490 Will not preserve any class attachments/attribute names.
491 """
492 name, path, args, kwargs = self.deconstruct()
493 return self.__class__(*args, **kwargs)
494
495 def __deepcopy__(self, memodict: dict[int, Any]) -> Self:
496 # We don't have to deepcopy very much here, since most things are not
497 # intended to be altered after initial creation.
498 obj = copy.copy(self)
499 memodict[id(self)] = obj
500 return obj
501
502 def __copy__(self) -> Self:
503 # We need to avoid hitting __reduce__, so define this
504 # slightly weird copy construct.
505 obj = Empty()
506 obj.__class__ = self.__class__
507 obj.__dict__ = self.__dict__.copy()
508 return cast(Self, obj)
509
510 def __reduce__(
511 self,
512 ) -> (
513 tuple[Callable[..., Any], tuple[Any, ...], dict[str, Any]]
514 | tuple[Callable[..., Field[Any] | ForeignObjectRel], tuple[str, str, str]]
515 ):
516 """
517 Pickling should return the model._model_meta.fields instance of the field,
518 not a new copy of that field. So, use the app registry to load the
519 model and then the field back.
520 """
521 model = getattr(self, "model", None)
522 if model is None:
523 # Fields are sometimes used without attaching them to models (for
524 # example in aggregation). In this case give back a plain field
525 # instance. The code below will create a new empty instance of
526 # class self.__class__, then update its dict with self.__dict__
527 # values - so, this is very close to normal pickle.
528 state = self.__dict__.copy()
529 # The _get_default cached_property can't be pickled due to lambda
530 # usage.
531 state.pop("_get_default", None)
532 return _empty, (self.__class__,), state
533 assert self.name is not None
534 options = model.model_options
535 return _load_field, (
536 options.package_label,
537 options.object_name,
538 self.name,
539 )
540
541 def get_id_value_on_save(self, instance: Model) -> T | None:
542 """
543 Hook to generate new primary key values on save. This method is called when
544 saving instances with no primary key value set. If this method returns
545 something else than None, then the returned value is used when saving
546 the new instance.
547 """
548 if self.default:
549 return self.get_default()
550 return None
551
552 def to_python(self, value: Any) -> T | None:
553 """
554 Convert the input value into the expected Python data type, raising
555 plain.exceptions.ValidationError if the data can't be converted.
556 Return the converted value. Subclasses should override this.
557 """
558 return value
559
560 @cached_property
561 def error_messages(self) -> dict[str, str]:
562 messages = {}
563 for c in reversed(self.__class__.__mro__):
564 messages.update(getattr(c, "default_error_messages", {}))
565 messages.update(self._error_messages or {})
566 return messages
567
568 @cached_property
569 def validators(self) -> list[Callable[..., Any]]:
570 """
571 Some validators can't be created at field initialization time.
572 This method provides a way to delay their creation until required.
573 """
574 return [*self.default_validators, *self._validators]
575
576 def run_validators(self, value: Any) -> None:
577 if value in self.empty_values:
578 return
579
580 errors = []
581 for v in self.validators:
582 try:
583 v(value)
584 except exceptions.ValidationError as e:
585 if hasattr(e, "code") and e.code in self.error_messages:
586 e.message = self.error_messages[e.code]
587 errors.extend(e.error_list)
588
589 if errors:
590 raise exceptions.ValidationError(errors)
591
592 def validate(self, value: Any, model_instance: Model) -> None:
593 """
594 Validate value and raise ValidationError if necessary. Subclasses
595 should override this to provide validation logic.
596 """
597
598 if self.choices is not None and value not in self.empty_values:
599 for option_key, option_value in self.choices:
600 if isinstance(option_value, list | tuple):
601 # This is an optgroup, so look inside the group for
602 # options.
603 for optgroup_key, optgroup_value in option_value:
604 if value == optgroup_key:
605 return
606 elif value == option_key:
607 return
608 raise exceptions.ValidationError(
609 self.error_messages["invalid_choice"],
610 code="invalid_choice",
611 params={"value": value},
612 )
613
614 if value is None and not self.allow_null:
615 raise exceptions.ValidationError(
616 self.error_messages["allow_null"], code="allow_null"
617 )
618
619 if self.required and value in self.empty_values:
620 raise exceptions.ValidationError(
621 self.error_messages["required"], code="required"
622 )
623
624 def clean(self, value: Any, model_instance: Model) -> T | None:
625 """
626 Convert the value's type and run validation. Validation errors
627 from to_python() and validate() are propagated. Return the correct
628 value if no error is raised.
629 """
630 value = self.to_python(value)
631 self.validate(value, model_instance)
632 self.run_validators(value)
633 return value
634
635 def db_type_parameters(self) -> DictWrapper:
636 return DictWrapper(self.__dict__, quote_name, "qn_")
637
638 def db_check(self) -> str | None:
639 """
640 Return the database column check constraint for this field.
641 Works the same way as db_type() for the case that
642 get_internal_type() does not map to a preexisting model field.
643 """
644 data = self.db_type_parameters()
645 try:
646 return DATA_TYPE_CHECK_CONSTRAINTS[self.get_internal_type()] % data
647 except KeyError:
648 return None
649
650 def db_type(self) -> str | None:
651 """
652 Return the database column data type for this field.
653 """
654 # The default implementation of this method looks at the
655 # backend-specific data_types dictionary, looking up the field by its
656 # "internal type".
657 #
658 # A Field class can implement the get_internal_type() method to specify
659 # which *preexisting* Plain Field class it's most similar to -- i.e.,
660 # a custom field might be represented by a TEXT column type, which is
661 # the same as the TextField Plain field type, which means the custom
662 # field's get_internal_type() returns 'TextField'.
663 #
664 # But the limitation of the get_internal_type() / data_types approach
665 # is that it cannot handle database column types that aren't already
666 # mapped to one of the built-in Plain field types. In this case, you
667 # can implement db_type() instead of get_internal_type() to specify
668 # exactly which wacky database column type you want to use.
669 data = self.db_type_parameters()
670 try:
671 column_type = DATA_TYPES[self.get_internal_type()]
672 except KeyError:
673 return None
674 else:
675 # column_type is either a single-parameter function or a string.
676 if callable(column_type):
677 return column_type(data)
678 return column_type % data
679
680 def rel_db_type(self) -> str | None:
681 """
682 Return the data type that a related field pointing to this field should
683 use. For example, this method is called by ForeignKeyField to determine its data type.
684 """
685 return self.db_type()
686
687 def cast_db_type(self) -> str | None:
688 """Return the data type to use in the Cast() function."""
689 db_type = CAST_DATA_TYPES.get(self.get_internal_type())
690 if db_type:
691 return db_type % self.db_type_parameters()
692 return self.db_type()
693
694 def db_parameters(self) -> DbParameters:
695 """
696 Extension of db_type(), providing a range of different return values
697 (type, checks). This will look at db_type(), allowing custom model
698 fields to override it.
699 """
700 type_string = self.db_type()
701 check_string = self.db_check()
702 return {
703 "type": type_string,
704 "check": check_string,
705 }
706
707 def db_type_suffix(self) -> str | None:
708 return DATA_TYPES_SUFFIX.get(self.get_internal_type())
709
710 def get_db_converters(
711 self, connection: DatabaseConnection
712 ) -> list[Callable[..., Any]]:
713 if from_db_value := getattr(self, "from_db_value", None):
714 return [from_db_value]
715 return []
716
717 @property
718 def db_returning(self) -> bool:
719 """
720 Private API intended only to be used by Plain itself. Currently only
721 the PostgreSQL backend supports returning multiple fields on a model.
722 """
723 return False
724
725 def set_attributes_from_name(self, name: str) -> None:
726 self.name = self.name or name
727 self.attname = self.get_attname()
728 self.column = self.attname
729 self.concrete = self.column is not None
730
731 def contribute_to_class(self, cls: type[Model], name: str) -> None:
732 """
733 Register the field with the model class it belongs to.
734
735 Field now acts as its own descriptor - it stays on the class and handles
736 __get__/__set__/__delete__ directly.
737 """
738 self.set_attributes_from_name(name)
739 self.model = cls
740 cls._model_meta.add_field(self)
741
742 # Field is now a descriptor itself - ensure it's set on the class
743 # This is important for inherited fields that get deepcopied in Meta.__get__
744 if self.column:
745 setattr(cls, self.attname, self)
746
747 # Descriptor protocol implementation
748 @overload
749 def __get__(self, instance: None, owner: type[Model]) -> Self: ...
750
751 @overload
752 def __get__(self, instance: Model, owner: type[Model]) -> T: ...
753
754 def __get__(self, instance: Model | None, owner: type[Model]) -> Self | T:
755 """
756 Descriptor __get__ for attribute access.
757
758 Class access (User.email) returns the Field descriptor itself.
759 Instance access (user.email) returns the field value from instance.__dict__,
760 with lazy loading support if the value is not yet loaded.
761 """
762 # Class access - return the Field descriptor
763 if instance is None:
764 return self
765
766 # If field hasn't been contributed to a class yet (e.g., used standalone
767 # as an output_field in aggregates), just return self
768 if not hasattr(self, "attname"):
769 return self
770
771 # Instance access - get value from instance dict
772 data = instance.__dict__
773 field_name = self.attname
774
775 # If value not in dict, lazy load from database
776 if field_name not in data:
777 # Deferred field - load it from the database
778 instance.refresh_from_db(fields=[field_name])
779
780 return cast(T, data.get(field_name))
781
782 def __set__(self, instance: Model, value: Any) -> None:
783 """
784 Descriptor __set__ for attribute assignment.
785
786 Validates and converts the value using to_python(), then stores it
787 in instance.__dict__[attname].
788 """
789 # Safety check: ensure field has been properly initialized
790 if not hasattr(self, "attname"):
791 raise AttributeError(
792 f"Field {self.__class__.__name__} has not been initialized properly. "
793 f"The field's contribute_to_class() has not been called yet. "
794 f"This usually means the field is being used before it was added to a model class."
795 )
796
797 # Convert/validate the value
798 if value is not None:
799 value = self.to_python(value)
800
801 # Store in instance dict
802 instance.__dict__[self.attname] = value
803
804 def __delete__(self, instance: Model) -> None:
805 """
806 Descriptor __delete__ for attribute deletion.
807
808 Removes the value from instance.__dict__.
809 """
810 try:
811 del instance.__dict__[self.attname]
812 except KeyError:
813 raise AttributeError(
814 f"{instance.__class__.__name__!r} object has no attribute {self.attname!r}"
815 )
816
817 def get_attname(self) -> str:
818 assert self.name is not None # Field name must be set
819 return self.name
820
821 def get_internal_type(self) -> str:
822 return self.__class__.__name__
823
824 def pre_save(self, model_instance: Model, add: bool) -> T | None:
825 """Return field's value just before saving."""
826 return getattr(model_instance, self.attname)
827
828 def get_prep_value(self, value: Any) -> Any:
829 """Perform preliminary non-db specific value checks and conversions."""
830 if isinstance(value, Promise):
831 value = value._proxy____cast()
832 return value
833
834 def get_db_prep_value(
835 self, value: Any, connection: DatabaseConnection, prepared: bool = False
836 ) -> Any:
837 """
838 Return field's value prepared for interacting with the database backend.
839
840 Used by the default implementations of get_db_prep_save().
841 """
842 if not prepared:
843 value = self.get_prep_value(value)
844 return value
845
846 def get_db_prep_save(self, value: Any, connection: DatabaseConnection) -> Any:
847 """Return field's value prepared for saving into a database."""
848 if hasattr(value, "as_sql"):
849 return value
850 return self.get_db_prep_value(value, connection=connection, prepared=False)
851
852 def has_default(self) -> bool:
853 """Return a boolean of whether this field has a default value."""
854 return self.default is not NOT_PROVIDED
855
856 def get_default(self) -> T | None:
857 """Return the default value for this field."""
858 return self._get_default()
859
860 @cached_property
861 def _get_default(self) -> Callable[[], Any]:
862 if self.has_default():
863 if callable(self.default):
864 return self.default
865 return lambda: self.default
866
867 if not self.empty_strings_allowed or self.allow_null:
868 return return_None
869 return str # return empty string
870
871 def get_limit_choices_to(self) -> Any:
872 """
873 Return ``limit_choices_to`` for this model field.
874 Overridden by related fields (ForeignKey, etc.).
875 """
876 raise NotImplementedError(
877 "get_limit_choices_to() should only be called on related fields"
878 )
879
880 def get_choices(
881 self,
882 include_blank: bool = True,
883 blank_choice: list[tuple[str, str]] = BLANK_CHOICE_DASH,
884 limit_choices_to: Any = None,
885 ordering: tuple[str, ...] = (),
886 ) -> list[tuple[Any, str]]:
887 """
888 Return choices with a default blank choices included, for use
889 as <select> choices for this field.
890 """
891 if self.choices is not None:
892 choices = list(self.choices)
893 if include_blank:
894 blank_defined = any(
895 choice in ("", None) for choice, _ in self.flatchoices
896 )
897 if not blank_defined:
898 choices = blank_choice + choices
899 return choices
900 remote_field = getattr(self, "remote_field", None)
901 if remote_field is None or getattr(remote_field, "model", None) is None:
902 return blank_choice if include_blank else []
903 rel_model = remote_field.model
904 limit_choices_to = limit_choices_to or self.get_limit_choices_to()
905 related_field_name = (
906 remote_field.get_related_field().attname
907 if hasattr(remote_field, "get_related_field")
908 else "id"
909 )
910 choice_func = operator.attrgetter(related_field_name)
911 qs = rel_model.query.complex_filter(limit_choices_to)
912 if ordering:
913 qs = qs.order_by(*ordering)
914 return (blank_choice if include_blank else []) + [
915 (choice_func(x), str(x)) for x in qs
916 ]
917
918 def value_to_string(self, obj: Model) -> str:
919 """
920 Return a string value of this field from the passed obj.
921 This is used by the serialization framework.
922 """
923 return str(self.value_from_object(obj))
924
925 def _get_flatchoices(self) -> list[tuple[Any, Any]]:
926 """Flattened version of choices tuple."""
927 if self.choices is None:
928 return []
929 flat = []
930 for choice, value in self.choices:
931 if isinstance(value, list | tuple):
932 flat.extend(value)
933 else:
934 flat.append((choice, value))
935 return flat
936
937 flatchoices = property(_get_flatchoices)
938
939 def save_form_data(self, instance: Model, data: Any) -> None:
940 assert self.name is not None
941 setattr(instance, self.name, data)
942
943 def value_from_object(self, obj: Model) -> T | None:
944 """Return the value of this field in the given model instance."""
945 return getattr(obj, self.attname)
946
947
class BooleanField(Field[bool]):
    """Field holding True or False (or None when ``allow_null`` is set)."""

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": '"%(value)s" value must be either True or False.',
        "invalid_nullable": '"%(value)s" value must be either True, False, or None.',
    }
    description = "Boolean (Either True or False)"

    def get_internal_type(self) -> str:
        """Return the fixed type key, so subclasses map to the same column type."""
        return "BooleanField"

    def to_python(self, value: Any) -> bool | None:
        """Convert ``value`` to a bool.

        Empty values become None on nullable fields. Booleans and values
        equal to them (1/0) are passed through ``bool()``; the strings
        "t"/"True"/"1" and "f"/"False"/"0" are recognized. Anything else
        raises ValidationError with code "invalid".
        """
        nullable = self.allow_null
        if nullable and value in self.empty_values:
            return None
        if value in (True, False):
            # 1/0 are equal to True/False. bool() converts former to latter.
            return bool(value)
        for spellings, result in (
            (("t", "True", "1"), True),
            (("f", "False", "0"), False),
        ):
            if value in spellings:
                return result
        message_key = "invalid_nullable" if nullable else "invalid"
        raise exceptions.ValidationError(
            self.error_messages[message_key],
            code="invalid",
            params={"value": value},
        )

    def get_prep_value(self, value: Any) -> Any:
        """Prepare ``value`` for the database, leaving None untouched."""
        prepared = super().get_prep_value(value)
        return None if prepared is None else self.to_python(prepared)
980
981
class CharField(Field[str]):
    """String field with an optional ``max_length`` limit."""

    def __init__(self, **kwargs: Any):
        """Set up the field and attach a MaxLengthValidator when limited."""
        super().__init__(**kwargs)
        limit = self.max_length
        if limit is not None:
            self.validators.append(validators.MaxLengthValidator(limit))

    @property
    def description(self) -> str:
        """Description template; differs for limited and unlimited fields."""
        if self.max_length is None:
            return "String (unlimited)"
        return "String (up to %(max_length)s)"

    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
        """Run the base checks followed by the ``max_length`` check."""
        results = list(super().preflight(**kwargs))
        results.extend(self._check_max_length_attribute())
        return results

    def _check_max_length_attribute(self, **kwargs: Any) -> list[PreflightResult]:
        """Report a ``max_length`` that is set but not a positive, non-bool int."""
        limit = self.max_length
        # Unlimited VARCHAR is supported (no max_length required)
        if limit is None:
            return []
        invalid = not isinstance(limit, int) or isinstance(limit, bool) or limit <= 0
        if not invalid:
            return []
        return [
            PreflightResult(
                fix="'max_length' must be a positive integer.",
                obj=self,
                id="fields.charfield_invalid_max_length",
            )
        ]

    def cast_db_type(self) -> str | None:
        """Return the Cast() type, with a dedicated type for unlimited fields."""
        if self.max_length is not None:
            return super().cast_db_type()
        return CAST_CHAR_FIELD_WITHOUT_MAX_LENGTH

    def get_internal_type(self) -> str:
        """Return the fixed type key, so subclasses map to the same column type."""
        return "CharField"

    def to_python(self, value: Any) -> str | None:
        """Return None and strings unchanged; convert anything else with str()."""
        if value is None or isinstance(value, str):
            return value
        return str(value)

    def get_prep_value(self, value: Any) -> Any:
        """Apply the base preparation, then coerce the result via to_python()."""
        return self.to_python(super().get_prep_value(value))
1036
1037
def _to_naive(value: datetime.datetime) -> datetime.datetime:
    """Return ``value`` as a naive datetime, converting aware values to UTC first."""
    if not timezone.is_aware(value):
        return value
    return timezone.make_naive(value, datetime.UTC)
1042
1043
def _get_naive_now() -> datetime.datetime:
    """Return ``timezone.now()`` passed through ``_to_naive``."""
    current = timezone.now()
    return _to_naive(current)
1046
1047
class DateTimeCheckMixin(Field):
    """Preflight checks shared by fields that support auto_now/auto_now_add."""

    auto_now: bool
    auto_now_add: bool

    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
        """Return inherited results plus the option-exclusivity and fixed-default checks."""
        results = list(super().preflight(**kwargs))
        results.extend(self._check_mutually_exclusive_options())
        results.extend(self._check_fix_default_value())
        return results

    def _check_mutually_exclusive_options(self) -> list[PreflightResult]:
        """Report an error when more than one of auto_now, auto_now_add, default is set."""
        # auto_now, auto_now_add, and default are mutually exclusive; an
        # option counts as enabled when it is neither None nor False.
        candidates = (self.auto_now_add, self.auto_now, self.has_default())
        enabled_count = sum(
            1 for option in candidates if option not in (None, False)
        )
        if enabled_count <= 1:
            return []
        return [
            PreflightResult(
                fix="The options auto_now, auto_now_add, and default "
                "are mutually exclusive. Only one of these options "
                "may be present.",
                obj=self,
                id="fields.datetime_auto_options_mutually_exclusive",
            )
        ]

    def _check_fix_default_value(self) -> list[PreflightResult]:
        """Hook for concrete fields; this base implementation reports nothing."""
        return []

    # Concrete subclasses use this in their implementations of
    # _check_fix_default_value().
    def _check_if_value_fixed(
        self,
        value: datetime.date | datetime.datetime,
        now: datetime.datetime | None = None,
    ) -> list[PreflightResult]:
        """
        Return a warning when ``value`` lies within ten seconds of ``now``,
        which suggests a "fixed" time value was given as the default.

        ``value`` must be a date or an aware/naive datetime. ``now``, when
        given, must be a naive datetime; otherwise the current naive time is
        used.
        """
        reference = _get_naive_now() if now is None else now
        margin = datetime.timedelta(seconds=10)
        earliest, latest = reference - margin, reference + margin
        if isinstance(value, datetime.datetime):
            value = _to_naive(value)
        else:
            assert isinstance(value, datetime.date)
            # Plain dates are compared at day granularity.
            earliest, latest = earliest.date(), latest.date()
        if not (earliest <= value <= latest):
            return []
        return [
            PreflightResult(
                fix="Fixed default value provided. "
                "It seems you set a fixed date / time / datetime "
                "value as default for this field. This may not be "
                "what you want. If you want to have the current date "
                "as default, use `plain.utils.timezone.now`",
                obj=self,
                id="fields.datetime_naive_default_value",
                warning=True,
            )
        ]
1125
1126
class DateField(DateTimeCheckMixin, Field[datetime.date]):
    """Field holding a ``datetime.date``, with optional auto_now/auto_now_add."""

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": '"%(value)s" value has an invalid date format. It must be in YYYY-MM-DD format.',
        "invalid_date": '"%(value)s" value has the correct format (YYYY-MM-DD) but it is an invalid date.',
    }
    description = "Date (without time)"

    def __init__(
        self, *, auto_now: bool = False, auto_now_add: bool = False, **kwargs: Any
    ):
        self.auto_now = auto_now
        self.auto_now_add = auto_now_add
        # Automatically populated fields are forced to required=False.
        if auto_now or auto_now_add:
            kwargs["required"] = False
        super().__init__(**kwargs)

    def _check_fix_default_value(self) -> list[PreflightResult]:
        """
        Warn that using an actual date or datetime value is probably wrong;
        it's only evaluated on server startup.
        """
        if not self.has_default():
            return []

        default = self.default
        if isinstance(default, datetime.datetime):
            # Reduce datetimes to their (naive) date before checking.
            return self._check_if_value_fixed(_to_naive(default).date())
        if isinstance(default, datetime.date):
            return self._check_if_value_fixed(default)
        # No explicit date / datetime value -- no checks necessary
        return []

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Add enabled auto options to the kwargs and drop the forced ``required``."""
        name, path, args, kwargs = super().deconstruct()
        for option in ("auto_now", "auto_now_add"):
            if getattr(self, option):
                kwargs[option] = True
        if self.auto_now or self.auto_now_add:
            # __init__ injected required=False; it is not part of the user's declaration.
            del kwargs["required"]
        return name, path, args, kwargs

    def get_internal_type(self) -> str:
        """Return the internal type name for this field."""
        return "DateField"

    def to_python(self, value: Any) -> datetime.date | None:
        """
        Convert ``value`` to a ``datetime.date``.

        ``None`` and dates pass through, datetimes are reduced to their date,
        and anything else is handed to ``parse_date``. Raises ValidationError
        with code "invalid_date" when parsing raises ValueError, and with
        code "invalid" when parsing yields nothing.
        """
        if value is None:
            return value
        if isinstance(value, datetime.datetime):
            if timezone.is_aware(value):
                # Convert aware datetimes to the default time zone
                # before casting them to dates (#17742).
                value = timezone.make_naive(value, timezone.get_default_timezone())
            return value.date()
        if isinstance(value, datetime.date):
            return value

        try:
            parsed = parse_date(value)
        except ValueError:
            raise exceptions.ValidationError(
                self.error_messages["invalid_date"],
                code="invalid_date",
                params={"value": value},
            )
        if parsed is not None:
            return parsed

        raise exceptions.ValidationError(
            self.error_messages["invalid"],
            code="invalid",
            params={"value": value},
        )

    def pre_save(self, model_instance: Model, add: bool) -> datetime.date | None:
        """Stamp today's date on the instance when an auto option applies."""
        stamp_now = self.auto_now or (self.auto_now_add and add)
        if not stamp_now:
            return super().pre_save(model_instance, add)
        today = datetime.date.today()
        setattr(model_instance, self.attname, today)
        return today

    def get_prep_value(self, value: Any) -> Any:
        """Run the base preparation, then coerce the result via ``to_python``."""
        prepared = super().get_prep_value(value)
        return self.to_python(prepared)

    def get_db_prep_value(
        self, value: Any, connection: DatabaseConnection, prepared: bool = False
    ) -> Any:
        """Return ``value`` unchanged if already prepared, else prepare it first."""
        return value if prepared else self.get_prep_value(value)

    def value_to_string(self, obj: Model) -> str:
        """Return the field value in ISO format, or "" when it is None."""
        current = self.value_from_object(obj)
        if current is None:
            return ""
        return current.isoformat()
1227
1228
class DateTimeField(DateField):
    """Field holding a ``datetime.datetime``; naive inputs are made aware with a warning."""

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": '"%(value)s" value has an invalid format. It must be in YYYY-MM-DD HH:MM[:ss[.uuuuuu]][TZ] format.',
        "invalid_date": '"%(value)s" value has the correct format (YYYY-MM-DD) but it is an invalid date.',
        "invalid_datetime": '"%(value)s" value has the correct format (YYYY-MM-DD HH:MM[:ss[.uuuuuu]][TZ]) but it is an invalid date/time.',
    }
    description = "Date (with time)"

    # __init__ is inherited from DateField

    def _check_fix_default_value(self) -> list[PreflightResult]:
        """
        Warn that using an actual date or datetime value is probably wrong;
        it's only evaluated on server startup.
        """
        if not self.has_default():
            return []

        value = self.default
        if isinstance(value, datetime.datetime | datetime.date):
            return self._check_if_value_fixed(value)
        # No explicit date / datetime value -- no checks necessary.
        return []

    def get_internal_type(self) -> str:
        """Return the internal type name for this field."""
        return "DateTimeField"

    def _make_aware_with_warning(self, value: datetime.datetime) -> datetime.datetime:
        """
        Emit the naive-datetime RuntimeWarning and return ``value`` made aware
        in the default time zone.

        The field is labelled "(unbound)" when ``self.model`` / ``self.name``
        are not available, so the warning never fails with AttributeError.
        """
        try:
            name = f"{self.model.__name__}.{self.name}"
        except AttributeError:
            name = "(unbound)"
        # For backwards compatibility, interpret naive datetimes in local
        # time. This won't work during DST change, but we can't do much
        # about it, so we let the exceptions percolate up the call stack.
        warnings.warn(
            f"DateTimeField {name} received a naive datetime ({value})"
            " while time zone support is active.",
            RuntimeWarning,
            # Attribute the warning to the calling method, as before the
            # helper was extracted.
            stacklevel=2,
        )
        default_timezone = timezone.get_default_timezone()
        return timezone.make_aware(value, default_timezone)

    def to_python(self, value: Any) -> datetime.datetime | None:
        """
        Convert ``value`` to a ``datetime.datetime``.

        ``None`` and datetimes pass through unchanged. A plain date becomes
        midnight of that day, made aware in the default time zone (with a
        RuntimeWarning). Other values are tried with ``parse_datetime`` and
        then ``parse_date``.

        Raises ValidationError with code "invalid_datetime" / "invalid_date"
        when the respective parser raises ValueError, and "invalid" when
        neither parser yields a result.
        """
        if value is None:
            return value
        if isinstance(value, datetime.datetime):
            return value
        if isinstance(value, datetime.date):
            value = datetime.datetime(value.year, value.month, value.day)
            return self._make_aware_with_warning(value)

        try:
            parsed = parse_datetime(value)
            if parsed is not None:
                return parsed
        except ValueError:
            raise exceptions.ValidationError(
                self.error_messages["invalid_datetime"],
                code="invalid_datetime",
                params={"value": value},
            )

        try:
            parsed = parse_date(value)
            if parsed is not None:
                # Date-only strings yield a naive midnight datetime.
                return datetime.datetime(parsed.year, parsed.month, parsed.day)
        except ValueError:
            raise exceptions.ValidationError(
                self.error_messages["invalid_date"],
                code="invalid_date",
                params={"value": value},
            )

        raise exceptions.ValidationError(
            self.error_messages["invalid"],
            code="invalid",
            params={"value": value},
        )

    def pre_save(self, model_instance: Model, add: bool) -> datetime.datetime | None:
        """Stamp ``timezone.now()`` when an auto option applies, else return the current value."""
        if self.auto_now or (self.auto_now_add and add):
            value = timezone.now()
            setattr(model_instance, self.attname, value)
            return value
        else:
            return getattr(model_instance, self.attname)

    def get_prep_value(self, value: Any) -> Any:
        """Prepare ``value`` for the database, making naive datetimes aware (with a warning)."""
        value = super().get_prep_value(value)
        value = self.to_python(value)
        if value is not None and timezone.is_naive(value):
            value = self._make_aware_with_warning(value)
        return value

    def get_db_prep_value(
        self, value: Any, connection: DatabaseConnection, prepared: bool = False
    ) -> Any:
        """Return ``value`` unchanged if already prepared, else prepare it first."""
        if not prepared:
            value = self.get_prep_value(value)
        return value

    def value_to_string(self, obj: Model) -> str:
        """Return the field value in ISO format, or "" when it is None."""
        val = self.value_from_object(obj)
        return "" if val is None else val.isoformat()
1345
1346
class DecimalField(Field[decimal.Decimal]):
    """Field holding a ``decimal.Decimal`` bounded by ``max_digits`` and ``decimal_places``."""

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": '"%(value)s" value must be a decimal number.',
    }
    description = "Decimal number"

    def __init__(
        self,
        *,
        max_digits: int | None = None,
        decimal_places: int | None = None,
        **kwargs: Any,
    ):
        self.max_digits, self.decimal_places = max_digits, decimal_places
        super().__init__(**kwargs)

    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
        """
        Return inherited results plus the digit-configuration checks.

        The combined places-vs-digits check only runs when both individual
        checks pass, since it relies on both values being valid integers.
        """
        errors = super().preflight(**kwargs)

        digits_errors = [
            *self._check_decimal_places(),
            *self._check_max_digits(),
        ]
        if not digits_errors:
            errors.extend(self._check_decimal_places_and_max_digits())
        else:
            errors.extend(digits_errors)
        return errors

    def _check_decimal_places(self) -> list[PreflightResult]:
        """Report a missing or invalid (non-integer or negative) ``decimal_places``."""
        if self.decimal_places is None:
            return [
                PreflightResult(
                    fix="DecimalFields must define a 'decimal_places' attribute.",
                    obj=self,
                    id="fields.decimalfield_missing_decimal_places",
                )
            ]
        try:
            decimal_places = int(self.decimal_places)
            if decimal_places < 0:
                raise ValueError()
        # int() raises TypeError (not ValueError) for objects it cannot
        # convert at all; report those as invalid too instead of crashing.
        except (TypeError, ValueError):
            return [
                PreflightResult(
                    fix="'decimal_places' must be a non-negative integer.",
                    obj=self,
                    id="fields.decimalfield_invalid_decimal_places",
                )
            ]
        else:
            return []

    def _check_max_digits(self) -> list[PreflightResult]:
        """Report a missing or invalid (non-integer or non-positive) ``max_digits``."""
        if self.max_digits is None:
            return [
                PreflightResult(
                    fix="DecimalFields must define a 'max_digits' attribute.",
                    obj=self,
                    id="fields.decimalfield_missing_max_digits",
                )
            ]
        try:
            max_digits = int(self.max_digits)
            if max_digits <= 0:
                raise ValueError()
        # See _check_decimal_places: unconvertible objects raise TypeError.
        except (TypeError, ValueError):
            return [
                PreflightResult(
                    fix="'max_digits' must be a positive integer.",
                    obj=self,
                    id="fields.decimalfield_invalid_max_digits",
                )
            ]
        else:
            return []

    def _check_decimal_places_and_max_digits(self) -> list[PreflightResult]:
        """Report ``decimal_places`` exceeding ``max_digits``."""
        if self.decimal_places is None or self.max_digits is None:
            return []
        # Compare the same int() coercions the individual checks validated,
        # so int-like values (e.g. "2" vs "10") are not compared lexically.
        if int(self.decimal_places) > int(self.max_digits):
            return [
                PreflightResult(
                    fix="'max_digits' must be greater or equal to 'decimal_places'.",
                    obj=self,
                    id="fields.decimalfield_decimal_places_exceeds_max_digits",
                )
            ]
        return []

    @cached_property
    def validators(self) -> list[Callable[..., Any]]:
        """Inherited validators plus a DecimalValidator for this field's limits."""
        return super().validators + [
            validators.DecimalValidator(self.max_digits, self.decimal_places)
        ]

    @cached_property
    def context(self) -> decimal.Context:
        """Decimal context whose precision is ``max_digits``."""
        return decimal.Context(prec=self.max_digits)

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Add ``max_digits`` / ``decimal_places`` to the kwargs when they are set."""
        name, path, args, kwargs = super().deconstruct()
        if self.max_digits is not None:
            kwargs["max_digits"] = self.max_digits
        if self.decimal_places is not None:
            kwargs["decimal_places"] = self.decimal_places
        return name, path, args, kwargs

    def get_internal_type(self) -> str:
        """Return the internal type name for this field."""
        return "DecimalField"

    def to_python(self, value: Any) -> decimal.Decimal | None:
        """
        Convert ``value`` to a ``decimal.Decimal``.

        Floats are converted through this field's context; everything else
        goes through the ``Decimal`` constructor. Raises ValidationError
        (code "invalid") for unconvertible or non-finite values.
        """
        if value is None:
            return value
        try:
            if isinstance(value, float):
                decimal_value = self.context.create_decimal_from_float(value)
            else:
                decimal_value = decimal.Decimal(value)
        except (decimal.InvalidOperation, TypeError, ValueError):
            raise exceptions.ValidationError(
                self.error_messages["invalid"],
                code="invalid",
                params={"value": value},
            )
        # Reject NaN and infinities.
        if not decimal_value.is_finite():
            raise exceptions.ValidationError(
                self.error_messages["invalid"],
                code="invalid",
                params={"value": value},
            )
        return decimal_value

    def get_db_prep_value(
        self, value: Any, connection: DatabaseConnection, prepared: bool = False
    ) -> Any:
        """Return ``value`` unchanged if already prepared, else prepare it first."""
        if not prepared:
            value = self.get_prep_value(value)
        return value

    def get_prep_value(self, value: Any) -> Any:
        """Run the base preparation, then coerce the result via ``to_python``."""
        value = super().get_prep_value(value)
        return self.to_python(value)
1491
1492
class DurationField(Field[datetime.timedelta]):
    """Store timedelta objects using PostgreSQL's interval type."""

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": '"%(value)s" value has an invalid format. It must be in [DD] [[HH:]MM:]ss[.uuuuuu] format.',
    }
    description = "Duration"

    def get_internal_type(self) -> str:
        """Return the internal type name for this field."""
        return "DurationField"

    def to_python(self, value: Any) -> datetime.timedelta | None:
        """
        Convert ``value`` to a ``datetime.timedelta``.

        ``None`` and timedeltas pass through; other values go to
        ``parse_duration``. Raises ValidationError (code "invalid") when
        parsing raises ValueError or yields nothing.
        """
        if value is None or isinstance(value, datetime.timedelta):
            return value
        try:
            parsed = parse_duration(value)
        except ValueError:
            # Treated the same as "could not parse".
            parsed = None
        if parsed is not None:
            return parsed

        raise exceptions.ValidationError(
            self.error_messages["invalid"],
            code="invalid",
            params={"value": value},
        )

    def get_db_prep_value(
        self, value: Any, connection: DatabaseConnection, prepared: bool = False
    ) -> Any:
        """Return ``value`` untouched, whatever ``prepared`` is."""
        # PostgreSQL has native interval (duration) type
        return value

    def get_db_converters(
        self, connection: DatabaseConnection
    ) -> list[Callable[..., Any]]:
        """Return the inherited converters; none are added here."""
        # PostgreSQL has native duration field, no converters needed
        inherited = super().get_db_converters(connection)
        return inherited

    def value_to_string(self, obj: Model) -> str:
        """Return the value formatted by ``duration_string``, or "" when it is None."""
        current = self.value_from_object(obj)
        if current is None:
            return ""
        return duration_string(current)
1539
1540
class EmailField(CharField):
    """CharField validated as an email address; ``max_length`` defaults to 254."""

    default_validators = [validators.validate_email]
    description = "Email address"

    def __init__(self, **kwargs: Any):
        # max_length=254 to be compliant with RFCs 3696 and 5321
        if "max_length" not in kwargs:
            kwargs["max_length"] = 254
        super().__init__(**kwargs)

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Return the inherited deconstruction without removing ``max_length``."""
        field_name, import_path, positional, keywords = super().deconstruct()
        # We do not exclude max_length if it matches default as we want to change
        # the default in future.
        return field_name, import_path, positional, keywords
1555
1556
class FloatField(Field[float]):
    """Field holding a Python float."""

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": '"%(value)s" value must be a float.',
    }
    description = "Floating point number"

    def get_prep_value(self, value: Any) -> Any:
        """
        Run the base preparation and convert the result with ``float()``.

        ``None`` passes through. A TypeError/ValueError from the conversion
        is re-raised as the same exception type with a message naming the
        field.
        """
        prepared = super().get_prep_value(value)
        if prepared is None:
            return None
        try:
            return float(prepared)
        except (TypeError, ValueError) as exc:
            raise type(exc)(
                f"Field '{self.name}' expected a number but got {prepared!r}.",
            ) from exc

    def get_internal_type(self) -> str:
        """Return the internal type name for this field."""
        return "FloatField"

    def to_python(self, value: Any) -> float | None:
        """Convert ``value`` with ``float()``; raise ValidationError when that fails."""
        if value is None:
            return value
        try:
            converted = float(value)
        except (TypeError, ValueError):
            raise exceptions.ValidationError(
                self.error_messages["invalid"],
                code="invalid",
                params={"value": value},
            )
        return converted
1589
1590
class IntegerField(Field[int]):
    """Field holding an integer, range-checked per internal type."""

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": '"%(value)s" value must be an integer.',
    }
    description = "Integer"

    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
        """Return inherited results plus the ignored-``max_length`` warning."""
        results = list(super().preflight(**kwargs))
        results.extend(self._check_max_length_warning())
        return results

    def _check_max_length_warning(self) -> list[PreflightResult]:
        """Warn when ``max_length`` is set."""
        if self.max_length is None:
            return []
        return [
            PreflightResult(
                fix=f"'max_length' is ignored when used with {self.__class__.__name__}. Remove 'max_length' from field.",
                obj=self,
                id="fields.max_length_ignored",
                warning=True,
            )
        ]

    @cached_property
    def validators(self) -> list[Callable[..., Any]]:
        """
        Inherited validators plus range validators for this integer type.

        The bounds come from ``INTEGER_FIELD_RANGES`` keyed by the internal
        type. A bound validator is only appended when no existing validator
        of the same kind is already at least as strict.
        """

        def limit_of(validator: Any) -> Any:
            # limit_value may be a plain value or a callable producing one.
            limit = validator.limit_value
            return limit() if callable(limit) else limit

        collected = super().validators
        internal_type = self.get_internal_type()
        lower_bound, upper_bound = INTEGER_FIELD_RANGES[internal_type]

        if lower_bound is not None:
            has_stricter_min = any(
                isinstance(existing, validators.MinValueValidator)
                and limit_of(existing) >= lower_bound
                for existing in collected
            )
            if not has_stricter_min:
                collected.append(validators.MinValueValidator(lower_bound))

        if upper_bound is not None:
            has_stricter_max = any(
                isinstance(existing, validators.MaxValueValidator)
                and limit_of(existing) <= upper_bound
                for existing in collected
            )
            if not has_stricter_max:
                collected.append(validators.MaxValueValidator(upper_bound))

        return collected

    def get_prep_value(self, value: Any) -> Any:
        """
        Run the base preparation and convert the result with ``int()``.

        ``None`` passes through. A TypeError/ValueError from the conversion
        is re-raised as the same exception type with a message naming the
        field.
        """
        prepared = super().get_prep_value(value)
        if prepared is None:
            return None
        try:
            return int(prepared)
        except (TypeError, ValueError) as exc:
            raise type(exc)(
                f"Field '{self.name}' expected a number but got {prepared!r}.",
            ) from exc

    def get_db_prep_value(
        self, value: Any, connection: DatabaseConnection, prepared: bool = False
    ) -> Any:
        """Apply the inherited preparation, then ``adapt_integerfield_value``."""
        db_value = super().get_db_prep_value(value, connection, prepared)
        return adapt_integerfield_value(db_value, self.get_internal_type())

    def get_internal_type(self) -> str:
        """Return the internal type name for this field."""
        return "IntegerField"

    def to_python(self, value: Any) -> int | None:
        """Convert ``value`` with ``int()``; raise ValidationError when that fails."""
        if value is None:
            return value
        try:
            converted = int(value)
        except (TypeError, ValueError):
            raise exceptions.ValidationError(
                self.error_messages["invalid"],
                code="invalid",
                params={"value": value},
            )
        return converted
1682
1683
class BigIntegerField(IntegerField):
    """IntegerField variant reporting the "BigIntegerField" internal type."""

    description = "Big (8 byte) integer"

    def get_internal_type(self) -> str:
        """Return the internal type name for this field."""
        return "BigIntegerField"
1689
1690
class SmallIntegerField(IntegerField):
    """IntegerField variant reporting the "SmallIntegerField" internal type."""

    description = "Small integer"

    def get_internal_type(self) -> str:
        """Return the internal type name for this field."""
        return "SmallIntegerField"
1696
1697
class GenericIPAddressField(Field[str]):
    """Field holding an IPv4 and/or IPv6 address as a string."""

    empty_strings_allowed = False
    description = "IP address"
    default_error_messages = {}

    def __init__(
        self,
        *,
        protocol: str = "both",
        unpack_ipv4: bool = False,
        **kwargs: Any,
    ):
        self.unpack_ipv4 = unpack_ipv4
        self.protocol = protocol
        ip_validators, invalid_message = validators.ip_address_validators(
            protocol, unpack_ipv4
        )
        self.default_validators = ip_validators
        # NOTE(review): this appears to write into the class-level
        # default_error_messages dict, which would be shared by every
        # instance -- confirm against how Field collects error messages.
        self.default_error_messages["invalid"] = invalid_message
        # max_length is always forced to 39, whatever the caller passed.
        kwargs["max_length"] = 39
        super().__init__(**kwargs)

    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
        """Return inherited results plus the required/allow_null consistency check."""
        results = list(super().preflight(**kwargs))
        results.extend(self._check_required_and_null_values())
        return results

    def _check_required_and_null_values(self) -> list[PreflightResult]:
        """Report ``required=False`` combined with ``allow_null=False``."""
        if getattr(self, "allow_null", False) or getattr(self, "required", True):
            return []
        return [
            PreflightResult(
                fix="GenericIPAddressFields cannot have required=False if allow_null=False, "
                "as blank values are stored as nulls.",
                obj=self,
                id="fields.generic_ip_field_null_blank_config",
            )
        ]

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Add non-default options to the kwargs and drop the forced ``max_length``."""
        name, path, args, kwargs = super().deconstruct()
        if self.unpack_ipv4 is not False:
            kwargs["unpack_ipv4"] = self.unpack_ipv4
        if self.protocol != "both":
            kwargs["protocol"] = self.protocol
        if kwargs.get("max_length") == 39:
            # 39 is what __init__ injects, not something the caller chose.
            del kwargs["max_length"]
        return name, path, args, kwargs

    def get_internal_type(self) -> str:
        """Return the internal type name for this field."""
        return "GenericIPAddressField"

    def to_python(self, value: Any) -> str | None:
        """
        Return ``value`` as a stripped string.

        Values containing ":" are passed through ``clean_ipv6_address``.
        """
        if value is None:
            return None
        text = value if isinstance(value, str) else str(value)
        text = text.strip()
        if ":" not in text:
            return text
        return clean_ipv6_address(
            text, self.unpack_ipv4, self.error_messages["invalid"]
        )

    def get_db_prep_value(
        self, value: Any, connection: DatabaseConnection, prepared: bool = False
    ) -> Any:
        """Prepare ``value`` if needed, then apply ``adapt_ipaddressfield_value``."""
        db_value = value if prepared else self.get_prep_value(value)
        return adapt_ipaddressfield_value(db_value)

    def get_prep_value(self, value: Any) -> Any:
        """
        Run the base preparation and return the result as a string.

        Values containing ":" are cleaned with ``clean_ipv6_address``; if that
        raises ValidationError the value is returned via ``str()`` instead.
        """
        prepared = super().get_prep_value(value)
        if prepared is None:
            return None
        if prepared and ":" in prepared:
            try:
                return clean_ipv6_address(prepared, self.unpack_ipv4)
            except exceptions.ValidationError:
                # Best effort: fall through to the plain string form.
                pass
        return str(prepared)
1782
1783
class _HasDbType(Protocol):
    """Structural type: anything exposing ``integer_field_class`` and ``db_type()``."""

    # Field class that is instantiated to obtain the related column type.
    integer_field_class: type[IntegerField]

    def db_type(self) -> str | None:
        """Return the database column type, or None."""
        ...
1790
1791
class PositiveIntegerRelDbTypeMixin(IntegerField):
    """
    Mixin for positive integer fields that derives ``rel_db_type()`` from the
    plain integer field class the concrete field is based on.

    Each subclass gets ``integer_field_class`` assigned automatically (unless
    it already has one) to the first non-mixin IntegerField class in its MRO.
    """

    integer_field_class: type[IntegerField]

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        if hasattr(cls, "integer_field_class"):
            # Declared explicitly or inherited from a parent field class.
            return
        # Find the first IntegerField parent in the MRO. The mixin itself
        # subclasses IntegerField and precedes the concrete integer parent in
        # the MRO, so it (and any class built on it) has to be skipped;
        # otherwise the mixin would be selected and the Big/Small parent
        # would never be reached.
        integer_parent = next(
            (
                parent
                for parent in cls.__mro__[1:]
                if issubclass(parent, IntegerField)
                and not issubclass(parent, PositiveIntegerRelDbTypeMixin)
            ),
            None,
        )
        if integer_parent is not None:
            cls.integer_field_class = integer_parent

    def rel_db_type(self: _HasDbType) -> str | None:
        """
        Return the data type that a related field pointing to this field should
        use. PostgreSQL uses standard integer types for foreign keys.
        """
        return self.integer_field_class().db_type()
1816
1817
class PositiveBigIntegerField(PositiveIntegerRelDbTypeMixin, BigIntegerField):
    """BigIntegerField variant reporting the "PositiveBigIntegerField" internal type."""

    description = "Positive big integer"

    def get_internal_type(self) -> str:
        """Return the internal type name for this field."""
        return "PositiveBigIntegerField"
1823
1824
class PositiveIntegerField(PositiveIntegerRelDbTypeMixin, IntegerField):
    """IntegerField variant reporting the "PositiveIntegerField" internal type."""

    description = "Positive integer"

    def get_internal_type(self) -> str:
        """Return the internal type name for this field."""
        return "PositiveIntegerField"
1830
1831
class PositiveSmallIntegerField(PositiveIntegerRelDbTypeMixin, SmallIntegerField):
    """SmallIntegerField variant reporting the "PositiveSmallIntegerField" internal type."""

    description = "Positive small integer"

    def get_internal_type(self) -> str:
        """Return the internal type name for this field."""
        return "PositiveSmallIntegerField"
1837
1838
class TextField(Field[str]):
    """Field holding text as a string."""

    description = "Text"

    def get_internal_type(self) -> str:
        """Return the internal type name for this field."""
        return "TextField"

    def to_python(self, value: Any) -> str | None:
        """Pass ``None`` and strings through; convert anything else with ``str()``."""
        if value is None or isinstance(value, str):
            return value
        return str(value)

    def get_prep_value(self, value: Any) -> Any:
        """Run the base preparation, then coerce the result via ``to_python``."""
        prepared = super().get_prep_value(value)
        return self.to_python(prepared)
1853
1854
class TimeField(DateTimeCheckMixin, Field[datetime.time]):
    """Field holding a ``datetime.time``, with optional auto_now/auto_now_add."""

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": '"%(value)s" value has an invalid format. It must be in HH:MM[:ss[.uuuuuu]] format.',
        "invalid_time": '"%(value)s" value has the correct format (HH:MM[:ss[.uuuuuu]]) but it is an invalid time.',
    }
    description = "Time"

    def __init__(
        self, *, auto_now: bool = False, auto_now_add: bool = False, **kwargs: Any
    ):
        self.auto_now = auto_now
        self.auto_now_add = auto_now_add
        # Automatically populated fields are forced to required=False.
        if auto_now or auto_now_add:
            kwargs["required"] = False
        super().__init__(**kwargs)

    def _check_fix_default_value(self) -> list[PreflightResult]:
        """
        Warn that using an actual date or datetime value is probably wrong;
        it's only evaluated on server startup.
        """
        if not self.has_default():
            return []

        default = self.default
        if isinstance(default, datetime.datetime):
            # Let the shared check pick its own reference time.
            return self._check_if_value_fixed(default, now=None)
        if isinstance(default, datetime.time):
            now = _get_naive_now()
            # This will not use the right date in the race condition where now
            # is just before the date change and value is just past 0:00.
            combined = datetime.datetime.combine(now.date(), default)
            return self._check_if_value_fixed(combined, now=now)
        # No explicit time / datetime value -- no checks necessary
        return []

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Add non-False auto options to the kwargs and drop the forced ``required``."""
        name, path, args, kwargs = super().deconstruct()
        for option in ("auto_now", "auto_now_add"):
            setting = getattr(self, option)
            if setting is not False:
                kwargs[option] = setting
        if self.auto_now or self.auto_now_add:
            # __init__ injected required=False; it is not part of the user's declaration.
            del kwargs["required"]
        return name, path, args, kwargs

    def get_internal_type(self) -> str:
        """Return the internal type name for this field."""
        return "TimeField"

    def to_python(self, value: Any) -> datetime.time | None:
        """
        Convert ``value`` to a ``datetime.time``.

        ``None`` and times pass through, datetimes are reduced to their time,
        and anything else is handed to ``parse_time``. Raises ValidationError
        with code "invalid_time" when parsing raises ValueError, and with
        code "invalid" when parsing yields nothing.
        """
        if value is None:
            return None
        if isinstance(value, datetime.time):
            return value
        if isinstance(value, datetime.datetime):
            # Not usually a good idea to pass in a datetime here (it loses
            # information), but we'll be accommodating.
            return value.time()

        try:
            parsed = parse_time(value)
        except ValueError:
            raise exceptions.ValidationError(
                self.error_messages["invalid_time"],
                code="invalid_time",
                params={"value": value},
            )
        if parsed is not None:
            return parsed

        raise exceptions.ValidationError(
            self.error_messages["invalid"],
            code="invalid",
            params={"value": value},
        )

    def pre_save(self, model_instance: Model, add: bool) -> datetime.time | None:
        """Stamp the current time on the instance when an auto option applies."""
        stamp_now = self.auto_now or (self.auto_now_add and add)
        if not stamp_now:
            return super().pre_save(model_instance, add)
        current_time = datetime.datetime.now().time()
        setattr(model_instance, self.attname, current_time)
        return current_time

    def get_prep_value(self, value: Any) -> Any:
        """Run the base preparation, then coerce the result via ``to_python``."""
        prepared = super().get_prep_value(value)
        return self.to_python(prepared)

    def get_db_prep_value(
        self, value: Any, connection: DatabaseConnection, prepared: bool = False
    ) -> Any:
        """Return ``value`` unchanged if already prepared, else prepare it first."""
        return value if prepared else self.get_prep_value(value)

    def value_to_string(self, obj: Model) -> str:
        """Return the field value in ISO format, or "" when it is None."""
        current = self.value_from_object(obj)
        if current is None:
            return ""
        return current.isoformat()
1955
1956
class URLField(CharField):
    """CharField validated as a URL; ``max_length`` defaults to 200."""

    default_validators = [validators.URLValidator()]
    description = "URL"

    def __init__(self, **kwargs: Any):
        if "max_length" not in kwargs:
            kwargs["max_length"] = 200
        super().__init__(**kwargs)

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Drop ``max_length`` from the kwargs when it equals the default of 200."""
        field_name, import_path, positional, keywords = super().deconstruct()
        if keywords.get("max_length") == 200:
            del keywords["max_length"]
        return field_name, import_path, positional, keywords
1970
1971
class BinaryField(Field[bytes | memoryview]):
    """Field holding raw binary data."""

    description = "Raw binary data"
    empty_values = [None, b""]

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        limit = self.max_length
        if limit is not None:
            self.validators.append(validators.MaxLengthValidator(limit))

    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
        """Return inherited results plus the string-default check."""
        results = list(super().preflight(**kwargs))
        results.extend(self._check_str_default_value())
        return results

    def _check_str_default_value(self) -> list[PreflightResult]:
        """Report a ``default`` that is a str instead of bytes."""
        if not (self.has_default() and isinstance(self.default, str)):
            return []
        return [
            PreflightResult(
                fix="BinaryField's default cannot be a string. Use bytes "
                "content instead.",
                obj=self,
                # NOTE(review): this id names a file-field check and looks
                # copied from elsewhere -- confirm before relying on it.
                id="fields.filefield_upload_to_not_callable",
            )
        ]

    def get_internal_type(self) -> str:
        """Return the internal type name for this field."""
        return "BinaryField"

    def get_placeholder(
        self, value: Any, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> Any:
        """Return the plain "%s" placeholder."""
        return "%s"

    def get_default(self) -> bytes | memoryview | None:
        """
        Return the default value for this field.

        A non-callable default is returned as is. Otherwise the inherited
        default is used, with an empty string replaced by empty bytes.
        """
        if self.has_default() and not callable(self.default):
            return self.default
        inherited = super().get_default()
        return b"" if inherited == "" else inherited

    def get_db_prep_value(
        self, value: Any, connection: DatabaseConnection, prepared: bool = False
    ) -> Any:
        """Wrap the prepared value in ``psycopg.Binary``; ``None`` stays ``None``."""
        db_value = super().get_db_prep_value(value, connection, prepared)
        if db_value is None:
            return db_value
        return psycopg.Binary(db_value)

    def value_to_string(self, obj: Model) -> str:
        """Binary data is serialized as base64"""
        raw = self.value_from_object(obj)
        if raw is None:
            return ""
        encoded = b64encode(raw)
        return encoded.decode("ascii")

    def to_python(self, value: Any) -> bytes | memoryview | None:
        """Decode str input as base64 into a memoryview; return other values unchanged."""
        if not isinstance(value, str):
            return value
        # If it's a string, it should be base64-encoded data
        decoded = b64decode(value.encode("ascii"))
        return memoryview(decoded)
2032
2033
class UUIDField(Field[uuid.UUID]):
    """Model field whose Python values are ``uuid.UUID`` instances."""

    default_error_messages = {
        "invalid": '"%(value)s" is not a valid UUID.',
    }
    description = "Universally unique identifier"
    empty_strings_allowed = False

    def __init__(self, **kwargs: Any):
        """Initialise the field, always forcing ``max_length`` to 32.

        Any ``max_length`` supplied by the caller is overwritten.
        """
        kwargs["max_length"] = 32
        super().__init__(**kwargs)

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Return the deconstructed field without ``max_length``.

        ``max_length`` is dropped because ``__init__`` always sets it itself.
        """
        name, path, args, kwargs = super().deconstruct()
        del kwargs["max_length"]
        return name, path, args, kwargs

    def get_internal_type(self) -> str:
        """Return the internal type name of this field."""
        return "UUIDField"

    def get_prep_value(self, value: Any) -> Any:
        """Run the base preparation, then coerce the result via ``to_python``."""
        value = super().get_prep_value(value)
        return self.to_python(value)

    def get_db_prep_value(
        self, value: Any, connection: DatabaseConnection, prepared: bool = False
    ) -> uuid.UUID | None:
        """Return ``value`` as a ``uuid.UUID`` (or ``None``) for the database.

        Values that are not already ``uuid.UUID`` instances are converted with
        ``to_python``, which may raise ``ValidationError``.
        """
        # PostgreSQL has native UUID type
        if value is None:
            return None
        if not isinstance(value, uuid.UUID):
            value = self.to_python(value)
        return value

    def to_python(self, value: Any) -> uuid.UUID | None:
        """Convert ``value`` to a ``uuid.UUID``.

        ``None`` and existing ``uuid.UUID`` instances are returned unchanged.
        Integers are interpreted through the ``int`` form of ``uuid.UUID``;
        everything else is passed as ``hex``.

        Raises:
            ValidationError: with code ``"invalid"`` when the value cannot be
                interpreted as a UUID.
        """
        if value is not None and not isinstance(value, uuid.UUID):
            input_form = "int" if isinstance(value, int) else "hex"
            try:
                return uuid.UUID(**{input_form: value})
            except (AttributeError, TypeError, ValueError) as exc:
                # TypeError covers inputs such as bytes, whose ``replace``
                # rejects the str arguments uuid.UUID uses while parsing hex;
                # without it those inputs escaped as a raw TypeError.
                raise exceptions.ValidationError(
                    self.error_messages["invalid"],
                    code="invalid",
                    params={"value": value},
                ) from exc
        return value
2079
2080
class PrimaryKeyField(BigIntegerField):
    """``BigIntegerField`` subclass marked as an auto-created primary key.

    The constructor accepts no arguments; the field is always created with
    ``required=False`` and then flagged as primary key and auto-created.
    """

    db_returning = True

    def __init__(self):
        """Build the field with its fixed, non-configurable settings."""
        super().__init__(required=False)
        self.auto_created = True
        self.primary_key = True

    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
        """Return inherited preflight results minus the reserved-``id`` error.

        PrimaryKeyField is allowed to use the 'id' field name, so the
        ``fields.reserved_field_name_id`` result is filtered out.
        """
        inherited = super().preflight(**kwargs)
        return [
            result
            for result in inherited
            if result.id != "fields.reserved_field_name_id"
        ]

    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
        """Return the deconstructed form: fixed path, no args, no kwargs."""
        # PrimaryKeyField takes no parameters, so both containers stay empty.
        positional: list[Any] = []
        keyword: dict[str, Any] = {}
        return self.name, "plain.models.PrimaryKeyField", positional, keyword

    def validate(self, value: Any, model_instance: Model) -> None:
        """Perform no validation; every value is accepted."""
        return None

    def get_db_prep_value(
        self, value: Any, connection: DatabaseConnection, prepared: bool = False
    ) -> Any:
        """Return ``value`` as-is when already prepared, else prepare it."""
        return value if prepared else self.get_prep_value(value)

    def get_internal_type(self) -> str:
        """Return the internal type name of this field."""
        return "PrimaryKeyField"

    def rel_db_type(self) -> str | None:
        """Return the database type of a freshly built ``BigIntegerField``."""
        plain_big_integer = BigIntegerField()
        return plain_big_integer.db_type()