Plain is headed towards 1.0! Subscribe for development updates →

  1import datetime
  2import importlib
  3import os
  4import sys
  5
  6import click
  7
  8from plain.models.fields import NOT_PROVIDED
  9from plain.packages import packages_registry
 10from plain.utils import timezone
 11
 12from .loader import MigrationLoader
 13
 14
 15class MigrationQuestioner:
 16    """
 17    Give the autodetector responses to questions it might have.
 18    This base class has a built-in noninteractive mode, but the
 19    interactive subclass is what the command-line arguments will use.
 20    """
 21
 22    def __init__(self, defaults=None, specified_packages=None, dry_run=None):
 23        self.defaults = defaults or {}
 24        self.specified_packages = specified_packages or set()
 25        self.dry_run = dry_run
 26
 27    def ask_initial(self, package_label):
 28        """Should we create an initial migration for the app?"""
 29        # If it was specified on the command line, definitely true
 30        if package_label in self.specified_packages:
 31            return True
 32        # Otherwise, we look to see if it has a migrations module
 33        # without any Python files in it, apart from __init__.py.
 34        # Packages from the new app template will have these; the Python
 35        # file check will ensure we skip South ones.
 36        try:
 37            package_config = packages_registry.get_package_config(package_label)
 38        except LookupError:  # It's a fake app.
 39            return self.defaults.get("ask_initial", False)
 40        migrations_import_path, _ = MigrationLoader.migrations_module(
 41            package_config.package_label
 42        )
 43        if migrations_import_path is None:
 44            # It's an application with migrations disabled.
 45            return self.defaults.get("ask_initial", False)
 46        try:
 47            migrations_module = importlib.import_module(migrations_import_path)
 48        except ImportError:
 49            return self.defaults.get("ask_initial", False)
 50        else:
 51            if getattr(migrations_module, "__file__", None):
 52                filenames = os.listdir(os.path.dirname(migrations_module.__file__))
 53            elif hasattr(migrations_module, "__path__"):
 54                if len(migrations_module.__path__) > 1:
 55                    return False
 56                filenames = os.listdir(list(migrations_module.__path__)[0])
 57            return not any(x.endswith(".py") for x in filenames if x != "__init__.py")
 58
 59    def ask_not_null_addition(self, field_name, model_name):
 60        """Adding a NOT NULL field to a model."""
 61        # None means quit
 62        return None
 63
 64    def ask_not_null_alteration(self, field_name, model_name):
 65        """Changing a NULL field to NOT NULL."""
 66        # None means quit
 67        return None
 68
 69    def ask_rename(self, model_name, old_name, new_name, field_instance):
 70        """Was this field really renamed?"""
 71        return self.defaults.get("ask_rename", False)
 72
 73    def ask_rename_model(self, old_model_state, new_model_state):
 74        """Was this model really renamed?"""
 75        return self.defaults.get("ask_rename_model", False)
 76
 77    def ask_auto_now_add_addition(self, field_name, model_name):
 78        """Adding an auto_now_add field to a model."""
 79        # None means quit
 80        return None
 81
 82    def ask_unique_callable_default_addition(self, field_name, model_name):
 83        """Adding a unique field with a callable default."""
 84        # None means continue.
 85        return None
 86
 87
 88class InteractiveMigrationQuestioner(MigrationQuestioner):
 89    def __init__(self, defaults=None, specified_packages=None, dry_run=None):
 90        super().__init__(
 91            defaults=defaults, specified_packages=specified_packages, dry_run=dry_run
 92        )
 93
 94    def _boolean_input(self, question, default=None):
 95        return click.confirm(question, default=default)
 96
 97    def _choice_input(self, question, choices):
 98        choice_map = {str(i + 1): choice for i, choice in enumerate(choices)}
 99        choice_map_str = "\n".join(
100            [f"{i}) {choice}" for i, choice in choice_map.items()]
101        )
102        choice = click.prompt(
103            f"{question}\n{choice_map_str}\nSelect an option",
104            type=click.Choice(choice_map.keys()),
105        )
106        return int(choice)
107
108    def _ask_default(self, default=""):
109        """
110        Prompt for a default value.
111
112        The ``default`` argument allows providing a custom default value (as a
113        string) which will be shown to the user and used as the return value
114        if the user doesn't provide any other input.
115        """
116        click.echo("Please enter the default value as valid Python.")
117        if default:
118            click.echo(
119                f"Accept the default '{default}' by pressing 'Enter' or "
120                f"provide another value."
121            )
122        click.echo(
123            "The datetime and plain.utils.timezone modules are available, so "
124            "it is possible to provide e.g. timezone.now as a value."
125        )
126        click.echo("Type 'exit' to exit this prompt")
127        while True:
128            if default:
129                prompt = f"[default: {default}] >>> "
130            else:
131                prompt = ">>> "
132            code = click.prompt(prompt, default=default, show_default=False)
133            if not code and default:
134                code = default
135            if not code:
136                click.echo(
137                    "Please enter some code, or 'exit' (without quotes) to exit."
138                )
139            elif code == "exit":
140                sys.exit(1)
141            else:
142                try:
143                    return eval(code, {}, {"datetime": datetime, "timezone": timezone})
144                except (SyntaxError, NameError) as e:
145                    click.echo(f"Invalid input: {e}")
146
147    def ask_not_null_addition(self, field_name, model_name):
148        """Adding a NOT NULL field to a model."""
149        if not self.dry_run:
150            choice = self._choice_input(
151                f"It is impossible to add a non-nullable field '{field_name}' "
152                f"to {model_name} without specifying a default. This is "
153                f"because the database needs something to populate existing "
154                f"rows.\n"
155                f"Please select a fix:",
156                [
157                    (
158                        "Provide a one-off default now (will be set on all existing "
159                        "rows with a null value for this column)"
160                    ),
161                    "Quit and manually define a default value in models.py.",
162                ],
163            )
164            if choice == 2:
165                sys.exit(3)
166            else:
167                return self._ask_default()
168        return None
169
170    def ask_not_null_alteration(self, field_name, model_name):
171        """Changing a NULL field to NOT NULL."""
172        if not self.dry_run:
173            choice = self._choice_input(
174                f"It is impossible to change a nullable field '{field_name}' "
175                f"on {model_name} to non-nullable without providing a "
176                f"default. This is because the database needs something to "
177                f"populate existing rows.\n"
178                f"Please select a fix:",
179                [
180                    (
181                        "Provide a one-off default now (will be set on all existing "
182                        "rows with a null value for this column)"
183                    ),
184                    "Ignore for now. Existing rows that contain NULL values "
185                    "will have to be handled manually, for example with a "
186                    "RunPython or RunSQL operation.",
187                    "Quit and manually define a default value in models.py.",
188                ],
189            )
190            if choice == 2:
191                return NOT_PROVIDED
192            elif choice == 3:
193                sys.exit(3)
194            else:
195                return self._ask_default()
196        return None
197
198    def ask_rename(self, model_name, old_name, new_name, field_instance):
199        """Was this field really renamed?"""
200        msg = "Was %s.%s renamed to %s.%s (a %s)?"
201        return self._boolean_input(
202            msg
203            % (
204                model_name,
205                old_name,
206                model_name,
207                new_name,
208                field_instance.__class__.__name__,
209            ),
210            default=False,
211        )
212
213    def ask_rename_model(self, old_model_state, new_model_state):
214        """Was this model really renamed?"""
215        msg = "Was the model %s.%s renamed to %s?"
216        return self._boolean_input(
217            msg
218            % (
219                old_model_state.package_label,
220                old_model_state.name,
221                new_model_state.name,
222            ),
223            default=False,
224        )
225
226    def ask_auto_now_add_addition(self, field_name, model_name):
227        """Adding an auto_now_add field to a model."""
228        if not self.dry_run:
229            choice = self._choice_input(
230                f"It is impossible to add the field '{field_name}' with "
231                f"'auto_now_add=True' to {model_name} without providing a "
232                f"default. This is because the database needs something to "
233                f"populate existing rows.\n",
234                [
235                    "Provide a one-off default now which will be set on all "
236                    "existing rows",
237                    "Quit and manually define a default value in models.py.",
238                ],
239            )
240            if choice == 2:
241                sys.exit(3)
242            else:
243                return self._ask_default(default="timezone.now")
244        return None
245
246    def ask_unique_callable_default_addition(self, field_name, model_name):
247        """Adding a unique field with a callable default."""
248        if not self.dry_run:
249            choice = self._choice_input(
250                f"Callable default on unique field {model_name}.{field_name} "
251                f"will not generate unique values upon migrating.\n"
252                f"Please choose how to proceed:\n",
253                [
254                    "Continue making this migration as the first step in "
255                    "writing a manual migration to generate unique values.",
256                    "Quit and edit field options in models.py.",
257                ],
258            )
259            if choice == 2:
260                sys.exit(3)
261        return None
262
263
class NonInteractiveMigrationQuestioner(MigrationQuestioner):
    """
    Questioner for runs where the user cannot be prompted.

    Questions that need a user-supplied default are reported through the
    ``log`` callable and then either abort the process with status 3 or
    answer ``NOT_PROVIDED``.
    """

    def __init__(
        self,
        defaults=None,
        specified_packages=None,
        dry_run=None,
        verbosity=1,
        log=None,
    ):
        """
        ``verbosity`` gates the "not migrated" messages (emitted only when it
        is greater than 0). ``log`` is a callable taking one message string;
        when it is left as ``None`` messages are dropped.
        """
        self.verbosity = verbosity
        self.log = log
        super().__init__(
            defaults=defaults,
            specified_packages=specified_packages,
            dry_run=dry_run,
        )

    def _emit(self, message):
        """Send ``message`` to the configured log callable, if there is one."""
        # ``log`` defaults to None; calling it unguarded would raise
        # TypeError before the intended exit/return could happen.
        if self.log is not None:
            self.log(message)

    def log_lack_of_migration(self, field_name, model_name, reason):
        """Report why a field was not migrated, unless verbosity is 0 or less."""
        if self.verbosity > 0:
            self._emit(
                f"Field '{field_name}' on model '{model_name}' not migrated: {reason}."
            )

    def ask_not_null_addition(self, field_name, model_name):
        """Adding a NOT NULL field to a model: report and exit with status 3."""
        # We can't ask the user, so act like the user aborted.
        self.log_lack_of_migration(
            field_name,
            model_name,
            "it is impossible to add a non-nullable field without specifying a default",
        )
        sys.exit(3)

    def ask_not_null_alteration(self, field_name, model_name):
        """Changing a NULL field to NOT NULL: report and return NOT_PROVIDED."""
        # We can't ask the user, so set as not provided.
        # This message is emitted regardless of verbosity.
        self._emit(
            f"Field '{field_name}' on model '{model_name}' given a default of "
            f"NOT PROVIDED and must be corrected."
        )
        return NOT_PROVIDED

    def ask_auto_now_add_addition(self, field_name, model_name):
        """Adding an auto_now_add field: report and exit with status 3."""
        # We can't ask the user, so act like the user aborted.
        self.log_lack_of_migration(
            field_name,
            model_name,
            "it is impossible to add a field with 'auto_now_add=True' without "
            "specifying a default",
        )
        sys.exit(3)