Plain is headed towards 1.0! Subscribe for development updates →

  1import datetime
  2import importlib
  3import os
  4import sys
  5
  6import click
  7
  8from plain.models.fields import NOT_PROVIDED
  9from plain.packages import packages_registry
 10from plain.utils import timezone
 11
 12from .loader import MigrationLoader
 13
 14
 15class MigrationQuestioner:
 16    """
 17    Give the autodetector responses to questions it might have.
 18    This base class has a built-in noninteractive mode, but the
 19    interactive subclass is what the command-line arguments will use.
 20    """
 21
 22    def __init__(self, defaults=None, specified_packages=None, dry_run=None):
 23        self.defaults = defaults or {}
 24        self.specified_packages = specified_packages or set()
 25        self.dry_run = dry_run
 26
 27    def ask_initial(self, package_label):
 28        """Should we create an initial migration for the app?"""
 29        # If it was specified on the command line, definitely true
 30        if package_label in self.specified_packages:
 31            return True
 32        # Otherwise, we look to see if it has a migrations module
 33        # without any Python files in it, apart from __init__.py.
 34        # Packages from the new app template will have these; the Python
 35        # file check will ensure we skip South ones.
 36        try:
 37            package_config = packages_registry.get_package_config(package_label)
 38        except LookupError:  # It's a fake app.
 39            return self.defaults.get("ask_initial", False)
 40        migrations_import_path, _ = MigrationLoader.migrations_module(
 41            package_config.package_label
 42        )
 43        if migrations_import_path is None:
 44            # It's an application with migrations disabled.
 45            return self.defaults.get("ask_initial", False)
 46        try:
 47            migrations_module = importlib.import_module(migrations_import_path)
 48        except ImportError:
 49            return self.defaults.get("ask_initial", False)
 50        else:
 51            if getattr(migrations_module, "__file__", None):
 52                filenames = os.listdir(os.path.dirname(migrations_module.__file__))
 53            elif hasattr(migrations_module, "__path__"):
 54                if len(migrations_module.__path__) > 1:
 55                    return False
 56                filenames = os.listdir(list(migrations_module.__path__)[0])
 57            return not any(x.endswith(".py") for x in filenames if x != "__init__.py")
 58
 59    def ask_not_null_addition(self, field_name, model_name):
 60        """Adding a NOT NULL field to a model."""
 61        # None means quit
 62        return None
 63
 64    def ask_not_null_alteration(self, field_name, model_name):
 65        """Changing a NULL field to NOT NULL."""
 66        # None means quit
 67        return None
 68
 69    def ask_rename(self, model_name, old_name, new_name, field_instance):
 70        """Was this field really renamed?"""
 71        return self.defaults.get("ask_rename", False)
 72
 73    def ask_rename_model(self, old_model_state, new_model_state):
 74        """Was this model really renamed?"""
 75        return self.defaults.get("ask_rename_model", False)
 76
 77    def ask_merge(self, package_label):
 78        """Should these migrations really be merged?"""
 79        return self.defaults.get("ask_merge", False)
 80
 81    def ask_auto_now_add_addition(self, field_name, model_name):
 82        """Adding an auto_now_add field to a model."""
 83        # None means quit
 84        return None
 85
 86    def ask_unique_callable_default_addition(self, field_name, model_name):
 87        """Adding a unique field with a callable default."""
 88        # None means continue.
 89        return None
 90
 91
 92class InteractiveMigrationQuestioner(MigrationQuestioner):
 93    def __init__(self, defaults=None, specified_packages=None, dry_run=None):
 94        super().__init__(
 95            defaults=defaults, specified_packages=specified_packages, dry_run=dry_run
 96        )
 97
 98    def _boolean_input(self, question, default=None):
 99        return click.confirm(question, default=default)
100
101    def _choice_input(self, question, choices):
102        choice_map = {str(i + 1): choice for i, choice in enumerate(choices)}
103        choice_map_str = "\n".join(
104            [f"{i}) {choice}" for i, choice in choice_map.items()]
105        )
106        choice = click.prompt(
107            f"{question}\n{choice_map_str}\nSelect an option",
108            type=click.Choice(choice_map.keys()),
109        )
110        return int(choice)
111
112    def _ask_default(self, default=""):
113        """
114        Prompt for a default value.
115
116        The ``default`` argument allows providing a custom default value (as a
117        string) which will be shown to the user and used as the return value
118        if the user doesn't provide any other input.
119        """
120        click.echo("Please enter the default value as valid Python.")
121        if default:
122            click.echo(
123                f"Accept the default '{default}' by pressing 'Enter' or "
124                f"provide another value."
125            )
126        click.echo(
127            "The datetime and plain.utils.timezone modules are available, so "
128            "it is possible to provide e.g. timezone.now as a value."
129        )
130        click.echo("Type 'exit' to exit this prompt")
131        while True:
132            if default:
133                prompt = f"[default: {default}] >>> "
134            else:
135                prompt = ">>> "
136            code = click.prompt(prompt, default=default, show_default=False)
137            if not code and default:
138                code = default
139            if not code:
140                click.echo(
141                    "Please enter some code, or 'exit' (without quotes) to exit."
142                )
143            elif code == "exit":
144                sys.exit(1)
145            else:
146                try:
147                    return eval(code, {}, {"datetime": datetime, "timezone": timezone})
148                except (SyntaxError, NameError) as e:
149                    click.echo(f"Invalid input: {e}")
150
151    def ask_not_null_addition(self, field_name, model_name):
152        """Adding a NOT NULL field to a model."""
153        if not self.dry_run:
154            choice = self._choice_input(
155                f"It is impossible to add a non-nullable field '{field_name}' "
156                f"to {model_name} without specifying a default. This is "
157                f"because the database needs something to populate existing "
158                f"rows.\n"
159                f"Please select a fix:",
160                [
161                    (
162                        "Provide a one-off default now (will be set on all existing "
163                        "rows with a null value for this column)"
164                    ),
165                    "Quit and manually define a default value in models.py.",
166                ],
167            )
168            if choice == 2:
169                sys.exit(3)
170            else:
171                return self._ask_default()
172        return None
173
174    def ask_not_null_alteration(self, field_name, model_name):
175        """Changing a NULL field to NOT NULL."""
176        if not self.dry_run:
177            choice = self._choice_input(
178                f"It is impossible to change a nullable field '{field_name}' "
179                f"on {model_name} to non-nullable without providing a "
180                f"default. This is because the database needs something to "
181                f"populate existing rows.\n"
182                f"Please select a fix:",
183                [
184                    (
185                        "Provide a one-off default now (will be set on all existing "
186                        "rows with a null value for this column)"
187                    ),
188                    "Ignore for now. Existing rows that contain NULL values "
189                    "will have to be handled manually, for example with a "
190                    "RunPython or RunSQL operation.",
191                    "Quit and manually define a default value in models.py.",
192                ],
193            )
194            if choice == 2:
195                return NOT_PROVIDED
196            elif choice == 3:
197                sys.exit(3)
198            else:
199                return self._ask_default()
200        return None
201
202    def ask_rename(self, model_name, old_name, new_name, field_instance):
203        """Was this field really renamed?"""
204        msg = "Was %s.%s renamed to %s.%s (a %s)?"
205        return self._boolean_input(
206            msg
207            % (
208                model_name,
209                old_name,
210                model_name,
211                new_name,
212                field_instance.__class__.__name__,
213            ),
214            default=False,
215        )
216
217    def ask_rename_model(self, old_model_state, new_model_state):
218        """Was this model really renamed?"""
219        msg = "Was the model %s.%s renamed to %s?"
220        return self._boolean_input(
221            msg
222            % (
223                old_model_state.package_label,
224                old_model_state.name,
225                new_model_state.name,
226            ),
227            default=False,
228        )
229
230    def ask_merge(self, package_label):
231        return self._boolean_input(
232            (
233                "\nMerging will only work if the operations printed above do not conflict\n"
234                "with each other (working on different fields or models)\n"
235                "Should these migration branches be merged?"
236            ),
237            default=False,
238        )
239
240    def ask_auto_now_add_addition(self, field_name, model_name):
241        """Adding an auto_now_add field to a model."""
242        if not self.dry_run:
243            choice = self._choice_input(
244                f"It is impossible to add the field '{field_name}' with "
245                f"'auto_now_add=True' to {model_name} without providing a "
246                f"default. This is because the database needs something to "
247                f"populate existing rows.\n",
248                [
249                    "Provide a one-off default now which will be set on all "
250                    "existing rows",
251                    "Quit and manually define a default value in models.py.",
252                ],
253            )
254            if choice == 2:
255                sys.exit(3)
256            else:
257                return self._ask_default(default="timezone.now")
258        return None
259
260    def ask_unique_callable_default_addition(self, field_name, model_name):
261        """Adding a unique field with a callable default."""
262        if not self.dry_run:
263            choice = self._choice_input(
264                f"Callable default on unique field {model_name}.{field_name} "
265                f"will not generate unique values upon migrating.\n"
266                f"Please choose how to proceed:\n",
267                [
268                    "Continue making this migration as the first step in "
269                    "writing a manual migration to generate unique values.",
270                    "Quit and edit field options in models.py.",
271                ],
272            )
273            if choice == 2:
274                sys.exit(3)
275        return None
276
277
class NonInteractiveMigrationQuestioner(MigrationQuestioner):
    """
    Answer the autodetector's questions without prompting.

    Questions that would need user input are either answered with
    ``NOT_PROVIDED`` or treated as if the user had aborted (exit status 3).
    Messages are sent to the optional ``log`` callable.
    """

    def __init__(
        self,
        defaults=None,
        specified_packages=None,
        dry_run=None,
        verbosity=1,
        log=None,
    ):
        self.verbosity = verbosity
        # Callable taking a single message string, or None for no output.
        self.log = log
        super().__init__(
            defaults=defaults,
            specified_packages=specified_packages,
            dry_run=dry_run,
        )

    def _emit(self, message):
        """Pass ``message`` to the log callable, if one was supplied."""
        # ``log`` defaults to None; calling it unguarded would raise
        # TypeError before the intended exit or return could happen.
        if self.log is not None:
            self.log(message)

    def log_lack_of_migration(self, field_name, model_name, reason):
        """Log why a field was not migrated, unless verbosity is 0 or less."""
        if self.verbosity > 0:
            self._emit(
                f"Field '{field_name}' on model '{model_name}' not migrated: {reason}."
            )

    def ask_not_null_addition(self, field_name, model_name):
        """Adding a NOT NULL field to a model: log the reason and exit with status 3."""
        # We can't ask the user, so act like the user aborted.
        self.log_lack_of_migration(
            field_name,
            model_name,
            "it is impossible to add a non-nullable field without specifying a default",
        )
        sys.exit(3)

    def ask_not_null_alteration(self, field_name, model_name):
        """Changing a NULL field to NOT NULL: log a warning and return NOT_PROVIDED."""
        # We can't ask the user, so set as not provided.
        # This message is emitted regardless of the verbosity setting.
        self._emit(
            f"Field '{field_name}' on model '{model_name}' given a default of "
            f"NOT PROVIDED and must be corrected."
        )
        return NOT_PROVIDED

    def ask_auto_now_add_addition(self, field_name, model_name):
        """Adding an auto_now_add field to a model: log the reason and exit with status 3."""
        # We can't ask the user, so act like the user aborted.
        self.log_lack_of_migration(
            field_name,
            model_name,
            "it is impossible to add a field with 'auto_now_add=True' without "
            "specifying a default",
        )
        sys.exit(3)
325        )
326        sys.exit(3)