from __future__ import annotations

import datetime
import decimal
import ipaddress
import json
from abc import ABC, abstractmethod
from collections.abc import Iterable, Sequence
from functools import cached_property
from typing import TYPE_CHECKING, Any

import sqlparse

from plain.models.backends import utils
from plain.models.backends.utils import CursorWrapper
from plain.models.db import NotSupportedError
from plain.models.expressions import ResolvableExpression
from plain.utils import timezone
from plain.utils.encoding import force_str

if TYPE_CHECKING:
    from plain.models.backends.base.base import BaseDatabaseWrapper
    from plain.models.fields import Field
    from plain.models.sql.compiler import SQLCompiler
    from plain.models.sql.query import Query


class BaseDatabaseOperations(ABC):
    """
    Encapsulate backend-specific differences, such as the way a backend
    performs ordering or calculates the ID of a recently-inserted row.
    """

    # Integer field safe ranges by `internal_type` as documented
    # in docs/ref/models/fields.txt.
    integer_field_ranges: dict[str, tuple[int, int]] = {
        "SmallIntegerField": (-32768, 32767),
        "IntegerField": (-2147483648, 2147483647),
        "BigIntegerField": (-9223372036854775808, 9223372036854775807),
        "PositiveBigIntegerField": (0, 9223372036854775807),
        "PositiveSmallIntegerField": (0, 32767),
        "PositiveIntegerField": (0, 2147483647),
        "PrimaryKeyField": (-9223372036854775808, 9223372036854775807),
    }
    # Mapping of Field.get_internal_type() (typically the model field's class
    # name) to the data type to use for the Cast() function, if different from
    # DatabaseWrapper.data_types.
    cast_data_types: dict[str, str] = {}
    # CharField data type if the max_length argument isn't provided.
    cast_char_field_without_max_length: str | None = None

    # Start and end points for window expressions.
    PRECEDING: str = "PRECEDING"
    FOLLOWING: str = "FOLLOWING"
    UNBOUNDED_PRECEDING: str = "UNBOUNDED " + PRECEDING
    UNBOUNDED_FOLLOWING: str = "UNBOUNDED " + FOLLOWING
    CURRENT_ROW: str = "CURRENT ROW"

    # Prefix for EXPLAIN queries
    explain_prefix: str

    def __init__(self, connection: BaseDatabaseWrapper):
        self.connection = connection

    def bulk_batch_size(self, fields: list[Field], objs: list[Any]) -> int:
        """
        Return the maximum allowed batch size for the backend. `fields` are
        the fields to be inserted in the batch and `objs` contains all the
        objects to be inserted.
        """
        return len(objs)

    def format_for_duration_arithmetic(self, sql: str) -> str:
        raise NotImplementedError(
            "subclasses of BaseDatabaseOperations may require a "
            "format_for_duration_arithmetic() method."
        )

    def unification_cast_sql(self, output_field: Field) -> str:
        """
        Given a field instance, return the SQL that casts the result of a union
        to that type. The resulting string should contain a '%s' placeholder
        for the expression being cast.
        """
        return "%s"

    @abstractmethod
    def date_extract_sql(
        self, lookup_type: str, sql: str, params: list[Any] | tuple[Any, ...]
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'year', 'month', or 'day', return the SQL that
        extracts a value from the given date expression.
        """
        ...

    @abstractmethod
    def date_trunc_sql(
        self,
        lookup_type: str,
        sql: str,
        params: list[Any] | tuple[Any, ...],
        tzname: str | None = None,
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'year', 'month', or 'day', return the SQL that
        truncates the given date or datetime expression to a date value with
        only the given specificity.

        If `tzname` is provided, the given value is truncated in a specific
        timezone.
        """
        ...

    @abstractmethod
    def datetime_cast_date_sql(
        self, sql: str, params: list[Any] | tuple[Any, ...], tzname: str | None
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Return the SQL to cast a datetime value to a date value.
        """
        ...

    @abstractmethod
    def datetime_cast_time_sql(
        self, sql: str, params: list[Any] | tuple[Any, ...], tzname: str | None
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Return the SQL to cast a datetime value to a time value.
        """
        ...

    @abstractmethod
    def datetime_extract_sql(
        self,
        lookup_type: str,
        sql: str,
        params: list[Any] | tuple[Any, ...],
        tzname: str | None,
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'year', 'month', 'day', 'hour', 'minute', or
        'second', return the SQL that extracts a value from the given
        datetime expression.
        """
        ...

    @abstractmethod
    def datetime_trunc_sql(
        self,
        lookup_type: str,
        sql: str,
        params: list[Any] | tuple[Any, ...],
        tzname: str | None,
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'year', 'month', 'day', 'hour', 'minute', or
        'second', return the SQL that truncates the given datetime expression
        to a datetime value with only the given specificity.
        """
        ...

    @abstractmethod
    def time_trunc_sql(
        self,
        lookup_type: str,
        sql: str,
        params: list[Any] | tuple[Any, ...],
        tzname: str | None = None,
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'hour', 'minute' or 'second', return the SQL
        that truncates the given time or datetime expression to a time value
        with only the given specificity.

        If `tzname` is provided, the given value is truncated in a specific
        timezone.
        """
        ...

    def time_extract_sql(
        self, lookup_type: str, sql: str, params: list[Any] | tuple[Any, ...]
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'hour', 'minute', or 'second', return the SQL
        that extracts a value from the given time expression.
        """
        return self.date_extract_sql(lookup_type, sql, params)

    def deferrable_sql(self) -> str:
        """
        Return the SQL to make a constraint "initially deferred" during a
        CREATE TABLE statement.
        """
        return ""

    def distinct_sql(
        self, fields: list[str], params: list[Any] | tuple[Any, ...]
    ) -> tuple[list[str], list[Any]]:
        """
        Return an SQL DISTINCT clause which removes duplicate rows from the
        result set. If any fields are given, only check the given fields for
        duplicates.
        """
        if fields:
            raise NotSupportedError(
                "DISTINCT ON fields is not supported by this database backend"
            )
        else:
            return ["DISTINCT"], []

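    # Example for distinct_sql() (illustrative; `ops` stands for a concrete
    # backend's operations instance):
    #   >>> ops.distinct_sql([], [])
    #   (['DISTINCT'], [])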
    def fetch_returned_insert_columns(
        self, cursor: CursorWrapper, returning_params: Any
    ) -> Any:
        """
        Given a cursor object that has just performed an INSERT...RETURNING
        statement into a table, return the newly created data.
        """
        return cursor.fetchone()

    def field_cast_sql(self, db_type: str | None, internal_type: str) -> str:
        """
        Given a column type (e.g. 'BLOB', 'VARCHAR') and an internal type
        (e.g. 'GenericIPAddressField'), return the SQL to cast it before using
        it in a WHERE statement. The resulting string should contain a '%s'
        placeholder for the column being searched against.
        """
        return "%s"

    def force_no_ordering(self) -> list[tuple[Any, tuple[str, tuple[Any, ...], bool]]]:
        """
        Return a list used in the "ORDER BY" clause to force no ordering at
        all. Return an empty list to include nothing in the ordering.
        """
        return []

    def for_update_sql(
        self,
        nowait: bool = False,
        skip_locked: bool = False,
        of: tuple[str, ...] = (),
        no_key: bool = False,
    ) -> str:
        """
        Return the FOR UPDATE SQL clause to lock rows for an update operation.
        """
        return "FOR{} UPDATE{}{}{}".format(
            " NO KEY" if no_key else "",
            " OF {}".format(", ".join(of)) if of else "",
            " NOWAIT" if nowait else "",
            " SKIP LOCKED" if skip_locked else "",
        )

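    # Example for for_update_sql() (illustrative; `ops` stands for a concrete
    # backend's operations instance):
    #   >>> ops.for_update_sql(nowait=True, of=("self",), no_key=True)
    #   'FOR NO KEY UPDATE OF self NOWAIT'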
    def _get_limit_offset_params(
        self, low_mark: int | None, high_mark: int | None
    ) -> tuple[int | None, int]:
        offset = low_mark or 0
        if high_mark is not None:
            return (high_mark - offset), offset
        elif offset:
            return self.connection.ops.no_limit_value(), offset
        return None, offset

    def limit_offset_sql(self, low_mark: int | None, high_mark: int | None) -> str:
        """Return LIMIT/OFFSET SQL clause."""
        limit, offset = self._get_limit_offset_params(low_mark, high_mark)
        return " ".join(
            sql
            for sql in (
                ("LIMIT %d" % limit) if limit else None,  # noqa: UP031
                ("OFFSET %d" % offset) if offset else None,  # noqa: UP031
            )
            if sql
        )

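    # Example for limit_offset_sql() (illustrative): rows [20, 30) become
    # LIMIT 10 OFFSET 20, and an upper bound alone becomes a plain LIMIT.
    #   >>> ops.limit_offset_sql(20, 30)
    #   'LIMIT 10 OFFSET 20'
    #   >>> ops.limit_offset_sql(None, 5)
    #   'LIMIT 5'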
    def last_executed_query(
        self,
        cursor: CursorWrapper,
        sql: str,
        params: Any,
    ) -> str | None:
        """
        Return a string of the query last executed by the given cursor, with
        placeholders replaced with actual values.

        `sql` is the raw query containing placeholders and `params` is the
        sequence of parameters. These are used by default, but this method
        exists for database backends to provide a better implementation
        according to their own quoting schemes.
        """

        # Convert params to contain string values.
        def to_string(s: Any) -> str:
            return force_str(s, strings_only=True, errors="replace")

        u_params: tuple[str, ...] | dict[str, str]
        if isinstance(params, (list, tuple)):  # noqa: UP038
            u_params = tuple(to_string(val) for val in params)
        elif params is None:
            u_params = ()
        else:
            u_params = {to_string(k): to_string(v) for k, v in params.items()}

        return f"QUERY = {sql!r} - PARAMS = {u_params!r}"

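    # Example for last_executed_query() (illustrative of the default,
    # backend-agnostic formatting above):
    #   >>> ops.last_executed_query(cursor, "SELECT %s", ["abc"])
    #   "QUERY = 'SELECT %s' - PARAMS = ('abc',)"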
    def last_insert_id(
        self, cursor: CursorWrapper, table_name: str, pk_name: str
    ) -> int:
        """
        Given a cursor object that has just performed an INSERT statement into
        a table that has an auto-incrementing ID, return the newly created ID.

        `pk_name` is the name of the primary-key column.
        """
        return cursor.lastrowid

    def lookup_cast(self, lookup_type: str, internal_type: str | None = None) -> str:
        """
        Return the string to use in a query when performing lookups
        ("contains", "like", etc.). It should contain a '%s' placeholder for
        the column being searched against.
        """
        return "%s"

    def max_in_list_size(self) -> int | None:
        """
        Return the maximum number of items that can be passed in a single 'IN'
        list condition, or None if the backend does not impose a limit.
        """
        return None

    def max_name_length(self) -> int | None:
        """
        Return the maximum length of table and column names, or None if there
        is no limit.
        """
        return None

    @abstractmethod
    def no_limit_value(self) -> int | None:
        """
        Return the value to use for the LIMIT when "LIMIT infinity" is
        intended. Return None if the limit clause can be omitted in this case.
        """
        ...

    def pk_default_value(self) -> str:
        """
        Return the value to use during an INSERT statement to specify that
        the field should use its default value.
        """
        return "DEFAULT"

    def prepare_sql_script(self, sql: str) -> list[str]:
        """
        Take an SQL script that may contain multiple statements and return a
        list of statements to feed to successive cursor.execute() calls.

        Since few databases are able to process raw SQL scripts in a single
        cursor.execute() call and PEP 249 doesn't talk about this use case,
        the default implementation is conservative.
        """
        return [
            sqlparse.format(statement, strip_comments=True)
            for statement in sqlparse.split(sql)
            if statement
        ]

    def return_insert_columns(
        self, fields: list[Field]
    ) -> tuple[str, Sequence[Any]] | None:
        """
        For backends that support returning columns as part of an insert query,
        return the SQL and params to append to the INSERT query. The returned
        fragment should contain a format string to hold the appropriate column.
        """
        return None

    @abstractmethod
    def bulk_insert_sql(
        self, fields: list[Field], placeholder_rows: list[list[str]]
    ) -> str:
        """
        Return the SQL for bulk inserting rows.
        """
        ...

    @abstractmethod
    def fetch_returned_insert_rows(self, cursor: CursorWrapper) -> list[Any]:
        """
        Given a cursor object that has just performed an INSERT...RETURNING
        statement into a table, return the list of returned data.
        """
        ...

    @cached_property
    def compilers(self) -> dict[type[Query], type[SQLCompiler]]:
        """
        Return a mapping of Query types to their SQLCompiler implementations.
        Subclasses can override this to provide custom compiler implementations.
        """
        from plain.models.sql.compiler import (
            SQLAggregateCompiler,
            SQLCompiler,
            SQLDeleteCompiler,
            SQLInsertCompiler,
            SQLUpdateCompiler,
        )
        from plain.models.sql.query import Query
        from plain.models.sql.subqueries import (
            AggregateQuery,
            DeleteQuery,
            InsertQuery,
            UpdateQuery,
        )

        return {
            Query: SQLCompiler,
            DeleteQuery: SQLDeleteCompiler,
            UpdateQuery: SQLUpdateCompiler,
            InsertQuery: SQLInsertCompiler,
            AggregateQuery: SQLAggregateCompiler,
        }

    def get_compiler_for(self, query: Query, elide_empty: bool = True) -> SQLCompiler:
        """
        Return a compiler instance for the given query.
        Walks the query's MRO to find the appropriate compiler class.
        """
        for query_cls in type(query).__mro__:
            if query_cls in self.compilers:
                return self.compilers[query_cls](query, self.connection, elide_empty)
        raise TypeError(f"No compiler registered for {type(query)}")

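    # Usage sketch for get_compiler_for() (illustrative): the default mapping
    # resolves subclasses via the MRO, so a DeleteQuery gets SQLDeleteCompiler
    # while any other Query subclass falls back to SQLCompiler.
    #   >>> compiler = connection.ops.get_compiler_for(query)
    #   >>> sql, params = compiler.as_sql()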
    @abstractmethod
    def quote_name(self, name: str) -> str:
        """
        Return a quoted version of the given table, index, or column name. Do
        not quote the given name if it's already been quoted.
        """
        ...

    def regex_lookup(self, lookup_type: str) -> str:
        """
        Return the string to use in a query when performing regular expression
        lookups (using "regex" or "iregex"). It should contain a '%s'
        placeholder for the column being searched against.

        If the feature is not supported (or part of it is not supported), raise
        NotImplementedError.
        """
        raise NotImplementedError(
            "subclasses of BaseDatabaseOperations may require a regex_lookup() method"
        )

    def savepoint_create_sql(self, sid: str) -> str:
        """
        Return the SQL for starting a new savepoint. Only required if the
        "uses_savepoints" feature is True. The "sid" parameter is a string
        for the savepoint id.
        """
        return f"SAVEPOINT {self.quote_name(sid)}"

    def savepoint_commit_sql(self, sid: str) -> str:
        """
        Return the SQL for committing the given savepoint.
        """
        return f"RELEASE SAVEPOINT {self.quote_name(sid)}"

    def savepoint_rollback_sql(self, sid: str) -> str:
        """
        Return the SQL for rolling back the given savepoint.
        """
        return f"ROLLBACK TO SAVEPOINT {self.quote_name(sid)}"

    def set_time_zone_sql(self) -> str:
        """
        Return the SQL that will set the connection's time zone.

        Return '' if the backend doesn't support time zones.
        """
        return ""

    def prep_for_like_query(self, x: str) -> str:
        """Prepare a value for use in a LIKE query."""
        return str(x).replace("\\", "\\\\").replace("%", r"\%").replace("_", r"\_")

    # Same as prep_for_like_query(), but called for "iexact" matches, which
    # need not necessarily be implemented using "LIKE" in the backend.
    prep_for_iexact_query = prep_for_like_query

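    # Example for prep_for_like_query() (illustrative): LIKE wildcards in the
    # value are escaped so they match literally.
    #   >>> ops.prep_for_like_query("50%_off")
    #   '50\\%\\_off'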
    def validate_autopk_value(self, value: int) -> int:
        """
        Certain backends do not accept some values for "serial" fields
        (for example zero in MySQL). Raise a ValueError if the value is
        invalid; otherwise return the validated value.
        """
        return value

    def adapt_unknown_value(self, value: Any) -> Any:
        """
        Transform a value to something compatible with the backend driver.

        This method only depends on the type of the value. It's designed for
        cases where the target type isn't known, such as .raw() SQL queries.
        As a consequence it may not work perfectly in all circumstances.
        """
        if isinstance(value, datetime.datetime):  # must be before date
            return self.adapt_datetimefield_value(value)
        elif isinstance(value, datetime.date):
            return self.adapt_datefield_value(value)
        elif isinstance(value, datetime.time):
            return self.adapt_timefield_value(value)
        elif isinstance(value, decimal.Decimal):
            return self.adapt_decimalfield_value(value)
        else:
            return value

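    # Example for adapt_unknown_value() (illustrative, using the default
    # adapt_datefield_value() defined below):
    #   >>> ops.adapt_unknown_value(datetime.date(2024, 1, 31))
    #   '2024-01-31'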
    def adapt_integerfield_value(
        self, value: int | None, internal_type: str
    ) -> int | None:
        return value

    def adapt_datefield_value(self, value: datetime.date | None) -> str | None:
        """
        Transform a date value to an object compatible with what is expected
        by the backend driver for date columns.
        """
        if value is None:
            return None
        return str(value)

    def adapt_datetimefield_value(
        self, value: datetime.datetime | Any | None
    ) -> str | Any | None:
        """
        Transform a datetime value to an object compatible with what is expected
        by the backend driver for datetime columns.
        """
        if value is None:
            return None
        # Expression values are adapted by the database.
        if isinstance(value, ResolvableExpression):
            return value

        return str(value)

    def adapt_timefield_value(
        self, value: datetime.time | Any | None
    ) -> str | Any | None:
        """
        Transform a time value to an object compatible with what is expected
        by the backend driver for time columns.
        """
        if value is None:
            return None
        # Expression values are adapted by the database.
        if isinstance(value, ResolvableExpression):
            return value

        if timezone.is_aware(value):
            raise ValueError("Plain does not support timezone-aware times.")
        return str(value)

    def adapt_decimalfield_value(
        self,
        value: decimal.Decimal | None,
        max_digits: int | None = None,
        decimal_places: int | None = None,
    ) -> str | None:
        """
        Transform a decimal.Decimal value to an object compatible with what is
        expected by the backend driver for decimal (numeric) columns.
        """
        return utils.format_number(value, max_digits, decimal_places)

    def adapt_ipaddressfield_value(
        self, value: str | None
    ) -> str | ipaddress.IPv4Address | ipaddress.IPv6Address | None:
        """
        Transform a string representation of an IP address into the expected
        type for the backend driver.
        """
        return value or None

    def adapt_json_value(
        self, value: Any, encoder: type[json.JSONEncoder] | None
    ) -> Any:
        return json.dumps(value, cls=encoder)

    def year_lookup_bounds_for_date_field(
        self, value: int, iso_year: bool = False
    ) -> list[str | None]:
        """
        Return a two-element list with the lower and upper bound to be used
        with a BETWEEN operator to query a DateField value using a year
        lookup.

        `value` is an int containing the looked-up year.
        If `iso_year` is True, return bounds for ISO-8601 week-numbering years.
        """
        if iso_year:
            first = datetime.date.fromisocalendar(value, 1, 1)
            second = datetime.date.fromisocalendar(
                value + 1, 1, 1
            ) - datetime.timedelta(days=1)
        else:
            first = datetime.date(value, 1, 1)
            second = datetime.date(value, 12, 31)
        first_adapted = self.adapt_datefield_value(first)
        second_adapted = self.adapt_datefield_value(second)
        return [first_adapted, second_adapted]

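    # Example for year_lookup_bounds_for_date_field() (illustrative, with the
    # default adapt_datefield_value()):
    #   >>> ops.year_lookup_bounds_for_date_field(2024)
    #   ['2024-01-01', '2024-12-31']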
    def year_lookup_bounds_for_datetime_field(
        self, value: int, iso_year: bool = False
    ) -> list[str | Any | None]:
        """
        Return a two-element list with the lower and upper bound to be used
        with a BETWEEN operator to query a DateTimeField value using a year
        lookup.

        `value` is an int containing the looked-up year.
        If `iso_year` is True, return bounds for ISO-8601 week-numbering years.
        """
        if iso_year:
            first = datetime.datetime.fromisocalendar(value, 1, 1)
            second = datetime.datetime.fromisocalendar(
                value + 1, 1, 1
            ) - datetime.timedelta(microseconds=1)
        else:
            first = datetime.datetime(value, 1, 1)
            second = datetime.datetime(value, 12, 31, 23, 59, 59, 999999)

        # Make sure that datetimes are aware in the current timezone
        tz = timezone.get_current_timezone()
        first = timezone.make_aware(first, tz)
        second = timezone.make_aware(second, tz)

        first_adapted = self.adapt_datetimefield_value(first)
        second_adapted = self.adapt_datetimefield_value(second)
        return [first_adapted, second_adapted]

    def get_db_converters(self, expression: Any) -> list[Any]:
        """
        Return a list of functions needed to convert field data.

        Some field types on some backends do not provide data in the correct
        format; this is the hook for converter functions.
        """
        return []

    def convert_durationfield_value(
        self, value: int | None, expression: Any, connection: BaseDatabaseWrapper
    ) -> datetime.timedelta | None:
        if value is not None:
            return datetime.timedelta(0, 0, value)
        return None

    def check_expression_support(self, expression: Any) -> None:
        """
        Check that the backend supports the provided expression.

        This is used on specific backends to rule out known expressions
        that have problematic or nonexistent implementations. If the
        expression has a known problem, the backend should raise
        NotSupportedError.
        """
        return None

    def conditional_expression_supported_in_where_clause(self, expression: Any) -> bool:
        """
        Return True if the conditional expression is supported in the WHERE
        clause.
        """
        return True

    def combine_expression(self, connector: str, sub_expressions: list[str]) -> str:
        """
        Combine a list of subexpressions into a single expression, using
        the provided connecting operator. This is required because operators
        can vary between backends (e.g., Oracle with %% and &) and between
        subexpression types (e.g., date expressions).
        """
        conn = f" {connector} "
        return conn.join(sub_expressions)

    def combine_duration_expression(
        self, connector: str, sub_expressions: list[str]
    ) -> str:
        return self.combine_expression(connector, sub_expressions)

    def binary_placeholder_sql(self, value: Any) -> str:
        """
        Some backends require special syntax to insert binary content (MySQL
        for example uses '_binary %s').
        """
        return "%s"

    def integer_field_range(self, internal_type: str) -> tuple[int, int]:
        """
        Given an integer field internal type (e.g. 'PositiveIntegerField'),
        return a tuple of the (min_value, max_value) form representing the
        range of the column type bound to the field.
        """
        return self.integer_field_ranges[internal_type]

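    # Example for integer_field_range() (illustrative, from the
    # integer_field_ranges mapping at the top of the class):
    #   >>> ops.integer_field_range("PositiveSmallIntegerField")
    #   (0, 32767)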
    def subtract_temporals(
        self,
        internal_type: str,
        lhs: tuple[str, list[Any] | tuple[Any, ...]],
        rhs: tuple[str, list[Any] | tuple[Any, ...]],
    ) -> tuple[str, tuple[Any, ...]]:
        if self.connection.features.supports_temporal_subtraction:
            lhs_sql, lhs_params = lhs
            rhs_sql, rhs_params = rhs
            return f"({lhs_sql} - {rhs_sql})", (*lhs_params, *rhs_params)
        raise NotSupportedError(
            f"This backend does not support {internal_type} subtraction."
        )

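    # Example for subtract_temporals() (illustrative; assumes the backend's
    # supports_temporal_subtraction feature is True):
    #   >>> ops.subtract_temporals("DateTimeField", ("t.end_at", []), ("t.start_at", []))
    #   ('(t.end_at - t.start_at)', ())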
    def window_frame_start(self, start: int | None) -> str:
        if isinstance(start, int):
            if start < 0:
                return "%d %s" % (abs(start), self.PRECEDING)  # noqa: UP031
            elif start == 0:
                return self.CURRENT_ROW
        elif start is None:
            return self.UNBOUNDED_PRECEDING
        raise ValueError(
            f"start argument must be a negative integer, zero, or None, but got '{start}'."
        )

    def window_frame_end(self, end: int | None) -> str:
        if isinstance(end, int):
            if end == 0:
                return self.CURRENT_ROW
            elif end > 0:
                return "%d %s" % (end, self.FOLLOWING)  # noqa: UP031
        elif end is None:
            return self.UNBOUNDED_FOLLOWING
        raise ValueError(
            f"end argument must be a positive integer, zero, or None, but got '{end}'."
        )

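    # Example for window_frame_start()/window_frame_end() (illustrative):
    #   >>> ops.window_frame_start(-3), ops.window_frame_end(None)
    #   ('3 PRECEDING', 'UNBOUNDED FOLLOWING')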
    def window_frame_rows_start_end(
        self, start: int | None = None, end: int | None = None
    ) -> tuple[str, str]:
        """
        Return SQL for start and end points in an OVER clause window frame.
        """
        if not self.connection.features.supports_over_clause:
            raise NotSupportedError("This backend does not support window expressions.")
        return self.window_frame_start(start), self.window_frame_end(end)

    def window_frame_range_start_end(
        self, start: int | None = None, end: int | None = None
    ) -> tuple[str, str]:
        start_, end_ = self.window_frame_rows_start_end(start, end)
        features = self.connection.features
        if features.only_supports_unbounded_with_preceding_and_following and (
            (start and start < 0) or (end and end > 0)
        ):
            raise NotSupportedError(
                f"{self.connection.display_name} only supports UNBOUNDED together with PRECEDING and "
                "FOLLOWING."
            )
        return start_, end_

    def explain_query_prefix(self, format: str | None = None, **options: Any) -> str:
        if format:
            supported_formats = self.connection.features.supported_explain_formats
            normalized_format = format.upper()
            if normalized_format not in supported_formats:
                msg = f"{normalized_format} is not a recognized format."
                if supported_formats:
                    msg += " Allowed formats: {}".format(
                        ", ".join(sorted(supported_formats))
                    )
                else:
                    msg += (
                        f" {self.connection.display_name} does not support any formats."
                    )
                raise ValueError(msg)
        if options:
            raise ValueError(
                "Unknown options: {}".format(", ".join(sorted(options.keys())))
            )
        return self.explain_prefix

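    # Example for explain_query_prefix() (illustrative; assumes a backend
    # subclass that sets explain_prefix = "EXPLAIN"):
    #   >>> ops.explain_query_prefix()
    #   'EXPLAIN'
    # An unrecognized format or any unknown option raises ValueError.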
    def insert_statement(self, on_conflict: Any = None) -> str:
        return "INSERT INTO"

    def on_conflict_suffix_sql(
        self,
        fields: list[Field],
        on_conflict: Any,
        update_fields: Iterable[str],
        unique_fields: Iterable[str],
    ) -> str:
        return ""