v0.148.0
  1"""Bash-compatible `.env` file parsing and Plain dev/test dotenv loading.
  2
  3`plain.dev` owns all dotenv code so that production deployments (which don't
  4install plain.dev) never load `.env` files. plain.pytest opportunistically
  5imports `load_dotenv_files` — if plain.dev is installed, `.env.test*` loads
  6under pytest; if not, the plugin skips dotenv loading entirely.
  7
  8Parser supports:
  9- KEY=value (basic unquoted)
 10- KEY="double quoted value" (with escape handling and multiline)
 11- KEY='single quoted value' (literal, including multiline)
 12- export KEY=value (strips export prefix)
 13- Comments (# comment and inline KEY=value # comment)
 14- Variable expansion: $VAR and ${VAR} (in unquoted and double-quoted values)
 15- Command substitution: $(command)
 16"""
 17
 18from __future__ import annotations
 19
 20import os
 21import re
 22import subprocess
 23from collections.abc import Callable
 24from pathlib import Path
 25
 26import click
 27
 28__all__ = ["load_dotenv", "load_dotenv_files", "parse_dotenv"]
 29
# Variable references recognised during expansion: ${VAR} or $VAR.
# VAR must start with a letter or underscore, followed by letters, digits,
# or underscores.
_VAR_BRACE_RE = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")
_VAR_BARE_RE = re.compile(r"\$([A-Za-z_][A-Za-z0-9_]*)")
# Placeholder substituted for an escaped `$` while a value is being expanded,
# so the escaped dollar is not treated as a variable or command reference.
# The parsers swap it back to a literal "$" once expansion is done.
_ESCAPED_DOLLAR = "\x00DOLLAR\x00"

# Allowed shape of PLAIN_ENV; load_dotenv_files() validates against it before
# interpolating the value into `.env.{PLAIN_ENV}` file names.
_PLAIN_ENV_RE = re.compile(r"^[a-zA-Z][a-zA-Z0-9_-]*$")
# Process-wide latch that makes load_dotenv_files() a no-op after its first run.
_files_loaded = False
 38
 39
 40def load_dotenv_files() -> None:
 41    """Load `.env` files using Next.js / Vite-style precedence.
 42
 43    Files loaded (highest precedence first — `load_dotenv()` doesn't override
 44    existing keys, so the first file to bind a key wins):
 45
 46      1. `.env.{PLAIN_ENV}.local`  — gitignored, env-specific personal
 47      2. `.env.local`              — gitignored, personal; SKIPPED in test
 48      3. `.env.{PLAIN_ENV}`        — gitignored or committed, env-specific
 49      4. `.env`                    — committed baseline
 50
 51    `PLAIN_ENV` is set by the CLI dispatcher (`plain.cli.core`) based on the
 52    active command — `plain dev` → `dev`, `plain test` → `test`. Export
 53    `PLAIN_ENV` yourself to override.
 54
 55    Idempotent within a process — repeat calls are a no-op.
 56    """
 57    global _files_loaded
 58    if _files_loaded:
 59        return
 60    _files_loaded = True
 61
 62    plain_env = os.environ.get("PLAIN_ENV", "")
 63    if plain_env and not _PLAIN_ENV_RE.fullmatch(plain_env):
 64        raise ValueError(
 65            f"PLAIN_ENV must match {_PLAIN_ENV_RE.pattern}, got {plain_env!r}"
 66        )
 67
 68    def _load(path: str) -> None:
 69        if load_dotenv(path):
 70            click.secho(f"Loading {path}...", dim=True, italic=True, err=True)
 71
 72    if plain_env:
 73        _load(f".env.{plain_env}.local")
 74    if plain_env != "test":
 75        # Skipped under test (Next.js / Rails dotenv convention) so CI runs
 76        # stay deterministic and personal creds don't leak into the suite.
 77        _load(".env.local")
 78    if plain_env:
 79        _load(f".env.{plain_env}")
 80    _load(".env")
 81
 82
 83def load_dotenv(
 84    filepath: str | Path,
 85    *,
 86    override: bool = False,
 87) -> bool:
 88    """
 89    Load environment variables from a .env file into os.environ.
 90
 91    Args:
 92        filepath: Path to the .env file
 93        override: If True, overwrite existing environment variables
 94
 95    Returns:
 96        True if the file was loaded, False if it doesn't exist
 97    """
 98    path = Path(filepath)
 99    if not path.exists():
100        return False
101
102    content = path.read_text(encoding="utf-8")
103
104    # Skip command execution for keys that already exist (unless override)
105    skip_commands_for = None if override else set(os.environ.keys())
106
107    def on_bind(key: str, value: str) -> None:
108        if override or key not in os.environ:
109            os.environ[key] = value
110
111    _parse_content(content, skip_commands_for=skip_commands_for, on_bind=on_bind)
112    return True
113
114
def parse_dotenv(filepath: str | Path) -> dict[str, str]:
    """
    Parse a .env file and return a dictionary of key-value pairs.

    Does not modify os.environ. Supports multiline values in quoted strings.

    Args:
        filepath: Path to the .env file

    Returns:
        Mapping of each parsed key to its value

    Raises:
        OSError: If the file cannot be read (e.g. it does not exist)
    """
    # utf-8-sig decodes plain UTF-8 unchanged but drops a leading BOM, which
    # would otherwise be glued onto the first key and make that line unparsable.
    content = Path(filepath).read_text(encoding="utf-8-sig")
    return _parse_content(content)
123
124
125def _parse_content(
126    content: str,
127    skip_commands_for: set[str] | None = None,
128    on_bind: Callable[[str, str], None] | None = None,
129) -> dict[str, str]:
130    """Parse .env file content and return key-value pairs."""
131    result: dict[str, str] = {}
132    pos = 0
133    length = len(content)
134
135    while pos < length:
136        # Skip whitespace and empty lines
137        while pos < length and content[pos] in " \t\r\n":
138            pos += 1
139
140        if pos >= length:
141            break
142
143        # Skip comment lines
144        if content[pos] == "#":
145            pos = _skip_to_eol(content, pos)
146            continue
147
148        # Try to parse a binding
149        parsed = _parse_binding(content, pos, result, skip_commands_for)
150        if parsed:
151            key, value, new_pos = parsed
152            result[key] = value
153            if on_bind:
154                on_bind(key, value)
155            pos = new_pos
156        else:
157            # Skip to next line on parse failure
158            pos = _skip_to_eol(content, pos)
159
160    return result
161
162
163def _skip_to_eol(content: str, pos: int) -> int:
164    """Skip to end of line, return position after newline."""
165    while pos < len(content) and content[pos] not in "\r\n":
166        pos += 1
167    if pos < len(content) and content[pos] == "\r":
168        pos += 1
169    if pos < len(content) and content[pos] == "\n":
170        pos += 1
171    return pos
172
173
174def _parse_binding(
175    content: str,
176    pos: int,
177    context: dict[str, str],
178    skip_commands_for: set[str] | None = None,
179) -> tuple[str, str, int] | None:
180    """Parse a KEY=value binding, return (key, value, new_pos) or None."""
181    length = len(content)
182
183    # Skip optional 'export ' prefix
184    if content[pos : pos + 7] == "export ":
185        pos += 7
186        while pos < length and content[pos] in " \t":
187            pos += 1
188
189    # Parse key
190    key_start = pos
191    while pos < length and (content[pos].isalnum() or content[pos] == "_"):
192        pos += 1
193
194    if pos == key_start:
195        return None
196
197    key = content[key_start:pos]
198
199    # Must start with letter or underscore
200    if not (key[0].isalpha() or key[0] == "_"):
201        return None
202
203    # Skip whitespace before =
204    while pos < length and content[pos] in " \t":
205        pos += 1
206
207    # Expect =
208    if pos >= length or content[pos] != "=":
209        return None
210    pos += 1
211
212    # Skip whitespace after =
213    while pos < length and content[pos] in " \t":
214        pos += 1
215
216    # If key already exists in env and we should skip commands, use existing value
217    if skip_commands_for and key in skip_commands_for:
218        # Skip to end of line without executing commands
219        new_pos = _skip_to_eol(content, pos)
220        return key, os.environ[key], new_pos
221
222    # Parse value (with command expansion)
223    value, pos = _parse_value(content, pos, context)
224
225    return key, value, pos
226
227
228def _parse_value(content: str, pos: int, context: dict[str, str]) -> tuple[str, int]:
229    """Parse a value starting at pos, return (value, new_pos)."""
230    if pos >= len(content) or content[pos] in "\r\n":
231        return "", pos
232
233    char = content[pos]
234
235    # Single-quoted: literal value (no escape, no expansion), supports multiline
236    if char == "'":
237        return _parse_single_quoted(content, pos)
238
239    # Double-quoted: process escapes, variable expansion, and commands, supports multiline
240    if char == '"':
241        value, pos = _parse_double_quoted(content, pos)
242        value = _expand_variables(value, context)
243        value = _expand_commands(value)
244        value = value.replace(_ESCAPED_DOLLAR, "$")  # Restore escaped $
245        return value, pos
246
247    # Unquoted value: variable expansion and command substitution
248    return _parse_unquoted(content, pos, context)
249
250
251def _parse_single_quoted(content: str, pos: int) -> tuple[str, int]:
252    """Parse single-quoted value (literal, multiline supported)."""
253    pos += 1  # Skip opening quote
254    start = pos
255    length = len(content)
256
257    while pos < length:
258        if content[pos] == "'":
259            value = content[start:pos]
260            return value, pos + 1
261        pos += 1
262
263    # No closing quote found, return what we have
264    return content[start:], pos
265
266
267def _parse_double_quoted(content: str, pos: int) -> tuple[str, int]:
268    """Parse double-quoted value (with escapes, multiline supported)."""
269    pos += 1  # Skip opening quote
270    result = []
271    length = len(content)
272
273    while pos < length:
274        char = content[pos]
275
276        if char == "\\" and pos + 1 < length:
277            next_char = content[pos + 1]
278            if next_char == "n":
279                result.append("\n")
280            elif next_char == "t":
281                result.append("\t")
282            elif next_char == "r":
283                result.append("\r")
284            elif next_char == '"':
285                result.append('"')
286            elif next_char == "\\":
287                result.append("\\")
288            elif next_char == "$":
289                result.append(_ESCAPED_DOLLAR)  # Placeholder to prevent expansion
290            else:
291                # Unknown escape, keep both characters
292                result.append(char)
293                result.append(next_char)
294            pos += 2
295        elif char == '"':
296            return "".join(result), pos + 1
297        else:
298            result.append(char)
299            pos += 1
300
301    # No closing quote found, return what we have
302    return "".join(result), pos
303
304
def _parse_unquoted(content: str, pos: int, context: dict[str, str]) -> tuple[str, int]:
    """Read an unquoted value that runs to an inline comment or end of line.

    A `#` that directly follows a space or tab starts an inline comment.
    `\\$` yields a literal dollar that is protected from expansion, `\\\\`
    yields a single backslash, and any other backslash is kept as written.
    Trailing whitespace is stripped before variables and then commands are
    expanded. Returns (value, index where reading stopped).
    """
    pieces: list[str] = []
    cursor = pos
    end = len(content)

    while cursor < end:
        current = content[cursor]
        if current in "\r\n":
            break

        # Inline comment: the whitespace in front of it is removed by the
        # rstrip() below.
        if current == "#" and pieces and pieces[-1] in " \t":
            break

        # Handle backslash escapes (like bash)
        if current == "\\" and cursor + 1 < end and content[cursor + 1] in "$\\":
            if content[cursor + 1] == "$":
                pieces.append(_ESCAPED_DOLLAR)  # Placeholder to prevent expansion
            else:
                pieces.append("\\")
            cursor += 2
            continue

        pieces.append(current)
        cursor += 1

    text = "".join(pieces).rstrip()

    # Expand variables, then commands, then restore escaped dollars.
    text = _expand_commands(_expand_variables(text, context))
    return text.replace(_ESCAPED_DOLLAR, "$"), cursor
343
344
345def _expand_variables(value: str, context: dict[str, str]) -> str:
346    """Expand $VAR and ${VAR} references in value.
347
348    Looks up variables in context (previously parsed .env vars) first,
349    then falls back to os.environ. Unknown variables expand to empty string.
350    """
351
352    def replacer(match: re.Match[str]) -> str:
353        var_name = match.group(1)
354        # Check context first (vars defined earlier in .env), then os.environ
355        if var_name in context:
356            return context[var_name]
357        return os.environ.get(var_name, "")
358
359    # Expand ${VAR} first (more specific), then $VAR
360    value = _VAR_BRACE_RE.sub(replacer, value)
361    value = _VAR_BARE_RE.sub(replacer, value)
362    return value
363
364
365def _expand_commands(value: str) -> str:
366    """Expand all $(command) substitutions in value.
367
368    Handles nested parentheses within commands, e.g., $(echo "(test)").
369    """
370    result = []
371    i = 0
372    length = len(value)
373
374    while i < length:
375        # Look for $(
376        if i + 1 < length and value[i] == "$" and value[i + 1] == "(":
377            # Find matching closing paren, accounting for nesting
378            cmd_start = i + 2
379            depth = 1
380            j = cmd_start
381
382            while j < length and depth > 0:
383                if value[j] == "(":
384                    depth += 1
385                elif value[j] == ")":
386                    depth -= 1
387                j += 1
388
389            if depth == 0:
390                # Found matching ), extract and execute command
391                command = value[cmd_start : j - 1]
392                output = _execute_command(command)
393                result.append(output)
394                i = j
395            else:
396                # No matching ), keep literal
397                result.append(value[i])
398                i += 1
399        else:
400            result.append(value[i])
401            i += 1
402
403    return "".join(result)
404
405
406def _execute_command(command: str, timeout: float = 5.0) -> str:
407    """Execute a shell command and return stdout."""
408    try:
409        result = subprocess.run(
410            command,
411            shell=True,
412            stdout=subprocess.PIPE,
413            text=True,
414            timeout=timeout,
415        )
416        return result.stdout.strip() if result.returncode == 0 else ""
417    except (subprocess.TimeoutExpired, OSError):
418        return ""