1from __future__ import annotations
  2
  3import _thread
  4import copy
  5import datetime
  6import logging
  7import threading
  8import time
  9import warnings
 10import zoneinfo
 11from abc import ABC, abstractmethod
 12from collections import deque
 13from collections.abc import Generator
 14from contextlib import contextmanager
 15from functools import cached_property
 16from typing import TYPE_CHECKING, Any
 17
 18from plain.models.backends import utils
 19from plain.models.backends.base.validation import BaseDatabaseValidation
 20from plain.models.backends.utils import debug_transaction
 21from plain.models.db import (
 22    DatabaseError,
 23    DatabaseErrorWrapper,
 24    NotSupportedError,
 25)
 26from plain.models.transaction import TransactionManagementError
 27from plain.runtime import settings
 28
 29if TYPE_CHECKING:
 30    from plain.models.backends.base.client import BaseDatabaseClient
 31    from plain.models.backends.base.creation import BaseDatabaseCreation
 32    from plain.models.backends.base.features import BaseDatabaseFeatures
 33    from plain.models.backends.base.introspection import BaseDatabaseIntrospection
 34    from plain.models.backends.base.operations import BaseDatabaseOperations
 35    from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
 36    from plain.models.connections import DatabaseConfig
 37
 38RAN_DB_VERSION_CHECK = False
 39
 40logger = logging.getLogger("plain.models.backends.base")
 41
 42
 43class BaseDatabaseWrapper(ABC):
 44    """Represent a database connection."""
 45
 46    # Mapping of Field objects to their column types.
 47    data_types: dict[str, str] = {}
 48    # Mapping of Field objects to their SQL suffix such as AUTOINCREMENT.
 49    data_types_suffix: dict[str, str] = {}
 50    # Mapping of Field objects to their SQL for CHECK constraints.
 51    data_type_check_constraints: dict[str, str] = {}
 52    # Mapping of lookup operators to SQL templates (defined on backend subclasses)
 53    operators: dict[str, str]
 54    # Mapping of pattern lookup operators to SQL templates using str.format syntax (defined on backend subclasses)
 55    pattern_ops: dict[str, str]
 56    # SQL template for escaping patterns in LIKE queries using str.format syntax (defined on backend subclasses)
 57    pattern_esc: str
 58    # Instance attributes - always assigned in __init__
 59    ops: BaseDatabaseOperations
 60    client: BaseDatabaseClient
 61    creation: BaseDatabaseCreation
 62    features: BaseDatabaseFeatures
 63    introspection: BaseDatabaseIntrospection
 64    validation: BaseDatabaseValidation
 65    vendor: str = "unknown"
 66    display_name: str = "unknown"
 67    SchemaEditorClass: type[BaseDatabaseSchemaEditor] | None = None
 68    # Classes instantiated in __init__() - subclasses must set these.
 69    client_class: type[BaseDatabaseClient]
 70    creation_class: type[BaseDatabaseCreation]
 71    features_class: type[BaseDatabaseFeatures]
 72    introspection_class: type[BaseDatabaseIntrospection]
 73    ops_class: type[BaseDatabaseOperations]
 74    validation_class: type[BaseDatabaseValidation] = BaseDatabaseValidation
 75    Database: Any
 76
 77    queries_limit: int = 9000
 78
    def __init__(self, settings_dict: DatabaseConfig):
        """
        Set up the wrapper in a disconnected state.

        No database connection is opened here: `self.connection` starts as
        None. The constructor only initializes bookkeeping attributes and
        instantiates the backend helper classes (`client_class`,
        `creation_class`, `features_class`, `introspection_class`,
        `ops_class`, `validation_class`), passing this wrapper to each.
        """
        # Connection related attributes.
        # The underlying database connection (from the database library, not a wrapper).
        self.connection: Any = None
        # `settings_dict` should be a dictionary containing keys such as
        # NAME, USER, etc. It's called `settings_dict` instead of `settings`
        # to disambiguate it from Plain settings modules.
        self.settings_dict: DatabaseConfig = settings_dict
        # Query logging in debug mode or when explicitly enabled.
        # Bounded deque: once `queries_limit` entries are stored, the oldest
        # entries are discarded as new ones are appended.
        self.queries_log: deque[dict[str, Any]] = deque(maxlen=self.queries_limit)
        self.force_debug_cursor: bool = False

        # Transaction related attributes.
        # Tracks if the connection is in autocommit mode. Per PEP 249, by
        # default, it isn't.
        self.autocommit: bool = False
        # Tracks if the connection is in a transaction managed by 'atomic'.
        self.in_atomic_block: bool = False
        # Increment to generate unique savepoint ids.
        self.savepoint_state: int = 0
        # List of savepoints created by 'atomic'.
        self.savepoint_ids: list[str] = []
        # Stack of active 'atomic' blocks.
        self.atomic_blocks: list[Any] = []
        # Tracks if the outermost 'atomic' block should commit on exit,
        # ie. if autocommit was active on entry.
        self.commit_on_exit: bool = True
        # Tracks if the transaction should be rolled back to the next
        # available savepoint because of an exception in an inner block.
        self.needs_rollback: bool = False
        # Used as the `from` cause when validate_no_broken_transaction() raises.
        self.rollback_exc: Exception | None = None

        # Connection termination related attributes.
        # time.monotonic() deadline after which the connection is considered
        # obsolete (None means no deadline); set in connect().
        self.close_at: float | None = None
        self.closed_in_transaction: bool = False
        self.errors_occurred: bool = False
        self.health_check_enabled: bool = False
        self.health_check_done: bool = False

        # Thread-safety related attributes.
        self._thread_sharing_lock: threading.Lock = threading.Lock()
        self._thread_sharing_count: int = 0
        # Identifier of the thread that created this wrapper.
        self._thread_ident: int = _thread.get_ident()

        # A list of no-argument functions to run when the transaction commits.
        # Each entry is an (sids, func, robust) tuple, where sids is a set of
        # the active savepoint IDs when this function was registered and robust
        # specifies whether it's allowed for the function to fail.
        self.run_on_commit: list[tuple[set[str], Any, bool]] = []

        # Should we run the on-commit hooks the next time set_autocommit(True)
        # is called?
        self.run_commit_hooks_on_set_autocommit_on: bool = False

        # A stack of wrappers to be invoked around execute()/executemany()
        # calls. Each entry is a function taking five arguments: execute, sql,
        # params, many, and context. It's the function's responsibility to
        # call execute(sql, params, many, context).
        self.execute_wrappers: list[Any] = []

        # Backend helper objects; each one receives this wrapper instance.
        self.client: BaseDatabaseClient = self.client_class(self)
        self.creation: BaseDatabaseCreation = self.creation_class(self)
        self.features: BaseDatabaseFeatures = self.features_class(self)
        self.introspection: BaseDatabaseIntrospection = self.introspection_class(self)
        self.ops: BaseDatabaseOperations = self.ops_class(self)
        self.validation: BaseDatabaseValidation = self.validation_class(self)
145
146    def __repr__(self) -> str:
147        return f"<{self.__class__.__qualname__} vendor={self.vendor!r}>"
148
149    def ensure_timezone(self) -> bool:
150        """
151        Ensure the connection's timezone is set to `self.timezone_name` and
152        return whether it changed or not.
153        """
154        return False
155
156    @cached_property
157    def timezone(self) -> datetime.tzinfo:
158        """
159        Return a tzinfo of the database connection time zone.
160
161        This is only used when time zone support is enabled. When a datetime is
162        read from the database, it is always returned in this time zone.
163
164        When the database backend supports time zones, it doesn't matter which
165        time zone Plain uses, as long as aware datetimes are used everywhere.
166        Other users connecting to the database can choose their own time zone.
167
168        When the database backend doesn't support time zones, the time zone
169        Plain uses may be constrained by the requirements of other users of
170        the database.
171        """
172        if self.settings_dict["TIME_ZONE"] is None:
173            return datetime.UTC
174        else:
175            return zoneinfo.ZoneInfo(self.settings_dict["TIME_ZONE"])
176
177    @cached_property
178    def timezone_name(self) -> str:
179        """
180        Name of the time zone of the database connection.
181        """
182        if self.settings_dict["TIME_ZONE"] is None:
183            return "UTC"
184        else:
185            return self.settings_dict["TIME_ZONE"]
186
187    @property
188    def queries_logged(self) -> bool:
189        return self.force_debug_cursor or settings.DEBUG
190
191    @property
192    def queries(self) -> list[dict[str, Any]]:
193        if len(self.queries_log) == self.queries_log.maxlen:
194            warnings.warn(
195                f"Limit for query logging exceeded, only the last {self.queries_log.maxlen} queries "
196                "will be returned."
197            )
198        return list(self.queries_log)
199
    @abstractmethod
    def get_database_version(self) -> tuple[int, ...]:
        """
        Return a tuple of the database's version.

        Abstract: concrete backends must implement this. The result is
        compared against `features.minimum_database_version` in
        check_database_version_supported().
        """
        ...
204
205    def check_database_version_supported(self) -> None:
206        """
207        Raise an error if the database version isn't supported by this
208        version of Plain.
209        """
210        if (
211            self.features.minimum_database_version is not None
212            and self.get_database_version() < self.features.minimum_database_version
213        ):
214            db_version = ".".join(str(v) for v in self.get_database_version())
215            min_db_version = ".".join(
216                str(v) for v in self.features.minimum_database_version
217            )
218            raise NotSupportedError(
219                f"{self.display_name} {min_db_version} or later is required "
220                f"(found {db_version})."
221            )
222
223    # ##### Backend-specific methods for creating connections and cursors #####
224
    @abstractmethod
    def get_connection_params(self) -> dict[str, Any]:
        """
        Return a dict of parameters suitable for get_new_connection.

        Abstract: concrete backends must implement this. connect() passes the
        returned dict unchanged to get_new_connection().
        """
        ...
229
    @abstractmethod
    def get_new_connection(self, conn_params: dict[str, Any]) -> Any:
        """
        Open a connection to the database.

        Abstract: concrete backends must implement this. `conn_params` is the
        dict produced by get_connection_params(); connect() stores the return
        value as `self.connection`.
        """
        ...
234
235    def init_connection_state(self) -> None:
236        """Initialize the database connection settings."""
237        global RAN_DB_VERSION_CHECK
238        if not RAN_DB_VERSION_CHECK:
239            self.check_database_version_supported()
240            RAN_DB_VERSION_CHECK = True
241
    @abstractmethod
    def create_cursor(self, name: str | None = None) -> Any:
        """
        Create a cursor. Assume that a connection is established.

        Abstract: concrete backends must implement this. _cursor() calls it
        after ensure_connection() and hands the result to _prepare_cursor()
        for wrapping. `name` is passed through from _cursor(); this base
        class does not interpret it.
        """
        ...
246
247    # ##### Backend-specific methods for creating connections #####
248
    def connect(self) -> None:
        """
        Connect to the database. Assume that the connection is closed.

        Resets the transaction and health-check bookkeeping, opens a new
        backend connection, applies the configured AUTOCOMMIT setting, runs
        init_connection_state(), and finally clears the list of pending
        on-commit callbacks.
        """
        # In case the previous connection was closed while in an atomic block
        # (must happen before set_autocommit() below, which refuses to run
        # while in_atomic_block is set).
        self.in_atomic_block = False
        self.savepoint_ids = []
        self.atomic_blocks = []
        self.needs_rollback = False
        # Reset parameters defining when to close/health-check the connection.
        self.health_check_enabled = self.settings_dict["CONN_HEALTH_CHECKS"]
        max_age = self.settings_dict["CONN_MAX_AGE"]
        # A max age of None means the connection never expires by age.
        self.close_at = None if max_age is None else time.monotonic() + max_age
        self.closed_in_transaction = False
        self.errors_occurred = False
        # New connections are healthy.
        self.health_check_done = True
        # Establish the connection
        conn_params = self.get_connection_params()
        self.connection = self.get_new_connection(conn_params)
        self.set_autocommit(self.settings_dict["AUTOCOMMIT"])
        self.init_connection_state()

        # Drop any on-commit callbacks left over from before this connection.
        self.run_on_commit = []
271
272    def ensure_connection(self) -> None:
273        """Guarantee that a connection to the database is established."""
274        if self.connection is None:
275            with self.wrap_database_errors:
276                self.connect()
277
278    # ##### Backend-specific wrappers for PEP-249 connection methods #####
279
280    def _prepare_cursor(self, cursor: utils.DBAPICursor) -> utils.CursorWrapper:
281        """
282        Validate the connection is usable and perform database cursor wrapping.
283        """
284        self.validate_thread_sharing()
285        if self.queries_logged:
286            wrapped_cursor = self.make_debug_cursor(cursor)
287        else:
288            wrapped_cursor = self.make_cursor(cursor)
289        return wrapped_cursor
290
291    def _cursor(self, name: str | None = None) -> utils.CursorWrapper:
292        self.close_if_health_check_failed()
293        self.ensure_connection()
294        with self.wrap_database_errors:
295            return self._prepare_cursor(self.create_cursor(name))
296
297    def _commit(self) -> None:
298        if self.connection is not None:
299            with debug_transaction(self, "COMMIT"), self.wrap_database_errors:
300                return self.connection.commit()
301
302    def _rollback(self) -> None:
303        if self.connection is not None:
304            with debug_transaction(self, "ROLLBACK"), self.wrap_database_errors:
305                return self.connection.rollback()
306
307    def _close(self) -> None:
308        if self.connection is not None:
309            with self.wrap_database_errors:
310                return self.connection.close()
311
312    # ##### Generic wrappers for PEP-249 connection methods #####
313
314    def cursor(self) -> utils.CursorWrapper:
315        """Create a cursor, opening a connection if necessary."""
316        return self._cursor()
317
318    def commit(self) -> None:
319        """Commit a transaction and reset the dirty flag."""
320        self.validate_thread_sharing()
321        self.validate_no_atomic_block()
322        self._commit()
323        # A successful commit means that the database connection works.
324        self.errors_occurred = False
325        self.run_commit_hooks_on_set_autocommit_on = True
326
327    def rollback(self) -> None:
328        """Roll back a transaction and reset the dirty flag."""
329        self.validate_thread_sharing()
330        self.validate_no_atomic_block()
331        self._rollback()
332        # A successful rollback means that the database connection works.
333        self.errors_occurred = False
334        self.needs_rollback = False
335        self.run_on_commit = []
336
337    def close(self) -> None:
338        """Close the connection to the database."""
339        self.validate_thread_sharing()
340        self.run_on_commit = []
341
342        # Don't call validate_no_atomic_block() to avoid making it difficult
343        # to get rid of a connection in an invalid state. The next connect()
344        # will reset the transaction state anyway.
345        if self.closed_in_transaction or self.connection is None:
346            return
347        try:
348            self._close()
349        finally:
350            if self.in_atomic_block:
351                self.closed_in_transaction = True
352                self.needs_rollback = True
353            else:
354                self.connection = None
355
356    # ##### Backend-specific savepoint management methods #####
357
358    def _savepoint(self, sid: str) -> None:
359        with self.cursor() as cursor:
360            cursor.execute(self.ops.savepoint_create_sql(sid))
361
362    def _savepoint_rollback(self, sid: str) -> None:
363        with self.cursor() as cursor:
364            cursor.execute(self.ops.savepoint_rollback_sql(sid))
365
366    def _savepoint_commit(self, sid: str) -> None:
367        with self.cursor() as cursor:
368            cursor.execute(self.ops.savepoint_commit_sql(sid))
369
370    def _savepoint_allowed(self) -> bool:
371        # Savepoints cannot be created outside a transaction
372        return self.features.uses_savepoints and not self.get_autocommit()
373
374    # ##### Generic savepoint management methods #####
375
376    def savepoint(self) -> str | None:
377        """
378        Create a savepoint inside the current transaction. Return an
379        identifier for the savepoint that will be used for the subsequent
380        rollback or commit. Do nothing if savepoints are not supported.
381        """
382        if not self._savepoint_allowed():
383            return None
384
385        thread_ident = _thread.get_ident()
386        tid = str(thread_ident).replace("-", "")
387
388        self.savepoint_state += 1
389        sid = "s%s_x%d" % (tid, self.savepoint_state)  # noqa: UP031
390
391        self.validate_thread_sharing()
392        self._savepoint(sid)
393
394        return sid
395
396    def savepoint_rollback(self, sid: str) -> None:
397        """
398        Roll back to a savepoint. Do nothing if savepoints are not supported.
399        """
400        if not self._savepoint_allowed():
401            return
402
403        self.validate_thread_sharing()
404        self._savepoint_rollback(sid)
405
406        # Remove any callbacks registered while this savepoint was active.
407        self.run_on_commit = [
408            (sids, func, robust)
409            for (sids, func, robust) in self.run_on_commit
410            if sid not in sids
411        ]
412
413    def savepoint_commit(self, sid: str) -> None:
414        """
415        Release a savepoint. Do nothing if savepoints are not supported.
416        """
417        if not self._savepoint_allowed():
418            return
419
420        self.validate_thread_sharing()
421        self._savepoint_commit(sid)
422
423    def clean_savepoints(self) -> None:
424        """
425        Reset the counter used to generate unique savepoint ids in this thread.
426        """
427        self.savepoint_state = 0
428
429    # ##### Backend-specific transaction management methods #####
430
    @abstractmethod
    def _set_autocommit(self, autocommit: bool) -> None:
        """
        Backend-specific implementation to enable or disable autocommit.

        Abstract: concrete backends must implement this. set_autocommit()
        calls it after ensuring a connection exists, and records the new
        state in `self.autocommit` itself afterwards.
        """
        ...
437
438    # ##### Generic transaction management methods #####
439
440    def get_autocommit(self) -> bool:
441        """Get the autocommit state."""
442        self.ensure_connection()
443        return self.autocommit
444
445    def set_autocommit(
446        self,
447        autocommit: bool,
448        force_begin_transaction_with_broken_autocommit: bool = False,
449    ) -> None:
450        """
451        Enable or disable autocommit.
452
453        The usual way to start a transaction is to turn autocommit off.
454        SQLite does not properly start a transaction when disabling
455        autocommit. To avoid this buggy behavior and to actually enter a new
456        transaction, an explicit BEGIN is required. Using
457        force_begin_transaction_with_broken_autocommit=True will issue an
458        explicit BEGIN with SQLite. This option will be ignored for other
459        backends.
460        """
461        self.validate_no_atomic_block()
462        self.close_if_health_check_failed()
463        self.ensure_connection()
464
465        start_transaction_under_autocommit = (
466            force_begin_transaction_with_broken_autocommit
467            and not autocommit
468            and hasattr(self, "_start_transaction_under_autocommit")
469        )
470
471        if start_transaction_under_autocommit:
472            self._start_transaction_under_autocommit()  # type: ignore[attr-defined]
473        elif autocommit:
474            self._set_autocommit(autocommit)
475        else:
476            with debug_transaction(self, "BEGIN"):
477                self._set_autocommit(autocommit)
478        self.autocommit = autocommit
479
480        if autocommit and self.run_commit_hooks_on_set_autocommit_on:
481            self.run_and_clear_commit_hooks()
482            self.run_commit_hooks_on_set_autocommit_on = False
483
484    def get_rollback(self) -> bool:
485        """Get the "needs rollback" flag -- for *advanced use* only."""
486        if not self.in_atomic_block:
487            raise TransactionManagementError(
488                "The rollback flag doesn't work outside of an 'atomic' block."
489            )
490        return self.needs_rollback
491
492    def set_rollback(self, rollback: bool) -> None:
493        """
494        Set or unset the "needs rollback" flag -- for *advanced use* only.
495        """
496        if not self.in_atomic_block:
497            raise TransactionManagementError(
498                "The rollback flag doesn't work outside of an 'atomic' block."
499            )
500        self.needs_rollback = rollback
501
502    def validate_no_atomic_block(self) -> None:
503        """Raise an error if an atomic block is active."""
504        if self.in_atomic_block:
505            raise TransactionManagementError(
506                "This is forbidden when an 'atomic' block is active."
507            )
508
509    def validate_no_broken_transaction(self) -> None:
510        if self.needs_rollback:
511            raise TransactionManagementError(
512                "An error occurred in the current transaction. You can't "
513                "execute queries until the end of the 'atomic' block."
514            ) from self.rollback_exc
515
516    # ##### Foreign key constraints checks handling #####
517
518    def disable_constraint_checking(self) -> bool:
519        """
520        Backends can implement as needed to temporarily disable foreign key
521        constraint checking. Should return True if the constraints were
522        disabled and will need to be reenabled.
523        """
524        return False
525
526    def enable_constraint_checking(self) -> None:
527        """
528        Backends can implement as needed to re-enable foreign key constraint
529        checking.
530        """
531        pass
532
533    def check_constraints(self, table_names: list[str] | None = None) -> None:
534        """
535        Backends can override this method if they can apply constraint
536        checking (e.g. via "SET CONSTRAINTS ALL IMMEDIATE"). Should raise an
537        IntegrityError if any invalid foreign key references are encountered.
538        """
539        pass
540
541    # ##### Connection termination handling #####
542
    @abstractmethod
    def is_usable(self) -> bool:
        """
        Test if the database connection is usable.

        This method may assume that self.connection is not None.

        Actual implementations should take care not to raise exceptions
        as that may prevent Plain from recycling unusable connections.

        Abstract: concrete backends must implement this. It is consulted by
        close_if_health_check_failed() and close_if_unusable_or_obsolete(),
        which close the connection when it returns False.
        """
        ...
554
555    def close_if_health_check_failed(self) -> None:
556        """Close existing connection if it fails a health check."""
557        if (
558            self.connection is None
559            or not self.health_check_enabled
560            or self.health_check_done
561        ):
562            return
563
564        if not self.is_usable():
565            self.close()
566        self.health_check_done = True
567
568    def close_if_unusable_or_obsolete(self) -> None:
569        """
570        Close the current connection if unrecoverable errors have occurred
571        or if it outlived its maximum age.
572        """
573        if self.connection is not None:
574            self.health_check_done = False
575            # If the application didn't restore the original autocommit setting,
576            # don't take chances, drop the connection.
577            if self.get_autocommit() != self.settings_dict["AUTOCOMMIT"]:
578                self.close()
579                return
580
581            # If an exception other than DataError or IntegrityError occurred
582            # since the last commit / rollback, check if the connection works.
583            if self.errors_occurred:
584                if self.is_usable():
585                    self.errors_occurred = False
586                    self.health_check_done = True
587                else:
588                    self.close()
589                    return
590
591            if self.close_at is not None and time.monotonic() >= self.close_at:
592                self.close()
593                return
594
595    # ##### Thread safety handling #####
596
597    @property
598    def allow_thread_sharing(self) -> bool:
599        with self._thread_sharing_lock:
600            return self._thread_sharing_count > 0
601
602    def validate_thread_sharing(self) -> None:
603        """
604        Validate that the connection isn't accessed by another thread than the
605        one which originally created it, unless the connection was explicitly
606        authorized to be shared between threads (via the `inc_thread_sharing()`
607        method). Raise an exception if the validation fails.
608        """
609        if not (self.allow_thread_sharing or self._thread_ident == _thread.get_ident()):
610            raise DatabaseError(
611                "DatabaseWrapper objects created in a "
612                "thread can only be used in that same thread. The connection "
613                f"was created in thread id {self._thread_ident} and this is "
614                f"thread id {_thread.get_ident()}."
615            )
616
617    # ##### Miscellaneous #####
618
619    def prepare_database(self) -> None:
620        """
621        Hook to do any database check or preparation, generally called before
622        migrating a project or an app.
623        """
624        pass
625
626    @cached_property
627    def wrap_database_errors(self) -> DatabaseErrorWrapper:
628        """
629        Context manager and decorator that re-throws backend-specific database
630        exceptions using Plain's common wrappers.
631        """
632        return DatabaseErrorWrapper(self)
633
634    def chunked_cursor(self) -> utils.CursorWrapper:
635        """
636        Return a cursor that tries to avoid caching in the database (if
637        supported by the database), otherwise return a regular cursor.
638        """
639        return self.cursor()
640
641    def make_debug_cursor(self, cursor: utils.DBAPICursor) -> utils.CursorDebugWrapper:
642        """Create a cursor that logs all queries in self.queries_log."""
643        return utils.CursorDebugWrapper(cursor, self)
644
645    def make_cursor(self, cursor: utils.DBAPICursor) -> utils.CursorWrapper:
646        """Create a cursor without debug logging."""
647        return utils.CursorWrapper(cursor, self)
648
649    @contextmanager
650    def temporary_connection(self) -> Generator[utils.CursorWrapper, None, None]:
651        """
652        Context manager that ensures that a connection is established, and
653        if it opened one, closes it to avoid leaving a dangling connection.
654        This is useful for operations outside of the request-response cycle.
655
656        Provide a cursor: with self.temporary_connection() as cursor: ...
657        """
658        must_close = self.connection is None
659        try:
660            with self.cursor() as cursor:
661                yield cursor
662        finally:
663            if must_close:
664                self.close()
665
666    @contextmanager
667    def _nodb_cursor(self) -> Generator[utils.CursorWrapper, None, None]:
668        """
669        Return a cursor from an alternative connection to be used when there is
670        no need to access the main database, specifically for test db
671        creation/deletion. This also prevents the production database from
672        being exposed to potential child threads while (or after) the test
673        database is destroyed. Refs #10868, #17786, #16969.
674        """
675        conn = self.__class__({**self.settings_dict, "NAME": None})
676        try:
677            with conn.cursor() as cursor:
678                yield cursor
679        finally:
680            conn.close()
681
682    def schema_editor(self, *args: Any, **kwargs: Any) -> BaseDatabaseSchemaEditor:
683        """
684        Return a new instance of this backend's SchemaEditor.
685        """
686        if self.SchemaEditorClass is None:
687            raise NotImplementedError(
688                "The SchemaEditorClass attribute of this database wrapper is still None"
689            )
690        return self.SchemaEditorClass(self, *args, **kwargs)
691
692    def on_commit(self, func: Any, robust: bool = False) -> None:
693        if not callable(func):
694            raise TypeError("on_commit()'s callback must be a callable.")
695        if self.in_atomic_block:
696            # Transaction in progress; save for execution on commit.
697            self.run_on_commit.append((set(self.savepoint_ids), func, robust))
698        elif not self.get_autocommit():
699            raise TransactionManagementError(
700                "on_commit() cannot be used in manual transaction management"
701            )
702        else:
703            # No transaction in progress and in autocommit mode; execute
704            # immediately.
705            if robust:
706                try:
707                    func()
708                except Exception as e:
709                    logger.error(
710                        f"Error calling {func.__qualname__} in on_commit() (%s).",
711                        e,
712                        exc_info=True,
713                    )
714            else:
715                func()
716
717    def run_and_clear_commit_hooks(self) -> None:
718        self.validate_no_atomic_block()
719        current_run_on_commit = self.run_on_commit
720        self.run_on_commit = []
721        while current_run_on_commit:
722            _, func, robust = current_run_on_commit.pop(0)
723            if robust:
724                try:
725                    func()
726                except Exception as e:
727                    logger.error(
728                        f"Error calling {func.__qualname__} in on_commit() during "
729                        f"transaction (%s).",
730                        e,
731                        exc_info=True,
732                    )
733            else:
734                func()
735
    @contextmanager
    def execute_wrapper(self, wrapper: Any) -> Generator[None, None, None]:
        """
        Return a context manager under which the wrapper is applied to suitable
        database query executions.

        `wrapper` is pushed onto `self.execute_wrappers` on entry and the top
        of that stack is popped on exit, even if the body raises.
        """
        self.execute_wrappers.append(wrapper)
        try:
            yield
        finally:
            # Pops the last entry of the stack; with properly nested usage
            # that is the wrapper pushed above.
            self.execute_wrappers.pop()
747
748    def copy(self) -> BaseDatabaseWrapper:
749        """
750        Return a copy of this connection.
751
752        For tests that require two connections to the same database.
753        """
754        settings_dict = copy.deepcopy(self.settings_dict)
755        return type(self)(settings_dict)