1import datetime
2import multiprocessing
3import queue
4import signal
5
6from .color import get_colors
7from .compat import ProcessManager
8from .printer import Message, Printer
9from .process import Process
10
11KILL_WAIT = 5
12SIGNALS = {
13 signal.SIGINT: {
14 "name": "SIGINT",
15 "rc": 130,
16 },
17 signal.SIGTERM: {
18 "name": "SIGTERM",
19 "rc": 143,
20 },
21}
22SYSTEM_PRINTER_NAME = "system"
23
24
25class Manager:
26 """
27 Manager is responsible for running multiple external processes in parallel
28 managing the events that result (starting, stopping, printing). By default
29 it relays printed lines to a printer that prints to STDOUT.
30
31 Example::
32
33 import sys
34 from poncho.manager import Manager
35
36 m = Manager()
37 m.add_process('server', 'ruby server.rb')
38 m.add_process('worker', 'python worker.py')
39 m.loop()
40
41 sys.exit(m.returncode)
42 """
43
44 #: After :func:`~poncho.manager.Manager.loop` finishes,
45 #: this will contain a return code that can be used with `sys.exit`.
46 returncode = None
47
48 def __init__(self, printer=None):
49 self.events = multiprocessing.Queue()
50 self.returncode = None
51
52 self._colors = get_colors()
53 self._clock = datetime.datetime
54 self._procmgr = ProcessManager()
55
56 self._printer = printer if printer is not None else Printer(lambda s: print(s))
57 self._printer.width = len(SYSTEM_PRINTER_NAME)
58
59 self._process_ctor = Process
60 self._processes = {}
61
62 self._terminating = False
63
64 def add_process(self, name, cmd, quiet=False, env=None, cwd=None):
65 """
66 Add a process to this manager instance. The process will not be started
67 until :func:`~poncho.manager.Manager.loop` is called.
68 """
69 assert name not in self._processes, "process names must be unique"
70 proc = self._process_ctor(
71 cmd, name=name, quiet=quiet, color=next(self._colors), env=env, cwd=cwd
72 )
73 self._processes[name] = {}
74 self._processes[name]["obj"] = proc
75
76 # Update printer width to accommodate this process name
77 self._printer.width = max(self._printer.width, len(name))
78
79 # If the loop is already running, we need to start the process now.
80 if self._is_running():
81 self._start_process(name)
82
83 return proc
84
85 def num_processes(self):
86 """
87 Return the number of processes managed by this instance.
88 """
89 return len(self._processes)
90
91 def loop(self):
92 """
93 Start all the added processes and multiplex their output onto the bound
94 printer (which by default will print to STDOUT).
95
96 If one process terminates, all the others will be terminated by
97 Poncho, and :func:`~poncho.manager.Manager.loop` will return.
98
99 This method will block until all the processes have terminated.
100 """
101
102 def _terminate(signum, frame):
103 self._system_print("{} received\n".format(SIGNALS[signum]["name"]))
104 self.returncode = SIGNALS[signum]["rc"]
105 self.terminate()
106
107 signal.signal(signal.SIGTERM, _terminate)
108 signal.signal(signal.SIGINT, _terminate)
109
110 for name in self._processes.keys():
111 self._start_process(name)
112
113 exit = False
114 exit_start = None
115
116 while 1:
117 try:
118 msg = self.events.get(timeout=0.1)
119 except queue.Empty:
120 if exit:
121 break
122 else:
123 if msg.type == "line":
124 self._printer.write(msg)
125 elif msg.type == "start":
126 self._processes[msg.name]["pid"] = msg.data["pid"]
127 self._system_print(
128 "{} started (pid={})\n".format(msg.name, msg.data["pid"])
129 )
130 elif msg.type == "stop":
131 self._processes[msg.name]["returncode"] = msg.data["returncode"]
132 self._system_print(
133 "{} stopped (rc={})\n".format(msg.name, msg.data["returncode"])
134 )
135 if self.returncode is None:
136 self.returncode = msg.data["returncode"]
137
138 if self._all_started() and self._all_stopped():
139 exit = True
140
141 if exit_start is None and self._all_started() and self._any_stopped():
142 exit_start = self._clock.now()
143 self.terminate()
144
145 if exit_start is not None:
146 # If we've been in this loop for more than KILL_WAIT seconds,
147 # it's time to kill all remaining children.
148 waiting = self._clock.now() - exit_start
149 if waiting > datetime.timedelta(seconds=KILL_WAIT):
150 self.kill()
151
152 def terminate(self):
153 """
154 Terminate all processes managed by this ProcessManager.
155 """
156 if self._terminating:
157 return
158 self._terminating = True
159 self._killall()
160
161 def kill(self):
162 """
163 Kill all processes managed by this ProcessManager.
164 """
165 self._killall(force=True)
166
167 def _killall(self, force=False):
168 """Kill all remaining processes, forcefully if requested."""
169 for_termination = []
170
171 for n, p in self._processes.items():
172 if "returncode" not in p:
173 for_termination.append(n)
174
175 for n in for_termination:
176 p = self._processes[n]
177 signame = "SIGKILL" if force else "SIGTERM"
178 self._system_print(
179 "sending {} to {} (pid {})\n".format(signame, n, p["pid"])
180 )
181 if force:
182 self._procmgr.kill(p["pid"])
183 else:
184 self._procmgr.terminate(p["pid"])
185
186 def _start_process(self, name):
187 p = self._processes[name]
188 p["process"] = multiprocessing.Process(
189 name=name, target=p["obj"].run, args=(self.events, True)
190 )
191 p["process"].start()
192
193 def _is_running(self):
194 return any(p.get("pid") is not None for _, p in self._processes.items())
195
196 def _all_started(self):
197 return all(p.get("pid") is not None for _, p in self._processes.items())
198
199 def _all_stopped(self):
200 return all(p.get("returncode") is not None for _, p in self._processes.items())
201
202 def _any_stopped(self):
203 return any(p.get("returncode") is not None for _, p in self._processes.items())
204
205 def _system_print(self, data):
206 self._printer.write(
207 Message(
208 type="line",
209 data=data,
210 time=self._clock.now(),
211 name=SYSTEM_PRINTER_NAME,
212 color=None,
213 )
214 )