Plain is headed towards 1.0! Subscribe for development updates →

  1"""
  2SQLite backend for the sqlite3 module in the standard library.
  3"""
  4
  5import datetime
  6import decimal
  7import warnings
  8from collections.abc import Mapping
  9from itertools import chain, tee
 10from sqlite3 import dbapi2 as Database
 11
 12from plain.exceptions import ImproperlyConfigured
 13from plain.models.backends.base.base import BaseDatabaseWrapper
 14from plain.models.db import IntegrityError
 15from plain.utils.dateparse import parse_date, parse_datetime, parse_time
 16from plain.utils.regex_helper import _lazy_re_compile
 17
 18from ._functions import register as register_functions
 19from .client import DatabaseClient
 20from .creation import DatabaseCreation
 21from .features import DatabaseFeatures
 22from .introspection import DatabaseIntrospection
 23from .operations import DatabaseOperations
 24from .schema import DatabaseSchemaEditor
 25
 26
 27def decoder(conv_func):
 28    """
 29    Convert bytestrings from Python's sqlite3 interface to a regular string.
 30    """
 31    return lambda s: conv_func(s.decode())
 32
 33
 34def adapt_date(val):
 35    return val.isoformat()
 36
 37
 38def adapt_datetime(val):
 39    return val.isoformat(" ")
 40
 41
 42Database.register_converter("bool", b"1".__eq__)
 43Database.register_converter("date", decoder(parse_date))
 44Database.register_converter("time", decoder(parse_time))
 45Database.register_converter("datetime", decoder(parse_datetime))
 46Database.register_converter("timestamp", decoder(parse_datetime))
 47
 48Database.register_adapter(decimal.Decimal, str)
 49Database.register_adapter(datetime.date, adapt_date)
 50Database.register_adapter(datetime.datetime, adapt_datetime)
 51
 52
 53class DatabaseWrapper(BaseDatabaseWrapper):
 54    vendor = "sqlite"
 55    display_name = "SQLite"
 56    # SQLite doesn't actually support most of these types, but it "does the right
 57    # thing" given more verbose field definitions, so leave them as is so that
 58    # schema inspection is more useful.
 59    data_types = {
 60        "AutoField": "integer",
 61        "BigAutoField": "integer",
 62        "BinaryField": "BLOB",
 63        "BooleanField": "bool",
 64        "CharField": "varchar(%(max_length)s)",
 65        "DateField": "date",
 66        "DateTimeField": "datetime",
 67        "DecimalField": "decimal",
 68        "DurationField": "bigint",
 69        "FloatField": "real",
 70        "IntegerField": "integer",
 71        "BigIntegerField": "bigint",
 72        "IPAddressField": "char(15)",
 73        "GenericIPAddressField": "char(39)",
 74        "JSONField": "text",
 75        "PositiveBigIntegerField": "bigint unsigned",
 76        "PositiveIntegerField": "integer unsigned",
 77        "PositiveSmallIntegerField": "smallint unsigned",
 78        "SmallAutoField": "integer",
 79        "SmallIntegerField": "smallint",
 80        "TextField": "text",
 81        "TimeField": "time",
 82        "UUIDField": "char(32)",
 83    }
 84    data_type_check_constraints = {
 85        "PositiveBigIntegerField": '"%(column)s" >= 0',
 86        "JSONField": '(JSON_VALID("%(column)s") OR "%(column)s" IS NULL)',
 87        "PositiveIntegerField": '"%(column)s" >= 0',
 88        "PositiveSmallIntegerField": '"%(column)s" >= 0',
 89    }
 90    data_types_suffix = {
 91        "AutoField": "AUTOINCREMENT",
 92        "BigAutoField": "AUTOINCREMENT",
 93        "SmallAutoField": "AUTOINCREMENT",
 94    }
 95    # SQLite requires LIKE statements to include an ESCAPE clause if the value
 96    # being escaped has a percent or underscore in it.
 97    # See https://www.sqlite.org/lang_expr.html for an explanation.
 98    operators = {
 99        "exact": "= %s",
100        "iexact": "LIKE %s ESCAPE '\\'",
101        "contains": "LIKE %s ESCAPE '\\'",
102        "icontains": "LIKE %s ESCAPE '\\'",
103        "regex": "REGEXP %s",
104        "iregex": "REGEXP '(?i)' || %s",
105        "gt": "> %s",
106        "gte": ">= %s",
107        "lt": "< %s",
108        "lte": "<= %s",
109        "startswith": "LIKE %s ESCAPE '\\'",
110        "endswith": "LIKE %s ESCAPE '\\'",
111        "istartswith": "LIKE %s ESCAPE '\\'",
112        "iendswith": "LIKE %s ESCAPE '\\'",
113    }
114
115    # The patterns below are used to generate SQL pattern lookup clauses when
116    # the right-hand side of the lookup isn't a raw string (it might be an expression
117    # or the result of a bilateral transformation).
118    # In those cases, special characters for LIKE operators (e.g. \, *, _) should be
119    # escaped on database side.
120    #
121    # Note: we use str.format() here for readability as '%' is used as a wildcard for
122    # the LIKE operator.
123    pattern_esc = r"REPLACE(REPLACE(REPLACE({}, '\', '\\'), '%%', '\%%'), '_', '\_')"
124    pattern_ops = {
125        "contains": r"LIKE '%%' || {} || '%%' ESCAPE '\'",
126        "icontains": r"LIKE '%%' || UPPER({}) || '%%' ESCAPE '\'",
127        "startswith": r"LIKE {} || '%%' ESCAPE '\'",
128        "istartswith": r"LIKE UPPER({}) || '%%' ESCAPE '\'",
129        "endswith": r"LIKE '%%' || {} ESCAPE '\'",
130        "iendswith": r"LIKE '%%' || UPPER({}) ESCAPE '\'",
131    }
132
133    Database = Database
134    SchemaEditorClass = DatabaseSchemaEditor
135    # Classes instantiated in __init__().
136    client_class = DatabaseClient
137    creation_class = DatabaseCreation
138    features_class = DatabaseFeatures
139    introspection_class = DatabaseIntrospection
140    ops_class = DatabaseOperations
141
142    def get_connection_params(self):
143        settings_dict = self.settings_dict
144        if not settings_dict["NAME"]:
145            raise ImproperlyConfigured(
146                "settings.DATABASE is improperly configured. "
147                "Please supply the NAME value."
148            )
149        kwargs = {
150            "database": settings_dict["NAME"],
151            "detect_types": Database.PARSE_DECLTYPES | Database.PARSE_COLNAMES,
152            **settings_dict["OPTIONS"],
153        }
154        # Always allow the underlying SQLite connection to be shareable
155        # between multiple threads. The safe-guarding will be handled at a
156        # higher level by the `BaseDatabaseWrapper.allow_thread_sharing`
157        # property. This is necessary as the shareability is disabled by
158        # default in sqlite3 and it cannot be changed once a connection is
159        # opened.
160        if "check_same_thread" in kwargs and kwargs["check_same_thread"]:
161            warnings.warn(
162                "The `check_same_thread` option was provided and set to "
163                "True. It will be overridden with False. Use the "
164                "`DatabaseWrapper.allow_thread_sharing` property instead "
165                "for controlling thread shareability.",
166                RuntimeWarning,
167            )
168        kwargs.update({"check_same_thread": False, "uri": True})
169        return kwargs
170
171    def get_database_version(self):
172        return self.Database.sqlite_version_info
173
174    def get_new_connection(self, conn_params):
175        conn = Database.connect(**conn_params)
176        register_functions(conn)
177
178        conn.execute("PRAGMA foreign_keys = ON")
179        # The macOS bundled SQLite defaults legacy_alter_table ON, which
180        # prevents atomic table renames (feature supports_atomic_references_rename)
181        conn.execute("PRAGMA legacy_alter_table = OFF")
182        return conn
183
184    def create_cursor(self, name=None):
185        return self.connection.cursor(factory=SQLiteCursorWrapper)
186
187    def close(self):
188        self.validate_thread_sharing()
189        # If database is in memory, closing the connection destroys the
190        # database. To prevent accidental data loss, ignore close requests on
191        # an in-memory db.
192        if not self.is_in_memory_db():
193            BaseDatabaseWrapper.close(self)
194
195    def _savepoint_allowed(self):
196        # When 'isolation_level' is not None, sqlite3 commits before each
197        # savepoint; it's a bug. When it is None, savepoints don't make sense
198        # because autocommit is enabled. The only exception is inside 'atomic'
199        # blocks. To work around that bug, on SQLite, 'atomic' starts a
200        # transaction explicitly rather than simply disable autocommit.
201        return self.in_atomic_block
202
203    def _set_autocommit(self, autocommit):
204        if autocommit:
205            level = None
206        else:
207            # sqlite3's internal default is ''. It's different from None.
208            # See Modules/_sqlite/connection.c.
209            level = ""
210        # 'isolation_level' is a misleading API.
211        # SQLite always runs at the SERIALIZABLE isolation level.
212        with self.wrap_database_errors:
213            self.connection.isolation_level = level
214
215    def disable_constraint_checking(self):
216        with self.cursor() as cursor:
217            cursor.execute("PRAGMA foreign_keys = OFF")
218            # Foreign key constraints cannot be turned off while in a multi-
219            # statement transaction. Fetch the current state of the pragma
220            # to determine if constraints are effectively disabled.
221            enabled = cursor.execute("PRAGMA foreign_keys").fetchone()[0]
222        return not bool(enabled)
223
224    def enable_constraint_checking(self):
225        with self.cursor() as cursor:
226            cursor.execute("PRAGMA foreign_keys = ON")
227
228    def check_constraints(self, table_names=None):
229        """
230        Check each table name in `table_names` for rows with invalid foreign
231        key references. This method is intended to be used in conjunction with
232        `disable_constraint_checking()` and `enable_constraint_checking()`, to
233        determine if rows with invalid references were entered while constraint
234        checks were off.
235        """
236        with self.cursor() as cursor:
237            if table_names is None:
238                violations = cursor.execute("PRAGMA foreign_key_check").fetchall()
239            else:
240                violations = chain.from_iterable(
241                    cursor.execute(
242                        f"PRAGMA foreign_key_check({self.ops.quote_name(table_name)})"
243                    ).fetchall()
244                    for table_name in table_names
245                )
246            # See https://www.sqlite.org/pragma.html#pragma_foreign_key_check
247            for (
248                table_name,
249                rowid,
250                referenced_table_name,
251                foreign_key_index,
252            ) in violations:
253                foreign_key = cursor.execute(
254                    f"PRAGMA foreign_key_list({self.ops.quote_name(table_name)})"
255                ).fetchall()[foreign_key_index]
256                column_name, referenced_column_name = foreign_key[3:5]
257                primary_key_column_name = self.introspection.get_primary_key_column(
258                    cursor, table_name
259                )
260                primary_key_value, bad_value = cursor.execute(
261                    f"SELECT {self.ops.quote_name(primary_key_column_name)}, {self.ops.quote_name(column_name)} FROM {self.ops.quote_name(table_name)} WHERE rowid = %s",
262                    (rowid,),
263                ).fetchone()
264                raise IntegrityError(
265                    f"The row in table '{table_name}' with primary key '{primary_key_value}' has an "
266                    f"invalid foreign key: {table_name}.{column_name} contains a value '{bad_value}' that "
267                    f"does not have a corresponding value in {referenced_table_name}.{referenced_column_name}."
268                )
269
270    def is_usable(self):
271        return True
272
273    def _start_transaction_under_autocommit(self):
274        """
275        Start a transaction explicitly in autocommit mode.
276
277        Staying in autocommit mode works around a bug of sqlite3 that breaks
278        savepoints when autocommit is disabled.
279        """
280        self.cursor().execute("BEGIN")
281
282    def is_in_memory_db(self):
283        return self.creation.is_in_memory_db(self.settings_dict["NAME"])
284
285
286FORMAT_QMARK_REGEX = _lazy_re_compile(r"(?<!%)%s")
287
288
289class SQLiteCursorWrapper(Database.Cursor):
290    """
291    Plain uses the "format" and "pyformat" styles, but Python's sqlite3 module
292    supports neither of these styles.
293
294    This wrapper performs the following conversions:
295
296    - "format" style to "qmark" style
297    - "pyformat" style to "named" style
298
299    In both cases, if you want to use a literal "%s", you'll need to use "%%s".
300    """
301
302    def execute(self, query, params=None):
303        if params is None:
304            return super().execute(query)
305        # Extract names if params is a mapping, i.e. "pyformat" style is used.
306        param_names = list(params) if isinstance(params, Mapping) else None
307        query = self.convert_query(query, param_names=param_names)
308        return super().execute(query, params)
309
310    def executemany(self, query, param_list):
311        # Extract names if params is a mapping, i.e. "pyformat" style is used.
312        # Peek carefully as a generator can be passed instead of a list/tuple.
313        peekable, param_list = tee(iter(param_list))
314        if (params := next(peekable, None)) and isinstance(params, Mapping):
315            param_names = list(params)
316        else:
317            param_names = None
318        query = self.convert_query(query, param_names=param_names)
319        return super().executemany(query, param_list)
320
321    def convert_query(self, query, *, param_names=None):
322        if param_names is None:
323            # Convert from "format" style to "qmark" style.
324            return FORMAT_QMARK_REGEX.sub("?", query).replace("%%", "%")
325        else:
326            # Convert from "pyformat" style to "named" style.
327            return query % {name: f":{name}" for name in param_names}