1from __future__ import annotations
2
3from collections import namedtuple
4from typing import Any
5
6from plain.models.backends.base.introspection import BaseDatabaseIntrospection
7from plain.models.backends.base.introspection import FieldInfo as BaseFieldInfo
8from plain.models.backends.base.introspection import TableInfo as BaseTableInfo
9from plain.models.indexes import Index
10
# Extend the base introspection tuples with the extra values this backend
# reports: an identity-column flag and comment per field, and a comment per
# table.
FieldInfo = namedtuple(
    "FieldInfo", (*BaseFieldInfo._fields, "is_autofield", "comment")
)
TableInfo = namedtuple("TableInfo", (*BaseTableInfo._fields, "comment"))
13
14
class DatabaseIntrospection(BaseDatabaseIntrospection):
    """
    Introspection backend that reads table, column, sequence, relation, and
    constraint metadata by querying the pg_catalog system tables.
    """

    # Maps type codes to Plain Field types. The keys are compared against the
    # type_code values taken from cursor.description in
    # get_table_description().
    # NOTE(review): the keys appear to be PostgreSQL pg_type OIDs -- confirm.
    data_types_reverse = {
        16: "BooleanField",
        17: "BinaryField",
        20: "BigIntegerField",
        21: "SmallIntegerField",
        23: "IntegerField",
        25: "TextField",
        700: "FloatField",
        701: "FloatField",
        869: "GenericIPAddressField",
        1042: "CharField",  # blank-padded
        1043: "CharField",
        1082: "DateField",
        1083: "TimeField",
        1114: "DateTimeField",
        1184: "DateTimeField",
        1186: "DurationField",
        1266: "TimeField",
        1700: "DecimalField",
        2950: "UUIDField",
        3802: "JSONField",
    }
    # A hook for subclasses. get_constraints() reports per-column ordering only
    # for indexes using this access method, and labels such an index with
    # Index.suffix (instead of the access method name) when it has no storage
    # options.
    index_default_access_method = "btree"

    # Table names that get_table_list() leaves out of its result.
    ignored_tables: list[str] = []
43
44 def get_field_type(self, data_type: Any, description: Any) -> str:
45 field_type = super().get_field_type(data_type, description)
46 if description.is_autofield or (
47 # Required for pre-Plain 4.1 serial columns.
48 description.default and "nextval" in description.default
49 ):
50 if field_type == "BigIntegerField":
51 return "PrimaryKeyField"
52 return field_type
53
54 def get_table_list(self, cursor: Any) -> list[TableInfo]:
55 """Return a list of table and view names in the current database."""
56 cursor.execute(
57 """
58 SELECT
59 c.relname,
60 CASE
61 WHEN c.relispartition THEN 'p'
62 WHEN c.relkind IN ('m', 'v') THEN 'v'
63 ELSE 't'
64 END,
65 obj_description(c.oid, 'pg_class')
66 FROM pg_catalog.pg_class c
67 LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
68 WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
69 AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
70 AND pg_catalog.pg_table_is_visible(c.oid)
71 """
72 )
73 return [
74 TableInfo(*row)
75 for row in cursor.fetchall()
76 if row[0] not in self.ignored_tables
77 ]
78
79 def get_table_description(self, cursor: Any, table_name: str) -> list[FieldInfo]:
80 """
81 Return a description of the table with the DB-API cursor.description
82 interface.
83 """
84 # Query the pg_catalog tables as cursor.description does not reliably
85 # return the nullable property and information_schema.columns does not
86 # contain details of materialized views.
87 cursor.execute(
88 """
89 SELECT
90 a.attname AS column_name,
91 NOT (a.attnotnull OR (t.typtype = 'd' AND t.typnotnull)) AS is_nullable,
92 pg_get_expr(ad.adbin, ad.adrelid) AS column_default,
93 CASE WHEN collname = 'default' THEN NULL ELSE collname END AS collation,
94 a.attidentity != '' AS is_autofield,
95 col_description(a.attrelid, a.attnum) AS column_comment
96 FROM pg_attribute a
97 LEFT JOIN pg_attrdef ad ON a.attrelid = ad.adrelid AND a.attnum = ad.adnum
98 LEFT JOIN pg_collation co ON a.attcollation = co.oid
99 JOIN pg_type t ON a.atttypid = t.oid
100 JOIN pg_class c ON a.attrelid = c.oid
101 JOIN pg_namespace n ON c.relnamespace = n.oid
102 WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
103 AND c.relname = %s
104 AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
105 AND pg_catalog.pg_table_is_visible(c.oid)
106 """,
107 [table_name],
108 )
109 field_map = {line[0]: line[1:] for line in cursor.fetchall()}
110 cursor.execute(
111 f"SELECT * FROM {self.connection.ops.quote_name(table_name)} LIMIT 1"
112 )
113 return [
114 FieldInfo(
115 line.name,
116 line.type_code,
117 line.internal_size if line.display_size is None else line.display_size,
118 line.internal_size,
119 line.precision,
120 line.scale,
121 *field_map[line.name],
122 )
123 for line in cursor.description
124 ]
125
126 def get_sequences(
127 self, cursor: Any, table_name: str, table_fields: tuple[Any, ...] = ()
128 ) -> list[dict[str, Any]]:
129 cursor.execute(
130 """
131 SELECT
132 s.relname AS sequence_name,
133 a.attname AS colname
134 FROM
135 pg_class s
136 JOIN pg_depend d ON d.objid = s.oid
137 AND d.classid = 'pg_class'::regclass
138 AND d.refclassid = 'pg_class'::regclass
139 JOIN pg_attribute a ON d.refobjid = a.attrelid
140 AND d.refobjsubid = a.attnum
141 JOIN pg_class tbl ON tbl.oid = d.refobjid
142 AND tbl.relname = %s
143 AND pg_catalog.pg_table_is_visible(tbl.oid)
144 WHERE
145 s.relkind = 'S';
146 """,
147 [table_name],
148 )
149 return [
150 {"name": row[0], "table": table_name, "column": row[1]}
151 for row in cursor.fetchall()
152 ]
153
154 def get_relations(self, cursor: Any, table_name: str) -> dict[str, tuple[str, str]]:
155 """
156 Return a dictionary of {field_name: (field_name_other_table, other_table)}
157 representing all foreign keys in the given table.
158 """
159 cursor.execute(
160 """
161 SELECT a1.attname, c2.relname, a2.attname
162 FROM pg_constraint con
163 LEFT JOIN pg_class c1 ON con.conrelid = c1.oid
164 LEFT JOIN pg_class c2 ON con.confrelid = c2.oid
165 LEFT JOIN
166 pg_attribute a1 ON c1.oid = a1.attrelid AND a1.attnum = con.conkey[1]
167 LEFT JOIN
168 pg_attribute a2 ON c2.oid = a2.attrelid AND a2.attnum = con.confkey[1]
169 WHERE
170 c1.relname = %s AND
171 con.contype = 'f' AND
172 c1.relnamespace = c2.relnamespace AND
173 pg_catalog.pg_table_is_visible(c1.oid)
174 """,
175 [table_name],
176 )
177 return {row[0]: (row[2], row[1]) for row in cursor.fetchall()}
178
179 def get_constraints(
180 self, cursor: Any, table_name: str
181 ) -> dict[str, dict[str, Any]]:
182 """
183 Retrieve any constraints or keys (unique, pk, fk, check, index) across
184 one or more columns. Also retrieve the definition of expression-based
185 indexes.
186 """
187 constraints: dict[str, dict[str, Any]] = {}
188 # Loop over the key table, collecting things as constraints. The column
189 # array must return column names in the same order in which they were
190 # created.
191 cursor.execute(
192 """
193 SELECT
194 c.conname,
195 array(
196 SELECT attname
197 FROM unnest(c.conkey) WITH ORDINALITY cols(colid, arridx)
198 JOIN pg_attribute AS ca ON cols.colid = ca.attnum
199 WHERE ca.attrelid = c.conrelid
200 ORDER BY cols.arridx
201 ),
202 c.contype,
203 (SELECT fkc.relname || '.' || fka.attname
204 FROM pg_attribute AS fka
205 JOIN pg_class AS fkc ON fka.attrelid = fkc.oid
206 WHERE fka.attrelid = c.confrelid AND fka.attnum = c.confkey[1]),
207 cl.reloptions
208 FROM pg_constraint AS c
209 JOIN pg_class AS cl ON c.conrelid = cl.oid
210 WHERE cl.relname = %s AND pg_catalog.pg_table_is_visible(cl.oid)
211 """,
212 [table_name],
213 )
214 for constraint, columns, kind, used_cols, options in cursor.fetchall():
215 constraints[constraint] = {
216 "columns": columns,
217 "primary_key": kind == "p",
218 "unique": kind in ["p", "u"],
219 "foreign_key": tuple(used_cols.split(".", 1)) if kind == "f" else None,
220 "check": kind == "c",
221 "index": False,
222 "definition": None,
223 "options": options,
224 }
225 # Now get indexes
226 cursor.execute(
227 """
228 SELECT
229 indexname,
230 array_agg(attname ORDER BY arridx),
231 indisunique,
232 indisprimary,
233 array_agg(ordering ORDER BY arridx),
234 amname,
235 exprdef,
236 s2.attoptions
237 FROM (
238 SELECT
239 c2.relname as indexname, idx.*, attr.attname, am.amname,
240 CASE
241 WHEN idx.indexprs IS NOT NULL THEN
242 pg_get_indexdef(idx.indexrelid)
243 END AS exprdef,
244 CASE am.amname
245 WHEN %s THEN
246 CASE (option & 1)
247 WHEN 1 THEN 'DESC' ELSE 'ASC'
248 END
249 END as ordering,
250 c2.reloptions as attoptions
251 FROM (
252 SELECT *
253 FROM
254 pg_index i,
255 unnest(i.indkey, i.indoption)
256 WITH ORDINALITY koi(key, option, arridx)
257 ) idx
258 LEFT JOIN pg_class c ON idx.indrelid = c.oid
259 LEFT JOIN pg_class c2 ON idx.indexrelid = c2.oid
260 LEFT JOIN pg_am am ON c2.relam = am.oid
261 LEFT JOIN
262 pg_attribute attr ON attr.attrelid = c.oid AND attr.attnum = idx.key
263 WHERE c.relname = %s AND pg_catalog.pg_table_is_visible(c.oid)
264 ) s2
265 GROUP BY indexname, indisunique, indisprimary, amname, exprdef, attoptions;
266 """,
267 [self.index_default_access_method, table_name],
268 )
269 for (
270 index,
271 columns,
272 unique,
273 primary,
274 orders,
275 type_,
276 definition,
277 options,
278 ) in cursor.fetchall():
279 if index not in constraints:
280 basic_index = (
281 type_ == self.index_default_access_method and options is None
282 )
283 constraints[index] = {
284 "columns": columns if columns != [None] else [],
285 "orders": orders if orders != [None] else [],
286 "primary_key": primary,
287 "unique": unique,
288 "foreign_key": None,
289 "check": False,
290 "index": True,
291 "type": Index.suffix if basic_index else type_,
292 "definition": definition,
293 "options": options,
294 }
295 return constraints