1from __future__ import annotations
2
3import datetime
4import decimal
5import ipaddress
6import json
7from abc import ABC, abstractmethod
8from collections.abc import Iterable, Sequence
9from importlib import import_module
10from typing import TYPE_CHECKING, Any
11
12import sqlparse
13
14from plain.models.backends import utils
15from plain.models.backends.utils import CursorWrapper
16from plain.models.db import NotSupportedError
17from plain.models.expressions import ResolvableExpression
18from plain.utils import timezone
19from plain.utils.encoding import force_str
20
21if TYPE_CHECKING:
22 from types import ModuleType
23
24 from plain.models.backends.base.base import BaseDatabaseWrapper
25 from plain.models.fields import Field
26
27
28class BaseDatabaseOperations(ABC):
29 """
30 Encapsulate backend-specific differences, such as the way a backend
31 performs ordering or calculates the ID of a recently-inserted row.
32 """
33
34 compiler_module: str = "plain.models.sql.compiler"
35
36 # Integer field safe ranges by `internal_type` as documented
37 # in docs/ref/models/fields.txt.
38 integer_field_ranges: dict[str, tuple[int, int]] = {
39 "SmallIntegerField": (-32768, 32767),
40 "IntegerField": (-2147483648, 2147483647),
41 "BigIntegerField": (-9223372036854775808, 9223372036854775807),
42 "PositiveBigIntegerField": (0, 9223372036854775807),
43 "PositiveSmallIntegerField": (0, 32767),
44 "PositiveIntegerField": (0, 2147483647),
45 "PrimaryKeyField": (-9223372036854775808, 9223372036854775807),
46 }
47 # Mapping of Field.get_internal_type() (typically the model field's class
48 # name) to the data type to use for the Cast() function, if different from
49 # DatabaseWrapper.data_types.
50 cast_data_types: dict[str, str] = {}
51 # CharField data type if the max_length argument isn't provided.
52 cast_char_field_without_max_length: str | None = None
53
54 # Start and end points for window expressions.
55 PRECEDING: str = "PRECEDING"
56 FOLLOWING: str = "FOLLOWING"
57 UNBOUNDED_PRECEDING: str = "UNBOUNDED " + PRECEDING
58 UNBOUNDED_FOLLOWING: str = "UNBOUNDED " + FOLLOWING
59 CURRENT_ROW: str = "CURRENT ROW"
60
61 # Prefix for EXPLAIN queries
62 explain_prefix: str
63
    def __init__(self, connection: BaseDatabaseWrapper):
        """Bind these operations to a specific database connection wrapper."""
        self.connection = connection
        # Lazily-populated cache of the imported compiler module; see compiler().
        self._cache: ModuleType | None = None
67
    def bulk_batch_size(self, fields: list[Field], objs: list[Any]) -> int:
        """
        Return the maximum allowed batch size for the backend. The fields
        are the fields going to be inserted in the batch, the objs contains
        all the objects to be inserted.
        """
        # The base implementation imposes no limit (one batch holds every
        # object); backends that cap statement parameters override this.
        return len(objs)
75
    def format_for_duration_arithmetic(self, sql: str) -> str:
        """
        Format `sql` for use in duration (timedelta) arithmetic.

        Deliberately raises at call time rather than being @abstractmethod:
        only backends/queries that perform duration arithmetic need it.
        """
        raise NotImplementedError(
            "subclasses of BaseDatabaseOperations may require a "
            "format_for_duration_arithmetic() method."
        )
81
    def unification_cast_sql(self, output_field: Field) -> str:
        """
        Given a field instance, return the SQL that casts the result of a union
        to that type. The resulting string should contain a '%s' placeholder
        for the expression being cast.
        """
        # Default: no cast — the expression is used as-is.
        return "%s"
89
    @abstractmethod
    def date_extract_sql(
        self, lookup_type: str, sql: str, params: list[Any] | tuple[Any, ...]
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'year', 'month', or 'day', return the SQL that
        extracts a value from the given date field field_name.

        Returns a (sql, params) pair; implemented by each backend.
        """
        ...
99
    @abstractmethod
    def date_trunc_sql(
        self,
        lookup_type: str,
        sql: str,
        params: list[Any] | tuple[Any, ...],
        tzname: str | None = None,
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'year', 'month', or 'day', return the SQL that
        truncates the given date or datetime field field_name to a date object
        with only the given specificity.

        If `tzname` is provided, the given value is truncated in a specific
        timezone.

        Returns a (sql, params) pair; implemented by each backend.
        """
        ...
117
    @abstractmethod
    def datetime_cast_date_sql(
        self, sql: str, params: list[Any] | tuple[Any, ...], tzname: str | None
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Return the SQL to cast a datetime value to date value.

        Returns a (sql, params) pair; implemented by each backend.
        """
        ...
126
    @abstractmethod
    def datetime_cast_time_sql(
        self, sql: str, params: list[Any] | tuple[Any, ...], tzname: str | None
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Return the SQL to cast a datetime value to time value.

        Returns a (sql, params) pair; implemented by each backend.
        """
        ...
135
    @abstractmethod
    def datetime_extract_sql(
        self,
        lookup_type: str,
        sql: str,
        params: list[Any] | tuple[Any, ...],
        tzname: str | None,
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'year', 'month', 'day', 'hour', 'minute', or
        'second', return the SQL that extracts a value from the given
        datetime field field_name.

        Returns a (sql, params) pair; implemented by each backend.
        """
        ...
150
    @abstractmethod
    def datetime_trunc_sql(
        self,
        lookup_type: str,
        sql: str,
        params: list[Any] | tuple[Any, ...],
        tzname: str | None,
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'year', 'month', 'day', 'hour', 'minute', or
        'second', return the SQL that truncates the given datetime field
        field_name to a datetime object with only the given specificity.

        Returns a (sql, params) pair; implemented by each backend.
        """
        ...
165
    @abstractmethod
    def time_trunc_sql(
        self,
        lookup_type: str,
        sql: str,
        params: list[Any] | tuple[Any, ...],
        tzname: str | None = None,
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'hour', 'minute' or 'second', return the SQL
        that truncates the given time or datetime field field_name to a time
        object with only the given specificity.

        If `tzname` is provided, the given value is truncated in a specific
        timezone.

        Returns a (sql, params) pair; implemented by each backend.
        """
        ...
183
    def time_extract_sql(
        self, lookup_type: str, sql: str, params: list[Any] | tuple[Any, ...]
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'hour', 'minute', or 'second', return the SQL
        that extracts a value from the given time field field_name.
        """
        # By default the date-extraction SQL also handles time components;
        # backends where the two differ override this method.
        return self.date_extract_sql(lookup_type, sql, params)
192
    def deferrable_sql(self) -> str:
        """
        Return the SQL to make a constraint "initially deferred" during a
        CREATE TABLE statement.

        The base backend emits nothing.
        """
        return ""
199
200 def distinct_sql(
201 self, fields: list[str], params: list[Any] | tuple[Any, ...]
202 ) -> tuple[list[str], list[Any]]:
203 """
204 Return an SQL DISTINCT clause which removes duplicate rows from the
205 result set. If any fields are given, only check the given fields for
206 duplicates.
207 """
208 if fields:
209 raise NotSupportedError(
210 "DISTINCT ON fields is not supported by this database backend"
211 )
212 else:
213 return ["DISTINCT"], []
214
    def fetch_returned_insert_columns(
        self, cursor: CursorWrapper, returning_params: Any
    ) -> Any:
        """
        Given a cursor object that has just performed an INSERT...RETURNING
        statement into a table, return the newly created data.
        """
        # Default: read the single returned row; `returning_params` is unused
        # here but available to backends that need it.
        return cursor.fetchone()
223
    def field_cast_sql(self, db_type: str | None, internal_type: str) -> str:
        """
        Given a column type (e.g. 'BLOB', 'VARCHAR') and an internal type
        (e.g. 'GenericIPAddressField'), return the SQL to cast it before using
        it in a WHERE statement. The resulting string should contain a '%s'
        placeholder for the column being searched against.
        """
        # Default: no cast — the column is compared as-is.
        return "%s"
232
    def force_no_ordering(self) -> list[tuple[Any, tuple[str, tuple[Any, ...], bool]]]:
        """
        Return a list used in the "ORDER BY" clause to force no ordering at
        all. Return an empty list to include nothing in the ordering.
        """
        # Default: contribute nothing to ORDER BY.
        return []
239
240 def for_update_sql(
241 self,
242 nowait: bool = False,
243 skip_locked: bool = False,
244 of: tuple[str, ...] = (),
245 no_key: bool = False,
246 ) -> str:
247 """
248 Return the FOR UPDATE SQL clause to lock rows for an update operation.
249 """
250 return "FOR{} UPDATE{}{}{}".format(
251 " NO KEY" if no_key else "",
252 " OF {}".format(", ".join(of)) if of else "",
253 " NOWAIT" if nowait else "",
254 " SKIP LOCKED" if skip_locked else "",
255 )
256
257 def _get_limit_offset_params(
258 self, low_mark: int | None, high_mark: int | None
259 ) -> tuple[int | None, int]:
260 offset = low_mark or 0
261 if high_mark is not None:
262 return (high_mark - offset), offset
263 elif offset:
264 return self.connection.ops.no_limit_value(), offset
265 return None, offset
266
267 def limit_offset_sql(self, low_mark: int | None, high_mark: int | None) -> str:
268 """Return LIMIT/OFFSET SQL clause."""
269 limit, offset = self._get_limit_offset_params(low_mark, high_mark)
270 return " ".join(
271 sql
272 for sql in (
273 ("LIMIT %d" % limit) if limit else None, # noqa: UP031
274 ("OFFSET %d" % offset) if offset else None, # noqa: UP031
275 )
276 if sql
277 )
278
279 def last_executed_query(
280 self,
281 cursor: CursorWrapper,
282 sql: str,
283 params: Any,
284 ) -> str | None:
285 """
286 Return a string of the query last executed by the given cursor, with
287 placeholders replaced with actual values.
288
289 `sql` is the raw query containing placeholders and `params` is the
290 sequence of parameters. These are used by default, but this method
291 exists for database backends to provide a better implementation
292 according to their own quoting schemes.
293 """
294
295 # Convert params to contain string values.
296 def to_string(s: Any) -> str:
297 return force_str(s, strings_only=True, errors="replace")
298
299 u_params: tuple[str, ...] | dict[str, str]
300 if isinstance(params, (list, tuple)): # noqa: UP038
301 u_params = tuple(to_string(val) for val in params)
302 elif params is None:
303 u_params = ()
304 else:
305 u_params = {to_string(k): to_string(v) for k, v in params.items()}
306
307 return f"QUERY = {sql!r} - PARAMS = {u_params!r}"
308
    def last_insert_id(
        self, cursor: CursorWrapper, table_name: str, pk_name: str
    ) -> int:
        """
        Given a cursor object that has just performed an INSERT statement into
        a table that has an auto-incrementing ID, return the newly created ID.

        `pk_name` is the name of the primary-key column.
        """
        # Default: use the DB-API `lastrowid` cursor attribute; `table_name`
        # and `pk_name` are for backends that must query for the value.
        return cursor.lastrowid
319
    def lookup_cast(self, lookup_type: str, internal_type: str | None = None) -> str:
        """
        Return the string to use in a query when performing lookups
        ("contains", "like", etc.). It should contain a '%s' placeholder for
        the column being searched against.
        """
        # Default: no cast is applied.
        return "%s"
327
    def max_in_list_size(self) -> int | None:
        """
        Return the maximum number of items that can be passed in a single 'IN'
        list condition, or None if the backend does not impose a limit.
        """
        # Unlimited by default.
        return None
334
    def max_name_length(self) -> int | None:
        """
        Return the maximum length of table and column names, or None if there
        is no limit.
        """
        # Unlimited by default.
        return None
341
    @abstractmethod
    def no_limit_value(self) -> int | None:
        """
        Return the value to use for the LIMIT when we are wanting "LIMIT
        infinity". Return None if the limit clause can be omitted in this case.

        Implemented by each backend.
        """
        ...
349
    def pk_default_value(self) -> str:
        """
        Return the value to use during an INSERT statement to specify that
        the field should use its default value.
        """
        # Standard SQL keyword; backends with different syntax override this.
        return "DEFAULT"
356
    def prepare_sql_script(self, sql: str) -> list[str]:
        """
        Take an SQL script that may contain multiple lines and return a list
        of statements to feed to successive cursor.execute() calls.

        Since few databases are able to process raw SQL scripts in a single
        cursor.execute() call and PEP 249 doesn't talk about this use case,
        the default implementation is conservative.
        """
        # Split with sqlparse rather than a naive ';' split, strip comments
        # from each statement, and drop empty fragments.
        return [
            sqlparse.format(statement, strip_comments=True)
            for statement in sqlparse.split(sql)
            if statement
        ]
371
    def return_insert_columns(
        self, fields: list[Field]
    ) -> tuple[str, Sequence[Any]] | None:
        """
        For backends that support returning columns as part of an insert query,
        return the SQL and params to append to the INSERT query. The returned
        fragment should contain a format string to hold the appropriate column.

        Return None (the default) when the backend has no RETURNING support.
        """
        return None
381
    @abstractmethod
    def bulk_insert_sql(
        self, fields: list[Field], placeholder_rows: list[list[str]]
    ) -> str:
        """
        Return the SQL for bulk inserting rows.

        Implemented by each backend.
        """
        ...
390
    @abstractmethod
    def fetch_returned_insert_rows(self, cursor: CursorWrapper) -> list[Any]:
        """
        Given a cursor object that has just performed an INSERT...RETURNING
        statement into a table, return the list of returned data.

        Implemented by each backend.
        """
        ...
398
399 def compiler(self, compiler_name: str) -> type[Any]:
400 """
401 Return the SQLCompiler class corresponding to the given name,
402 in the namespace corresponding to the `compiler_module` attribute
403 on this backend.
404 """
405 if self._cache is None:
406 self._cache = import_module(self.compiler_module)
407 return getattr(self._cache, compiler_name)
408
    @abstractmethod
    def quote_name(self, name: str) -> str:
        """
        Return a quoted version of the given table, index, or column name. Do
        not quote the given name if it's already been quoted.

        Implemented by each backend.
        """
        ...
416
    def regex_lookup(self, lookup_type: str) -> str:
        """
        Return the string to use in a query when performing regular expression
        lookups (using "regex" or "iregex"). It should contain a '%s'
        placeholder for the column being searched against.

        If the feature is not supported (or part of it is not supported), raise
        NotImplementedError.
        """
        # There is no portable regex operator across databases, so the base
        # backend cannot provide one.
        raise NotImplementedError(
            "subclasses of BaseDatabaseOperations may require a regex_lookup() method"
        )
429
430 def savepoint_create_sql(self, sid: str) -> str:
431 """
432 Return the SQL for starting a new savepoint. Only required if the
433 "uses_savepoints" feature is True. The "sid" parameter is a string
434 for the savepoint id.
435 """
436 return f"SAVEPOINT {self.quote_name(sid)}"
437
438 def savepoint_commit_sql(self, sid: str) -> str:
439 """
440 Return the SQL for committing the given savepoint.
441 """
442 return f"RELEASE SAVEPOINT {self.quote_name(sid)}"
443
444 def savepoint_rollback_sql(self, sid: str) -> str:
445 """
446 Return the SQL for rolling back the given savepoint.
447 """
448 return f"ROLLBACK TO SAVEPOINT {self.quote_name(sid)}"
449
    def set_time_zone_sql(self) -> str:
        """
        Return the SQL that will set the connection's time zone.

        Return '' if the backend doesn't support time zones.
        """
        # Unsupported by default.
        return ""
457
458 def prep_for_like_query(self, x: str) -> str:
459 """Prepare a value for use in a LIKE query."""
460 return str(x).replace("\\", "\\\\").replace("%", r"\%").replace("_", r"\_")
461
462 # Same as prep_for_like_query(), but called for "iexact" matches, which
463 # need not necessarily be implemented using "LIKE" in the backend.
464 prep_for_iexact_query = prep_for_like_query
465
    def validate_autopk_value(self, value: int) -> int:
        """
        Certain backends do not accept some values for "serial" fields
        (for example zero in MySQL). Raise a ValueError if the value is
        invalid, otherwise return the validated value.
        """
        # The base backend accepts any value.
        return value
473
474 def adapt_unknown_value(self, value: Any) -> Any:
475 """
476 Transform a value to something compatible with the backend driver.
477
478 This method only depends on the type of the value. It's designed for
479 cases where the target type isn't known, such as .raw() SQL queries.
480 As a consequence it may not work perfectly in all circumstances.
481 """
482 if isinstance(value, datetime.datetime): # must be before date
483 return self.adapt_datetimefield_value(value)
484 elif isinstance(value, datetime.date):
485 return self.adapt_datefield_value(value)
486 elif isinstance(value, datetime.time):
487 return self.adapt_timefield_value(value)
488 elif isinstance(value, decimal.Decimal):
489 return self.adapt_decimalfield_value(value)
490 else:
491 return value
492
    def adapt_integerfield_value(
        self, value: int | None, internal_type: str
    ) -> int | None:
        """
        Transform an integer value for storage. The base implementation
        passes it through unchanged; `internal_type` identifies the integer
        field variant for backends that need type-specific handling.
        """
        return value
497
498 def adapt_datefield_value(self, value: datetime.date | None) -> str | None:
499 """
500 Transform a date value to an object compatible with what is expected
501 by the backend driver for date columns.
502 """
503 if value is None:
504 return None
505 return str(value)
506
507 def adapt_datetimefield_value(
508 self, value: datetime.datetime | Any | None
509 ) -> str | Any | None:
510 """
511 Transform a datetime value to an object compatible with what is expected
512 by the backend driver for datetime columns.
513 """
514 if value is None:
515 return None
516 # Expression values are adapted by the database.
517 if isinstance(value, ResolvableExpression):
518 return value
519
520 return str(value)
521
522 def adapt_timefield_value(
523 self, value: datetime.time | Any | None
524 ) -> str | Any | None:
525 """
526 Transform a time value to an object compatible with what is expected
527 by the backend driver for time columns.
528 """
529 if value is None:
530 return None
531 # Expression values are adapted by the database.
532 if isinstance(value, ResolvableExpression):
533 return value
534
535 if timezone.is_aware(value):
536 raise ValueError("Plain does not support timezone-aware times.")
537 return str(value)
538
    def adapt_decimalfield_value(
        self,
        value: decimal.Decimal | None,
        max_digits: int | None = None,
        decimal_places: int | None = None,
    ) -> str | None:
        """
        Transform a decimal.Decimal value to an object compatible with what is
        expected by the backend driver for decimal (numeric) columns.
        """
        # Formatting/quantization is delegated to the shared backend helper.
        return utils.format_number(value, max_digits, decimal_places)
550
551 def adapt_ipaddressfield_value(
552 self, value: str | None
553 ) -> str | ipaddress.IPv4Address | ipaddress.IPv6Address | None:
554 """
555 Transform a string representation of an IP address into the expected
556 type for the backend driver.
557 """
558 return value or None
559
    def adapt_json_value(
        self, value: Any, encoder: type[json.JSONEncoder] | None
    ) -> Any:
        """
        Serialize `value` to a JSON string for storage, using the optional
        custom JSONEncoder subclass.
        """
        return json.dumps(value, cls=encoder)
564
565 def year_lookup_bounds_for_date_field(
566 self, value: int, iso_year: bool = False
567 ) -> list[str | None]:
568 """
569 Return a two-elements list with the lower and upper bound to be used
570 with a BETWEEN operator to query a DateField value using a year
571 lookup.
572
573 `value` is an int, containing the looked-up year.
574 If `iso_year` is True, return bounds for ISO-8601 week-numbering years.
575 """
576 if iso_year:
577 first = datetime.date.fromisocalendar(value, 1, 1)
578 second = datetime.date.fromisocalendar(
579 value + 1, 1, 1
580 ) - datetime.timedelta(days=1)
581 else:
582 first = datetime.date(value, 1, 1)
583 second = datetime.date(value, 12, 31)
584 first_adapted = self.adapt_datefield_value(first)
585 second_adapted = self.adapt_datefield_value(second)
586 return [first_adapted, second_adapted]
587
    def year_lookup_bounds_for_datetime_field(
        self, value: int, iso_year: bool = False
    ) -> list[str | Any | None]:
        """
        Return a two-elements list with the lower and upper bound to be used
        with a BETWEEN operator to query a DateTimeField value using a year
        lookup.

        `value` is an int, containing the looked-up year.
        If `iso_year` is True, return bounds for ISO-8601 week-numbering years.
        """
        if iso_year:
            first = datetime.datetime.fromisocalendar(value, 1, 1)
            # Upper bound: the last microsecond before the next ISO year begins.
            second = datetime.datetime.fromisocalendar(
                value + 1, 1, 1
            ) - datetime.timedelta(microseconds=1)
        else:
            first = datetime.datetime(value, 1, 1)
            second = datetime.datetime(value, 12, 31, 23, 59, 59, 999999)

        # Make sure that datetimes are aware in the current timezone
        tz = timezone.get_current_timezone()
        first = timezone.make_aware(first, tz)
        second = timezone.make_aware(second, tz)

        first_adapted = self.adapt_datetimefield_value(first)
        second_adapted = self.adapt_datetimefield_value(second)
        return [first_adapted, second_adapted]
616
    def get_db_converters(self, expression: Any) -> list[Any]:
        """
        Return a list of functions needed to convert field data.

        Some field types on some backends do not provide data in the correct
        format, this is the hook for converter functions.
        """
        # No converters are needed by default.
        return []
625
626 def convert_durationfield_value(
627 self, value: int | None, expression: Any, connection: BaseDatabaseWrapper
628 ) -> datetime.timedelta | None:
629 if value is not None:
630 return datetime.timedelta(0, 0, value)
631 return None
632
    def check_expression_support(self, expression: Any) -> None:
        """
        Check that the backend supports the provided expression.

        This is used on specific backends to rule out known expressions
        that have problematic or nonexistent implementations. If the
        expression has a known problem, the backend should raise
        NotSupportedError.
        """
        # The base backend accepts every expression.
        return None
643
    def conditional_expression_supported_in_where_clause(self, expression: Any) -> bool:
        """
        Return True, if the conditional expression is supported in the WHERE
        clause.
        """
        # Supported by default.
        return True
650
651 def combine_expression(self, connector: str, sub_expressions: list[str]) -> str:
652 """
653 Combine a list of subexpressions into a single expression, using
654 the provided connecting operator. This is required because operators
655 can vary between backends (e.g., Oracle with %% and &) and between
656 subexpression types (e.g., date expressions).
657 """
658 conn = f" {connector} "
659 return conn.join(sub_expressions)
660
    def combine_duration_expression(
        self, connector: str, sub_expressions: list[str]
    ) -> str:
        """
        Combine subexpressions involving durations. By default duration
        arithmetic combines like any other expression; backends with special
        duration syntax override this.
        """
        return self.combine_expression(connector, sub_expressions)
665
    def binary_placeholder_sql(self, value: Any) -> str:
        """
        Some backends require special syntax to insert binary content (MySQL
        for example uses '_binary %s').
        """
        # Plain placeholder by default.
        return "%s"
672
    def integer_field_range(self, internal_type: str) -> tuple[int, int]:
        """
        Given an integer field internal type (e.g. 'PositiveIntegerField'),
        return a tuple of the (min_value, max_value) form representing the
        range of the column type bound to the field.

        Raises KeyError for unknown internal types.
        """
        return self.integer_field_ranges[internal_type]
680
681 def subtract_temporals(
682 self,
683 internal_type: str,
684 lhs: tuple[str, list[Any] | tuple[Any, ...]],
685 rhs: tuple[str, list[Any] | tuple[Any, ...]],
686 ) -> tuple[str, tuple[Any, ...]]:
687 if self.connection.features.supports_temporal_subtraction:
688 lhs_sql, lhs_params = lhs
689 rhs_sql, rhs_params = rhs
690 return f"({lhs_sql} - {rhs_sql})", (*lhs_params, *rhs_params)
691 raise NotSupportedError(
692 f"This backend does not support {internal_type} subtraction."
693 )
694
695 def window_frame_start(self, start: int | None) -> str:
696 if isinstance(start, int):
697 if start < 0:
698 return "%d %s" % (abs(start), self.PRECEDING) # noqa: UP031
699 elif start == 0:
700 return self.CURRENT_ROW
701 elif start is None:
702 return self.UNBOUNDED_PRECEDING
703 raise ValueError(
704 f"start argument must be a negative integer, zero, or None, but got '{start}'."
705 )
706
707 def window_frame_end(self, end: int | None) -> str:
708 if isinstance(end, int):
709 if end == 0:
710 return self.CURRENT_ROW
711 elif end > 0:
712 return "%d %s" % (end, self.FOLLOWING) # noqa: UP031
713 elif end is None:
714 return self.UNBOUNDED_FOLLOWING
715 raise ValueError(
716 f"end argument must be a positive integer, zero, or None, but got '{end}'."
717 )
718
719 def window_frame_rows_start_end(
720 self, start: int | None = None, end: int | None = None
721 ) -> tuple[str, str]:
722 """
723 Return SQL for start and end points in an OVER clause window frame.
724 """
725 if not self.connection.features.supports_over_clause:
726 raise NotSupportedError("This backend does not support window expressions.")
727 return self.window_frame_start(start), self.window_frame_end(end)
728
729 def window_frame_range_start_end(
730 self, start: int | None = None, end: int | None = None
731 ) -> tuple[str, str]:
732 start_, end_ = self.window_frame_rows_start_end(start, end)
733 features = self.connection.features
734 if features.only_supports_unbounded_with_preceding_and_following and (
735 (start and start < 0) or (end and end > 0)
736 ):
737 raise NotSupportedError(
738 f"{self.connection.display_name} only supports UNBOUNDED together with PRECEDING and "
739 "FOLLOWING."
740 )
741 return start_, end_
742
743 def explain_query_prefix(self, format: str | None = None, **options: Any) -> str:
744 if format:
745 supported_formats = self.connection.features.supported_explain_formats
746 normalized_format = format.upper()
747 if normalized_format not in supported_formats:
748 msg = f"{normalized_format} is not a recognized format."
749 if supported_formats:
750 msg += " Allowed formats: {}".format(
751 ", ".join(sorted(supported_formats))
752 )
753 else:
754 msg += (
755 f" {self.connection.display_name} does not support any formats."
756 )
757 raise ValueError(msg)
758 if options:
759 raise ValueError(
760 "Unknown options: {}".format(", ".join(sorted(options.keys())))
761 )
762 return self.explain_prefix
763
    def insert_statement(self, on_conflict: Any = None) -> str:
        """
        Return the opening keywords of an INSERT statement. `on_conflict`
        lets backends choose a conflict-aware variant; the base backend
        ignores it.
        """
        return "INSERT INTO"
766
    def on_conflict_suffix_sql(
        self,
        fields: list[Field],
        on_conflict: Any,
        update_fields: Iterable[str],
        unique_fields: Iterable[str],
    ) -> str:
        """
        Return the SQL suffix appended to an INSERT to handle conflicts.
        The base backend emits nothing; backends with upsert support
        override this.
        """
        return ""