1from __future__ import annotations
2
3import os
4import sys
5from pathlib import Path
6
7from plain.models.backends.base.creation import BaseDatabaseCreation
8
9
10class DatabaseCreation(BaseDatabaseCreation):
11 @staticmethod
12 def is_in_memory_db(database_name: str | Path) -> bool:
13 return not isinstance(database_name, Path) and (
14 database_name == ":memory:" or "mode=memory" in database_name
15 )
16
17 def _get_test_db_name(self, prefix: str = "") -> str:
18 raw_name = self.connection.settings_dict["TEST"]["NAME"] or ":memory:"
19 # Special in-memory case
20 if raw_name == ":memory:":
21 return "file:memorydb?mode=memory&cache=shared"
22
23 test_database_name = raw_name
24
25 if prefix:
26 test_database_name = f"{prefix}_{test_database_name}"
27
28 return test_database_name
29
30 def _create_test_db(
31 self, *, test_database_name: str, verbosity: int, autoclobber: bool
32 ) -> str:
33 """
34 Internal implementation - delete existing SQLite test DB file if needed.
35 """
36 if not self.is_in_memory_db(test_database_name):
37 # Erase the old test database file.
38 if verbosity >= 1:
39 self.log(f"Destroying old test database '{test_database_name}'...")
40 if os.access(test_database_name, os.F_OK):
41 if not autoclobber:
42 confirm = input(
43 "Type 'yes' if you would like to try deleting the test "
44 f"database '{test_database_name}', or 'no' to cancel: "
45 )
46 if autoclobber or confirm == "yes":
47 try:
48 os.remove(test_database_name)
49 except Exception as e:
50 self.log(f"Got an error deleting the old test database: {e}")
51 sys.exit(2)
52 else:
53 self.log("Tests cancelled.")
54 sys.exit(1)
55 return test_database_name
56
57 def _destroy_test_db(self, test_database_name: str, verbosity: int) -> None:
58 if test_database_name and not self.is_in_memory_db(test_database_name):
59 # Remove the SQLite database file
60 os.remove(test_database_name)
61
62 def test_db_signature(self, prefix: str = "") -> tuple[str, str]:
63 """
64 Return a tuple that uniquely identifies a test database.
65
66 This takes into account the special cases of ":memory:" and "" for
67 SQLite since the databases will be distinct despite having the same
68 TEST NAME. See https://www.sqlite.org/inmemorydb.html
69 """
70 test_database_name = self._get_test_db_name(prefix)
71 sig = [self.connection.settings_dict["NAME"]]
72 if self.is_in_memory_db(test_database_name):
73 sig.append(":memory:")
74 else:
75 sig.append(test_database_name)
76 return tuple(sig)