1import os
2import sys
3
4from plain.runtime import settings
5
# Prepended to the configured database name to derive the default name of
# the test database (used when no explicit test name or prefix is given).
TEST_DATABASE_PREFIX = "test_"
9
10
class BaseDatabaseCreation:
    """
    Encapsulate backend-specific differences pertaining to creation and
    destruction of the test database.
    """

    def __init__(self, connection):
        # The database connection wrapper; its settings_dict, ops and
        # cursors are what every method below operates on.
        self.connection = connection

    def _nodb_cursor(self):
        """Return the connection's ``_nodb_cursor()`` (used as a context manager)."""
        return self.connection._nodb_cursor()

    def log(self, msg):
        """Write ``msg`` followed by a newline to stderr."""
        # Use "\n" rather than os.linesep: sys.stderr is a text stream that
        # already translates newlines, so writing os.linesep would produce
        # "\r\r\n" on Windows.
        sys.stderr.write(msg + "\n")

    def create_test_db(self, verbosity=1, prefix=""):
        """
        Create the test database, migrate it, and return its name.

        The database is created with ``autoclobber=True``: if the initial
        CREATE DATABASE fails (e.g. because the database already exists), it
        is dropped and recreated without prompting.

        If prefix is provided, it will be prepended to the database name
        to isolate it from other test databases.

        Side effects: closes the current connection and points both
        ``settings.DATABASE["NAME"]`` and the connection's ``settings_dict``
        at the new test database.
        """
        # Imported lazily, at call time rather than module import time.
        from plain.models.cli import migrate

        test_database_name = self._get_test_db_name(prefix)

        if verbosity >= 1:
            self.log(f"Creating test database '{test_database_name}'...")

        self._create_test_db(
            test_database_name=test_database_name,
            verbosity=verbosity,
            autoclobber=True,
        )

        # Drop the existing connection before repointing the settings so the
        # next connection is opened against the test database.
        self.connection.close()
        settings.DATABASE["NAME"] = test_database_name
        self.connection.settings_dict["NAME"] = test_database_name

        # We report migrate messages at one level lower than that
        # requested. This ensures we don't get flooded with messages during
        # testing (unless you really ask to be flooded).
        migrate.callback(
            package_label=None,
            migration_name=None,
            fake=False,
            fake_initial=False,
            plan=False,
            check_unapplied=False,
            backup=False,
            prune=False,
            verbosity=max(verbosity - 1, 0),
        )

        # Ensure a connection for the side effect of initializing the test database.
        self.connection.ensure_connection()

        return test_database_name

    def set_as_test_mirror(self, primary_settings_dict):
        """
        Set this database up to be used in testing as a mirror of a primary
        database whose settings are given.
        """
        self.connection.settings_dict["NAME"] = primary_settings_dict["NAME"]

    # NOTE: serialize_db_to_string() / deserialize_db_from_string() used to be
    # kept here as commented-out code; this class does not implement them.

    def _get_test_db_name(self, prefix=""):
        """
        Internal implementation - return the name of the test DB that will be
        created. Only useful when called from create_test_db() and
        _create_test_db() and when no external munging is done with the 'NAME'
        settings.

        Resolution order:
        - with a prefix: ``"{prefix}_{TEST.NAME or NAME}"``
        - without a prefix, with TEST.NAME set: TEST.NAME unchanged
        - otherwise: TEST_DATABASE_PREFIX + NAME
        """
        settings_dict = self.connection.settings_dict
        explicit_test_name = settings_dict["TEST"]["NAME"]

        if prefix:
            # An explicit TEST.NAME overrides the base NAME.
            base_name = explicit_test_name or settings_dict["NAME"]
            return f"{prefix}_{base_name}"
        if explicit_test_name:
            return explicit_test_name
        return TEST_DATABASE_PREFIX + settings_dict["NAME"]

    def _execute_create_test_db(self, cursor, parameters):
        """
        Run CREATE DATABASE on ``cursor``.

        ``parameters`` must provide ``dbname`` (already quoted) and ``suffix``.
        """
        cursor.execute("CREATE DATABASE {dbname} {suffix}".format(**parameters))

    def _create_test_db(self, *, test_database_name, verbosity, autoclobber):
        """
        Internal implementation - create the test database itself.

        If creation fails, the existing database is dropped and recreated,
        either unconditionally (``autoclobber``) or after the user types
        'yes' at an interactive prompt. Exits the process with status 1 if
        the user declines and status 2 if recreating fails.

        Return ``test_database_name``.
        """
        test_db_params = {
            "dbname": self.connection.ops.quote_name(test_database_name),
            "suffix": self.sql_table_creation_suffix(),
        }
        # The database to create does not exist yet, so use the nodb cursor.
        with self._nodb_cursor() as cursor:
            try:
                self._execute_create_test_db(cursor, test_db_params)
            except Exception as create_error:
                self.log(f"Got an error creating the test database: {create_error}")

                if not autoclobber:
                    confirm = input(
                        "Type 'yes' if you would like to try deleting the test "
                        f"database '{test_database_name}', or 'no' to cancel: "
                    )
                    if confirm != "yes":
                        self.log("Tests cancelled.")
                        sys.exit(1)

                try:
                    if verbosity >= 1:
                        self.log(
                            f"Destroying old test database '{test_database_name}'..."
                        )
                    cursor.execute("DROP DATABASE {dbname}".format(**test_db_params))
                    self._execute_create_test_db(cursor, test_db_params)
                except Exception as recreate_error:
                    self.log(
                        f"Got an error recreating the test database: {recreate_error}"
                    )
                    sys.exit(2)

        return test_database_name

    def destroy_test_db(self, old_database_name=None, verbosity=1):
        """
        Drop the test database the connection currently points at.

        The connection is closed first. If ``old_database_name`` is given,
        ``settings.DATABASE["NAME"]`` and the connection's ``settings_dict``
        are restored to it afterwards. No confirmation is requested.
        """
        self.connection.close()

        test_database_name = self.connection.settings_dict["NAME"]

        if verbosity >= 1:
            self.log(f"Destroying test database '{test_database_name}'...")
        self._destroy_test_db(test_database_name, verbosity)

        # Restore the original database name
        if old_database_name is not None:
            settings.DATABASE["NAME"] = old_database_name
            self.connection.settings_dict["NAME"] = old_database_name

    def _destroy_test_db(self, test_database_name, verbosity):
        """
        Internal implementation - drop the test database.
        """
        # Remove the test database to clean up after ourselves. Use the nodb
        # cursor (not a connection to the test database) to do so, because
        # it's not allowed to delete a database while being connected to it.
        with self._nodb_cursor() as cursor:
            cursor.execute(
                f"DROP DATABASE {self.connection.ops.quote_name(test_database_name)}"
            )

    def sql_table_creation_suffix(self):
        """
        SQL to append to the end of the CREATE DATABASE statement used for
        the test database. Empty by default.
        """
        return ""

    def test_db_signature(self, prefix=""):
        """
        Return a tuple with elements of self.connection.settings_dict (a
        DATABASE setting value) that uniquely identify a database
        according to the RDBMS particularities: HOST, PORT, ENGINE and the
        test database name computed for ``prefix``.
        """
        settings_dict = self.connection.settings_dict
        return (
            settings_dict["HOST"],
            settings_dict["PORT"],
            settings_dict["ENGINE"],
            self._get_test_db_name(prefix),
        )