1from __future__ import annotations
  2
  3import json
  4import sys
  5from typing import Any
  6
  7import click
  8
  9from ..db import get_connection
 10from ..introspection import CheckItem, CheckResult, build_table_owners, run_all_checks
 11
 12STATUS_SYMBOLS = {
 13    "ok": ("✓", "green"),
 14    "warning": ("!", "yellow"),
 15    "critical": ("!!", "red"),
 16    "skipped": ("—", None),
 17    "error": ("✗", "red"),
 18}
 19
 20
 21def format_human(
 22    results: list[CheckResult],
 23    context: dict[str, Any],
 24    *,
 25    show_all: bool = False,
 26) -> None:
 27    # Split items into actionable (app + unmanaged) vs package
 28    def _actionable_items(r: CheckResult) -> list[CheckItem]:
 29        return [i for i in r["items"] if i["source"] != "package"]
 30
 31    def _package_items(r: CheckResult) -> list[CheckItem]:
 32        return [i for i in r["items"] if i["source"] == "package"]
 33
 34    # Compute effective status (only actionable items trigger warnings unless --all)
 35    def _effective_status(r: CheckResult) -> str:
 36        if show_all:
 37            return r["status"]
 38        if r["status"] in ("ok", "skipped", "error"):
 39            return r["status"]
 40        if r["items"] and not _actionable_items(r):
 41            return "ok"
 42        return r["status"]
 43
 44    # Summary table
 45    label_width = max(len(r["label"]) for r in results)
 46    summaries: list[str] = []
 47    for r in results:
 48        if _effective_status(r) == r["status"]:
 49            summaries.append(r["summary"])
 50        else:
 51            summaries.append("ok")
 52    summary_width = max(len(s) for s in summaries)
 53
 54    click.echo()
 55    for r, summary_text in zip(results, summaries):
 56        status = _effective_status(r)
 57        symbol, color = STATUS_SYMBOLS.get(status, ("?", None))
 58        label = r["label"].ljust(label_width)
 59        summary = summary_text.ljust(summary_width)
 60        click.echo(f"  {label}  {summary}  ", nl=False)
 61        click.secho(symbol, fg=color)
 62
 63    # Counts
 64    statuses = [_effective_status(r) for r in results]
 65    ok_count = statuses.count("ok")
 66    warn_count = statuses.count("warning")
 67    critical_count = statuses.count("critical")
 68    error_count = statuses.count("error")
 69
 70    parts = []
 71    if ok_count:
 72        parts.append(f"{ok_count} passed")
 73    if warn_count:
 74        parts.append(f"{warn_count} warnings")
 75    if critical_count:
 76        parts.append(f"{critical_count} critical")
 77    if error_count:
 78        parts.append(f"{error_count} errors")
 79    click.echo(f"\n  {', '.join(parts)}")
 80
 81    # Details
 82    for r in results:
 83        if _effective_status(r) in ("ok", "skipped"):
 84            continue
 85
 86        items_to_show = r["items"] if show_all else _actionable_items(r)
 87        if items_to_show:
 88            click.echo()
 89            click.secho(f"  {r['label']}", bold=True)
 90            for item in items_to_show:
 91                if item["table"]:
 92                    line = f"    {item['name']} on {item['table']} ({item['detail']})"
 93                else:
 94                    line = f"    {item['name']} ({item['detail']})"
 95
 96                if item["source"] == "package":
 97                    click.secho(line, dim=True)
 98                    click.secho(
 99                        f"      {item['package']} package — not your code",
100                        dim=True,
101                    )
102                else:
103                    if item["source"] == "app" and item["package"]:
104                        click.echo(f"{line}  [{item['package']}]")
105                    else:
106                        click.echo(line)
107                    click.secho(f"      {item['suggestion']}", dim=True)
108
109        if r["message"]:
110            click.echo()
111            click.secho(f"  {r['label']}: {r['message']}", bold=True)
112
113    # Package issues footnote (only when not --all)
114    all_package_items: list[tuple[str, CheckItem]] = []
115    if not show_all:
116        for r in results:
117            for item in _package_items(r):
118                all_package_items.append((r["label"], item))
119
120    if all_package_items:
121        click.echo()
122        # Group by package
123        by_package: dict[str, list[tuple[str, CheckItem]]] = {}
124        for check_label, item in all_package_items:
125            by_package.setdefault(item["package"], []).append((check_label, item))
126
127        pkg_parts = []
128        for pkg, items in sorted(by_package.items()):
129            check_names = sorted({label.lower() for label, _ in items})
130            pkg_parts.append(f"{pkg} ({len(items)}{', '.join(check_names)})")
131
132        click.secho(
133            f"  Also found {len(all_package_items)} issues in installed packages: {'; '.join(pkg_parts)}",
134            dim=True,
135        )
136
137    # Slow queries
138    slow_queries = context.get("slow_queries", [])
139    if slow_queries:
140        click.echo()
141        click.secho("  Slowest queries (by total time)", bold=True)
142        for q in slow_queries:
143            click.echo(
144                f"    {q['total_time_ms']:>10.0f}ms total"
145                f"  {q['mean_time_ms']:>8.0f}ms avg"
146                f"  {q['calls']:>8,} calls"
147                f"  ({q['pct_total_time']:.1f}%)"
148            )
149            query_preview = q["query"].replace("\n", " ").strip()
150            click.secho(f"      {query_preview}", dim=True)
151
152    # Footer
153    click.echo()
154    stats_reset = context.get("stats_reset")
155    click.secho(
156        f"  Stats reset: {stats_reset[:10] if stats_reset else 'never'}",
157        dim=True,
158    )
159
160    pgss = context.get("pg_stat_statements")
161    if pgss == "not_installed":
162        click.secho(
163            "  pg_stat_statements: not installed (install for query analysis)",
164            dim=True,
165        )
166    elif pgss == "no_permission":
167        click.secho(
168            "  pg_stat_statements: installed but not accessible (insufficient privileges)",
169            dim=True,
170        )
171
172    conn = context.get("connections", {})
173    if conn:
174        pct = round(100 * conn["active"] / conn["max"]) if conn["max"] else 0
175        click.secho(f"  Connections: {conn['active']}/{conn['max']} ({pct}%)", dim=True)
176
177    click.echo()
178
179
@click.command()
@click.option("--json", "output_json", is_flag=True, help="Output as JSON")
@click.option(
    "--all", "show_all", is_flag=True, help="Include package issues in detail"
)
def diagnose(output_json: bool, show_all: bool) -> None:
    """Run health checks against the database"""
    # NOTE: the docstring above doubles as the command's --help text.
    connection = get_connection()
    owners = build_table_owners()

    # Every check runs on one cursor, which is released before any output.
    with connection.cursor() as cur:
        checks, check_context = run_all_checks(cur, owners)

    if output_json:
        # JSON mode always exits 0 — the data is the signal. default=str
        # stringifies any value json cannot serialise natively.
        payload = {"checks": checks, "context": check_context}
        click.echo(json.dumps(payload, indent=2, default=str))
        return

    format_human(checks, check_context, show_all=show_all)

    # Human mode: exit 1 as soon as any check reports a critical status.
    for check in checks:
        if check["status"] == "critical":
            sys.exit(1)