1from __future__ import annotations
2
3from collections.abc import Callable
4from typing import TYPE_CHECKING, Any, TypeVar, cast
5
6import psycopg
7
8if TYPE_CHECKING:
9 from plain.postgres.connection import DatabaseConnection
10
# Type variable bound to any callable. DatabaseErrorWrapper.__call__ uses it
# to declare that the decorated callable keeps its original type.
F = TypeVar("F", bound=Callable[..., Any])
12
13# MARK: Database Query Exceptions
14
15
class EmptyResultSet(Exception):
    """Signals that a database query predicate can never be satisfied."""
20
21
class FullResultSet(Exception):
    """Signals that a database query predicate matches everything."""
26
27
28# MARK: Model and Field Errors
29
30
class FieldDoesNotExist(Exception):
    """Indicates that the requested model field does not exist."""
35
36
class FieldError(Exception):
    """Indicates some kind of problem with a model field."""
41
42
class ObjectDoesNotExist(Exception):
    """Indicates that the requested object does not exist.

    Also the base class of the per-model ``DoesNotExist`` exceptions that
    ``DoesNotExistDescriptor`` generates.
    """
47
48
class MultipleObjectsReturned(Exception):
    """Indicates that a query returned several objects where one was expected.

    Also the base class of the per-model ``MultipleObjectsReturned``
    exceptions that ``MultipleObjectsReturnedDescriptor`` generates.
    """
53
54
55# MARK: Model Exception Descriptors
56
57
class DoesNotExistDescriptor:
    """Hand out a dedicated ``DoesNotExist`` exception class per owner class.

    The first access through a given owner builds a new subclass of
    ``ObjectDoesNotExist``; later accesses through the same owner return
    that same class. Assignment is rejected.
    """

    def __init__(self) -> None:
        # Generated exception classes, keyed by the owning class.
        self._exceptions_by_class: dict[type, type[ObjectDoesNotExist]] = {}

    def __get__(self, instance: Any, owner: type | None) -> type[ObjectDoesNotExist]:
        """Return the exception class belonging to ``owner``.

        Without an owner there is nothing to key on, so the shared
        ``ObjectDoesNotExist`` base class is returned instead.
        """
        if owner is None:
            return ObjectDoesNotExist

        generated = self._exceptions_by_class.get(owner)
        if generated is None:
            # Make the new class report itself as "<Owner>.DoesNotExist"
            # living in the owner's module.
            namespace = {
                "__module__": owner.__module__,
                "__qualname__": f"{owner.__qualname__}.DoesNotExist",
            }
            # type() is untyped here; cast tells the checker what it builds.
            generated = cast(
                type[ObjectDoesNotExist],
                type("DoesNotExist", (ObjectDoesNotExist,), namespace),
            )
            self._exceptions_by_class[owner] = generated

        return generated

    def __set__(self, instance: Any, value: Any) -> None:
        """Refuse assignment; always raises ``AttributeError``."""
        raise AttributeError("Cannot set DoesNotExist")
88
89
class MultipleObjectsReturnedDescriptor:
    """Hand out a dedicated ``MultipleObjectsReturned`` class per owner class.

    The first access through a given owner builds a new subclass of
    ``MultipleObjectsReturned``; later accesses through the same owner
    return that same class. Assignment is rejected.
    """

    def __init__(self) -> None:
        # Generated exception classes, keyed by the owning class.
        self._exceptions_by_class: dict[type, type[MultipleObjectsReturned]] = {}

    def __get__(
        self, instance: Any, owner: type | None
    ) -> type[MultipleObjectsReturned]:
        """Return the exception class belonging to ``owner``.

        Without an owner there is nothing to key on, so the shared
        ``MultipleObjectsReturned`` base class is returned instead.
        """
        if owner is None:
            return MultipleObjectsReturned

        generated = self._exceptions_by_class.get(owner)
        if generated is None:
            # Make the new class report itself as
            # "<Owner>.MultipleObjectsReturned" living in the owner's module.
            namespace = {
                "__module__": owner.__module__,
                "__qualname__": f"{owner.__qualname__}.MultipleObjectsReturned",
            }
            # type() is untyped here; cast tells the checker what it builds.
            generated = cast(
                type[MultipleObjectsReturned],
                type("MultipleObjectsReturned", (MultipleObjectsReturned,), namespace),
            )
            self._exceptions_by_class[owner] = generated

        return generated

    def __set__(self, instance: Any, value: Any) -> None:
        """Refuse assignment; always raises ``AttributeError``."""
        raise AttributeError("Cannot set MultipleObjectsReturned")
122
123
124# MARK: Database Exceptions (PEP-249)
125
126
class Error(Exception):
    """Root of Plain's PEP-249 database exception hierarchy.

    ``DatabaseErrorWrapper`` raises it in place of a psycopg ``Error`` that
    matches none of the more specific classes.
    """
129
130
class InterfaceError(Error):
    """Plain's PEP-249 ``InterfaceError``.

    ``DatabaseErrorWrapper`` raises it in place of the psycopg exception of
    the same name.
    """
133
134
class DatabaseError(Error):
    """Plain's PEP-249 ``DatabaseError``.

    Parent of the more specific database errors in this module.
    ``DatabaseErrorWrapper`` raises it in place of the psycopg exception of
    the same name when no more specific class matches.
    """
137
138
class DataError(DatabaseError):
    """Plain's PEP-249 ``DataError``.

    ``DatabaseErrorWrapper`` raises it in place of the psycopg exception of
    the same name, without setting the wrapper's ``errors_occurred`` flag.
    """
141
142
class OperationalError(DatabaseError):
    """Plain's PEP-249 ``OperationalError``.

    ``DatabaseErrorWrapper`` raises it in place of the psycopg exception of
    the same name and sets the wrapper's ``errors_occurred`` flag.
    """
145
146
class IntegrityError(DatabaseError):
    """Plain's PEP-249 ``IntegrityError``.

    ``DatabaseErrorWrapper`` raises it in place of the psycopg exception of
    the same name, without setting the wrapper's ``errors_occurred`` flag.
    """
149
150
class InternalError(DatabaseError):
    """Plain's PEP-249 ``InternalError``.

    ``DatabaseErrorWrapper`` raises it in place of the psycopg exception of
    the same name and sets the wrapper's ``errors_occurred`` flag.
    """
153
154
class ProgrammingError(DatabaseError):
    """Plain's PEP-249 ``ProgrammingError``.

    ``DatabaseErrorWrapper`` raises it in place of the psycopg exception of
    the same name and sets the wrapper's ``errors_occurred`` flag.
    """
157
158
class NotSupportedError(DatabaseError):
    """Plain's PEP-249 ``NotSupportedError``.

    ``DatabaseErrorWrapper`` raises it in place of the psycopg exception of
    the same name and sets the wrapper's ``errors_occurred`` flag.
    """
161
162
class DatabaseErrorWrapper:
    """
    Re-raise psycopg database exceptions as Plain's own PEP-249 exception
    classes. Works both as a context manager and as a decorator.
    """

    def __init__(self, wrapper: DatabaseConnection) -> None:
        """
        Keep a reference to the database wrapper.

        Its ``errors_occurred`` attribute is set to True on exit when a
        translated error is anything other than DataError or IntegrityError.
        """
        self.wrapper = wrapper

    def __enter__(self) -> None:
        # Nothing to prepare; all the work happens in __exit__.
        return None

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: Any,
    ) -> None:
        """
        Translate a psycopg exception leaving the block into its Plain
        counterpart, chained to the original via ``from``.

        Returns None in every non-raising case, so an exception that matches
        no psycopg class keeps propagating unchanged.
        """
        if exc_type is None:
            return

        # Plain subclasses are listed ahead of their base classes so that the
        # most specific matching class is the one that gets raised.
        translation_order = (
            DataError,
            OperationalError,
            IntegrityError,
            InternalError,
            ProgrammingError,
            NotSupportedError,
            DatabaseError,
            InterfaceError,
            Error,
        )
        for plain_exc_type in translation_order:
            # The psycopg class is looked up under the same name as ours.
            backend_exc_type = getattr(psycopg, plain_exc_type.__name__)
            if not issubclass(exc_type, backend_exc_type):
                continue

            if exc_value:
                translated = plain_exc_type(*exc_value.args)
            else:
                translated = plain_exc_type()

            # Only set the 'errors_occurred' flag for errors that may make
            # the connection unusable.
            if plain_exc_type not in (DataError, IntegrityError):
                self.wrapper.errors_occurred = True

            raise translated.with_traceback(traceback) from exc_value

    def __call__(self, func: F) -> F:
        """Return a function that runs ``func`` inside this context manager."""

        # @wraps is deliberately left out here for performance reasons.
        # Refs #21109.
        def inner(*args: Any, **kwargs: Any) -> Any:
            with self:
                return func(*args, **kwargs)

        return cast(F, inner)