1from __future__ import annotations
2
3from typing import Any
4
5
6class MigrationOptimizer:
7 """
8 Power the optimization process, where you provide a list of Operations
9 and you are returned a list of equal or shorter length - operations
10 are merged into one if possible.
11
12 For example, a CreateModel and an AddField can be optimized into a
13 new CreateModel, and CreateModel and DeleteModel can be optimized into
14 nothing.
15 """
16
17 def optimize(self, operations: list[Any], package_label: str) -> list[Any]:
18 """
19 Main optimization entry point. Pass in a list of Operation instances,
20 get out a new list of Operation instances.
21
22 Unfortunately, due to the scope of the optimization (two combinable
23 operations might be separated by several hundred others), this can't be
24 done as a peephole optimization with checks/output implemented on
25 the Operations themselves; instead, the optimizer looks at each
26 individual operation and scans forwards in the list to see if there
27 are any matches, stopping at boundaries - operations which can't
28 be optimized over (RunSQL, operations on the same field/model, etc.)
29
30 The inner loop is run until the starting list is the same as the result
31 list, and then the result is returned. This means that operation
32 optimization must be stable and always return an equal or shorter list.
33 """
34 # Internal tracking variable for test assertions about # of loops
35 if package_label is None:
36 raise TypeError("package_label must be a str.")
37 while True:
38 result = self.optimize_inner(operations, package_label)
39 if result == operations:
40 return result
41 operations = result
42
43 def optimize_inner(self, operations: list[Any], package_label: str) -> list[Any]:
44 """Inner optimization loop."""
45 new_operations = []
46 for i, operation in enumerate(operations):
47 right = True # Should we reduce on the right or on the left.
48 # Compare it to each operation after it
49 for j, other in enumerate(operations[i + 1 :]):
50 result = operation.reduce(other, package_label)
51 if isinstance(result, list):
52 in_between = operations[i + 1 : i + j + 1]
53 if right:
54 new_operations.extend(in_between)
55 new_operations.extend(result)
56 elif all(
57 op.reduce(other, package_label) is True for op in in_between
58 ):
59 # Perform a left reduction if all of the in-between
60 # operations can optimize through other.
61 new_operations.extend(result)
62 new_operations.extend(in_between)
63 else:
64 # Otherwise keep trying.
65 new_operations.append(operation)
66 break
67 new_operations.extend(operations[i + j + 2 :])
68 return new_operations
69 elif not result:
70 # Can't perform a right reduction.
71 right = False
72 else:
73 new_operations.append(operation)
74 return new_operations