Plain is headed towards 1.0! Subscribe for development updates →

  1import datetime
  2import os
  3import signal
  4import subprocess
  5import threading
  6from queue import Queue
  7from typing import Any
  8
  9from .compat import ON_WINDOWS
 10from .printer import Message
 11
 12
 13class Process:
 14    """
 15    A simple utility wrapper around a subprocess.Popen that stores
 16    a number of attributes needed by Poncho and supports forwarding process
 17    lifecycle events and output to a queue.
 18    """
 19
 20    def __init__(
 21        self,
 22        cmd: str,
 23        name: str | None = None,
 24        color: str | None = None,
 25        quiet: bool = False,
 26        env: dict[str, str] | None = None,
 27        cwd: str | None = None,
 28    ) -> None:
 29        self.cmd = cmd
 30        self.color = color
 31        self.quiet = quiet
 32        self.name = name
 33        self.env = os.environ.copy() if env is None else env
 34        self.cwd = cwd
 35
 36        self._clock = datetime.datetime
 37        self._child = None
 38        self._child_ctor = Popen
 39
 40    def run(
 41        self, events: Queue[Message] | None = None, ignore_signals: bool = False
 42    ) -> None:
 43        self._events = events
 44        self._child = self._child_ctor(self.cmd, env=self.env, cwd=self.cwd)
 45        self._send_message({"pid": self._child.pid}, type="start")
 46
 47        # Don't pay attention to SIGINT/SIGTERM. The process itself is
 48        # considered unkillable, and will only exit when its child (the shell
 49        # running the Procfile process) exits.
 50        if ignore_signals:
 51            signal.signal(signal.SIGINT, signal.SIG_IGN)
 52            signal.signal(signal.SIGTERM, signal.SIG_IGN)
 53
 54        # Read stdout and stderr concurrently using threads
 55        stdout_thread = threading.Thread(
 56            target=self._read_stream, args=(self._child.stdout, "stdout")
 57        )
 58        stderr_thread = threading.Thread(
 59            target=self._read_stream, args=(self._child.stderr, "stderr")
 60        )
 61
 62        stdout_thread.start()
 63        stderr_thread.start()
 64
 65        # Wait for both threads to complete
 66        stdout_thread.join()
 67        stderr_thread.join()
 68
 69        self._child.wait()
 70
 71        self._send_message({"returncode": self._child.returncode}, type="stop")
 72
 73    def _read_stream(self, stream: Any, stream_name: str) -> None:
 74        """Read lines from a stream and send them as messages."""
 75        for line in iter(stream.readline, b""):
 76            if not self.quiet:
 77                self._send_message(line, stream=stream_name)
 78        stream.close()
 79
 80    def _send_message(
 81        self, data: bytes | dict[str, Any], type: str = "line", stream: str = "stdout"
 82    ) -> None:
 83        if self._events is not None:
 84            self._events.put(
 85                Message(
 86                    type=type,
 87                    data=data,
 88                    time=self._clock.now(),
 89                    name=self.name,
 90                    color=self.color,
 91                    stream=stream,
 92                )
 93            )
 94
 95
 96class Popen(subprocess.Popen):
 97    def __init__(self, cmd: str, **kwargs: Any) -> None:
 98        start_new_session = kwargs.pop("start_new_session", True)
 99        options = {
100            "stdout": subprocess.PIPE,
101            "stderr": subprocess.PIPE,
102            "shell": True,
103            "close_fds": not ON_WINDOWS,
104        }
105        options.update(**kwargs)
106
107        if ON_WINDOWS:
108            # MSDN reference:
109            #   http://msdn.microsoft.com/en-us/library/windows/desktop/ms684863%28v=vs.85%29.aspx
110            create_no_window = 0x08000000
111            options.update(creationflags=create_no_window)
112        elif start_new_session:
113            options.update(start_new_session=True)
114
115        super().__init__(cmd, **options)  # type: ignore[no-matching-overload]