Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3import datetime
  4import decimal
  5import ipaddress
  6import json
  7from abc import ABC, abstractmethod
  8from collections.abc import Iterable, Sequence
  9from importlib import import_module
 10from typing import TYPE_CHECKING, Any
 11
 12import sqlparse
 13
 14from plain.models.backends import utils
 15from plain.models.backends.utils import CursorWrapper
 16from plain.models.db import NotSupportedError
 17from plain.models.expressions import ResolvableExpression
 18from plain.utils import timezone
 19from plain.utils.encoding import force_str
 20
 21if TYPE_CHECKING:
 22    from types import ModuleType
 23
 24    from plain.models.backends.base.base import BaseDatabaseWrapper
 25    from plain.models.fields import Field
 26
 27
 28class BaseDatabaseOperations(ABC):
 29    """
 30    Encapsulate backend-specific differences, such as the way a backend
 31    performs ordering or calculates the ID of a recently-inserted row.
 32    """
 33
 34    compiler_module: str = "plain.models.sql.compiler"
 35
 36    # Integer field safe ranges by `internal_type` as documented
 37    # in docs/ref/models/fields.txt.
 38    integer_field_ranges: dict[str, tuple[int, int]] = {
 39        "SmallIntegerField": (-32768, 32767),
 40        "IntegerField": (-2147483648, 2147483647),
 41        "BigIntegerField": (-9223372036854775808, 9223372036854775807),
 42        "PositiveBigIntegerField": (0, 9223372036854775807),
 43        "PositiveSmallIntegerField": (0, 32767),
 44        "PositiveIntegerField": (0, 2147483647),
 45        "PrimaryKeyField": (-9223372036854775808, 9223372036854775807),
 46    }
 47    # Mapping of Field.get_internal_type() (typically the model field's class
 48    # name) to the data type to use for the Cast() function, if different from
 49    # DatabaseWrapper.data_types.
 50    cast_data_types: dict[str, str] = {}
 51    # CharField data type if the max_length argument isn't provided.
 52    cast_char_field_without_max_length: str | None = None
 53
 54    # Start and end points for window expressions.
 55    PRECEDING: str = "PRECEDING"
 56    FOLLOWING: str = "FOLLOWING"
 57    UNBOUNDED_PRECEDING: str = "UNBOUNDED " + PRECEDING
 58    UNBOUNDED_FOLLOWING: str = "UNBOUNDED " + FOLLOWING
 59    CURRENT_ROW: str = "CURRENT ROW"
 60
 61    # Prefix for EXPLAIN queries
 62    explain_prefix: str
 63
 64    def __init__(self, connection: BaseDatabaseWrapper):
 65        self.connection = connection
 66        self._cache: ModuleType | None = None
 67
 68    def bulk_batch_size(self, fields: list[Field], objs: list[Any]) -> int:
 69        """
 70        Return the maximum allowed batch size for the backend. The fields
 71        are the fields going to be inserted in the batch, the objs contains
 72        all the objects to be inserted.
 73        """
 74        return len(objs)
 75
 76    def format_for_duration_arithmetic(self, sql: str) -> str:
 77        raise NotImplementedError(
 78            "subclasses of BaseDatabaseOperations may require a "
 79            "format_for_duration_arithmetic() method."
 80        )
 81
 82    def unification_cast_sql(self, output_field: Field) -> str:
 83        """
 84        Given a field instance, return the SQL that casts the result of a union
 85        to that type. The resulting string should contain a '%s' placeholder
 86        for the expression being cast.
 87        """
 88        return "%s"
 89
 90    @abstractmethod
 91    def date_extract_sql(
 92        self, lookup_type: str, sql: str, params: list[Any] | tuple[Any, ...]
 93    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
 94        """
 95        Given a lookup_type of 'year', 'month', or 'day', return the SQL that
 96        extracts a value from the given date field field_name.
 97        """
 98        ...
 99
100    @abstractmethod
101    def date_trunc_sql(
102        self,
103        lookup_type: str,
104        sql: str,
105        params: list[Any] | tuple[Any, ...],
106        tzname: str | None = None,
107    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
108        """
109        Given a lookup_type of 'year', 'month', or 'day', return the SQL that
110        truncates the given date or datetime field field_name to a date object
111        with only the given specificity.
112
113        If `tzname` is provided, the given value is truncated in a specific
114        timezone.
115        """
116        ...
117
118    @abstractmethod
119    def datetime_cast_date_sql(
120        self, sql: str, params: list[Any] | tuple[Any, ...], tzname: str | None
121    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
122        """
123        Return the SQL to cast a datetime value to date value.
124        """
125        ...
126
127    @abstractmethod
128    def datetime_cast_time_sql(
129        self, sql: str, params: list[Any] | tuple[Any, ...], tzname: str | None
130    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
131        """
132        Return the SQL to cast a datetime value to time value.
133        """
134        ...
135
136    @abstractmethod
137    def datetime_extract_sql(
138        self,
139        lookup_type: str,
140        sql: str,
141        params: list[Any] | tuple[Any, ...],
142        tzname: str | None,
143    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
144        """
145        Given a lookup_type of 'year', 'month', 'day', 'hour', 'minute', or
146        'second', return the SQL that extracts a value from the given
147        datetime field field_name.
148        """
149        ...
150
151    @abstractmethod
152    def datetime_trunc_sql(
153        self,
154        lookup_type: str,
155        sql: str,
156        params: list[Any] | tuple[Any, ...],
157        tzname: str | None,
158    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
159        """
160        Given a lookup_type of 'year', 'month', 'day', 'hour', 'minute', or
161        'second', return the SQL that truncates the given datetime field
162        field_name to a datetime object with only the given specificity.
163        """
164        ...
165
166    @abstractmethod
167    def time_trunc_sql(
168        self,
169        lookup_type: str,
170        sql: str,
171        params: list[Any] | tuple[Any, ...],
172        tzname: str | None = None,
173    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
174        """
175        Given a lookup_type of 'hour', 'minute' or 'second', return the SQL
176        that truncates the given time or datetime field field_name to a time
177        object with only the given specificity.
178
179        If `tzname` is provided, the given value is truncated in a specific
180        timezone.
181        """
182        ...
183
184    def time_extract_sql(
185        self, lookup_type: str, sql: str, params: list[Any] | tuple[Any, ...]
186    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
187        """
188        Given a lookup_type of 'hour', 'minute', or 'second', return the SQL
189        that extracts a value from the given time field field_name.
190        """
191        return self.date_extract_sql(lookup_type, sql, params)
192
193    def deferrable_sql(self) -> str:
194        """
195        Return the SQL to make a constraint "initially deferred" during a
196        CREATE TABLE statement.
197        """
198        return ""
199
200    def distinct_sql(
201        self, fields: list[str], params: list[Any] | tuple[Any, ...]
202    ) -> tuple[list[str], list[Any]]:
203        """
204        Return an SQL DISTINCT clause which removes duplicate rows from the
205        result set. If any fields are given, only check the given fields for
206        duplicates.
207        """
208        if fields:
209            raise NotSupportedError(
210                "DISTINCT ON fields is not supported by this database backend"
211            )
212        else:
213            return ["DISTINCT"], []
214
215    def fetch_returned_insert_columns(
216        self, cursor: CursorWrapper, returning_params: Any
217    ) -> Any:
218        """
219        Given a cursor object that has just performed an INSERT...RETURNING
220        statement into a table, return the newly created data.
221        """
222        return cursor.fetchone()
223
224    def field_cast_sql(self, db_type: str | None, internal_type: str) -> str:
225        """
226        Given a column type (e.g. 'BLOB', 'VARCHAR') and an internal type
227        (e.g. 'GenericIPAddressField'), return the SQL to cast it before using
228        it in a WHERE statement. The resulting string should contain a '%s'
229        placeholder for the column being searched against.
230        """
231        return "%s"
232
233    def force_no_ordering(self) -> list[tuple[Any, tuple[str, tuple[Any, ...], bool]]]:
234        """
235        Return a list used in the "ORDER BY" clause to force no ordering at
236        all. Return an empty list to include nothing in the ordering.
237        """
238        return []
239
240    def for_update_sql(
241        self,
242        nowait: bool = False,
243        skip_locked: bool = False,
244        of: tuple[str, ...] = (),
245        no_key: bool = False,
246    ) -> str:
247        """
248        Return the FOR UPDATE SQL clause to lock rows for an update operation.
249        """
250        return "FOR{} UPDATE{}{}{}".format(
251            " NO KEY" if no_key else "",
252            " OF {}".format(", ".join(of)) if of else "",
253            " NOWAIT" if nowait else "",
254            " SKIP LOCKED" if skip_locked else "",
255        )
256
257    def _get_limit_offset_params(
258        self, low_mark: int | None, high_mark: int | None
259    ) -> tuple[int | None, int]:
260        offset = low_mark or 0
261        if high_mark is not None:
262            return (high_mark - offset), offset
263        elif offset:
264            return self.connection.ops.no_limit_value(), offset
265        return None, offset
266
267    def limit_offset_sql(self, low_mark: int | None, high_mark: int | None) -> str:
268        """Return LIMIT/OFFSET SQL clause."""
269        limit, offset = self._get_limit_offset_params(low_mark, high_mark)
270        return " ".join(
271            sql
272            for sql in (
273                ("LIMIT %d" % limit) if limit else None,  # noqa: UP031
274                ("OFFSET %d" % offset) if offset else None,  # noqa: UP031
275            )
276            if sql
277        )
278
279    def last_executed_query(
280        self,
281        cursor: CursorWrapper,
282        sql: str,
283        params: Any,
284    ) -> str | None:
285        """
286        Return a string of the query last executed by the given cursor, with
287        placeholders replaced with actual values.
288
289        `sql` is the raw query containing placeholders and `params` is the
290        sequence of parameters. These are used by default, but this method
291        exists for database backends to provide a better implementation
292        according to their own quoting schemes.
293        """
294
295        # Convert params to contain string values.
296        def to_string(s: Any) -> str:
297            return force_str(s, strings_only=True, errors="replace")
298
299        u_params: tuple[str, ...] | dict[str, str]
300        if isinstance(params, (list, tuple)):  # noqa: UP038
301            u_params = tuple(to_string(val) for val in params)
302        elif params is None:
303            u_params = ()
304        else:
305            u_params = {to_string(k): to_string(v) for k, v in params.items()}
306
307        return f"QUERY = {sql!r} - PARAMS = {u_params!r}"
308
309    def last_insert_id(
310        self, cursor: CursorWrapper, table_name: str, pk_name: str
311    ) -> int:
312        """
313        Given a cursor object that has just performed an INSERT statement into
314        a table that has an auto-incrementing ID, return the newly created ID.
315
316        `pk_name` is the name of the primary-key column.
317        """
318        return cursor.lastrowid
319
320    def lookup_cast(self, lookup_type: str, internal_type: str | None = None) -> str:
321        """
322        Return the string to use in a query when performing lookups
323        ("contains", "like", etc.). It should contain a '%s' placeholder for
324        the column being searched against.
325        """
326        return "%s"
327
328    def max_in_list_size(self) -> int | None:
329        """
330        Return the maximum number of items that can be passed in a single 'IN'
331        list condition, or None if the backend does not impose a limit.
332        """
333        return None
334
335    def max_name_length(self) -> int | None:
336        """
337        Return the maximum length of table and column names, or None if there
338        is no limit.
339        """
340        return None
341
342    @abstractmethod
343    def no_limit_value(self) -> int | None:
344        """
345        Return the value to use for the LIMIT when we are wanting "LIMIT
346        infinity". Return None if the limit clause can be omitted in this case.
347        """
348        ...
349
350    def pk_default_value(self) -> str:
351        """
352        Return the value to use during an INSERT statement to specify that
353        the field should use its default value.
354        """
355        return "DEFAULT"
356
357    def prepare_sql_script(self, sql: str) -> list[str]:
358        """
359        Take an SQL script that may contain multiple lines and return a list
360        of statements to feed to successive cursor.execute() calls.
361
362        Since few databases are able to process raw SQL scripts in a single
363        cursor.execute() call and PEP 249 doesn't talk about this use case,
364        the default implementation is conservative.
365        """
366        return [
367            sqlparse.format(statement, strip_comments=True)
368            for statement in sqlparse.split(sql)
369            if statement
370        ]
371
372    def return_insert_columns(
373        self, fields: list[Field]
374    ) -> tuple[str, Sequence[Any]] | None:
375        """
376        For backends that support returning columns as part of an insert query,
377        return the SQL and params to append to the INSERT query. The returned
378        fragment should contain a format string to hold the appropriate column.
379        """
380        return None
381
382    @abstractmethod
383    def bulk_insert_sql(
384        self, fields: list[Field], placeholder_rows: list[list[str]]
385    ) -> str:
386        """
387        Return the SQL for bulk inserting rows.
388        """
389        ...
390
391    @abstractmethod
392    def fetch_returned_insert_rows(self, cursor: CursorWrapper) -> list[Any]:
393        """
394        Given a cursor object that has just performed an INSERT...RETURNING
395        statement into a table, return the list of returned data.
396        """
397        ...
398
399    def compiler(self, compiler_name: str) -> type[Any]:
400        """
401        Return the SQLCompiler class corresponding to the given name,
402        in the namespace corresponding to the `compiler_module` attribute
403        on this backend.
404        """
405        if self._cache is None:
406            self._cache = import_module(self.compiler_module)
407        return getattr(self._cache, compiler_name)
408
409    @abstractmethod
410    def quote_name(self, name: str) -> str:
411        """
412        Return a quoted version of the given table, index, or column name. Do
413        not quote the given name if it's already been quoted.
414        """
415        ...
416
417    def regex_lookup(self, lookup_type: str) -> str:
418        """
419        Return the string to use in a query when performing regular expression
420        lookups (using "regex" or "iregex"). It should contain a '%s'
421        placeholder for the column being searched against.
422
423        If the feature is not supported (or part of it is not supported), raise
424        NotImplementedError.
425        """
426        raise NotImplementedError(
427            "subclasses of BaseDatabaseOperations may require a regex_lookup() method"
428        )
429
430    def savepoint_create_sql(self, sid: str) -> str:
431        """
432        Return the SQL for starting a new savepoint. Only required if the
433        "uses_savepoints" feature is True. The "sid" parameter is a string
434        for the savepoint id.
435        """
436        return f"SAVEPOINT {self.quote_name(sid)}"
437
438    def savepoint_commit_sql(self, sid: str) -> str:
439        """
440        Return the SQL for committing the given savepoint.
441        """
442        return f"RELEASE SAVEPOINT {self.quote_name(sid)}"
443
444    def savepoint_rollback_sql(self, sid: str) -> str:
445        """
446        Return the SQL for rolling back the given savepoint.
447        """
448        return f"ROLLBACK TO SAVEPOINT {self.quote_name(sid)}"
449
450    def set_time_zone_sql(self) -> str:
451        """
452        Return the SQL that will set the connection's time zone.
453
454        Return '' if the backend doesn't support time zones.
455        """
456        return ""
457
458    def prep_for_like_query(self, x: str) -> str:
459        """Prepare a value for use in a LIKE query."""
460        return str(x).replace("\\", "\\\\").replace("%", r"\%").replace("_", r"\_")
461
462    # Same as prep_for_like_query(), but called for "iexact" matches, which
463    # need not necessarily be implemented using "LIKE" in the backend.
464    prep_for_iexact_query = prep_for_like_query
465
466    def validate_autopk_value(self, value: int) -> int:
467        """
468        Certain backends do not accept some values for "serial" fields
469        (for example zero in MySQL). Raise a ValueError if the value is
470        invalid, otherwise return the validated value.
471        """
472        return value
473
474    def adapt_unknown_value(self, value: Any) -> Any:
475        """
476        Transform a value to something compatible with the backend driver.
477
478        This method only depends on the type of the value. It's designed for
479        cases where the target type isn't known, such as .raw() SQL queries.
480        As a consequence it may not work perfectly in all circumstances.
481        """
482        if isinstance(value, datetime.datetime):  # must be before date
483            return self.adapt_datetimefield_value(value)
484        elif isinstance(value, datetime.date):
485            return self.adapt_datefield_value(value)
486        elif isinstance(value, datetime.time):
487            return self.adapt_timefield_value(value)
488        elif isinstance(value, decimal.Decimal):
489            return self.adapt_decimalfield_value(value)
490        else:
491            return value
492
493    def adapt_integerfield_value(
494        self, value: int | None, internal_type: str
495    ) -> int | None:
496        return value
497
498    def adapt_datefield_value(self, value: datetime.date | None) -> str | None:
499        """
500        Transform a date value to an object compatible with what is expected
501        by the backend driver for date columns.
502        """
503        if value is None:
504            return None
505        return str(value)
506
507    def adapt_datetimefield_value(
508        self, value: datetime.datetime | Any | None
509    ) -> str | Any | None:
510        """
511        Transform a datetime value to an object compatible with what is expected
512        by the backend driver for datetime columns.
513        """
514        if value is None:
515            return None
516        # Expression values are adapted by the database.
517        if isinstance(value, ResolvableExpression):
518            return value
519
520        return str(value)
521
522    def adapt_timefield_value(
523        self, value: datetime.time | Any | None
524    ) -> str | Any | None:
525        """
526        Transform a time value to an object compatible with what is expected
527        by the backend driver for time columns.
528        """
529        if value is None:
530            return None
531        # Expression values are adapted by the database.
532        if isinstance(value, ResolvableExpression):
533            return value
534
535        if timezone.is_aware(value):
536            raise ValueError("Plain does not support timezone-aware times.")
537        return str(value)
538
539    def adapt_decimalfield_value(
540        self,
541        value: decimal.Decimal | None,
542        max_digits: int | None = None,
543        decimal_places: int | None = None,
544    ) -> str | None:
545        """
546        Transform a decimal.Decimal value to an object compatible with what is
547        expected by the backend driver for decimal (numeric) columns.
548        """
549        return utils.format_number(value, max_digits, decimal_places)
550
551    def adapt_ipaddressfield_value(
552        self, value: str | None
553    ) -> str | ipaddress.IPv4Address | ipaddress.IPv6Address | None:
554        """
555        Transform a string representation of an IP address into the expected
556        type for the backend driver.
557        """
558        return value or None
559
560    def adapt_json_value(
561        self, value: Any, encoder: type[json.JSONEncoder] | None
562    ) -> Any:
563        return json.dumps(value, cls=encoder)
564
565    def year_lookup_bounds_for_date_field(
566        self, value: int, iso_year: bool = False
567    ) -> list[str | None]:
568        """
569        Return a two-elements list with the lower and upper bound to be used
570        with a BETWEEN operator to query a DateField value using a year
571        lookup.
572
573        `value` is an int, containing the looked-up year.
574        If `iso_year` is True, return bounds for ISO-8601 week-numbering years.
575        """
576        if iso_year:
577            first = datetime.date.fromisocalendar(value, 1, 1)
578            second = datetime.date.fromisocalendar(
579                value + 1, 1, 1
580            ) - datetime.timedelta(days=1)
581        else:
582            first = datetime.date(value, 1, 1)
583            second = datetime.date(value, 12, 31)
584        first_adapted = self.adapt_datefield_value(first)
585        second_adapted = self.adapt_datefield_value(second)
586        return [first_adapted, second_adapted]
587
588    def year_lookup_bounds_for_datetime_field(
589        self, value: int, iso_year: bool = False
590    ) -> list[str | Any | None]:
591        """
592        Return a two-elements list with the lower and upper bound to be used
593        with a BETWEEN operator to query a DateTimeField value using a year
594        lookup.
595
596        `value` is an int, containing the looked-up year.
597        If `iso_year` is True, return bounds for ISO-8601 week-numbering years.
598        """
599        if iso_year:
600            first = datetime.datetime.fromisocalendar(value, 1, 1)
601            second = datetime.datetime.fromisocalendar(
602                value + 1, 1, 1
603            ) - datetime.timedelta(microseconds=1)
604        else:
605            first = datetime.datetime(value, 1, 1)
606            second = datetime.datetime(value, 12, 31, 23, 59, 59, 999999)
607
608        # Make sure that datetimes are aware in the current timezone
609        tz = timezone.get_current_timezone()
610        first = timezone.make_aware(first, tz)
611        second = timezone.make_aware(second, tz)
612
613        first_adapted = self.adapt_datetimefield_value(first)
614        second_adapted = self.adapt_datetimefield_value(second)
615        return [first_adapted, second_adapted]
616
617    def get_db_converters(self, expression: Any) -> list[Any]:
618        """
619        Return a list of functions needed to convert field data.
620
621        Some field types on some backends do not provide data in the correct
622        format, this is the hook for converter functions.
623        """
624        return []
625
626    def convert_durationfield_value(
627        self, value: int | None, expression: Any, connection: BaseDatabaseWrapper
628    ) -> datetime.timedelta | None:
629        if value is not None:
630            return datetime.timedelta(0, 0, value)
631        return None
632
633    def check_expression_support(self, expression: Any) -> None:
634        """
635        Check that the backend supports the provided expression.
636
637        This is used on specific backends to rule out known expressions
638        that have problematic or nonexistent implementations. If the
639        expression has a known problem, the backend should raise
640        NotSupportedError.
641        """
642        return None
643
644    def conditional_expression_supported_in_where_clause(self, expression: Any) -> bool:
645        """
646        Return True, if the conditional expression is supported in the WHERE
647        clause.
648        """
649        return True
650
651    def combine_expression(self, connector: str, sub_expressions: list[str]) -> str:
652        """
653        Combine a list of subexpressions into a single expression, using
654        the provided connecting operator. This is required because operators
655        can vary between backends (e.g., Oracle with %% and &) and between
656        subexpression types (e.g., date expressions).
657        """
658        conn = f" {connector} "
659        return conn.join(sub_expressions)
660
661    def combine_duration_expression(
662        self, connector: str, sub_expressions: list[str]
663    ) -> str:
664        return self.combine_expression(connector, sub_expressions)
665
666    def binary_placeholder_sql(self, value: Any) -> str:
667        """
668        Some backends require special syntax to insert binary content (MySQL
669        for example uses '_binary %s').
670        """
671        return "%s"
672
673    def integer_field_range(self, internal_type: str) -> tuple[int, int]:
674        """
675        Given an integer field internal type (e.g. 'PositiveIntegerField'),
676        return a tuple of the (min_value, max_value) form representing the
677        range of the column type bound to the field.
678        """
679        return self.integer_field_ranges[internal_type]
680
681    def subtract_temporals(
682        self,
683        internal_type: str,
684        lhs: tuple[str, list[Any] | tuple[Any, ...]],
685        rhs: tuple[str, list[Any] | tuple[Any, ...]],
686    ) -> tuple[str, tuple[Any, ...]]:
687        if self.connection.features.supports_temporal_subtraction:
688            lhs_sql, lhs_params = lhs
689            rhs_sql, rhs_params = rhs
690            return f"({lhs_sql} - {rhs_sql})", (*lhs_params, *rhs_params)
691        raise NotSupportedError(
692            f"This backend does not support {internal_type} subtraction."
693        )
694
695    def window_frame_start(self, start: int | None) -> str:
696        if isinstance(start, int):
697            if start < 0:
698                return "%d %s" % (abs(start), self.PRECEDING)  # noqa: UP031
699            elif start == 0:
700                return self.CURRENT_ROW
701        elif start is None:
702            return self.UNBOUNDED_PRECEDING
703        raise ValueError(
704            f"start argument must be a negative integer, zero, or None, but got '{start}'."
705        )
706
707    def window_frame_end(self, end: int | None) -> str:
708        if isinstance(end, int):
709            if end == 0:
710                return self.CURRENT_ROW
711            elif end > 0:
712                return "%d %s" % (end, self.FOLLOWING)  # noqa: UP031
713        elif end is None:
714            return self.UNBOUNDED_FOLLOWING
715        raise ValueError(
716            f"end argument must be a positive integer, zero, or None, but got '{end}'."
717        )
718
719    def window_frame_rows_start_end(
720        self, start: int | None = None, end: int | None = None
721    ) -> tuple[str, str]:
722        """
723        Return SQL for start and end points in an OVER clause window frame.
724        """
725        if not self.connection.features.supports_over_clause:
726            raise NotSupportedError("This backend does not support window expressions.")
727        return self.window_frame_start(start), self.window_frame_end(end)
728
729    def window_frame_range_start_end(
730        self, start: int | None = None, end: int | None = None
731    ) -> tuple[str, str]:
732        start_, end_ = self.window_frame_rows_start_end(start, end)
733        features = self.connection.features
734        if features.only_supports_unbounded_with_preceding_and_following and (
735            (start and start < 0) or (end and end > 0)
736        ):
737            raise NotSupportedError(
738                f"{self.connection.display_name} only supports UNBOUNDED together with PRECEDING and "
739                "FOLLOWING."
740            )
741        return start_, end_
742
743    def explain_query_prefix(self, format: str | None = None, **options: Any) -> str:
744        if format:
745            supported_formats = self.connection.features.supported_explain_formats
746            normalized_format = format.upper()
747            if normalized_format not in supported_formats:
748                msg = f"{normalized_format} is not a recognized format."
749                if supported_formats:
750                    msg += " Allowed formats: {}".format(
751                        ", ".join(sorted(supported_formats))
752                    )
753                else:
754                    msg += (
755                        f" {self.connection.display_name} does not support any formats."
756                    )
757                raise ValueError(msg)
758        if options:
759            raise ValueError(
760                "Unknown options: {}".format(", ".join(sorted(options.keys())))
761            )
762        return self.explain_prefix
763
764    def insert_statement(self, on_conflict: Any = None) -> str:
765        return "INSERT INTO"
766
767    def on_conflict_suffix_sql(
768        self,
769        fields: list[Field],
770        on_conflict: Any,
771        update_fields: Iterable[str],
772        unique_fields: Iterable[str],
773    ) -> str:
774        return ""