v0.146.0
  1"""
  2DDL generation helpers for convergence and schema management.
  3
  4Higher-level SQL builders that need database connections, ORM query machinery,
  5and constraint/index types. Separated from dialect.py (which is low-level and
  6imported everywhere) to avoid circular imports.
  7"""
  8
  9from __future__ import annotations
 10
 11from typing import TYPE_CHECKING, Any
 12
 13import psycopg.sql
 14
 15from plain.postgres.db import get_connection
 16from plain.postgres.dialect import quote_name
 17from plain.postgres.expressions import ExpressionList
 18from plain.postgres.query_utils import Q
 19from plain.postgres.sql.query import Query
 20
 21if TYPE_CHECKING:
 22    from plain.postgres.base import Model
 23    from plain.postgres.constraints import Deferrable
 24
 25
 26def quote_value(value: Any) -> str:
 27    """Quote a value for safe inclusion in a SQL string.
 28
 29    Not safe against injection from user code — intended only for SQL scripts,
 30    default values, and constraint expressions (which are not user-defined).
 31    """
 32    if isinstance(value, str):
 33        value = value.replace("%", "%%")
 34    conn = get_connection()
 35    return psycopg.sql.quote(value, conn.connection)
 36
 37
 38def deferrable_sql(deferrable: Deferrable | None) -> str:
 39    """Return the DEFERRABLE clause for a constraint, or empty string."""
 40    from plain.postgres.constraints import (
 41        Deferrable,  # circular: constraints imports ddl
 42    )
 43
 44    if deferrable is None:
 45        return ""
 46    if deferrable == Deferrable.DEFERRED:
 47        return " DEFERRABLE INITIALLY DEFERRED"
 48    if deferrable == Deferrable.IMMEDIATE:
 49        return " DEFERRABLE INITIALLY IMMEDIATE"
 50    return ""
 51
 52
 53def compile_expression_sql(model: type[Model], expression_q: Q) -> str:
 54    """Compile a Q expression to a SQL string with quoted literal params."""
 55    query = Query(model=model, alias_cols=False)
 56    where = query.build_where(expression_q)
 57    compiler = query.get_compiler()
 58    conn = get_connection()
 59    sql, params = where.as_sql(compiler, conn)
 60    return sql % tuple(quote_value(p) for p in params)
 61
 62
 63def compile_database_default_sql(expression: Any) -> str:
 64    """Compile a DB-default expression (Now, GenRandomUUID) to parameter-free DDL SQL."""
 65    compiler = Query(None).get_compiler()
 66    conn = get_connection()
 67    sql, params = expression.as_sql(compiler, conn)
 68    if params:
 69        raise ValueError(
 70            f"Expression defaults must compile to parameter-free SQL; "
 71            f"got params={params!r} for {expression!r}."
 72        )
 73    return sql
 74
 75
 76def compile_literal_default_sql(field: Any) -> str:
 77    """Compile a field's literal ``default=`` value as DDL-ready SQL.
 78
 79    Result is executed directly (no ``%s`` interpolation), so we use
 80    ``psycopg.sql.quote`` — not ``quote_value``, which doubles ``%``.
 81    """
 82    conn = get_connection()
 83    prepared = field.get_db_prep_save(field.default, conn)
 84    return psycopg.sql.quote(prepared, conn.connection)
 85
 86
 87def compile_index_expressions_sql(
 88    model: type[Model], expressions: tuple[Any, ...]
 89) -> str:
 90    """Compile index/constraint expressions (e.g. F(), OrderBy) to SQL."""
 91    from plain.postgres.indexes import IndexExpression  # circular: indexes imports ddl
 92
 93    query = Query(model, alias_cols=False)
 94    compiler = query.get_compiler()
 95    index_expressions = [IndexExpression(expr) for expr in expressions]
 96    expr_list = ExpressionList(*index_expressions).resolve_expression(query)
 97    sql, params = compiler.compile(expr_list)
 98    return sql % tuple(quote_value(p) for p in params)
 99
100
def build_include_sql(model: type[Model], include: tuple[str, ...]) -> str:
    """Return the INCLUDE clause for an index or constraint.

    Each name in ``include`` is looked up as a forward field on the model's
    ``_model_meta``, and that field's ``column`` is quoted with
    ``quote_name``. The result has the form ``" INCLUDE (col1, col2)"`` with
    a leading space. An empty ``include`` gives an empty string.
    """
    if not include:
        return ""

    columns = []
    for field_name in include:
        field = model._model_meta.get_forward_field(field_name)
        columns.append(quote_name(field.column))
    return " INCLUDE ({})".format(", ".join(columns))