1from collections import namedtuple
2
3from plain.models.backends.base.introspection import BaseDatabaseIntrospection
4from plain.models.backends.base.introspection import FieldInfo as BaseFieldInfo
5from plain.models.backends.base.introspection import TableInfo as BaseTableInfo
6from plain.models.indexes import Index
7
8FieldInfo = namedtuple("FieldInfo", BaseFieldInfo._fields + ("is_autofield", "comment"))
9TableInfo = namedtuple("TableInfo", BaseTableInfo._fields + ("comment",))
10
11
12class DatabaseIntrospection(BaseDatabaseIntrospection):
13 # Maps type codes to Plain Field types.
14 data_types_reverse = {
15 16: "BooleanField",
16 17: "BinaryField",
17 20: "BigIntegerField",
18 21: "SmallIntegerField",
19 23: "IntegerField",
20 25: "TextField",
21 700: "FloatField",
22 701: "FloatField",
23 869: "GenericIPAddressField",
24 1042: "CharField", # blank-padded
25 1043: "CharField",
26 1082: "DateField",
27 1083: "TimeField",
28 1114: "DateTimeField",
29 1184: "DateTimeField",
30 1186: "DurationField",
31 1266: "TimeField",
32 1700: "DecimalField",
33 2950: "UUIDField",
34 3802: "JSONField",
35 }
36 # A hook for subclasses.
37 index_default_access_method = "btree"
38
39 ignored_tables = []
40
41 def get_field_type(self, data_type, description):
42 field_type = super().get_field_type(data_type, description)
43 if description.is_autofield or (
44 # Required for pre-Plain 4.1 serial columns.
45 description.default and "nextval" in description.default
46 ):
47 if field_type == "IntegerField":
48 return "AutoField"
49 elif field_type == "BigIntegerField":
50 return "BigAutoField"
51 elif field_type == "SmallIntegerField":
52 return "SmallAutoField"
53 return field_type
54
55 def get_table_list(self, cursor):
56 """Return a list of table and view names in the current database."""
57 cursor.execute(
58 """
59 SELECT
60 c.relname,
61 CASE
62 WHEN c.relispartition THEN 'p'
63 WHEN c.relkind IN ('m', 'v') THEN 'v'
64 ELSE 't'
65 END,
66 obj_description(c.oid, 'pg_class')
67 FROM pg_catalog.pg_class c
68 LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
69 WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
70 AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
71 AND pg_catalog.pg_table_is_visible(c.oid)
72 """
73 )
74 return [
75 TableInfo(*row)
76 for row in cursor.fetchall()
77 if row[0] not in self.ignored_tables
78 ]
79
80 def get_table_description(self, cursor, table_name):
81 """
82 Return a description of the table with the DB-API cursor.description
83 interface.
84 """
85 # Query the pg_catalog tables as cursor.description does not reliably
86 # return the nullable property and information_schema.columns does not
87 # contain details of materialized views.
88 cursor.execute(
89 """
90 SELECT
91 a.attname AS column_name,
92 NOT (a.attnotnull OR (t.typtype = 'd' AND t.typnotnull)) AS is_nullable,
93 pg_get_expr(ad.adbin, ad.adrelid) AS column_default,
94 CASE WHEN collname = 'default' THEN NULL ELSE collname END AS collation,
95 a.attidentity != '' AS is_autofield,
96 col_description(a.attrelid, a.attnum) AS column_comment
97 FROM pg_attribute a
98 LEFT JOIN pg_attrdef ad ON a.attrelid = ad.adrelid AND a.attnum = ad.adnum
99 LEFT JOIN pg_collation co ON a.attcollation = co.oid
100 JOIN pg_type t ON a.atttypid = t.oid
101 JOIN pg_class c ON a.attrelid = c.oid
102 JOIN pg_namespace n ON c.relnamespace = n.oid
103 WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
104 AND c.relname = %s
105 AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
106 AND pg_catalog.pg_table_is_visible(c.oid)
107 """,
108 [table_name],
109 )
110 field_map = {line[0]: line[1:] for line in cursor.fetchall()}
111 cursor.execute(
112 "SELECT * FROM %s LIMIT 1" % self.connection.ops.quote_name(table_name)
113 )
114 return [
115 FieldInfo(
116 line.name,
117 line.type_code,
118 # display_size is always None on psycopg2.
119 line.internal_size if line.display_size is None else line.display_size,
120 line.internal_size,
121 line.precision,
122 line.scale,
123 *field_map[line.name],
124 )
125 for line in cursor.description
126 ]
127
128 def get_sequences(self, cursor, table_name, table_fields=()):
129 cursor.execute(
130 """
131 SELECT
132 s.relname AS sequence_name,
133 a.attname AS colname
134 FROM
135 pg_class s
136 JOIN pg_depend d ON d.objid = s.oid
137 AND d.classid = 'pg_class'::regclass
138 AND d.refclassid = 'pg_class'::regclass
139 JOIN pg_attribute a ON d.refobjid = a.attrelid
140 AND d.refobjsubid = a.attnum
141 JOIN pg_class tbl ON tbl.oid = d.refobjid
142 AND tbl.relname = %s
143 AND pg_catalog.pg_table_is_visible(tbl.oid)
144 WHERE
145 s.relkind = 'S';
146 """,
147 [table_name],
148 )
149 return [
150 {"name": row[0], "table": table_name, "column": row[1]}
151 for row in cursor.fetchall()
152 ]
153
154 def get_relations(self, cursor, table_name):
155 """
156 Return a dictionary of {field_name: (field_name_other_table, other_table)}
157 representing all foreign keys in the given table.
158 """
159 cursor.execute(
160 """
161 SELECT a1.attname, c2.relname, a2.attname
162 FROM pg_constraint con
163 LEFT JOIN pg_class c1 ON con.conrelid = c1.oid
164 LEFT JOIN pg_class c2 ON con.confrelid = c2.oid
165 LEFT JOIN
166 pg_attribute a1 ON c1.oid = a1.attrelid AND a1.attnum = con.conkey[1]
167 LEFT JOIN
168 pg_attribute a2 ON c2.oid = a2.attrelid AND a2.attnum = con.confkey[1]
169 WHERE
170 c1.relname = %s AND
171 con.contype = 'f' AND
172 c1.relnamespace = c2.relnamespace AND
173 pg_catalog.pg_table_is_visible(c1.oid)
174 """,
175 [table_name],
176 )
177 return {row[0]: (row[2], row[1]) for row in cursor.fetchall()}
178
179 def get_constraints(self, cursor, table_name):
180 """
181 Retrieve any constraints or keys (unique, pk, fk, check, index) across
182 one or more columns. Also retrieve the definition of expression-based
183 indexes.
184 """
185 constraints = {}
186 # Loop over the key table, collecting things as constraints. The column
187 # array must return column names in the same order in which they were
188 # created.
189 cursor.execute(
190 """
191 SELECT
192 c.conname,
193 array(
194 SELECT attname
195 FROM unnest(c.conkey) WITH ORDINALITY cols(colid, arridx)
196 JOIN pg_attribute AS ca ON cols.colid = ca.attnum
197 WHERE ca.attrelid = c.conrelid
198 ORDER BY cols.arridx
199 ),
200 c.contype,
201 (SELECT fkc.relname || '.' || fka.attname
202 FROM pg_attribute AS fka
203 JOIN pg_class AS fkc ON fka.attrelid = fkc.oid
204 WHERE fka.attrelid = c.confrelid AND fka.attnum = c.confkey[1]),
205 cl.reloptions
206 FROM pg_constraint AS c
207 JOIN pg_class AS cl ON c.conrelid = cl.oid
208 WHERE cl.relname = %s AND pg_catalog.pg_table_is_visible(cl.oid)
209 """,
210 [table_name],
211 )
212 for constraint, columns, kind, used_cols, options in cursor.fetchall():
213 constraints[constraint] = {
214 "columns": columns,
215 "primary_key": kind == "p",
216 "unique": kind in ["p", "u"],
217 "foreign_key": tuple(used_cols.split(".", 1)) if kind == "f" else None,
218 "check": kind == "c",
219 "index": False,
220 "definition": None,
221 "options": options,
222 }
223 # Now get indexes
224 cursor.execute(
225 """
226 SELECT
227 indexname,
228 array_agg(attname ORDER BY arridx),
229 indisunique,
230 indisprimary,
231 array_agg(ordering ORDER BY arridx),
232 amname,
233 exprdef,
234 s2.attoptions
235 FROM (
236 SELECT
237 c2.relname as indexname, idx.*, attr.attname, am.amname,
238 CASE
239 WHEN idx.indexprs IS NOT NULL THEN
240 pg_get_indexdef(idx.indexrelid)
241 END AS exprdef,
242 CASE am.amname
243 WHEN %s THEN
244 CASE (option & 1)
245 WHEN 1 THEN 'DESC' ELSE 'ASC'
246 END
247 END as ordering,
248 c2.reloptions as attoptions
249 FROM (
250 SELECT *
251 FROM
252 pg_index i,
253 unnest(i.indkey, i.indoption)
254 WITH ORDINALITY koi(key, option, arridx)
255 ) idx
256 LEFT JOIN pg_class c ON idx.indrelid = c.oid
257 LEFT JOIN pg_class c2 ON idx.indexrelid = c2.oid
258 LEFT JOIN pg_am am ON c2.relam = am.oid
259 LEFT JOIN
260 pg_attribute attr ON attr.attrelid = c.oid AND attr.attnum = idx.key
261 WHERE c.relname = %s AND pg_catalog.pg_table_is_visible(c.oid)
262 ) s2
263 GROUP BY indexname, indisunique, indisprimary, amname, exprdef, attoptions;
264 """,
265 [self.index_default_access_method, table_name],
266 )
267 for (
268 index,
269 columns,
270 unique,
271 primary,
272 orders,
273 type_,
274 definition,
275 options,
276 ) in cursor.fetchall():
277 if index not in constraints:
278 basic_index = (
279 type_ == self.index_default_access_method and options is None
280 )
281 constraints[index] = {
282 "columns": columns if columns != [None] else [],
283 "orders": orders if orders != [None] else [],
284 "primary_key": primary,
285 "unique": unique,
286 "foreign_key": None,
287 "check": False,
288 "index": True,
289 "type": Index.suffix if basic_index else type_,
290 "definition": definition,
291 "options": options,
292 }
293 return constraints