Plain is headed towards 1.0! Subscribe for development updates →

  1import datetime
  2import importlib
  3import os
  4import sys
  5
  6from plain.models.fields import NOT_PROVIDED
  7from plain.packages import packages
  8from plain.utils import timezone
  9
 10from .loader import MigrationLoader
 11
 12
 13class MigrationQuestioner:
 14    """
 15    Give the autodetector responses to questions it might have.
 16    This base class has a built-in noninteractive mode, but the
 17    interactive subclass is what the command-line arguments will use.
 18    """
 19
 20    def __init__(self, defaults=None, specified_packages=None, dry_run=None):
 21        self.defaults = defaults or {}
 22        self.specified_packages = specified_packages or set()
 23        self.dry_run = dry_run
 24
 25    def ask_initial(self, package_label):
 26        """Should we create an initial migration for the app?"""
 27        # If it was specified on the command line, definitely true
 28        if package_label in self.specified_packages:
 29            return True
 30        # Otherwise, we look to see if it has a migrations module
 31        # without any Python files in it, apart from __init__.py.
 32        # Packages from the new app template will have these; the Python
 33        # file check will ensure we skip South ones.
 34        try:
 35            package_config = packages.get_package_config(package_label)
 36        except LookupError:  # It's a fake app.
 37            return self.defaults.get("ask_initial", False)
 38        migrations_import_path, _ = MigrationLoader.migrations_module(
 39            package_config.label
 40        )
 41        if migrations_import_path is None:
 42            # It's an application with migrations disabled.
 43            return self.defaults.get("ask_initial", False)
 44        try:
 45            migrations_module = importlib.import_module(migrations_import_path)
 46        except ImportError:
 47            return self.defaults.get("ask_initial", False)
 48        else:
 49            if getattr(migrations_module, "__file__", None):
 50                filenames = os.listdir(os.path.dirname(migrations_module.__file__))
 51            elif hasattr(migrations_module, "__path__"):
 52                if len(migrations_module.__path__) > 1:
 53                    return False
 54                filenames = os.listdir(list(migrations_module.__path__)[0])
 55            return not any(x.endswith(".py") for x in filenames if x != "__init__.py")
 56
 57    def ask_not_null_addition(self, field_name, model_name):
 58        """Adding a NOT NULL field to a model."""
 59        # None means quit
 60        return None
 61
 62    def ask_not_null_alteration(self, field_name, model_name):
 63        """Changing a NULL field to NOT NULL."""
 64        # None means quit
 65        return None
 66
 67    def ask_rename(self, model_name, old_name, new_name, field_instance):
 68        """Was this field really renamed?"""
 69        return self.defaults.get("ask_rename", False)
 70
 71    def ask_rename_model(self, old_model_state, new_model_state):
 72        """Was this model really renamed?"""
 73        return self.defaults.get("ask_rename_model", False)
 74
 75    def ask_merge(self, package_label):
 76        """Should these migrations really be merged?"""
 77        return self.defaults.get("ask_merge", False)
 78
 79    def ask_auto_now_add_addition(self, field_name, model_name):
 80        """Adding an auto_now_add field to a model."""
 81        # None means quit
 82        return None
 83
 84    def ask_unique_callable_default_addition(self, field_name, model_name):
 85        """Adding a unique field with a callable default."""
 86        # None means continue.
 87        return None
 88
 89
 90class InteractiveMigrationQuestioner(MigrationQuestioner):
 91    def __init__(
 92        self, defaults=None, specified_packages=None, dry_run=None, prompt_output=None
 93    ):
 94        super().__init__(
 95            defaults=defaults, specified_packages=specified_packages, dry_run=dry_run
 96        )
 97        self.prompt_output = prompt_output or sys.stdout
 98
 99    def _boolean_input(self, question, default=None):
100        self.prompt_output.write(f"{question} ", ending="")
101        result = input()
102        if not result and default is not None:
103            return default
104        while not result or result[0].lower() not in "yn":
105            self.prompt_output.write("Please answer yes or no: ", ending="")
106            result = input()
107        return result[0].lower() == "y"
108
109    def _choice_input(self, question, choices):
110        self.prompt_output.write(f"{question}")
111        for i, choice in enumerate(choices):
112            self.prompt_output.write(f" {i + 1}) {choice}")
113        self.prompt_output.write("Select an option: ", ending="")
114        result = input()
115        while True:
116            try:
117                value = int(result)
118            except ValueError:
119                pass
120            else:
121                if 0 < value <= len(choices):
122                    return value
123            self.prompt_output.write("Please select a valid option: ", ending="")
124            result = input()
125
126    def _ask_default(self, default=""):
127        """
128        Prompt for a default value.
129
130        The ``default`` argument allows providing a custom default value (as a
131        string) which will be shown to the user and used as the return value
132        if the user doesn't provide any other input.
133        """
134        self.prompt_output.write("Please enter the default value as valid Python.")
135        if default:
136            self.prompt_output.write(
137                f"Accept the default '{default}' by pressing 'Enter' or "
138                f"provide another value."
139            )
140        self.prompt_output.write(
141            "The datetime and plain.utils.timezone modules are available, so "
142            "it is possible to provide e.g. timezone.now as a value."
143        )
144        self.prompt_output.write("Type 'exit' to exit this prompt")
145        while True:
146            if default:
147                prompt = f"[default: {default}] >>> "
148            else:
149                prompt = ">>> "
150            self.prompt_output.write(prompt, ending="")
151            code = input()
152            if not code and default:
153                code = default
154            if not code:
155                self.prompt_output.write(
156                    "Please enter some code, or 'exit' (without quotes) to exit."
157                )
158            elif code == "exit":
159                sys.exit(1)
160            else:
161                try:
162                    return eval(code, {}, {"datetime": datetime, "timezone": timezone})
163                except (SyntaxError, NameError) as e:
164                    self.prompt_output.write("Invalid input: %s" % e)
165
166    def ask_not_null_addition(self, field_name, model_name):
167        """Adding a NOT NULL field to a model."""
168        if not self.dry_run:
169            choice = self._choice_input(
170                f"It is impossible to add a non-nullable field '{field_name}' "
171                f"to {model_name} without specifying a default. This is "
172                f"because the database needs something to populate existing "
173                f"rows.\n"
174                f"Please select a fix:",
175                [
176                    (
177                        "Provide a one-off default now (will be set on all existing "
178                        "rows with a null value for this column)"
179                    ),
180                    "Quit and manually define a default value in models.py.",
181                ],
182            )
183            if choice == 2:
184                sys.exit(3)
185            else:
186                return self._ask_default()
187        return None
188
189    def ask_not_null_alteration(self, field_name, model_name):
190        """Changing a NULL field to NOT NULL."""
191        if not self.dry_run:
192            choice = self._choice_input(
193                f"It is impossible to change a nullable field '{field_name}' "
194                f"on {model_name} to non-nullable without providing a "
195                f"default. This is because the database needs something to "
196                f"populate existing rows.\n"
197                f"Please select a fix:",
198                [
199                    (
200                        "Provide a one-off default now (will be set on all existing "
201                        "rows with a null value for this column)"
202                    ),
203                    "Ignore for now. Existing rows that contain NULL values "
204                    "will have to be handled manually, for example with a "
205                    "RunPython or RunSQL operation.",
206                    "Quit and manually define a default value in models.py.",
207                ],
208            )
209            if choice == 2:
210                return NOT_PROVIDED
211            elif choice == 3:
212                sys.exit(3)
213            else:
214                return self._ask_default()
215        return None
216
217    def ask_rename(self, model_name, old_name, new_name, field_instance):
218        """Was this field really renamed?"""
219        msg = "Was %s.%s renamed to %s.%s (a %s)? [y/N]"
220        return self._boolean_input(
221            msg
222            % (
223                model_name,
224                old_name,
225                model_name,
226                new_name,
227                field_instance.__class__.__name__,
228            ),
229            False,
230        )
231
232    def ask_rename_model(self, old_model_state, new_model_state):
233        """Was this model really renamed?"""
234        msg = "Was the model %s.%s renamed to %s? [y/N]"
235        return self._boolean_input(
236            msg
237            % (
238                old_model_state.package_label,
239                old_model_state.name,
240                new_model_state.name,
241            ),
242            False,
243        )
244
245    def ask_merge(self, package_label):
246        return self._boolean_input(
247            (
248                "\nMerging will only work if the operations printed above do not conflict\n"
249                "with each other (working on different fields or models)\n"
250                "Should these migration branches be merged? [y/N]"
251            ),
252            False,
253        )
254
255    def ask_auto_now_add_addition(self, field_name, model_name):
256        """Adding an auto_now_add field to a model."""
257        if not self.dry_run:
258            choice = self._choice_input(
259                f"It is impossible to add the field '{field_name}' with "
260                f"'auto_now_add=True' to {model_name} without providing a "
261                f"default. This is because the database needs something to "
262                f"populate existing rows.\n",
263                [
264                    "Provide a one-off default now which will be set on all "
265                    "existing rows",
266                    "Quit and manually define a default value in models.py.",
267                ],
268            )
269            if choice == 2:
270                sys.exit(3)
271            else:
272                return self._ask_default(default="timezone.now")
273        return None
274
275    def ask_unique_callable_default_addition(self, field_name, model_name):
276        """Adding a unique field with a callable default."""
277        if not self.dry_run:
278            choice = self._choice_input(
279                f"Callable default on unique field {model_name}.{field_name} "
280                f"will not generate unique values upon migrating.\n"
281                f"Please choose how to proceed:\n",
282                [
283                    "Continue making this migration as the first step in "
284                    "writing a manual migration to generate unique values.",
285                    "Quit and edit field options in models.py.",
286                ],
287            )
288            if choice == 2:
289                sys.exit(3)
290        return None
291
292
293class NonInteractiveMigrationQuestioner(MigrationQuestioner):
294    def __init__(
295        self,
296        defaults=None,
297        specified_packages=None,
298        dry_run=None,
299        verbosity=1,
300        log=None,
301    ):
302        self.verbosity = verbosity
303        self.log = log
304        super().__init__(
305            defaults=defaults,
306            specified_packages=specified_packages,
307            dry_run=dry_run,
308        )
309
310    def log_lack_of_migration(self, field_name, model_name, reason):
311        if self.verbosity > 0:
312            self.log(
313                f"Field '{field_name}' on model '{model_name}' not migrated: "
314                f"{reason}."
315            )
316
317    def ask_not_null_addition(self, field_name, model_name):
318        # We can't ask the user, so act like the user aborted.
319        self.log_lack_of_migration(
320            field_name,
321            model_name,
322            "it is impossible to add a non-nullable field without specifying "
323            "a default",
324        )
325        sys.exit(3)
326
327    def ask_not_null_alteration(self, field_name, model_name):
328        # We can't ask the user, so set as not provided.
329        self.log(
330            f"Field '{field_name}' on model '{model_name}' given a default of "
331            f"NOT PROVIDED and must be corrected."
332        )
333        return NOT_PROVIDED
334
335    def ask_auto_now_add_addition(self, field_name, model_name):
336        # We can't ask the user, so act like the user aborted.
337        self.log_lack_of_migration(
338            field_name,
339            model_name,
340            "it is impossible to add a field with 'auto_now_add=True' without "
341            "specifying a default",
342        )
343        sys.exit(3)