1from __future__ import annotations
  2
  3from typing import Any, TypedDict
  4
  5import psycopg.errors
  6
  7# Types
  8
  9
class TableOwner(TypedDict):
    """Ownership record for one database table, as produced by ``build_table_owners``."""

    # Label of the package whose model (or many-to-many field) defines the table.
    package_label: str
    # "app" when the owning package's name starts with "app.", otherwise "package".
    source: str  # "app" | "package"
 13
 14
class CheckItem(TypedDict):
    """A single finding reported by a check (one index, column, table or database)."""

    table: str  # table the finding belongs to; "" when it is not table-specific
    name: str  # identifier of the reported object (index name, "table.column", database, ...)
    detail: str  # human-readable specifics such as size or percentage
    source: str  # "app" | "package" | ""
    package: str  # package label or ""
    suggestion: str  # remediation text shown alongside the finding
 22
 23
class CheckResult(TypedDict):
    """Outcome of one health check."""

    name: str  # machine-readable identifier, e.g. "invalid_indexes"
    label: str  # human-readable title, e.g. "Invalid indexes"
    status: str  # "ok" | "warning" | "critical" | "skipped" | "error"
    summary: str  # short one-line result (a count, a percentage, "none", ...)
    items: list[CheckItem]  # individual findings; empty when there is nothing to report
    message: str  # additional advice or error text; "" when unused
 31
 32
 33# Table ownership
 34
 35
 36def build_table_owners() -> dict[str, TableOwner]:
 37    """Map table names to their owning package and source (app vs dependency)."""
 38    from plain.packages import packages_registry
 39    from plain.postgres import models_registry
 40
 41    owners: dict[str, TableOwner] = {}
 42    for package_config in packages_registry.get_package_configs():
 43        source = "app" if package_config.name.startswith("app.") else "package"
 44        for model in models_registry.get_models(
 45            package_label=package_config.package_label
 46        ):
 47            owners[model.model_options.db_table] = TableOwner(
 48                package_label=package_config.package_label,
 49                source=source,
 50            )
 51            for field in model._model_meta.local_many_to_many:
 52                owners[field.m2m_db_table()] = TableOwner(
 53                    package_label=package_config.package_label,
 54                    source=source,
 55                )
 56    return owners
 57
 58
 59def _table_source(
 60    table_name: str, table_owners: dict[str, TableOwner]
 61) -> tuple[str, str]:
 62    """Return (source, package) for a table name."""
 63    owner = table_owners.get(table_name)
 64    if owner:
 65        return owner["source"], owner["package_label"]
 66    return "", ""
 67
 68
 69# Context
 70
 71
 72def gather_context(cursor: Any, table_owners: dict[str, TableOwner]) -> dict[str, Any]:
 73    """Collect contextual information that isn't pass/fail but helps interpretation."""
 74    context: dict[str, Any] = {}
 75
 76    # Table sizes
 77    cursor.execute("""
 78        SELECT
 79            c.relname AS table_name,
 80            c.reltuples::bigint AS estimated_row_count,
 81            pg_total_relation_size(c.oid) AS total_size_bytes,
 82            pg_size_pretty(pg_total_relation_size(c.oid)) AS total_size,
 83            pg_size_pretty(pg_relation_size(c.oid)) AS table_size,
 84            pg_size_pretty(pg_indexes_size(c.oid)) AS indexes_size,
 85            (SELECT count(*) FROM pg_catalog.pg_index i WHERE i.indrelid = c.oid) AS index_count
 86        FROM pg_catalog.pg_class c
 87        JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
 88        WHERE c.relkind IN ('r', 'p')
 89          AND n.nspname = 'public'
 90        ORDER BY total_size_bytes DESC
 91    """)
 92    context["tables"] = []
 93    for row in cursor.fetchall():
 94        source, package = _table_source(row[0], table_owners)
 95        context["tables"].append(
 96            {
 97                "table": row[0],
 98                "estimated_rows": max(row[1], 0),
 99                "total_size_bytes": row[2],
100                "total_size": row[3],
101                "table_size": row[4],
102                "indexes_size": row[5],
103                "index_count": row[6],
104                "source": source,
105                "package": package,
106            }
107        )
108
109    # Connection usage
110    cursor.execute("""
111        SELECT
112            (SELECT count(*) FROM pg_catalog.pg_stat_activity
113             WHERE datname = current_database()) AS active_connections,
114            (SELECT setting::int FROM pg_catalog.pg_settings
115             WHERE name = 'max_connections') AS max_connections
116    """)
117    row = cursor.fetchone()
118    context["connections"] = {
119        "active": row[0],
120        "max": row[1],
121    }
122
123    # Stats reset time
124    cursor.execute("""
125        SELECT stats_reset
126        FROM pg_catalog.pg_stat_database
127        WHERE datname = current_database()
128    """)
129    row = cursor.fetchone()
130    if row and row[0]:
131        context["stats_reset"] = row[0].isoformat()
132    else:
133        context["stats_reset"] = None
134
135    # pg_stat_statements availability + slow queries
136    cursor.execute("""
137        SELECT EXISTS (
138            SELECT 1 FROM pg_catalog.pg_extension WHERE extname = 'pg_stat_statements'
139        )
140    """)
141    has_pgss = cursor.fetchone()[0]
142
143    if not has_pgss:
144        context["pg_stat_statements"] = "not_installed"
145        context["slow_queries"] = []
146    else:
147        try:
148            cursor.execute("""
149                SELECT
150                    calls,
151                    ROUND(total_exec_time::numeric, 2) AS total_time_ms,
152                    ROUND(mean_exec_time::numeric, 2) AS mean_time_ms,
153                    ROUND(
154                        (100 * total_exec_time / NULLIF(SUM(total_exec_time) OVER (), 0))::numeric, 2
155                    ) AS pct_total_time,
156                    LEFT(query, 200) AS query
157                FROM pg_stat_statements
158                ORDER BY total_exec_time DESC
159                LIMIT 10
160            """)
161            context["pg_stat_statements"] = "available"
162            context["slow_queries"] = [
163                {
164                    "calls": row[0],
165                    "total_time_ms": float(row[1]),
166                    "mean_time_ms": float(row[2]),
167                    "pct_total_time": float(row[3]),
168                    "query": row[4],
169                }
170                for row in cursor.fetchall()
171            ]
172        except psycopg.errors.DatabaseError:
173            context["pg_stat_statements"] = "no_permission"
174            context["slow_queries"] = []
175
176    return context
177
178
179# Checks
180
181
def _format_bytes(nbytes: int) -> str:
    """Render a byte count with a 1024-based unit (B, kB, MB, GB, TB, PB).

    Plain bytes are shown as an integer; every larger unit is shown with one
    decimal place. Anything of 1024 TB or more is expressed in PB.
    """
    size = float(nbytes)
    if abs(size) < 1024:
        return f"{int(size)} B"

    for unit in ("kB", "MB", "GB", "TB"):
        size /= 1024
        if abs(size) < 1024:
            return f"{size:.1f} {unit}"

    return f"{size / 1024:.1f} PB"
191
192
def _index_suggestion(
    *,
    source: str,
    package: str,
    app_suggestion: str,
    unmanaged_suggestion: str,
) -> str:
    """Pick the remediation text that matches who owns the table.

    App-owned tables get ``app_suggestion``, package-owned tables get a fixed
    "managed by" notice naming the package, and every other source gets
    ``unmanaged_suggestion``.
    """
    if source == "package":
        return f"Managed by {package} — not directly actionable in your app"
    return app_suggestion if source == "app" else unmanaged_suggestion
206
207
def check_invalid_indexes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Report indexes marked invalid (``NOT indisvalid``).

    These are typically left behind by a failed CREATE INDEX CONCURRENTLY:
    they are still maintained on writes but never used for reads.

    Returns a ``"warning"`` result listing each invalid index with its size,
    or an ``"ok"`` result when there are none.
    """
    cursor.execute("""
        SELECT
            s.relname AS table_name,
            s.indexrelname AS index_name,
            pg_size_pretty(pg_relation_size(s.indexrelid)) AS index_size
        FROM pg_catalog.pg_stat_user_indexes s
        JOIN pg_catalog.pg_index i ON s.indexrelid = i.indexrelid
        WHERE NOT i.indisvalid
    """)

    items: list[CheckItem] = []
    for table, index, pretty_size in cursor.fetchall():
        source, package = _table_source(table, table_owners)
        drop_statement = f'DROP INDEX CONCURRENTLY "{index}";'
        suggestion = _index_suggestion(
            source=source,
            package=package,
            app_suggestion=f"Drop and re-run the migration that created it: {drop_statement}",
            unmanaged_suggestion=drop_statement,
        )
        items.append(
            CheckItem(
                table=table,
                name=index,
                detail=pretty_size,
                source=source,
                package=package,
                suggestion=suggestion,
            )
        )

    found = len(items)
    return CheckResult(
        name="invalid_indexes",
        label="Invalid indexes",
        status="warning" if found else "ok",
        summary=str(found) if found else "none",
        items=items,
        message="",
    )
250
251
def check_duplicate_indexes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Report indexes made redundant by another index on the same table.

    A non-unique index is redundant when its columns and operator classes are
    a leading prefix of another valid index, or when they are identical to
    another index. For a group of identical indexes all but one copy are
    reported, so following the suggestions never removes the last copy.

    Only valid, non-expression, non-partial indexes in the ``public`` schema
    are compared. Unique indexes are never reported because they enforce a
    constraint.

    Returns a ``"warning"`` result listing each redundant index, or ``"ok"``
    when none are found.
    """
    cursor.execute("""
        SELECT
            ct.relname AS table_name,
            ci.relname AS index_name,
            i.indkey::int[] AS column_numbers,
            i.indclass::int[] AS opclass_numbers,
            i.indisunique,
            pg_size_pretty(pg_relation_size(ci.oid)) AS index_size,
            pg_relation_size(ci.oid) AS index_size_bytes
        FROM pg_catalog.pg_index i
        JOIN pg_catalog.pg_class ci ON ci.oid = i.indexrelid
        JOIN pg_catalog.pg_class ct ON ct.oid = i.indrelid
        JOIN pg_catalog.pg_namespace n ON n.oid = ct.relnamespace
        WHERE n.nspname = 'public'
          AND i.indisvalid
          AND i.indexprs IS NULL
          AND i.indpred IS NULL
        ORDER BY ct.relname, ci.relname
    """)
    rows = cursor.fetchall()

    # Group by table
    by_table: dict[str, list[tuple[str, list[int], list[int], bool, str, int]]] = {}
    for table_name, index_name, cols, opclasses, is_unique, size, size_bytes in rows:
        by_table.setdefault(table_name, []).append(
            (index_name, cols, opclasses, is_unique, size, size_bytes)
        )

    items: list[CheckItem] = []
    flagged: set[str] = set()  # avoid reporting the same index multiple times
    for table_name, indexes in by_table.items():
        source, package = _table_source(table_name, table_owners)
        for i, idx_a in enumerate(indexes):
            for idx_b in indexes[i + 1 :]:
                # Check both directions: is either covered by the other?
                for shorter, longer in ((idx_a, idx_b), (idx_b, idx_a)):
                    name_s, cols_s, ops_s, unique_s, size_s, _ = shorter
                    name_l, cols_l, ops_l, _, _, _ = longer

                    # Unique indexes serve a constraint purpose; never report them.
                    if name_s in flagged or unique_s:
                        continue

                    width = len(cols_s)
                    # Also rejects the case where `longer` has fewer columns.
                    if cols_l[:width] != cols_s or ops_l[:width] != ops_s:
                        continue

                    # Exact duplicate: skip if the other copy is already
                    # reported, otherwise both copies would be suggested for
                    # removal and nothing would be left.
                    if len(cols_l) == width and name_l in flagged:
                        continue

                    app_suggestion = f'Remove "{name_s}" from model indexes/constraints, then run plain postgres sync'
                    items.append(
                        CheckItem(
                            table=table_name,
                            name=name_s,
                            detail=f"{size_s}, redundant with {name_l}",
                            source=source,
                            package=package,
                            suggestion=_index_suggestion(
                                source=source,
                                package=package,
                                app_suggestion=app_suggestion,
                                unmanaged_suggestion=f'DROP INDEX CONCURRENTLY "{name_s}";',
                            ),
                        )
                    )
                    flagged.add(name_s)

    return CheckResult(
        name="duplicate_indexes",
        label="Duplicate indexes",
        status="warning" if items else "ok",
        summary=str(len(items)) if items else "none",
        items=items,
        message="",
    )
328
329
def check_unused_indexes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Report valid indexes larger than 1 MiB that have never been scanned.

    Unique indexes, expression indexes and indexes backing a constraint are
    left out. An index is also left out when its leading column is a
    single-column foreign key and no other valid index leads with that
    column: even at 0 scans, FK columns should keep index coverage for
    referential integrity enforcement (parent DELETE/UPDATE scans the child
    table).

    The summary carries the number of findings and their combined size.
    """
    cursor.execute("""
        SELECT
            s.relname AS table_name,
            s.indexrelname AS index_name,
            pg_size_pretty(pg_relation_size(s.indexrelid)) AS index_size,
            pg_relation_size(s.indexrelid) AS index_size_bytes
        FROM pg_catalog.pg_stat_user_indexes s
        JOIN pg_catalog.pg_index i ON s.indexrelid = i.indexrelid
        WHERE s.idx_scan = 0
          AND pg_relation_size(s.indexrelid) > 1048576
          AND 0 <> ALL (i.indkey)
          AND NOT i.indisunique
          AND NOT EXISTS (
              SELECT 1 FROM pg_catalog.pg_constraint c
              WHERE c.conindid = s.indexrelid
          )
          AND i.indisvalid
          AND NOT (
              -- Leading column is a FK column on this table
              EXISTS (
                  SELECT 1 FROM pg_catalog.pg_constraint fk
                  WHERE fk.conrelid = i.indrelid
                    AND fk.contype = 'f'
                    AND array_length(fk.conkey, 1) = 1
                    AND fk.conkey[1] = i.indkey[0]
              )
              -- And no other valid index covers that column as its leading column
              AND NOT EXISTS (
                  SELECT 1 FROM pg_catalog.pg_index other
                  WHERE other.indrelid = i.indrelid
                    AND other.indexrelid != i.indexrelid
                    AND other.indisvalid
                    AND other.indkey[0] = i.indkey[0]
              )
          )
        ORDER BY pg_relation_size(s.indexrelid) DESC
    """)

    items: list[CheckItem] = []
    wasted_bytes = 0
    for table, index, pretty_size, size_in_bytes in cursor.fetchall():
        wasted_bytes += size_in_bytes
        source, package = _table_source(table, table_owners)
        suggestion = _index_suggestion(
            source=source,
            package=package,
            app_suggestion=f'Remove "{index}" from model indexes/constraints, then run plain postgres sync',
            unmanaged_suggestion=f'DROP INDEX CONCURRENTLY "{index}";',
        )
        items.append(
            CheckItem(
                table=table,
                name=index,
                detail=f"{pretty_size}, 0 scans",
                source=source,
                package=package,
                suggestion=suggestion,
            )
        )

    summary = f"{len(items)} ({_format_bytes(wasted_bytes)})" if items else "none"

    return CheckResult(
        name="unused_indexes",
        label="Unused indexes",
        status="warning" if items else "ok",
        summary=summary,
        items=items,
        message="",
    )
412
413
def check_missing_fk_indexes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Foreign key columns without a leading index — JOINs on these do sequential scans.

    Only single-column foreign keys on tables in the ``public`` schema are
    examined. A column counts as covered only when a *valid* index has it as
    its leading column; an invalid index cannot be used for reads, so it is
    not accepted as coverage.

    Returns a ``"warning"`` result with one item per uncovered FK column, or
    ``"ok"`` when every FK column is covered.
    """
    cursor.execute("""
        SELECT
            ct.relname AS table_name,
            a.attname AS column_name,
            cc.relname AS referenced_table,
            c.conname AS constraint_name
        FROM pg_catalog.pg_constraint c
        JOIN pg_catalog.pg_class ct ON ct.oid = c.conrelid
        JOIN pg_catalog.pg_namespace n ON n.oid = ct.relnamespace
        JOIN pg_catalog.pg_attribute a ON a.attrelid = c.conrelid AND a.attnum = c.conkey[1]
        JOIN pg_catalog.pg_class cc ON cc.oid = c.confrelid
        WHERE c.contype = 'f'
          AND array_length(c.conkey, 1) = 1
          AND n.nspname = 'public'
          AND NOT EXISTS (
              SELECT 1
              FROM pg_catalog.pg_index i
              WHERE i.indrelid = c.conrelid
                AND i.indisvalid
                AND i.indkey[0] = c.conkey[1]
          )
        ORDER BY ct.relname, a.attname
    """)
    rows = cursor.fetchall()

    items: list[CheckItem] = []
    # constraint_name is selected by the query but not used in the output.
    for table_name, column_name, referenced_table, constraint_name in rows:
        source, package = _table_source(table_name, table_owners)
        items.append(
            CheckItem(
                table=table_name,
                name=f"{table_name}.{column_name}",
                detail=f"references {referenced_table}",
                source=source,
                package=package,
                suggestion=_index_suggestion(
                    source=source,
                    package=package,
                    app_suggestion=f'Add an Index on ["{column_name}"] to the model, then run plain postgres sync',
                    unmanaged_suggestion=f'CREATE INDEX CONCURRENTLY ON "{table_name}" ("{column_name}");',
                ),
            )
        )

    return CheckResult(
        name="missing_fk_indexes",
        label="Missing FK indexes",
        status="warning" if items else "ok",
        summary=str(len(items)) if items else "none",
        items=items,
        message="",
    )
469
470
def check_sequence_exhaustion(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Report column-owned sequences that have used more than 50% of their maximum.

    The status is ``"critical"`` once the most-used sequence reaches 90%,
    ``"warning"`` for any other finding, and ``"ok"`` when nothing is over
    the 50% threshold.
    """
    cursor.execute("""
        WITH sequences AS (
            SELECT
                s.seqrelid,
                s.seqtypid,
                s.seqmax,
                ps.last_value,
                ps.start_value
            FROM pg_catalog.pg_sequence s
            JOIN pg_catalog.pg_class c ON c.oid = s.seqrelid
            JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
            JOIN pg_sequences ps ON ps.schemaname = n.nspname
                AND ps.sequencename = c.relname
        )
        SELECT
            d.refobjid::regclass AS table_name,
            a.attname AS column_name,
            seq.seqtypid::regtype AS data_type,
            COALESCE(seq.last_value, seq.start_value) AS current_value,
            seq.seqmax AS max_value,
            ROUND(
                100.0 * COALESCE(seq.last_value, seq.start_value) / seq.seqmax, 2
            ) AS pct_used
        FROM sequences seq
        JOIN pg_catalog.pg_depend d ON d.objid = seq.seqrelid
            AND d.deptype IN ('a', 'i')
            AND d.classid = 'pg_class'::regclass
            AND d.refclassid = 'pg_class'::regclass
        JOIN pg_catalog.pg_attribute a ON a.attrelid = d.refobjid
            AND a.attnum = d.refobjsubid
        WHERE ROUND(
            100.0 * COALESCE(seq.last_value, seq.start_value) / seq.seqmax, 2
        ) > 50
        ORDER BY pct_used DESC
    """)
    rows = cursor.fetchall()

    # Highest usage among the returned sequences; 0.0 when there are none.
    worst_pct = max([0.0, *(float(row[5]) for row in rows)])

    items: list[CheckItem] = []
    for table_ref, column, type_name, current, maximum, pct_used in rows:
        # The table arrives as a regclass value; normalise it to plain text.
        table = str(table_ref)
        source, package = _table_source(table, table_owners)
        items.append(
            CheckItem(
                table=table,
                name=f"{table}.{column}",
                detail=f"{type_name}, {pct_used}% used ({current:,} / {maximum:,})",
                source=source,
                package=package,
                suggestion=f'ALTER TABLE "{table}" ALTER COLUMN "{column}" SET DATA TYPE bigint;',
            )
        )

    status = "critical" if worst_pct >= 90 else ("warning" if items else "ok")

    return CheckResult(
        name="sequence_exhaustion",
        label="Sequence exhaustion",
        status=status,
        summary=f"{worst_pct}% worst" if items else "all ok",
        items=items,
        message="",
    )
545
546
def check_xid_wraparound(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Report databases whose transaction ID age is nearing the 2B wraparound limit.

    Every connectable database is inspected. A database more than 25% of the
    way to wraparound becomes a finding; the status is ``"critical"`` once
    the worst finding reaches 40%. ``table_owners`` is not used by this check.
    """
    cursor.execute("""
        SELECT
            datname,
            age(datfrozenxid) AS xid_age,
            ROUND(100.0 * age(datfrozenxid) / 2147483648, 2) AS pct_towards_wraparound
        FROM pg_catalog.pg_database
        WHERE datallowconn
        ORDER BY age(datfrozenxid) DESC
    """)
    rows = cursor.fetchall()

    over_threshold = [row for row in rows if float(row[2]) > 25]
    items: list[CheckItem] = [
        CheckItem(
            table="",
            name=database,
            detail=f"{pct}% towards wraparound ({age:,} XIDs)",
            source="",
            package="",
            suggestion="Investigate autovacuum health, consider VACUUM FREEZE",
        )
        for database, age, pct in over_threshold
    ]
    worst_pct = max([0.0, *(float(row[2]) for row in over_threshold)])

    status = "critical" if worst_pct >= 40 else ("warning" if items else "ok")

    if items:
        summary = f"{worst_pct}%"
    else:
        # Nothing crossed the threshold: still show a percentage, taken from
        # the first row, i.e. the database with the oldest frozen XID.
        first_row_pct = float(rows[0][2]) if rows else 0
        summary = f"{first_row_pct}%"

    return CheckResult(
        name="xid_wraparound",
        label="XID wraparound",
        status=status,
        summary=summary,
        items=items,
        message="",
    )
598
599
def _check_hit_ratio(
    cursor: Any,
    *,
    name: str,
    label: str,
    catalog_table: str,
    hit_col: str,
    read_col: str,
) -> CheckResult:
    """Compute a buffer hit ratio from a statistics view and grade it.

    The ratio is ``hits / (hits + reads)`` as a percentage, summed over
    ``catalog_table``. When the view yields no value the ratio is treated as
    100%. Anything below 98.5% is a ``"warning"`` with advice to increase
    shared_buffers or RAM.

    ``catalog_table``, ``hit_col`` and ``read_col`` are interpolated directly
    into the SQL text, so they must be trusted identifiers.
    """
    query = f"""
        SELECT ROUND(
            100.0 * SUM({hit_col}) / NULLIF(SUM({hit_col}) + SUM({read_col}), 0), 2
        ) FROM {catalog_table}
    """
    cursor.execute(query)
    row = cursor.fetchone()

    if row and row[0] is not None:
        ratio = float(row[0])
    else:
        ratio = 100.0

    too_low = ratio < 98.5
    advice = "Consider increasing shared_buffers or adding more RAM" if too_low else ""

    return CheckResult(
        name=name,
        label=label,
        status="warning" if too_low else "ok",
        summary=f"{ratio}%",
        items=[],
        message=advice,
    )
628
629
def check_cache_hit_ratio(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Heap block hit ratio summed over ``pg_statio_user_tables``.

    Delegates to ``_check_hit_ratio``. ``table_owners`` is not used here; it
    is accepted so this check has the same call signature as the other
    entries in ``ALL_CHECKS``.
    """
    return _check_hit_ratio(
        cursor,
        name="cache_hit_ratio",
        label="Cache hit ratio",
        catalog_table="pg_catalog.pg_statio_user_tables",
        hit_col="heap_blks_hit",
        read_col="heap_blks_read",
    )
641
642
def check_index_hit_ratio(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Index block hit ratio summed over ``pg_statio_user_indexes``.

    Delegates to ``_check_hit_ratio``. ``table_owners`` is not used here; it
    is accepted so this check has the same call signature as the other
    entries in ``ALL_CHECKS``.
    """
    return _check_hit_ratio(
        cursor,
        name="index_hit_ratio",
        label="Index hit ratio",
        catalog_table="pg_catalog.pg_statio_user_indexes",
        hit_col="idx_blks_hit",
        read_col="idx_blks_read",
    )
654
655
def check_vacuum_health(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Tables with significant dead tuple accumulation.

    Only tables with more than 1000 dead tuples are considered. A table is
    reported when its dead tuples exceed 10% of its live tuples, or when it
    has no live tuples at all. The query reports 0% in the latter case to
    avoid dividing by zero, so without the explicit check a table made up
    entirely of dead tuples would be missed.

    Returns a ``"warning"`` result with one item per affected table, or
    ``"ok"`` when none qualify.
    """
    cursor.execute("""
        SELECT
            relname AS table_name,
            n_dead_tup,
            n_live_tup,
            CASE WHEN n_live_tup > 0
                THEN ROUND(100.0 * n_dead_tup / n_live_tup, 2)
                ELSE 0
            END AS dead_tuple_pct,
            last_autovacuum
        FROM pg_catalog.pg_stat_user_tables
        WHERE n_dead_tup > 1000
        ORDER BY n_dead_tup DESC
    """)
    rows = cursor.fetchall()

    items: list[CheckItem] = []
    for table_name, n_dead, n_live, dead_pct, last_vacuum in rows:
        has_live_tuples = bool(n_live) and n_live > 0
        if has_live_tuples:
            if float(dead_pct) <= 10:
                continue
            ratio_text = f"{dead_pct}% of live"
        else:
            # The SQL-side percentage is a 0 placeholder here, not a real
            # ratio; the table holds only dead tuples.
            ratio_text = "no live tuples"

        # Keep just the "YYYY-MM-DD HH:MM:SS" part of the timestamp.
        vacuum_info = str(last_vacuum)[:19] if last_vacuum else "never"
        source, package = _table_source(table_name, table_owners)
        items.append(
            CheckItem(
                table=table_name,
                name=table_name,
                detail=f"{n_dead:,} dead tuples ({ratio_text}), last vacuum: {vacuum_info}",
                source=source,
                package=package,
                suggestion="Investigate autovacuum — it may be falling behind on this table",
            )
        )

    return CheckResult(
        name="vacuum_health",
        label="Vacuum health",
        status="warning" if items else "ok",
        summary=f"{len(items)} tables need attention" if items else "all ok",
        items=items,
        message="",
    )
701
702
# Checks executed by run_all_checks, in this order. Each entry is called as
# check_fn(cursor, table_owners) and returns a CheckResult.
ALL_CHECKS = [
    check_invalid_indexes,
    check_duplicate_indexes,
    check_unused_indexes,
    check_missing_fk_indexes,
    check_sequence_exhaustion,
    check_xid_wraparound,
    check_cache_hit_ratio,
    check_index_hit_ratio,
    check_vacuum_health,
]
714
715
def run_all_checks(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> tuple[list[CheckResult], dict[str, Any]]:
    """Run every check in ``ALL_CHECKS`` and then gather context.

    A check that raises does not abort the run: it is recorded as a result
    with status ``"error"``, a name and label derived from the function name,
    and the exception text as its message. Exceptions raised while gathering
    context are not caught.

    Returns:
        ``(results, context)`` - one ``CheckResult`` per check in
        ``ALL_CHECKS`` order, plus the dict from ``gather_context``.
    """
    results: list[CheckResult] = []
    for check_fn in ALL_CHECKS:
        try:
            outcome = check_fn(cursor, table_owners)
        except Exception as exc:
            # "check_foo_bar" -> name "foo_bar", label "Foo Bar"
            short_name = check_fn.__name__.removeprefix("check_")
            outcome = CheckResult(
                name=short_name,
                label=short_name.replace("_", " ").title(),
                status="error",
                summary="error",
                items=[],
                message=str(exc),
            )
        results.append(outcome)

    return results, gather_context(cursor, table_owners)