Plain is headed towards 1.0! Subscribe for development updates →

  1from plain.exceptions import ImproperlyConfigured
  2from plain.models import DEFAULT_DB_ALIAS, connections
  3
  4
  5def setup_databases(
  6    verbosity,
  7    *,
  8    keepdb=False,
  9    debug_sql=False,
 10    aliases=None,
 11    serialized_aliases=None,
 12    **kwargs,
 13):
 14    """Create the test databases."""
 15
 16    test_databases, mirrored_aliases = get_unique_databases_and_mirrors(aliases)
 17
 18    old_names = []
 19
 20    for db_name, aliases in test_databases.values():
 21        first_alias = None
 22        for alias in aliases:
 23            connection = connections[alias]
 24            old_names.append((connection, db_name, first_alias is None))
 25
 26            # Actually create the database for the first connection
 27            if first_alias is None:
 28                first_alias = alias
 29                serialize_alias = (
 30                    serialized_aliases is None or alias in serialized_aliases
 31                )
 32                connection.creation.create_test_db(
 33                    verbosity=verbosity,
 34                    autoclobber=True,
 35                    keepdb=keepdb,
 36                    serialize=serialize_alias,
 37                )
 38            # Configure all other connections as mirrors of the first one
 39            else:
 40                connections[alias].creation.set_as_test_mirror(
 41                    connections[first_alias].settings_dict
 42                )
 43
 44    # Configure the test mirrors.
 45    for alias, mirror_alias in mirrored_aliases.items():
 46        connections[alias].creation.set_as_test_mirror(
 47            connections[mirror_alias].settings_dict
 48        )
 49
 50    if debug_sql:
 51        for alias in connections:
 52            connections[alias].force_debug_cursor = True
 53
 54    return old_names
 55
 56
 57def get_unique_databases_and_mirrors(aliases=None):
 58    """
 59    Figure out which databases actually need to be created.
 60
 61    Deduplicate entries in DATABASES that correspond the same database or are
 62    configured as test mirrors.
 63
 64    Return two values:
 65    - test_databases: ordered mapping of signatures to (name, list of aliases)
 66                      where all aliases share the same underlying database.
 67    - mirrored_aliases: mapping of mirror aliases to original aliases.
 68    """
 69    if aliases is None:
 70        aliases = connections
 71    mirrored_aliases = {}
 72    test_databases = {}
 73    dependencies = {}
 74    default_sig = connections[DEFAULT_DB_ALIAS].creation.test_db_signature()
 75
 76    for alias in connections:
 77        connection = connections[alias]
 78        test_settings = connection.settings_dict["TEST"]
 79
 80        if test_settings["MIRROR"]:
 81            # If the database is marked as a test mirror, save the alias.
 82            mirrored_aliases[alias] = test_settings["MIRROR"]
 83        elif alias in aliases:
 84            # Store a tuple with DB parameters that uniquely identify it.
 85            # If we have two aliases with the same values for that tuple,
 86            # we only need to create the test database once.
 87            item = test_databases.setdefault(
 88                connection.creation.test_db_signature(),
 89                (connection.settings_dict["NAME"], []),
 90            )
 91            # The default database must be the first because data migrations
 92            # use the default alias by default.
 93            if alias == DEFAULT_DB_ALIAS:
 94                item[1].insert(0, alias)
 95            else:
 96                item[1].append(alias)
 97
 98            if "DEPENDENCIES" in test_settings:
 99                dependencies[alias] = test_settings["DEPENDENCIES"]
100            else:
101                if (
102                    alias != DEFAULT_DB_ALIAS
103                    and connection.creation.test_db_signature() != default_sig
104                ):
105                    dependencies[alias] = test_settings.get(
106                        "DEPENDENCIES", [DEFAULT_DB_ALIAS]
107                    )
108
109    test_databases = dict(dependency_ordered(test_databases.items(), dependencies))
110    return test_databases, mirrored_aliases
111
112
113def teardown_databases(old_config, verbosity, keepdb=False):
114    """Destroy all the non-mirror databases."""
115    for connection, old_name, destroy in old_config:
116        if destroy:
117            connection.creation.destroy_test_db(old_name, verbosity, keepdb)
118
119
120def dependency_ordered(test_databases, dependencies):
121    """
122    Reorder test_databases into an order that honors the dependencies
123    described in TEST[DEPENDENCIES].
124    """
125    ordered_test_databases = []
126    resolved_databases = set()
127
128    # Maps db signature to dependencies of all its aliases
129    dependencies_map = {}
130
131    # Check that no database depends on its own alias
132    for sig, (_, aliases) in test_databases:
133        all_deps = set()
134        for alias in aliases:
135            all_deps.update(dependencies.get(alias, []))
136        if not all_deps.isdisjoint(aliases):
137            raise ImproperlyConfigured(
138                "Circular dependency: databases %r depend on each other, "
139                "but are aliases." % aliases
140            )
141        dependencies_map[sig] = all_deps
142
143    while test_databases:
144        changed = False
145        deferred = []
146
147        # Try to find a DB that has all its dependencies met
148        for signature, (db_name, aliases) in test_databases:
149            if dependencies_map[signature].issubset(resolved_databases):
150                resolved_databases.update(aliases)
151                ordered_test_databases.append((signature, (db_name, aliases)))
152                changed = True
153            else:
154                deferred.append((signature, (db_name, aliases)))
155
156        if not changed:
157            raise ImproperlyConfigured("Circular dependency in TEST[DEPENDENCIES]")
158        test_databases = deferred
159    return ordered_test_databases