1"""Public database API: the active-connection ContextVar plus management
2and read-only helpers. Per-request lifecycle (clearing the query log,
3returning pooled connections) lives in `DatabaseConnectionMiddleware`."""
4
5from __future__ import annotations
6
7from collections.abc import Generator
8from contextlib import contextmanager
9from contextvars import ContextVar
10from typing import TYPE_CHECKING
11
12from plain.exceptions import ImproperlyConfigured
13from plain.logs import get_framework_logger
14from plain.postgres.database_url import parse_database_url
15from plain.postgres.sources import DirectSource, runtime_pool_source
16from plain.runtime import settings as plain_settings
17
18if TYPE_CHECKING:
19 from plain.postgres.connection import DatabaseConnection
20
21logger = get_framework_logger()
22
23PLAIN_VERSION_PICKLE_KEY = "_plain_version"
24
25
26_db_conn: ContextVar[DatabaseConnection | None] = ContextVar("_db_conn", default=None)
27
28
29def get_connection() -> DatabaseConnection:
30 from plain.postgres.connection import DatabaseConnection
31
32 conn = _db_conn.get()
33 if conn is None:
34 conn = DatabaseConnection(runtime_pool_source)
35 _db_conn.set(conn)
36 return conn
37
38
39def has_connection() -> bool:
40 return _db_conn.get() is not None
41
42
43def return_database_connection(conn: DatabaseConnection | None = None) -> None:
44 """Return a psycopg connection, clearing its queries log along the way.
45
46 Pool-backed wrappers return to the pool; direct-backed wrappers close.
47 The wrapper itself stays referenced by the caller (typically a
48 ContextVar) and will acquire a fresh connection on next use.
49
50 Pass `conn` explicitly when the caller is running outside the
51 context that owns the wrapper — e.g. a streaming response's resource
52 closer runs after `handle()` returns, so the request `ContextVar`
53 context is no longer active. Middleware captures the wrapper at
54 response time and hands it in. Without `conn`, falls back to
55 `_db_conn.get()` for callers that *are* still in-context.
56
57 No-op when a transaction is in progress — returning mid-transaction
58 would roll back the caller's work (tests wrap each case in `atomic()`,
59 streaming views may still be mid-query).
60 """
61 if conn is None:
62 conn = _db_conn.get()
63 if conn is None:
64 return
65 if conn.in_atomic_block:
66 return
67 conn.queries_log.clear()
68 conn.close()
69
70
71@contextmanager
72def use_management_connection() -> Generator[DatabaseConnection]:
73 """Swap in a direct connection against `POSTGRES_MANAGEMENT_URL` for this block.
74
75 Used to route migrations, convergence, and other DDL through a separate
76 connection — e.g. a direct Postgres port when the runtime connection
77 goes through a transaction-mode pgbouncer, or a DDL-capable role when
78 the runtime role only has DML. Falls back to the active connection when
79 the management URL is unset or equals `POSTGRES_URL` — preserves any
80 outer transaction, session state, temp tables, or advisory locks.
81 """
82 from plain.postgres.connection import DatabaseConnection
83
84 management_url = str(plain_settings.POSTGRES_MANAGEMENT_URL)
85 runtime_url = str(plain_settings.POSTGRES_URL)
86
87 if not management_url or management_url == runtime_url:
88 yield get_connection()
89 return
90
91 if management_url.lower() == "none":
92 raise ImproperlyConfigured(
93 "The PostgreSQL database has been disabled (POSTGRES_MANAGEMENT_URL=none). "
94 "No database operations are available in this context."
95 )
96
97 config = parse_database_url(management_url)
98 logger.info(
99 "Using management database connection",
100 extra={
101 "context": {
102 "host": config["HOST"],
103 "port": config["PORT"],
104 "database": config["DATABASE"],
105 }
106 },
107 )
108
109 new_conn = DatabaseConnection(DirectSource(config))
110 token = _db_conn.set(new_conn)
111 try:
112 yield new_conn
113 finally:
114 try:
115 new_conn.close()
116 except Exception:
117 logger.debug("Error closing management connection", exc_info=True)
118 _db_conn.reset(token)
119
120
121@contextmanager
122def read_only() -> Generator[None]:
123 """Run a block of code inside a read-only transaction.
124
125 Opens a single ``BEGIN READ ONLY`` transaction for the duration of the
126 block — any INSERT/UPDATE/DELETE/DDL raises
127 ``psycopg.errors.ReadOnlySqlTransaction``. Nested ``atomic()`` blocks
128 inside become savepoints of the outer read-only transaction and inherit
129 read-only.
130
131 Because this opens its own transaction, it cannot be entered while an
132 ``atomic()`` block is already active.
133 """
134 from plain.postgres.transaction import (
135 TransactionManagementError,
136 atomic,
137 )
138
139 conn = get_connection()
140 if conn.in_atomic_block:
141 raise TransactionManagementError(
142 "read_only() cannot be entered inside an existing atomic() block; "
143 "it opens its own transaction."
144 )
145 conn.ensure_connection()
146 psy_conn = conn.connection
147 assert psy_conn is not None
148 # psycopg lazily emits BEGIN on the first query once autocommit is off;
149 # setting read_only=True makes it a READ ONLY transaction.
150 psy_conn.read_only = True
151 try:
152 with atomic():
153 yield
154 finally:
155 try:
156 psy_conn.read_only = None
157 except Exception:
158 logger.debug("Error clearing read_only on connection", exc_info=True)