Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3from collections import namedtuple
  4from typing import Any
  5
  6from plain.models.backends.base.introspection import BaseDatabaseIntrospection
  7from plain.models.backends.base.introspection import FieldInfo as BaseFieldInfo
  8from plain.models.backends.base.introspection import TableInfo as BaseTableInfo
  9from plain.models.indexes import Index
 10
 11FieldInfo = namedtuple("FieldInfo", BaseFieldInfo._fields + ("is_autofield", "comment"))
 12TableInfo = namedtuple("TableInfo", BaseTableInfo._fields + ("comment",))
 13
 14
class DatabaseIntrospection(BaseDatabaseIntrospection):
    """
    Introspection backend that reads table, column, sequence, relation and
    constraint metadata by querying the pg_catalog system tables.
    """

    # Maps type codes to Plain Field types.
    # NOTE(review): the integer keys look like PostgreSQL built-in type OIDs
    # (as listed in pg_type) — confirm against the server catalog. Several
    # codes intentionally share one field type (e.g. 700/701 -> FloatField,
    # 1114/1184 -> DateTimeField).
    data_types_reverse = {
        16: "BooleanField",
        17: "BinaryField",
        20: "BigIntegerField",
        21: "SmallIntegerField",
        23: "IntegerField",
        25: "TextField",
        700: "FloatField",
        701: "FloatField",
        869: "GenericIPAddressField",
        1042: "CharField",  # blank-padded
        1043: "CharField",
        1082: "DateField",
        1083: "TimeField",
        1114: "DateTimeField",
        1184: "DateTimeField",
        1186: "DurationField",
        1266: "TimeField",
        1700: "DecimalField",
        2950: "UUIDField",
        3802: "JSONField",
    }
    # A hook for subclasses.
    # get_constraints() passes this access-method name to its index query (only
    # indexes using it get ASC/DESC orderings) and treats an index using it
    # with no storage options as the basic Index type.
    index_default_access_method = "btree"

    # Table names that get_table_list() leaves out of its result. This is a
    # class-level list shared by all instances; override it rather than
    # mutating it in place.
    ignored_tables: list[str] = []
 43
 44    def get_field_type(self, data_type: Any, description: Any) -> str:
 45        field_type = super().get_field_type(data_type, description)
 46        if description.is_autofield or (
 47            # Required for pre-Plain 4.1 serial columns.
 48            description.default and "nextval" in description.default
 49        ):
 50            if field_type == "BigIntegerField":
 51                return "PrimaryKeyField"
 52        return field_type
 53
 54    def get_table_list(self, cursor: Any) -> list[TableInfo]:
 55        """Return a list of table and view names in the current database."""
 56        cursor.execute(
 57            """
 58            SELECT
 59                c.relname,
 60                CASE
 61                    WHEN c.relispartition THEN 'p'
 62                    WHEN c.relkind IN ('m', 'v') THEN 'v'
 63                    ELSE 't'
 64                END,
 65                obj_description(c.oid, 'pg_class')
 66            FROM pg_catalog.pg_class c
 67            LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
 68            WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
 69                AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
 70                AND pg_catalog.pg_table_is_visible(c.oid)
 71        """
 72        )
 73        return [
 74            TableInfo(*row)
 75            for row in cursor.fetchall()
 76            if row[0] not in self.ignored_tables
 77        ]
 78
 79    def get_table_description(self, cursor: Any, table_name: str) -> list[FieldInfo]:
 80        """
 81        Return a description of the table with the DB-API cursor.description
 82        interface.
 83        """
 84        # Query the pg_catalog tables as cursor.description does not reliably
 85        # return the nullable property and information_schema.columns does not
 86        # contain details of materialized views.
 87        cursor.execute(
 88            """
 89            SELECT
 90                a.attname AS column_name,
 91                NOT (a.attnotnull OR (t.typtype = 'd' AND t.typnotnull)) AS is_nullable,
 92                pg_get_expr(ad.adbin, ad.adrelid) AS column_default,
 93                CASE WHEN collname = 'default' THEN NULL ELSE collname END AS collation,
 94                a.attidentity != '' AS is_autofield,
 95                col_description(a.attrelid, a.attnum) AS column_comment
 96            FROM pg_attribute a
 97            LEFT JOIN pg_attrdef ad ON a.attrelid = ad.adrelid AND a.attnum = ad.adnum
 98            LEFT JOIN pg_collation co ON a.attcollation = co.oid
 99            JOIN pg_type t ON a.atttypid = t.oid
100            JOIN pg_class c ON a.attrelid = c.oid
101            JOIN pg_namespace n ON c.relnamespace = n.oid
102            WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
103                AND c.relname = %s
104                AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
105                AND pg_catalog.pg_table_is_visible(c.oid)
106        """,
107            [table_name],
108        )
109        field_map = {line[0]: line[1:] for line in cursor.fetchall()}
110        cursor.execute(
111            f"SELECT * FROM {self.connection.ops.quote_name(table_name)} LIMIT 1"
112        )
113        return [
114            FieldInfo(
115                line.name,
116                line.type_code,
117                line.internal_size if line.display_size is None else line.display_size,
118                line.internal_size,
119                line.precision,
120                line.scale,
121                *field_map[line.name],
122            )
123            for line in cursor.description
124        ]
125
126    def get_sequences(
127        self, cursor: Any, table_name: str, table_fields: tuple[Any, ...] = ()
128    ) -> list[dict[str, Any]]:
129        cursor.execute(
130            """
131            SELECT
132                s.relname AS sequence_name,
133                a.attname AS colname
134            FROM
135                pg_class s
136                JOIN pg_depend d ON d.objid = s.oid
137                    AND d.classid = 'pg_class'::regclass
138                    AND d.refclassid = 'pg_class'::regclass
139                JOIN pg_attribute a ON d.refobjid = a.attrelid
140                    AND d.refobjsubid = a.attnum
141                JOIN pg_class tbl ON tbl.oid = d.refobjid
142                    AND tbl.relname = %s
143                    AND pg_catalog.pg_table_is_visible(tbl.oid)
144            WHERE
145                s.relkind = 'S';
146        """,
147            [table_name],
148        )
149        return [
150            {"name": row[0], "table": table_name, "column": row[1]}
151            for row in cursor.fetchall()
152        ]
153
154    def get_relations(self, cursor: Any, table_name: str) -> dict[str, tuple[str, str]]:
155        """
156        Return a dictionary of {field_name: (field_name_other_table, other_table)}
157        representing all foreign keys in the given table.
158        """
159        cursor.execute(
160            """
161            SELECT a1.attname, c2.relname, a2.attname
162            FROM pg_constraint con
163            LEFT JOIN pg_class c1 ON con.conrelid = c1.oid
164            LEFT JOIN pg_class c2 ON con.confrelid = c2.oid
165            LEFT JOIN
166                pg_attribute a1 ON c1.oid = a1.attrelid AND a1.attnum = con.conkey[1]
167            LEFT JOIN
168                pg_attribute a2 ON c2.oid = a2.attrelid AND a2.attnum = con.confkey[1]
169            WHERE
170                c1.relname = %s AND
171                con.contype = 'f' AND
172                c1.relnamespace = c2.relnamespace AND
173                pg_catalog.pg_table_is_visible(c1.oid)
174        """,
175            [table_name],
176        )
177        return {row[0]: (row[2], row[1]) for row in cursor.fetchall()}
178
179    def get_constraints(
180        self, cursor: Any, table_name: str
181    ) -> dict[str, dict[str, Any]]:
182        """
183        Retrieve any constraints or keys (unique, pk, fk, check, index) across
184        one or more columns. Also retrieve the definition of expression-based
185        indexes.
186        """
187        constraints: dict[str, dict[str, Any]] = {}
188        # Loop over the key table, collecting things as constraints. The column
189        # array must return column names in the same order in which they were
190        # created.
191        cursor.execute(
192            """
193            SELECT
194                c.conname,
195                array(
196                    SELECT attname
197                    FROM unnest(c.conkey) WITH ORDINALITY cols(colid, arridx)
198                    JOIN pg_attribute AS ca ON cols.colid = ca.attnum
199                    WHERE ca.attrelid = c.conrelid
200                    ORDER BY cols.arridx
201                ),
202                c.contype,
203                (SELECT fkc.relname || '.' || fka.attname
204                FROM pg_attribute AS fka
205                JOIN pg_class AS fkc ON fka.attrelid = fkc.oid
206                WHERE fka.attrelid = c.confrelid AND fka.attnum = c.confkey[1]),
207                cl.reloptions
208            FROM pg_constraint AS c
209            JOIN pg_class AS cl ON c.conrelid = cl.oid
210            WHERE cl.relname = %s AND pg_catalog.pg_table_is_visible(cl.oid)
211        """,
212            [table_name],
213        )
214        for constraint, columns, kind, used_cols, options in cursor.fetchall():
215            constraints[constraint] = {
216                "columns": columns,
217                "primary_key": kind == "p",
218                "unique": kind in ["p", "u"],
219                "foreign_key": tuple(used_cols.split(".", 1)) if kind == "f" else None,
220                "check": kind == "c",
221                "index": False,
222                "definition": None,
223                "options": options,
224            }
225        # Now get indexes
226        cursor.execute(
227            """
228            SELECT
229                indexname,
230                array_agg(attname ORDER BY arridx),
231                indisunique,
232                indisprimary,
233                array_agg(ordering ORDER BY arridx),
234                amname,
235                exprdef,
236                s2.attoptions
237            FROM (
238                SELECT
239                    c2.relname as indexname, idx.*, attr.attname, am.amname,
240                    CASE
241                        WHEN idx.indexprs IS NOT NULL THEN
242                            pg_get_indexdef(idx.indexrelid)
243                    END AS exprdef,
244                    CASE am.amname
245                        WHEN %s THEN
246                            CASE (option & 1)
247                                WHEN 1 THEN 'DESC' ELSE 'ASC'
248                            END
249                    END as ordering,
250                    c2.reloptions as attoptions
251                FROM (
252                    SELECT *
253                    FROM
254                        pg_index i,
255                        unnest(i.indkey, i.indoption)
256                            WITH ORDINALITY koi(key, option, arridx)
257                ) idx
258                LEFT JOIN pg_class c ON idx.indrelid = c.oid
259                LEFT JOIN pg_class c2 ON idx.indexrelid = c2.oid
260                LEFT JOIN pg_am am ON c2.relam = am.oid
261                LEFT JOIN
262                    pg_attribute attr ON attr.attrelid = c.oid AND attr.attnum = idx.key
263                WHERE c.relname = %s AND pg_catalog.pg_table_is_visible(c.oid)
264            ) s2
265            GROUP BY indexname, indisunique, indisprimary, amname, exprdef, attoptions;
266        """,
267            [self.index_default_access_method, table_name],
268        )
269        for (
270            index,
271            columns,
272            unique,
273            primary,
274            orders,
275            type_,
276            definition,
277            options,
278        ) in cursor.fetchall():
279            if index not in constraints:
280                basic_index = (
281                    type_ == self.index_default_access_method and options is None
282                )
283                constraints[index] = {
284                    "columns": columns if columns != [None] else [],
285                    "orders": orders if orders != [None] else [],
286                    "primary_key": primary,
287                    "unique": unique,
288                    "foreign_key": None,
289                    "check": False,
290                    "index": True,
291                    "type": Index.suffix if basic_index else type_,
292                    "definition": definition,
293                    "options": options,
294                }
295        return constraints