1from __future__ import annotations
  2
  3from collections.abc import Callable
  4from typing import TYPE_CHECKING, Any, TypeVar, cast
  5
  6import psycopg
  7
  8if TYPE_CHECKING:
  9    from plain.postgres.connection import DatabaseConnection
 10
# Type variable bound to any callable. DatabaseErrorWrapper.__call__ uses it so
# that the decorated function is typed as the same callable type it received.
F = TypeVar("F", bound=Callable[..., Any])
 12
 13# MARK: Database Query Exceptions
 14
 15
 16class EmptyResultSet(Exception):
 17    """A database query predicate is impossible."""
 18
 19    pass
 20
 21
 22class FullResultSet(Exception):
 23    """A database query predicate is matches everything."""
 24
 25    pass
 26
 27
 28# MARK: Model and Field Errors
 29
 30
 31class FieldDoesNotExist(Exception):
 32    """The requested model field does not exist"""
 33
 34    pass
 35
 36
 37class FieldError(Exception):
 38    """Some kind of problem with a model field."""
 39
 40    pass
 41
 42
 43class ObjectDoesNotExist(Exception):
 44    """The requested object does not exist"""
 45
 46    pass
 47
 48
 49class MultipleObjectsReturned(Exception):
 50    """The query returned multiple objects when only one was expected."""
 51
 52    pass
 53
 54
 55# MARK: Model Exception Descriptors
 56
 57
 58class DoesNotExistDescriptor:
 59    """Descriptor that creates a unique DoesNotExist exception class per model."""
 60
 61    def __init__(self) -> None:
 62        self._exceptions_by_class: dict[type, type[ObjectDoesNotExist]] = {}
 63
 64    def __get__(self, instance: Any, owner: type | None) -> type[ObjectDoesNotExist]:
 65        if owner is None:
 66            return ObjectDoesNotExist  # Return base class as fallback
 67
 68        # Create a unique exception class for this model if we haven't already
 69        if owner not in self._exceptions_by_class:
 70            # type() returns a subclass of ObjectDoesNotExist
 71            exc_class: type[ObjectDoesNotExist] = cast(
 72                type[ObjectDoesNotExist],
 73                type(
 74                    "DoesNotExist",
 75                    (ObjectDoesNotExist,),
 76                    {
 77                        "__module__": owner.__module__,
 78                        "__qualname__": f"{owner.__qualname__}.DoesNotExist",
 79                    },
 80                ),
 81            )
 82            self._exceptions_by_class[owner] = exc_class
 83
 84        return self._exceptions_by_class[owner]
 85
 86    def __set__(self, instance: Any, value: Any) -> None:
 87        raise AttributeError("Cannot set DoesNotExist")
 88
 89
 90class MultipleObjectsReturnedDescriptor:
 91    """Descriptor that creates a unique MultipleObjectsReturned exception class per model."""
 92
 93    def __init__(self) -> None:
 94        self._exceptions_by_class: dict[type, type[MultipleObjectsReturned]] = {}
 95
 96    def __get__(
 97        self, instance: Any, owner: type | None
 98    ) -> type[MultipleObjectsReturned]:
 99        if owner is None:
100            return MultipleObjectsReturned  # Return base class as fallback
101
102        # Create a unique exception class for this model if we haven't already
103        if owner not in self._exceptions_by_class:
104            # type() returns a subclass of MultipleObjectsReturned
105            exc_class = cast(
106                type[MultipleObjectsReturned],
107                type(
108                    "MultipleObjectsReturned",
109                    (MultipleObjectsReturned,),
110                    {
111                        "__module__": owner.__module__,
112                        "__qualname__": f"{owner.__qualname__}.MultipleObjectsReturned",
113                    },
114                ),
115            )
116            self._exceptions_by_class[owner] = exc_class
117
118        return self._exceptions_by_class[owner]
119
120    def __set__(self, instance: Any, value: Any) -> None:
121        raise AttributeError("Cannot set MultipleObjectsReturned")
122
123
124# MARK: Database Exceptions (PEP-249)
125
126
class Error(Exception):
    """Root of the PEP-249 style database exception hierarchy in this module."""
129
130
class InterfaceError(Error):
    """PEP-249 ``InterfaceError``: a problem with the database interface
    rather than with the database itself."""
133
134
class DatabaseError(Error):
    """PEP-249 ``DatabaseError``: base class for errors related to the database."""
137
138
class DataError(DatabaseError):
    """PEP-249 ``DataError``: a problem with the processed data, such as a
    numeric value out of range."""
141
142
class OperationalError(DatabaseError):
    """PEP-249 ``OperationalError``: an error in the database's operation that
    is not necessarily under the programmer's control."""
145
146
class IntegrityError(DatabaseError):
    """PEP-249 ``IntegrityError``: the relational integrity of the database is
    affected, e.g. a foreign key check fails."""
149
150
class InternalError(DatabaseError):
    """PEP-249 ``InternalError``: the database encountered an internal error."""
153
154
class ProgrammingError(DatabaseError):
    """PEP-249 ``ProgrammingError``: a programming mistake, such as a syntax
    error in the SQL statement or a missing table."""
157
158
class NotSupportedError(DatabaseError):
    """PEP-249 ``NotSupportedError``: a method or database API was used that
    the database does not support."""
161
162
class DatabaseErrorWrapper:
    """
    Re-raise psycopg database exceptions as this module's PEP-249 classes.

    Usable as a context manager (``with wrapper: ...``) or as a decorator
    (``wrapper(func)``), which runs ``func`` inside that context manager.
    """

    def __init__(self, wrapper: DatabaseConnection) -> None:
        """
        Remember the database connection wrapper.

        Its ``errors_occurred`` attribute is set to True when a translated
        error may have left the connection unusable.
        """
        self.wrapper = wrapper

    def __enter__(self) -> None:
        """Nothing to prepare; all the work happens in ``__exit__``."""
        return None

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: Any,
    ) -> None:
        """
        Translate a psycopg exception leaving the block into its Plain twin.

        Returns None in every non-raising path, so exceptions that match no
        psycopg class propagate unchanged.
        """
        if exc_type is None:
            return

        # Subclasses are listed before their bases so the most specific
        # class is tried first; the psycopg class with the same name is
        # looked up for each entry.
        candidates = (
            DataError,
            OperationalError,
            IntegrityError,
            InternalError,
            ProgrammingError,
            NotSupportedError,
            DatabaseError,
            InterfaceError,
            Error,
        )
        for plain_exc_type in candidates:
            backend_exc_type = getattr(psycopg, plain_exc_type.__name__)
            if not issubclass(exc_type, backend_exc_type):
                continue

            # Carry the original arguments over when an exception instance
            # is available.
            if exc_value:
                translated = plain_exc_type(*exc_value.args)
            else:
                translated = plain_exc_type()

            # Only set the 'errors_occurred' flag for errors that may make
            # the connection unusable.
            if plain_exc_type is not DataError and plain_exc_type is not IntegrityError:
                self.wrapper.errors_occurred = True

            raise translated.with_traceback(traceback) from exc_value

    def __call__(self, func: F) -> F:
        """Return a function that calls ``func`` inside this context manager."""

        # functools.wraps is intentionally not applied here for performance
        # reasons. Refs #21109.
        def inner(*args: Any, **kwargs: Any) -> Any:
            with self:
                return func(*args, **kwargs)

        return cast(F, inner)