1"""
2SQLite backend for the sqlite3 module in the standard library.
3"""
4
5import datetime
6import decimal
7import warnings
8from collections.abc import Mapping
9from itertools import chain, tee
10from sqlite3 import dbapi2 as Database
11
12from plain.exceptions import ImproperlyConfigured
13from plain.models.backends.base.base import BaseDatabaseWrapper
14from plain.models.db import IntegrityError
15from plain.utils.dateparse import parse_date, parse_datetime, parse_time
16from plain.utils.regex_helper import _lazy_re_compile
17
18from ._functions import register as register_functions
19from .client import DatabaseClient
20from .creation import DatabaseCreation
21from .features import DatabaseFeatures
22from .introspection import DatabaseIntrospection
23from .operations import DatabaseOperations
24from .schema import DatabaseSchemaEditor
25
26
def decoder(conv_func):
    """
    Wrap ``conv_func`` so it can be fed the bytestrings that Python's sqlite3
    interface produces: the input is decoded to ``str`` before being passed on.
    """

    def convert(raw):
        # bytes.decode() with no arguments uses UTF-8.
        return conv_func(raw.decode())

    return convert
32
33
def adapt_date(val):
    """Return the ISO formatted text of ``val`` as produced by ``val.isoformat()``."""
    iso_text = val.isoformat()
    return iso_text
36
37
def adapt_datetime(val):
    """
    Return the ISO formatted text of ``val`` with a space, rather than the
    default "T", between the date and time parts.
    """
    separator = " "
    return val.isoformat(separator)
40
41
# Converters: map a declared column type name to a callable that turns the
# bytestring handed over by sqlite3 into a Python value.
for _declared_type, _converter in (
    ("bool", b"1".__eq__),
    ("date", decoder(parse_date)),
    ("time", decoder(parse_time)),
    ("datetime", decoder(parse_datetime)),
    ("timestamp", decoder(parse_datetime)),
):
    Database.register_converter(_declared_type, _converter)

# Adapters: map a Python type to a callable producing the value that is
# actually bound as a query parameter.
for _python_type, _adapter in (
    (decimal.Decimal, str),
    (datetime.date, adapt_date),
    (datetime.datetime, adapt_datetime),
):
    Database.register_adapter(_python_type, _adapter)

# Keep the loop variables out of the module namespace.
del _declared_type, _converter, _python_type, _adapter
51
52
class DatabaseWrapper(BaseDatabaseWrapper):
    """
    SQLite database wrapper built on the standard library's sqlite3 module.

    Holds the SQLite-specific column type, check constraint and lookup operator
    tables, and implements connection setup, autocommit/transaction handling
    and foreign key checking for this backend.
    """

    vendor = "sqlite"
    display_name = "SQLite"
    # SQLite doesn't actually support most of these types, but it "does the right
    # thing" given more verbose field definitions, so leave them as is so that
    # schema inspection is more useful.
    data_types = {
        "AutoField": "integer",
        "BigAutoField": "integer",
        "BinaryField": "BLOB",
        "BooleanField": "bool",
        "CharField": "varchar(%(max_length)s)",
        "DateField": "date",
        "DateTimeField": "datetime",
        "DecimalField": "decimal",
        "DurationField": "bigint",
        "FloatField": "real",
        "IntegerField": "integer",
        "BigIntegerField": "bigint",
        "IPAddressField": "char(15)",
        "GenericIPAddressField": "char(39)",
        "JSONField": "text",
        "PositiveBigIntegerField": "bigint unsigned",
        "PositiveIntegerField": "integer unsigned",
        "PositiveSmallIntegerField": "smallint unsigned",
        "SmallAutoField": "integer",
        "SmallIntegerField": "smallint",
        "TextField": "text",
        "TimeField": "time",
        "UUIDField": "char(32)",
    }
    # CHECK constraint templates keyed by field type; "%(column)s" is the
    # placeholder for the column name.
    data_type_check_constraints = {
        "PositiveBigIntegerField": '"%(column)s" >= 0',
        "JSONField": '(JSON_VALID("%(column)s") OR "%(column)s" IS NULL)',
        "PositiveIntegerField": '"%(column)s" >= 0',
        "PositiveSmallIntegerField": '"%(column)s" >= 0',
    }
    # Extra SQL appended to the column type for auto-incrementing fields.
    data_types_suffix = {
        "AutoField": "AUTOINCREMENT",
        "BigAutoField": "AUTOINCREMENT",
        "SmallAutoField": "AUTOINCREMENT",
    }
    # SQLite requires LIKE statements to include an ESCAPE clause if the value
    # being escaped has a percent or underscore in it.
    # See https://www.sqlite.org/lang_expr.html for an explanation.
    operators = {
        "exact": "= %s",
        "iexact": "LIKE %s ESCAPE '\\'",
        "contains": "LIKE %s ESCAPE '\\'",
        "icontains": "LIKE %s ESCAPE '\\'",
        "regex": "REGEXP %s",
        "iregex": "REGEXP '(?i)' || %s",
        "gt": "> %s",
        "gte": ">= %s",
        "lt": "< %s",
        "lte": "<= %s",
        "startswith": "LIKE %s ESCAPE '\\'",
        "endswith": "LIKE %s ESCAPE '\\'",
        "istartswith": "LIKE %s ESCAPE '\\'",
        "iendswith": "LIKE %s ESCAPE '\\'",
    }

    # The patterns below are used to generate SQL pattern lookup clauses when
    # the right-hand side of the lookup isn't a raw string (it might be an expression
    # or the result of a bilateral transformation).
    # In those cases, special characters for LIKE operators (e.g. \, %, _) should be
    # escaped on database side.
    #
    # Note: we use str.format() here for readability as '%' is used as a wildcard for
    # the LIKE operator.
    pattern_esc = r"REPLACE(REPLACE(REPLACE({}, '\', '\\'), '%%', '\%%'), '_', '\_')"
    pattern_ops = {
        "contains": r"LIKE '%%' || {} || '%%' ESCAPE '\'",
        "icontains": r"LIKE '%%' || UPPER({}) || '%%' ESCAPE '\'",
        "startswith": r"LIKE {} || '%%' ESCAPE '\'",
        "istartswith": r"LIKE UPPER({}) || '%%' ESCAPE '\'",
        "endswith": r"LIKE '%%' || {} ESCAPE '\'",
        "iendswith": r"LIKE '%%' || UPPER({}) ESCAPE '\'",
    }

    Database = Database
    SchemaEditorClass = DatabaseSchemaEditor
    # Classes instantiated in __init__().
    client_class = DatabaseClient
    creation_class = DatabaseCreation
    features_class = DatabaseFeatures
    introspection_class = DatabaseIntrospection
    ops_class = DatabaseOperations

    def get_connection_params(self):
        """
        Build the keyword arguments for ``Database.connect()``.

        The ``OPTIONS`` setting is merged over the ``database`` and
        ``detect_types`` defaults, after which ``check_same_thread=False`` and
        ``uri=True`` are forced regardless of what ``OPTIONS`` contained.

        Raises ImproperlyConfigured if the ``NAME`` setting is empty, and emits
        a RuntimeWarning if ``OPTIONS`` asked for a truthy ``check_same_thread``.
        """
        settings_dict = self.settings_dict
        if not settings_dict["NAME"]:
            raise ImproperlyConfigured(
                "settings.DATABASE is improperly configured. "
                "Please supply the NAME value."
            )
        kwargs = {
            "database": settings_dict["NAME"],
            "detect_types": Database.PARSE_DECLTYPES | Database.PARSE_COLNAMES,
            **settings_dict["OPTIONS"],
        }
        # Always allow the underlying SQLite connection to be shareable
        # between multiple threads. The safe-guarding will be handled at a
        # higher level by the `BaseDatabaseWrapper.allow_thread_sharing`
        # property. This is necessary as the shareability is disabled by
        # default in sqlite3 and it cannot be changed once a connection is
        # opened.
        if kwargs.get("check_same_thread"):
            warnings.warn(
                "The `check_same_thread` option was provided and set to "
                "True. It will be overridden with False. Use the "
                "`DatabaseWrapper.allow_thread_sharing` property instead "
                "for controlling thread shareability.",
                RuntimeWarning,
            )
        kwargs.update({"check_same_thread": False, "uri": True})
        return kwargs

    def get_database_version(self):
        """Return ``sqlite_version_info`` from the underlying sqlite3 module."""
        return self.Database.sqlite_version_info

    def get_new_connection(self, conn_params):
        """
        Open a sqlite3 connection from ``conn_params``, register the backend's
        custom SQL functions on it and apply the required PRAGMAs.
        """
        conn = Database.connect(**conn_params)
        register_functions(conn)

        conn.execute("PRAGMA foreign_keys = ON")
        # The macOS bundled SQLite defaults legacy_alter_table ON, which
        # prevents atomic table renames (feature supports_atomic_references_rename)
        conn.execute("PRAGMA legacy_alter_table = OFF")
        return conn

    def create_cursor(self, name=None):
        """
        Return a new SQLiteCursorWrapper on the current connection.

        ``name`` is accepted but not used by this backend.
        """
        return self.connection.cursor(factory=SQLiteCursorWrapper)

    def close(self):
        """Close the connection, unless the database lives in memory."""
        self.validate_thread_sharing()
        # If database is in memory, closing the connection destroys the
        # database. To prevent accidental data loss, ignore close requests on
        # an in-memory db.
        if not self.is_in_memory_db():
            BaseDatabaseWrapper.close(self)

    def _savepoint_allowed(self):
        """Return whether savepoints may be used: only inside 'atomic' blocks."""
        # When 'isolation_level' is not None, sqlite3 commits before each
        # savepoint; it's a bug. When it is None, savepoints don't make sense
        # because autocommit is enabled. The only exception is inside 'atomic'
        # blocks. To work around that bug, on SQLite, 'atomic' starts a
        # transaction explicitly rather than simply disable autocommit.
        return self.in_atomic_block

    def _set_autocommit(self, autocommit):
        """
        Switch autocommit on or off by setting the connection's
        ``isolation_level`` to ``None`` (on) or ``""`` (off).
        """
        if autocommit:
            level = None
        else:
            # sqlite3's internal default is ''. It's different from None.
            # See Modules/_sqlite/connection.c.
            level = ""
        # 'isolation_level' is a misleading API.
        # SQLite always runs at the SERIALIZABLE isolation level.
        with self.wrap_database_errors:
            self.connection.isolation_level = level

    def disable_constraint_checking(self):
        """
        Try to turn foreign key enforcement off.

        Return True if the ``foreign_keys`` pragma reads back as disabled,
        False if it is still enabled.
        """
        with self.cursor() as cursor:
            cursor.execute("PRAGMA foreign_keys = OFF")
            # Foreign key constraints cannot be turned off while in a multi-
            # statement transaction. Fetch the current state of the pragma
            # to determine if constraints are effectively disabled.
            enabled = cursor.execute("PRAGMA foreign_keys").fetchone()[0]
        return not bool(enabled)

    def enable_constraint_checking(self):
        """Turn foreign key enforcement back on."""
        with self.cursor() as cursor:
            cursor.execute("PRAGMA foreign_keys = ON")

    def check_constraints(self, table_names=None):
        """
        Check each table name in `table_names` for rows with invalid foreign
        key references. This method is intended to be used in conjunction with
        `disable_constraint_checking()` and `enable_constraint_checking()`, to
        determine if rows with invalid references were entered while constraint
        checks were off.

        When `table_names` is None, the whole database is checked. Raise
        IntegrityError describing the first violation found; return None when
        there are none.
        """
        quote_name = self.ops.quote_name
        with self.cursor() as cursor:
            if table_names is None:
                violations = cursor.execute("PRAGMA foreign_key_check").fetchall()
            else:
                # Lazily checks one table at a time; each table's rows are
                # fully fetched before the shared cursor is reused below.
                violations = chain.from_iterable(
                    cursor.execute(
                        f"PRAGMA foreign_key_check({quote_name(table_name)})"
                    ).fetchall()
                    for table_name in table_names
                )
            # See https://www.sqlite.org/pragma.html#pragma_foreign_key_check
            for (
                table_name,
                rowid,
                referenced_table_name,
                foreign_key_index,
            ) in violations:
                foreign_key = cursor.execute(
                    f"PRAGMA foreign_key_list({quote_name(table_name)})"
                ).fetchall()[foreign_key_index]
                # Items 3 and 4 of a foreign_key_list row are the referencing
                # and referenced column names.
                column_name, referenced_column_name = foreign_key[3:5]
                primary_key_column_name = self.introspection.get_primary_key_column(
                    cursor, table_name
                )
                primary_key_value, bad_value = cursor.execute(
                    f"SELECT {quote_name(primary_key_column_name)}, "
                    f"{quote_name(column_name)} "
                    f"FROM {quote_name(table_name)} WHERE rowid = %s",
                    (rowid,),
                ).fetchone()
                raise IntegrityError(
                    f"The row in table '{table_name}' with primary key '{primary_key_value}' has an "
                    f"invalid foreign key: {table_name}.{column_name} contains a value '{bad_value}' that "
                    f"does not have a corresponding value in {referenced_table_name}.{referenced_column_name}."
                )

    def is_usable(self):
        """Always report the connection as usable."""
        return True

    def _start_transaction_under_autocommit(self):
        """
        Start a transaction explicitly in autocommit mode.

        Staying in autocommit mode works around a bug of sqlite3 that breaks
        savepoints when autocommit is disabled.
        """
        # Use the cursor as a context manager, like the other methods of this
        # class, so it is closed instead of being left for garbage collection.
        with self.cursor() as cursor:
            cursor.execute("BEGIN")

    def is_in_memory_db(self):
        """Return whether the configured ``NAME`` refers to an in-memory database."""
        return self.creation.is_in_memory_db(self.settings_dict["NAME"])
284
285
# Matches a "%s" placeholder that is not preceded by another "%", so an
# escaped "%%s" is left untouched by the substitution.
FORMAT_QMARK_REGEX = _lazy_re_compile(r"(?<!%)%s")
287
288
class SQLiteCursorWrapper(Database.Cursor):
    """
    sqlite3 cursor that accepts the parameter styles Plain generates.

    Plain writes queries in the "format" (``%s``) and "pyformat"
    (``%(name)s``) styles, neither of which Python's sqlite3 module
    understands, so queries are rewritten before being executed:

    - "format" style becomes "qmark" style
    - "pyformat" style becomes "named" style

    In both cases a literal "%s" has to be written as "%%s".
    """

    def execute(self, query, params=None):
        """Execute ``query``, converting its placeholders when ``params`` is given."""
        if params is not None:
            # A mapping means "pyformat" style; its keys are the names to convert.
            names = list(params) if isinstance(params, Mapping) else None
            converted = self.convert_query(query, param_names=names)
            return super().execute(converted, params)
        # Without parameters the query is passed through unmodified.
        return super().execute(query)

    def executemany(self, query, param_list):
        """Execute ``query`` once per entry of ``param_list``, converting placeholders."""
        # param_list may be a generator, so peek at its first entry through a
        # tee'd copy instead of consuming the iterator handed to sqlite3.
        probe, param_list = tee(iter(param_list))
        first = next(probe, None)
        # A non-empty mapping as first entry means "pyformat" style is used.
        names = list(first) if first and isinstance(first, Mapping) else None
        converted = self.convert_query(query, param_names=names)
        return super().executemany(converted, param_list)

    def convert_query(self, query, *, param_names=None):
        """
        Return ``query`` rewritten for sqlite3: "named" style when
        ``param_names`` is given, "qmark" style otherwise.
        """
        if param_names is not None:
            # "pyformat" -> "named": %(name)s becomes :name.
            placeholders = {name: f":{name}" for name in param_names}
            return query % placeholders
        # "format" -> "qmark": %s becomes ?, then %% collapses to %.
        return FORMAT_QMARK_REGEX.sub("?", query).replace("%%", "%")