1from __future__ import annotations
2
3from typing import Any
4
5import psycopg.errors
6
7from .ownership import _table_info
8from .types import Informational, TableOwner
9
10
def gather_context(cursor: Any, table_owners: dict[str, TableOwner]) -> dict[str, Any]:
    """Collect informational data about the database: hit ratios, XID age,
    connection utilization, table sizes, slow queries, etc. These are surfaced
    alongside check results but never produce warnings on their own — they give
    an agent or human the situational awareness to interpret check findings.

    Args:
        cursor: Open DB-API cursor (psycopg) on the target database.
        table_owners: Table name -> TableOwner mapping used to attribute each
            table to a source/package via ``_table_info``.

    Returns:
        Dict with keys ``tables``, ``connections``, ``stats_reset``,
        ``pg_stat_statements``, ``slow_queries``, and ``informationals``
        (list of Informational records).
    """
    context: dict[str, Any] = {}
    informationals: list[Informational] = []

    context["tables"] = _collect_table_sizes(cursor, table_owners)
    _collect_hit_ratios(cursor, informationals)
    _collect_xid_age(cursor, informationals)
    _collect_connections(cursor, context, informationals)
    _collect_stats_reset(cursor, context, informationals)
    _collect_slow_queries(cursor, context, informationals)

    context["informationals"] = informationals
    return context


def _scalar_float(cursor: Any, sql: str) -> float | None:
    """Run a single-row, single-column query; return the value as float, or
    None when there is no row or the value is NULL."""
    cursor.execute(sql)
    row = cursor.fetchone()
    return float(row[0]) if row and row[0] is not None else None


def _collect_table_sizes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> list[dict[str, Any]]:
    """Per-table row estimates, on-disk sizes, and index counts for tables in
    the public schema, largest first, each attributed to a source/package."""
    cursor.execute("""
        SELECT
            c.relname AS table_name,
            c.reltuples::bigint AS estimated_row_count,
            pg_total_relation_size(c.oid) AS total_size_bytes,
            pg_size_pretty(pg_total_relation_size(c.oid)) AS total_size,
            pg_size_pretty(pg_relation_size(c.oid)) AS table_size,
            pg_size_pretty(pg_indexes_size(c.oid)) AS indexes_size,
            (SELECT count(*) FROM pg_catalog.pg_index i WHERE i.indrelid = c.oid) AS index_count
        FROM pg_catalog.pg_class c
        JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
        WHERE c.relkind IN ('r', 'p')
          AND n.nspname = 'public'
        ORDER BY total_size_bytes DESC
    """)
    tables: list[dict[str, Any]] = []
    for row in cursor.fetchall():
        # Only source/package are surfaced here; the class/file parts of the
        # _table_info tuple are intentionally discarded.
        source, package, _, _ = _table_info(row[0], table_owners)
        tables.append(
            {
                "table": row[0],
                # reltuples is -1 for never-analyzed tables; clamp to 0.
                "estimated_rows": max(row[1], 0),
                "total_size_bytes": row[2],
                "total_size": row[3],
                "table_size": row[4],
                "indexes_size": row[5],
                "index_count": row[6],
                "source": source,
                "package": package,
            }
        )
    return tables


def _collect_hit_ratios(cursor: Any, informationals: list[Informational]) -> None:
    """Heap and index cache hit ratios.

    Below ~98.5% can indicate insufficient shared_buffers but is also volatile
    after restart (cold cache). Informational only.
    """
    cache_hit_ratio = _scalar_float(cursor, """
        SELECT ROUND(
            100.0 * SUM(heap_blks_hit) / NULLIF(SUM(heap_blks_hit) + SUM(heap_blks_read), 0), 2
        ) FROM pg_catalog.pg_statio_user_tables
    """)
    if cache_hit_ratio is not None:
        informationals.append(
            Informational(
                name="cache_hit_ratio",
                label="Cache hit ratio",
                value=cache_hit_ratio,
                unit="%",
                note="volatile after restart; sustained <95% may indicate memory pressure",
            )
        )

    # Index hit ratio — same caveats.
    index_hit_ratio = _scalar_float(cursor, """
        SELECT ROUND(
            100.0 * SUM(idx_blks_hit) / NULLIF(SUM(idx_blks_hit) + SUM(idx_blks_read), 0), 2
        ) FROM pg_catalog.pg_statio_user_indexes
    """)
    if index_hit_ratio is not None:
        informationals.append(
            Informational(
                name="index_hit_ratio",
                label="Index hit ratio",
                value=index_hit_ratio,
                unit="%",
                note="",
            )
        )


def _collect_xid_age(cursor: Any, informationals: list[Informational]) -> None:
    """XID wraparound age for the current database.

    Managed Postgres usually tunes autovacuum to keep this in check, but
    long-running transactions or a disabled autovacuum can still let it climb.
    """
    xid_pct = _scalar_float(cursor, """
        SELECT
            ROUND(100.0 * age(datfrozenxid) / 2147483648, 2) AS pct_towards_wraparound
        FROM pg_catalog.pg_database
        WHERE datname = current_database()
    """)
    if xid_pct is not None:
        informationals.append(
            Informational(
                name="xid_wraparound",
                label="XID wraparound",
                value=xid_pct,
                unit="% toward wraparound",
                note="catastrophic if it reaches 100%; autovacuum usually keeps this low, but long-running transactions can block the freeze process",
            )
        )


def _collect_connections(
    cursor: Any, context: dict[str, Any], informationals: list[Informational]
) -> None:
    """Connection utilization. Point-in-time snapshot."""
    cursor.execute("""
        SELECT
            (SELECT count(*) FROM pg_catalog.pg_stat_activity
             WHERE datname = current_database()) AS active_connections,
            (SELECT setting::int FROM pg_catalog.pg_settings
             WHERE name = 'max_connections') AS max_connections
    """)
    row = cursor.fetchone()
    active_conns, max_conns = row[0], row[1]
    context["connections"] = {"active": active_conns, "max": max_conns}
    if max_conns:
        pct = round(100.0 * active_conns / max_conns, 1)
        informationals.append(
            Informational(
                name="connection_saturation",
                label="Connection saturation",
                value=pct,
                unit="%",
                note=f"{active_conns}/{max_conns} connections in use (snapshot)",
            )
        )


def _collect_stats_reset(
    cursor: Any, context: dict[str, Any], informationals: list[Informational]
) -> None:
    """Stats reset time — tells you how much history the cumulative checks have."""
    cursor.execute("""
        SELECT stats_reset
        FROM pg_catalog.pg_stat_database
        WHERE datname = current_database()
    """)
    row = cursor.fetchone()
    # stats_reset is NULL until the first reset; surface None in that case.
    stats_reset_iso = row[0].isoformat() if row and row[0] else None
    context["stats_reset"] = stats_reset_iso
    informationals.append(
        Informational(
            name="stats_reset",
            label="Stats reset",
            value=stats_reset_iso,
            unit="",
            note="cumulative-stat checks (unused_indexes, missing_index_candidates) need sufficient history after this point to be reliable",
        )
    )


def _collect_slow_queries(
    cursor: Any, context: dict[str, Any], informationals: list[Informational]
) -> None:
    """pg_stat_statements availability + top-10 queries by total execution time.

    Sets context["pg_stat_statements"] to one of "not_installed" / "available" /
    "no_permission" and fills context["slow_queries"] accordingly.
    """
    cursor.execute("""
        SELECT EXISTS (
            SELECT 1 FROM pg_catalog.pg_extension WHERE extname = 'pg_stat_statements'
        )
    """)
    has_pgss = cursor.fetchone()[0]

    if not has_pgss:
        context["pg_stat_statements"] = "not_installed"
        context["slow_queries"] = []
        informationals.append(
            Informational(
                name="pg_stat_statements",
                label="pg_stat_statements",
                value="not_installed",
                unit="",
                note="install for query-level analysis",
            )
        )
        return

    try:
        cursor.execute("""
            SELECT
                calls,
                ROUND(total_exec_time::numeric, 2) AS total_time_ms,
                ROUND(mean_exec_time::numeric, 2) AS mean_time_ms,
                ROUND(
                    (100 * total_exec_time / NULLIF(SUM(total_exec_time) OVER (), 0))::numeric, 2
                ) AS pct_total_time,
                LEFT(query, 200) AS query
            FROM pg_stat_statements
            ORDER BY total_exec_time DESC
            LIMIT 10
        """)
        context["pg_stat_statements"] = "available"
        context["slow_queries"] = [
            {
                "calls": row[0],
                "total_time_ms": float(row[1]),
                "mean_time_ms": float(row[2]),
                "pct_total_time": float(row[3]),
                "query": row[4],
            }
            for row in cursor.fetchall()
        ]
        informationals.append(
            Informational(
                name="pg_stat_statements",
                label="pg_stat_statements",
                value="available",
                unit="",
                note="",
            )
        )
    except psycopg.errors.DatabaseError:
        # Extension installed but this role can't read the view.
        context["pg_stat_statements"] = "no_permission"
        context["slow_queries"] = []
        informationals.append(
            Informational(
                name="pg_stat_statements",
                label="pg_stat_statements",
                value="no_permission",
                unit="",
                note="grant pg_read_all_stats to enable query analysis",
            )
        )