1import datetime
2import importlib
3import os
4import sys
5
6import click
7
8from plain.models.fields import NOT_PROVIDED
9from plain.packages import packages_registry
10from plain.utils import timezone
11
12from .loader import MigrationLoader
13
14
class MigrationQuestioner:
    """
    Give the autodetector responses to questions it might have.

    This base class has a built-in noninteractive mode, but the
    interactive subclass is what the command-line arguments will use.
    Answers here are either looked up in ``defaults`` (keyed by the
    method name) or are ``None``.
    """

    def __init__(self, defaults=None, specified_packages=None, dry_run=None):
        # Falsy arguments are replaced by fresh empty containers.
        self.defaults = defaults or {}
        self.specified_packages = specified_packages or set()
        self.dry_run = dry_run

    def ask_initial(self, package_label):
        """Should we create an initial migration for the package?"""
        # If it was specified on the command line, definitely true
        if package_label in self.specified_packages:
            return True

        # Answer used whenever the migrations directory cannot be inspected.
        fallback = self.defaults.get("ask_initial", False)

        # Otherwise, look for a migrations module that contains no Python
        # files apart from __init__.py.
        try:
            package_config = packages_registry.get_package_config(package_label)
        except LookupError:  # It's a fake package.
            return fallback
        migrations_import_path, _ = MigrationLoader.migrations_module(
            package_config.package_label
        )
        if migrations_import_path is None:
            # It's a package with migrations disabled.
            return fallback
        try:
            migrations_module = importlib.import_module(migrations_import_path)
        except ImportError:
            return fallback

        if getattr(migrations_module, "__file__", None):
            directory = os.path.dirname(migrations_module.__file__)
        elif hasattr(migrations_module, "__path__"):
            search_paths = list(migrations_module.__path__)
            if len(search_paths) > 1:
                # More than one location: do not treat it as initial.
                return False
            if not search_paths:
                return fallback
            directory = search_paths[0]
        else:
            # Neither a file nor a package path to list; without this branch
            # the listing below would hit an unbound variable.
            return fallback

        return not any(
            name.endswith(".py")
            for name in os.listdir(directory)
            if name != "__init__.py"
        )

    def ask_not_null_addition(self, field_name, model_name):
        """Adding a NOT NULL field to a model."""
        # None means quit
        return None

    def ask_not_null_alteration(self, field_name, model_name):
        """Changing a NULL field to NOT NULL."""
        # None means quit
        return None

    def ask_rename(self, model_name, old_name, new_name, field_instance):
        """Was this field really renamed?"""
        return self.defaults.get("ask_rename", False)

    def ask_rename_model(self, old_model_state, new_model_state):
        """Was this model really renamed?"""
        return self.defaults.get("ask_rename_model", False)

    def ask_merge(self, package_label):
        """Should these migrations really be merged?"""
        return self.defaults.get("ask_merge", False)

    def ask_auto_now_add_addition(self, field_name, model_name):
        """Adding an auto_now_add field to a model."""
        # None means quit
        return None

    def ask_unique_callable_default_addition(self, field_name, model_name):
        """Adding a unique field with a callable default."""
        # None means continue.
        return None
90
91
class InteractiveMigrationQuestioner(MigrationQuestioner):
    """Questioner that asks the user for answers on the terminal via click."""

    def __init__(self, defaults=None, specified_packages=None, dry_run=None):
        super().__init__(
            defaults=defaults, specified_packages=specified_packages, dry_run=dry_run
        )

    def _boolean_input(self, question, default=None):
        """Ask a yes/no question and return the user's answer."""
        return click.confirm(question, default=default)

    def _choice_input(self, question, choices):
        """
        Print ``choices`` as a numbered list below ``question`` and return
        the number (starting at 1) of the option the user selected.
        """
        choice_map = {str(i): choice for i, choice in enumerate(choices, start=1)}
        choice_map_str = "\n".join(
            f"{i}) {choice}" for i, choice in choice_map.items()
        )
        choice = click.prompt(
            f"{question}\n{choice_map_str}\nSelect an option",
            # click.Choice is documented to take a list of choices, so pass a
            # real list rather than a dict view.
            type=click.Choice(list(choice_map)),
        )
        return int(choice)

    def _ask_default(self, default=""):
        """
        Prompt for a default value.

        The ``default`` argument allows providing a custom default value (as a
        string) which will be shown to the user and used as the return value
        if the user doesn't provide any other input.

        Returns the result of evaluating the entered Python expression.
        Entering ``exit`` terminates the process with status 1.
        """
        click.echo("Please enter the default value as valid Python.")
        if default:
            click.echo(
                f"Accept the default '{default}' by pressing 'Enter' or "
                f"provide another value."
            )
        click.echo(
            "The datetime and plain.utils.timezone modules are available, so "
            "it is possible to provide e.g. timezone.now as a value."
        )
        click.echo("Type 'exit' to exit this prompt")

        prompt = f"[default: {default}] >>> " if default else ">>> "
        namespace = {"datetime": datetime, "timezone": timezone}
        while True:
            code = click.prompt(
                prompt,
                default=default,
                show_default=False,
                # The prompt text already ends in ">>> "; suppress click's
                # default ": " suffix so it is not rendered as ">>> : ".
                prompt_suffix="",
            )
            # Surrounding whitespace is meaningless in an expression, but it
            # would stop "exit " from being recognised below.
            code = code.strip()
            if not code and default:
                code = default
            if not code:
                click.echo(
                    "Please enter some code, or 'exit' (without quotes) to exit."
                )
            elif code == "exit":
                sys.exit(1)
            else:
                # NOTE: eval() is deliberate here -- the text is typed by the
                # person running the command. Never route untrusted input
                # through this method.
                try:
                    return eval(code, {}, namespace)
                except Exception as e:
                    # Any failure while evaluating (syntax error, unknown
                    # name, bad attribute, division by zero, ...) re-prompts
                    # instead of aborting with a traceback.
                    click.echo(f"Invalid input: {e}")

    def ask_not_null_addition(self, field_name, model_name):
        """
        Adding a NOT NULL field to a model.

        Returns the one-off default entered by the user, or None on a dry
        run. Exits with status 3 if the user chooses to quit.
        """
        if self.dry_run:
            return None
        choice = self._choice_input(
            f"It is impossible to add a non-nullable field '{field_name}' "
            f"to {model_name} without specifying a default. This is "
            f"because the database needs something to populate existing "
            f"rows.\n"
            f"Please select a fix:",
            [
                (
                    "Provide a one-off default now (will be set on all existing "
                    "rows with a null value for this column)"
                ),
                "Quit and manually define a default value in models.py.",
            ],
        )
        if choice == 2:
            sys.exit(3)
        return self._ask_default()

    def ask_not_null_alteration(self, field_name, model_name):
        """
        Changing a NULL field to NOT NULL.

        Returns the one-off default entered by the user, NOT_PROVIDED if the
        user chooses to ignore the problem for now, or None on a dry run.
        Exits with status 3 if the user chooses to quit.
        """
        if self.dry_run:
            return None
        choice = self._choice_input(
            f"It is impossible to change a nullable field '{field_name}' "
            f"on {model_name} to non-nullable without providing a "
            f"default. This is because the database needs something to "
            f"populate existing rows.\n"
            f"Please select a fix:",
            [
                (
                    "Provide a one-off default now (will be set on all existing "
                    "rows with a null value for this column)"
                ),
                "Ignore for now. Existing rows that contain NULL values "
                "will have to be handled manually, for example with a "
                "RunPython or RunSQL operation.",
                "Quit and manually define a default value in models.py.",
            ],
        )
        if choice == 2:
            return NOT_PROVIDED
        if choice == 3:
            sys.exit(3)
        return self._ask_default()

    def ask_rename(self, model_name, old_name, new_name, field_instance):
        """Was this field really renamed?"""
        field_type = field_instance.__class__.__name__
        return self._boolean_input(
            f"Was {model_name}.{old_name} renamed to "
            f"{model_name}.{new_name} (a {field_type})?",
            default=False,
        )

    def ask_rename_model(self, old_model_state, new_model_state):
        """Was this model really renamed?"""
        return self._boolean_input(
            f"Was the model {old_model_state.package_label}."
            f"{old_model_state.name} renamed to {new_model_state.name}?",
            default=False,
        )

    def ask_merge(self, package_label):
        """Should these migrations really be merged?"""
        return self._boolean_input(
            (
                "\nMerging will only work if the operations printed above do not conflict\n"
                "with each other (working on different fields or models)\n"
                "Should these migration branches be merged?"
            ),
            default=False,
        )

    def ask_auto_now_add_addition(self, field_name, model_name):
        """
        Adding an auto_now_add field to a model.

        Returns the one-off default entered by the user (the prompt offers
        ``timezone.now``), or None on a dry run. Exits with status 3 if the
        user chooses to quit.
        """
        if self.dry_run:
            return None
        choice = self._choice_input(
            f"It is impossible to add the field '{field_name}' with "
            f"'auto_now_add=True' to {model_name} without providing a "
            f"default. This is because the database needs something to "
            f"populate existing rows.\n",
            [
                "Provide a one-off default now which will be set on all "
                "existing rows",
                "Quit and manually define a default value in models.py.",
            ],
        )
        if choice == 2:
            sys.exit(3)
        return self._ask_default(default="timezone.now")

    def ask_unique_callable_default_addition(self, field_name, model_name):
        """
        Adding a unique field with a callable default.

        Always returns None (continue); exits with status 3 if the user
        chooses to quit. Nothing is asked on a dry run.
        """
        if not self.dry_run:
            choice = self._choice_input(
                f"Callable default on unique field {model_name}.{field_name} "
                f"will not generate unique values upon migrating.\n"
                f"Please choose how to proceed:\n",
                [
                    "Continue making this migration as the first step in "
                    "writing a manual migration to generate unique values.",
                    "Quit and edit field options in models.py.",
                ],
            )
            if choice == 2:
                sys.exit(3)
        return None
276
277
class NonInteractiveMigrationQuestioner(MigrationQuestioner):
    """
    Questioner for runs where the user cannot be prompted.

    Questions that need an answer from the user are reported through the
    ``log`` callable and then either abort the process or fall back to
    NOT_PROVIDED.
    """

    def __init__(
        self,
        defaults=None,
        specified_packages=None,
        dry_run=None,
        verbosity=1,
        log=None,
    ):
        self.verbosity = verbosity
        # Callable taking one message string; None means nothing is logged.
        self.log = log
        super().__init__(
            defaults=defaults,
            specified_packages=specified_packages,
            dry_run=dry_run,
        )

    def _emit(self, message):
        """Pass ``message`` to the log callable, if one was supplied."""
        # ``log`` defaults to None; calling it unguarded raised TypeError.
        if self.log is not None:
            self.log(message)

    def log_lack_of_migration(self, field_name, model_name, reason):
        """Log why a field was not migrated, unless verbosity is 0 or less."""
        if self.verbosity > 0:
            self._emit(
                f"Field '{field_name}' on model '{model_name}' not migrated: {reason}."
            )

    def ask_not_null_addition(self, field_name, model_name):
        """Adding a NOT NULL field to a model: log and exit with status 3."""
        # We can't ask the user, so act like the user aborted.
        self.log_lack_of_migration(
            field_name,
            model_name,
            "it is impossible to add a non-nullable field without specifying a default",
        )
        sys.exit(3)

    def ask_not_null_alteration(self, field_name, model_name):
        """Changing a NULL field to NOT NULL: log and return NOT_PROVIDED."""
        # We can't ask the user, so set as not provided.
        # This message is not gated on verbosity.
        self._emit(
            f"Field '{field_name}' on model '{model_name}' given a default of "
            f"NOT PROVIDED and must be corrected."
        )
        return NOT_PROVIDED

    def ask_auto_now_add_addition(self, field_name, model_name):
        """Adding an auto_now_add field to a model: log and exit with status 3."""
        # We can't ask the user, so act like the user aborted.
        self.log_lack_of_migration(
            field_name,
            model_name,
            "it is impossible to add a field with 'auto_now_add=True' without "
            "specifying a default",
        )
        sys.exit(3)