1from __future__ import annotations
2
3import json
4import sys
5from typing import Any
6
7import click
8
9from ..db import get_connection
10from ..introspection import CheckItem, CheckResult, build_table_owners, run_all_checks
11
12STATUS_SYMBOLS = {
13 "ok": ("✓", "green"),
14 "warning": ("!", "yellow"),
15 "critical": ("!!", "red"),
16 "skipped": ("—", None),
17 "error": ("✗", "red"),
18}
19
20
21def format_human(
22 results: list[CheckResult],
23 context: dict[str, Any],
24 *,
25 show_all: bool = False,
26) -> None:
27 # Split items into actionable (app + unmanaged) vs package
28 def _actionable_items(r: CheckResult) -> list[CheckItem]:
29 return [i for i in r["items"] if i["source"] != "package"]
30
31 def _package_items(r: CheckResult) -> list[CheckItem]:
32 return [i for i in r["items"] if i["source"] == "package"]
33
34 # Compute effective status (only actionable items trigger warnings unless --all)
35 def _effective_status(r: CheckResult) -> str:
36 if show_all:
37 return r["status"]
38 if r["status"] in ("ok", "skipped", "error"):
39 return r["status"]
40 if r["items"] and not _actionable_items(r):
41 return "ok"
42 return r["status"]
43
44 # Summary table
45 label_width = max(len(r["label"]) for r in results)
46 summaries: list[str] = []
47 for r in results:
48 if _effective_status(r) == r["status"]:
49 summaries.append(r["summary"])
50 else:
51 summaries.append("ok")
52 summary_width = max(len(s) for s in summaries)
53
54 click.echo()
55 for r, summary_text in zip(results, summaries):
56 status = _effective_status(r)
57 symbol, color = STATUS_SYMBOLS.get(status, ("?", None))
58 label = r["label"].ljust(label_width)
59 summary = summary_text.ljust(summary_width)
60 click.echo(f" {label} {summary} ", nl=False)
61 click.secho(symbol, fg=color)
62
63 # Counts
64 statuses = [_effective_status(r) for r in results]
65 ok_count = statuses.count("ok")
66 warn_count = statuses.count("warning")
67 critical_count = statuses.count("critical")
68 error_count = statuses.count("error")
69
70 parts = []
71 if ok_count:
72 parts.append(f"{ok_count} passed")
73 if warn_count:
74 parts.append(f"{warn_count} warnings")
75 if critical_count:
76 parts.append(f"{critical_count} critical")
77 if error_count:
78 parts.append(f"{error_count} errors")
79 click.echo(f"\n {', '.join(parts)}")
80
81 # Details
82 for r in results:
83 if _effective_status(r) in ("ok", "skipped"):
84 continue
85
86 items_to_show = r["items"] if show_all else _actionable_items(r)
87 if items_to_show:
88 click.echo()
89 click.secho(f" {r['label']}", bold=True)
90 for item in items_to_show:
91 if item["table"]:
92 line = f" {item['name']} on {item['table']} ({item['detail']})"
93 else:
94 line = f" {item['name']} ({item['detail']})"
95
96 if item["source"] == "package":
97 click.secho(line, dim=True)
98 click.secho(
99 f" {item['package']} package — not your code",
100 dim=True,
101 )
102 else:
103 if item["source"] == "app" and item["package"]:
104 click.echo(f"{line} [{item['package']}]")
105 else:
106 click.echo(line)
107 click.secho(f" {item['suggestion']}", dim=True)
108
109 if r["message"]:
110 click.echo()
111 click.secho(f" {r['label']}: {r['message']}", bold=True)
112
113 # Package issues footnote (only when not --all)
114 all_package_items: list[tuple[str, CheckItem]] = []
115 if not show_all:
116 for r in results:
117 for item in _package_items(r):
118 all_package_items.append((r["label"], item))
119
120 if all_package_items:
121 click.echo()
122 # Group by package
123 by_package: dict[str, list[tuple[str, CheckItem]]] = {}
124 for check_label, item in all_package_items:
125 by_package.setdefault(item["package"], []).append((check_label, item))
126
127 pkg_parts = []
128 for pkg, items in sorted(by_package.items()):
129 check_names = sorted({label.lower() for label, _ in items})
130 pkg_parts.append(f"{pkg} ({len(items)} — {', '.join(check_names)})")
131
132 click.secho(
133 f" Also found {len(all_package_items)} issues in installed packages: {'; '.join(pkg_parts)}",
134 dim=True,
135 )
136
137 # Slow queries
138 slow_queries = context.get("slow_queries", [])
139 if slow_queries:
140 click.echo()
141 click.secho(" Slowest queries (by total time)", bold=True)
142 for q in slow_queries:
143 click.echo(
144 f" {q['total_time_ms']:>10.0f}ms total"
145 f" {q['mean_time_ms']:>8.0f}ms avg"
146 f" {q['calls']:>8,} calls"
147 f" ({q['pct_total_time']:.1f}%)"
148 )
149 query_preview = q["query"].replace("\n", " ").strip()
150 click.secho(f" {query_preview}", dim=True)
151
152 # Footer
153 click.echo()
154 stats_reset = context.get("stats_reset")
155 click.secho(
156 f" Stats reset: {stats_reset[:10] if stats_reset else 'never'}",
157 dim=True,
158 )
159
160 pgss = context.get("pg_stat_statements")
161 if pgss == "not_installed":
162 click.secho(
163 " pg_stat_statements: not installed (install for query analysis)",
164 dim=True,
165 )
166 elif pgss == "no_permission":
167 click.secho(
168 " pg_stat_statements: installed but not accessible (insufficient privileges)",
169 dim=True,
170 )
171
172 conn = context.get("connections", {})
173 if conn:
174 pct = round(100 * conn["active"] / conn["max"]) if conn["max"] else 0
175 click.secho(f" Connections: {conn['active']}/{conn['max']} ({pct}%)", dim=True)
176
177 click.echo()
178
179
180@click.command()
181@click.option("--json", "output_json", is_flag=True, help="Output as JSON")
182@click.option(
183 "--all", "show_all", is_flag=True, help="Include package issues in detail"
184)
185def diagnose(output_json: bool, show_all: bool) -> None:
186 """Run health checks against the database"""
187 conn = get_connection()
188 table_owners = build_table_owners()
189
190 with conn.cursor() as cursor:
191 results, context = run_all_checks(cursor, table_owners)
192
193 if output_json:
194 output = {
195 "checks": results,
196 "context": context,
197 }
198 click.echo(json.dumps(output, indent=2, default=str))
199 else:
200 format_human(results, context, show_all=show_all)
201
202 # Exit 1 if any critical (JSON mode always exits 0 — the data is the signal)
203 if not output_json and any(r["status"] == "critical" for r in results):
204 sys.exit(1)