1import multiprocessing
2import os
3import shutil
4import sqlite3
5import sys
6from pathlib import Path
7
8from plain.models.backends.base.creation import BaseDatabaseCreation
9from plain.models.db import NotSupportedError
10
11
12class DatabaseCreation(BaseDatabaseCreation):
13 @staticmethod
14 def is_in_memory_db(database_name):
15 return not isinstance(database_name, Path) and (
16 database_name == ":memory:" or "mode=memory" in database_name
17 )
18
19 def _get_test_db_name(self):
20 test_database_name = self.connection.settings_dict["TEST"]["NAME"] or ":memory:"
21 if test_database_name == ":memory:":
22 return "file:memorydb_%s?mode=memory&cache=shared" % self.connection.alias
23 return test_database_name
24
25 def _create_test_db(self, verbosity, autoclobber, keepdb=False):
26 test_database_name = self._get_test_db_name()
27
28 if keepdb:
29 return test_database_name
30 if not self.is_in_memory_db(test_database_name):
31 # Erase the old test database
32 if verbosity >= 1:
33 self.log(
34 "Destroying old test database for alias {}...".format(
35 self._get_database_display_str(verbosity, test_database_name)
36 )
37 )
38 if os.access(test_database_name, os.F_OK):
39 if not autoclobber:
40 confirm = input(
41 "Type 'yes' if you would like to try deleting the test "
42 "database '%s', or 'no' to cancel: " % test_database_name
43 )
44 if autoclobber or confirm == "yes":
45 try:
46 os.remove(test_database_name)
47 except Exception as e:
48 self.log("Got an error deleting the old test database: %s" % e)
49 sys.exit(2)
50 else:
51 self.log("Tests cancelled.")
52 sys.exit(1)
53 return test_database_name
54
55 def get_test_db_clone_settings(self, suffix):
56 orig_settings_dict = self.connection.settings_dict
57 source_database_name = orig_settings_dict["NAME"]
58
59 if not self.is_in_memory_db(source_database_name):
60 root, ext = os.path.splitext(source_database_name)
61 return {**orig_settings_dict, "NAME": f"{root}_{suffix}{ext}"}
62
63 start_method = multiprocessing.get_start_method()
64 if start_method == "fork":
65 return orig_settings_dict
66 if start_method == "spawn":
67 return {
68 **orig_settings_dict,
69 "NAME": f"{self.connection.alias}_{suffix}.sqlite3",
70 }
71 raise NotSupportedError(
72 f"Cloning with start method {start_method!r} is not supported."
73 )
74
75 def _clone_test_db(self, suffix, verbosity, keepdb=False):
76 source_database_name = self.connection.settings_dict["NAME"]
77 target_database_name = self.get_test_db_clone_settings(suffix)["NAME"]
78 if not self.is_in_memory_db(source_database_name):
79 # Erase the old test database
80 if os.access(target_database_name, os.F_OK):
81 if keepdb:
82 return
83 if verbosity >= 1:
84 self.log(
85 "Destroying old test database for alias {}...".format(
86 self._get_database_display_str(
87 verbosity, target_database_name
88 ),
89 )
90 )
91 try:
92 os.remove(target_database_name)
93 except Exception as e:
94 self.log("Got an error deleting the old test database: %s" % e)
95 sys.exit(2)
96 try:
97 shutil.copy(source_database_name, target_database_name)
98 except Exception as e:
99 self.log("Got an error cloning the test database: %s" % e)
100 sys.exit(2)
101 # Forking automatically makes a copy of an in-memory database.
102 # Spawn requires migrating to disk which will be re-opened in
103 # setup_worker_connection.
104 elif multiprocessing.get_start_method() == "spawn":
105 ondisk_db = sqlite3.connect(target_database_name, uri=True)
106 self.connection.connection.backup(ondisk_db)
107 ondisk_db.close()
108
109 def _destroy_test_db(self, test_database_name, verbosity):
110 if test_database_name and not self.is_in_memory_db(test_database_name):
111 # Remove the SQLite database file
112 os.remove(test_database_name)
113
114 def test_db_signature(self):
115 """
116 Return a tuple that uniquely identifies a test database.
117
118 This takes into account the special cases of ":memory:" and "" for
119 SQLite since the databases will be distinct despite having the same
120 TEST NAME. See https://www.sqlite.org/inmemorydb.html
121 """
122 test_database_name = self._get_test_db_name()
123 sig = [self.connection.settings_dict["NAME"]]
124 if self.is_in_memory_db(test_database_name):
125 sig.append(self.connection.alias)
126 else:
127 sig.append(test_database_name)
128 return tuple(sig)
129
130 def setup_worker_connection(self, _worker_id):
131 settings_dict = self.get_test_db_clone_settings(_worker_id)
132 # connection.settings_dict must be updated in place for changes to be
133 # reflected in plain.models.connections. Otherwise new threads would
134 # connect to the default database instead of the appropriate clone.
135 start_method = multiprocessing.get_start_method()
136 if start_method == "fork":
137 # Update settings_dict in place.
138 self.connection.settings_dict.update(settings_dict)
139 self.connection.close()
140 elif start_method == "spawn":
141 alias = self.connection.alias
142 connection_str = (
143 f"file:memorydb_{alias}_{_worker_id}?mode=memory&cache=shared"
144 )
145 source_db = self.connection.Database.connect(
146 f"file:{alias}_{_worker_id}.sqlite3", uri=True
147 )
148 target_db = sqlite3.connect(connection_str, uri=True)
149 source_db.backup(target_db)
150 source_db.close()
151 # Update settings_dict in place.
152 self.connection.settings_dict.update(settings_dict)
153 self.connection.settings_dict["NAME"] = connection_str
154 # Re-open connection to in-memory database before closing copy
155 # connection.
156 self.connection.connect()
157 target_db.close()