1from plain.exceptions import ImproperlyConfigured
2from plain.models import DEFAULT_DB_ALIAS, connections
3
4
5def setup_databases(
6 verbosity,
7 *,
8 keepdb=False,
9 debug_sql=False,
10 aliases=None,
11 serialized_aliases=None,
12 **kwargs,
13):
14 """Create the test databases."""
15
16 test_databases, mirrored_aliases = get_unique_databases_and_mirrors(aliases)
17
18 old_names = []
19
20 for db_name, aliases in test_databases.values():
21 first_alias = None
22 for alias in aliases:
23 connection = connections[alias]
24 old_names.append((connection, db_name, first_alias is None))
25
26 # Actually create the database for the first connection
27 if first_alias is None:
28 first_alias = alias
29 serialize_alias = (
30 serialized_aliases is None or alias in serialized_aliases
31 )
32 connection.creation.create_test_db(
33 verbosity=verbosity,
34 autoclobber=True,
35 keepdb=keepdb,
36 serialize=serialize_alias,
37 )
38 # Configure all other connections as mirrors of the first one
39 else:
40 connections[alias].creation.set_as_test_mirror(
41 connections[first_alias].settings_dict
42 )
43
44 # Configure the test mirrors.
45 for alias, mirror_alias in mirrored_aliases.items():
46 connections[alias].creation.set_as_test_mirror(
47 connections[mirror_alias].settings_dict
48 )
49
50 if debug_sql:
51 for alias in connections:
52 connections[alias].force_debug_cursor = True
53
54 return old_names
55
56
57def get_unique_databases_and_mirrors(aliases=None):
58 """
59 Figure out which databases actually need to be created.
60
61 Deduplicate entries in DATABASES that correspond the same database or are
62 configured as test mirrors.
63
64 Return two values:
65 - test_databases: ordered mapping of signatures to (name, list of aliases)
66 where all aliases share the same underlying database.
67 - mirrored_aliases: mapping of mirror aliases to original aliases.
68 """
69 if aliases is None:
70 aliases = connections
71 mirrored_aliases = {}
72 test_databases = {}
73 dependencies = {}
74 default_sig = connections[DEFAULT_DB_ALIAS].creation.test_db_signature()
75
76 for alias in connections:
77 connection = connections[alias]
78 test_settings = connection.settings_dict["TEST"]
79
80 if test_settings["MIRROR"]:
81 # If the database is marked as a test mirror, save the alias.
82 mirrored_aliases[alias] = test_settings["MIRROR"]
83 elif alias in aliases:
84 # Store a tuple with DB parameters that uniquely identify it.
85 # If we have two aliases with the same values for that tuple,
86 # we only need to create the test database once.
87 item = test_databases.setdefault(
88 connection.creation.test_db_signature(),
89 (connection.settings_dict["NAME"], []),
90 )
91 # The default database must be the first because data migrations
92 # use the default alias by default.
93 if alias == DEFAULT_DB_ALIAS:
94 item[1].insert(0, alias)
95 else:
96 item[1].append(alias)
97
98 if "DEPENDENCIES" in test_settings:
99 dependencies[alias] = test_settings["DEPENDENCIES"]
100 else:
101 if (
102 alias != DEFAULT_DB_ALIAS
103 and connection.creation.test_db_signature() != default_sig
104 ):
105 dependencies[alias] = test_settings.get(
106 "DEPENDENCIES", [DEFAULT_DB_ALIAS]
107 )
108
109 test_databases = dict(dependency_ordered(test_databases.items(), dependencies))
110 return test_databases, mirrored_aliases
111
112
113def teardown_databases(old_config, verbosity, keepdb=False):
114 """Destroy all the non-mirror databases."""
115 for connection, old_name, destroy in old_config:
116 if destroy:
117 connection.creation.destroy_test_db(old_name, verbosity, keepdb)
118
119
120def dependency_ordered(test_databases, dependencies):
121 """
122 Reorder test_databases into an order that honors the dependencies
123 described in TEST[DEPENDENCIES].
124 """
125 ordered_test_databases = []
126 resolved_databases = set()
127
128 # Maps db signature to dependencies of all its aliases
129 dependencies_map = {}
130
131 # Check that no database depends on its own alias
132 for sig, (_, aliases) in test_databases:
133 all_deps = set()
134 for alias in aliases:
135 all_deps.update(dependencies.get(alias, []))
136 if not all_deps.isdisjoint(aliases):
137 raise ImproperlyConfigured(
138 "Circular dependency: databases %r depend on each other, "
139 "but are aliases." % aliases
140 )
141 dependencies_map[sig] = all_deps
142
143 while test_databases:
144 changed = False
145 deferred = []
146
147 # Try to find a DB that has all its dependencies met
148 for signature, (db_name, aliases) in test_databases:
149 if dependencies_map[signature].issubset(resolved_databases):
150 resolved_databases.update(aliases)
151 ordered_test_databases.append((signature, (db_name, aliases)))
152 changed = True
153 else:
154 deferred.append((signature, (db_name, aliases)))
155
156 if not changed:
157 raise ImproperlyConfigured("Circular dependency in TEST[DEPENDENCIES]")
158 test_databases = deferred
159 return ordered_test_databases