1from collections import namedtuple
2
3from plain.models.backends.base.introspection import BaseDatabaseIntrospection
4from plain.models.backends.base.introspection import FieldInfo as BaseFieldInfo
5from plain.models.backends.base.introspection import TableInfo as BaseTableInfo
6from plain.models.indexes import Index
7
# Extend the base introspection records with the extra values that this
# backend's catalog queries select for each column and table.
FieldInfo = namedtuple(
    "FieldInfo",
    (*BaseFieldInfo._fields, "is_autofield", "comment"),
)
TableInfo = namedtuple(
    "TableInfo",
    (*BaseTableInfo._fields, "comment"),
)
10
11
class DatabaseIntrospection(BaseDatabaseIntrospection):
    """Introspect tables, columns, sequences, relations and constraints.

    Every query goes through the PostgreSQL system catalogs (``pg_class``,
    ``pg_attribute``, ``pg_constraint``, ...) and is restricted to relations
    that ``pg_table_is_visible`` reports as visible.
    """

    # Maps type codes to Plain Field types.
    data_types_reverse = {
        16: "BooleanField",
        17: "BinaryField",
        20: "BigIntegerField",
        21: "SmallIntegerField",
        23: "IntegerField",
        25: "TextField",
        700: "FloatField",
        701: "FloatField",
        869: "GenericIPAddressField",
        1042: "CharField",  # blank-padded
        1043: "CharField",
        1082: "DateField",
        1083: "TimeField",
        1114: "DateTimeField",
        1184: "DateTimeField",
        1186: "DurationField",
        1266: "TimeField",
        1700: "DecimalField",
        2950: "UUIDField",
        3802: "JSONField",
    }
    # A hook for subclasses: indexes using this access method are the only
    # ones for which get_constraints() reports ASC/DESC column ordering.
    index_default_access_method = "btree"

    # Table names that get_table_list() leaves out of its result.
    ignored_tables = []

    def get_field_type(self, data_type, description):
        """
        Return the field type name for a column, promoting auto-generated
        big integer columns to "PrimaryKeyField".

        A column counts as auto-generated when ``description.is_autofield``
        is truthy or when its default expression mentions ``nextval``.
        """
        resolved = super().get_field_type(data_type, description)
        auto_generated = description.is_autofield or (
            # Required for pre-Plain 4.1 serial columns, which carry a
            # nextval(...) default instead of an identity marker.
            description.default and "nextval" in description.default
        )
        if auto_generated and resolved == "BigIntegerField":
            return "PrimaryKeyField"
        return resolved

    def get_table_list(self, cursor):
        """
        Return a TableInfo for every visible table and view in the current
        database, skipping names listed in ``ignored_tables``.

        The second selected value is 'p' for partitions, 'v' for views and
        materialized views, and 't' for everything else; the third is the
        relation's comment.
        """
        cursor.execute(
            """
            SELECT
                c.relname,
                CASE
                    WHEN c.relispartition THEN 'p'
                    WHEN c.relkind IN ('m', 'v') THEN 'v'
                    ELSE 't'
                END,
                obj_description(c.oid, 'pg_class')
            FROM pg_catalog.pg_class c
            LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
                AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
                AND pg_catalog.pg_table_is_visible(c.oid)
        """
        )
        tables = []
        for record in cursor.fetchall():
            if record[0] in self.ignored_tables:
                continue
            tables.append(TableInfo(*record))
        return tables

    def get_table_description(self, cursor, table_name):
        """
        Return one FieldInfo per column of ``table_name``.

        The leading values come from the DB-API ``cursor.description`` of a
        ``SELECT * ... LIMIT 1`` against the table; the trailing values
        (nullability, default, collation, autofield flag, comment) come from
        a catalog query keyed by column name.
        """
        # Query the pg_catalog tables as cursor.description does not reliably
        # return the nullable property and information_schema.columns does not
        # contain details of materialized views.
        cursor.execute(
            """
            SELECT
                a.attname AS column_name,
                NOT (a.attnotnull OR (t.typtype = 'd' AND t.typnotnull)) AS is_nullable,
                pg_get_expr(ad.adbin, ad.adrelid) AS column_default,
                CASE WHEN collname = 'default' THEN NULL ELSE collname END AS collation,
                a.attidentity != '' AS is_autofield,
                col_description(a.attrelid, a.attnum) AS column_comment
            FROM pg_attribute a
            LEFT JOIN pg_attrdef ad ON a.attrelid = ad.adrelid AND a.attnum = ad.adnum
            LEFT JOIN pg_collation co ON a.attcollation = co.oid
            JOIN pg_type t ON a.atttypid = t.oid
            JOIN pg_class c ON a.attrelid = c.oid
            JOIN pg_namespace n ON c.relnamespace = n.oid
            WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
                AND c.relname = %s
                AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
                AND pg_catalog.pg_table_is_visible(c.oid)
        """,
            [table_name],
        )
        # column name -> remaining catalog values, in SELECT order.
        catalog_extras = {}
        for record in cursor.fetchall():
            catalog_extras[record[0]] = record[1:]

        # Run a throwaway SELECT purely to populate cursor.description.
        quoted_table = self.connection.ops.quote_name(table_name)
        cursor.execute(f"SELECT * FROM {quoted_table} LIMIT 1")

        fields = []
        for column in cursor.description:
            # Fall back to the internal size when no display size is reported.
            if column.display_size is None:
                display_size = column.internal_size
            else:
                display_size = column.display_size
            fields.append(
                FieldInfo(
                    column.name,
                    column.type_code,
                    display_size,
                    column.internal_size,
                    column.precision,
                    column.scale,
                    *catalog_extras[column.name],
                )
            )
        return fields

    def get_sequences(self, cursor, table_name, table_fields=()):
        """
        Return a list of ``{"name", "table", "column"}`` dicts, one for each
        sequence that depends on a column of ``table_name``.

        ``table_fields`` is accepted but not used by this implementation.
        """
        cursor.execute(
            """
            SELECT
                s.relname AS sequence_name,
                a.attname AS colname
            FROM
                pg_class s
                JOIN pg_depend d ON d.objid = s.oid
                    AND d.classid = 'pg_class'::regclass
                    AND d.refclassid = 'pg_class'::regclass
                JOIN pg_attribute a ON d.refobjid = a.attrelid
                    AND d.refobjsubid = a.attnum
                JOIN pg_class tbl ON tbl.oid = d.refobjid
                    AND tbl.relname = %s
                    AND pg_catalog.pg_table_is_visible(tbl.oid)
            WHERE
                s.relkind = 'S';
        """,
            [table_name],
        )
        sequences = []
        for record in cursor.fetchall():
            sequences.append(
                {"name": record[0], "table": table_name, "column": record[1]}
            )
        return sequences

    def get_relations(self, cursor, table_name):
        """
        Return a dictionary of {field_name: (field_name_other_table, other_table)}
        representing all foreign keys in the given table.

        Only the first column of each foreign key (``conkey[1]`` /
        ``confkey[1]``) is reported.
        """
        cursor.execute(
            """
            SELECT a1.attname, c2.relname, a2.attname
            FROM pg_constraint con
            LEFT JOIN pg_class c1 ON con.conrelid = c1.oid
            LEFT JOIN pg_class c2 ON con.confrelid = c2.oid
            LEFT JOIN
                pg_attribute a1 ON c1.oid = a1.attrelid AND a1.attnum = con.conkey[1]
            LEFT JOIN
                pg_attribute a2 ON c2.oid = a2.attrelid AND a2.attnum = con.confkey[1]
            WHERE
                c1.relname = %s AND
                con.contype = 'f' AND
                c1.relnamespace = c2.relnamespace AND
                pg_catalog.pg_table_is_visible(c1.oid)
        """,
            [table_name],
        )
        relations = {}
        for record in cursor.fetchall():
            # Rows are (local column, other table, other column).
            relations[record[0]] = (record[2], record[1])
        return relations

    def get_constraints(self, cursor, table_name):
        """
        Retrieve any constraints or keys (unique, pk, fk, check, index) across
        one or more columns. Also retrieve the definition of expression-based
        indexes.

        Return a dict keyed by constraint or index name. Entries built from
        pg_constraint come first; an index is only added when no constraint
        of the same name was already recorded, and only index entries carry
        the "orders" and "type" keys.
        """
        found = {}
        # Loop over the key table, collecting things as constraints. The column
        # array must return column names in the same order in which they were
        # created.
        cursor.execute(
            """
            SELECT
                c.conname,
                array(
                    SELECT attname
                    FROM unnest(c.conkey) WITH ORDINALITY cols(colid, arridx)
                    JOIN pg_attribute AS ca ON cols.colid = ca.attnum
                    WHERE ca.attrelid = c.conrelid
                    ORDER BY cols.arridx
                ),
                c.contype,
                (SELECT fkc.relname || '.' || fka.attname
                FROM pg_attribute AS fka
                JOIN pg_class AS fkc ON fka.attrelid = fkc.oid
                WHERE fka.attrelid = c.confrelid AND fka.attnum = c.confkey[1]),
                cl.reloptions
            FROM pg_constraint AS c
            JOIN pg_class AS cl ON c.conrelid = cl.oid
            WHERE cl.relname = %s AND pg_catalog.pg_table_is_visible(cl.oid)
        """,
            [table_name],
        )
        for name, column_names, contype, target, reloptions in cursor.fetchall():
            if contype == "f":
                # target is "<table>.<column>"; split only on the first dot.
                foreign_key = tuple(target.split(".", 1))
            else:
                foreign_key = None
            found[name] = {
                "columns": column_names,
                "primary_key": contype == "p",
                "unique": contype in ("p", "u"),
                "foreign_key": foreign_key,
                "check": contype == "c",
                "index": False,
                "definition": None,
                "options": reloptions,
            }

        # Now get indexes. Ordering (ASC/DESC) is only computed for indexes
        # whose access method equals index_default_access_method.
        cursor.execute(
            """
            SELECT
                indexname,
                array_agg(attname ORDER BY arridx),
                indisunique,
                indisprimary,
                array_agg(ordering ORDER BY arridx),
                amname,
                exprdef,
                s2.attoptions
            FROM (
                SELECT
                    c2.relname as indexname, idx.*, attr.attname, am.amname,
                    CASE
                        WHEN idx.indexprs IS NOT NULL THEN
                            pg_get_indexdef(idx.indexrelid)
                    END AS exprdef,
                    CASE am.amname
                        WHEN %s THEN
                            CASE (option & 1)
                                WHEN 1 THEN 'DESC' ELSE 'ASC'
                            END
                    END as ordering,
                    c2.reloptions as attoptions
                FROM (
                    SELECT *
                    FROM
                        pg_index i,
                        unnest(i.indkey, i.indoption)
                            WITH ORDINALITY koi(key, option, arridx)
                ) idx
                LEFT JOIN pg_class c ON idx.indrelid = c.oid
                LEFT JOIN pg_class c2 ON idx.indexrelid = c2.oid
                LEFT JOIN pg_am am ON c2.relam = am.oid
                LEFT JOIN
                    pg_attribute attr ON attr.attrelid = c.oid AND attr.attnum = idx.key
                WHERE c.relname = %s AND pg_catalog.pg_table_is_visible(c.oid)
            ) s2
            GROUP BY indexname, indisunique, indisprimary, amname, exprdef, attoptions;
        """,
            [self.index_default_access_method, table_name],
        )
        for (
            index_name,
            column_names,
            is_unique,
            is_primary,
            orderings,
            access_method,
            definition,
            reloptions,
        ) in cursor.fetchall():
            # Indexes that back an already-recorded constraint are skipped.
            if index_name in found:
                continue

            # A lone NULL means the aggregate found no column name / ordering
            # for this index; report that as an empty list.
            if column_names == [None]:
                column_names = []
            if orderings == [None]:
                orderings = []

            # A default-access-method index with no storage options is
            # reported under the generic Index suffix instead of amname.
            is_basic = (
                access_method == self.index_default_access_method
                and reloptions is None
            )
            if is_basic:
                index_type = Index.suffix
            else:
                index_type = access_method

            found[index_name] = {
                "columns": column_names,
                "orders": orderings,
                "primary_key": is_primary,
                "unique": is_unique,
                "foreign_key": None,
                "check": False,
                "index": True,
                "type": index_type,
                "definition": definition,
                "options": reloptions,
            }
        return found