1import datetime
2import os
3import signal
4import subprocess
5import threading
6from queue import Queue
7from typing import Any
8
9from .compat import ON_WINDOWS
10from .printer import Message
11
12
class Process:
    """
    A simple utility wrapper around a subprocess.Popen that stores
    a number of attributes needed by Poncho and supports forwarding process
    lifecycle events and output to a queue.

    Messages are only emitted when ``run`` is given an ``events`` queue;
    without one the child's output is still drained, but discarded.
    """

    def __init__(
        self,
        cmd: str,
        name: str | None = None,
        color: str | None = None,
        quiet: bool = False,
        env: dict[str, str] | None = None,
        cwd: str | None = None,
    ) -> None:
        """Store the configuration for a child process; nothing is started yet.

        Args:
            cmd: Command line handed to the child constructor in ``run``.
            name: Label attached to every message emitted for this process.
            color: Color attached to every message emitted for this process.
            quiet: When true, output lines are read but not forwarded.
                The "start" and "stop" messages are still sent.
            env: Environment for the child. A copy of ``os.environ`` is
                used when this is ``None``.
            cwd: Working directory passed to the child constructor.
        """
        self.cmd = cmd
        self.color = color
        self.quiet = quiet
        self.name = name
        self.env = os.environ.copy() if env is None else env
        self.cwd = cwd

        # Kept as attributes rather than referenced directly; presumably so
        # that tests can substitute a fake clock / child constructor.
        self._clock = datetime.datetime
        self._child = None
        self._child_ctor = Popen

        # Defined up front so that _send_message() is a harmless no-op (rather
        # than an AttributeError) if it is reached before run() is called.
        self._events: Queue[Message] | None = None

    def run(
        self, events: Queue[Message] | None = None, ignore_signals: bool = False
    ) -> None:
        """Start the child, forward its output, and block until it exits.

        Emits a "start" message carrying the child's pid, then one "line"
        message per line read from the child's stdout and stderr (unless
        ``quiet`` is set), and finally a "stop" message carrying the child's
        return code.

        Args:
            events: Queue that receives the messages. When ``None`` no
                messages are produced at all.
            ignore_signals: When true, SIGINT and SIGTERM are set to be
                ignored in the calling process. ``signal.signal`` only works
                from the main thread, so this flag requires ``run`` to be
                called from there.
        """
        self._events = events
        self._child = self._child_ctor(self.cmd, env=self.env, cwd=self.cwd)
        self._send_message({"pid": self._child.pid}, type="start")

        # Don't pay attention to SIGINT/SIGTERM. The process itself is
        # considered unkillable, and will only exit when its child (the shell
        # running the Procfile process) exits.
        # NOTE(review): the handlers are installed only after the child has
        # been spawned, presumably so that the child does not inherit the
        # ignored dispositions -- confirm before reordering.
        if ignore_signals:
            signal.signal(signal.SIGINT, signal.SIG_IGN)
            signal.signal(signal.SIGTERM, signal.SIG_IGN)

        # Read stdout and stderr concurrently using threads; a sequential read
        # could stall if the child fills the pipe that is not being read.
        stdout_thread = threading.Thread(
            target=self._read_stream, args=(self._child.stdout, "stdout")
        )
        stderr_thread = threading.Thread(
            target=self._read_stream, args=(self._child.stderr, "stderr")
        )

        stdout_thread.start()
        stderr_thread.start()

        # Wait for both threads to complete, i.e. for both pipes to be
        # exhausted, before collecting the exit status.
        stdout_thread.join()
        stderr_thread.join()

        self._child.wait()

        self._send_message({"returncode": self._child.returncode}, type="stop")

    def _read_stream(self, stream: Any, stream_name: str) -> None:
        """Read lines from a stream and send them as messages.

        Every line is consumed even when ``quiet`` is set, so the stream is
        always drained. The stream is closed when reading finishes, including
        when forwarding a line raises.

        Args:
            stream: Object providing ``readline()`` and ``close()``. Lines
                are compared against ``b""``, so a binary stream is expected.
            stream_name: Value placed in each message's ``stream`` field.
        """
        try:
            # Iteration stops once readline() returns b"" (end of stream).
            for line in iter(stream.readline, b""):
                if not self.quiet:
                    self._send_message(line, stream=stream_name)
        finally:
            # Close on failure too: a dead reader that leaves the pipe open
            # would let the child block on a full pipe and hang run().
            stream.close()

    def _send_message(
        self, data: bytes | dict[str, Any], type: str = "line", stream: str = "stdout"
    ) -> None:
        """Wrap ``data`` in a Message and put it on the events queue.

        Does nothing when no events queue is configured.

        Args:
            data: Payload: a raw output line, or a dict for lifecycle events.
            type: Message type; ``run`` uses "start", "line" and "stop".
            stream: Name of the stream the data belongs to.
        """
        if self._events is not None:
            self._events.put(
                Message(
                    type=type,
                    data=data,
                    time=self._clock.now(),
                    name=self.name,
                    color=self.color,
                    stream=stream,
                )
            )
94
95
class Popen(subprocess.Popen):
    """``subprocess.Popen`` preconfigured for running a shell command line.

    By default stdout and stderr are piped, the command runs through the
    shell, and ``close_fds`` is enabled everywhere except on Windows. Any of
    these can be overridden through keyword arguments.
    """

    def __init__(self, cmd: str, **kwargs: Any) -> None:
        """Start ``cmd`` with the defaults described on the class.

        Args:
            cmd: Command line to execute.
            **kwargs: Extra ``subprocess.Popen`` options; they take precedence
                over the defaults. ``start_new_session`` (default ``True``)
                is consumed here and only applied when not on Windows. On
                Windows ``creationflags`` is always overwritten.
        """
        # Take the flag out of kwargs before merging, so it is forwarded only
        # by the platform branch below.
        wants_new_session = kwargs.pop("start_new_session", True)

        popen_options = {
            "stdout": subprocess.PIPE,
            "stderr": subprocess.PIPE,
            "shell": True,
            "close_fds": not ON_WINDOWS,
        }
        popen_options.update(kwargs)

        if not ON_WINDOWS:
            if wants_new_session:
                popen_options["start_new_session"] = True
        else:
            # MSDN reference:
            # http://msdn.microsoft.com/en-us/library/windows/desktop/ms684863%28v=vs.85%29.aspx
            # 0x08000000 is the "create no window" process creation flag.
            popen_options["creationflags"] = 0x08000000

        super().__init__(cmd, **popen_options)  # type: ignore[no-matching-overload]