1import datetime
2import importlib
3import os
4import sys
5
6from plain.models.fields import NOT_PROVIDED
7from plain.packages import packages
8from plain.utils import timezone
9
10from .loader import MigrationLoader
11
12
13class MigrationQuestioner:
14 """
15 Give the autodetector responses to questions it might have.
16 This base class has a built-in noninteractive mode, but the
17 interactive subclass is what the command-line arguments will use.
18 """
19
20 def __init__(self, defaults=None, specified_packages=None, dry_run=None):
21 self.defaults = defaults or {}
22 self.specified_packages = specified_packages or set()
23 self.dry_run = dry_run
24
25 def ask_initial(self, package_label):
26 """Should we create an initial migration for the app?"""
27 # If it was specified on the command line, definitely true
28 if package_label in self.specified_packages:
29 return True
30 # Otherwise, we look to see if it has a migrations module
31 # without any Python files in it, apart from __init__.py.
32 # Packages from the new app template will have these; the Python
33 # file check will ensure we skip South ones.
34 try:
35 package_config = packages.get_package_config(package_label)
36 except LookupError: # It's a fake app.
37 return self.defaults.get("ask_initial", False)
38 migrations_import_path, _ = MigrationLoader.migrations_module(
39 package_config.label
40 )
41 if migrations_import_path is None:
42 # It's an application with migrations disabled.
43 return self.defaults.get("ask_initial", False)
44 try:
45 migrations_module = importlib.import_module(migrations_import_path)
46 except ImportError:
47 return self.defaults.get("ask_initial", False)
48 else:
49 if getattr(migrations_module, "__file__", None):
50 filenames = os.listdir(os.path.dirname(migrations_module.__file__))
51 elif hasattr(migrations_module, "__path__"):
52 if len(migrations_module.__path__) > 1:
53 return False
54 filenames = os.listdir(list(migrations_module.__path__)[0])
55 return not any(x.endswith(".py") for x in filenames if x != "__init__.py")
56
57 def ask_not_null_addition(self, field_name, model_name):
58 """Adding a NOT NULL field to a model."""
59 # None means quit
60 return None
61
62 def ask_not_null_alteration(self, field_name, model_name):
63 """Changing a NULL field to NOT NULL."""
64 # None means quit
65 return None
66
67 def ask_rename(self, model_name, old_name, new_name, field_instance):
68 """Was this field really renamed?"""
69 return self.defaults.get("ask_rename", False)
70
71 def ask_rename_model(self, old_model_state, new_model_state):
72 """Was this model really renamed?"""
73 return self.defaults.get("ask_rename_model", False)
74
75 def ask_merge(self, package_label):
76 """Should these migrations really be merged?"""
77 return self.defaults.get("ask_merge", False)
78
79 def ask_auto_now_add_addition(self, field_name, model_name):
80 """Adding an auto_now_add field to a model."""
81 # None means quit
82 return None
83
84 def ask_unique_callable_default_addition(self, field_name, model_name):
85 """Adding a unique field with a callable default."""
86 # None means continue.
87 return None
88
89
90class InteractiveMigrationQuestioner(MigrationQuestioner):
91 def __init__(
92 self, defaults=None, specified_packages=None, dry_run=None, prompt_output=None
93 ):
94 super().__init__(
95 defaults=defaults, specified_packages=specified_packages, dry_run=dry_run
96 )
97 self.prompt_output = prompt_output or sys.stdout
98
99 def _boolean_input(self, question, default=None):
100 self.prompt_output.write(f"{question} ", ending="")
101 result = input()
102 if not result and default is not None:
103 return default
104 while not result or result[0].lower() not in "yn":
105 self.prompt_output.write("Please answer yes or no: ", ending="")
106 result = input()
107 return result[0].lower() == "y"
108
109 def _choice_input(self, question, choices):
110 self.prompt_output.write(f"{question}")
111 for i, choice in enumerate(choices):
112 self.prompt_output.write(f" {i + 1}) {choice}")
113 self.prompt_output.write("Select an option: ", ending="")
114 result = input()
115 while True:
116 try:
117 value = int(result)
118 except ValueError:
119 pass
120 else:
121 if 0 < value <= len(choices):
122 return value
123 self.prompt_output.write("Please select a valid option: ", ending="")
124 result = input()
125
126 def _ask_default(self, default=""):
127 """
128 Prompt for a default value.
129
130 The ``default`` argument allows providing a custom default value (as a
131 string) which will be shown to the user and used as the return value
132 if the user doesn't provide any other input.
133 """
134 self.prompt_output.write("Please enter the default value as valid Python.")
135 if default:
136 self.prompt_output.write(
137 f"Accept the default '{default}' by pressing 'Enter' or "
138 f"provide another value."
139 )
140 self.prompt_output.write(
141 "The datetime and plain.utils.timezone modules are available, so "
142 "it is possible to provide e.g. timezone.now as a value."
143 )
144 self.prompt_output.write("Type 'exit' to exit this prompt")
145 while True:
146 if default:
147 prompt = f"[default: {default}] >>> "
148 else:
149 prompt = ">>> "
150 self.prompt_output.write(prompt, ending="")
151 code = input()
152 if not code and default:
153 code = default
154 if not code:
155 self.prompt_output.write(
156 "Please enter some code, or 'exit' (without quotes) to exit."
157 )
158 elif code == "exit":
159 sys.exit(1)
160 else:
161 try:
162 return eval(code, {}, {"datetime": datetime, "timezone": timezone})
163 except (SyntaxError, NameError) as e:
164 self.prompt_output.write("Invalid input: %s" % e)
165
166 def ask_not_null_addition(self, field_name, model_name):
167 """Adding a NOT NULL field to a model."""
168 if not self.dry_run:
169 choice = self._choice_input(
170 f"It is impossible to add a non-nullable field '{field_name}' "
171 f"to {model_name} without specifying a default. This is "
172 f"because the database needs something to populate existing "
173 f"rows.\n"
174 f"Please select a fix:",
175 [
176 (
177 "Provide a one-off default now (will be set on all existing "
178 "rows with a null value for this column)"
179 ),
180 "Quit and manually define a default value in models.py.",
181 ],
182 )
183 if choice == 2:
184 sys.exit(3)
185 else:
186 return self._ask_default()
187 return None
188
189 def ask_not_null_alteration(self, field_name, model_name):
190 """Changing a NULL field to NOT NULL."""
191 if not self.dry_run:
192 choice = self._choice_input(
193 f"It is impossible to change a nullable field '{field_name}' "
194 f"on {model_name} to non-nullable without providing a "
195 f"default. This is because the database needs something to "
196 f"populate existing rows.\n"
197 f"Please select a fix:",
198 [
199 (
200 "Provide a one-off default now (will be set on all existing "
201 "rows with a null value for this column)"
202 ),
203 "Ignore for now. Existing rows that contain NULL values "
204 "will have to be handled manually, for example with a "
205 "RunPython or RunSQL operation.",
206 "Quit and manually define a default value in models.py.",
207 ],
208 )
209 if choice == 2:
210 return NOT_PROVIDED
211 elif choice == 3:
212 sys.exit(3)
213 else:
214 return self._ask_default()
215 return None
216
217 def ask_rename(self, model_name, old_name, new_name, field_instance):
218 """Was this field really renamed?"""
219 msg = "Was %s.%s renamed to %s.%s (a %s)? [y/N]"
220 return self._boolean_input(
221 msg
222 % (
223 model_name,
224 old_name,
225 model_name,
226 new_name,
227 field_instance.__class__.__name__,
228 ),
229 False,
230 )
231
232 def ask_rename_model(self, old_model_state, new_model_state):
233 """Was this model really renamed?"""
234 msg = "Was the model %s.%s renamed to %s? [y/N]"
235 return self._boolean_input(
236 msg
237 % (
238 old_model_state.package_label,
239 old_model_state.name,
240 new_model_state.name,
241 ),
242 False,
243 )
244
245 def ask_merge(self, package_label):
246 return self._boolean_input(
247 (
248 "\nMerging will only work if the operations printed above do not conflict\n"
249 "with each other (working on different fields or models)\n"
250 "Should these migration branches be merged? [y/N]"
251 ),
252 False,
253 )
254
255 def ask_auto_now_add_addition(self, field_name, model_name):
256 """Adding an auto_now_add field to a model."""
257 if not self.dry_run:
258 choice = self._choice_input(
259 f"It is impossible to add the field '{field_name}' with "
260 f"'auto_now_add=True' to {model_name} without providing a "
261 f"default. This is because the database needs something to "
262 f"populate existing rows.\n",
263 [
264 "Provide a one-off default now which will be set on all "
265 "existing rows",
266 "Quit and manually define a default value in models.py.",
267 ],
268 )
269 if choice == 2:
270 sys.exit(3)
271 else:
272 return self._ask_default(default="timezone.now")
273 return None
274
275 def ask_unique_callable_default_addition(self, field_name, model_name):
276 """Adding a unique field with a callable default."""
277 if not self.dry_run:
278 choice = self._choice_input(
279 f"Callable default on unique field {model_name}.{field_name} "
280 f"will not generate unique values upon migrating.\n"
281 f"Please choose how to proceed:\n",
282 [
283 "Continue making this migration as the first step in "
284 "writing a manual migration to generate unique values.",
285 "Quit and edit field options in models.py.",
286 ],
287 )
288 if choice == 2:
289 sys.exit(3)
290 return None
291
292
293class NonInteractiveMigrationQuestioner(MigrationQuestioner):
294 def __init__(
295 self,
296 defaults=None,
297 specified_packages=None,
298 dry_run=None,
299 verbosity=1,
300 log=None,
301 ):
302 self.verbosity = verbosity
303 self.log = log
304 super().__init__(
305 defaults=defaults,
306 specified_packages=specified_packages,
307 dry_run=dry_run,
308 )
309
310 def log_lack_of_migration(self, field_name, model_name, reason):
311 if self.verbosity > 0:
312 self.log(
313 f"Field '{field_name}' on model '{model_name}' not migrated: "
314 f"{reason}."
315 )
316
317 def ask_not_null_addition(self, field_name, model_name):
318 # We can't ask the user, so act like the user aborted.
319 self.log_lack_of_migration(
320 field_name,
321 model_name,
322 "it is impossible to add a non-nullable field without specifying "
323 "a default",
324 )
325 sys.exit(3)
326
327 def ask_not_null_alteration(self, field_name, model_name):
328 # We can't ask the user, so set as not provided.
329 self.log(
330 f"Field '{field_name}' on model '{model_name}' given a default of "
331 f"NOT PROVIDED and must be corrected."
332 )
333 return NOT_PROVIDED
334
335 def ask_auto_now_add_addition(self, field_name, model_name):
336 # We can't ask the user, so act like the user aborted.
337 self.log_lack_of_migration(
338 field_name,
339 model_name,
340 "it is impossible to add a field with 'auto_now_add=True' without "
341 "specifying a default",
342 )
343 sys.exit(3)