Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3import datetime
  4import importlib
  5import os
  6import sys
  7from collections.abc import Callable
  8from typing import TYPE_CHECKING, Any
  9
 10import click
 11
 12from plain.models.fields import NOT_PROVIDED
 13from plain.packages import packages_registry
 14from plain.utils import timezone
 15
 16from .loader import MigrationLoader
 17
 18if TYPE_CHECKING:
 19    from plain.models.fields import Field
 20
 21
 22class MigrationQuestioner:
 23    """
 24    Give the autodetector responses to questions it might have.
 25    This base class has a built-in noninteractive mode, but the
 26    interactive subclass is what the command-line arguments will use.
 27    """
 28
 29    def __init__(
 30        self,
 31        defaults: dict[str, Any] | None = None,
 32        specified_packages: set[str] | None = None,
 33        dry_run: bool | None = None,
 34    ) -> None:
 35        self.defaults = defaults or {}
 36        self.specified_packages = specified_packages or set()
 37        self.dry_run = dry_run
 38
 39    def ask_initial(self, package_label: str) -> bool:
 40        """Should we create an initial migration for the app?"""
 41        # If it was specified on the command line, definitely true
 42        if package_label in self.specified_packages:
 43            return True
 44        # Otherwise, we look to see if it has a migrations module
 45        # without any Python files in it, apart from __init__.py.
 46        # Packages from the new app template will have these; the Python
 47        # file check will ensure we skip South ones.
 48        try:
 49            package_config = packages_registry.get_package_config(package_label)
 50        except LookupError:  # It's a fake app.
 51            return self.defaults.get("ask_initial", False)
 52        migrations_import_path, _ = MigrationLoader.migrations_module(
 53            package_config.package_label
 54        )
 55        if migrations_import_path is None:
 56            # It's an application with migrations disabled.
 57            return self.defaults.get("ask_initial", False)
 58        try:
 59            migrations_module = importlib.import_module(migrations_import_path)
 60        except ImportError:
 61            return self.defaults.get("ask_initial", False)
 62        else:
 63            if getattr(migrations_module, "__file__", None):
 64                filenames = os.listdir(os.path.dirname(migrations_module.__file__))  # type: ignore[arg-type]
 65            elif hasattr(migrations_module, "__path__"):
 66                if len(migrations_module.__path__) > 1:
 67                    return False
 68                filenames = os.listdir(list(migrations_module.__path__)[0])
 69            return not any(x.endswith(".py") for x in filenames if x != "__init__.py")
 70
 71    def ask_not_null_addition(self, field_name: str, model_name: str) -> Any:
 72        """Adding a NOT NULL field to a model."""
 73        # None means quit
 74        return None
 75
 76    def ask_not_null_alteration(self, field_name: str, model_name: str) -> Any:
 77        """Changing a NULL field to NOT NULL."""
 78        # None means quit
 79        return None
 80
 81    def ask_rename(
 82        self, model_name: str, old_name: str, new_name: str, field_instance: Field
 83    ) -> bool:
 84        """Was this field really renamed?"""
 85        return self.defaults.get("ask_rename", False)
 86
 87    def ask_rename_model(self, old_model_state: Any, new_model_state: Any) -> bool:
 88        """Was this model really renamed?"""
 89        return self.defaults.get("ask_rename_model", False)
 90
 91    def ask_auto_now_add_addition(self, field_name: str, model_name: str) -> Any:
 92        """Adding an auto_now_add field to a model."""
 93        # None means quit
 94        return None
 95
 96    def ask_unique_callable_default_addition(
 97        self, field_name: str, model_name: str
 98    ) -> Any:
 99        """Adding a unique field with a callable default."""
100        # None means continue.
101        return None
102
103
class InteractiveMigrationQuestioner(MigrationQuestioner):
    """
    Answer the autodetector's questions by prompting on the terminal.

    Prompts are issued through click. The questions that ask for a default
    value or a fix (not-null, ``auto_now_add`` and unique callable default)
    are only asked when ``dry_run`` is falsy; otherwise they return ``None``
    without prompting. The rename questions always prompt.
    """

    def __init__(
        self,
        defaults: dict[str, Any] | None = None,
        specified_packages: set[str] | None = None,
        dry_run: bool | None = None,
    ) -> None:
        super().__init__(
            defaults=defaults, specified_packages=specified_packages, dry_run=dry_run
        )

    def _boolean_input(self, question: str, default: bool | None = None) -> bool:
        """Ask a yes/no ``question`` and return the answer."""
        return click.confirm(question, default=default)

    def _choice_input(self, question: str, choices: list[str]) -> int:
        """
        Show ``choices`` as a numbered menu below ``question``.

        Return the 1-based number of the option the user selected.
        """
        numbered = {str(number): text for number, text in enumerate(choices, start=1)}
        menu = "\n".join(f"{number}) {text}" for number, text in numbered.items())
        selected = click.prompt(
            f"{question}\n{menu}\nSelect an option",
            # Hand click a concrete list of the valid answers rather than a
            # dict keys view.
            type=click.Choice(list(numbered)),
        )
        return int(selected)

    def _ask_default(self, default: str = "") -> Any:
        """
        Prompt for a default value.

        The ``default`` argument allows providing a custom default value (as a
        string) which will be shown to the user and used as the return value
        if the user doesn't provide any other input.

        The entered text is evaluated as a Python expression with the
        ``datetime`` and ``timezone`` modules in scope, and the resulting
        object is returned. Input that fails to evaluate is reported and the
        prompt is shown again. Typing ``exit`` terminates the process with
        exit status 1.
        """
        click.echo("Please enter the default value as valid Python.")
        if default:
            click.echo(
                f"Accept the default '{default}' by pressing 'Enter' or "
                f"provide another value."
            )
        click.echo(
            "The datetime and plain.utils.timezone modules are available, so "
            "it is possible to provide e.g. timezone.now as a value."
        )
        click.echo("Type 'exit' to exit this prompt")

        eval_names = {"datetime": datetime, "timezone": timezone}
        prompt = f"[default: {default}] >>> " if default else ">>> "
        while True:
            code = click.prompt(prompt, default=default, show_default=False)
            if not code and default:
                code = default
            if not code:
                click.echo(
                    "Please enter some code, or 'exit' (without quotes) to exit."
                )
            elif code == "exit":
                sys.exit(1)
            else:
                try:
                    # NOTE: eval() is deliberate here; the text comes from the
                    # developer running the command interactively. Never route
                    # untrusted input through this method.
                    return eval(code, {}, eval_names)
                except Exception as e:
                    # Any failure of the typed expression (syntax error,
                    # unknown name, bad attribute, invalid argument, ...)
                    # should re-prompt rather than abort with a traceback.
                    click.echo(f"Invalid input: {e}")

    def ask_not_null_addition(self, field_name: str, model_name: str) -> Any:
        """
        Adding a NOT NULL field to a model.

        Return the one-off default entered by the user, or ``None`` during a
        dry run. Exits with status 3 if the user chooses to quit.
        """
        if not self.dry_run:
            choice = self._choice_input(
                f"It is impossible to add a non-nullable field '{field_name}' "
                f"to {model_name} without specifying a default. This is "
                f"because the database needs something to populate existing "
                f"rows.\n"
                f"Please select a fix:",
                [
                    (
                        "Provide a one-off default now (will be set on all existing "
                        "rows with a null value for this column)"
                    ),
                    "Quit and manually define a default value in models.py.",
                ],
            )
            if choice == 2:
                sys.exit(3)
            return self._ask_default()
        return None

    def ask_not_null_alteration(self, field_name: str, model_name: str) -> Any:
        """
        Changing a NULL field to NOT NULL.

        Return the one-off default entered by the user, ``NOT_PROVIDED`` if
        the user chooses to ignore the problem for now, or ``None`` during a
        dry run. Exits with status 3 if the user chooses to quit.
        """
        if not self.dry_run:
            choice = self._choice_input(
                f"It is impossible to change a nullable field '{field_name}' "
                f"on {model_name} to non-nullable without providing a "
                f"default. This is because the database needs something to "
                f"populate existing rows.\n"
                f"Please select a fix:",
                [
                    (
                        "Provide a one-off default now (will be set on all existing "
                        "rows with a null value for this column)"
                    ),
                    "Ignore for now. Existing rows that contain NULL values "
                    "will have to be handled manually, for example with a "
                    "RunPython or RunSQL operation.",
                    "Quit and manually define a default value in models.py.",
                ],
            )
            if choice == 2:
                return NOT_PROVIDED
            if choice == 3:
                sys.exit(3)
            return self._ask_default()
        return None

    def ask_rename(
        self, model_name: str, old_name: str, new_name: str, field_instance: Field
    ) -> bool:
        """Was this field really renamed?"""
        field_type = field_instance.__class__.__name__
        return self._boolean_input(
            f"Was {model_name}.{old_name} renamed to "
            f"{model_name}.{new_name} (a {field_type})?",
            default=False,
        )

    def ask_rename_model(self, old_model_state: Any, new_model_state: Any) -> bool:
        """Was this model really renamed?"""
        return self._boolean_input(
            f"Was the model {old_model_state.package_label}."
            f"{old_model_state.name} renamed to {new_model_state.name}?",
            default=False,
        )

    def ask_auto_now_add_addition(self, field_name: str, model_name: str) -> Any:
        """
        Adding an auto_now_add field to a model.

        Return the one-off default entered by the user (``timezone.now`` is
        offered as the suggested value), or ``None`` during a dry run. Exits
        with status 3 if the user chooses to quit.
        """
        if not self.dry_run:
            choice = self._choice_input(
                f"It is impossible to add the field '{field_name}' with "
                f"'auto_now_add=True' to {model_name} without providing a "
                f"default. This is because the database needs something to "
                f"populate existing rows.\n",
                [
                    "Provide a one-off default now which will be set on all "
                    "existing rows",
                    "Quit and manually define a default value in models.py.",
                ],
            )
            if choice == 2:
                sys.exit(3)
            return self._ask_default(default="timezone.now")
        return None

    def ask_unique_callable_default_addition(
        self, field_name: str, model_name: str
    ) -> Any:
        """
        Adding a unique field with a callable default.

        Always return ``None`` (meaning continue). Exits with status 3 if
        the user chooses to quit instead.
        """
        if not self.dry_run:
            choice = self._choice_input(
                f"Callable default on unique field {model_name}.{field_name} "
                f"will not generate unique values upon migrating.\n"
                f"Please choose how to proceed:\n",
                [
                    "Continue making this migration as the first step in "
                    "writing a manual migration to generate unique values.",
                    "Quit and edit field options in models.py.",
                ],
            )
            if choice == 2:
                sys.exit(3)
        return None
287
288
class NonInteractiveMigrationQuestioner(MigrationQuestioner):
    """
    Answer the autodetector's questions when the user cannot be prompted.

    Questions that would need a user-supplied default are reported through
    the optional ``log`` callable and then either abort the process or fall
    back to ``NOT_PROVIDED``.
    """

    def __init__(
        self,
        defaults: dict[str, Any] | None = None,
        specified_packages: set[str] | None = None,
        dry_run: bool | None = None,
        verbosity: int = 1,
        log: Callable[[str], Any] | None = None,
    ) -> None:
        """
        Store the logging configuration, then defer to the base class.

        ``log`` is called with each message; when it is ``None`` messages
        are dropped. ``verbosity`` of 0 silences ``log_lack_of_migration``.
        """
        self.verbosity = verbosity
        self.log = log
        super().__init__(
            defaults=defaults,
            specified_packages=specified_packages,
            dry_run=dry_run,
        )

    def _emit(self, message: str) -> None:
        """Pass ``message`` to the configured log callable, if there is one."""
        # ``log`` defaults to None; calling it unguarded would raise TypeError.
        if self.log is not None:
            self.log(message)

    def log_lack_of_migration(
        self, field_name: str, model_name: str, reason: str
    ) -> None:
        """Report why a field was not migrated, unless verbosity is 0."""
        if self.verbosity > 0:
            self._emit(
                f"Field '{field_name}' on model '{model_name}' not migrated: {reason}."
            )

    def ask_not_null_addition(self, field_name: str, model_name: str) -> Any:
        """Report the problem and exit with status 3; never returns."""
        # We can't ask the user, so act like the user aborted.
        self.log_lack_of_migration(
            field_name,
            model_name,
            "it is impossible to add a non-nullable field without specifying a default",
        )
        sys.exit(3)

    def ask_not_null_alteration(self, field_name: str, model_name: str) -> Any:
        """Report the problem (regardless of verbosity) and return ``NOT_PROVIDED``."""
        # We can't ask the user, so set as not provided.
        self._emit(
            f"Field '{field_name}' on model '{model_name}' given a default of "
            f"NOT PROVIDED and must be corrected."
        )
        return NOT_PROVIDED

    def ask_auto_now_add_addition(self, field_name: str, model_name: str) -> Any:
        """Report the problem and exit with status 3; never returns."""
        # We can't ask the user, so act like the user aborted.
        self.log_lack_of_migration(
            field_name,
            model_name,
            "it is impossible to add a field with 'auto_now_add=True' without "
            "specifying a default",
        )
        sys.exit(3)