1from __future__ import annotations
2
3import collections.abc
4import copy
5import datetime
6import decimal
7import enum
8import operator
9import uuid
10import warnings
11from base64 import b64decode, b64encode
12from collections.abc import Callable, Sequence
13from functools import cached_property
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Self,
18 cast,
19 overload,
20)
21
22import psycopg
23from psycopg.types import numeric
24
25from plain import exceptions, validators
26from plain.postgres.constants import LOOKUP_SEP
27from plain.postgres.dialect import (
28 adapt_ipaddressfield_value,
29 quote_name,
30)
31from plain.postgres.enums import ChoicesMeta
32from plain.postgres.query_utils import RegisterLookupMixin
33from plain.preflight import PreflightResult
34from plain.utils import timezone
35from plain.utils.datastructures import DictWrapper
36from plain.utils.dateparse import (
37 parse_date,
38 parse_datetime,
39 parse_duration,
40 parse_time,
41)
42from plain.utils.duration import duration_string
43from plain.utils.functional import Promise
44from plain.utils.ipv6 import clean_ipv6_address
45from plain.utils.itercompat import is_iterable
46
47from ..registry import models_registry
48
49if TYPE_CHECKING:
50 from plain.postgres.base import Model
51 from plain.postgres.connection import DatabaseConnection
52 from plain.postgres.expressions import Col
53 from plain.postgres.fields.reverse_related import ForeignObjectRel
54 from plain.postgres.sql.compiler import SQLCompiler
55
56
57__all__ = [
58 "BLANK_CHOICE_DASH",
59 "PrimaryKeyField",
60 "BigIntegerField",
61 "BinaryField",
62 "BooleanField",
63 "DateField",
64 "DateTimeField",
65 "DecimalField",
66 "DurationField",
67 "EmailField",
68 "Empty",
69 "Field",
70 "FloatField",
71 "GenericIPAddressField",
72 "IntegerField",
73 "NOT_PROVIDED",
74 "SmallIntegerField",
75 "TextField",
76 "TimeField",
77 "URLField",
78 "UUIDField",
79]
80
81
82class Empty:
83 pass
84
85
86class NOT_PROVIDED:
87 pass
88
89
90# The values to use for "blank" in SelectFields. Will be appended to the start
91# of most "choices" lists.
92BLANK_CHOICE_DASH = [("", "---------")]
93
94
95def _load_field(
96 package_label: str, model_name: str, field_name: str
97) -> Field[Any] | ForeignObjectRel:
98 return models_registry.get_model(package_label, model_name)._model_meta.get_field(
99 field_name
100 )
101
102
103# A guide to Field parameters:
104#
105# * name: The name of the field specified in the model.
106# * attname: The attribute to use on the model object. This is the same as
107# "name", except in the case of ForeignKeys, where "_id" is
108# appended.
109# * column: The database column for this field. This is the same as
110# "attname".
111#
112# Code that introspects values, or does other dynamic things, should use
113# attname.
114
115
116def _empty(of_cls: type) -> Empty:
117 new = Empty()
118 new.__class__ = of_cls
119 return new
120
121
122def return_None() -> None:
123 return None
124
125
126class Field[T](RegisterLookupMixin):
127 """Base class for all field types"""
128
129 # SQL type for this field (e.g. "text", "integer", "boolean").
130 # String templates are interpolated against db_type_parameters().
131 # Subclasses that need dynamic logic override db_type() instead.
132 db_type_sql: str | None = None
133 # Column definition suffix (e.g. "GENERATED BY DEFAULT AS IDENTITY").
134 db_type_suffix_sql: str | None = None
135 # SQL type for Cast() expressions. Falls back to db_type() if None.
136 cast_db_type_sql: str | None = None
137
138 # Instance attributes set during field lifecycle
139 # Set by __init__
140 name: str | None
141 max_length: int | None
142 # Set by set_attributes_from_name (called by contribute_to_class)
143 attname: str
144 column: str
145 concrete: bool
146 # Set by contribute_to_class
147 model: type[Model]
148
149 # Designates whether empty strings fundamentally are allowed at the
150 # database level.
151 empty_strings_allowed = True
152 empty_values = list(validators.EMPTY_VALUES)
153
154 default_validators = [] # Default set of validators
155 default_error_messages = {
156 "invalid_choice": "Value %(value)r is not a valid choice.",
157 "allow_null": "This field cannot be null.",
158 "required": "This field is be required.",
159 "unique": "A %(model_name)s with this %(field_label)s already exists.",
160 }
161
162 # Attributes that don't affect a column definition.
163 # These attributes are ignored when altering the field.
164 non_db_attrs = (
165 "required",
166 "choices",
167 "error_messages",
168 "limit_choices_to",
169 # Database-level options are not supported, see #21961.
170 "on_delete",
171 "related_query_name",
172 "validators",
173 )
174
175 # Generic field type description, usually overridden by subclasses
176 def _description(self) -> str:
177 return f"Field of type: {self.__class__.__name__}"
178
179 description = property(_description)
180
181 def __init__(
182 self,
183 *,
184 max_length: int | None = None,
185 required: bool = True,
186 allow_null: bool = False,
187 default: Any = NOT_PROVIDED,
188 choices: Any = None,
189 validators: Sequence[Callable[..., Any]] = (),
190 error_messages: dict[str, str] | None = None,
191 ):
192 self.name = None # Set by set_attributes_from_name
193 self.max_length = max_length
194 self.required, self.allow_null = required, allow_null
195 self.default = default
196 if isinstance(choices, ChoicesMeta):
197 choices = choices.choices
198 elif isinstance(choices, enum.EnumMeta):
199 choices = [(member.value, member.name) for member in choices]
200 if isinstance(choices, collections.abc.Iterator):
201 choices = list(choices)
202 self.choices = choices
203
204 self.primary_key = False
205 self.auto_created = False
206
207 self._validators = list(validators) # Store for deconstruction later
208
209 self._error_messages = error_messages # Store for deconstruction later
210
211 def __str__(self) -> str:
212 """
213 Return "package_label.model_label.field_name" for fields attached to
214 models.
215 """
216 if not hasattr(self, "model"):
217 return super().__str__()
218 model = self.model
219 return f"{model.model_options.label}.{self.name}"
220
221 def __repr__(self) -> str:
222 """Display the module, class, and name of the field."""
223 path = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
224 name = getattr(self, "name", None)
225 if name is not None:
226 return f"<{path}: {name}>"
227 return f"<{path}>"
228
229 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
230 return [
231 *self._check_field_name(),
232 *self._check_choices(),
233 *self._check_null_allowed_for_primary_keys(),
234 *self._check_validators(),
235 ]
236
237 def _check_field_name(self) -> list[PreflightResult]:
238 """
239 Check if field name is valid, i.e. 1) does not end with an
240 underscore, 2) does not contain "__" and 3) is not "id".
241 """
242 assert self.name is not None, "Field name must be set before checking"
243 if self.name.endswith("_"):
244 return [
245 PreflightResult(
246 fix="Field names must not end with an underscore.",
247 obj=self,
248 id="fields.name_ends_with_underscore",
249 )
250 ]
251 elif LOOKUP_SEP in self.name:
252 return [
253 PreflightResult(
254 fix=f'Field names must not contain "{LOOKUP_SEP}".',
255 obj=self,
256 id="fields.name_contains_lookup_separator",
257 )
258 ]
259 elif self.name == "id":
260 return [
261 PreflightResult(
262 fix="'id' is a reserved word that cannot be used as a field name.",
263 obj=self,
264 id="fields.reserved_field_name_id",
265 )
266 ]
267 else:
268 return []
269
270 @classmethod
271 def _choices_is_value(cls, value: Any) -> bool:
272 return isinstance(value, str | Promise) or not is_iterable(value)
273
274 def _check_choices(self) -> list[PreflightResult]:
275 if not self.choices:
276 return []
277
278 if not is_iterable(self.choices) or isinstance(self.choices, str):
279 return [
280 PreflightResult(
281 fix="'choices' must be an iterable (e.g., a list or tuple).",
282 obj=self,
283 id="fields.choices_not_iterable",
284 )
285 ]
286
287 choice_max_length = 0
288 # Expect [group_name, [value, display]]
289 for choices_group in self.choices:
290 try:
291 group_name, group_choices = choices_group
292 except (TypeError, ValueError):
293 # Containing non-pairs
294 break
295 try:
296 if not all(
297 self._choices_is_value(value) and self._choices_is_value(human_name)
298 for value, human_name in group_choices
299 ):
300 break
301 if self.max_length is not None and group_choices:
302 choice_max_length = max(
303 [
304 choice_max_length,
305 *(
306 len(value)
307 for value, _ in group_choices
308 if isinstance(value, str)
309 ),
310 ]
311 )
312 except (TypeError, ValueError):
313 # No groups, choices in the form [value, display]
314 value, human_name = group_name, group_choices
315 if not self._choices_is_value(value) or not self._choices_is_value(
316 human_name
317 ):
318 break
319 if self.max_length is not None and isinstance(value, str):
320 choice_max_length = max(choice_max_length, len(value))
321
322 # Special case: choices=['ab']
323 if isinstance(choices_group, str):
324 break
325 else:
326 if self.max_length is not None and choice_max_length > self.max_length:
327 return [
328 PreflightResult(
329 fix="'max_length' is too small to fit the longest value " # noqa: UP031
330 "in 'choices' (%d characters)." % choice_max_length,
331 obj=self,
332 id="fields.max_length_too_small_for_choices",
333 ),
334 ]
335 return []
336
337 return [
338 PreflightResult(
339 fix="'choices' must be an iterable containing "
340 "(actual value, human readable name) tuples.",
341 obj=self,
342 id="fields.choices_invalid_format",
343 )
344 ]
345
346 def _check_null_allowed_for_primary_keys(self) -> list[PreflightResult]:
347 if self.primary_key and self.allow_null:
348 return [
349 PreflightResult(
350 fix="Primary keys must not have allow_null=True. "
351 "Set allow_null=False on the field, or "
352 "remove primary_key=True argument.",
353 obj=self,
354 id="fields.primary_key_allows_null",
355 )
356 ]
357 else:
358 return []
359
360 def _check_validators(self) -> list[PreflightResult]:
361 errors = []
362 for i, validator in enumerate(self.validators):
363 if not callable(validator):
364 errors.append(
365 PreflightResult(
366 fix=(
367 "All 'validators' must be callable. "
368 f"validators[{i}] ({repr(validator)}) isn't a function or "
369 "instance of a validator class."
370 ),
371 obj=self,
372 id="fields.invalid_validator",
373 )
374 )
375 return errors
376
377 def get_col(self, alias: str | None, output_field: Field | None = None) -> Col:
378 if alias == self.model.model_options.db_table and (
379 output_field is None or output_field == self
380 ):
381 return self.cached_col
382 from plain.postgres.expressions import Col
383
384 return Col(alias, self, output_field)
385
386 @cached_property
387 def cached_col(self) -> Col:
388 from plain.postgres.expressions import Col
389
390 return Col(self.model.model_options.db_table, self)
391
392 def select_format(
393 self, compiler: SQLCompiler, sql: str, params: Any
394 ) -> tuple[str, Any]:
395 """
396 Custom format for select clauses.
397 """
398 return sql, params
399
400 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
401 """
402 Return enough information to recreate the field as a 4-tuple:
403
404 * The name of the field on the model, if contribute_to_class() has
405 been run.
406 * The import path of the field, including the class, e.g.
407 plain.postgres.IntegerField. This should be the most portable
408 version, so less specific may be better.
409 * A list of positional arguments.
410 * A dict of keyword arguments.
411
412 Note that the positional or keyword arguments must contain values of
413 the following types (including inner values of collection types):
414
415 * None, bool, str, int, float, complex, set, frozenset, list, tuple,
416 dict
417 * UUID
418 * datetime.datetime (naive), datetime.date
419 * top-level classes, top-level functions - will be referenced by their
420 full import path
421 * Storage instances - these have their own deconstruct() method
422
423 This is because the values here must be serialized into a text format
424 (possibly new Python code, possibly JSON) and these are the only types
425 with encoding handlers defined.
426
427 There's no need to return the exact way the field was instantiated this
428 time, just ensure that the resulting field is the same - prefer keyword
429 arguments over positional ones, and omit parameters with their default
430 values.
431 """
432 # Short-form way of fetching all the default parameters
433 keywords = {}
434 possibles = {
435 "max_length": None,
436 "required": True,
437 "allow_null": False,
438 "default": NOT_PROVIDED,
439 "choices": None,
440 "validators": [],
441 "error_messages": None,
442 }
443 attr_overrides = {
444 "error_messages": "_error_messages",
445 "validators": "_validators",
446 }
447 equals_comparison = {"choices", "validators"}
448 for name, default in possibles.items():
449 value = getattr(self, attr_overrides.get(name, name))
450 # Unroll anything iterable for choices into a concrete list
451 if name == "choices" and isinstance(value, collections.abc.Iterable):
452 value = list(value)
453 # Do correct kind of comparison
454 if name in equals_comparison:
455 if value != default:
456 keywords[name] = value
457 else:
458 if value is not default:
459 keywords[name] = value
460 # Work out path - we shorten it for known Plain core fields
461 path = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
462 if path.startswith("plain.postgres.fields.related"):
463 path = path.replace("plain.postgres.fields.related", "plain.postgres")
464 elif path.startswith("plain.postgres.fields.json"):
465 path = path.replace("plain.postgres.fields.json", "plain.postgres")
466 elif path.startswith("plain.postgres.fields.proxy"):
467 path = path.replace("plain.postgres.fields.proxy", "plain.postgres")
468 elif path.startswith("plain.postgres.fields.timezones"):
469 path = path.replace("plain.postgres.fields.timezones", "plain.postgres")
470 elif path.startswith("plain.postgres.fields"):
471 path = path.replace("plain.postgres.fields", "plain.postgres")
472 # Return basic info - other fields should override this.
473 # Note: self.name can be None during migration state rendering when fields are cloned
474 return (self.name, path, [], keywords)
475
476 def clone(self) -> Self:
477 """
478 Uses deconstruct() to clone a new copy of this Field.
479 Will not preserve any class attachments/attribute names.
480 """
481 name, path, args, kwargs = self.deconstruct()
482 return self.__class__(*args, **kwargs)
483
484 def __deepcopy__(self, memodict: dict[int, Any]) -> Self:
485 # We don't have to deepcopy very much here, since most things are not
486 # intended to be altered after initial creation.
487 obj = copy.copy(self)
488 memodict[id(self)] = obj
489 return obj
490
491 def __copy__(self) -> Self:
492 # We need to avoid hitting __reduce__, so define this
493 # slightly weird copy construct.
494 obj = Empty()
495 obj.__class__ = self.__class__
496 obj.__dict__ = self.__dict__.copy()
497 return cast(Self, obj)
498
499 def __reduce__(
500 self,
501 ) -> (
502 tuple[Callable[..., Any], tuple[Any, ...], dict[str, Any]]
503 | tuple[Callable[..., Field[Any] | ForeignObjectRel], tuple[str, str, str]]
504 ):
505 """
506 Pickling should return the model._model_meta.fields instance of the field,
507 not a new copy of that field. So, use the app registry to load the
508 model and then the field back.
509 """
510 model = getattr(self, "model", None)
511 if model is None:
512 # Fields are sometimes used without attaching them to models (for
513 # example in aggregation). In this case give back a plain field
514 # instance. The code below will create a new empty instance of
515 # class self.__class__, then update its dict with self.__dict__
516 # values - so, this is very close to normal pickle.
517 state = self.__dict__.copy()
518 # The _get_default cached_property can't be pickled due to lambda
519 # usage.
520 state.pop("_get_default", None)
521 return _empty, (self.__class__,), state
522 assert self.name is not None
523 options = model.model_options
524 return _load_field, (
525 options.package_label,
526 options.object_name,
527 self.name,
528 )
529
530 def get_id_value_on_save(self, instance: Model) -> T | None:
531 """
532 Hook to generate new primary key values on save. This method is called when
533 saving instances with no primary key value set. If this method returns
534 something else than None, then the returned value is used when saving
535 the new instance.
536 """
537 if self.default:
538 return self.get_default()
539 return None
540
541 def to_python(self, value: Any) -> T | None:
542 """
543 Convert the input value into the expected Python data type, raising
544 plain.exceptions.ValidationError if the data can't be converted.
545 Return the converted value. Subclasses should override this.
546 """
547 return value
548
549 @cached_property
550 def error_messages(self) -> dict[str, str]:
551 messages = {}
552 for c in reversed(self.__class__.__mro__):
553 messages.update(getattr(c, "default_error_messages", {}))
554 messages.update(self._error_messages or {})
555 return messages
556
557 @cached_property
558 def validators(self) -> list[Callable[..., Any]]:
559 """
560 Some validators can't be created at field initialization time.
561 This method provides a way to delay their creation until required.
562 """
563 return [*self.default_validators, *self._validators]
564
565 def run_validators(self, value: Any) -> None:
566 if value in self.empty_values:
567 return
568
569 errors = []
570 for v in self.validators:
571 try:
572 v(value)
573 except exceptions.ValidationError as e:
574 if hasattr(e, "code") and e.code in self.error_messages:
575 e.message = self.error_messages[e.code]
576 errors.extend(e.error_list)
577
578 if errors:
579 raise exceptions.ValidationError(errors)
580
581 def validate(self, value: Any, model_instance: Model) -> None:
582 """
583 Validate value and raise ValidationError if necessary. Subclasses
584 should override this to provide validation logic.
585 """
586
587 if self.choices is not None and value not in self.empty_values:
588 for option_key, option_value in self.choices:
589 if isinstance(option_value, list | tuple):
590 # This is an optgroup, so look inside the group for
591 # options.
592 for optgroup_key, optgroup_value in option_value:
593 if value == optgroup_key:
594 return
595 elif value == option_key:
596 return
597 raise exceptions.ValidationError(
598 self.error_messages["invalid_choice"],
599 code="invalid_choice",
600 params={"value": value},
601 )
602
603 if value is None and not self.allow_null:
604 raise exceptions.ValidationError(
605 self.error_messages["allow_null"], code="allow_null"
606 )
607
608 if self.required and value in self.empty_values:
609 raise exceptions.ValidationError(
610 self.error_messages["required"], code="required"
611 )
612
613 def clean(self, value: Any, model_instance: Model) -> T | None:
614 """
615 Convert the value's type and run validation. Validation errors
616 from to_python() and validate() are propagated. Return the correct
617 value if no error is raised.
618 """
619 value = self.to_python(value)
620 self.validate(value, model_instance)
621 self.run_validators(value)
622 return value
623
624 def db_type_parameters(self) -> DictWrapper:
625 return DictWrapper(self.__dict__, quote_name, "qn_")
626
627 def db_type(self) -> str | None:
628 """Return the database column data type for this field."""
629 if self.db_type_sql is None:
630 return None
631 return self.db_type_sql % self.db_type_parameters()
632
633 def rel_db_type(self) -> str | None:
634 """
635 Return the data type that a related field pointing to this field should
636 use. For example, this method is called by ForeignKeyField to determine its data type.
637 """
638 return self.db_type()
639
640 def cast_db_type(self) -> str | None:
641 """Return the data type to use in the Cast() function."""
642 if self.cast_db_type_sql is not None:
643 return self.cast_db_type_sql % self.db_type_parameters()
644 return self.db_type()
645
646 def db_type_suffix(self) -> str | None:
647 return self.db_type_suffix_sql
648
649 def get_db_converters(
650 self, connection: DatabaseConnection
651 ) -> list[Callable[..., Any]]:
652 if from_db_value := getattr(self, "from_db_value", None):
653 return [from_db_value]
654 return []
655
656 @property
657 def db_returning(self) -> bool:
658 """
659 Private API intended only to be used by Plain itself. Currently only
660 the PostgreSQL backend supports returning multiple fields on a model.
661 """
662 return False
663
664 def set_attributes_from_name(self, name: str) -> None:
665 self.name = self.name or name
666 self.attname = self.get_attname()
667 self.column = self.attname
668 self.concrete = self.column is not None
669
670 def contribute_to_class(self, cls: type[Model], name: str) -> None:
671 """
672 Register the field with the model class it belongs to.
673
674 Field now acts as its own descriptor - it stays on the class and handles
675 __get__/__set__/__delete__ directly.
676 """
677 self.set_attributes_from_name(name)
678 self.model = cls
679 cls._model_meta.add_field(self)
680
681 # Field is now a descriptor itself - ensure it's set on the class
682 # This is important for inherited fields that get deepcopied in Meta.__get__
683 if self.column:
684 setattr(cls, self.attname, self)
685
686 # Descriptor protocol implementation
687 @overload
688 def __get__(self, instance: None, owner: type[Model]) -> Self: ...
689
690 @overload
691 def __get__(self, instance: Model, owner: type[Model]) -> T: ...
692
693 def __get__(self, instance: Model | None, owner: type[Model]) -> Self | T:
694 """
695 Descriptor __get__ for attribute access.
696
697 Class access (User.email) returns the Field descriptor itself.
698 Instance access (user.email) returns the field value from instance.__dict__,
699 with lazy loading support if the value is not yet loaded.
700 """
701 # Class access - return the Field descriptor
702 if instance is None:
703 return self
704
705 # If field hasn't been contributed to a class yet (e.g., used standalone
706 # as an output_field in aggregates), just return self
707 if not hasattr(self, "attname"):
708 return self
709
710 # Instance access - get value from instance dict
711 data = instance.__dict__
712 field_name = self.attname
713
714 # If value not in dict, lazy load from database
715 if field_name not in data:
716 # Deferred field - load it from the database
717 instance.refresh_from_db(fields=[field_name])
718
719 return cast(T, data.get(field_name))
720
721 def __set__(self, instance: Model, value: Any) -> None:
722 """
723 Descriptor __set__ for attribute assignment.
724
725 Validates and converts the value using to_python(), then stores it
726 in instance.__dict__[attname].
727 """
728 # Safety check: ensure field has been properly initialized
729 if not hasattr(self, "attname"):
730 raise AttributeError(
731 f"Field {self.__class__.__name__} has not been initialized properly. "
732 f"The field's contribute_to_class() has not been called yet. "
733 f"This usually means the field is being used before it was added to a model class."
734 )
735
736 # Convert/validate the value
737 if value is not None:
738 value = self.to_python(value)
739
740 # Store in instance dict
741 instance.__dict__[self.attname] = value
742
743 def __delete__(self, instance: Model) -> None:
744 """
745 Descriptor __delete__ for attribute deletion.
746
747 Removes the value from instance.__dict__.
748 """
749 try:
750 del instance.__dict__[self.attname]
751 except KeyError:
752 raise AttributeError(
753 f"{instance.__class__.__name__!r} object has no attribute {self.attname!r}"
754 )
755
756 def get_attname(self) -> str:
757 assert self.name is not None # Field name must be set
758 return self.name
759
760 def pre_save(self, model_instance: Model, add: bool) -> T | None:
761 """Return field's value just before saving."""
762 return getattr(model_instance, self.attname)
763
764 def get_prep_value(self, value: Any) -> Any:
765 """Perform preliminary non-db specific value checks and conversions."""
766 if isinstance(value, Promise):
767 value = value._proxy____cast()
768 return value
769
770 def get_db_prep_value(
771 self, value: Any, connection: DatabaseConnection, prepared: bool = False
772 ) -> Any:
773 """
774 Return field's value prepared for interacting with the database backend.
775
776 Used by the default implementations of get_db_prep_save().
777 """
778 if not prepared:
779 value = self.get_prep_value(value)
780 return value
781
782 def get_db_prep_save(self, value: Any, connection: DatabaseConnection) -> Any:
783 """Return field's value prepared for saving into a database."""
784 if hasattr(value, "as_sql"):
785 return value
786 return self.get_db_prep_value(value, connection=connection, prepared=False)
787
788 def has_default(self) -> bool:
789 """Return a boolean of whether this field has a default value."""
790 return self.default is not NOT_PROVIDED
791
792 def get_default(self) -> T | None:
793 """Return the default value for this field."""
794 return self._get_default()
795
796 @cached_property
797 def _get_default(self) -> Callable[[], Any]:
798 if self.has_default():
799 if callable(self.default):
800 return self.default
801 return lambda: self.default
802
803 if not self.empty_strings_allowed or self.allow_null:
804 return return_None
805 return str # return empty string
806
807 def get_limit_choices_to(self) -> Any:
808 """
809 Return ``limit_choices_to`` for this model field.
810 Overridden by related fields (ForeignKey, etc.).
811 """
812 raise NotImplementedError(
813 "get_limit_choices_to() should only be called on related fields"
814 )
815
816 def get_choices(
817 self,
818 include_blank: bool = True,
819 blank_choice: list[tuple[str, str]] = BLANK_CHOICE_DASH,
820 limit_choices_to: Any = None,
821 ordering: tuple[str, ...] = (),
822 ) -> list[tuple[Any, str]]:
823 """
824 Return choices with a default blank choices included, for use
825 as <select> choices for this field.
826 """
827 if self.choices is not None:
828 choices = list(self.choices)
829 if include_blank:
830 blank_defined = any(
831 choice in ("", None) for choice, _ in self.flatchoices
832 )
833 if not blank_defined:
834 choices = blank_choice + choices
835 return choices
836 remote_field = getattr(self, "remote_field", None)
837 if remote_field is None or getattr(remote_field, "model", None) is None:
838 return blank_choice if include_blank else []
839 rel_model = remote_field.model
840 limit_choices_to = limit_choices_to or self.get_limit_choices_to()
841 related_field_name = (
842 remote_field.get_related_field().attname
843 if hasattr(remote_field, "get_related_field")
844 else "id"
845 )
846 choice_func = operator.attrgetter(related_field_name)
847 qs = rel_model.query.complex_filter(limit_choices_to)
848 if ordering:
849 qs = qs.order_by(*ordering)
850 return (blank_choice if include_blank else []) + [
851 (choice_func(x), str(x)) for x in qs
852 ]
853
854 def value_to_string(self, obj: Model) -> str:
855 """
856 Return a string value of this field from the passed obj.
857 This is used by the serialization framework.
858 """
859 return str(self.value_from_object(obj))
860
861 def _get_flatchoices(self) -> list[tuple[Any, Any]]:
862 """Flattened version of choices tuple."""
863 if self.choices is None:
864 return []
865 flat = []
866 for choice, value in self.choices:
867 if isinstance(value, list | tuple):
868 flat.extend(value)
869 else:
870 flat.append((choice, value))
871 return flat
872
873 flatchoices = property(_get_flatchoices)
874
875 def save_form_data(self, instance: Model, data: Any) -> None:
876 assert self.name is not None
877 setattr(instance, self.name, data)
878
879 def value_from_object(self, obj: Model) -> T | None:
880 """Return the value of this field in the given model instance."""
881 return getattr(obj, self.attname)
882
883
884class BooleanField(Field[bool]):
885 db_type_sql = "boolean"
886 empty_strings_allowed = False
887 default_error_messages = {
888 "invalid": '"%(value)s" value must be either True or False.',
889 "invalid_nullable": '"%(value)s" value must be either True, False, or None.',
890 }
891 description = "Boolean (Either True or False)"
892
893 def to_python(self, value: Any) -> bool | None:
894 if self.allow_null and value in self.empty_values:
895 return None
896 if value in (True, False):
897 # 1/0 are equal to True/False. bool() converts former to latter.
898 return bool(value)
899 if value in ("t", "True", "1"):
900 return True
901 if value in ("f", "False", "0"):
902 return False
903 raise exceptions.ValidationError(
904 self.error_messages["invalid_nullable" if self.allow_null else "invalid"],
905 code="invalid",
906 params={"value": value},
907 )
908
909 def get_prep_value(self, value: Any) -> Any:
910 value = super().get_prep_value(value)
911 if value is None:
912 return None
913 return self.to_python(value)
914
915
916class TextField(Field[str]):
917 db_type_sql = "text"
918
919 def __init__(self, **kwargs: Any):
920 super().__init__(**kwargs)
921 if self.max_length is not None:
922 self.validators.append(validators.MaxLengthValidator(self.max_length))
923
924 @property
925 def description(self) -> str:
926 if self.max_length is not None:
927 return "String (up to %(max_length)s)"
928 else:
929 return "String (unlimited)"
930
931 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
932 return [
933 *super().preflight(**kwargs),
934 *self._check_max_length_attribute(),
935 ]
936
937 def _check_max_length_attribute(self, **kwargs: Any) -> list[PreflightResult]:
938 if self.max_length is None:
939 return []
940 elif (
941 not isinstance(self.max_length, int)
942 or isinstance(self.max_length, bool)
943 or self.max_length <= 0
944 ):
945 return [
946 PreflightResult(
947 fix="'max_length' must be a positive integer.",
948 obj=self,
949 id="fields.textfield_invalid_max_length",
950 )
951 ]
952 else:
953 return []
954
955 def to_python(self, value: Any) -> str | None:
956 if isinstance(value, str) or value is None:
957 return value
958 return str(value)
959
960 def get_prep_value(self, value: Any) -> Any:
961 value = super().get_prep_value(value)
962 return self.to_python(value)
963
964
965def _to_naive(value: datetime.datetime) -> datetime.datetime:
966 if timezone.is_aware(value):
967 value = timezone.make_naive(value, datetime.UTC)
968 return value
969
970
971def _get_naive_now() -> datetime.datetime:
972 return _to_naive(timezone.now())
973
974
975class DateTimeCheckMixin(Field):
976 auto_now: bool
977 auto_now_add: bool
978
979 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
980 return [
981 *super().preflight(**kwargs),
982 *self._check_mutually_exclusive_options(),
983 *self._check_fix_default_value(),
984 ]
985
986 def _check_mutually_exclusive_options(self) -> list[PreflightResult]:
987 # auto_now, auto_now_add, and default are mutually exclusive
988 # options. The use of more than one of these options together
989 # will trigger an Error
990 mutually_exclusive_options = [
991 self.auto_now_add,
992 self.auto_now,
993 self.has_default(),
994 ]
995 enabled_options = [
996 option not in (None, False) for option in mutually_exclusive_options
997 ].count(True)
998 if enabled_options > 1:
999 return [
1000 PreflightResult(
1001 fix="The options auto_now, auto_now_add, and default "
1002 "are mutually exclusive. Only one of these options "
1003 "may be present.",
1004 obj=self,
1005 id="fields.datetime_auto_options_mutually_exclusive",
1006 )
1007 ]
1008 else:
1009 return []
1010
1011 def _check_fix_default_value(self) -> list[PreflightResult]:
1012 return []
1013
1014 # Concrete subclasses use this in their implementations of
1015 # _check_fix_default_value().
1016 def _check_if_value_fixed(
1017 self,
1018 value: datetime.date | datetime.datetime,
1019 now: datetime.datetime | None = None,
1020 ) -> list[PreflightResult]:
1021 """
1022 Check if the given value appears to have been provided as a "fixed"
1023 time value, and include a warning in the returned list if it does. The
1024 value argument must be a date object or aware/naive datetime object. If
1025 now is provided, it must be a naive datetime object.
1026 """
1027 if now is None:
1028 now = _get_naive_now()
1029 offset = datetime.timedelta(seconds=10)
1030 lower = now - offset
1031 upper = now + offset
1032 if isinstance(value, datetime.datetime):
1033 value = _to_naive(value)
1034 else:
1035 assert isinstance(value, datetime.date)
1036 lower = lower.date()
1037 upper = upper.date()
1038 if lower <= value <= upper:
1039 return [
1040 PreflightResult(
1041 fix="Fixed default value provided. "
1042 "It seems you set a fixed date / time / datetime "
1043 "value as default for this field. This may not be "
1044 "what you want. If you want to have the current date "
1045 "as default, use `plain.utils.timezone.now`",
1046 obj=self,
1047 id="fields.datetime_naive_default_value",
1048 warning=True,
1049 )
1050 ]
1051 return []
1052
1053
1054class DateField(DateTimeCheckMixin, Field[datetime.date]):
1055 db_type_sql = "date"
1056 empty_strings_allowed = False
1057 default_error_messages = {
1058 "invalid": '"%(value)s" value has an invalid date format. It must be in YYYY-MM-DD format.',
1059 "invalid_date": '"%(value)s" value has the correct format (YYYY-MM-DD) but it is an invalid date.',
1060 }
1061 description = "Date (without time)"
1062
1063 def __init__(
1064 self, *, auto_now: bool = False, auto_now_add: bool = False, **kwargs: Any
1065 ):
1066 self.auto_now, self.auto_now_add = auto_now, auto_now_add
1067 if auto_now or auto_now_add:
1068 kwargs["required"] = False
1069 super().__init__(**kwargs)
1070
1071 def _check_fix_default_value(self) -> list[PreflightResult]:
1072 """
1073 Warn that using an actual date or datetime value is probably wrong;
1074 it's only evaluated on server startup.
1075 """
1076 if not self.has_default():
1077 return []
1078
1079 value = self.default
1080 if isinstance(value, datetime.datetime):
1081 value = _to_naive(value).date()
1082 elif isinstance(value, datetime.date):
1083 pass
1084 else:
1085 # No explicit date / datetime value -- no checks necessary
1086 return []
1087 # At this point, value is a date object.
1088 return self._check_if_value_fixed(value)
1089
1090 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
1091 name, path, args, kwargs = super().deconstruct()
1092 if self.auto_now:
1093 kwargs["auto_now"] = True
1094 if self.auto_now_add:
1095 kwargs["auto_now_add"] = True
1096 if self.auto_now or self.auto_now_add:
1097 del kwargs["required"]
1098 return name, path, args, kwargs
1099
1100 def to_python(self, value: Any) -> datetime.date | None:
1101 if value is None:
1102 return value
1103 if isinstance(value, datetime.datetime):
1104 if timezone.is_aware(value):
1105 # Convert aware datetimes to the default time zone
1106 # before casting them to dates (#17742).
1107 default_timezone = timezone.get_default_timezone()
1108 value = timezone.make_naive(value, default_timezone)
1109 return value.date()
1110 if isinstance(value, datetime.date):
1111 return value
1112
1113 try:
1114 parsed = parse_date(value)
1115 if parsed is not None:
1116 return parsed
1117 except ValueError:
1118 raise exceptions.ValidationError(
1119 self.error_messages["invalid_date"],
1120 code="invalid_date",
1121 params={"value": value},
1122 )
1123
1124 raise exceptions.ValidationError(
1125 self.error_messages["invalid"],
1126 code="invalid",
1127 params={"value": value},
1128 )
1129
1130 def pre_save(self, model_instance: Model, add: bool) -> datetime.date | None:
1131 if self.auto_now or (self.auto_now_add and add):
1132 value = datetime.date.today()
1133 setattr(model_instance, self.attname, value)
1134 return value
1135 else:
1136 return super().pre_save(model_instance, add)
1137
1138 def get_prep_value(self, value: Any) -> Any:
1139 value = super().get_prep_value(value)
1140 return self.to_python(value)
1141
1142 def get_db_prep_value(
1143 self, value: Any, connection: DatabaseConnection, prepared: bool = False
1144 ) -> Any:
1145 if not prepared:
1146 value = self.get_prep_value(value)
1147 return value
1148
1149 def value_to_string(self, obj: Model) -> str:
1150 val = self.value_from_object(obj)
1151 return "" if val is None else val.isoformat()
1152
1153
1154class DateTimeField(DateField):
1155 db_type_sql = "timestamp with time zone"
1156 empty_strings_allowed = False
1157 default_error_messages = {
1158 "invalid": '"%(value)s" value has an invalid format. It must be in YYYY-MM-DD HH:MM[:ss[.uuuuuu]][TZ] format.',
1159 "invalid_date": '"%(value)s" value has the correct format (YYYY-MM-DD) but it is an invalid date.',
1160 "invalid_datetime": '"%(value)s" value has the correct format (YYYY-MM-DD HH:MM[:ss[.uuuuuu]][TZ]) but it is an invalid date/time.',
1161 }
1162 description = "Date (with time)"
1163
1164 # __init__ is inherited from DateField
1165
1166 def _check_fix_default_value(self) -> list[PreflightResult]:
1167 """
1168 Warn that using an actual date or datetime value is probably wrong;
1169 it's only evaluated on server startup.
1170 """
1171 if not self.has_default():
1172 return []
1173
1174 value = self.default
1175 if isinstance(value, datetime.datetime | datetime.date):
1176 return self._check_if_value_fixed(value)
1177 # No explicit date / datetime value -- no checks necessary.
1178 return []
1179
1180 def to_python(self, value: Any) -> datetime.datetime | None:
1181 if value is None:
1182 return value
1183 if isinstance(value, datetime.datetime):
1184 return value
1185 if isinstance(value, datetime.date):
1186 value = datetime.datetime(value.year, value.month, value.day)
1187
1188 # For backwards compatibility, interpret naive datetimes in
1189 # local time. This won't work during DST change, but we can't
1190 # do much about it, so we let the exceptions percolate up the
1191 # call stack.
1192 warnings.warn(
1193 f"DateTimeField {self.model.__name__}.{self.name} received a naive datetime "
1194 f"({value}) while time zone support is active.",
1195 RuntimeWarning,
1196 )
1197 default_timezone = timezone.get_default_timezone()
1198 value = timezone.make_aware(value, default_timezone)
1199
1200 return value
1201
1202 try:
1203 parsed = parse_datetime(value)
1204 if parsed is not None:
1205 return parsed
1206 except ValueError:
1207 raise exceptions.ValidationError(
1208 self.error_messages["invalid_datetime"],
1209 code="invalid_datetime",
1210 params={"value": value},
1211 )
1212
1213 try:
1214 parsed = parse_date(value)
1215 if parsed is not None:
1216 return datetime.datetime(parsed.year, parsed.month, parsed.day)
1217 except ValueError:
1218 raise exceptions.ValidationError(
1219 self.error_messages["invalid_date"],
1220 code="invalid_date",
1221 params={"value": value},
1222 )
1223
1224 raise exceptions.ValidationError(
1225 self.error_messages["invalid"],
1226 code="invalid",
1227 params={"value": value},
1228 )
1229
1230 def pre_save(self, model_instance: Model, add: bool) -> datetime.datetime | None:
1231 if self.auto_now or (self.auto_now_add and add):
1232 value = timezone.now()
1233 setattr(model_instance, self.attname, value)
1234 return value
1235 else:
1236 return getattr(model_instance, self.attname)
1237
1238 def get_prep_value(self, value: Any) -> Any:
1239 value = super().get_prep_value(value)
1240 value = self.to_python(value)
1241 if value is not None and timezone.is_naive(value):
1242 # For backwards compatibility, interpret naive datetimes in local
1243 # time. This won't work during DST change, but we can't do much
1244 # about it, so we let the exceptions percolate up the call stack.
1245 try:
1246 name = f"{self.model.__name__}.{self.name}"
1247 except AttributeError:
1248 name = "(unbound)"
1249 warnings.warn(
1250 f"DateTimeField {name} received a naive datetime ({value})"
1251 " while time zone support is active.",
1252 RuntimeWarning,
1253 )
1254 default_timezone = timezone.get_default_timezone()
1255 value = timezone.make_aware(value, default_timezone)
1256 return value
1257
1258 def get_db_prep_value(
1259 self, value: Any, connection: DatabaseConnection, prepared: bool = False
1260 ) -> Any:
1261 if not prepared:
1262 value = self.get_prep_value(value)
1263 return value
1264
1265 def value_to_string(self, obj: Model) -> str:
1266 val = self.value_from_object(obj)
1267 return "" if val is None else val.isoformat()
1268
1269
1270class DecimalField(Field[decimal.Decimal]):
1271 db_type_sql = "numeric(%(max_digits)s,%(decimal_places)s)"
1272 empty_strings_allowed = False
1273 default_error_messages = {
1274 "invalid": '"%(value)s" value must be a decimal number.',
1275 }
1276 description = "Decimal number"
1277
1278 def __init__(
1279 self,
1280 *,
1281 max_digits: int | None = None,
1282 decimal_places: int | None = None,
1283 **kwargs: Any,
1284 ):
1285 self.max_digits, self.decimal_places = max_digits, decimal_places
1286 super().__init__(**kwargs)
1287
1288 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
1289 errors = super().preflight(**kwargs)
1290
1291 digits_errors = [
1292 *self._check_decimal_places(),
1293 *self._check_max_digits(),
1294 ]
1295 if not digits_errors:
1296 errors.extend(self._check_decimal_places_and_max_digits())
1297 else:
1298 errors.extend(digits_errors)
1299 return errors
1300
1301 def _check_decimal_places(self) -> list[PreflightResult]:
1302 if self.decimal_places is None:
1303 return [
1304 PreflightResult(
1305 fix="DecimalFields must define a 'decimal_places' attribute.",
1306 obj=self,
1307 id="fields.decimalfield_missing_decimal_places",
1308 )
1309 ]
1310 try:
1311 decimal_places = int(self.decimal_places)
1312 if decimal_places < 0:
1313 raise ValueError()
1314 except ValueError:
1315 return [
1316 PreflightResult(
1317 fix="'decimal_places' must be a non-negative integer.",
1318 obj=self,
1319 id="fields.decimalfield_invalid_decimal_places",
1320 )
1321 ]
1322 else:
1323 return []
1324
1325 def _check_max_digits(self) -> list[PreflightResult]:
1326 if self.max_digits is None:
1327 return [
1328 PreflightResult(
1329 fix="DecimalFields must define a 'max_digits' attribute.",
1330 obj=self,
1331 id="fields.decimalfield_missing_max_digits",
1332 )
1333 ]
1334 try:
1335 max_digits = int(self.max_digits)
1336 if max_digits <= 0:
1337 raise ValueError()
1338 except ValueError:
1339 return [
1340 PreflightResult(
1341 fix="'max_digits' must be a positive integer.",
1342 obj=self,
1343 id="fields.decimalfield_invalid_max_digits",
1344 )
1345 ]
1346 else:
1347 return []
1348
1349 def _check_decimal_places_and_max_digits(self) -> list[PreflightResult]:
1350 if self.decimal_places is None or self.max_digits is None:
1351 return []
1352 if self.decimal_places > self.max_digits:
1353 return [
1354 PreflightResult(
1355 fix="'max_digits' must be greater or equal to 'decimal_places'.",
1356 obj=self,
1357 id="fields.decimalfield_decimal_places_exceeds_max_digits",
1358 )
1359 ]
1360 return []
1361
1362 @cached_property
1363 def validators(self) -> list[Callable[..., Any]]:
1364 return super().validators + [
1365 validators.DecimalValidator(self.max_digits, self.decimal_places)
1366 ]
1367
1368 @cached_property
1369 def context(self) -> decimal.Context:
1370 return decimal.Context(prec=self.max_digits)
1371
1372 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
1373 name, path, args, kwargs = super().deconstruct()
1374 if self.max_digits is not None:
1375 kwargs["max_digits"] = self.max_digits
1376 if self.decimal_places is not None:
1377 kwargs["decimal_places"] = self.decimal_places
1378 return name, path, args, kwargs
1379
1380 def to_python(self, value: Any) -> decimal.Decimal | None:
1381 if value is None:
1382 return value
1383 try:
1384 if isinstance(value, float):
1385 decimal_value = self.context.create_decimal_from_float(value)
1386 else:
1387 decimal_value = decimal.Decimal(value)
1388 except (decimal.InvalidOperation, TypeError, ValueError):
1389 raise exceptions.ValidationError(
1390 self.error_messages["invalid"],
1391 code="invalid",
1392 params={"value": value},
1393 )
1394 if not decimal_value.is_finite():
1395 raise exceptions.ValidationError(
1396 self.error_messages["invalid"],
1397 code="invalid",
1398 params={"value": value},
1399 )
1400 return decimal_value
1401
1402 def get_db_prep_value(
1403 self, value: Any, connection: DatabaseConnection, prepared: bool = False
1404 ) -> Any:
1405 if not prepared:
1406 value = self.get_prep_value(value)
1407 return value
1408
1409 def get_prep_value(self, value: Any) -> Any:
1410 value = super().get_prep_value(value)
1411 return self.to_python(value)
1412
1413
1414class DurationField(Field[datetime.timedelta]):
1415 """Store timedelta objects using PostgreSQL's interval type."""
1416
1417 db_type_sql = "interval"
1418 empty_strings_allowed = False
1419 default_error_messages = {
1420 "invalid": '"%(value)s" value has an invalid format. It must be in [DD] [[HH:]MM:]ss[.uuuuuu] format.',
1421 }
1422 description = "Duration"
1423
1424 def to_python(self, value: Any) -> datetime.timedelta | None:
1425 if value is None:
1426 return value
1427 if isinstance(value, datetime.timedelta):
1428 return value
1429 try:
1430 parsed = parse_duration(value)
1431 except ValueError:
1432 pass
1433 else:
1434 if parsed is not None:
1435 return parsed
1436
1437 raise exceptions.ValidationError(
1438 self.error_messages["invalid"],
1439 code="invalid",
1440 params={"value": value},
1441 )
1442
1443 def get_db_prep_value(
1444 self, value: Any, connection: DatabaseConnection, prepared: bool = False
1445 ) -> Any:
1446 # PostgreSQL has native interval (duration) type
1447 return value
1448
1449 def get_db_converters(
1450 self, connection: DatabaseConnection
1451 ) -> list[Callable[..., Any]]:
1452 # PostgreSQL has native duration field, no converters needed
1453 return super().get_db_converters(connection)
1454
1455 def value_to_string(self, obj: Model) -> str:
1456 val = self.value_from_object(obj)
1457 return "" if val is None else duration_string(val)
1458
1459
1460class EmailField(TextField):
1461 default_validators = [validators.validate_email]
1462 description = "Email address"
1463
1464
1465class FloatField(Field[float]):
1466 db_type_sql = "double precision"
1467 empty_strings_allowed = False
1468 default_error_messages = {
1469 "invalid": '"%(value)s" value must be a float.',
1470 }
1471 description = "Floating point number"
1472
1473 def get_prep_value(self, value: Any) -> Any:
1474 value = super().get_prep_value(value)
1475 if value is None:
1476 return None
1477 try:
1478 return float(value)
1479 except (TypeError, ValueError) as e:
1480 raise e.__class__(
1481 f"Field '{self.name}' expected a number but got {value!r}.",
1482 ) from e
1483
1484 def to_python(self, value: Any) -> float | None:
1485 if value is None:
1486 return value
1487 try:
1488 return float(value)
1489 except (TypeError, ValueError):
1490 raise exceptions.ValidationError(
1491 self.error_messages["invalid"],
1492 code="invalid",
1493 params={"value": value},
1494 )
1495
1496
1497class IntegerField(Field[int]):
1498 db_type_sql = "integer"
1499 integer_range: tuple[int, int] = (-2147483648, 2147483647)
1500 psycopg_type: type = numeric.Int4
1501 empty_strings_allowed = False
1502 default_error_messages = {
1503 "invalid": '"%(value)s" value must be an integer.',
1504 }
1505 description = "Integer"
1506
1507 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
1508 return [
1509 *super().preflight(**kwargs),
1510 *self._check_max_length_warning(),
1511 ]
1512
1513 def _check_max_length_warning(self) -> list[PreflightResult]:
1514 if self.max_length is not None:
1515 return [
1516 PreflightResult(
1517 fix=f"'max_length' is ignored when used with {self.__class__.__name__}. Remove 'max_length' from field.",
1518 obj=self,
1519 id="fields.max_length_ignored",
1520 warning=True,
1521 )
1522 ]
1523 return []
1524
1525 @cached_property
1526 def validators(self) -> list[Callable[..., Any]]:
1527 # These validators can't be added at field initialization time since
1528 # they're based on values retrieved from the database connection.
1529 validators_ = super().validators
1530 min_value, max_value = self.integer_range
1531 if min_value is not None and not any(
1532 (
1533 isinstance(validator, validators.MinValueValidator)
1534 and (
1535 validator.limit_value()
1536 if callable(validator.limit_value)
1537 else validator.limit_value
1538 )
1539 >= min_value
1540 )
1541 for validator in validators_
1542 ):
1543 validators_.append(validators.MinValueValidator(min_value))
1544 if max_value is not None and not any(
1545 (
1546 isinstance(validator, validators.MaxValueValidator)
1547 and (
1548 validator.limit_value()
1549 if callable(validator.limit_value)
1550 else validator.limit_value
1551 )
1552 <= max_value
1553 )
1554 for validator in validators_
1555 ):
1556 validators_.append(validators.MaxValueValidator(max_value))
1557 return validators_
1558
1559 def get_prep_value(self, value: Any) -> Any:
1560 value = super().get_prep_value(value)
1561 if value is None:
1562 return None
1563 try:
1564 return int(value)
1565 except (TypeError, ValueError) as e:
1566 raise e.__class__(
1567 f"Field '{self.name}' expected a number but got {value!r}.",
1568 ) from e
1569
1570 def get_db_prep_value(
1571 self, value: Any, connection: DatabaseConnection, prepared: bool = False
1572 ) -> Any:
1573 from plain.postgres.expressions import ResolvableExpression
1574
1575 value = super().get_db_prep_value(value, connection, prepared)
1576 if value is None or isinstance(value, ResolvableExpression):
1577 return value
1578 return self.psycopg_type(value)
1579
1580 def to_python(self, value: Any) -> int | None:
1581 if value is None:
1582 return value
1583 try:
1584 return int(value)
1585 except (TypeError, ValueError):
1586 raise exceptions.ValidationError(
1587 self.error_messages["invalid"],
1588 code="invalid",
1589 params={"value": value},
1590 )
1591
1592
1593class BigIntegerField(IntegerField):
1594 db_type_sql = "bigint"
1595 integer_range = (-9223372036854775808, 9223372036854775807)
1596 psycopg_type = numeric.Int8
1597 description = "Big (8 byte) integer"
1598
1599
1600class SmallIntegerField(IntegerField):
1601 db_type_sql = "smallint"
1602 integer_range = (-32768, 32767)
1603 psycopg_type = numeric.Int2
1604 description = "Small integer"
1605
1606
1607class GenericIPAddressField(Field[str]):
1608 db_type_sql = "inet"
1609 empty_strings_allowed = False
1610 description = "IP address"
1611 default_error_messages = {}
1612
1613 def __init__(
1614 self,
1615 *,
1616 protocol: str = "both",
1617 unpack_ipv4: bool = False,
1618 **kwargs: Any,
1619 ):
1620 self.unpack_ipv4 = unpack_ipv4
1621 self.protocol = protocol
1622 (
1623 self.default_validators,
1624 invalid_error_message,
1625 ) = validators.ip_address_validators(protocol, unpack_ipv4)
1626 self.default_error_messages["invalid"] = invalid_error_message
1627 kwargs["max_length"] = 39
1628 super().__init__(**kwargs)
1629
1630 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
1631 return [
1632 *super().preflight(**kwargs),
1633 *self._check_required_and_null_values(),
1634 ]
1635
1636 def _check_required_and_null_values(self) -> list[PreflightResult]:
1637 if not getattr(self, "allow_null", False) and not getattr(
1638 self, "required", True
1639 ):
1640 return [
1641 PreflightResult(
1642 fix="GenericIPAddressFields cannot have required=False if allow_null=False, "
1643 "as blank values are stored as nulls.",
1644 obj=self,
1645 id="fields.generic_ip_field_null_blank_config",
1646 )
1647 ]
1648 return []
1649
1650 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
1651 name, path, args, kwargs = super().deconstruct()
1652 if self.unpack_ipv4 is not False:
1653 kwargs["unpack_ipv4"] = self.unpack_ipv4
1654 if self.protocol != "both":
1655 kwargs["protocol"] = self.protocol
1656 if kwargs.get("max_length") == 39:
1657 del kwargs["max_length"]
1658 return name, path, args, kwargs
1659
1660 def to_python(self, value: Any) -> str | None:
1661 if value is None:
1662 return None
1663 if not isinstance(value, str):
1664 value = str(value)
1665 value = value.strip()
1666 if ":" in value:
1667 return clean_ipv6_address(
1668 value, self.unpack_ipv4, self.error_messages["invalid"]
1669 )
1670 return value
1671
1672 def get_db_prep_value(
1673 self, value: Any, connection: DatabaseConnection, prepared: bool = False
1674 ) -> Any:
1675 if not prepared:
1676 value = self.get_prep_value(value)
1677 return adapt_ipaddressfield_value(value)
1678
1679 def get_prep_value(self, value: Any) -> Any:
1680 value = super().get_prep_value(value)
1681 if value is None:
1682 return None
1683 if value and ":" in value:
1684 try:
1685 return clean_ipv6_address(value, self.unpack_ipv4)
1686 except exceptions.ValidationError:
1687 pass
1688 return str(value)
1689
1690
1691class TimeField(DateTimeCheckMixin, Field[datetime.time]):
1692 db_type_sql = "time without time zone"
1693 empty_strings_allowed = False
1694 default_error_messages = {
1695 "invalid": '"%(value)s" value has an invalid format. It must be in HH:MM[:ss[.uuuuuu]] format.',
1696 "invalid_time": '"%(value)s" value has the correct format (HH:MM[:ss[.uuuuuu]]) but it is an invalid time.',
1697 }
1698 description = "Time"
1699
1700 def __init__(
1701 self, *, auto_now: bool = False, auto_now_add: bool = False, **kwargs: Any
1702 ):
1703 self.auto_now, self.auto_now_add = auto_now, auto_now_add
1704 if auto_now or auto_now_add:
1705 kwargs["required"] = False
1706 super().__init__(**kwargs)
1707
1708 def _check_fix_default_value(self) -> list[PreflightResult]:
1709 """
1710 Warn that using an actual date or datetime value is probably wrong;
1711 it's only evaluated on server startup.
1712 """
1713 if not self.has_default():
1714 return []
1715
1716 value = self.default
1717 if isinstance(value, datetime.datetime):
1718 now = None
1719 elif isinstance(value, datetime.time):
1720 now = _get_naive_now()
1721 # This will not use the right date in the race condition where now
1722 # is just before the date change and value is just past 0:00.
1723 value = datetime.datetime.combine(now.date(), value)
1724 else:
1725 # No explicit time / datetime value -- no checks necessary
1726 return []
1727 # At this point, value is a datetime object.
1728 return self._check_if_value_fixed(value, now=now)
1729
1730 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
1731 name, path, args, kwargs = super().deconstruct()
1732 if self.auto_now is not False:
1733 kwargs["auto_now"] = self.auto_now
1734 if self.auto_now_add is not False:
1735 kwargs["auto_now_add"] = self.auto_now_add
1736 if self.auto_now or self.auto_now_add:
1737 del kwargs["required"]
1738 return name, path, args, kwargs
1739
1740 def to_python(self, value: Any) -> datetime.time | None:
1741 if value is None:
1742 return None
1743 if isinstance(value, datetime.time):
1744 return value
1745 if isinstance(value, datetime.datetime):
1746 # Not usually a good idea to pass in a datetime here (it loses
1747 # information), but we'll be accommodating.
1748 return value.time()
1749
1750 try:
1751 parsed = parse_time(value)
1752 if parsed is not None:
1753 return parsed
1754 except ValueError:
1755 raise exceptions.ValidationError(
1756 self.error_messages["invalid_time"],
1757 code="invalid_time",
1758 params={"value": value},
1759 )
1760
1761 raise exceptions.ValidationError(
1762 self.error_messages["invalid"],
1763 code="invalid",
1764 params={"value": value},
1765 )
1766
1767 def pre_save(self, model_instance: Model, add: bool) -> datetime.time | None:
1768 if self.auto_now or (self.auto_now_add and add):
1769 value = datetime.datetime.now().time()
1770 setattr(model_instance, self.attname, value)
1771 return value
1772 else:
1773 return super().pre_save(model_instance, add)
1774
1775 def get_prep_value(self, value: Any) -> Any:
1776 value = super().get_prep_value(value)
1777 return self.to_python(value)
1778
1779 def get_db_prep_value(
1780 self, value: Any, connection: DatabaseConnection, prepared: bool = False
1781 ) -> Any:
1782 if not prepared:
1783 value = self.get_prep_value(value)
1784 return value
1785
1786 def value_to_string(self, obj: Model) -> str:
1787 val = self.value_from_object(obj)
1788 return "" if val is None else val.isoformat()
1789
1790
1791class URLField(TextField):
1792 default_validators = [validators.URLValidator()]
1793 description = "URL"
1794
1795
1796class BinaryField(Field[bytes | memoryview]):
1797 db_type_sql = "bytea"
1798 description = "Raw binary data"
1799 empty_values = [None, b""]
1800
1801 def __init__(self, **kwargs: Any):
1802 super().__init__(**kwargs)
1803 if self.max_length is not None:
1804 self.validators.append(validators.MaxLengthValidator(self.max_length))
1805
1806 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
1807 return [*super().preflight(**kwargs), *self._check_str_default_value()]
1808
1809 def _check_str_default_value(self) -> list[PreflightResult]:
1810 if self.has_default() and isinstance(self.default, str):
1811 return [
1812 PreflightResult(
1813 fix="BinaryField's default cannot be a string. Use bytes "
1814 "content instead.",
1815 obj=self,
1816 id="fields.filefield_upload_to_not_callable",
1817 )
1818 ]
1819 return []
1820
1821 def get_placeholder(
1822 self, value: Any, compiler: SQLCompiler, connection: DatabaseConnection
1823 ) -> Any:
1824 return "%s"
1825
1826 def get_default(self) -> bytes | memoryview | None:
1827 if self.has_default() and not callable(self.default):
1828 return self.default
1829 default = super().get_default()
1830 if default == "":
1831 return b""
1832 return default
1833
1834 def get_db_prep_value(
1835 self, value: Any, connection: DatabaseConnection, prepared: bool = False
1836 ) -> Any:
1837 value = super().get_db_prep_value(value, connection, prepared)
1838 if value is not None:
1839 return psycopg.Binary(value)
1840 return value
1841
1842 def value_to_string(self, obj: Model) -> str:
1843 """Binary data is serialized as base64"""
1844 val = self.value_from_object(obj)
1845 if val is None:
1846 return ""
1847 return b64encode(val).decode("ascii")
1848
1849 def to_python(self, value: Any) -> bytes | memoryview | None:
1850 # If it's a string, it should be base64-encoded data
1851 if isinstance(value, str):
1852 return memoryview(b64decode(value.encode("ascii")))
1853 return value
1854
1855
1856class UUIDField(Field[uuid.UUID]):
1857 db_type_sql = "uuid"
1858 default_error_messages = {
1859 "invalid": '"%(value)s" is not a valid UUID.',
1860 }
1861 description = "Universally unique identifier"
1862 empty_strings_allowed = False
1863
1864 def __init__(self, **kwargs: Any):
1865 kwargs["max_length"] = 32
1866 super().__init__(**kwargs)
1867
1868 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
1869 name, path, args, kwargs = super().deconstruct()
1870 del kwargs["max_length"]
1871 return name, path, args, kwargs
1872
1873 def get_prep_value(self, value: Any) -> Any:
1874 value = super().get_prep_value(value)
1875 return self.to_python(value)
1876
1877 def get_db_prep_value(
1878 self, value: Any, connection: DatabaseConnection, prepared: bool = False
1879 ) -> uuid.UUID | None:
1880 # PostgreSQL has native UUID type
1881 if value is None:
1882 return None
1883 if not isinstance(value, uuid.UUID):
1884 value = self.to_python(value)
1885 return value
1886
1887 def to_python(self, value: Any) -> uuid.UUID | None:
1888 if value is not None and not isinstance(value, uuid.UUID):
1889 input_form = "int" if isinstance(value, int) else "hex"
1890 try:
1891 return uuid.UUID(**{input_form: value})
1892 except (AttributeError, ValueError):
1893 raise exceptions.ValidationError(
1894 self.error_messages["invalid"],
1895 code="invalid",
1896 params={"value": value},
1897 )
1898 return value
1899
1900
1901class PrimaryKeyField(BigIntegerField):
1902 db_type_suffix_sql = "GENERATED BY DEFAULT AS IDENTITY"
1903 cast_db_type_sql = "bigint"
1904 db_returning = True
1905
1906 def __init__(self):
1907 super().__init__(required=False)
1908 self.primary_key = True
1909 self.auto_created = True
1910
1911 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
1912 errors = super().preflight(**kwargs)
1913 # Remove the reserved_field_name_id error for 'id' field name since PrimaryKeyField is allowed to use it
1914 errors = [e for e in errors if e.id != "fields.reserved_field_name_id"]
1915 return errors
1916
1917 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
1918 # PrimaryKeyField takes no parameters, so we return an empty kwargs dict
1919 return (
1920 self.name,
1921 "plain.postgres.PrimaryKeyField",
1922 cast(list[Any], []),
1923 cast(dict[str, Any], {}),
1924 )
1925
1926 def validate(self, value: Any, model_instance: Model) -> None:
1927 pass
1928
1929 def get_db_prep_value(
1930 self, value: Any, connection: DatabaseConnection, prepared: bool = False
1931 ) -> Any:
1932 if not prepared:
1933 value = self.get_prep_value(value)
1934 return value
1935
1936 def rel_db_type(self) -> str | None:
1937 return BigIntegerField().db_type()