Plain is headed towards 1.0! Subscribe for development updates →

  1from collections import namedtuple
  2
  3from plain.models.backends.base.introspection import BaseDatabaseIntrospection
  4from plain.models.backends.base.introspection import FieldInfo as BaseFieldInfo
  5from plain.models.backends.base.introspection import TableInfo as BaseTableInfo
  6from plain.models.indexes import Index
  7
  8FieldInfo = namedtuple("FieldInfo", BaseFieldInfo._fields + ("is_autofield", "comment"))
  9TableInfo = namedtuple("TableInfo", BaseTableInfo._fields + ("comment",))
 10
 11
 12class DatabaseIntrospection(BaseDatabaseIntrospection):
 13    # Maps type codes to Plain Field types.
 14    data_types_reverse = {
 15        16: "BooleanField",
 16        17: "BinaryField",
 17        20: "BigIntegerField",
 18        21: "SmallIntegerField",
 19        23: "IntegerField",
 20        25: "TextField",
 21        700: "FloatField",
 22        701: "FloatField",
 23        869: "GenericIPAddressField",
 24        1042: "CharField",  # blank-padded
 25        1043: "CharField",
 26        1082: "DateField",
 27        1083: "TimeField",
 28        1114: "DateTimeField",
 29        1184: "DateTimeField",
 30        1186: "DurationField",
 31        1266: "TimeField",
 32        1700: "DecimalField",
 33        2950: "UUIDField",
 34        3802: "JSONField",
 35    }
 36    # A hook for subclasses.
 37    index_default_access_method = "btree"
 38
 39    ignored_tables = []
 40
 41    def get_field_type(self, data_type, description):
 42        field_type = super().get_field_type(data_type, description)
 43        if description.is_autofield or (
 44            # Required for pre-Plain 4.1 serial columns.
 45            description.default and "nextval" in description.default
 46        ):
 47            if field_type == "BigIntegerField":
 48                return "PrimaryKeyField"
 49        return field_type
 50
 51    def get_table_list(self, cursor):
 52        """Return a list of table and view names in the current database."""
 53        cursor.execute(
 54            """
 55            SELECT
 56                c.relname,
 57                CASE
 58                    WHEN c.relispartition THEN 'p'
 59                    WHEN c.relkind IN ('m', 'v') THEN 'v'
 60                    ELSE 't'
 61                END,
 62                obj_description(c.oid, 'pg_class')
 63            FROM pg_catalog.pg_class c
 64            LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
 65            WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
 66                AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
 67                AND pg_catalog.pg_table_is_visible(c.oid)
 68        """
 69        )
 70        return [
 71            TableInfo(*row)
 72            for row in cursor.fetchall()
 73            if row[0] not in self.ignored_tables
 74        ]
 75
 76    def get_table_description(self, cursor, table_name):
 77        """
 78        Return a description of the table with the DB-API cursor.description
 79        interface.
 80        """
 81        # Query the pg_catalog tables as cursor.description does not reliably
 82        # return the nullable property and information_schema.columns does not
 83        # contain details of materialized views.
 84        cursor.execute(
 85            """
 86            SELECT
 87                a.attname AS column_name,
 88                NOT (a.attnotnull OR (t.typtype = 'd' AND t.typnotnull)) AS is_nullable,
 89                pg_get_expr(ad.adbin, ad.adrelid) AS column_default,
 90                CASE WHEN collname = 'default' THEN NULL ELSE collname END AS collation,
 91                a.attidentity != '' AS is_autofield,
 92                col_description(a.attrelid, a.attnum) AS column_comment
 93            FROM pg_attribute a
 94            LEFT JOIN pg_attrdef ad ON a.attrelid = ad.adrelid AND a.attnum = ad.adnum
 95            LEFT JOIN pg_collation co ON a.attcollation = co.oid
 96            JOIN pg_type t ON a.atttypid = t.oid
 97            JOIN pg_class c ON a.attrelid = c.oid
 98            JOIN pg_namespace n ON c.relnamespace = n.oid
 99            WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
100                AND c.relname = %s
101                AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
102                AND pg_catalog.pg_table_is_visible(c.oid)
103        """,
104            [table_name],
105        )
106        field_map = {line[0]: line[1:] for line in cursor.fetchall()}
107        cursor.execute(
108            f"SELECT * FROM {self.connection.ops.quote_name(table_name)} LIMIT 1"
109        )
110        return [
111            FieldInfo(
112                line.name,
113                line.type_code,
114                line.internal_size if line.display_size is None else line.display_size,
115                line.internal_size,
116                line.precision,
117                line.scale,
118                *field_map[line.name],
119            )
120            for line in cursor.description
121        ]
122
123    def get_sequences(self, cursor, table_name, table_fields=()):
124        cursor.execute(
125            """
126            SELECT
127                s.relname AS sequence_name,
128                a.attname AS colname
129            FROM
130                pg_class s
131                JOIN pg_depend d ON d.objid = s.oid
132                    AND d.classid = 'pg_class'::regclass
133                    AND d.refclassid = 'pg_class'::regclass
134                JOIN pg_attribute a ON d.refobjid = a.attrelid
135                    AND d.refobjsubid = a.attnum
136                JOIN pg_class tbl ON tbl.oid = d.refobjid
137                    AND tbl.relname = %s
138                    AND pg_catalog.pg_table_is_visible(tbl.oid)
139            WHERE
140                s.relkind = 'S';
141        """,
142            [table_name],
143        )
144        return [
145            {"name": row[0], "table": table_name, "column": row[1]}
146            for row in cursor.fetchall()
147        ]
148
149    def get_relations(self, cursor, table_name):
150        """
151        Return a dictionary of {field_name: (field_name_other_table, other_table)}
152        representing all foreign keys in the given table.
153        """
154        cursor.execute(
155            """
156            SELECT a1.attname, c2.relname, a2.attname
157            FROM pg_constraint con
158            LEFT JOIN pg_class c1 ON con.conrelid = c1.oid
159            LEFT JOIN pg_class c2 ON con.confrelid = c2.oid
160            LEFT JOIN
161                pg_attribute a1 ON c1.oid = a1.attrelid AND a1.attnum = con.conkey[1]
162            LEFT JOIN
163                pg_attribute a2 ON c2.oid = a2.attrelid AND a2.attnum = con.confkey[1]
164            WHERE
165                c1.relname = %s AND
166                con.contype = 'f' AND
167                c1.relnamespace = c2.relnamespace AND
168                pg_catalog.pg_table_is_visible(c1.oid)
169        """,
170            [table_name],
171        )
172        return {row[0]: (row[2], row[1]) for row in cursor.fetchall()}
173
174    def get_constraints(self, cursor, table_name):
175        """
176        Retrieve any constraints or keys (unique, pk, fk, check, index) across
177        one or more columns. Also retrieve the definition of expression-based
178        indexes.
179        """
180        constraints = {}
181        # Loop over the key table, collecting things as constraints. The column
182        # array must return column names in the same order in which they were
183        # created.
184        cursor.execute(
185            """
186            SELECT
187                c.conname,
188                array(
189                    SELECT attname
190                    FROM unnest(c.conkey) WITH ORDINALITY cols(colid, arridx)
191                    JOIN pg_attribute AS ca ON cols.colid = ca.attnum
192                    WHERE ca.attrelid = c.conrelid
193                    ORDER BY cols.arridx
194                ),
195                c.contype,
196                (SELECT fkc.relname || '.' || fka.attname
197                FROM pg_attribute AS fka
198                JOIN pg_class AS fkc ON fka.attrelid = fkc.oid
199                WHERE fka.attrelid = c.confrelid AND fka.attnum = c.confkey[1]),
200                cl.reloptions
201            FROM pg_constraint AS c
202            JOIN pg_class AS cl ON c.conrelid = cl.oid
203            WHERE cl.relname = %s AND pg_catalog.pg_table_is_visible(cl.oid)
204        """,
205            [table_name],
206        )
207        for constraint, columns, kind, used_cols, options in cursor.fetchall():
208            constraints[constraint] = {
209                "columns": columns,
210                "primary_key": kind == "p",
211                "unique": kind in ["p", "u"],
212                "foreign_key": tuple(used_cols.split(".", 1)) if kind == "f" else None,
213                "check": kind == "c",
214                "index": False,
215                "definition": None,
216                "options": options,
217            }
218        # Now get indexes
219        cursor.execute(
220            """
221            SELECT
222                indexname,
223                array_agg(attname ORDER BY arridx),
224                indisunique,
225                indisprimary,
226                array_agg(ordering ORDER BY arridx),
227                amname,
228                exprdef,
229                s2.attoptions
230            FROM (
231                SELECT
232                    c2.relname as indexname, idx.*, attr.attname, am.amname,
233                    CASE
234                        WHEN idx.indexprs IS NOT NULL THEN
235                            pg_get_indexdef(idx.indexrelid)
236                    END AS exprdef,
237                    CASE am.amname
238                        WHEN %s THEN
239                            CASE (option & 1)
240                                WHEN 1 THEN 'DESC' ELSE 'ASC'
241                            END
242                    END as ordering,
243                    c2.reloptions as attoptions
244                FROM (
245                    SELECT *
246                    FROM
247                        pg_index i,
248                        unnest(i.indkey, i.indoption)
249                            WITH ORDINALITY koi(key, option, arridx)
250                ) idx
251                LEFT JOIN pg_class c ON idx.indrelid = c.oid
252                LEFT JOIN pg_class c2 ON idx.indexrelid = c2.oid
253                LEFT JOIN pg_am am ON c2.relam = am.oid
254                LEFT JOIN
255                    pg_attribute attr ON attr.attrelid = c.oid AND attr.attnum = idx.key
256                WHERE c.relname = %s AND pg_catalog.pg_table_is_visible(c.oid)
257            ) s2
258            GROUP BY indexname, indisunique, indisprimary, amname, exprdef, attoptions;
259        """,
260            [self.index_default_access_method, table_name],
261        )
262        for (
263            index,
264            columns,
265            unique,
266            primary,
267            orders,
268            type_,
269            definition,
270            options,
271        ) in cursor.fetchall():
272            if index not in constraints:
273                basic_index = (
274                    type_ == self.index_default_access_method and options is None
275                )
276                constraints[index] = {
277                    "columns": columns if columns != [None] else [],
278                    "orders": orders if orders != [None] else [],
279                    "primary_key": primary,
280                    "unique": unique,
281                    "foreign_key": None,
282                    "check": False,
283                    "index": True,
284                    "type": Index.suffix if basic_index else type_,
285                    "definition": definition,
286                    "options": options,
287                }
288        return constraints