v0.142.0
  1from __future__ import annotations
  2
  3from typing import Any
  4
  5import psycopg.errors
  6
  7from .ownership import _table_info
  8from .types import Informational, TableOwner
  9
 10
 11def gather_context(cursor: Any, table_owners: dict[str, TableOwner]) -> dict[str, Any]:
 12    """Collect informational data about the database: hit ratios, XID age,
 13    connection utilization, table sizes, slow queries, etc. These are surfaced
 14    alongside check results but never produce warnings on their own — they give
 15    an agent or human the situational awareness to interpret check findings.
 16    """
 17    context: dict[str, Any] = {}
 18    informationals: list[Informational] = []
 19    cursor.execute("""
 20        SELECT
 21            c.relname AS table_name,
 22            c.reltuples::bigint AS estimated_row_count,
 23            pg_total_relation_size(c.oid) AS total_size_bytes,
 24            pg_size_pretty(pg_total_relation_size(c.oid)) AS total_size,
 25            pg_size_pretty(pg_relation_size(c.oid)) AS table_size,
 26            pg_size_pretty(pg_indexes_size(c.oid)) AS indexes_size,
 27            (SELECT count(*) FROM pg_catalog.pg_index i WHERE i.indrelid = c.oid) AS index_count
 28        FROM pg_catalog.pg_class c
 29        JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
 30        WHERE c.relkind IN ('r', 'p')
 31          AND n.nspname = 'public'
 32        ORDER BY total_size_bytes DESC
 33    """)
 34    context["tables"] = []
 35    for row in cursor.fetchall():
 36        source, package, model_class, model_file = _table_info(row[0], table_owners)
 37        context["tables"].append(
 38            {
 39                "table": row[0],
 40                "estimated_rows": max(row[1], 0),
 41                "total_size_bytes": row[2],
 42                "total_size": row[3],
 43                "table_size": row[4],
 44                "indexes_size": row[5],
 45                "index_count": row[6],
 46                "source": source,
 47                "package": package,
 48            }
 49        )
 50
 51    # Cache hit ratio — below 98.5% can indicate insufficient shared_buffers
 52    # but is also volatile after restart (cold cache). Informational only.
 53    cursor.execute("""
 54        SELECT ROUND(
 55            100.0 * SUM(heap_blks_hit) / NULLIF(SUM(heap_blks_hit) + SUM(heap_blks_read), 0), 2
 56        ) FROM pg_catalog.pg_statio_user_tables
 57    """)
 58    row = cursor.fetchone()
 59    cache_hit_ratio = float(row[0]) if row and row[0] is not None else None
 60    if cache_hit_ratio is not None:
 61        informationals.append(
 62            Informational(
 63                name="cache_hit_ratio",
 64                label="Cache hit ratio",
 65                value=cache_hit_ratio,
 66                unit="%",
 67                note="volatile after restart; sustained <95% may indicate memory pressure",
 68            )
 69        )
 70
 71    # Index hit ratio — same caveats.
 72    cursor.execute("""
 73        SELECT ROUND(
 74            100.0 * SUM(idx_blks_hit) / NULLIF(SUM(idx_blks_hit) + SUM(idx_blks_read), 0), 2
 75        ) FROM pg_catalog.pg_statio_user_indexes
 76    """)
 77    row = cursor.fetchone()
 78    index_hit_ratio = float(row[0]) if row and row[0] is not None else None
 79    if index_hit_ratio is not None:
 80        informationals.append(
 81            Informational(
 82                name="index_hit_ratio",
 83                label="Index hit ratio",
 84                value=index_hit_ratio,
 85                unit="%",
 86                note="",
 87            )
 88        )
 89
 90    # XID wraparound age for current database. Managed Postgres usually
 91    # tunes autovacuum to keep this in check, but long-running transactions
 92    # or a disabled autovacuum can still let it climb.
 93    cursor.execute("""
 94        SELECT
 95            ROUND(100.0 * age(datfrozenxid) / 2147483648, 2) AS pct_towards_wraparound
 96        FROM pg_catalog.pg_database
 97        WHERE datname = current_database()
 98    """)
 99    row = cursor.fetchone()
100    xid_pct = float(row[0]) if row and row[0] is not None else None
101    if xid_pct is not None:
102        informationals.append(
103            Informational(
104                name="xid_wraparound",
105                label="XID wraparound",
106                value=xid_pct,
107                unit="% toward wraparound",
108                note="catastrophic if it reaches 100%; autovacuum usually keeps this low, but long-running transactions can block the freeze process",
109            )
110        )
111
112    # Connection utilization. Point-in-time snapshot.
113    cursor.execute("""
114        SELECT
115            (SELECT count(*) FROM pg_catalog.pg_stat_activity
116             WHERE datname = current_database()) AS active_connections,
117            (SELECT setting::int FROM pg_catalog.pg_settings
118             WHERE name = 'max_connections') AS max_connections
119    """)
120    row = cursor.fetchone()
121    active_conns, max_conns = row[0], row[1]
122    context["connections"] = {"active": active_conns, "max": max_conns}
123    if max_conns:
124        pct = round(100.0 * active_conns / max_conns, 1)
125        informationals.append(
126            Informational(
127                name="connection_saturation",
128                label="Connection saturation",
129                value=pct,
130                unit="%",
131                note=f"{active_conns}/{max_conns} connections in use (snapshot)",
132            )
133        )
134
135    # Stats reset time — tells you how much history the cumulative checks have.
136    cursor.execute("""
137        SELECT stats_reset
138        FROM pg_catalog.pg_stat_database
139        WHERE datname = current_database()
140    """)
141    row = cursor.fetchone()
142    stats_reset_iso = row[0].isoformat() if row and row[0] else None
143    context["stats_reset"] = stats_reset_iso
144    informationals.append(
145        Informational(
146            name="stats_reset",
147            label="Stats reset",
148            value=stats_reset_iso,
149            unit="",
150            note="cumulative-stat checks (unused_indexes, missing_index_candidates) need sufficient history after this point to be reliable",
151        )
152    )
153
154    # pg_stat_statements availability + slow queries.
155    cursor.execute("""
156        SELECT EXISTS (
157            SELECT 1 FROM pg_catalog.pg_extension WHERE extname = 'pg_stat_statements'
158        )
159    """)
160    has_pgss = cursor.fetchone()[0]
161
162    if not has_pgss:
163        context["pg_stat_statements"] = "not_installed"
164        context["slow_queries"] = []
165        informationals.append(
166            Informational(
167                name="pg_stat_statements",
168                label="pg_stat_statements",
169                value="not_installed",
170                unit="",
171                note="install for query-level analysis",
172            )
173        )
174    else:
175        try:
176            cursor.execute("""
177                SELECT
178                    calls,
179                    ROUND(total_exec_time::numeric, 2) AS total_time_ms,
180                    ROUND(mean_exec_time::numeric, 2) AS mean_time_ms,
181                    ROUND(
182                        (100 * total_exec_time / NULLIF(SUM(total_exec_time) OVER (), 0))::numeric, 2
183                    ) AS pct_total_time,
184                    LEFT(query, 200) AS query
185                FROM pg_stat_statements
186                ORDER BY total_exec_time DESC
187                LIMIT 10
188            """)
189            context["pg_stat_statements"] = "available"
190            context["slow_queries"] = [
191                {
192                    "calls": row[0],
193                    "total_time_ms": float(row[1]),
194                    "mean_time_ms": float(row[2]),
195                    "pct_total_time": float(row[3]),
196                    "query": row[4],
197                }
198                for row in cursor.fetchall()
199            ]
200            informationals.append(
201                Informational(
202                    name="pg_stat_statements",
203                    label="pg_stat_statements",
204                    value="available",
205                    unit="",
206                    note="",
207                )
208            )
209        except psycopg.errors.DatabaseError:
210            context["pg_stat_statements"] = "no_permission"
211            context["slow_queries"] = []
212            informationals.append(
213                Informational(
214                    name="pg_stat_statements",
215                    label="pg_stat_statements",
216                    value="no_permission",
217                    unit="",
218                    note="grant pg_read_all_stats to enable query analysis",
219                )
220            )
221
222    context["informationals"] = informationals
223    return context