Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3import datetime
  4import decimal
  5import json
  6from abc import ABC, abstractmethod
  7from collections.abc import Iterable
  8from importlib import import_module
  9from typing import TYPE_CHECKING, Any
 10
 11import sqlparse
 12
 13from plain.models.backends import utils
 14from plain.models.backends.utils import CursorWrapper
 15from plain.models.db import NotSupportedError
 16from plain.models.expressions import ResolvableExpression
 17from plain.utils import timezone
 18from plain.utils.encoding import force_str
 19
 20if TYPE_CHECKING:
 21    from types import ModuleType
 22
 23    from plain.models.backends.base.base import BaseDatabaseWrapper
 24    from plain.models.fields import Field
 25
 26
 27class BaseDatabaseOperations(ABC):
 28    """
 29    Encapsulate backend-specific differences, such as the way a backend
 30    performs ordering or calculates the ID of a recently-inserted row.
 31    """
 32
 33    compiler_module: str = "plain.models.sql.compiler"
 34
 35    # Integer field safe ranges by `internal_type` as documented
 36    # in docs/ref/models/fields.txt.
 37    integer_field_ranges: dict[str, tuple[int, int]] = {
 38        "SmallIntegerField": (-32768, 32767),
 39        "IntegerField": (-2147483648, 2147483647),
 40        "BigIntegerField": (-9223372036854775808, 9223372036854775807),
 41        "PositiveBigIntegerField": (0, 9223372036854775807),
 42        "PositiveSmallIntegerField": (0, 32767),
 43        "PositiveIntegerField": (0, 2147483647),
 44        "PrimaryKeyField": (-9223372036854775808, 9223372036854775807),
 45    }
 46    set_operators: dict[str, str] = {
 47        "union": "UNION",
 48        "intersection": "INTERSECT",
 49        "difference": "EXCEPT",
 50    }
 51    # Mapping of Field.get_internal_type() (typically the model field's class
 52    # name) to the data type to use for the Cast() function, if different from
 53    # DatabaseWrapper.data_types.
 54    cast_data_types: dict[str, str] = {}
 55    # CharField data type if the max_length argument isn't provided.
 56    cast_char_field_without_max_length: str | None = None
 57
 58    # Start and end points for window expressions.
 59    PRECEDING: str = "PRECEDING"
 60    FOLLOWING: str = "FOLLOWING"
 61    UNBOUNDED_PRECEDING: str = "UNBOUNDED " + PRECEDING
 62    UNBOUNDED_FOLLOWING: str = "UNBOUNDED " + FOLLOWING
 63    CURRENT_ROW: str = "CURRENT ROW"
 64
 65    # Prefix for EXPLAIN queries
 66    explain_prefix: str
 67
 68    def __init__(self, connection: BaseDatabaseWrapper):
 69        self.connection = connection
 70        self._cache: ModuleType | None = None
 71
 72    def autoinc_sql(self, table: str, column: str) -> str | None:
 73        """
 74        Return any SQL needed to support auto-incrementing primary keys, or
 75        None if no SQL is necessary.
 76
 77        This SQL is executed when a table is created.
 78        """
 79        return None
 80
 81    def bulk_batch_size(self, fields: list[Field], objs: list[Any]) -> int:
 82        """
 83        Return the maximum allowed batch size for the backend. The fields
 84        are the fields going to be inserted in the batch, the objs contains
 85        all the objects to be inserted.
 86        """
 87        return len(objs)
 88
 89    def format_for_duration_arithmetic(self, sql: str) -> str:
 90        raise NotImplementedError(
 91            "subclasses of BaseDatabaseOperations may require a "
 92            "format_for_duration_arithmetic() method."
 93        )
 94
 95    def unification_cast_sql(self, output_field: Field) -> str:
 96        """
 97        Given a field instance, return the SQL that casts the result of a union
 98        to that type. The resulting string should contain a '%s' placeholder
 99        for the expression being cast.
100        """
101        return "%s"
102
103    @abstractmethod
104    def date_extract_sql(
105        self, lookup_type: str, sql: str, params: list[Any] | tuple[Any, ...]
106    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
107        """
108        Given a lookup_type of 'year', 'month', or 'day', return the SQL that
109        extracts a value from the given date field field_name.
110        """
111        ...
112
113    @abstractmethod
114    def date_trunc_sql(
115        self,
116        lookup_type: str,
117        sql: str,
118        params: list[Any] | tuple[Any, ...],
119        tzname: str | None = None,
120    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
121        """
122        Given a lookup_type of 'year', 'month', or 'day', return the SQL that
123        truncates the given date or datetime field field_name to a date object
124        with only the given specificity.
125
126        If `tzname` is provided, the given value is truncated in a specific
127        timezone.
128        """
129        ...
130
131    @abstractmethod
132    def datetime_cast_date_sql(
133        self, sql: str, params: list[Any] | tuple[Any, ...], tzname: str | None
134    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
135        """
136        Return the SQL to cast a datetime value to date value.
137        """
138        ...
139
140    @abstractmethod
141    def datetime_cast_time_sql(
142        self, sql: str, params: list[Any] | tuple[Any, ...], tzname: str | None
143    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
144        """
145        Return the SQL to cast a datetime value to time value.
146        """
147        ...
148
149    @abstractmethod
150    def datetime_extract_sql(
151        self,
152        lookup_type: str,
153        sql: str,
154        params: list[Any] | tuple[Any, ...],
155        tzname: str | None,
156    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
157        """
158        Given a lookup_type of 'year', 'month', 'day', 'hour', 'minute', or
159        'second', return the SQL that extracts a value from the given
160        datetime field field_name.
161        """
162        ...
163
164    @abstractmethod
165    def datetime_trunc_sql(
166        self,
167        lookup_type: str,
168        sql: str,
169        params: list[Any] | tuple[Any, ...],
170        tzname: str | None,
171    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
172        """
173        Given a lookup_type of 'year', 'month', 'day', 'hour', 'minute', or
174        'second', return the SQL that truncates the given datetime field
175        field_name to a datetime object with only the given specificity.
176        """
177        ...
178
179    @abstractmethod
180    def time_trunc_sql(
181        self,
182        lookup_type: str,
183        sql: str,
184        params: list[Any] | tuple[Any, ...],
185        tzname: str | None = None,
186    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
187        """
188        Given a lookup_type of 'hour', 'minute' or 'second', return the SQL
189        that truncates the given time or datetime field field_name to a time
190        object with only the given specificity.
191
192        If `tzname` is provided, the given value is truncated in a specific
193        timezone.
194        """
195        ...
196
197    def time_extract_sql(
198        self, lookup_type: str, sql: str, params: list[Any] | tuple[Any, ...]
199    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
200        """
201        Given a lookup_type of 'hour', 'minute', or 'second', return the SQL
202        that extracts a value from the given time field field_name.
203        """
204        return self.date_extract_sql(lookup_type, sql, params)
205
206    def deferrable_sql(self) -> str:
207        """
208        Return the SQL to make a constraint "initially deferred" during a
209        CREATE TABLE statement.
210        """
211        return ""
212
213    def distinct_sql(
214        self, fields: list[str], params: list[Any] | tuple[Any, ...]
215    ) -> tuple[list[str], list[Any]]:
216        """
217        Return an SQL DISTINCT clause which removes duplicate rows from the
218        result set. If any fields are given, only check the given fields for
219        duplicates.
220        """
221        if fields:
222            raise NotSupportedError(
223                "DISTINCT ON fields is not supported by this database backend"
224            )
225        else:
226            return ["DISTINCT"], []
227
228    def fetch_returned_insert_columns(
229        self, cursor: CursorWrapper, returning_params: Any
230    ) -> Any:
231        """
232        Given a cursor object that has just performed an INSERT...RETURNING
233        statement into a table, return the newly created data.
234        """
235        return cursor.fetchone()
236
237    def field_cast_sql(self, db_type: str | None, internal_type: str) -> str:
238        """
239        Given a column type (e.g. 'BLOB', 'VARCHAR') and an internal type
240        (e.g. 'GenericIPAddressField'), return the SQL to cast it before using
241        it in a WHERE statement. The resulting string should contain a '%s'
242        placeholder for the column being searched against.
243        """
244        return "%s"
245
246    def force_no_ordering(self) -> list[str]:
247        """
248        Return a list used in the "ORDER BY" clause to force no ordering at
249        all. Return an empty list to include nothing in the ordering.
250        """
251        return []
252
253    def for_update_sql(
254        self,
255        nowait: bool = False,
256        skip_locked: bool = False,
257        of: tuple[str, ...] = (),
258        no_key: bool = False,
259    ) -> str:
260        """
261        Return the FOR UPDATE SQL clause to lock rows for an update operation.
262        """
263        return "FOR{} UPDATE{}{}{}".format(
264            " NO KEY" if no_key else "",
265            " OF {}".format(", ".join(of)) if of else "",
266            " NOWAIT" if nowait else "",
267            " SKIP LOCKED" if skip_locked else "",
268        )
269
270    def _get_limit_offset_params(
271        self, low_mark: int | None, high_mark: int | None
272    ) -> tuple[int | None, int]:
273        offset = low_mark or 0
274        if high_mark is not None:
275            return (high_mark - offset), offset
276        elif offset:
277            return self.connection.ops.no_limit_value(), offset
278        return None, offset
279
280    def limit_offset_sql(self, low_mark: int | None, high_mark: int | None) -> str:
281        """Return LIMIT/OFFSET SQL clause."""
282        limit, offset = self._get_limit_offset_params(low_mark, high_mark)
283        return " ".join(
284            sql
285            for sql in (
286                ("LIMIT %d" % limit) if limit else None,  # noqa: UP031
287                ("OFFSET %d" % offset) if offset else None,  # noqa: UP031
288            )
289            if sql
290        )
291
292    def last_executed_query(
293        self,
294        cursor: CursorWrapper,
295        sql: str,
296        params: list[Any] | tuple[Any, ...] | dict[str, Any] | None,
297    ) -> str:
298        """
299        Return a string of the query last executed by the given cursor, with
300        placeholders replaced with actual values.
301
302        `sql` is the raw query containing placeholders and `params` is the
303        sequence of parameters. These are used by default, but this method
304        exists for database backends to provide a better implementation
305        according to their own quoting schemes.
306        """
307
308        # Convert params to contain string values.
309        def to_string(s: Any) -> str:
310            return force_str(s, strings_only=True, errors="replace")
311
312        u_params: tuple[str, ...] | dict[str, str]
313        if isinstance(params, (list, tuple)):  # noqa: UP038
314            u_params = tuple(to_string(val) for val in params)
315        elif params is None:
316            u_params = ()
317        else:
318            u_params = {to_string(k): to_string(v) for k, v in params.items()}
319
320        return f"QUERY = {sql!r} - PARAMS = {u_params!r}"
321
322    def last_insert_id(
323        self, cursor: CursorWrapper, table_name: str, pk_name: str
324    ) -> int:
325        """
326        Given a cursor object that has just performed an INSERT statement into
327        a table that has an auto-incrementing ID, return the newly created ID.
328
329        `pk_name` is the name of the primary-key column.
330        """
331        return cursor.lastrowid
332
333    def lookup_cast(self, lookup_type: str, internal_type: str | None = None) -> str:
334        """
335        Return the string to use in a query when performing lookups
336        ("contains", "like", etc.). It should contain a '%s' placeholder for
337        the column being searched against.
338        """
339        return "%s"
340
341    def max_in_list_size(self) -> int | None:
342        """
343        Return the maximum number of items that can be passed in a single 'IN'
344        list condition, or None if the backend does not impose a limit.
345        """
346        return None
347
348    def max_name_length(self) -> int | None:
349        """
350        Return the maximum length of table and column names, or None if there
351        is no limit.
352        """
353        return None
354
355    @abstractmethod
356    def no_limit_value(self) -> int | None:
357        """
358        Return the value to use for the LIMIT when we are wanting "LIMIT
359        infinity". Return None if the limit clause can be omitted in this case.
360        """
361        ...
362
363    def pk_default_value(self) -> str:
364        """
365        Return the value to use during an INSERT statement to specify that
366        the field should use its default value.
367        """
368        return "DEFAULT"
369
370    def prepare_sql_script(self, sql: str) -> list[str]:
371        """
372        Take an SQL script that may contain multiple lines and return a list
373        of statements to feed to successive cursor.execute() calls.
374
375        Since few databases are able to process raw SQL scripts in a single
376        cursor.execute() call and PEP 249 doesn't talk about this use case,
377        the default implementation is conservative.
378        """
379        return [
380            sqlparse.format(statement, strip_comments=True)
381            for statement in sqlparse.split(sql)
382            if statement
383        ]
384
385    def return_insert_columns(
386        self, fields: list[Field]
387    ) -> tuple[str, list[Any]] | None:
388        """
389        For backends that support returning columns as part of an insert query,
390        return the SQL and params to append to the INSERT query. The returned
391        fragment should contain a format string to hold the appropriate column.
392        """
393        return None
394
395    @abstractmethod
396    def bulk_insert_sql(
397        self, fields: list[Field], placeholder_rows: list[list[str]]
398    ) -> str:
399        """
400        Return the SQL for bulk inserting rows.
401        """
402        ...
403
404    @abstractmethod
405    def fetch_returned_insert_rows(self, cursor: CursorWrapper) -> list[Any]:
406        """
407        Given a cursor object that has just performed an INSERT...RETURNING
408        statement into a table, return the list of returned data.
409        """
410        ...
411
412    def compiler(self, compiler_name: str) -> type[Any]:
413        """
414        Return the SQLCompiler class corresponding to the given name,
415        in the namespace corresponding to the `compiler_module` attribute
416        on this backend.
417        """
418        if self._cache is None:
419            self._cache = import_module(self.compiler_module)
420        return getattr(self._cache, compiler_name)
421
422    @abstractmethod
423    def quote_name(self, name: str) -> str:
424        """
425        Return a quoted version of the given table, index, or column name. Do
426        not quote the given name if it's already been quoted.
427        """
428        ...
429
430    def regex_lookup(self, lookup_type: str) -> str:
431        """
432        Return the string to use in a query when performing regular expression
433        lookups (using "regex" or "iregex"). It should contain a '%s'
434        placeholder for the column being searched against.
435
436        If the feature is not supported (or part of it is not supported), raise
437        NotImplementedError.
438        """
439        raise NotImplementedError(
440            "subclasses of BaseDatabaseOperations may require a regex_lookup() method"
441        )
442
443    def savepoint_create_sql(self, sid: str) -> str:
444        """
445        Return the SQL for starting a new savepoint. Only required if the
446        "uses_savepoints" feature is True. The "sid" parameter is a string
447        for the savepoint id.
448        """
449        return f"SAVEPOINT {self.quote_name(sid)}"
450
451    def savepoint_commit_sql(self, sid: str) -> str:
452        """
453        Return the SQL for committing the given savepoint.
454        """
455        return f"RELEASE SAVEPOINT {self.quote_name(sid)}"
456
457    def savepoint_rollback_sql(self, sid: str) -> str:
458        """
459        Return the SQL for rolling back the given savepoint.
460        """
461        return f"ROLLBACK TO SAVEPOINT {self.quote_name(sid)}"
462
463    def set_time_zone_sql(self) -> str:
464        """
465        Return the SQL that will set the connection's time zone.
466
467        Return '' if the backend doesn't support time zones.
468        """
469        return ""
470
471    def prep_for_like_query(self, x: str) -> str:
472        """Prepare a value for use in a LIKE query."""
473        return str(x).replace("\\", "\\\\").replace("%", r"\%").replace("_", r"\_")
474
475    # Same as prep_for_like_query(), but called for "iexact" matches, which
476    # need not necessarily be implemented using "LIKE" in the backend.
477    prep_for_iexact_query = prep_for_like_query
478
479    def validate_autopk_value(self, value: int) -> int:
480        """
481        Certain backends do not accept some values for "serial" fields
482        (for example zero in MySQL). Raise a ValueError if the value is
483        invalid, otherwise return the validated value.
484        """
485        return value
486
487    def adapt_unknown_value(self, value: Any) -> Any:
488        """
489        Transform a value to something compatible with the backend driver.
490
491        This method only depends on the type of the value. It's designed for
492        cases where the target type isn't known, such as .raw() SQL queries.
493        As a consequence it may not work perfectly in all circumstances.
494        """
495        if isinstance(value, datetime.datetime):  # must be before date
496            return self.adapt_datetimefield_value(value)
497        elif isinstance(value, datetime.date):
498            return self.adapt_datefield_value(value)
499        elif isinstance(value, datetime.time):
500            return self.adapt_timefield_value(value)
501        elif isinstance(value, decimal.Decimal):
502            return self.adapt_decimalfield_value(value)
503        else:
504            return value
505
506    def adapt_integerfield_value(
507        self, value: int | None, internal_type: str
508    ) -> int | None:
509        return value
510
511    def adapt_datefield_value(self, value: datetime.date | None) -> str | None:
512        """
513        Transform a date value to an object compatible with what is expected
514        by the backend driver for date columns.
515        """
516        if value is None:
517            return None
518        return str(value)
519
520    def adapt_datetimefield_value(
521        self, value: datetime.datetime | Any | None
522    ) -> str | Any | None:
523        """
524        Transform a datetime value to an object compatible with what is expected
525        by the backend driver for datetime columns.
526        """
527        if value is None:
528            return None
529        # Expression values are adapted by the database.
530        if isinstance(value, ResolvableExpression):
531            return value
532
533        return str(value)
534
535    def adapt_timefield_value(
536        self, value: datetime.time | Any | None
537    ) -> str | Any | None:
538        """
539        Transform a time value to an object compatible with what is expected
540        by the backend driver for time columns.
541        """
542        if value is None:
543            return None
544        # Expression values are adapted by the database.
545        if isinstance(value, ResolvableExpression):
546            return value
547
548        if timezone.is_aware(value):
549            raise ValueError("Plain does not support timezone-aware times.")
550        return str(value)
551
552    def adapt_decimalfield_value(
553        self,
554        value: decimal.Decimal | None,
555        max_digits: int | None = None,
556        decimal_places: int | None = None,
557    ) -> str | None:
558        """
559        Transform a decimal.Decimal value to an object compatible with what is
560        expected by the backend driver for decimal (numeric) columns.
561        """
562        return utils.format_number(value, max_digits, decimal_places)
563
564    def adapt_ipaddressfield_value(self, value: str | None) -> str | None:
565        """
566        Transform a string representation of an IP address into the expected
567        type for the backend driver.
568        """
569        return value or None
570
571    def adapt_json_value(
572        self, value: Any, encoder: type[json.JSONEncoder] | None
573    ) -> str:
574        return json.dumps(value, cls=encoder)
575
576    def year_lookup_bounds_for_date_field(
577        self, value: int, iso_year: bool = False
578    ) -> list[str | None]:
579        """
580        Return a two-elements list with the lower and upper bound to be used
581        with a BETWEEN operator to query a DateField value using a year
582        lookup.
583
584        `value` is an int, containing the looked-up year.
585        If `iso_year` is True, return bounds for ISO-8601 week-numbering years.
586        """
587        if iso_year:
588            first = datetime.date.fromisocalendar(value, 1, 1)
589            second = datetime.date.fromisocalendar(
590                value + 1, 1, 1
591            ) - datetime.timedelta(days=1)
592        else:
593            first = datetime.date(value, 1, 1)
594            second = datetime.date(value, 12, 31)
595        first_adapted = self.adapt_datefield_value(first)
596        second_adapted = self.adapt_datefield_value(second)
597        return [first_adapted, second_adapted]
598
599    def year_lookup_bounds_for_datetime_field(
600        self, value: int, iso_year: bool = False
601    ) -> list[str | Any | None]:
602        """
603        Return a two-elements list with the lower and upper bound to be used
604        with a BETWEEN operator to query a DateTimeField value using a year
605        lookup.
606
607        `value` is an int, containing the looked-up year.
608        If `iso_year` is True, return bounds for ISO-8601 week-numbering years.
609        """
610        if iso_year:
611            first = datetime.datetime.fromisocalendar(value, 1, 1)
612            second = datetime.datetime.fromisocalendar(
613                value + 1, 1, 1
614            ) - datetime.timedelta(microseconds=1)
615        else:
616            first = datetime.datetime(value, 1, 1)
617            second = datetime.datetime(value, 12, 31, 23, 59, 59, 999999)
618
619        # Make sure that datetimes are aware in the current timezone
620        tz = timezone.get_current_timezone()
621        first = timezone.make_aware(first, tz)
622        second = timezone.make_aware(second, tz)
623
624        first_adapted = self.adapt_datetimefield_value(first)
625        second_adapted = self.adapt_datetimefield_value(second)
626        return [first_adapted, second_adapted]
627
628    def get_db_converters(self, expression: Any) -> list[Any]:
629        """
630        Return a list of functions needed to convert field data.
631
632        Some field types on some backends do not provide data in the correct
633        format, this is the hook for converter functions.
634        """
635        return []
636
637    def convert_durationfield_value(
638        self, value: int | None, expression: Any, connection: BaseDatabaseWrapper
639    ) -> datetime.timedelta | None:
640        if value is not None:
641            return datetime.timedelta(0, 0, value)
642        return None
643
644    def check_expression_support(self, expression: Any) -> None:
645        """
646        Check that the backend supports the provided expression.
647
648        This is used on specific backends to rule out known expressions
649        that have problematic or nonexistent implementations. If the
650        expression has a known problem, the backend should raise
651        NotSupportedError.
652        """
653        return None
654
655    def conditional_expression_supported_in_where_clause(self, expression: Any) -> bool:
656        """
657        Return True, if the conditional expression is supported in the WHERE
658        clause.
659        """
660        return True
661
662    def combine_expression(self, connector: str, sub_expressions: list[str]) -> str:
663        """
664        Combine a list of subexpressions into a single expression, using
665        the provided connecting operator. This is required because operators
666        can vary between backends (e.g., Oracle with %% and &) and between
667        subexpression types (e.g., date expressions).
668        """
669        conn = f" {connector} "
670        return conn.join(sub_expressions)
671
672    def combine_duration_expression(
673        self, connector: str, sub_expressions: list[str]
674    ) -> str:
675        return self.combine_expression(connector, sub_expressions)
676
677    def binary_placeholder_sql(self, value: Any) -> str:
678        """
679        Some backends require special syntax to insert binary content (MySQL
680        for example uses '_binary %s').
681        """
682        return "%s"
683
684    def modify_insert_params(
685        self, placeholder: str, params: list[Any] | tuple[Any, ...]
686    ) -> list[Any] | tuple[Any, ...]:
687        """
688        Allow modification of insert parameters. Needed for Oracle Spatial
689        backend due to #10888.
690        """
691        return params
692
693    def integer_field_range(self, internal_type: str) -> tuple[int, int]:
694        """
695        Given an integer field internal type (e.g. 'PositiveIntegerField'),
696        return a tuple of the (min_value, max_value) form representing the
697        range of the column type bound to the field.
698        """
699        return self.integer_field_ranges[internal_type]
700
701    def subtract_temporals(
702        self,
703        internal_type: str,
704        lhs: tuple[str, list[Any] | tuple[Any, ...]],
705        rhs: tuple[str, list[Any] | tuple[Any, ...]],
706    ) -> tuple[str, tuple[Any, ...]]:
707        if self.connection.features.supports_temporal_subtraction:
708            lhs_sql, lhs_params = lhs
709            rhs_sql, rhs_params = rhs
710            return f"({lhs_sql} - {rhs_sql})", (*lhs_params, *rhs_params)
711        raise NotSupportedError(
712            f"This backend does not support {internal_type} subtraction."
713        )
714
715    def window_frame_start(self, start: int | None) -> str:
716        if isinstance(start, int):
717            if start < 0:
718                return "%d %s" % (abs(start), self.PRECEDING)  # noqa: UP031
719            elif start == 0:
720                return self.CURRENT_ROW
721        elif start is None:
722            return self.UNBOUNDED_PRECEDING
723        raise ValueError(
724            f"start argument must be a negative integer, zero, or None, but got '{start}'."
725        )
726
727    def window_frame_end(self, end: int | None) -> str:
728        if isinstance(end, int):
729            if end == 0:
730                return self.CURRENT_ROW
731            elif end > 0:
732                return "%d %s" % (end, self.FOLLOWING)  # noqa: UP031
733        elif end is None:
734            return self.UNBOUNDED_FOLLOWING
735        raise ValueError(
736            f"end argument must be a positive integer, zero, or None, but got '{end}'."
737        )
738
739    def window_frame_rows_start_end(
740        self, start: int | None = None, end: int | None = None
741    ) -> tuple[str, str]:
742        """
743        Return SQL for start and end points in an OVER clause window frame.
744        """
745        if not self.connection.features.supports_over_clause:
746            raise NotSupportedError("This backend does not support window expressions.")
747        return self.window_frame_start(start), self.window_frame_end(end)
748
749    def window_frame_range_start_end(
750        self, start: int | None = None, end: int | None = None
751    ) -> tuple[str, str]:
752        start_, end_ = self.window_frame_rows_start_end(start, end)
753        features = self.connection.features
754        if features.only_supports_unbounded_with_preceding_and_following and (
755            (start and start < 0) or (end and end > 0)
756        ):
757            raise NotSupportedError(
758                f"{self.connection.display_name} only supports UNBOUNDED together with PRECEDING and "
759                "FOLLOWING."
760            )
761        return start_, end_
762
763    def explain_query_prefix(self, format: str | None = None, **options: Any) -> str:
764        if format:
765            supported_formats = self.connection.features.supported_explain_formats
766            normalized_format = format.upper()
767            if normalized_format not in supported_formats:
768                msg = f"{normalized_format} is not a recognized format."
769                if supported_formats:
770                    msg += " Allowed formats: {}".format(
771                        ", ".join(sorted(supported_formats))
772                    )
773                else:
774                    msg += (
775                        f" {self.connection.display_name} does not support any formats."
776                    )
777                raise ValueError(msg)
778        if options:
779            raise ValueError(
780                "Unknown options: {}".format(", ".join(sorted(options.keys())))
781            )
782        return self.explain_prefix
783
784    def insert_statement(self, on_conflict: Any = None) -> str:
785        return "INSERT INTO"
786
787    def on_conflict_suffix_sql(
788        self,
789        fields: list[Field],
790        on_conflict: Any,
791        update_fields: Iterable[str],
792        unique_fields: Iterable[str],
793    ) -> str:
794        return ""