1from __future__ import annotations
  2
  3import json
  4import sys
  5
  6import click
  7
  8from ..convergence.analysis import ModelAnalysis, analyze_model
  9from ..convergence.planning import can_auto_fix
 10from ..db import get_connection
 11from ..introspection import MANAGED_CONSTRAINT_TYPES, get_unknown_tables
 12from ..registry import models_registry
 13
 14
 15def _ok() -> None:
 16    click.secho("  ✓", fg="green", dim=True)
 17
 18
 19def _err(msg: str) -> None:
 20    click.secho(f"  ✗ {msg}", fg="red")
 21
 22
 23def _fixable(msg: str) -> None:
 24    click.secho(f"  ~ {msg} (auto-fix)", fg="yellow")
 25
 26
 27def _unmanaged(type_label: str) -> None:
 28    click.secho(f"  unmanaged ({type_label})", dim=True)
 29
 30
 31def _render_model(analysis: ModelAnalysis) -> None:
 32    """Render a model analysis result as human-readable output."""
 33    click.secho(analysis.label, bold=True, nl=False)
 34    click.secho(f"  →  {analysis.table}", dim=True)
 35
 36    # Table missing — nothing else to show
 37    if not analysis.columns:
 38        for issue in analysis.table_issues:
 39            click.echo("  ", nl=False)
 40            _err(issue)
 41        return
 42
 43    # Columns
 44    for col in analysis.columns:
 45        col_display = col.name
 46        if col.field_name and col.field_name != col.name:
 47            col_display = f"{col.field_name}{col.name}"
 48
 49        type_parts = [click.style(col.type, fg="cyan")]
 50        if col.nullable:
 51            type_parts.append(click.style("NULL", dim=True))
 52        if col.primary_key:
 53            type_parts.append(click.style("PK", fg="yellow"))
 54            if col.pk_suffix:
 55                type_parts.append(click.style(col.pk_suffix, dim=True))
 56
 57        click.echo(f"  {col_display:30s}  {' '.join(type_parts)}", nl=False)
 58
 59        if col.issue and col.drift and can_auto_fix(col.drift):
 60            _fixable(col.issue)
 61        elif col.issue:
 62            _err(col.issue)
 63        else:
 64            _ok()
 65
 66    # Indexes
 67    if analysis.indexes:
 68        click.echo()
 69        click.secho("  Indexes:", dim=True)
 70
 71    for idx in analysis.indexes:
 72        fields_str = ", ".join(idx.fields) if idx.fields else "expressions"
 73        click.echo(f"    {idx.name}  ({fields_str})", nl=False)
 74
 75        if idx.access_method:
 76            _unmanaged(idx.access_method)
 77        elif idx.issue and idx.drift and can_auto_fix(idx.drift):
 78            _fixable(idx.issue)
 79        elif idx.issue:
 80            _err(idx.issue)
 81        else:
 82            _ok()
 83
 84    # Constraints
 85    if analysis.constraints:
 86        click.echo()
 87        click.secho("  Constraints:", dim=True)
 88
 89    for con in analysis.constraints:
 90        con_label = con.constraint_type.label.upper()
 91        if con.fields:
 92            click.echo(
 93                f"    {con.name}  {con_label} ({', '.join(con.fields)})", nl=False
 94            )
 95        else:
 96            click.echo(f"    {con.name}  {con_label}", nl=False)
 97
 98        if con.constraint_type not in MANAGED_CONSTRAINT_TYPES:
 99            _unmanaged(con.constraint_type.label)
100        elif con.issue and con.drift and can_auto_fix(con.drift):
101            _fixable(con.issue)
102        elif con.issue:
103            _err(con.issue)
104        else:
105            _ok()
106
107
108@click.command()
109@click.argument("model_label", required=False)
110@click.option("--json", "output_json", is_flag=True, help="Output as JSON")
111def schema(model_label: str | None, output_json: bool) -> None:
112    """Show database schema from models, compared against the actual database"""
113    models = models_registry.get_models()
114
115    if model_label:
116        model_label_lower = model_label.lower()
117        models = [
118            m
119            for m in models
120            if m.model_options.label_lower == model_label_lower
121            or m.model_options.db_table == model_label
122            or m.__name__.lower() == model_label_lower
123        ]
124        if not models:
125            raise click.ClickException(f"No model found matching '{model_label}'")
126
127    conn = get_connection()
128
129    # Collect structured results
130    analyses: list[ModelAnalysis] = []
131    with conn.cursor() as cursor:
132        for model in models:
133            analyses.append(analyze_model(conn, cursor, model))
134
135    unknown_tables = get_unknown_tables(conn) if not model_label else []
136    total_issues = sum(a.issue_count for a in analyses) + len(unknown_tables)
137
138    if output_json:
139        output = {
140            "models_checked": len(analyses),
141            "total_issues": total_issues,
142            "models": [a.to_dict() for a in analyses],
143            "unknown_tables": unknown_tables,
144        }
145        click.echo(json.dumps(output, indent=2, default=str))
146    else:
147        for i, analysis in enumerate(analyses):
148            if i > 0:
149                click.echo()
150            _render_model(analysis)
151
152        if unknown_tables:
153            click.echo()
154            click.secho("Unknown tables", bold=True)
155            for table in unknown_tables:
156                click.echo(f"  {table:30s}  ", nl=False)
157                _err("not managed by any model")
158
159        click.echo()
160        parts = []
161        parts.append(f"{len(analyses)} model{'s' if len(analyses) != 1 else ''}")
162        if unknown_tables:
163            parts.append(
164                f"{len(unknown_tables)} unknown table{'s' if len(unknown_tables) != 1 else ''}"
165            )
166        if total_issues == 0:
167            click.secho(
168                f"{', '.join(parts)}, all match the database.",
169                fg="green",
170            )
171        else:
172            click.secho(
173                f"{', '.join(parts)}, {total_issues} issue{'s' if total_issues != 1 else ''}.",
174                fg="red",
175            )
176            sys.exit(1)