1from __future__ import annotations
2
3import _thread
4import copy
5import datetime
6import logging
7import threading
8import time
9import warnings
10import zoneinfo
11from abc import ABC, abstractmethod
12from collections import deque
13from collections.abc import Generator
14from contextlib import contextmanager
15from functools import cached_property
16from typing import TYPE_CHECKING, Any
17
18from plain.models.backends import utils
19from plain.models.backends.base.validation import BaseDatabaseValidation
20from plain.models.backends.utils import debug_transaction
21from plain.models.db import (
22 DatabaseError,
23 DatabaseErrorWrapper,
24 NotSupportedError,
25)
26from plain.models.transaction import TransactionManagementError
27from plain.runtime import settings
28
29if TYPE_CHECKING:
30 from plain.models.backends.base.client import BaseDatabaseClient
31 from plain.models.backends.base.creation import BaseDatabaseCreation
32 from plain.models.backends.base.features import BaseDatabaseFeatures
33 from plain.models.backends.base.introspection import BaseDatabaseIntrospection
34 from plain.models.backends.base.operations import BaseDatabaseOperations
35 from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
36 from plain.models.connections import DatabaseConfig
37
38RAN_DB_VERSION_CHECK = False
39
40logger = logging.getLogger("plain.models.backends.base")
41
42
43class BaseDatabaseWrapper(ABC):
44 """Represent a database connection."""
45
46 # Mapping of Field objects to their column types.
47 data_types: dict[str, str] = {}
48 # Mapping of Field objects to their SQL suffix such as AUTOINCREMENT.
49 data_types_suffix: dict[str, str] = {}
50 # Mapping of Field objects to their SQL for CHECK constraints.
51 data_type_check_constraints: dict[str, str] = {}
52 # Mapping of lookup operators to SQL templates (defined on backend subclasses)
53 operators: dict[str, str]
54 # Mapping of pattern lookup operators to SQL templates using str.format syntax (defined on backend subclasses)
55 pattern_ops: dict[str, str]
56 # SQL template for escaping patterns in LIKE queries using str.format syntax (defined on backend subclasses)
57 pattern_esc: str
58 # Instance attributes - always assigned in __init__
59 ops: BaseDatabaseOperations
60 client: BaseDatabaseClient
61 creation: BaseDatabaseCreation
62 features: BaseDatabaseFeatures
63 introspection: BaseDatabaseIntrospection
64 validation: BaseDatabaseValidation
65 vendor: str = "unknown"
66 display_name: str = "unknown"
67 SchemaEditorClass: type[BaseDatabaseSchemaEditor] | None = None
68 # Classes instantiated in __init__() - subclasses must set these.
69 client_class: type[BaseDatabaseClient]
70 creation_class: type[BaseDatabaseCreation]
71 features_class: type[BaseDatabaseFeatures]
72 introspection_class: type[BaseDatabaseIntrospection]
73 ops_class: type[BaseDatabaseOperations]
74 validation_class: type[BaseDatabaseValidation] = BaseDatabaseValidation
75 Database: Any
76
77 queries_limit: int = 9000
78
79 def __init__(self, settings_dict: DatabaseConfig):
80 # Connection related attributes.
81 # The underlying database connection (from the database library, not a wrapper).
82 self.connection: Any = None
83 # `settings_dict` should be a dictionary containing keys such as
84 # NAME, USER, etc. It's called `settings_dict` instead of `settings`
85 # to disambiguate it from Plain settings modules.
86 self.settings_dict: DatabaseConfig = settings_dict
87 # Query logging in debug mode or when explicitly enabled.
88 self.queries_log: deque[dict[str, Any]] = deque(maxlen=self.queries_limit)
89 self.force_debug_cursor: bool = False
90
91 # Transaction related attributes.
92 # Tracks if the connection is in autocommit mode. Per PEP 249, by
93 # default, it isn't.
94 self.autocommit: bool = False
95 # Tracks if the connection is in a transaction managed by 'atomic'.
96 self.in_atomic_block: bool = False
97 # Increment to generate unique savepoint ids.
98 self.savepoint_state: int = 0
99 # List of savepoints created by 'atomic'.
100 self.savepoint_ids: list[str] = []
101 # Stack of active 'atomic' blocks.
102 self.atomic_blocks: list[Any] = []
103 # Tracks if the outermost 'atomic' block should commit on exit,
104 # ie. if autocommit was active on entry.
105 self.commit_on_exit: bool = True
106 # Tracks if the transaction should be rolled back to the next
107 # available savepoint because of an exception in an inner block.
108 self.needs_rollback: bool = False
109 self.rollback_exc: Exception | None = None
110
111 # Connection termination related attributes.
112 self.close_at: float | None = None
113 self.closed_in_transaction: bool = False
114 self.errors_occurred: bool = False
115 self.health_check_enabled: bool = False
116 self.health_check_done: bool = False
117
118 # Thread-safety related attributes.
119 self._thread_sharing_lock: threading.Lock = threading.Lock()
120 self._thread_sharing_count: int = 0
121 self._thread_ident: int = _thread.get_ident()
122
123 # A list of no-argument functions to run when the transaction commits.
124 # Each entry is an (sids, func, robust) tuple, where sids is a set of
125 # the active savepoint IDs when this function was registered and robust
126 # specifies whether it's allowed for the function to fail.
127 self.run_on_commit: list[tuple[set[str], Any, bool]] = []
128
129 # Should we run the on-commit hooks the next time set_autocommit(True)
130 # is called?
131 self.run_commit_hooks_on_set_autocommit_on: bool = False
132
133 # A stack of wrappers to be invoked around execute()/executemany()
134 # calls. Each entry is a function taking five arguments: execute, sql,
135 # params, many, and context. It's the function's responsibility to
136 # call execute(sql, params, many, context).
137 self.execute_wrappers: list[Any] = []
138
139 self.client: BaseDatabaseClient = self.client_class(self)
140 self.creation: BaseDatabaseCreation = self.creation_class(self)
141 self.features: BaseDatabaseFeatures = self.features_class(self)
142 self.introspection: BaseDatabaseIntrospection = self.introspection_class(self)
143 self.ops: BaseDatabaseOperations = self.ops_class(self)
144 self.validation: BaseDatabaseValidation = self.validation_class(self)
145
146 def __repr__(self) -> str:
147 return f"<{self.__class__.__qualname__} vendor={self.vendor!r}>"
148
149 def ensure_timezone(self) -> bool:
150 """
151 Ensure the connection's timezone is set to `self.timezone_name` and
152 return whether it changed or not.
153 """
154 return False
155
156 @cached_property
157 def timezone(self) -> datetime.tzinfo:
158 """
159 Return a tzinfo of the database connection time zone.
160
161 This is only used when time zone support is enabled. When a datetime is
162 read from the database, it is always returned in this time zone.
163
164 When the database backend supports time zones, it doesn't matter which
165 time zone Plain uses, as long as aware datetimes are used everywhere.
166 Other users connecting to the database can choose their own time zone.
167
168 When the database backend doesn't support time zones, the time zone
169 Plain uses may be constrained by the requirements of other users of
170 the database.
171 """
172 if self.settings_dict["TIME_ZONE"] is None:
173 return datetime.UTC
174 else:
175 return zoneinfo.ZoneInfo(self.settings_dict["TIME_ZONE"])
176
177 @cached_property
178 def timezone_name(self) -> str:
179 """
180 Name of the time zone of the database connection.
181 """
182 if self.settings_dict["TIME_ZONE"] is None:
183 return "UTC"
184 else:
185 return self.settings_dict["TIME_ZONE"]
186
187 @property
188 def queries_logged(self) -> bool:
189 return self.force_debug_cursor or settings.DEBUG
190
191 @property
192 def queries(self) -> list[dict[str, Any]]:
193 if len(self.queries_log) == self.queries_log.maxlen:
194 warnings.warn(
195 f"Limit for query logging exceeded, only the last {self.queries_log.maxlen} queries "
196 "will be returned."
197 )
198 return list(self.queries_log)
199
200 @abstractmethod
201 def get_database_version(self) -> tuple[int, ...]:
202 """Return a tuple of the database's version."""
203 ...
204
205 def check_database_version_supported(self) -> None:
206 """
207 Raise an error if the database version isn't supported by this
208 version of Plain.
209 """
210 if (
211 self.features.minimum_database_version is not None
212 and self.get_database_version() < self.features.minimum_database_version
213 ):
214 db_version = ".".join(str(v) for v in self.get_database_version())
215 min_db_version = ".".join(
216 str(v) for v in self.features.minimum_database_version
217 )
218 raise NotSupportedError(
219 f"{self.display_name} {min_db_version} or later is required "
220 f"(found {db_version})."
221 )
222
223 # ##### Backend-specific methods for creating connections and cursors #####
224
225 @abstractmethod
226 def get_connection_params(self) -> dict[str, Any]:
227 """Return a dict of parameters suitable for get_new_connection."""
228 ...
229
230 @abstractmethod
231 def get_new_connection(self, conn_params: dict[str, Any]) -> Any:
232 """Open a connection to the database."""
233 ...
234
235 def init_connection_state(self) -> None:
236 """Initialize the database connection settings."""
237 global RAN_DB_VERSION_CHECK
238 if not RAN_DB_VERSION_CHECK:
239 self.check_database_version_supported()
240 RAN_DB_VERSION_CHECK = True
241
242 @abstractmethod
243 def create_cursor(self, name: str | None = None) -> Any:
244 """Create a cursor. Assume that a connection is established."""
245 ...
246
247 # ##### Backend-specific methods for creating connections #####
248
249 def connect(self) -> None:
250 """Connect to the database. Assume that the connection is closed."""
251 # In case the previous connection was closed while in an atomic block
252 self.in_atomic_block = False
253 self.savepoint_ids = []
254 self.atomic_blocks = []
255 self.needs_rollback = False
256 # Reset parameters defining when to close/health-check the connection.
257 self.health_check_enabled = self.settings_dict["CONN_HEALTH_CHECKS"]
258 max_age = self.settings_dict["CONN_MAX_AGE"]
259 self.close_at = None if max_age is None else time.monotonic() + max_age
260 self.closed_in_transaction = False
261 self.errors_occurred = False
262 # New connections are healthy.
263 self.health_check_done = True
264 # Establish the connection
265 conn_params = self.get_connection_params()
266 self.connection = self.get_new_connection(conn_params)
267 self.set_autocommit(self.settings_dict["AUTOCOMMIT"])
268 self.init_connection_state()
269
270 self.run_on_commit = []
271
272 def ensure_connection(self) -> None:
273 """Guarantee that a connection to the database is established."""
274 if self.connection is None:
275 with self.wrap_database_errors:
276 self.connect()
277
278 # ##### Backend-specific wrappers for PEP-249 connection methods #####
279
280 def _prepare_cursor(self, cursor: utils.DBAPICursor) -> utils.CursorWrapper:
281 """
282 Validate the connection is usable and perform database cursor wrapping.
283 """
284 self.validate_thread_sharing()
285 if self.queries_logged:
286 wrapped_cursor = self.make_debug_cursor(cursor)
287 else:
288 wrapped_cursor = self.make_cursor(cursor)
289 return wrapped_cursor
290
291 def _cursor(self, name: str | None = None) -> utils.CursorWrapper:
292 self.close_if_health_check_failed()
293 self.ensure_connection()
294 with self.wrap_database_errors:
295 return self._prepare_cursor(self.create_cursor(name))
296
297 def _commit(self) -> None:
298 if self.connection is not None:
299 with debug_transaction(self, "COMMIT"), self.wrap_database_errors:
300 return self.connection.commit()
301
302 def _rollback(self) -> None:
303 if self.connection is not None:
304 with debug_transaction(self, "ROLLBACK"), self.wrap_database_errors:
305 return self.connection.rollback()
306
307 def _close(self) -> None:
308 if self.connection is not None:
309 with self.wrap_database_errors:
310 return self.connection.close()
311
312 # ##### Generic wrappers for PEP-249 connection methods #####
313
314 def cursor(self) -> utils.CursorWrapper:
315 """Create a cursor, opening a connection if necessary."""
316 return self._cursor()
317
318 def commit(self) -> None:
319 """Commit a transaction and reset the dirty flag."""
320 self.validate_thread_sharing()
321 self.validate_no_atomic_block()
322 self._commit()
323 # A successful commit means that the database connection works.
324 self.errors_occurred = False
325 self.run_commit_hooks_on_set_autocommit_on = True
326
327 def rollback(self) -> None:
328 """Roll back a transaction and reset the dirty flag."""
329 self.validate_thread_sharing()
330 self.validate_no_atomic_block()
331 self._rollback()
332 # A successful rollback means that the database connection works.
333 self.errors_occurred = False
334 self.needs_rollback = False
335 self.run_on_commit = []
336
337 def close(self) -> None:
338 """Close the connection to the database."""
339 self.validate_thread_sharing()
340 self.run_on_commit = []
341
342 # Don't call validate_no_atomic_block() to avoid making it difficult
343 # to get rid of a connection in an invalid state. The next connect()
344 # will reset the transaction state anyway.
345 if self.closed_in_transaction or self.connection is None:
346 return
347 try:
348 self._close()
349 finally:
350 if self.in_atomic_block:
351 self.closed_in_transaction = True
352 self.needs_rollback = True
353 else:
354 self.connection = None
355
356 # ##### Backend-specific savepoint management methods #####
357
358 def _savepoint(self, sid: str) -> None:
359 with self.cursor() as cursor:
360 cursor.execute(self.ops.savepoint_create_sql(sid))
361
362 def _savepoint_rollback(self, sid: str) -> None:
363 with self.cursor() as cursor:
364 cursor.execute(self.ops.savepoint_rollback_sql(sid))
365
366 def _savepoint_commit(self, sid: str) -> None:
367 with self.cursor() as cursor:
368 cursor.execute(self.ops.savepoint_commit_sql(sid))
369
370 def _savepoint_allowed(self) -> bool:
371 # Savepoints cannot be created outside a transaction
372 return self.features.uses_savepoints and not self.get_autocommit()
373
374 # ##### Generic savepoint management methods #####
375
376 def savepoint(self) -> str | None:
377 """
378 Create a savepoint inside the current transaction. Return an
379 identifier for the savepoint that will be used for the subsequent
380 rollback or commit. Do nothing if savepoints are not supported.
381 """
382 if not self._savepoint_allowed():
383 return None
384
385 thread_ident = _thread.get_ident()
386 tid = str(thread_ident).replace("-", "")
387
388 self.savepoint_state += 1
389 sid = "s%s_x%d" % (tid, self.savepoint_state) # noqa: UP031
390
391 self.validate_thread_sharing()
392 self._savepoint(sid)
393
394 return sid
395
396 def savepoint_rollback(self, sid: str) -> None:
397 """
398 Roll back to a savepoint. Do nothing if savepoints are not supported.
399 """
400 if not self._savepoint_allowed():
401 return
402
403 self.validate_thread_sharing()
404 self._savepoint_rollback(sid)
405
406 # Remove any callbacks registered while this savepoint was active.
407 self.run_on_commit = [
408 (sids, func, robust)
409 for (sids, func, robust) in self.run_on_commit
410 if sid not in sids
411 ]
412
413 def savepoint_commit(self, sid: str) -> None:
414 """
415 Release a savepoint. Do nothing if savepoints are not supported.
416 """
417 if not self._savepoint_allowed():
418 return
419
420 self.validate_thread_sharing()
421 self._savepoint_commit(sid)
422
423 def clean_savepoints(self) -> None:
424 """
425 Reset the counter used to generate unique savepoint ids in this thread.
426 """
427 self.savepoint_state = 0
428
429 # ##### Backend-specific transaction management methods #####
430
431 @abstractmethod
432 def _set_autocommit(self, autocommit: bool) -> None:
433 """
434 Backend-specific implementation to enable or disable autocommit.
435 """
436 ...
437
438 # ##### Generic transaction management methods #####
439
440 def get_autocommit(self) -> bool:
441 """Get the autocommit state."""
442 self.ensure_connection()
443 return self.autocommit
444
445 def set_autocommit(
446 self,
447 autocommit: bool,
448 force_begin_transaction_with_broken_autocommit: bool = False,
449 ) -> None:
450 """
451 Enable or disable autocommit.
452
453 The usual way to start a transaction is to turn autocommit off.
454 SQLite does not properly start a transaction when disabling
455 autocommit. To avoid this buggy behavior and to actually enter a new
456 transaction, an explicit BEGIN is required. Using
457 force_begin_transaction_with_broken_autocommit=True will issue an
458 explicit BEGIN with SQLite. This option will be ignored for other
459 backends.
460 """
461 self.validate_no_atomic_block()
462 self.close_if_health_check_failed()
463 self.ensure_connection()
464
465 start_transaction_under_autocommit = (
466 force_begin_transaction_with_broken_autocommit
467 and not autocommit
468 and hasattr(self, "_start_transaction_under_autocommit")
469 )
470
471 if start_transaction_under_autocommit:
472 self._start_transaction_under_autocommit() # type: ignore[attr-defined]
473 elif autocommit:
474 self._set_autocommit(autocommit)
475 else:
476 with debug_transaction(self, "BEGIN"):
477 self._set_autocommit(autocommit)
478 self.autocommit = autocommit
479
480 if autocommit and self.run_commit_hooks_on_set_autocommit_on:
481 self.run_and_clear_commit_hooks()
482 self.run_commit_hooks_on_set_autocommit_on = False
483
484 def get_rollback(self) -> bool:
485 """Get the "needs rollback" flag -- for *advanced use* only."""
486 if not self.in_atomic_block:
487 raise TransactionManagementError(
488 "The rollback flag doesn't work outside of an 'atomic' block."
489 )
490 return self.needs_rollback
491
492 def set_rollback(self, rollback: bool) -> None:
493 """
494 Set or unset the "needs rollback" flag -- for *advanced use* only.
495 """
496 if not self.in_atomic_block:
497 raise TransactionManagementError(
498 "The rollback flag doesn't work outside of an 'atomic' block."
499 )
500 self.needs_rollback = rollback
501
502 def validate_no_atomic_block(self) -> None:
503 """Raise an error if an atomic block is active."""
504 if self.in_atomic_block:
505 raise TransactionManagementError(
506 "This is forbidden when an 'atomic' block is active."
507 )
508
509 def validate_no_broken_transaction(self) -> None:
510 if self.needs_rollback:
511 raise TransactionManagementError(
512 "An error occurred in the current transaction. You can't "
513 "execute queries until the end of the 'atomic' block."
514 ) from self.rollback_exc
515
516 # ##### Foreign key constraints checks handling #####
517
518 def disable_constraint_checking(self) -> bool:
519 """
520 Backends can implement as needed to temporarily disable foreign key
521 constraint checking. Should return True if the constraints were
522 disabled and will need to be reenabled.
523 """
524 return False
525
526 def enable_constraint_checking(self) -> None:
527 """
528 Backends can implement as needed to re-enable foreign key constraint
529 checking.
530 """
531 pass
532
533 def check_constraints(self, table_names: list[str] | None = None) -> None:
534 """
535 Backends can override this method if they can apply constraint
536 checking (e.g. via "SET CONSTRAINTS ALL IMMEDIATE"). Should raise an
537 IntegrityError if any invalid foreign key references are encountered.
538 """
539 pass
540
541 # ##### Connection termination handling #####
542
543 @abstractmethod
544 def is_usable(self) -> bool:
545 """
546 Test if the database connection is usable.
547
548 This method may assume that self.connection is not None.
549
550 Actual implementations should take care not to raise exceptions
551 as that may prevent Plain from recycling unusable connections.
552 """
553 ...
554
555 def close_if_health_check_failed(self) -> None:
556 """Close existing connection if it fails a health check."""
557 if (
558 self.connection is None
559 or not self.health_check_enabled
560 or self.health_check_done
561 ):
562 return
563
564 if not self.is_usable():
565 self.close()
566 self.health_check_done = True
567
568 def close_if_unusable_or_obsolete(self) -> None:
569 """
570 Close the current connection if unrecoverable errors have occurred
571 or if it outlived its maximum age.
572 """
573 if self.connection is not None:
574 self.health_check_done = False
575 # If the application didn't restore the original autocommit setting,
576 # don't take chances, drop the connection.
577 if self.get_autocommit() != self.settings_dict["AUTOCOMMIT"]:
578 self.close()
579 return
580
581 # If an exception other than DataError or IntegrityError occurred
582 # since the last commit / rollback, check if the connection works.
583 if self.errors_occurred:
584 if self.is_usable():
585 self.errors_occurred = False
586 self.health_check_done = True
587 else:
588 self.close()
589 return
590
591 if self.close_at is not None and time.monotonic() >= self.close_at:
592 self.close()
593 return
594
595 # ##### Thread safety handling #####
596
597 @property
598 def allow_thread_sharing(self) -> bool:
599 with self._thread_sharing_lock:
600 return self._thread_sharing_count > 0
601
602 def validate_thread_sharing(self) -> None:
603 """
604 Validate that the connection isn't accessed by another thread than the
605 one which originally created it, unless the connection was explicitly
606 authorized to be shared between threads (via the `inc_thread_sharing()`
607 method). Raise an exception if the validation fails.
608 """
609 if not (self.allow_thread_sharing or self._thread_ident == _thread.get_ident()):
610 raise DatabaseError(
611 "DatabaseWrapper objects created in a "
612 "thread can only be used in that same thread. The connection "
613 f"was created in thread id {self._thread_ident} and this is "
614 f"thread id {_thread.get_ident()}."
615 )
616
617 # ##### Miscellaneous #####
618
619 def prepare_database(self) -> None:
620 """
621 Hook to do any database check or preparation, generally called before
622 migrating a project or an app.
623 """
624 pass
625
626 @cached_property
627 def wrap_database_errors(self) -> DatabaseErrorWrapper:
628 """
629 Context manager and decorator that re-throws backend-specific database
630 exceptions using Plain's common wrappers.
631 """
632 return DatabaseErrorWrapper(self)
633
634 def chunked_cursor(self) -> utils.CursorWrapper:
635 """
636 Return a cursor that tries to avoid caching in the database (if
637 supported by the database), otherwise return a regular cursor.
638 """
639 return self.cursor()
640
641 def make_debug_cursor(self, cursor: utils.DBAPICursor) -> utils.CursorDebugWrapper:
642 """Create a cursor that logs all queries in self.queries_log."""
643 return utils.CursorDebugWrapper(cursor, self)
644
645 def make_cursor(self, cursor: utils.DBAPICursor) -> utils.CursorWrapper:
646 """Create a cursor without debug logging."""
647 return utils.CursorWrapper(cursor, self)
648
649 @contextmanager
650 def temporary_connection(self) -> Generator[utils.CursorWrapper, None, None]:
651 """
652 Context manager that ensures that a connection is established, and
653 if it opened one, closes it to avoid leaving a dangling connection.
654 This is useful for operations outside of the request-response cycle.
655
656 Provide a cursor: with self.temporary_connection() as cursor: ...
657 """
658 must_close = self.connection is None
659 try:
660 with self.cursor() as cursor:
661 yield cursor
662 finally:
663 if must_close:
664 self.close()
665
666 @contextmanager
667 def _nodb_cursor(self) -> Generator[utils.CursorWrapper, None, None]:
668 """
669 Return a cursor from an alternative connection to be used when there is
670 no need to access the main database, specifically for test db
671 creation/deletion. This also prevents the production database from
672 being exposed to potential child threads while (or after) the test
673 database is destroyed. Refs #10868, #17786, #16969.
674 """
675 conn = self.__class__({**self.settings_dict, "NAME": None})
676 try:
677 with conn.cursor() as cursor:
678 yield cursor
679 finally:
680 conn.close()
681
682 def schema_editor(self, *args: Any, **kwargs: Any) -> BaseDatabaseSchemaEditor:
683 """
684 Return a new instance of this backend's SchemaEditor.
685 """
686 if self.SchemaEditorClass is None:
687 raise NotImplementedError(
688 "The SchemaEditorClass attribute of this database wrapper is still None"
689 )
690 return self.SchemaEditorClass(self, *args, **kwargs)
691
692 def on_commit(self, func: Any, robust: bool = False) -> None:
693 if not callable(func):
694 raise TypeError("on_commit()'s callback must be a callable.")
695 if self.in_atomic_block:
696 # Transaction in progress; save for execution on commit.
697 self.run_on_commit.append((set(self.savepoint_ids), func, robust))
698 elif not self.get_autocommit():
699 raise TransactionManagementError(
700 "on_commit() cannot be used in manual transaction management"
701 )
702 else:
703 # No transaction in progress and in autocommit mode; execute
704 # immediately.
705 if robust:
706 try:
707 func()
708 except Exception as e:
709 logger.error(
710 f"Error calling {func.__qualname__} in on_commit() (%s).",
711 e,
712 exc_info=True,
713 )
714 else:
715 func()
716
717 def run_and_clear_commit_hooks(self) -> None:
718 self.validate_no_atomic_block()
719 current_run_on_commit = self.run_on_commit
720 self.run_on_commit = []
721 while current_run_on_commit:
722 _, func, robust = current_run_on_commit.pop(0)
723 if robust:
724 try:
725 func()
726 except Exception as e:
727 logger.error(
728 f"Error calling {func.__qualname__} in on_commit() during "
729 f"transaction (%s).",
730 e,
731 exc_info=True,
732 )
733 else:
734 func()
735
736 @contextmanager
737 def execute_wrapper(self, wrapper: Any) -> Generator[None, None, None]:
738 """
739 Return a context manager under which the wrapper is applied to suitable
740 database query executions.
741 """
742 self.execute_wrappers.append(wrapper)
743 try:
744 yield
745 finally:
746 self.execute_wrappers.pop()
747
748 def copy(self) -> BaseDatabaseWrapper:
749 """
750 Return a copy of this connection.
751
752 For tests that require two connections to the same database.
753 """
754 settings_dict = copy.deepcopy(self.settings_dict)
755 return type(self)(settings_dict)