Plain is headed towards 1.0! Subscribe for development updates →

  1import os
  2import re
  3from importlib import import_module
  4
  5from plain.models import migrations
  6from plain.models.migrations.loader import MigrationLoader
  7from plain.models.migrations.migration import SettingsTuple
  8from plain.models.migrations.serializer import serializer_factory
  9from plain.packages import packages_registry
 10from plain.runtime import __version__
 11from plain.utils.inspect import get_func_args
 12from plain.utils.module_loading import module_dir
 13from plain.utils.timezone import now
 14
 15
 16class OperationWriter:
 17    def __init__(self, operation, indentation=2):
 18        self.operation = operation
 19        self.buff = []
 20        self.indentation = indentation
 21
 22    def serialize(self):
 23        def _write(_arg_name, _arg_value):
 24            if _arg_name in self.operation.serialization_expand_args and isinstance(
 25                _arg_value, list | tuple | dict
 26            ):
 27                if isinstance(_arg_value, dict):
 28                    self.feed(f"{_arg_name}={{")
 29                    self.indent()
 30                    for key, value in _arg_value.items():
 31                        key_string, key_imports = MigrationWriter.serialize(key)
 32                        arg_string, arg_imports = MigrationWriter.serialize(value)
 33                        args = arg_string.splitlines()
 34                        if len(args) > 1:
 35                            self.feed(f"{key_string}: {args[0]}")
 36                            for arg in args[1:-1]:
 37                                self.feed(arg)
 38                            self.feed(f"{args[-1]},")
 39                        else:
 40                            self.feed(f"{key_string}: {arg_string},")
 41                        imports.update(key_imports)
 42                        imports.update(arg_imports)
 43                    self.unindent()
 44                    self.feed("},")
 45                else:
 46                    self.feed(f"{_arg_name}=[")
 47                    self.indent()
 48                    for item in _arg_value:
 49                        arg_string, arg_imports = MigrationWriter.serialize(item)
 50                        args = arg_string.splitlines()
 51                        if len(args) > 1:
 52                            for arg in args[:-1]:
 53                                self.feed(arg)
 54                            self.feed(f"{args[-1]},")
 55                        else:
 56                            self.feed(f"{arg_string},")
 57                        imports.update(arg_imports)
 58                    self.unindent()
 59                    self.feed("],")
 60            else:
 61                arg_string, arg_imports = MigrationWriter.serialize(_arg_value)
 62                args = arg_string.splitlines()
 63                if len(args) > 1:
 64                    self.feed(f"{_arg_name}={args[0]}")
 65                    for arg in args[1:-1]:
 66                        self.feed(arg)
 67                    self.feed(f"{args[-1]},")
 68                else:
 69                    self.feed(f"{_arg_name}={arg_string},")
 70                imports.update(arg_imports)
 71
 72        imports = set()
 73        name, args, kwargs = self.operation.deconstruct()
 74        operation_args = get_func_args(self.operation.__init__)
 75
 76        # See if this operation is in plain.models.migrations. If it is,
 77        # We can just use the fact we already have that imported,
 78        # otherwise, we need to add an import for the operation class.
 79        if getattr(migrations, name, None) == self.operation.__class__:
 80            self.feed(f"migrations.{name}(")
 81        else:
 82            imports.add(f"import {self.operation.__class__.__module__}")
 83            self.feed(f"{self.operation.__class__.__module__}.{name}(")
 84
 85        self.indent()
 86
 87        for i, arg in enumerate(args):
 88            arg_value = arg
 89            arg_name = operation_args[i]
 90            _write(arg_name, arg_value)
 91
 92        i = len(args)
 93        # Only iterate over remaining arguments
 94        for arg_name in operation_args[i:]:
 95            if arg_name in kwargs:  # Don't sort to maintain signature order
 96                arg_value = kwargs[arg_name]
 97                _write(arg_name, arg_value)
 98
 99        self.unindent()
100        self.feed("),")
101        return self.render(), imports
102
103    def indent(self):
104        self.indentation += 1
105
106    def unindent(self):
107        self.indentation -= 1
108
109    def feed(self, line):
110        self.buff.append(" " * (self.indentation * 4) + line)
111
112    def render(self):
113        return "\n".join(self.buff)
114
115
class MigrationWriter:
    """
    Take a Migration instance and produce the contents of the migration
    file from it.
    """

    def __init__(self, migration, include_header=True):
        self.migration = migration
        self.include_header = include_header
        # Set to True by as_string() when the collected imports point into
        # other migration files, whose functions must be copied by hand.
        self.needs_manual_porting = False

    @staticmethod
    def _pop_migration_imports(imports):
        """
        Remove "import <module>" lines that point into migration files from
        the *imports* set (in place) and return the set of module paths they
        referred to.

        A line is treated as a migration import when a dotted component of
        the module path starts with digits (e.g. ``pkg.migrations.0001_x``).
        """
        migration_imports = set()
        # Iterate over a copy because matching lines are removed from the set.
        for line in list(imports):
            if re.match(r"^import (.*)\.\d+[^\s]*$", line):
                # Split only on the leading keyword: the module path itself
                # may contain the substring "import" (e.g. "myimporter").
                migration_imports.add(line.split("import", 1)[1].strip())
                imports.remove(line)
        return migration_imports

    def as_string(self):
        """Return a string of the file contents."""
        items = {
            "replaces_str": "",
            "initial_str": "",
        }

        imports = set()

        # Deconstruct operations
        operations = []
        for operation in self.migration.operations:
            operation_string, operation_imports = OperationWriter(operation).serialize()
            imports.update(operation_imports)
            operations.append(operation_string)
        items["operations"] = "\n".join(operations) + "\n" if operations else ""

        # Format dependencies and write out settings dependencies right
        dependencies = []
        for dependency in self.migration.dependencies:
            if isinstance(dependency, SettingsTuple):
                dependencies.append(
                    f"        migrations.settings_dependency(settings.{dependency[1]}),"
                )
                imports.add("from plain.runtime import settings")
            else:
                dependencies.append(f"        {self.serialize(dependency)[0]},")
        items["dependencies"] = "\n".join(dependencies) + "\n" if dependencies else ""

        # Format imports nicely, swapping imports of functions from migration files
        # for comments
        migration_imports = self._pop_migration_imports(imports)
        if migration_imports:
            self.needs_manual_porting = True

        imports.add("from plain.models import migrations")

        # Sort imports by the package / module to be imported (the part after
        # "from" in "from ... import ..." or after "import" in "import ...").
        # First group the "import" statements, then "from ... import ...".
        sorted_imports = sorted(
            imports, key=lambda i: (i.split()[0] == "from", i.split()[1])
        )
        items["imports"] = "\n".join(sorted_imports) + "\n" if imports else ""
        if migration_imports:
            items["imports"] += (
                "\n\n# Functions from the following migrations need manual "
                "copying.\n# Move them and any dependencies into this file, "
                "then update the\n# RunPython operations to refer to the local "
                "versions:\n# {}"
            ).format("\n# ".join(sorted(migration_imports)))
        # If there's a replaces, make a string for it
        if self.migration.replaces:
            items["replaces_str"] = (
                f"\n    replaces = {self.serialize(self.migration.replaces)[0]}\n"
            )
        # Hinting that goes into comment
        if self.include_header:
            items["migration_header"] = MIGRATION_HEADER_TEMPLATE % {
                "version": __version__,
                "timestamp": now().strftime("%Y-%m-%d %H:%M"),
            }
        else:
            items["migration_header"] = ""

        if self.migration.initial:
            items["initial_str"] = "\n    initial = True\n"

        return MIGRATION_TEMPLATE % items

    @property
    def basedir(self):
        """
        Return the directory the migration file should be written to.

        When the migrations package can be neither imported nor found as a
        direct submodule of the package, the missing directories and their
        ``__init__.py`` files are created on disk as a side effect.

        Raise ValueError if migrations are disabled for the package or if no
        importable parent package can be located.
        """
        migrations_package_name, _ = MigrationLoader.migrations_module(
            self.migration.package_label
        )

        if migrations_package_name is None:
            raise ValueError(
                f"Plain can't create migrations for app '{self.migration.package_label}' because "
                "migrations have been disabled via the MIGRATION_MODULES "
                "setting."
            )

        # See if we can import the migrations module directly
        try:
            migrations_module = import_module(migrations_package_name)
        except ImportError:
            pass
        else:
            try:
                return module_dir(migrations_module)
            except ValueError:
                pass

        # Alright, see if it's a direct submodule of the app
        package_config = packages_registry.get_package_config(
            self.migration.package_label
        )
        (
            maybe_package_name,
            _,
            migrations_package_basename,
        ) = migrations_package_name.rpartition(".")
        if package_config.name == maybe_package_name:
            return os.path.join(package_config.path, migrations_package_basename)

        # In case of using MIGRATION_MODULES setting and the custom package
        # doesn't exist, create one, starting from an existing package
        existing_dirs, missing_dirs = migrations_package_name.split("."), []
        while existing_dirs:
            # Peel components off the right until the remaining dotted path
            # imports and resolves to a directory.
            missing_dirs.insert(0, existing_dirs.pop(-1))
            try:
                base_module = import_module(".".join(existing_dirs))
            except (ImportError, ValueError):
                continue
            else:
                try:
                    base_dir = module_dir(base_module)
                except ValueError:
                    continue
                else:
                    break
        else:
            raise ValueError(
                "Could not locate an appropriate location to create "
                f"migrations package {migrations_package_name}. Make sure the toplevel "
                "package exists and can be imported."
            )

        final_dir = os.path.join(base_dir, *missing_dirs)
        os.makedirs(final_dir, exist_ok=True)
        for missing_dir in missing_dirs:
            base_dir = os.path.join(base_dir, missing_dir)
            # Append mode creates the file when it is missing but, unlike
            # "w", never truncates an __init__.py that already exists (e.g.
            # a package that is on disk but failed to import).
            with open(os.path.join(base_dir, "__init__.py"), "a"):
                pass

        return final_dir

    @property
    def filename(self):
        """Return the migration's file name: its name plus ``.py``."""
        return f"{self.migration.name}.py"

    @property
    def path(self):
        """Return the full path of the migration file inside ``basedir``."""
        return os.path.join(self.basedir, self.filename)

    @classmethod
    def serialize(cls, value):
        """Return the result of serializing *value* with its serializer."""
        return serializer_factory(value).serialize()
279
280
# Comment header written at the top of a generated migration file.
# MigrationWriter.as_string() fills in the "version" and "timestamp" keys
# when the writer was created with include_header=True.
MIGRATION_HEADER_TEMPLATE = """\
# Generated by Plain %(version)s on %(timestamp)s

"""


# Skeleton of a generated migration module, filled via %-formatting in
# MigrationWriter.as_string(). The "dependencies" and "operations" values
# are either empty or end with a newline, so the backslash line
# continuations keep the closing brackets on their own line in both cases.
MIGRATION_TEMPLATE = """\
%(migration_header)s%(imports)s

class Migration(migrations.Migration):
%(replaces_str)s%(initial_str)s
    dependencies = [
%(dependencies)s\
    ]

    operations = [
%(operations)s\
    ]
"""