Plain is headed towards 1.0! Subscribe for development updates →

  1from collections import namedtuple
  2
  3from plain.models.backends.base.introspection import BaseDatabaseIntrospection
  4from plain.models.backends.base.introspection import FieldInfo as BaseFieldInfo
  5from plain.models.backends.base.introspection import TableInfo as BaseTableInfo
  6from plain.models.indexes import Index
  7
  8FieldInfo = namedtuple("FieldInfo", BaseFieldInfo._fields + ("is_autofield", "comment"))
  9TableInfo = namedtuple("TableInfo", BaseTableInfo._fields + ("comment",))
 10
 11
class DatabaseIntrospection(BaseDatabaseIntrospection):
    """
    Introspection backend that reads table, column, sequence, relation,
    constraint and index metadata from the PostgreSQL system catalogs
    (pg_class, pg_attribute, pg_constraint, pg_index, ...).
    """

    # Maps type codes to Plain Field types.
    # NOTE(review): the keys look like PostgreSQL type OIDs (e.g. 16 = bool,
    # 25 = text, 3802 = jsonb) as reported in cursor.description's type_code,
    # and are presumably looked up by the base class's get_field_type() --
    # confirm against pg_type and the base implementation.
    data_types_reverse = {
        16: "BooleanField",
        17: "BinaryField",
        20: "BigIntegerField",
        21: "SmallIntegerField",
        23: "IntegerField",
        25: "TextField",
        700: "FloatField",
        701: "FloatField",
        869: "GenericIPAddressField",
        1042: "CharField",  # blank-padded
        1043: "CharField",
        1082: "DateField",
        1083: "TimeField",
        1114: "DateTimeField",
        1184: "DateTimeField",
        1186: "DurationField",
        1266: "TimeField",
        1700: "DecimalField",
        2950: "UUIDField",
        3802: "JSONField",
    }
    # A hook for subclasses.
    # get_constraints() passes this access method name to its index query
    # (only indexes using it get ASC/DESC orderings) and reports an index of
    # this type without storage options as Index.suffix.
    index_default_access_method = "btree"

    # Table names that get_table_list() leaves out of its result.
    ignored_tables = []
 40
 41    def get_field_type(self, data_type, description):
 42        field_type = super().get_field_type(data_type, description)
 43        if description.is_autofield or (
 44            # Required for pre-Plain 4.1 serial columns.
 45            description.default and "nextval" in description.default
 46        ):
 47            if field_type == "IntegerField":
 48                return "AutoField"
 49            elif field_type == "BigIntegerField":
 50                return "BigAutoField"
 51            elif field_type == "SmallIntegerField":
 52                return "SmallAutoField"
 53        return field_type
 54
 55    def get_table_list(self, cursor):
 56        """Return a list of table and view names in the current database."""
 57        cursor.execute(
 58            """
 59            SELECT
 60                c.relname,
 61                CASE
 62                    WHEN c.relispartition THEN 'p'
 63                    WHEN c.relkind IN ('m', 'v') THEN 'v'
 64                    ELSE 't'
 65                END,
 66                obj_description(c.oid, 'pg_class')
 67            FROM pg_catalog.pg_class c
 68            LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
 69            WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
 70                AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
 71                AND pg_catalog.pg_table_is_visible(c.oid)
 72        """
 73        )
 74        return [
 75            TableInfo(*row)
 76            for row in cursor.fetchall()
 77            if row[0] not in self.ignored_tables
 78        ]
 79
 80    def get_table_description(self, cursor, table_name):
 81        """
 82        Return a description of the table with the DB-API cursor.description
 83        interface.
 84        """
 85        # Query the pg_catalog tables as cursor.description does not reliably
 86        # return the nullable property and information_schema.columns does not
 87        # contain details of materialized views.
 88        cursor.execute(
 89            """
 90            SELECT
 91                a.attname AS column_name,
 92                NOT (a.attnotnull OR (t.typtype = 'd' AND t.typnotnull)) AS is_nullable,
 93                pg_get_expr(ad.adbin, ad.adrelid) AS column_default,
 94                CASE WHEN collname = 'default' THEN NULL ELSE collname END AS collation,
 95                a.attidentity != '' AS is_autofield,
 96                col_description(a.attrelid, a.attnum) AS column_comment
 97            FROM pg_attribute a
 98            LEFT JOIN pg_attrdef ad ON a.attrelid = ad.adrelid AND a.attnum = ad.adnum
 99            LEFT JOIN pg_collation co ON a.attcollation = co.oid
100            JOIN pg_type t ON a.atttypid = t.oid
101            JOIN pg_class c ON a.attrelid = c.oid
102            JOIN pg_namespace n ON c.relnamespace = n.oid
103            WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
104                AND c.relname = %s
105                AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
106                AND pg_catalog.pg_table_is_visible(c.oid)
107        """,
108            [table_name],
109        )
110        field_map = {line[0]: line[1:] for line in cursor.fetchall()}
111        cursor.execute(
112            f"SELECT * FROM {self.connection.ops.quote_name(table_name)} LIMIT 1"
113        )
114        return [
115            FieldInfo(
116                line.name,
117                line.type_code,
118                line.internal_size if line.display_size is None else line.display_size,
119                line.internal_size,
120                line.precision,
121                line.scale,
122                *field_map[line.name],
123            )
124            for line in cursor.description
125        ]
126
127    def get_sequences(self, cursor, table_name, table_fields=()):
128        cursor.execute(
129            """
130            SELECT
131                s.relname AS sequence_name,
132                a.attname AS colname
133            FROM
134                pg_class s
135                JOIN pg_depend d ON d.objid = s.oid
136                    AND d.classid = 'pg_class'::regclass
137                    AND d.refclassid = 'pg_class'::regclass
138                JOIN pg_attribute a ON d.refobjid = a.attrelid
139                    AND d.refobjsubid = a.attnum
140                JOIN pg_class tbl ON tbl.oid = d.refobjid
141                    AND tbl.relname = %s
142                    AND pg_catalog.pg_table_is_visible(tbl.oid)
143            WHERE
144                s.relkind = 'S';
145        """,
146            [table_name],
147        )
148        return [
149            {"name": row[0], "table": table_name, "column": row[1]}
150            for row in cursor.fetchall()
151        ]
152
153    def get_relations(self, cursor, table_name):
154        """
155        Return a dictionary of {field_name: (field_name_other_table, other_table)}
156        representing all foreign keys in the given table.
157        """
158        cursor.execute(
159            """
160            SELECT a1.attname, c2.relname, a2.attname
161            FROM pg_constraint con
162            LEFT JOIN pg_class c1 ON con.conrelid = c1.oid
163            LEFT JOIN pg_class c2 ON con.confrelid = c2.oid
164            LEFT JOIN
165                pg_attribute a1 ON c1.oid = a1.attrelid AND a1.attnum = con.conkey[1]
166            LEFT JOIN
167                pg_attribute a2 ON c2.oid = a2.attrelid AND a2.attnum = con.confkey[1]
168            WHERE
169                c1.relname = %s AND
170                con.contype = 'f' AND
171                c1.relnamespace = c2.relnamespace AND
172                pg_catalog.pg_table_is_visible(c1.oid)
173        """,
174            [table_name],
175        )
176        return {row[0]: (row[2], row[1]) for row in cursor.fetchall()}
177
178    def get_constraints(self, cursor, table_name):
179        """
180        Retrieve any constraints or keys (unique, pk, fk, check, index) across
181        one or more columns. Also retrieve the definition of expression-based
182        indexes.
183        """
184        constraints = {}
185        # Loop over the key table, collecting things as constraints. The column
186        # array must return column names in the same order in which they were
187        # created.
188        cursor.execute(
189            """
190            SELECT
191                c.conname,
192                array(
193                    SELECT attname
194                    FROM unnest(c.conkey) WITH ORDINALITY cols(colid, arridx)
195                    JOIN pg_attribute AS ca ON cols.colid = ca.attnum
196                    WHERE ca.attrelid = c.conrelid
197                    ORDER BY cols.arridx
198                ),
199                c.contype,
200                (SELECT fkc.relname || '.' || fka.attname
201                FROM pg_attribute AS fka
202                JOIN pg_class AS fkc ON fka.attrelid = fkc.oid
203                WHERE fka.attrelid = c.confrelid AND fka.attnum = c.confkey[1]),
204                cl.reloptions
205            FROM pg_constraint AS c
206            JOIN pg_class AS cl ON c.conrelid = cl.oid
207            WHERE cl.relname = %s AND pg_catalog.pg_table_is_visible(cl.oid)
208        """,
209            [table_name],
210        )
211        for constraint, columns, kind, used_cols, options in cursor.fetchall():
212            constraints[constraint] = {
213                "columns": columns,
214                "primary_key": kind == "p",
215                "unique": kind in ["p", "u"],
216                "foreign_key": tuple(used_cols.split(".", 1)) if kind == "f" else None,
217                "check": kind == "c",
218                "index": False,
219                "definition": None,
220                "options": options,
221            }
222        # Now get indexes
223        cursor.execute(
224            """
225            SELECT
226                indexname,
227                array_agg(attname ORDER BY arridx),
228                indisunique,
229                indisprimary,
230                array_agg(ordering ORDER BY arridx),
231                amname,
232                exprdef,
233                s2.attoptions
234            FROM (
235                SELECT
236                    c2.relname as indexname, idx.*, attr.attname, am.amname,
237                    CASE
238                        WHEN idx.indexprs IS NOT NULL THEN
239                            pg_get_indexdef(idx.indexrelid)
240                    END AS exprdef,
241                    CASE am.amname
242                        WHEN %s THEN
243                            CASE (option & 1)
244                                WHEN 1 THEN 'DESC' ELSE 'ASC'
245                            END
246                    END as ordering,
247                    c2.reloptions as attoptions
248                FROM (
249                    SELECT *
250                    FROM
251                        pg_index i,
252                        unnest(i.indkey, i.indoption)
253                            WITH ORDINALITY koi(key, option, arridx)
254                ) idx
255                LEFT JOIN pg_class c ON idx.indrelid = c.oid
256                LEFT JOIN pg_class c2 ON idx.indexrelid = c2.oid
257                LEFT JOIN pg_am am ON c2.relam = am.oid
258                LEFT JOIN
259                    pg_attribute attr ON attr.attrelid = c.oid AND attr.attnum = idx.key
260                WHERE c.relname = %s AND pg_catalog.pg_table_is_visible(c.oid)
261            ) s2
262            GROUP BY indexname, indisunique, indisprimary, amname, exprdef, attoptions;
263        """,
264            [self.index_default_access_method, table_name],
265        )
266        for (
267            index,
268            columns,
269            unique,
270            primary,
271            orders,
272            type_,
273            definition,
274            options,
275        ) in cursor.fetchall():
276            if index not in constraints:
277                basic_index = (
278                    type_ == self.index_default_access_method and options is None
279                )
280                constraints[index] = {
281                    "columns": columns if columns != [None] else [],
282                    "orders": orders if orders != [None] else [],
283                    "primary_key": primary,
284                    "unique": unique,
285                    "foreign_key": None,
286                    "check": False,
287                    "index": True,
288                    "type": Index.suffix if basic_index else type_,
289                    "definition": definition,
290                    "options": options,
291                }
292        return constraints