1from __future__ import annotations
  2
  3from collections.abc import Callable
  4from typing import TYPE_CHECKING, Any, TypeVar, cast
  5
  6if TYPE_CHECKING:
  7    from plain.models.backends.base.base import BaseDatabaseWrapper
  8
# Type variable bound to any callable; used by DatabaseErrorWrapper.__call__
# so that the decorated function keeps its original static type.
F = TypeVar("F", bound=Callable[..., Any])
 10
 11# MARK: Database Query Exceptions
 12
 13
 14class EmptyResultSet(Exception):
 15    """A database query predicate is impossible."""
 16
 17    pass
 18
 19
 20class FullResultSet(Exception):
 21    """A database query predicate is matches everything."""
 22
 23    pass
 24
 25
 26# MARK: Model and Field Errors
 27
 28
 29class FieldDoesNotExist(Exception):
 30    """The requested model field does not exist"""
 31
 32    pass
 33
 34
 35class FieldError(Exception):
 36    """Some kind of problem with a model field."""
 37
 38    pass
 39
 40
 41class ObjectDoesNotExist(Exception):
 42    """The requested object does not exist"""
 43
 44    pass
 45
 46
 47class MultipleObjectsReturned(Exception):
 48    """The query returned multiple objects when only one was expected."""
 49
 50    pass
 51
 52
 53# MARK: Model Exception Descriptors
 54
 55
 56class DoesNotExistDescriptor:
 57    """Descriptor that creates a unique DoesNotExist exception class per model."""
 58
 59    def __init__(self) -> None:
 60        self._exceptions_by_class: dict[type, type[ObjectDoesNotExist]] = {}
 61
 62    def __get__(self, instance: Any, owner: type | None) -> type[ObjectDoesNotExist]:
 63        if owner is None:
 64            return ObjectDoesNotExist  # Return base class as fallback
 65
 66        # Create a unique exception class for this model if we haven't already
 67        if owner not in self._exceptions_by_class:
 68            # type() returns a subclass of ObjectDoesNotExist
 69            exc_class: type[ObjectDoesNotExist] = cast(
 70                type[ObjectDoesNotExist],
 71                type(
 72                    "DoesNotExist",
 73                    (ObjectDoesNotExist,),
 74                    {
 75                        "__module__": owner.__module__,
 76                        "__qualname__": f"{owner.__qualname__}.DoesNotExist",
 77                    },
 78                ),
 79            )
 80            self._exceptions_by_class[owner] = exc_class
 81
 82        return self._exceptions_by_class[owner]
 83
 84    def __set__(self, instance: Any, value: Any) -> None:
 85        raise AttributeError("Cannot set DoesNotExist")
 86
 87
 88class MultipleObjectsReturnedDescriptor:
 89    """Descriptor that creates a unique MultipleObjectsReturned exception class per model."""
 90
 91    def __init__(self) -> None:
 92        self._exceptions_by_class: dict[type, type[MultipleObjectsReturned]] = {}
 93
 94    def __get__(
 95        self, instance: Any, owner: type | None
 96    ) -> type[MultipleObjectsReturned]:
 97        if owner is None:
 98            return MultipleObjectsReturned  # Return base class as fallback
 99
100        # Create a unique exception class for this model if we haven't already
101        if owner not in self._exceptions_by_class:
102            # type() returns a subclass of MultipleObjectsReturned
103            exc_class = cast(
104                type[MultipleObjectsReturned],
105                type(
106                    "MultipleObjectsReturned",
107                    (MultipleObjectsReturned,),
108                    {
109                        "__module__": owner.__module__,
110                        "__qualname__": f"{owner.__qualname__}.MultipleObjectsReturned",
111                    },
112                ),
113            )
114            self._exceptions_by_class[owner] = exc_class
115
116        return self._exceptions_by_class[owner]
117
118    def __set__(self, instance: Any, value: Any) -> None:
119        raise AttributeError("Cannot set MultipleObjectsReturned")
120
121
122# MARK: Database Exceptions (PEP-249)
123
124
class Error(Exception):
    """Root of the PEP 249-style database exception hierarchy in this module."""
127
128
class InterfaceError(Error):
    """
    Common counterpart of a backend's PEP 249 ``InterfaceError``.

    ``DatabaseErrorWrapper`` re-raises matching backend exceptions as this type.
    """
131
132
class DatabaseError(Error):
    """
    Common counterpart of a backend's PEP 249 ``DatabaseError``.

    Base class of the more specific database errors defined in this module;
    ``DatabaseErrorWrapper`` re-raises matching backend exceptions as this type.
    """
135
136
class DataError(DatabaseError):
    """
    Common counterpart of a backend's PEP 249 ``DataError``.

    ``DatabaseErrorWrapper`` re-raises matching backend exceptions as this
    type and does not set ``errors_occurred`` on the wrapper for it.
    """
139
140
class OperationalError(DatabaseError):
    """
    Common counterpart of a backend's PEP 249 ``OperationalError``.

    ``DatabaseErrorWrapper`` re-raises matching backend exceptions as this type.
    """
143
144
class IntegrityError(DatabaseError):
    """
    Common counterpart of a backend's PEP 249 ``IntegrityError``.

    ``DatabaseErrorWrapper`` re-raises matching backend exceptions as this
    type and does not set ``errors_occurred`` on the wrapper for it.
    """
147
148
class InternalError(DatabaseError):
    """
    Common counterpart of a backend's PEP 249 ``InternalError``.

    ``DatabaseErrorWrapper`` re-raises matching backend exceptions as this type.
    """
151
152
class ProgrammingError(DatabaseError):
    """
    Common counterpart of a backend's PEP 249 ``ProgrammingError``.

    ``DatabaseErrorWrapper`` re-raises matching backend exceptions as this type.
    """
155
156
class NotSupportedError(DatabaseError):
    """
    Common counterpart of a backend's PEP 249 ``NotSupportedError``.

    ``DatabaseErrorWrapper`` re-raises matching backend exceptions as this type.
    """
159
160
class ConnectionDoesNotExist(Exception):
    """
    Error type for a database connection that does not exist.

    Derives directly from ``Exception`` and is not one of the types that
    ``DatabaseErrorWrapper`` translates backend exceptions into.
    """
163
164
class DatabaseErrorWrapper:
    """
    Re-raise backend-specific database exceptions as Plain's common wrappers.

    Works both as a context manager (``with wrapper: ...``) and as a
    decorator (``wrapper(func)``).
    """

    def __init__(self, wrapper: BaseDatabaseWrapper) -> None:
        """
        Remember the database wrapper whose exceptions get translated.

        ``wrapper`` must have a ``Database`` attribute defining the PEP-249
        exception classes.
        """
        self.wrapper = wrapper

    def __enter__(self) -> None:
        """Do nothing on entry; the translation happens in ``__exit__``."""
        return None

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: Any,
    ) -> None:
        """
        Translate a backend database exception into its common counterpart.

        Returns without doing anything when the block raised nothing, or when
        the exception matches none of the backend's PEP-249 classes (the
        original exception then propagates unchanged). Otherwise raises the
        matching common exception, built from the original's ``args`` and
        chained to the original via ``from``.
        """
        if exc_type is None:
            return

        # Specific DatabaseError subclasses come before DatabaseError itself,
        # and Error comes last, so the first match is the most specific one.
        candidates = (
            DataError,
            OperationalError,
            IntegrityError,
            InternalError,
            ProgrammingError,
            NotSupportedError,
            DatabaseError,
            InterfaceError,
            Error,
        )
        for plain_exc_type in candidates:
            # The backend class is looked up under the same name as ours.
            db_exc_type = getattr(self.wrapper.Database, plain_exc_type.__name__)
            if not issubclass(exc_type, db_exc_type):
                continue

            if exc_value:
                translated = plain_exc_type(*exc_value.args)
            else:
                translated = plain_exc_type()

            # Flag 'errors_occurred' only for errors that may leave the
            # connection unusable; data and integrity errors do not.
            if plain_exc_type not in (DataError, IntegrityError):
                self.wrapper.errors_occurred = True
            raise translated.with_traceback(traceback) from exc_value

    def __call__(self, func: F) -> F:
        """Return ``func`` wrapped so that each call runs inside this context manager."""

        # @wraps is intentionally left out here for performance reasons.
        # Refs #21109.
        def inner(*args: Any, **kwargs: Any) -> Any:
            with self:
                return func(*args, **kwargs)

        return cast(F, inner)