v0.146.0
  1"""Public database API: the active-connection ContextVar plus management
  2and read-only helpers. Per-request lifecycle (clearing the query log,
  3returning pooled connections) lives in `DatabaseConnectionMiddleware`."""
  4
  5from __future__ import annotations
  6
  7from collections.abc import Generator
  8from contextlib import contextmanager
  9from contextvars import ContextVar
 10from typing import TYPE_CHECKING
 11
 12from plain.exceptions import ImproperlyConfigured
 13from plain.logs import get_framework_logger
 14from plain.postgres.database_url import parse_database_url
 15from plain.postgres.sources import DirectSource, runtime_pool_source
 16from plain.runtime import settings as plain_settings
 17
 18if TYPE_CHECKING:
 19    from plain.postgres.connection import DatabaseConnection
 20
# Framework-level logger used by the helpers in this module.
logger = get_framework_logger()

# NOTE(review): not referenced in the visible part of this module — presumably
# the key under which the Plain version is recorded in pickled state; confirm
# against the code that imports it.
PLAIN_VERSION_PICKLE_KEY = "_plain_version"


# The DatabaseConnection wrapper active in the current context. Defaults to
# None; get_connection() creates and stores one on first use, and
# use_management_connection() temporarily swaps in a different wrapper.
_db_conn: ContextVar[DatabaseConnection | None] = ContextVar("_db_conn", default=None)
 27
 28
 29def get_connection() -> DatabaseConnection:
 30    from plain.postgres.connection import DatabaseConnection
 31
 32    conn = _db_conn.get()
 33    if conn is None:
 34        conn = DatabaseConnection(runtime_pool_source)
 35        _db_conn.set(conn)
 36    return conn
 37
 38
 39def has_connection() -> bool:
 40    return _db_conn.get() is not None
 41
 42
 43def return_database_connection(conn: DatabaseConnection | None = None) -> None:
 44    """Return a psycopg connection, clearing its queries log along the way.
 45
 46    Pool-backed wrappers return to the pool; direct-backed wrappers close.
 47    The wrapper itself stays referenced by the caller (typically a
 48    ContextVar) and will acquire a fresh connection on next use.
 49
 50    Pass `conn` explicitly when the caller is running outside the
 51    context that owns the wrapper — e.g. a streaming response's resource
 52    closer runs after `handle()` returns, so the request `ContextVar`
 53    context is no longer active. Middleware captures the wrapper at
 54    response time and hands it in. Without `conn`, falls back to
 55    `_db_conn.get()` for callers that *are* still in-context.
 56
 57    No-op when a transaction is in progress — returning mid-transaction
 58    would roll back the caller's work (tests wrap each case in `atomic()`,
 59    streaming views may still be mid-query).
 60    """
 61    if conn is None:
 62        conn = _db_conn.get()
 63    if conn is None:
 64        return
 65    if conn.in_atomic_block:
 66        return
 67    conn.queries_log.clear()
 68    conn.close()
 69
 70
 71@contextmanager
 72def use_management_connection() -> Generator[DatabaseConnection]:
 73    """Swap in a direct connection against `POSTGRES_MANAGEMENT_URL` for this block.
 74
 75    Used to route migrations, convergence, and other DDL through a separate
 76    connection — e.g. a direct Postgres port when the runtime connection
 77    goes through a transaction-mode pgbouncer, or a DDL-capable role when
 78    the runtime role only has DML. Falls back to the active connection when
 79    the management URL is unset or equals `POSTGRES_URL` — preserves any
 80    outer transaction, session state, temp tables, or advisory locks.
 81    """
 82    from plain.postgres.connection import DatabaseConnection
 83
 84    management_url = str(plain_settings.POSTGRES_MANAGEMENT_URL)
 85    runtime_url = str(plain_settings.POSTGRES_URL)
 86
 87    if not management_url or management_url == runtime_url:
 88        yield get_connection()
 89        return
 90
 91    if management_url.lower() == "none":
 92        raise ImproperlyConfigured(
 93            "The PostgreSQL database has been disabled (POSTGRES_MANAGEMENT_URL=none). "
 94            "No database operations are available in this context."
 95        )
 96
 97    config = parse_database_url(management_url)
 98    logger.info(
 99        "Using management database connection",
100        extra={
101            "context": {
102                "host": config["HOST"],
103                "port": config["PORT"],
104                "database": config["DATABASE"],
105            }
106        },
107    )
108
109    new_conn = DatabaseConnection(DirectSource(config))
110    token = _db_conn.set(new_conn)
111    try:
112        yield new_conn
113    finally:
114        try:
115            new_conn.close()
116        except Exception:
117            logger.debug("Error closing management connection", exc_info=True)
118        _db_conn.reset(token)
119
120
@contextmanager
def read_only() -> Generator[None]:
    """Execute the enclosed block within one read-only transaction.

    A single ``BEGIN READ ONLY`` transaction spans the whole block, so any
    INSERT/UPDATE/DELETE/DDL raises
    ``psycopg.errors.ReadOnlySqlTransaction``. ``atomic()`` blocks nested
    inside turn into savepoints of that outer transaction and are therefore
    read-only as well.

    Since this helper starts its own transaction, entering it while an
    ``atomic()`` block is already active raises
    ``TransactionManagementError``.
    """
    from plain.postgres.transaction import TransactionManagementError, atomic

    wrapper = get_connection()
    if wrapper.in_atomic_block:
        raise TransactionManagementError(
            "read_only() cannot be entered inside an existing atomic() block; "
            "it opens its own transaction."
        )

    wrapper.ensure_connection()
    raw_conn = wrapper.connection
    assert raw_conn is not None

    # With autocommit off, psycopg sends BEGIN lazily on the first query;
    # flagging the connection read-only first makes that a READ ONLY
    # transaction.
    raw_conn.read_only = True
    try:
        with atomic():
            yield
    finally:
        # Best-effort: clear the flag on the same psycopg connection it was
        # set on; a failure is logged rather than raised.
        try:
            raw_conn.read_only = None
        except Exception:
            logger.debug("Error clearing read_only on connection", exc_info=True)
158            logger.debug("Error clearing read_only on connection", exc_info=True)