Plain is headed towards 1.0! Subscribe for development updates →

 1import os
 2import subprocess
 3import sys
 4
 5from plain.models.backends.base.creation import BaseDatabaseCreation
 6
 7from .client import DatabaseClient
 8
 9
10class DatabaseCreation(BaseDatabaseCreation):
11    def sql_table_creation_suffix(self):
12        suffix = []
13        test_settings = self.connection.settings_dict["TEST"]
14        if test_settings["CHARSET"]:
15            suffix.append("CHARACTER SET %s" % test_settings["CHARSET"])
16        if test_settings["COLLATION"]:
17            suffix.append("COLLATE %s" % test_settings["COLLATION"])
18        return " ".join(suffix)
19
20    def _execute_create_test_db(self, cursor, parameters, keepdb=False):
21        try:
22            super()._execute_create_test_db(cursor, parameters, keepdb)
23        except Exception as e:
24            if len(e.args) < 1 or e.args[0] != 1007:
25                # All errors except "database exists" (1007) cancel tests.
26                self.log("Got an error creating the test database: %s" % e)
27                sys.exit(2)
28            else:
29                raise
30
31    def _clone_test_db(self, suffix, verbosity, keepdb=False):
32        source_database_name = self.connection.settings_dict["NAME"]
33        target_database_name = self.get_test_db_clone_settings(suffix)["NAME"]
34        test_db_params = {
35            "dbname": self.connection.ops.quote_name(target_database_name),
36            "suffix": self.sql_table_creation_suffix(),
37        }
38        with self._nodb_cursor() as cursor:
39            try:
40                self._execute_create_test_db(cursor, test_db_params, keepdb)
41            except Exception:
42                if keepdb:
43                    # If the database should be kept, skip everything else.
44                    return
45                try:
46                    if verbosity >= 1:
47                        self.log(
48                            "Destroying old test database for alias {}...".format(
49                                self._get_database_display_str(
50                                    verbosity, target_database_name
51                                ),
52                            )
53                        )
54                    cursor.execute("DROP DATABASE {dbname}".format(**test_db_params))
55                    self._execute_create_test_db(cursor, test_db_params, keepdb)
56                except Exception as e:
57                    self.log("Got an error recreating the test database: %s" % e)
58                    sys.exit(2)
59        self._clone_db(source_database_name, target_database_name)
60
61    def _clone_db(self, source_database_name, target_database_name):
62        cmd_args, cmd_env = DatabaseClient.settings_to_cmd_args_env(
63            self.connection.settings_dict, []
64        )
65        dump_cmd = [
66            "mysqldump",
67            *cmd_args[1:-1],
68            "--routines",
69            "--events",
70            source_database_name,
71        ]
72        dump_env = load_env = {**os.environ, **cmd_env} if cmd_env else None
73        load_cmd = cmd_args
74        load_cmd[-1] = target_database_name
75
76        with subprocess.Popen(
77            dump_cmd, stdout=subprocess.PIPE, env=dump_env
78        ) as dump_proc:
79            with subprocess.Popen(
80                load_cmd,
81                stdin=dump_proc.stdout,
82                stdout=subprocess.DEVNULL,
83                env=load_env,
84            ):
85                # Allow dump_proc to receive a SIGPIPE if the load process exits.
86                dump_proc.stdout.close()