1import os
2import re
3from importlib import import_module
4
5from plain.models import migrations
6from plain.models.migrations.loader import MigrationLoader
7from plain.models.migrations.migration import SettingsTuple
8from plain.models.migrations.serializer import serializer_factory
9from plain.packages import packages_registry
10from plain.runtime import __version__
11from plain.utils.inspect import get_func_args
12from plain.utils.module_loading import module_dir
13from plain.utils.timezone import now
14
15
class OperationWriter:
    """
    Render one migration operation as indented Python source.

    Lines are collected in ``buff``; every line passed to ``feed()`` is
    prefixed with four spaces per current indentation level.
    """

    def __init__(self, operation, indentation=2):
        self.operation = operation
        self.buff = []
        self.indentation = indentation

    def serialize(self):
        """
        Return ``(source, imports)`` for the wrapped operation.

        ``source`` is the rendered constructor call (ending in ``),``) built
        from ``operation.deconstruct()``; ``imports`` is the set of import
        lines collected while serializing the arguments.
        """
        required_imports = set()
        op_name, positional, keyword = self.operation.deconstruct()
        parameter_names = get_func_args(self.operation.__init__)
        operation_class = self.operation.__class__

        # Operations exposed on plain.models.migrations are reachable through
        # the "migrations" name; anything else needs its own module import.
        if getattr(migrations, op_name, None) == operation_class:
            self.feed(f"migrations.{op_name}(")
        else:
            module_path = operation_class.__module__
            required_imports.add(f"import {module_path}")
            self.feed(f"{module_path}.{op_name}(")

        self.indent()

        # Positional values are written as keywords, named by their position
        # in the constructor signature.
        for position, value in enumerate(positional):
            self._write_argument(parameter_names[position], value, required_imports)

        # Remaining parameters are emitted in signature order (not sorted),
        # and only when deconstruct() actually supplied them.
        for parameter in parameter_names[len(positional) :]:
            if parameter in keyword:
                self._write_argument(parameter, keyword[parameter], required_imports)

        self.unindent()
        self.feed("),")
        return self.render(), required_imports

    def _write_argument(self, arg_name, arg_value, imports):
        """Feed ``arg_name=<serialized arg_value>,`` and record its imports."""
        expand = arg_name in self.operation.serialization_expand_args and isinstance(
            arg_value, list | tuple | dict
        )

        if not expand:
            text, needed = MigrationWriter.serialize(arg_value)
            self._feed_serialized(f"{arg_name}=", text)
            imports.update(needed)
            return

        if isinstance(arg_value, dict):
            # One "key: value," entry per line between the braces.
            self.feed(f"{arg_name}={{")
            self.indent()
            for key, value in arg_value.items():
                key_text, key_needed = MigrationWriter.serialize(key)
                value_text, value_needed = MigrationWriter.serialize(value)
                self._feed_serialized(f"{key_text}: ", value_text)
                imports.update(key_needed)
                imports.update(value_needed)
            self.unindent()
            self.feed("},")
            return

        # Lists and tuples are both written out with list brackets, one
        # element per line.
        self.feed(f"{arg_name}=[")
        self.indent()
        for element in arg_value:
            element_text, element_needed = MigrationWriter.serialize(element)
            self._feed_serialized("", element_text)
            imports.update(element_needed)
        self.unindent()
        self.feed("],")

    def _feed_serialized(self, prefix, text):
        """
        Feed a serialized value followed by a trailing comma.

        ``prefix`` is attached to the first line only; for multi-line values
        the comma goes on the last line.
        """
        lines = text.splitlines()
        if len(lines) <= 1:
            self.feed(f"{prefix}{text},")
            return

        first, *middle, last = lines
        self.feed(f"{prefix}{first}")
        for line in middle:
            self.feed(line)
        self.feed(f"{last},")

    def indent(self):
        """Increase the indentation level by one."""
        self.indentation += 1

    def unindent(self):
        """Decrease the indentation level by one."""
        self.indentation -= 1

    def feed(self, line):
        """Append ``line`` to the buffer at the current indentation."""
        padding = " " * (self.indentation * 4)
        self.buff.append(padding + line)

    def render(self):
        """Return all buffered lines joined with newlines."""
        return "\n".join(self.buff)
114
115
class MigrationWriter:
    """
    Take a Migration instance and produce the contents of the migration
    file from it.

    After ``as_string()`` has run, ``needs_manual_porting`` is True when any
    collected import pointed at a migration module and was replaced by a
    comment in the generated file.
    """

    def __init__(self, migration, include_header=True):
        self.migration = migration
        self.include_header = include_header
        self.needs_manual_porting = False

    @staticmethod
    def _split_migration_imports(imports):
        """
        Partition import lines into ``(regular_imports, migration_modules)``.

        A line of the form ``import <path>.<digits...>`` is treated as an
        import of a migration module; its dotted module path (without the
        ``import`` keyword) goes into ``migration_modules``. Every other line
        is returned unchanged in ``regular_imports``.
        """
        regular_imports = set()
        migration_modules = set()
        for line in imports:
            if re.match(r"^import (.*)\.\d+[^\s]*$", line):
                # Drop only the leading keyword: splitting on every "import"
                # would mangle module paths that themselves contain that
                # substring (e.g. "importer.migrations.0001_initial").
                migration_modules.add(line.removeprefix("import ").strip())
            else:
                regular_imports.add(line)
        return regular_imports, migration_modules

    def as_string(self):
        """Return a string of the file contents."""
        items = {
            "replaces_str": "",
            "initial_str": "",
        }

        imports = set()

        # Deconstruct operations
        operations = []
        for operation in self.migration.operations:
            operation_string, operation_imports = OperationWriter(operation).serialize()
            imports.update(operation_imports)
            operations.append(operation_string)
        items["operations"] = "\n".join(operations) + "\n" if operations else ""

        # Format dependencies and write out settings dependencies right
        dependencies = []
        for dependency in self.migration.dependencies:
            if isinstance(dependency, SettingsTuple):
                dependencies.append(
                    f" migrations.settings_dependency(settings.{dependency[1]}),"
                )
                imports.add("from plain.runtime import settings")
            else:
                dependencies.append(f" {self.serialize(dependency)[0]},")
        items["dependencies"] = "\n".join(dependencies) + "\n" if dependencies else ""

        # Format imports nicely, swapping imports of functions from migration
        # files for comments
        imports, migration_imports = self._split_migration_imports(imports)
        if migration_imports:
            self.needs_manual_porting = True

        imports.add("from plain.models import migrations")

        # Sort imports by the package / module to be imported (the part after
        # "from" in "from ... import ..." or after "import" in "import ...").
        # First group the "import" statements, then "from ... import ...".
        sorted_imports = sorted(
            imports, key=lambda line: (line.split()[0] == "from", line.split()[1])
        )
        items["imports"] = "\n".join(sorted_imports) + "\n" if imports else ""
        if migration_imports:
            items["imports"] += (
                "\n\n# Functions from the following migrations need manual "
                "copying.\n# Move them and any dependencies into this file, "
                "then update the\n# RunPython operations to refer to the local "
                "versions:\n# {}"
            ).format("\n# ".join(sorted(migration_imports)))

        # If there's a replaces, make a string for it
        if self.migration.replaces:
            items["replaces_str"] = (
                f"\n replaces = {self.serialize(self.migration.replaces)[0]}\n"
            )

        # Hinting that goes into comment
        if self.include_header:
            items["migration_header"] = MIGRATION_HEADER_TEMPLATE % {
                "version": __version__,
                "timestamp": now().strftime("%Y-%m-%d %H:%M"),
            }
        else:
            items["migration_header"] = ""

        if self.migration.initial:
            items["initial_str"] = "\n initial = True\n"

        return MIGRATION_TEMPLATE % items

    @property
    def basedir(self):
        """
        Return the directory the migration file should be written to.

        Resolution order: the directory of the importable migrations module;
        otherwise a direct subdirectory of the package's own path; otherwise
        the missing package directories are created on disk (each with an
        ``__init__.py``) below the nearest importable parent package.

        Raises ValueError when the migrations module lookup returns None for
        this package, or when no importable parent package can be found.
        """
        migrations_package_name, _ = MigrationLoader.migrations_module(
            self.migration.package_label
        )

        if migrations_package_name is None:
            raise ValueError(
                f"Plain can't create migrations for app '{self.migration.package_label}' because "
                "migrations have been disabled via the MIGRATION_MODULES "
                "setting."
            )

        # See if we can import the migrations module directly
        try:
            migrations_module = import_module(migrations_package_name)
        except ImportError:
            pass
        else:
            try:
                return module_dir(migrations_module)
            except ValueError:
                pass

        # Alright, see if it's a direct submodule of the app
        package_config = packages_registry.get_package_config(
            self.migration.package_label
        )
        (
            maybe_package_name,
            _,
            migrations_package_basename,
        ) = migrations_package_name.rpartition(".")
        if package_config.name == maybe_package_name:
            return os.path.join(package_config.path, migrations_package_basename)

        # In case of using MIGRATION_MODULES setting and the custom package
        # doesn't exist, create one, starting from an existing package
        existing_dirs, missing_dirs = migrations_package_name.split("."), []
        while existing_dirs:
            missing_dirs.insert(0, existing_dirs.pop(-1))
            try:
                # Once every component has been popped this imports "" and
                # raises ValueError, which ends the search via the else below.
                base_module = import_module(".".join(existing_dirs))
            except (ImportError, ValueError):
                continue
            else:
                try:
                    base_dir = module_dir(base_module)
                except ValueError:
                    continue
                else:
                    break
        else:
            raise ValueError(
                "Could not locate an appropriate location to create "
                f"migrations package {migrations_package_name}. Make sure the toplevel "
                "package exists and can be imported."
            )

        final_dir = os.path.join(base_dir, *missing_dirs)
        os.makedirs(final_dir, exist_ok=True)
        for missing_dir in missing_dirs:
            base_dir = os.path.join(base_dir, missing_dir)
            # Append mode creates the file when it is absent but leaves an
            # existing __init__.py untouched. A directory can already exist on
            # disk even though its import failed above, and truncating its
            # __init__.py would silently destroy its contents.
            with open(os.path.join(base_dir, "__init__.py"), "a"):
                pass

        return final_dir

    @property
    def filename(self):
        """Return the migration's file name: its name plus ``.py``."""
        return f"{self.migration.name}.py"

    @property
    def path(self):
        """Return ``basedir`` joined with ``filename``."""
        return os.path.join(self.basedir, self.filename)

    @classmethod
    def serialize(cls, value):
        """Return whatever the serializer chosen by ``serializer_factory`` produces for ``value``."""
        return serializer_factory(value).serialize()
279
280
# Comment header placed at the very top of a generated migration file.
# MigrationWriter.as_string() fills it in with %-formatting, supplying the
# "version" and "timestamp" keys; the blank line it ends with separates the
# header from the import block that follows it in MIGRATION_TEMPLATE.
MIGRATION_HEADER_TEMPLATE = """\
# Generated by Plain %(version)s on %(timestamp)s

"""
285
286
# Skeleton of a generated migration module, rendered by
# MigrationWriter.as_string() with %-formatting. Keys it expects:
# migration_header, imports, replaces_str, initial_str, dependencies and
# operations. The backslash at the end of the dependencies/operations
# placeholder lines is a line continuation inside the string literal, so the
# closing bracket follows the substituted text without an extra newline.
MIGRATION_TEMPLATE = """\
%(migration_header)s%(imports)s

class Migration(migrations.Migration):
%(replaces_str)s%(initial_str)s
 dependencies = [
%(dependencies)s\
 ]

 operations = [
%(operations)s\
 ]
"""