1import fcntl
2import os
3import signal
4import subprocess
5import sys
6import time
7from pathlib import Path
8from typing import Any
9
10from .poncho.manager import Manager as PonchoManager
11from .poncho.printer import Printer
12
13
def _pid_is_alive(pid: int) -> bool:
    """Return True if a process with *pid* currently exists.

    Non-positive pids are rejected up front: ``kill(2)`` treats ``0`` and
    negative values as process-*group* selectors, so probing them would report
    on a whole group rather than on one process.
    """
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)  # Signal 0 checks for existence – it does not kill.
    except PermissionError:
        # EPERM: the process exists, we just aren't allowed to signal it.
        return True
    except (OSError, OverflowError):
        # ESRCH (no such process), or a value too large to be a real pid.
        return False
    return True
21
22
class Supervisor:
    """A single-instance, long-running dev process group.

    Only one supervisor may run per project. That's enforced by holding an
    exclusive advisory lock (``flock``) on the pidfile for the *entire* life of
    the process: a second supervisor simply fails to take the lock and bows out,
    and the kernel releases the lock when the holding process exits — so a crash
    can't leave a stale lock behind. The pid is written into the file too, but
    only so other commands can identify and signal the running supervisor — it
    is not what guards against duplicates.

    The four class-level attributes below are declared without values; they
    must be assigned (presumably by concrete subclasses) before any method that
    reads them is called.
    """

    pidfile: Path
    log_dir: Path
    # Foreground command that re-runs this supervisor, e.g. ["dev", "services"].
    background_command: list[str]
    # Human label for "already running" warnings, e.g. "Services".
    display_name: str

    def __init__(self):
        self.pid = os.getpid()
        # File descriptor carrying the flock; ``None`` while we don't own it.
        self._lock_fd: int | None = None
        self.log_path: Path | None = None
        self.printer: Printer | None = None
        self.poncho: PonchoManager | None = None

    # ------------------------------------------------------------------
    # Reads (pure, lock-free – cheap enough for the per-command hot path)
    # ------------------------------------------------------------------
    @classmethod
    def read_pidfile(cls) -> int | None:
        """Return the PID recorded in *cls.pidfile* (or ``None``)."""
        try:
            return int(cls.pidfile.read_text())
        except (ValueError, OSError):
            # Missing, empty (released), or partial – treat as absent.
            return None

    @classmethod
    def running_pid(cls) -> int | None:
        """Return a *running* supervisor PID, or ``None`` if none is alive."""
        pid = cls.read_pidfile()
        if pid is None or not _pid_is_alive(pid):
            return None
        return pid

    @classmethod
    def already_running_message(cls, pid: int | None) -> str:
        """The single source of truth for the 'slot is taken' warning."""
        return f"{cls.display_name} already running (pid={pid})"

    # ------------------------------------------------------------------
    # Single-instance ownership (the lock is held for our whole lifetime)
    # ------------------------------------------------------------------
    def acquire(self) -> bool:
        """Claim sole ownership for this process's lifetime.

        Returns ``True`` if we now hold it, ``False`` if another live supervisor
        already does (in which case the caller must not start – a second one
        would collide on shared services like the database). A pidfile left by a
        dead supervisor is reclaimed automatically: its lock is already gone.

        Raises ``OSError`` if the pid cannot be recorded after the lock was
        taken; the lock is dropped again before the error propagates.
        """
        self.pidfile.parent.mkdir(parents=True, exist_ok=True)
        fd = os.open(self.pidfile, os.O_CREAT | os.O_RDWR, 0o644)
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            os.close(fd)
            return False

        try:
            os.ftruncate(fd, 0)
            os.write(fd, str(self.pid).encode())
        except OSError:
            # Don't strand a locked descriptor nobody will ever release.
            os.close(fd)
            raise
        self._lock_fd = fd  # Held (open) until release() / process exit.
        return True

    def release(self) -> None:
        """Release ownership: clear the recorded pid and drop the lock.

        A no-op when the lock is not held, so it is safe to call twice.
        """
        if self._lock_fd is None:
            return
        fd, self._lock_fd = self._lock_fd, None
        try:
            os.ftruncate(fd, 0)
        finally:
            os.close(fd)  # Closing the fd releases the flock.

    @classmethod
    def spawn_background(cls, *extra_args: str) -> int:
        """Start this supervisor detached in the background; return its pid.

        The child runs ``python -m plain <background_command> <extra_args>`` in
        its own session with stdout and stderr discarded.
        """
        proc = subprocess.Popen(
            [sys.executable, "-m", "plain", *cls.background_command, *extra_args],
            start_new_session=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        return proc.pid

    @classmethod
    def _has_live_owner(cls) -> bool:
        """True if a live supervisor currently holds the lock.

        A read-only probe: it does not write to or truncate the pidfile, so it
        can't be misread by a concurrent command (unlike claiming the lock).

        The probe takes a *shared* lock. That still conflicts with the
        supervisor's exclusive lock, but not with other probes, so two commands
        probing at the same moment cannot mistake each other for a live owner.
        """
        try:
            fd = os.open(cls.pidfile, os.O_RDONLY)
        except OSError:
            return False
        try:
            fcntl.flock(fd, fcntl.LOCK_SH | fcntl.LOCK_NB)
        except OSError:
            return True  # Someone live holds it exclusively.
        else:
            fcntl.flock(fd, fcntl.LOCK_UN)
            return False
        finally:
            os.close(fd)

    def stop_process(self) -> None:
        """Terminate the running supervisor, if one actually holds the lock.

        Sends ``SIGTERM``, polls for up to 10 seconds for the process to exit,
        then falls back to ``SIGKILL``.
        """
        pid = self.read_pidfile()
        if pid is None:
            return

        # If nobody holds the lock, no supervisor is alive — the recorded pid is
        # stale (and could even be a reused, unrelated pid), so don't signal it.
        if not self._has_live_owner():
            return

        try:
            os.kill(pid, signal.SIGTERM)
        except OSError:
            return  # Already gone.

        # Monotonic clock: a wall-clock adjustment must not stretch or cut the
        # grace period.
        deadline = time.monotonic() + 10
        while time.monotonic() < deadline:
            if not _pid_is_alive(pid):
                return  # Exited gracefully; it cleared its own pidfile.
            time.sleep(0.1)

        try:
            os.kill(pid, signal.SIGKILL)
        except OSError:
            pass  # Exited between the last poll and the kill.

    # ------------------------------------------------------------------
    # Logging / Poncho helpers
    # ------------------------------------------------------------------
    def prepare_log(self) -> Path:
        """Create the log directory and return a path for *this* run.

        Existing ``*.log`` files beyond the five most recently modified are
        deleted first; the returned path is ``<log_dir>/<pid>.log``.
        """
        self.log_dir.mkdir(parents=True, exist_ok=True)

        # Keep the 5 most recent log files.
        logs = sorted(
            self.log_dir.glob("*.log"),
            key=lambda p: p.stat().st_mtime,
            reverse=True,
        )
        for old in logs[5:]:
            old.unlink(missing_ok=True)

        self.log_path = self.log_dir / f"{self.pid}.log"
        return self.log_path

    def init_poncho(self, print_func: Any) -> PonchoManager:  # noqa: D401
        """Return a :class:`~plain.dev.poncho.manager.Manager` instance.

        Prepares the log path first if that hasn't happened yet, then builds a
        printer writing to it and a manager using that printer; both are also
        stored on the instance.
        """
        if self.log_path is None:
            self.prepare_log()

        self.printer = Printer(print_func, log_file=self.log_path)
        self.poncho = PonchoManager(printer=self.printer)
        return self.poncho

    def close(self) -> None:
        """Close the printer, if one was created."""
        if self.printer:
            self.printer.close()