Plain is headed towards 1.0! Subscribe for development updates →

  1import datetime
  2import multiprocessing
  3import queue
  4import signal
  5
  6from .color import get_colors
  7from .compat import ProcessManager
  8from .printer import Message, Printer
  9from .process import Process
 10
 11KILL_WAIT = 5
 12SIGNALS = {
 13    signal.SIGINT: {
 14        "name": "SIGINT",
 15        "rc": 130,
 16    },
 17    signal.SIGTERM: {
 18        "name": "SIGTERM",
 19        "rc": 143,
 20    },
 21}
 22SYSTEM_PRINTER_NAME = "system"
 23
 24
 25class Manager:
 26    """
 27    Manager is responsible for running multiple external processes in parallel
 28    managing the events that result (starting, stopping, printing). By default
 29    it relays printed lines to a printer that prints to STDOUT.
 30
 31    Example::
 32
 33        import sys
 34        from poncho.manager import Manager
 35
 36        m = Manager()
 37        m.add_process('server', 'ruby server.rb')
 38        m.add_process('worker', 'python worker.py')
 39        m.loop()
 40
 41        sys.exit(m.returncode)
 42    """
 43
 44    #: After :func:`~poncho.manager.Manager.loop` finishes,
 45    #: this will contain a return code that can be used with `sys.exit`.
 46    returncode = None
 47
 48    def __init__(self, printer=None):
 49        self.events = multiprocessing.Queue()
 50        self.returncode = None
 51
 52        self._colors = get_colors()
 53        self._clock = datetime.datetime
 54        self._procmgr = ProcessManager()
 55
 56        self._printer = printer if printer is not None else Printer(lambda s: print(s))
 57        self._printer.width = len(SYSTEM_PRINTER_NAME)
 58
 59        self._process_ctor = Process
 60        self._processes = {}
 61
 62        self._terminating = False
 63
 64    def add_process(self, name, cmd, quiet=False, env=None, cwd=None):
 65        """
 66        Add a process to this manager instance. The process will not be started
 67        until :func:`~poncho.manager.Manager.loop` is called.
 68        """
 69        assert name not in self._processes, "process names must be unique"
 70        proc = self._process_ctor(
 71            cmd, name=name, quiet=quiet, color=next(self._colors), env=env, cwd=cwd
 72        )
 73        self._processes[name] = {}
 74        self._processes[name]["obj"] = proc
 75
 76        # Update printer width to accommodate this process name
 77        self._printer.width = max(self._printer.width, len(name))
 78
 79        # If the loop is already running, we need to start the process now.
 80        if self._is_running():
 81            self._start_process(name)
 82
 83        return proc
 84
 85    def num_processes(self):
 86        """
 87        Return the number of processes managed by this instance.
 88        """
 89        return len(self._processes)
 90
 91    def loop(self):
 92        """
 93        Start all the added processes and multiplex their output onto the bound
 94        printer (which by default will print to STDOUT).
 95
 96        If one process terminates, all the others will be terminated by
 97        Poncho, and :func:`~poncho.manager.Manager.loop` will return.
 98
 99        This method will block until all the processes have terminated.
100        """
101
102        def _terminate(signum, frame):
103            self._system_print("{} received\n".format(SIGNALS[signum]["name"]))
104            self.returncode = SIGNALS[signum]["rc"]
105            self.terminate()
106
107        signal.signal(signal.SIGTERM, _terminate)
108        signal.signal(signal.SIGINT, _terminate)
109
110        for name in self._processes.keys():
111            self._start_process(name)
112
113        exit = False
114        exit_start = None
115
116        while 1:
117            try:
118                msg = self.events.get(timeout=0.1)
119            except queue.Empty:
120                if exit:
121                    break
122            else:
123                if msg.type == "line":
124                    self._printer.write(msg)
125                elif msg.type == "start":
126                    self._processes[msg.name]["pid"] = msg.data["pid"]
127                    self._system_print(
128                        "{} started (pid={})\n".format(msg.name, msg.data["pid"])
129                    )
130                elif msg.type == "stop":
131                    self._processes[msg.name]["returncode"] = msg.data["returncode"]
132                    self._system_print(
133                        "{} stopped (rc={})\n".format(msg.name, msg.data["returncode"])
134                    )
135                    if self.returncode is None:
136                        self.returncode = msg.data["returncode"]
137
138            if self._all_started() and self._all_stopped():
139                exit = True
140
141            if exit_start is None and self._all_started() and self._any_stopped():
142                exit_start = self._clock.now()
143                self.terminate()
144
145            if exit_start is not None:
146                # If we've been in this loop for more than KILL_WAIT seconds,
147                # it's time to kill all remaining children.
148                waiting = self._clock.now() - exit_start
149                if waiting > datetime.timedelta(seconds=KILL_WAIT):
150                    self.kill()
151
152    def terminate(self):
153        """
154        Terminate all processes managed by this ProcessManager.
155        """
156        if self._terminating:
157            return
158        self._terminating = True
159        self._killall()
160
161    def kill(self):
162        """
163        Kill all processes managed by this ProcessManager.
164        """
165        self._killall(force=True)
166
167    def _killall(self, force=False):
168        """Kill all remaining processes, forcefully if requested."""
169        for_termination = []
170
171        for n, p in self._processes.items():
172            if "returncode" not in p:
173                for_termination.append(n)
174
175        for n in for_termination:
176            p = self._processes[n]
177            signame = "SIGKILL" if force else "SIGTERM"
178            self._system_print(
179                "sending {} to {} (pid {})\n".format(signame, n, p["pid"])
180            )
181            if force:
182                self._procmgr.kill(p["pid"])
183            else:
184                self._procmgr.terminate(p["pid"])
185
186    def _start_process(self, name):
187        p = self._processes[name]
188        p["process"] = multiprocessing.Process(
189            name=name, target=p["obj"].run, args=(self.events, True)
190        )
191        p["process"].start()
192
193    def _is_running(self):
194        return any(p.get("pid") is not None for _, p in self._processes.items())
195
196    def _all_started(self):
197        return all(p.get("pid") is not None for _, p in self._processes.items())
198
199    def _all_stopped(self):
200        return all(p.get("returncode") is not None for _, p in self._processes.items())
201
202    def _any_stopped(self):
203        return any(p.get("returncode") is not None for _, p in self._processes.items())
204
205    def _system_print(self, data):
206        self._printer.write(
207            Message(
208                type="line",
209                data=data,
210                time=self._clock.now(),
211                name=SYSTEM_PRINTER_NAME,
212                color=None,
213            )
214        )