1"""Bash-compatible `.env` file parsing and Plain dev/test dotenv loading.
2
3`plain.dev` owns all dotenv code so that production deployments (which don't
4install plain.dev) never load `.env` files. plain.pytest opportunistically
5imports `load_dotenv_files` — if plain.dev is installed, `.env.test*` loads
6under pytest; if not, the plugin skips dotenv loading entirely.
7
8Parser supports:
9- KEY=value (basic unquoted)
10- KEY="double quoted value" (with escape handling and multiline)
11- KEY='single quoted value' (literal, including multiline)
12- export KEY=value (strips export prefix)
13- Comments (# comment and inline KEY=value # comment)
14- Variable expansion: $VAR and ${VAR} (in unquoted and double-quoted values)
15- Command substitution: $(command)
16"""
17
18from __future__ import annotations
19
20import os
21import re
22import subprocess
23from collections.abc import Callable
24from pathlib import Path
25
26import click
27
28__all__ = ["load_dotenv", "load_dotenv_files", "parse_dotenv"]
29
30# Match ${VAR} or $VAR (VAR must start with letter/underscore, then alphanumeric/underscore)
31_VAR_BRACE_RE = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")
32_VAR_BARE_RE = re.compile(r"\$([A-Za-z_][A-Za-z0-9_]*)")
33# Placeholder for escaped $ (to prevent expansion)
34_ESCAPED_DOLLAR = "\x00DOLLAR\x00"
35
36_PLAIN_ENV_RE = re.compile(r"^[a-zA-Z][a-zA-Z0-9_-]*$")
37_files_loaded = False
38
39
40def load_dotenv_files() -> None:
41 """Load `.env` files using Next.js / Vite-style precedence.
42
43 Files loaded (highest precedence first — `load_dotenv()` doesn't override
44 existing keys, so the first file to bind a key wins):
45
46 1. `.env.{PLAIN_ENV}.local` — gitignored, env-specific personal
47 2. `.env.local` — gitignored, personal; SKIPPED in test
48 3. `.env.{PLAIN_ENV}` — gitignored or committed, env-specific
49 4. `.env` — committed baseline
50
51 `PLAIN_ENV` is set by the CLI dispatcher (`plain.cli.core`) based on the
52 active command — `plain dev` → `dev`, `plain test` → `test`. Export
53 `PLAIN_ENV` yourself to override.
54
55 Idempotent within a process — repeat calls are a no-op.
56 """
57 global _files_loaded
58 if _files_loaded:
59 return
60 _files_loaded = True
61
62 plain_env = os.environ.get("PLAIN_ENV", "")
63 if plain_env and not _PLAIN_ENV_RE.fullmatch(plain_env):
64 raise ValueError(
65 f"PLAIN_ENV must match {_PLAIN_ENV_RE.pattern}, got {plain_env!r}"
66 )
67
68 def _load(path: str) -> None:
69 if load_dotenv(path):
70 click.secho(f"Loading {path}...", dim=True, italic=True, err=True)
71
72 if plain_env:
73 _load(f".env.{plain_env}.local")
74 if plain_env != "test":
75 # Skipped under test (Next.js / Rails dotenv convention) so CI runs
76 # stay deterministic and personal creds don't leak into the suite.
77 _load(".env.local")
78 if plain_env:
79 _load(f".env.{plain_env}")
80 _load(".env")
81
82
83def load_dotenv(
84 filepath: str | Path,
85 *,
86 override: bool = False,
87) -> bool:
88 """
89 Load environment variables from a .env file into os.environ.
90
91 Args:
92 filepath: Path to the .env file
93 override: If True, overwrite existing environment variables
94
95 Returns:
96 True if the file was loaded, False if it doesn't exist
97 """
98 path = Path(filepath)
99 if not path.exists():
100 return False
101
102 content = path.read_text(encoding="utf-8")
103
104 # Skip command execution for keys that already exist (unless override)
105 skip_commands_for = None if override else set(os.environ.keys())
106
107 def on_bind(key: str, value: str) -> None:
108 if override or key not in os.environ:
109 os.environ[key] = value
110
111 _parse_content(content, skip_commands_for=skip_commands_for, on_bind=on_bind)
112 return True
113
114
115def parse_dotenv(filepath: str | Path) -> dict[str, str]:
116 """
117 Parse a .env file and return a dictionary of key-value pairs.
118
119 Does not modify os.environ. Supports multiline values in quoted strings.
120 """
121 content = Path(filepath).read_text(encoding="utf-8")
122 return _parse_content(content)
123
124
125def _parse_content(
126 content: str,
127 skip_commands_for: set[str] | None = None,
128 on_bind: Callable[[str, str], None] | None = None,
129) -> dict[str, str]:
130 """Parse .env file content and return key-value pairs."""
131 result: dict[str, str] = {}
132 pos = 0
133 length = len(content)
134
135 while pos < length:
136 # Skip whitespace and empty lines
137 while pos < length and content[pos] in " \t\r\n":
138 pos += 1
139
140 if pos >= length:
141 break
142
143 # Skip comment lines
144 if content[pos] == "#":
145 pos = _skip_to_eol(content, pos)
146 continue
147
148 # Try to parse a binding
149 parsed = _parse_binding(content, pos, result, skip_commands_for)
150 if parsed:
151 key, value, new_pos = parsed
152 result[key] = value
153 if on_bind:
154 on_bind(key, value)
155 pos = new_pos
156 else:
157 # Skip to next line on parse failure
158 pos = _skip_to_eol(content, pos)
159
160 return result
161
162
163def _skip_to_eol(content: str, pos: int) -> int:
164 """Skip to end of line, return position after newline."""
165 while pos < len(content) and content[pos] not in "\r\n":
166 pos += 1
167 if pos < len(content) and content[pos] == "\r":
168 pos += 1
169 if pos < len(content) and content[pos] == "\n":
170 pos += 1
171 return pos
172
173
174def _parse_binding(
175 content: str,
176 pos: int,
177 context: dict[str, str],
178 skip_commands_for: set[str] | None = None,
179) -> tuple[str, str, int] | None:
180 """Parse a KEY=value binding, return (key, value, new_pos) or None."""
181 length = len(content)
182
183 # Skip optional 'export ' prefix
184 if content[pos : pos + 7] == "export ":
185 pos += 7
186 while pos < length and content[pos] in " \t":
187 pos += 1
188
189 # Parse key
190 key_start = pos
191 while pos < length and (content[pos].isalnum() or content[pos] == "_"):
192 pos += 1
193
194 if pos == key_start:
195 return None
196
197 key = content[key_start:pos]
198
199 # Must start with letter or underscore
200 if not (key[0].isalpha() or key[0] == "_"):
201 return None
202
203 # Skip whitespace before =
204 while pos < length and content[pos] in " \t":
205 pos += 1
206
207 # Expect =
208 if pos >= length or content[pos] != "=":
209 return None
210 pos += 1
211
212 # Skip whitespace after =
213 while pos < length and content[pos] in " \t":
214 pos += 1
215
216 # If key already exists in env and we should skip commands, use existing value
217 if skip_commands_for and key in skip_commands_for:
218 # Skip to end of line without executing commands
219 new_pos = _skip_to_eol(content, pos)
220 return key, os.environ[key], new_pos
221
222 # Parse value (with command expansion)
223 value, pos = _parse_value(content, pos, context)
224
225 return key, value, pos
226
227
228def _parse_value(content: str, pos: int, context: dict[str, str]) -> tuple[str, int]:
229 """Parse a value starting at pos, return (value, new_pos)."""
230 if pos >= len(content) or content[pos] in "\r\n":
231 return "", pos
232
233 char = content[pos]
234
235 # Single-quoted: literal value (no escape, no expansion), supports multiline
236 if char == "'":
237 return _parse_single_quoted(content, pos)
238
239 # Double-quoted: process escapes, variable expansion, and commands, supports multiline
240 if char == '"':
241 value, pos = _parse_double_quoted(content, pos)
242 value = _expand_variables(value, context)
243 value = _expand_commands(value)
244 value = value.replace(_ESCAPED_DOLLAR, "$") # Restore escaped $
245 return value, pos
246
247 # Unquoted value: variable expansion and command substitution
248 return _parse_unquoted(content, pos, context)
249
250
251def _parse_single_quoted(content: str, pos: int) -> tuple[str, int]:
252 """Parse single-quoted value (literal, multiline supported)."""
253 pos += 1 # Skip opening quote
254 start = pos
255 length = len(content)
256
257 while pos < length:
258 if content[pos] == "'":
259 value = content[start:pos]
260 return value, pos + 1
261 pos += 1
262
263 # No closing quote found, return what we have
264 return content[start:], pos
265
266
267def _parse_double_quoted(content: str, pos: int) -> tuple[str, int]:
268 """Parse double-quoted value (with escapes, multiline supported)."""
269 pos += 1 # Skip opening quote
270 result = []
271 length = len(content)
272
273 while pos < length:
274 char = content[pos]
275
276 if char == "\\" and pos + 1 < length:
277 next_char = content[pos + 1]
278 if next_char == "n":
279 result.append("\n")
280 elif next_char == "t":
281 result.append("\t")
282 elif next_char == "r":
283 result.append("\r")
284 elif next_char == '"':
285 result.append('"')
286 elif next_char == "\\":
287 result.append("\\")
288 elif next_char == "$":
289 result.append(_ESCAPED_DOLLAR) # Placeholder to prevent expansion
290 else:
291 # Unknown escape, keep both characters
292 result.append(char)
293 result.append(next_char)
294 pos += 2
295 elif char == '"':
296 return "".join(result), pos + 1
297 else:
298 result.append(char)
299 pos += 1
300
301 # No closing quote found, return what we have
302 return "".join(result), pos
303
304
305def _parse_unquoted(content: str, pos: int, context: dict[str, str]) -> tuple[str, int]:
306 """Parse unquoted value (until comment or end of line)."""
307 result = []
308 length = len(content)
309
310 while pos < length and content[pos] not in "\r\n":
311 char = content[pos]
312
313 # Stop at inline comment (whitespace followed by #)
314 if char == "#" and result and result[-1] in " \t":
315 # Remove trailing whitespace
316 while result and result[-1] in " \t":
317 result.pop()
318 break
319
320 # Handle backslash escapes (like bash)
321 if char == "\\" and pos + 1 < length:
322 next_char = content[pos + 1]
323 if next_char == "$":
324 result.append(_ESCAPED_DOLLAR) # Placeholder to prevent expansion
325 pos += 2
326 continue
327 elif next_char == "\\":
328 result.append("\\")
329 pos += 2
330 continue
331 # Other backslashes kept as-is
332
333 result.append(char)
334 pos += 1
335
336 value = "".join(result).rstrip()
337
338 # Expand variables, then commands
339 value = _expand_variables(value, context)
340 value = _expand_commands(value)
341 value = value.replace(_ESCAPED_DOLLAR, "$") # Restore escaped $
342 return value, pos
343
344
345def _expand_variables(value: str, context: dict[str, str]) -> str:
346 """Expand $VAR and ${VAR} references in value.
347
348 Looks up variables in context (previously parsed .env vars) first,
349 then falls back to os.environ. Unknown variables expand to empty string.
350 """
351
352 def replacer(match: re.Match[str]) -> str:
353 var_name = match.group(1)
354 # Check context first (vars defined earlier in .env), then os.environ
355 if var_name in context:
356 return context[var_name]
357 return os.environ.get(var_name, "")
358
359 # Expand ${VAR} first (more specific), then $VAR
360 value = _VAR_BRACE_RE.sub(replacer, value)
361 value = _VAR_BARE_RE.sub(replacer, value)
362 return value
363
364
365def _expand_commands(value: str) -> str:
366 """Expand all $(command) substitutions in value.
367
368 Handles nested parentheses within commands, e.g., $(echo "(test)").
369 """
370 result = []
371 i = 0
372 length = len(value)
373
374 while i < length:
375 # Look for $(
376 if i + 1 < length and value[i] == "$" and value[i + 1] == "(":
377 # Find matching closing paren, accounting for nesting
378 cmd_start = i + 2
379 depth = 1
380 j = cmd_start
381
382 while j < length and depth > 0:
383 if value[j] == "(":
384 depth += 1
385 elif value[j] == ")":
386 depth -= 1
387 j += 1
388
389 if depth == 0:
390 # Found matching ), extract and execute command
391 command = value[cmd_start : j - 1]
392 output = _execute_command(command)
393 result.append(output)
394 i = j
395 else:
396 # No matching ), keep literal
397 result.append(value[i])
398 i += 1
399 else:
400 result.append(value[i])
401 i += 1
402
403 return "".join(result)
404
405
406def _execute_command(command: str, timeout: float = 5.0) -> str:
407 """Execute a shell command and return stdout."""
408 try:
409 result = subprocess.run(
410 command,
411 shell=True,
412 stdout=subprocess.PIPE,
413 text=True,
414 timeout=timeout,
415 )
416 return result.stdout.strip() if result.returncode == 0 else ""
417 except (subprocess.TimeoutExpired, OSError):
418 return ""