Plain is headed towards 1.0! Subscribe for development updates →

  1import os
  2import sys
  3
  4from plain.runtime import settings
  5
  6# The prefix to put on the default database name when creating
  7# the test database.
  8TEST_DATABASE_PREFIX = "test_"
  9
 10
 11class BaseDatabaseCreation:
 12    """
 13    Encapsulate backend-specific differences pertaining to creation and
 14    destruction of the test database.
 15    """
 16
 17    def __init__(self, connection):
 18        self.connection = connection
 19
 20    def _nodb_cursor(self):
 21        return self.connection._nodb_cursor()
 22
 23    def log(self, msg):
 24        sys.stderr.write(msg + os.linesep)
 25
 26    def create_test_db(self, verbosity=1, prefix=""):
 27        """
 28        Create a test database, prompting the user for confirmation if the
 29        database already exists. Return the name of the test database created.
 30
 31        If prefix is provided, it will be prepended to the database name
 32        to isolate it from other test databases.
 33        """
 34        from plain.models.cli import migrate
 35
 36        test_database_name = self._get_test_db_name(prefix)
 37
 38        if verbosity >= 1:
 39            self.log(f"Creating test database '{test_database_name}'...")
 40
 41        self._create_test_db(
 42            test_database_name=test_database_name, verbosity=verbosity, autoclobber=True
 43        )
 44
 45        self.connection.close()
 46        settings.DATABASE["NAME"] = test_database_name
 47        self.connection.settings_dict["NAME"] = test_database_name
 48
 49        # We report migrate messages at one level lower than that
 50        # requested. This ensures we don't get flooded with messages during
 51        # testing (unless you really ask to be flooded).
 52        migrate.callback(
 53            package_label=None,
 54            migration_name=None,
 55            fake=False,
 56            fake_initial=False,
 57            plan=False,
 58            check_unapplied=False,
 59            backup=False,
 60            prune=False,
 61            verbosity=max(verbosity - 1, 0),
 62        )
 63
 64        # Ensure a connection for the side effect of initializing the test database.
 65        self.connection.ensure_connection()
 66
 67        return test_database_name
 68
 69    def set_as_test_mirror(self, primary_settings_dict):
 70        """
 71        Set this database up to be used in testing as a mirror of a primary
 72        database whose settings are given.
 73        """
 74        self.connection.settings_dict["NAME"] = primary_settings_dict["NAME"]
 75
 76    # def serialize_db_to_string(self):
 77    #     """
 78    #     Serialize all data in the database into a JSON string.
 79    #     Designed only for test runner usage; will not handle large
 80    #     amounts of data.
 81    #     """
 82
 83    #     # Iteratively return every object for all models to serialize.
 84    #     def get_objects():
 85    #         from plain.models.migrations.loader import MigrationLoader
 86
 87    #         loader = MigrationLoader(self.connection)
 88    #         for package_config in packages.get_package_configs():
 89    #             if (
 90    #                 package_config.models_module is not None
 91    #                 and package_config.package_label in loader.migrated_packages
 92    #             ):
 93    #                 for model in package_config.get_models():
 94    #                     if model._meta.can_migrate(
 95    #                         self.connection
 96    #                     ) and router.allow_migrate_model(self.connection.alias, model):
 97    #                         queryset = model._base_manager.using(
 98    #                             self.connection.alias,
 99    #                         ).order_by(model._meta.pk.name)
100    #                         yield from queryset.iterator()
101
102    #     # Serialize to a string
103    #     out = StringIO()
104    #     serializers.serialize("json", get_objects(), indent=None, stream=out)
105    #     return out.getvalue()
106
107    # def deserialize_db_from_string(self, data):
108    #     """
109    #     Reload the database with data from a string generated by
110    #     the serialize_db_to_string() method.
111    #     """
112    #     data = StringIO(data)
113    #     table_names = set()
114    #     # Load data in a transaction to handle forward references and cycles.
115    #     with atomic(using=self.connection.alias):
116    #         # Disable constraint checks, because some databases (MySQL) doesn't
117    #         # support deferred checks.
118    #         with self.connection.constraint_checks_disabled():
119    #             for obj in serializers.deserialize(
120    #                 "json", data, using=self.connection.alias
121    #             ):
122    #                 obj.save()
123    #                 table_names.add(obj.object.__class__._meta.db_table)
124    #         # Manually check for any invalid keys that might have been added,
125    #         # because constraint checks were disabled.
126    #         self.connection.check_constraints(table_names=table_names)
127
128    def _get_test_db_name(self, prefix=""):
129        """
130        Internal implementation - return the name of the test DB that will be
131        created. Only useful when called from create_test_db() and
132        _create_test_db() and when no external munging is done with the 'NAME'
133        settings.
134
135        If prefix is provided, it will be prepended to the database name.
136        """
137        # Determine the base name: explicit TEST.NAME overrides base NAME.
138        base_name = (
139            self.connection.settings_dict["TEST"]["NAME"]
140            or self.connection.settings_dict["NAME"]
141        )
142        if prefix:
143            return f"{prefix}_{base_name}"
144        if self.connection.settings_dict["TEST"]["NAME"]:
145            return self.connection.settings_dict["TEST"]["NAME"]
146        return TEST_DATABASE_PREFIX + self.connection.settings_dict["NAME"]
147
148    def _execute_create_test_db(self, cursor, parameters):
149        cursor.execute("CREATE DATABASE {dbname} {suffix}".format(**parameters))
150
151    def _create_test_db(self, *, test_database_name, verbosity, autoclobber):
152        """
153        Internal implementation - create the test db tables.
154        """
155        test_db_params = {
156            "dbname": self.connection.ops.quote_name(test_database_name),
157            "suffix": self.sql_table_creation_suffix(),
158        }
159        # Create the test database and connect to it.
160        with self._nodb_cursor() as cursor:
161            try:
162                self._execute_create_test_db(cursor, test_db_params)
163            except Exception as e:
164                self.log(f"Got an error creating the test database: {e}")
165                if not autoclobber:
166                    confirm = input(
167                        "Type 'yes' if you would like to try deleting the test "
168                        f"database '{test_database_name}', or 'no' to cancel: "
169                    )
170                if autoclobber or confirm == "yes":
171                    try:
172                        if verbosity >= 1:
173                            self.log(
174                                f"Destroying old test database '{test_database_name}'..."
175                            )
176                        cursor.execute(
177                            "DROP DATABASE {dbname}".format(**test_db_params)
178                        )
179                        self._execute_create_test_db(cursor, test_db_params)
180                    except Exception as e:
181                        self.log(f"Got an error recreating the test database: {e}")
182                        sys.exit(2)
183                else:
184                    self.log("Tests cancelled.")
185                    sys.exit(1)
186
187        return test_database_name
188
189    def destroy_test_db(self, old_database_name=None, verbosity=1):
190        """
191        Destroy a test database, prompting the user for confirmation if the
192        database already exists.
193        """
194        self.connection.close()
195
196        test_database_name = self.connection.settings_dict["NAME"]
197
198        if verbosity >= 1:
199            self.log(f"Destroying test database '{test_database_name}'...")
200        self._destroy_test_db(test_database_name, verbosity)
201
202        # Restore the original database name
203        if old_database_name is not None:
204            settings.DATABASE["NAME"] = old_database_name
205            self.connection.settings_dict["NAME"] = old_database_name
206
207    def _destroy_test_db(self, test_database_name, verbosity):
208        """
209        Internal implementation - remove the test db tables.
210        """
211        # Remove the test database to clean up after
212        # ourselves. Connect to the previous database (not the test database)
213        # to do so, because it's not allowed to delete a database while being
214        # connected to it.
215        with self._nodb_cursor() as cursor:
216            cursor.execute(
217                f"DROP DATABASE {self.connection.ops.quote_name(test_database_name)}"
218            )
219
220    def sql_table_creation_suffix(self):
221        """
222        SQL to append to the end of the test table creation statements.
223        """
224        return ""
225
226    def test_db_signature(self, prefix=""):
227        """
228        Return a tuple with elements of self.connection.settings_dict (a
229        DATABASE setting value) that uniquely identify a database
230        accordingly to the RDBMS particularities.
231        """
232        settings_dict = self.connection.settings_dict
233        return (
234            settings_dict["HOST"],
235            settings_dict["PORT"],
236            settings_dict["ENGINE"],
237            self._get_test_db_name(prefix),
238        )