1from __future__ import annotations
2
3import datetime
4import importlib
5import os
6import sys
7from collections.abc import Callable
8from typing import TYPE_CHECKING, Any
9
10import click
11
12from plain.models.fields import NOT_PROVIDED
13from plain.packages import packages_registry
14from plain.utils import timezone
15
16from .loader import MigrationLoader
17
18if TYPE_CHECKING:
19 from plain.models.fields import Field
20
21
22class MigrationQuestioner:
23 """
24 Give the autodetector responses to questions it might have.
25 This base class has a built-in noninteractive mode, but the
26 interactive subclass is what the command-line arguments will use.
27 """
28
29 def __init__(
30 self,
31 defaults: dict[str, Any] | None = None,
32 specified_packages: set[str] | None = None,
33 dry_run: bool | None = None,
34 ) -> None:
35 self.defaults = defaults or {}
36 self.specified_packages = specified_packages or set()
37 self.dry_run = dry_run
38
39 def ask_initial(self, package_label: str) -> bool:
40 """Should we create an initial migration for the app?"""
41 # If it was specified on the command line, definitely true
42 if package_label in self.specified_packages:
43 return True
44 # Otherwise, we look to see if it has a migrations module
45 # without any Python files in it, apart from __init__.py.
46 # Packages from the new app template will have these; the Python
47 # file check will ensure we skip South ones.
48 try:
49 package_config = packages_registry.get_package_config(package_label)
50 except LookupError: # It's a fake app.
51 return self.defaults.get("ask_initial", False)
52 migrations_import_path, _ = MigrationLoader.migrations_module(
53 package_config.package_label
54 )
55 if migrations_import_path is None:
56 # It's an application with migrations disabled.
57 return self.defaults.get("ask_initial", False)
58 try:
59 migrations_module = importlib.import_module(migrations_import_path)
60 except ImportError:
61 return self.defaults.get("ask_initial", False)
62 else:
63 if getattr(migrations_module, "__file__", None):
64 filenames = os.listdir(os.path.dirname(migrations_module.__file__)) # type: ignore[arg-type]
65 elif hasattr(migrations_module, "__path__"):
66 if len(migrations_module.__path__) > 1:
67 return False
68 filenames = os.listdir(list(migrations_module.__path__)[0])
69 return not any(x.endswith(".py") for x in filenames if x != "__init__.py")
70
71 def ask_not_null_addition(self, field_name: str, model_name: str) -> Any:
72 """Adding a NOT NULL field to a model."""
73 # None means quit
74 return None
75
76 def ask_not_null_alteration(self, field_name: str, model_name: str) -> Any:
77 """Changing a NULL field to NOT NULL."""
78 # None means quit
79 return None
80
81 def ask_rename(
82 self, model_name: str, old_name: str, new_name: str, field_instance: Field
83 ) -> bool:
84 """Was this field really renamed?"""
85 return self.defaults.get("ask_rename", False)
86
87 def ask_rename_model(self, old_model_state: Any, new_model_state: Any) -> bool:
88 """Was this model really renamed?"""
89 return self.defaults.get("ask_rename_model", False)
90
91 def ask_auto_now_add_addition(self, field_name: str, model_name: str) -> Any:
92 """Adding an auto_now_add field to a model."""
93 # None means quit
94 return None
95
96 def ask_unique_callable_default_addition(
97 self, field_name: str, model_name: str
98 ) -> Any:
99 """Adding a unique field with a callable default."""
100 # None means continue.
101 return None
102
103
104class InteractiveMigrationQuestioner(MigrationQuestioner):
105 def __init__(
106 self,
107 defaults: dict[str, Any] | None = None,
108 specified_packages: set[str] | None = None,
109 dry_run: bool | None = None,
110 ) -> None:
111 super().__init__(
112 defaults=defaults, specified_packages=specified_packages, dry_run=dry_run
113 )
114
115 def _boolean_input(self, question: str, default: bool | None = None) -> bool:
116 return click.confirm(question, default=default)
117
118 def _choice_input(self, question: str, choices: list[str]) -> int:
119 choice_map = {str(i + 1): choice for i, choice in enumerate(choices)}
120 choice_map_str = "\n".join(
121 [f"{i}) {choice}" for i, choice in choice_map.items()]
122 )
123 choice = click.prompt(
124 f"{question}\n{choice_map_str}\nSelect an option",
125 type=click.Choice(choice_map.keys()),
126 )
127 return int(choice)
128
129 def _ask_default(self, default: str = "") -> Any:
130 """
131 Prompt for a default value.
132
133 The ``default`` argument allows providing a custom default value (as a
134 string) which will be shown to the user and used as the return value
135 if the user doesn't provide any other input.
136 """
137 click.echo("Please enter the default value as valid Python.")
138 if default:
139 click.echo(
140 f"Accept the default '{default}' by pressing 'Enter' or "
141 f"provide another value."
142 )
143 click.echo(
144 "The datetime and plain.utils.timezone modules are available, so "
145 "it is possible to provide e.g. timezone.now as a value."
146 )
147 click.echo("Type 'exit' to exit this prompt")
148 while True:
149 if default:
150 prompt = f"[default: {default}] >>> "
151 else:
152 prompt = ">>> "
153 code = click.prompt(prompt, default=default, show_default=False)
154 if not code and default:
155 code = default
156 if not code:
157 click.echo(
158 "Please enter some code, or 'exit' (without quotes) to exit."
159 )
160 elif code == "exit":
161 sys.exit(1)
162 else:
163 try:
164 return eval(code, {}, {"datetime": datetime, "timezone": timezone})
165 except (SyntaxError, NameError) as e:
166 click.echo(f"Invalid input: {e}")
167
168 def ask_not_null_addition(self, field_name: str, model_name: str) -> Any:
169 """Adding a NOT NULL field to a model."""
170 if not self.dry_run:
171 choice = self._choice_input(
172 f"It is impossible to add a non-nullable field '{field_name}' "
173 f"to {model_name} without specifying a default. This is "
174 f"because the database needs something to populate existing "
175 f"rows.\n"
176 f"Please select a fix:",
177 [
178 (
179 "Provide a one-off default now (will be set on all existing "
180 "rows with a null value for this column)"
181 ),
182 "Quit and manually define a default value in models.py.",
183 ],
184 )
185 if choice == 2:
186 sys.exit(3)
187 else:
188 return self._ask_default()
189 return None
190
191 def ask_not_null_alteration(self, field_name: str, model_name: str) -> Any:
192 """Changing a NULL field to NOT NULL."""
193 if not self.dry_run:
194 choice = self._choice_input(
195 f"It is impossible to change a nullable field '{field_name}' "
196 f"on {model_name} to non-nullable without providing a "
197 f"default. This is because the database needs something to "
198 f"populate existing rows.\n"
199 f"Please select a fix:",
200 [
201 (
202 "Provide a one-off default now (will be set on all existing "
203 "rows with a null value for this column)"
204 ),
205 "Ignore for now. Existing rows that contain NULL values "
206 "will have to be handled manually, for example with a "
207 "RunPython or RunSQL operation.",
208 "Quit and manually define a default value in models.py.",
209 ],
210 )
211 if choice == 2:
212 return NOT_PROVIDED
213 elif choice == 3:
214 sys.exit(3)
215 else:
216 return self._ask_default()
217 return None
218
219 def ask_rename(
220 self, model_name: str, old_name: str, new_name: str, field_instance: Field
221 ) -> bool:
222 """Was this field really renamed?"""
223 msg = "Was %s.%s renamed to %s.%s (a %s)?"
224 return self._boolean_input(
225 msg
226 % (
227 model_name,
228 old_name,
229 model_name,
230 new_name,
231 field_instance.__class__.__name__,
232 ),
233 default=False,
234 )
235
236 def ask_rename_model(self, old_model_state: Any, new_model_state: Any) -> bool:
237 """Was this model really renamed?"""
238 msg = "Was the model %s.%s renamed to %s?"
239 return self._boolean_input(
240 msg
241 % (
242 old_model_state.package_label,
243 old_model_state.name,
244 new_model_state.name,
245 ),
246 default=False,
247 )
248
249 def ask_auto_now_add_addition(self, field_name: str, model_name: str) -> Any:
250 """Adding an auto_now_add field to a model."""
251 if not self.dry_run:
252 choice = self._choice_input(
253 f"It is impossible to add the field '{field_name}' with "
254 f"'auto_now_add=True' to {model_name} without providing a "
255 f"default. This is because the database needs something to "
256 f"populate existing rows.\n",
257 [
258 "Provide a one-off default now which will be set on all "
259 "existing rows",
260 "Quit and manually define a default value in models.py.",
261 ],
262 )
263 if choice == 2:
264 sys.exit(3)
265 else:
266 return self._ask_default(default="timezone.now")
267 return None
268
269 def ask_unique_callable_default_addition(
270 self, field_name: str, model_name: str
271 ) -> Any:
272 """Adding a unique field with a callable default."""
273 if not self.dry_run:
274 choice = self._choice_input(
275 f"Callable default on unique field {model_name}.{field_name} "
276 f"will not generate unique values upon migrating.\n"
277 f"Please choose how to proceed:\n",
278 [
279 "Continue making this migration as the first step in "
280 "writing a manual migration to generate unique values.",
281 "Quit and edit field options in models.py.",
282 ],
283 )
284 if choice == 2:
285 sys.exit(3)
286 return None
287
288
289class NonInteractiveMigrationQuestioner(MigrationQuestioner):
290 def __init__(
291 self,
292 defaults: dict[str, Any] | None = None,
293 specified_packages: set[str] | None = None,
294 dry_run: bool | None = None,
295 verbosity: int = 1,
296 log: Callable[[str], Any] | None = None,
297 ) -> None:
298 self.verbosity = verbosity
299 self.log = log
300 super().__init__(
301 defaults=defaults,
302 specified_packages=specified_packages,
303 dry_run=dry_run,
304 )
305
306 def log_lack_of_migration(
307 self, field_name: str, model_name: str, reason: str
308 ) -> None:
309 if self.verbosity > 0:
310 self.log( # type: ignore[misc]
311 f"Field '{field_name}' on model '{model_name}' not migrated: {reason}."
312 )
313
314 def ask_not_null_addition(self, field_name: str, model_name: str) -> Any:
315 # We can't ask the user, so act like the user aborted.
316 self.log_lack_of_migration(
317 field_name,
318 model_name,
319 "it is impossible to add a non-nullable field without specifying a default",
320 )
321 sys.exit(3)
322
323 def ask_not_null_alteration(self, field_name: str, model_name: str) -> Any:
324 # We can't ask the user, so set as not provided.
325 self.log( # type: ignore[misc]
326 f"Field '{field_name}' on model '{model_name}' given a default of "
327 f"NOT PROVIDED and must be corrected."
328 )
329 return NOT_PROVIDED
330
331 def ask_auto_now_add_addition(self, field_name: str, model_name: str) -> Any:
332 # We can't ask the user, so act like the user aborted.
333 self.log_lack_of_migration(
334 field_name,
335 model_name,
336 "it is impossible to add a field with 'auto_now_add=True' without "
337 "specifying a default",
338 )
339 sys.exit(3)