1import datetime
  2import multiprocessing
  3import queue
  4import signal
  5from types import FrameType
  6
  7from .color import get_colors
  8from .compat import ProcessManager
  9from .printer import Message, Printer
 10from .process import Process
 11
 12KILL_WAIT = 5
 13SIGNALS = {
 14    signal.SIGINT: {
 15        "name": "SIGINT",
 16        "rc": 130,
 17    },
 18    signal.SIGTERM: {
 19        "name": "SIGTERM",
 20        "rc": 143,
 21    },
 22}
 23SYSTEM_PRINTER_NAME = "system"
 24
 25
 26class Manager:
 27    """
 28    Manager is responsible for running multiple external processes in parallel
 29    managing the events that result (starting, stopping, printing). By default
 30    it relays printed lines to a printer that prints to STDOUT.
 31
 32    Example::
 33
 34        import sys
 35        from poncho.manager import Manager
 36
 37        m = Manager()
 38        m.add_process('server', 'ruby server.rb')
 39        m.add_process('worker', 'python worker.py')
 40        m.loop()
 41
 42        sys.exit(m.returncode)
 43    """
 44
 45    #: After :func:`~poncho.manager.Manager.loop` finishes,
 46    #: this will contain a return code that can be used with `sys.exit`.
 47    returncode = None
 48
 49    def __init__(self, printer: Printer | None = None) -> None:
 50        self.events = multiprocessing.Queue()
 51        self.returncode = None
 52
 53        self._colors = get_colors()
 54        self._clock = datetime.datetime
 55        self._procmgr = ProcessManager()
 56
 57        self._printer = printer if printer is not None else Printer(lambda s: print(s))
 58        self._printer.width = len(SYSTEM_PRINTER_NAME)
 59
 60        self._process_ctor = Process
 61        self._processes = {}
 62
 63        self._terminating = False
 64        self._terminate_start = datetime.datetime.min
 65        self._killed = False
 66
 67    def add_process(
 68        self,
 69        name: str,
 70        cmd: str,
 71        quiet: bool = False,
 72        env: dict[str, str] | None = None,
 73        cwd: str | None = None,
 74    ) -> Process:
 75        """
 76        Add a process to this manager instance. The process will not be started
 77        until :func:`~poncho.manager.Manager.loop` is called.
 78        """
 79        assert name not in self._processes, "process names must be unique"
 80        proc = self._process_ctor(
 81            cmd, name=name, quiet=quiet, color=next(self._colors), env=env, cwd=cwd
 82        )
 83        self._processes[name] = {}
 84        self._processes[name]["obj"] = proc
 85
 86        # Update printer width to accommodate this process name
 87        self._printer.width = max(self._printer.width, len(name))
 88
 89        # If the loop is already running, we need to start the process now.
 90        if self._is_running():
 91            self._start_process(name)
 92
 93        return proc
 94
 95    def num_processes(self) -> int:
 96        """
 97        Return the number of processes managed by this instance.
 98        """
 99        return len(self._processes)
100
101    def loop(self) -> None:
102        """
103        Start all the added processes and multiplex their output onto the bound
104        printer (which by default will print to STDOUT).
105
106        If one process terminates, all the others will be terminated by
107        Poncho, and :func:`~poncho.manager.Manager.loop` will return.
108
109        This method will block until all the processes have terminated.
110        """
111
112        def _terminate(signum: int, frame: FrameType | None) -> None:
113            sig = signal.Signals(signum)
114            self._system_print("{} received\n".format(SIGNALS[sig]["name"]))
115            self.returncode = SIGNALS[sig]["rc"]
116            if self._terminating:
117                self._system_print("forcing immediate shutdown\n")
118                self.kill()
119            else:
120                self.terminate()
121
122        signal.signal(signal.SIGTERM, _terminate)
123        signal.signal(signal.SIGINT, _terminate)
124
125        for name in self._processes.keys():
126            self._start_process(name)
127
128        exit = False
129
130        while 1:
131            try:
132                msg = self.events.get(timeout=0.1)
133            except queue.Empty:
134                if exit:
135                    break
136            else:
137                if msg.type == "line":
138                    self._printer.write(msg)
139                elif msg.type == "start":
140                    self._processes[msg.name]["pid"] = msg.data["pid"]
141                    self._system_print(
142                        "{} started (pid={})\n".format(msg.name, msg.data["pid"])
143                    )
144                elif msg.type == "stop":
145                    self._processes[msg.name]["returncode"] = msg.data["returncode"]
146                    self._system_print(
147                        "{} stopped (rc={})\n".format(msg.name, msg.data["returncode"])
148                    )
149                    if self.returncode is None:
150                        self.returncode = msg.data["returncode"]
151
152            if self._all_started() and self._all_stopped():
153                exit = True
154
155            if not self._terminating and self._all_started() and self._any_stopped():
156                self.terminate()
157
158            if self._terminating and not self._all_stopped():
159                waiting = self._clock.now() - self._terminate_start
160                if waiting > datetime.timedelta(seconds=KILL_WAIT):
161                    self.kill()
162
163    def terminate(self) -> None:
164        """
165        Terminate all processes managed by this ProcessManager.
166        """
167        if self._terminating:
168            return
169        self._terminating = True
170        self._terminate_start = self._clock.now()
171        self._killall()
172
173    def kill(self) -> None:
174        """
175        Kill all processes managed by this ProcessManager.
176        """
177        if self._killed:
178            return
179        self._killed = True
180        self._killall(force=True)
181
182    def _killall(self, force: bool = False) -> None:
183        """Kill all remaining processes, forcefully if requested."""
184        for_termination = []
185
186        for n, p in self._processes.items():
187            if "returncode" not in p:
188                for_termination.append(n)
189
190        for n in for_termination:
191            p = self._processes[n]
192            signame = "SIGKILL" if force else "SIGTERM"
193            self._system_print(
194                "sending {} to {} (pid {})\n".format(signame, n, p["pid"])
195            )
196            if force:
197                self._procmgr.kill(p["pid"])
198            else:
199                self._procmgr.terminate(p["pid"])
200
201    def _start_process(self, name: str) -> None:
202        p = self._processes[name]
203        p["process"] = multiprocessing.Process(
204            name=name, target=p["obj"].run, args=(self.events, True)
205        )
206        p["process"].start()
207
208    def _is_running(self) -> bool:
209        return any(p.get("pid") is not None for _, p in self._processes.items())
210
211    def _all_started(self) -> bool:
212        return all(p.get("pid") is not None for _, p in self._processes.items())
213
214    def _all_stopped(self) -> bool:
215        return all(p.get("returncode") is not None for _, p in self._processes.items())
216
217    def _any_stopped(self) -> bool:
218        return any(p.get("returncode") is not None for _, p in self._processes.items())
219
220    def _system_print(self, data: str) -> None:
221        self._printer.write(
222            Message(
223                type="line",
224                data=data,
225                time=self._clock.now(),
226                name=SYSTEM_PRINTER_NAME,
227                color="2",  # Dim prefix
228                stream="stdout",
229            )
230        )