1import datetime
2import multiprocessing
3import queue
4import signal
5from types import FrameType
6
7from .color import get_colors
8from .compat import ProcessManager
9from .printer import Message, Printer
10from .process import Process
11
# Seconds to wait after asking processes to terminate before escalating to a
# forced kill (compared against the elapsed time in Manager.loop).
KILL_WAIT = 5

# Signals handled by Manager.loop. For each one: the name printed when it is
# received and the return code the manager reports afterwards.
SIGNALS = {
    signal.SIGINT: {"name": "SIGINT", "rc": 130},
    signal.SIGTERM: {"name": "SIGTERM", "rc": 143},
}

# Name under which the manager's own status lines are printed.
SYSTEM_PRINTER_NAME = "system"
24
25
class Manager:
    """
    Manager is responsible for running multiple external processes in parallel
    and managing the events that result (starting, stopping, printing). By
    default it relays printed lines to a printer that prints to STDOUT.

    Example::

        import sys
        from poncho.manager import Manager

        m = Manager()
        m.add_process('server', 'ruby server.rb')
        m.add_process('worker', 'python worker.py')
        m.loop()

        sys.exit(m.returncode)
    """

    #: After :func:`~poncho.manager.Manager.loop` finishes,
    #: this will contain a return code that can be used with `sys.exit`.
    returncode = None

    def __init__(self, printer: Printer | None = None) -> None:
        """
        Create a manager with no processes registered.

        :param printer: printer that receives every output line. When omitted,
            a :class:`Printer` that writes through :func:`print` is created.
            Its ``width`` is reset to the length of the system prefix and
            grows as processes are added.
        """
        # Queue on which child processes report "line", "start" and "stop"
        # messages; consumed by loop().
        self.events = multiprocessing.Queue()
        self.returncode = None

        self._colors = get_colors()
        # Only ``.now()`` is used on the clock; kept as an attribute rather
        # than called directly (presumably so it can be substituted).
        self._clock = datetime.datetime
        self._procmgr = ProcessManager()

        self._printer = printer if printer is not None else Printer(lambda s: print(s))
        self._printer.width = len(SYSTEM_PRINTER_NAME)

        self._process_ctor = Process
        # name -> {"obj": Process, "process": multiprocessing.Process,
        #          "pid": int, "returncode": int}; keys appear as the
        # corresponding lifecycle step happens.
        self._processes = {}

        self._terminating = False
        self._terminate_start = datetime.datetime.min
        self._killed = False

    def add_process(
        self,
        name: str,
        cmd: str,
        quiet: bool = False,
        env: dict[str, str] | None = None,
        cwd: str | None = None,
    ) -> Process:
        """
        Add a process to this manager instance. The process will not be started
        until :func:`~poncho.manager.Manager.loop` is called, unless at least
        one managed process has already reported a pid, in which case it is
        started immediately.

        :param name: unique name for the process; also used to size the
            printer's prefix column.
        :param cmd: command handed to the process constructor.
        :param quiet: forwarded to the process constructor.
        :param env: forwarded to the process constructor.
        :param cwd: forwarded to the process constructor.
        :returns: the newly constructed process object.
        :raises AssertionError: if ``name`` is already registered.
        """
        # Raised explicitly (rather than via ``assert``) so the check is not
        # stripped under ``python -O``, where a duplicate name would silently
        # overwrite the existing entry. The exception type is unchanged.
        if name in self._processes:
            raise AssertionError("process names must be unique")

        proc = self._process_ctor(
            cmd, name=name, quiet=quiet, color=next(self._colors), env=env, cwd=cwd
        )
        self._processes[name] = {"obj": proc}

        # Update printer width to accommodate this process name
        self._printer.width = max(self._printer.width, len(name))

        # If the loop is already running, we need to start the process now.
        if self._is_running():
            self._start_process(name)

        return proc

    def num_processes(self) -> int:
        """
        Return the number of processes managed by this instance.
        """
        return len(self._processes)

    def loop(self) -> None:
        """
        Start all the added processes and multiplex their output onto the bound
        printer (which by default will print to STDOUT).

        If one process terminates, all the others will be terminated by
        Poncho, and :func:`~poncho.manager.Manager.loop` will return.

        This method will block until all the processes have terminated.

        Handlers for SIGINT and SIGTERM are installed: the first signal
        starts a graceful shutdown, a second one forces an immediate kill.
        """

        def _terminate(signum: int, frame: FrameType | None) -> None:
            sig = signal.Signals(signum)
            self._system_print(f"{SIGNALS[sig]['name']} received\n")
            self.returncode = SIGNALS[sig]["rc"]
            if self._terminating:
                self._system_print("forcing immediate shutdown\n")
                self.kill()
            else:
                self.terminate()

        signal.signal(signal.SIGTERM, _terminate)
        signal.signal(signal.SIGINT, _terminate)

        for name in self._processes:
            self._start_process(name)

        done = False

        while True:
            try:
                msg = self.events.get(timeout=0.1)
            except queue.Empty:
                # Only leave once everything has stopped AND the queue has
                # been drained, so no trailing output is lost.
                if done:
                    break
            else:
                if msg.type == "line":
                    self._printer.write(msg)
                elif msg.type == "start":
                    pid = msg.data["pid"]
                    self._processes[msg.name]["pid"] = pid
                    self._system_print(f"{msg.name} started (pid={pid})\n")
                    if self._terminating:
                        # This process had no known pid when shutdown began,
                        # so _killall() skipped it; deliver the signal now.
                        self._signal_process(msg.name, force=self._killed)
                elif msg.type == "stop":
                    rc = msg.data["returncode"]
                    self._processes[msg.name]["returncode"] = rc
                    self._system_print(f"{msg.name} stopped (rc={rc})\n")
                    # The first return code seen wins (a signal handler may
                    # already have set one).
                    if self.returncode is None:
                        self.returncode = rc

            if self._all_started() and self._all_stopped():
                done = True

            if not self._terminating and self._all_started() and self._any_stopped():
                self.terminate()

            # Escalate to a forced kill if graceful termination takes longer
            # than KILL_WAIT seconds.
            if self._terminating and not self._all_stopped():
                waiting = self._clock.now() - self._terminate_start
                if waiting > datetime.timedelta(seconds=KILL_WAIT):
                    self.kill()

    def terminate(self) -> None:
        """
        Terminate all processes managed by this Manager.

        Records the time termination began (used by
        :func:`~poncho.manager.Manager.loop` to decide when to escalate to
        :func:`~poncho.manager.Manager.kill`). Repeated calls are no-ops.
        """
        if self._terminating:
            return
        self._terminating = True
        self._terminate_start = self._clock.now()
        self._killall()

    def kill(self) -> None:
        """
        Kill all processes managed by this Manager.

        Repeated calls are no-ops.
        """
        if self._killed:
            return
        self._killed = True
        self._killall(force=True)

    def _killall(self, force: bool = False) -> None:
        """
        Kill all remaining processes, forcefully if requested.

        Processes that have already reported a return code are skipped, as
        are processes whose pid is not known yet (started but no "start"
        event processed so far); the latter are signalled by ``loop()`` when
        their "start" event arrives.
        """
        # Snapshot the names so the iteration is independent of the dict.
        for name in list(self._processes):
            self._signal_process(name, force=force)

    def _signal_process(self, name: str, force: bool = False) -> None:
        """Terminate (or kill, if *force*) one process if it is still running."""
        proc = self._processes[name]
        if "returncode" in proc:
            return

        pid = proc.get("pid")
        if pid is None:
            # Nothing to signal yet; indexing proc["pid"] here used to raise
            # KeyError when shutdown began before the process reported in.
            return

        signame = "SIGKILL" if force else "SIGTERM"
        self._system_print(f"sending {signame} to {name} (pid {pid})\n")
        if force:
            self._procmgr.kill(pid)
        else:
            self._procmgr.terminate(pid)

    def _start_process(self, name: str) -> None:
        """Spawn the child process that runs the named process object."""
        entry = self._processes[name]
        # The process object's ``run`` receives the shared event queue and
        # ``True`` (the meaning of the flag is defined by ``run`` itself).
        entry["process"] = multiprocessing.Process(
            name=name, target=entry["obj"].run, args=(self.events, True)
        )
        entry["process"].start()

    def _is_running(self) -> bool:
        """Return True if at least one process has reported a pid."""
        return any(p.get("pid") is not None for p in self._processes.values())

    def _all_started(self) -> bool:
        """Return True if every process has reported a pid."""
        return all(p.get("pid") is not None for p in self._processes.values())

    def _all_stopped(self) -> bool:
        """Return True if every process has reported a return code."""
        return all(p.get("returncode") is not None for p in self._processes.values())

    def _any_stopped(self) -> bool:
        """Return True if at least one process has reported a return code."""
        return any(p.get("returncode") is not None for p in self._processes.values())

    def _system_print(self, data: str) -> None:
        """Write *data* to the printer as a line from the manager itself."""
        self._printer.write(
            Message(
                type="line",
                data=data,
                time=self._clock.now(),
                name=SYSTEM_PRINTER_NAME,
                color="2",  # Dim prefix
                stream="stdout",
            )
        )