1from collections import namedtuple
2
3from plain.models.backends.base.introspection import BaseDatabaseIntrospection
4from plain.models.backends.base.introspection import FieldInfo as BaseFieldInfo
5from plain.models.backends.base.introspection import TableInfo as BaseTableInfo
6from plain.models.indexes import Index
7
# Column description: the base FieldInfo fields followed by "is_autofield"
# and "comment".
FieldInfo = namedtuple(
    "FieldInfo",
    (*BaseFieldInfo._fields, "is_autofield", "comment"),
)
# Table description: the base TableInfo fields followed by "comment".
TableInfo = namedtuple(
    "TableInfo",
    (*BaseTableInfo._fields, "comment"),
)
10
11
class DatabaseIntrospection(BaseDatabaseIntrospection):
    """Introspection backend that reads table metadata from the pg_catalog tables."""

    # Maps type codes to Plain Field types.
    # NOTE(review): the keys look like PostgreSQL type OIDs as reported in
    # cursor.description -- confirm against pg_type before adding entries.
    data_types_reverse = {
        16: "BooleanField",
        17: "BinaryField",
        20: "BigIntegerField",
        21: "SmallIntegerField",
        23: "IntegerField",
        25: "TextField",
        700: "FloatField",
        701: "FloatField",
        869: "GenericIPAddressField",
        1042: "CharField",  # blank-padded
        1043: "CharField",
        1082: "DateField",
        1083: "TimeField",
        1114: "DateTimeField",
        1184: "DateTimeField",
        1186: "DurationField",
        1266: "TimeField",
        1700: "DecimalField",
        2950: "UUIDField",
        3802: "JSONField",
    }
    # A hook for subclasses: the access method treated as the "plain" index
    # type by get_constraints().
    index_default_access_method = "btree"

    # Table names that get_table_list() leaves out of its result.
    ignored_tables = []

    def get_field_type(self, data_type, description):
        """
        Return the field type name for a column, promoting integer columns
        that auto-increment to the matching Auto* field.

        ``description`` must expose ``is_autofield`` and ``default``.
        """
        resolved = super().get_field_type(data_type, description)
        # A column auto-increments when it is flagged as an autofield or when
        # its default calls nextval(). The latter covers serial columns (the
        # original note dates these to "pre-Plain 4.1").
        auto_increment = description.is_autofield or (
            description.default and "nextval" in description.default
        )
        if not auto_increment:
            return resolved
        promotions = (
            ("IntegerField", "AutoField"),
            ("BigIntegerField", "BigAutoField"),
            ("SmallIntegerField", "SmallAutoField"),
        )
        for integer_type, auto_type in promotions:
            if resolved == integer_type:
                return auto_type
        # Non-integer auto-incrementing columns keep their resolved type.
        return resolved

    def get_table_list(self, cursor):
        """
        Return a TableInfo for each visible table and view in the current
        database, skipping names listed in ``ignored_tables``.

        Each row carries the relation name, a one-letter type ('p' for a
        partition, 'v' for a view or materialized view, 't' otherwise) and
        the relation's comment.
        """
        sql = """
            SELECT
                c.relname,
                CASE
                    WHEN c.relispartition THEN 'p'
                    WHEN c.relkind IN ('m', 'v') THEN 'v'
                    ELSE 't'
                END,
                obj_description(c.oid, 'pg_class')
            FROM pg_catalog.pg_class c
            LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
                AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
                AND pg_catalog.pg_table_is_visible(c.oid)
        """
        cursor.execute(sql)
        tables = []
        for row in cursor.fetchall():
            if row[0] in self.ignored_tables:
                continue
            tables.append(TableInfo(*row))
        return tables

    def get_table_description(self, cursor, table_name):
        """
        Return a list of FieldInfo tuples describing the columns of
        ``table_name``, in the order reported by ``cursor.description``.
        """
        # Query the pg_catalog tables as cursor.description does not reliably
        # return the nullable property and information_schema.columns does not
        # contain details of materialized views.
        catalog_sql = """
            SELECT
                a.attname AS column_name,
                NOT (a.attnotnull OR (t.typtype = 'd' AND t.typnotnull)) AS is_nullable,
                pg_get_expr(ad.adbin, ad.adrelid) AS column_default,
                CASE WHEN collname = 'default' THEN NULL ELSE collname END AS collation,
                a.attidentity != '' AS is_autofield,
                col_description(a.attrelid, a.attnum) AS column_comment
            FROM pg_attribute a
            LEFT JOIN pg_attrdef ad ON a.attrelid = ad.adrelid AND a.attnum = ad.adnum
            LEFT JOIN pg_collation co ON a.attcollation = co.oid
            JOIN pg_type t ON a.atttypid = t.oid
            JOIN pg_class c ON a.attrelid = c.oid
            JOIN pg_namespace n ON c.relnamespace = n.oid
            WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
                AND c.relname = %s
                AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
                AND pg_catalog.pg_table_is_visible(c.oid)
        """
        cursor.execute(catalog_sql, [table_name])
        # Column name -> (is_nullable, default, collation, is_autofield, comment).
        catalog_details = {}
        for row in cursor.fetchall():
            catalog_details[row[0]] = row[1:]

        # Run a one-row SELECT so cursor.description supplies the type code
        # and size information for each column.
        quoted_table = self.connection.ops.quote_name(table_name)
        cursor.execute(f"SELECT * FROM {quoted_table} LIMIT 1")

        fields = []
        for column in cursor.description:
            # Fall back to the internal size when no display size is reported.
            display_size = column.display_size
            if display_size is None:
                display_size = column.internal_size
            fields.append(
                FieldInfo(
                    column.name,
                    column.type_code,
                    display_size,
                    column.internal_size,
                    column.precision,
                    column.scale,
                    *catalog_details[column.name],
                )
            )
        return fields

    def get_sequences(self, cursor, table_name, table_fields=()):
        """
        Return one dict per sequence that depends on a column of
        ``table_name``, with the keys "name", "table" and "column".

        ``table_fields`` is accepted but not used here.
        """
        sql = """
            SELECT
                s.relname AS sequence_name,
                a.attname AS colname
            FROM
                pg_class s
                JOIN pg_depend d ON d.objid = s.oid
                    AND d.classid = 'pg_class'::regclass
                    AND d.refclassid = 'pg_class'::regclass
                JOIN pg_attribute a ON d.refobjid = a.attrelid
                    AND d.refobjsubid = a.attnum
                JOIN pg_class tbl ON tbl.oid = d.refobjid
                    AND tbl.relname = %s
                    AND pg_catalog.pg_table_is_visible(tbl.oid)
            WHERE
                s.relkind = 'S';
        """
        cursor.execute(sql, [table_name])
        sequences = []
        for row in cursor.fetchall():
            sequences.append({"name": row[0], "table": table_name, "column": row[1]})
        return sequences

    def get_relations(self, cursor, table_name):
        """
        Return a dictionary of {field_name: (field_name_other_table, other_table)}
        representing all foreign keys in the given table.

        Only the first column of each foreign key is considered, and only
        when both tables are in the same namespace.
        """
        sql = """
            SELECT a1.attname, c2.relname, a2.attname
            FROM pg_constraint con
            LEFT JOIN pg_class c1 ON con.conrelid = c1.oid
            LEFT JOIN pg_class c2 ON con.confrelid = c2.oid
            LEFT JOIN
                pg_attribute a1 ON c1.oid = a1.attrelid AND a1.attnum = con.conkey[1]
            LEFT JOIN
                pg_attribute a2 ON c2.oid = a2.attrelid AND a2.attnum = con.confkey[1]
            WHERE
                c1.relname = %s AND
                con.contype = 'f' AND
                c1.relnamespace = c2.relnamespace AND
                pg_catalog.pg_table_is_visible(c1.oid)
        """
        cursor.execute(sql, [table_name])
        relations = {}
        for row in cursor.fetchall():
            # Rows are (local column, other table, other column).
            relations[row[0]] = (row[2], row[1])
        return relations

    def get_constraints(self, cursor, table_name):
        """
        Retrieve any constraints or keys (unique, pk, fk, check, index) across
        one or more columns. Also retrieve the definition of expression-based
        indexes.

        Return a dict keyed by constraint or index name. Entries read from
        pg_constraint have "index" set to False; indexes that are not already
        present under the same name are added with "index" set to True plus
        the extra "orders" and "type" keys.
        """
        found = {}

        # Pass 1: the key table. The column array must return column names in
        # the same order in which they were created, hence the ORDER BY on the
        # ordinality of conkey.
        constraint_sql = """
            SELECT
                c.conname,
                array(
                    SELECT attname
                    FROM unnest(c.conkey) WITH ORDINALITY cols(colid, arridx)
                    JOIN pg_attribute AS ca ON cols.colid = ca.attnum
                    WHERE ca.attrelid = c.conrelid
                    ORDER BY cols.arridx
                ),
                c.contype,
                (SELECT fkc.relname || '.' || fka.attname
                FROM pg_attribute AS fka
                JOIN pg_class AS fkc ON fka.attrelid = fkc.oid
                WHERE fka.attrelid = c.confrelid AND fka.attnum = c.confkey[1]),
                cl.reloptions
            FROM pg_constraint AS c
            JOIN pg_class AS cl ON c.conrelid = cl.oid
            WHERE cl.relname = %s AND pg_catalog.pg_table_is_visible(cl.oid)
        """
        cursor.execute(constraint_sql, [table_name])
        for name, columns, kind, used_cols, options in cursor.fetchall():
            if kind == "f":
                # "table.column" -> (table, column), split on the first dot.
                foreign_key = tuple(used_cols.split(".", 1))
            else:
                foreign_key = None
            found[name] = {
                "columns": columns,
                "primary_key": kind == "p",
                "unique": kind in ("p", "u"),
                "foreign_key": foreign_key,
                "check": kind == "c",
                "index": False,
                "definition": None,
                "options": options,
            }

        # Pass 2: indexes. The ordering column is only computed for indexes
        # whose access method equals index_default_access_method.
        index_sql = """
            SELECT
                indexname,
                array_agg(attname ORDER BY arridx),
                indisunique,
                indisprimary,
                array_agg(ordering ORDER BY arridx),
                amname,
                exprdef,
                s2.attoptions
            FROM (
                SELECT
                    c2.relname as indexname, idx.*, attr.attname, am.amname,
                    CASE
                        WHEN idx.indexprs IS NOT NULL THEN
                            pg_get_indexdef(idx.indexrelid)
                    END AS exprdef,
                    CASE am.amname
                        WHEN %s THEN
                            CASE (option & 1)
                                WHEN 1 THEN 'DESC' ELSE 'ASC'
                            END
                    END as ordering,
                    c2.reloptions as attoptions
                FROM (
                    SELECT *
                    FROM
                        pg_index i,
                        unnest(i.indkey, i.indoption)
                            WITH ORDINALITY koi(key, option, arridx)
                ) idx
                LEFT JOIN pg_class c ON idx.indrelid = c.oid
                LEFT JOIN pg_class c2 ON idx.indexrelid = c2.oid
                LEFT JOIN pg_am am ON c2.relam = am.oid
                LEFT JOIN
                    pg_attribute attr ON attr.attrelid = c.oid AND attr.attnum = idx.key
                WHERE c.relname = %s AND pg_catalog.pg_table_is_visible(c.oid)
            ) s2
            GROUP BY indexname, indisunique, indisprimary, amname, exprdef, attoptions;
        """
        cursor.execute(index_sql, [self.index_default_access_method, table_name])
        for (
            name,
            columns,
            unique,
            primary,
            orders,
            access_method,
            definition,
            options,
        ) in cursor.fetchall():
            # Indexes backing a constraint recorded in pass 1 are not repeated.
            if name in found:
                continue
            # An index on the default access method with no storage options is
            # reported under the generic Index suffix instead of its method.
            if access_method == self.index_default_access_method and options is None:
                index_type = Index.suffix
            else:
                index_type = access_method
            # A lone NULL from array_agg means there is nothing to report.
            index_columns = columns if columns != [None] else []
            index_orders = orders if orders != [None] else []
            found[name] = {
                "columns": index_columns,
                "orders": index_orders,
                "primary_key": primary,
                "unique": unique,
                "foreign_key": None,
                "check": False,
                "index": True,
                "type": index_type,
                "definition": definition,
                "options": options,
            }
        return found