1from __future__ import annotations
 2
 3from typing import Any
 4
 5
 6class MigrationOptimizer:
 7    """
 8    Power the optimization process, where you provide a list of Operations
 9    and you are returned a list of equal or shorter length - operations
10    are merged into one if possible.
11
12    For example, a CreateModel and an AddField can be optimized into a
13    new CreateModel, and CreateModel and DeleteModel can be optimized into
14    nothing.
15    """
16
17    def optimize(self, operations: list[Any], package_label: str) -> list[Any]:
18        """
19        Main optimization entry point. Pass in a list of Operation instances,
20        get out a new list of Operation instances.
21
22        Unfortunately, due to the scope of the optimization (two combinable
23        operations might be separated by several hundred others), this can't be
24        done as a peephole optimization with checks/output implemented on
25        the Operations themselves; instead, the optimizer looks at each
26        individual operation and scans forwards in the list to see if there
27        are any matches, stopping at boundaries - operations which can't
28        be optimized over (RunSQL, operations on the same field/model, etc.)
29
30        The inner loop is run until the starting list is the same as the result
31        list, and then the result is returned. This means that operation
32        optimization must be stable and always return an equal or shorter list.
33        """
34        # Internal tracking variable for test assertions about # of loops
35        if package_label is None:
36            raise TypeError("package_label must be a str.")
37        while True:
38            result = self.optimize_inner(operations, package_label)
39            if result == operations:
40                return result
41            operations = result
42
43    def optimize_inner(self, operations: list[Any], package_label: str) -> list[Any]:
44        """Inner optimization loop."""
45        new_operations = []
46        for i, operation in enumerate(operations):
47            right = True  # Should we reduce on the right or on the left.
48            # Compare it to each operation after it
49            for j, other in enumerate(operations[i + 1 :]):
50                result = operation.reduce(other, package_label)
51                if isinstance(result, list):
52                    in_between = operations[i + 1 : i + j + 1]
53                    if right:
54                        new_operations.extend(in_between)
55                        new_operations.extend(result)
56                    elif all(
57                        op.reduce(other, package_label) is True for op in in_between
58                    ):
59                        # Perform a left reduction if all of the in-between
60                        # operations can optimize through other.
61                        new_operations.extend(result)
62                        new_operations.extend(in_between)
63                    else:
64                        # Otherwise keep trying.
65                        new_operations.append(operation)
66                        break
67                    new_operations.extend(operations[i + j + 2 :])
68                    return new_operations
69                elif not result:
70                    # Can't perform a right reduction.
71                    right = False
72            else:
73                new_operations.append(operation)
74        return new_operations