1from __future__ import annotations
2
3import json
4import sys
5
6import click
7
8from ..convergence.analysis import ModelAnalysis, analyze_model
9from ..convergence.planning import can_auto_fix
10from ..db import get_connection
11from ..introspection import MANAGED_CONSTRAINT_TYPES, get_unknown_tables
12from ..registry import models_registry
13
14
def _ok() -> None:
    """Print a dim green check mark followed by a newline."""
    check_mark = click.style(" ✓", fg="green", dim=True)
    click.echo(check_mark)
17
18
def _err(msg: str) -> None:
    """Print *msg* in red, prefixed with a cross, followed by a newline."""
    failure_text = click.style(f" ✗ {msg}", fg="red")
    click.echo(failure_text)
21
22
def _fixable(msg: str) -> None:
    """Print *msg* in yellow with an "(auto-fix)" suffix, followed by a newline."""
    notice = click.style(f" ~ {msg} (auto-fix)", fg="yellow")
    click.echo(notice)
25
26
def _unmanaged(type_label: str) -> None:
    """Print a dim "unmanaged (<type_label>)" marker followed by a newline."""
    marker = click.style(f" unmanaged ({type_label})", dim=True)
    click.echo(marker)
29
30
def _render_model(analysis: ModelAnalysis) -> None:
    """Print one model analysis to the terminal in human-readable form.

    Output is a header line (label and table), then one line per column,
    then optional "Indexes:" and "Constraints:" sections. Each entry line is
    written without a trailing newline and is then completed by one of the
    status printers (_ok, _fixable, _err or _unmanaged).
    """

    def _status(issue, drift) -> None:
        # Shared line ending: no issue -> ok; issue whose drift can be
        # auto-fixed -> fixable; any other issue -> error.
        if not issue:
            _ok()
        elif drift and can_auto_fix(drift):
            _fixable(issue)
        else:
            _err(issue)

    click.secho(analysis.label, bold=True, nl=False)
    click.secho(f" → {analysis.table}", dim=True)

    # With no columns to show, only the table-level issues are reported.
    if not analysis.columns:
        for table_issue in analysis.table_issues:
            click.echo(" ", nl=False)
            _err(table_issue)
        return

    # Columns
    for column in analysis.columns:
        renamed = column.field_name and column.field_name != column.name
        heading = f"{column.field_name} → {column.name}" if renamed else column.name

        # (enabled, text, style) triples; only enabled badges are rendered.
        badges = (
            (True, column.type, {"fg": "cyan"}),
            (column.nullable, "NULL", {"dim": True}),
            (column.primary_key, "PK", {"fg": "yellow"}),
            (column.pk_suffix, column.pk_suffix, {"dim": True}),
        )
        rendered = " ".join(
            click.style(text, **styling) for enabled, text, styling in badges if enabled
        )

        click.echo(f" {heading:30s} {rendered}", nl=False)
        _status(column.issue, column.drift)

    # Indexes
    if analysis.indexes:
        click.echo()
        click.secho(" Indexes:", dim=True)

        for index in analysis.indexes:
            if index.fields:
                covered = ", ".join(index.fields)
            else:
                covered = "expressions"
            click.echo(f" {index.name} ({covered})", nl=False)

            # An index with an access method set is reported as unmanaged
            # instead of being given an ok/issue status.
            if index.access_method:
                _unmanaged(index.access_method)
            else:
                _status(index.issue, index.drift)

    # Constraints
    if analysis.constraints:
        click.echo()
        click.secho(" Constraints:", dim=True)

        for constraint in analysis.constraints:
            kind = constraint.constraint_type.label.upper()
            line = f" {constraint.name} {kind}"
            if constraint.fields:
                line += f" ({', '.join(constraint.fields)})"
            click.echo(line, nl=False)

            # Constraint types outside MANAGED_CONSTRAINT_TYPES are only labelled.
            if constraint.constraint_type not in MANAGED_CONSTRAINT_TYPES:
                _unmanaged(constraint.constraint_type.label)
            else:
                _status(constraint.issue, constraint.drift)
106
107
@click.command()
@click.argument("model_label", required=False)
@click.option("--json", "output_json", is_flag=True, help="Output as JSON")
def schema(model_label: str | None, output_json: bool) -> None:
    """Show database schema from models, compared against the actual database"""
    # NOTE: the docstring above is also the command's help text, so it is
    # kept short; the details live in these comments.
    #
    # model_label optionally narrows the run to models whose lowercased label
    # or lowercased class name equals it case-insensitively, or whose
    # db_table equals it exactly. output_json switches from the coloured
    # report to a JSON document.

    def _plural(count: int, noun: str) -> str:
        # "1 model", "2 models", "0 issues", ...
        return f"{count} {noun}{'s' if count != 1 else ''}"

    models = models_registry.get_models()

    if model_label:
        wanted = model_label.lower()

        def _matches(candidate) -> bool:
            options = candidate.model_options
            if options.label_lower == wanted:
                return True
            if options.db_table == model_label:
                return True
            return candidate.__name__.lower() == wanted

        models = [candidate for candidate in models if _matches(candidate)]
        if not models:
            raise click.ClickException(f"No model found matching '{model_label}'")

    conn = get_connection()

    # Collect structured results
    with conn.cursor() as cursor:
        analyses: list[ModelAnalysis] = [
            analyze_model(conn, cursor, model) for model in models
        ]

    # Unknown tables are only looked up when no model filter was given.
    unknown_tables = [] if model_label else get_unknown_tables(conn)
    total_issues = sum(a.issue_count for a in analyses) + len(unknown_tables)

    if output_json:
        payload = {
            "models_checked": len(analyses),
            "total_issues": total_issues,
            "models": [a.to_dict() for a in analyses],
            "unknown_tables": unknown_tables,
        }
        click.echo(json.dumps(payload, indent=2, default=str))
        return

    # Human-readable report: one section per model, separated by blank lines.
    for position, analysis in enumerate(analyses):
        if position:
            click.echo()
        _render_model(analysis)

    if unknown_tables:
        click.echo()
        click.secho("Unknown tables", bold=True)
        for table in unknown_tables:
            click.echo(f" {table:30s} ", nl=False)
            _err("not managed by any model")

    # Summary line
    click.echo()
    summary = [_plural(len(analyses), "model")]
    if unknown_tables:
        summary.append(_plural(len(unknown_tables), "unknown table"))
    headline = ", ".join(summary)

    if total_issues == 0:
        click.secho(f"{headline}, all match the database.", fg="green")
        return

    click.secho(f"{headline}, {_plural(total_issues, 'issue')}.", fg="red")
    # Exit non-zero when the report found issues.
    sys.exit(1)