1import os
2import subprocess
3import sys
4
5from plain.models.backends.base.creation import BaseDatabaseCreation
6
7from .client import DatabaseClient
8
9
class DatabaseCreation(BaseDatabaseCreation):
    """Test-database creation and cloning helpers for the MySQL backend.

    Cloning pipes the output of ``mysqldump`` for the source database into
    a second client process connected to the target database.
    """

    def sql_table_creation_suffix(self):
        """Return the charset/collation clause built from the TEST settings.

        Reads ``CHARSET`` and ``COLLATION`` from
        ``self.connection.settings_dict["TEST"]``; each truthy value adds one
        clause. The clauses are joined with a single space, so the result is
        an empty string when neither setting is set.
        """
        test_config = self.connection.settings_dict["TEST"]
        clauses = (
            ("CHARACTER SET", test_config["CHARSET"]),
            ("COLLATE", test_config["COLLATION"]),
        )
        return " ".join(f"{keyword} {value}" for keyword, value in clauses if value)

    def _execute_create_test_db(self, cursor, parameters, keepdb=False):
        """Run the base-class creation step, aborting on unexpected errors.

        An exception whose first argument is 1007 ("database exists") is
        re-raised so the caller can decide what to do. Any other exception is
        logged and terminates the process via ``sys.exit(2)``.
        """
        try:
            super()._execute_create_test_db(cursor, parameters, keepdb)
        except Exception as exc:
            if exc.args and exc.args[0] == 1007:
                # "database exists" is the only error handed back to callers.
                raise
            # All errors except "database exists" (1007) cancel tests.
            self.log(f"Got an error creating the test database: {exc}")
            sys.exit(2)

    def _clone_test_db(self, suffix, verbosity, keepdb=False):
        """Create the clone database for ``suffix`` and copy the source into it.

        If creating the clone database raises and ``keepdb`` is true, return
        without touching the existing database. Otherwise drop it, create it
        again, and exit the process with status 2 if that fails. On success
        the source database is cloned into the target via ``_clone_db``.
        """
        source_name = self.connection.settings_dict["NAME"]
        target_name = self.get_test_db_clone_settings(suffix)["NAME"]
        quoted_target = self.connection.ops.quote_name(target_name)
        create_params = {
            "dbname": quoted_target,
            "suffix": self.sql_table_creation_suffix(),
        }
        with self._nodb_cursor() as cursor:
            try:
                self._execute_create_test_db(cursor, create_params, keepdb)
            except Exception:
                if keepdb:
                    # If the database should be kept, skip everything else.
                    return
                try:
                    if verbosity >= 1:
                        display_name = self._get_database_display_str(
                            verbosity, target_name
                        )
                        self.log(
                            f"Destroying old test database for alias {display_name}..."
                        )
                    cursor.execute(f"DROP DATABASE {quoted_target}")
                    self._execute_create_test_db(cursor, create_params, keepdb)
                except Exception as exc:
                    self.log(f"Got an error recreating the test database: {exc}")
                    sys.exit(2)
        self._clone_db(source_name, target_name)

    def _clone_db(self, source_database_name, target_database_name):
        """Copy ``source_database_name`` into ``target_database_name``.

        Starts a ``mysqldump`` process for the source and pipes its stdout
        into a second process started from the client command line, with the
        target database name as its final argument.
        """
        client_args, client_env = DatabaseClient.settings_to_cmd_args_env(
            self.connection.settings_dict, []
        )
        # The first and last client arguments are dropped for the dump
        # command -- presumably the client executable and the database name;
        # verify against DatabaseClient.settings_to_cmd_args_env.
        shared_options = client_args[1:-1]
        dump_cmd = ["mysqldump"]
        dump_cmd.extend(shared_options)
        dump_cmd.extend(["--routines", "--events", source_database_name])

        # Both processes share one environment: the current environment
        # overlaid with the client's extra variables, or None when the client
        # supplies none.
        if client_env:
            process_env = {**os.environ, **client_env}
        else:
            process_env = None

        # The load command reuses the client argument list in place, swapping
        # the final argument for the target database name.
        load_cmd = client_args
        load_cmd[-1] = target_database_name

        dump_proc = subprocess.Popen(
            dump_cmd, stdout=subprocess.PIPE, env=process_env
        )
        with dump_proc:
            load_proc = subprocess.Popen(
                load_cmd,
                stdin=dump_proc.stdout,
                stdout=subprocess.DEVNULL,
                env=process_env,
            )
            with load_proc:
                # Allow dump_proc to receive a SIGPIPE if the load process exits.
                dump_proc.stdout.close()