1from __future__ import annotations
2
3import datetime
4import decimal
5import ipaddress
6import json
7from abc import ABC, abstractmethod
8from collections.abc import Iterable, Sequence
9from functools import cached_property
10from typing import TYPE_CHECKING, Any
11
12import sqlparse
13
14from plain.models.backends import utils
15from plain.models.backends.utils import CursorWrapper
16from plain.models.db import NotSupportedError
17from plain.models.expressions import ResolvableExpression
18from plain.utils import timezone
19from plain.utils.encoding import force_str
20
21if TYPE_CHECKING:
22 from plain.models.backends.base.base import BaseDatabaseWrapper
23 from plain.models.fields import Field
24 from plain.models.sql.compiler import SQLCompiler
25 from plain.models.sql.query import Query
26
27
class BaseDatabaseOperations(ABC):
    """
    Encapsulate backend-specific differences, such as the way a backend
    performs ordering or calculates the ID of a recently-inserted row.

    Methods decorated with @abstractmethod must be provided by each backend;
    the remaining methods are defaults that a backend may override.
    """

    # Integer field safe ranges by `internal_type` as documented
    # in docs/ref/models/fields.txt. Each value is a (min_value, max_value)
    # tuple; integer_field_range() looks entries up by internal type.
    integer_field_ranges: dict[str, tuple[int, int]] = {
        "SmallIntegerField": (-32768, 32767),
        "IntegerField": (-2147483648, 2147483647),
        "BigIntegerField": (-9223372036854775808, 9223372036854775807),
        "PositiveBigIntegerField": (0, 9223372036854775807),
        "PositiveSmallIntegerField": (0, 32767),
        "PositiveIntegerField": (0, 2147483647),
        "PrimaryKeyField": (-9223372036854775808, 9223372036854775807),
    }
    # Mapping of Field.get_internal_type() (typically the model field's class
    # name) to the data type to use for the Cast() function, if different from
    # DatabaseWrapper.data_types.
    cast_data_types: dict[str, str] = {}
    # CharField data type if the max_length argument isn't provided.
    cast_char_field_without_max_length: str | None = None

    # Start and end points for window expressions. window_frame_start() and
    # window_frame_end() build frame bounds from these keywords.
    PRECEDING: str = "PRECEDING"
    FOLLOWING: str = "FOLLOWING"
    UNBOUNDED_PRECEDING: str = "UNBOUNDED " + PRECEDING
    UNBOUNDED_FOLLOWING: str = "UNBOUNDED " + FOLLOWING
    CURRENT_ROW: str = "CURRENT ROW"

    # Prefix for EXPLAIN queries. Only annotated here (no value is assigned);
    # explain_query_prefix() returns it, so backends presumably set it.
    explain_prefix: str
61
62 def __init__(self, connection: BaseDatabaseWrapper):
63 self.connection = connection
64
65 def bulk_batch_size(self, fields: list[Field], objs: list[Any]) -> int:
66 """
67 Return the maximum allowed batch size for the backend. The fields
68 are the fields going to be inserted in the batch, the objs contains
69 all the objects to be inserted.
70 """
71 return len(objs)
72
73 def format_for_duration_arithmetic(self, sql: str) -> str:
74 raise NotImplementedError(
75 "subclasses of BaseDatabaseOperations may require a "
76 "format_for_duration_arithmetic() method."
77 )
78
79 def unification_cast_sql(self, output_field: Field) -> str:
80 """
81 Given a field instance, return the SQL that casts the result of a union
82 to that type. The resulting string should contain a '%s' placeholder
83 for the expression being cast.
84 """
85 return "%s"
86
    @abstractmethod
    def date_extract_sql(
        self, lookup_type: str, sql: str, params: list[Any] | tuple[Any, ...]
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'year', 'month', or 'day', return the SQL that
        extracts a value from the given date expression.

        `sql` and `params` identify the expression to extract from (presumably
        an already-compiled SQL fragment and its parameters -- confirm against
        callers). The annotated return value is an (sql, params) pair.

        Abstract: every backend must implement this. By default
        time_extract_sql() delegates to this method.
        """
        ...
96
    @abstractmethod
    def date_trunc_sql(
        self,
        lookup_type: str,
        sql: str,
        params: list[Any] | tuple[Any, ...],
        tzname: str | None = None,
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'year', 'month', or 'day', return the SQL that
        truncates the given date or datetime expression (`sql` with `params`)
        to a date object with only the given specificity.

        If `tzname` is provided, the given value is truncated in a specific
        timezone.

        The annotated return value is an (sql, params) pair. Abstract: every
        backend must implement this.
        """
        ...
114
    @abstractmethod
    def datetime_cast_date_sql(
        self, sql: str, params: list[Any] | tuple[Any, ...], tzname: str | None
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Return the SQL to cast a datetime value to date value.

        `sql` and `params` identify the datetime expression; `tzname` is an
        optional timezone name (presumably the zone in which the cast is
        evaluated -- confirm against callers). The annotated return value is
        an (sql, params) pair. Abstract: every backend must implement this.
        """
        ...
123
    @abstractmethod
    def datetime_cast_time_sql(
        self, sql: str, params: list[Any] | tuple[Any, ...], tzname: str | None
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Return the SQL to cast a datetime value to time value.

        `sql` and `params` identify the datetime expression; `tzname` is an
        optional timezone name (presumably the zone in which the cast is
        evaluated -- confirm against callers). The annotated return value is
        an (sql, params) pair. Abstract: every backend must implement this.
        """
        ...
132
    @abstractmethod
    def datetime_extract_sql(
        self,
        lookup_type: str,
        sql: str,
        params: list[Any] | tuple[Any, ...],
        tzname: str | None,
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'year', 'month', 'day', 'hour', 'minute', or
        'second', return the SQL that extracts a value from the given
        datetime expression (`sql` with `params`).

        `tzname` is an optional timezone name (presumably the zone in which
        the value is interpreted -- confirm against callers). The annotated
        return value is an (sql, params) pair. Abstract: every backend must
        implement this.
        """
        ...
147
    @abstractmethod
    def datetime_trunc_sql(
        self,
        lookup_type: str,
        sql: str,
        params: list[Any] | tuple[Any, ...],
        tzname: str | None,
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'year', 'month', 'day', 'hour', 'minute', or
        'second', return the SQL that truncates the given datetime expression
        (`sql` with `params`) to a datetime object with only the given
        specificity.

        `tzname` is an optional timezone name (presumably the zone in which
        the truncation happens -- confirm against callers). The annotated
        return value is an (sql, params) pair. Abstract: every backend must
        implement this.
        """
        ...
162
    @abstractmethod
    def time_trunc_sql(
        self,
        lookup_type: str,
        sql: str,
        params: list[Any] | tuple[Any, ...],
        tzname: str | None = None,
    ) -> tuple[str, list[Any] | tuple[Any, ...]]:
        """
        Given a lookup_type of 'hour', 'minute' or 'second', return the SQL
        that truncates the given time or datetime expression (`sql` with
        `params`) to a time object with only the given specificity.

        If `tzname` is provided, the given value is truncated in a specific
        timezone.

        The annotated return value is an (sql, params) pair. Abstract: every
        backend must implement this.
        """
        ...
180
181 def time_extract_sql(
182 self, lookup_type: str, sql: str, params: list[Any] | tuple[Any, ...]
183 ) -> tuple[str, list[Any] | tuple[Any, ...]]:
184 """
185 Given a lookup_type of 'hour', 'minute', or 'second', return the SQL
186 that extracts a value from the given time field field_name.
187 """
188 return self.date_extract_sql(lookup_type, sql, params)
189
190 def deferrable_sql(self) -> str:
191 """
192 Return the SQL to make a constraint "initially deferred" during a
193 CREATE TABLE statement.
194 """
195 return ""
196
197 def distinct_sql(
198 self, fields: list[str], params: list[Any] | tuple[Any, ...]
199 ) -> tuple[list[str], list[Any]]:
200 """
201 Return an SQL DISTINCT clause which removes duplicate rows from the
202 result set. If any fields are given, only check the given fields for
203 duplicates.
204 """
205 if fields:
206 raise NotSupportedError(
207 "DISTINCT ON fields is not supported by this database backend"
208 )
209 else:
210 return ["DISTINCT"], []
211
212 def fetch_returned_insert_columns(
213 self, cursor: CursorWrapper, returning_params: Any
214 ) -> Any:
215 """
216 Given a cursor object that has just performed an INSERT...RETURNING
217 statement into a table, return the newly created data.
218 """
219 return cursor.fetchone()
220
221 def field_cast_sql(self, db_type: str | None, internal_type: str) -> str:
222 """
223 Given a column type (e.g. 'BLOB', 'VARCHAR') and an internal type
224 (e.g. 'GenericIPAddressField'), return the SQL to cast it before using
225 it in a WHERE statement. The resulting string should contain a '%s'
226 placeholder for the column being searched against.
227 """
228 return "%s"
229
230 def force_no_ordering(self) -> list[tuple[Any, tuple[str, tuple[Any, ...], bool]]]:
231 """
232 Return a list used in the "ORDER BY" clause to force no ordering at
233 all. Return an empty list to include nothing in the ordering.
234 """
235 return []
236
237 def for_update_sql(
238 self,
239 nowait: bool = False,
240 skip_locked: bool = False,
241 of: tuple[str, ...] = (),
242 no_key: bool = False,
243 ) -> str:
244 """
245 Return the FOR UPDATE SQL clause to lock rows for an update operation.
246 """
247 return "FOR{} UPDATE{}{}{}".format(
248 " NO KEY" if no_key else "",
249 " OF {}".format(", ".join(of)) if of else "",
250 " NOWAIT" if nowait else "",
251 " SKIP LOCKED" if skip_locked else "",
252 )
253
254 def _get_limit_offset_params(
255 self, low_mark: int | None, high_mark: int | None
256 ) -> tuple[int | None, int]:
257 offset = low_mark or 0
258 if high_mark is not None:
259 return (high_mark - offset), offset
260 elif offset:
261 return self.connection.ops.no_limit_value(), offset
262 return None, offset
263
264 def limit_offset_sql(self, low_mark: int | None, high_mark: int | None) -> str:
265 """Return LIMIT/OFFSET SQL clause."""
266 limit, offset = self._get_limit_offset_params(low_mark, high_mark)
267 return " ".join(
268 sql
269 for sql in (
270 ("LIMIT %d" % limit) if limit else None, # noqa: UP031
271 ("OFFSET %d" % offset) if offset else None, # noqa: UP031
272 )
273 if sql
274 )
275
276 def last_executed_query(
277 self,
278 cursor: CursorWrapper,
279 sql: str,
280 params: Any,
281 ) -> str | None:
282 """
283 Return a string of the query last executed by the given cursor, with
284 placeholders replaced with actual values.
285
286 `sql` is the raw query containing placeholders and `params` is the
287 sequence of parameters. These are used by default, but this method
288 exists for database backends to provide a better implementation
289 according to their own quoting schemes.
290 """
291
292 # Convert params to contain string values.
293 def to_string(s: Any) -> str:
294 return force_str(s, strings_only=True, errors="replace")
295
296 u_params: tuple[str, ...] | dict[str, str]
297 if isinstance(params, (list, tuple)): # noqa: UP038
298 u_params = tuple(to_string(val) for val in params)
299 elif params is None:
300 u_params = ()
301 else:
302 u_params = {to_string(k): to_string(v) for k, v in params.items()}
303
304 return f"QUERY = {sql!r} - PARAMS = {u_params!r}"
305
306 def last_insert_id(
307 self, cursor: CursorWrapper, table_name: str, pk_name: str
308 ) -> int:
309 """
310 Given a cursor object that has just performed an INSERT statement into
311 a table that has an auto-incrementing ID, return the newly created ID.
312
313 `pk_name` is the name of the primary-key column.
314 """
315 return cursor.lastrowid
316
317 def lookup_cast(self, lookup_type: str, internal_type: str | None = None) -> str:
318 """
319 Return the string to use in a query when performing lookups
320 ("contains", "like", etc.). It should contain a '%s' placeholder for
321 the column being searched against.
322 """
323 return "%s"
324
325 def max_in_list_size(self) -> int | None:
326 """
327 Return the maximum number of items that can be passed in a single 'IN'
328 list condition, or None if the backend does not impose a limit.
329 """
330 return None
331
332 def max_name_length(self) -> int | None:
333 """
334 Return the maximum length of table and column names, or None if there
335 is no limit.
336 """
337 return None
338
    @abstractmethod
    def no_limit_value(self) -> int | None:
        """
        Return the value to use for the LIMIT when we are wanting "LIMIT
        infinity". Return None if the limit clause can be omitted in this case.

        _get_limit_offset_params() uses this value as the limit when an
        offset is given without an upper bound. Abstract: every backend must
        implement this.
        """
        ...
346
347 def pk_default_value(self) -> str:
348 """
349 Return the value to use during an INSERT statement to specify that
350 the field should use its default value.
351 """
352 return "DEFAULT"
353
354 def prepare_sql_script(self, sql: str) -> list[str]:
355 """
356 Take an SQL script that may contain multiple lines and return a list
357 of statements to feed to successive cursor.execute() calls.
358
359 Since few databases are able to process raw SQL scripts in a single
360 cursor.execute() call and PEP 249 doesn't talk about this use case,
361 the default implementation is conservative.
362 """
363 return [
364 sqlparse.format(statement, strip_comments=True)
365 for statement in sqlparse.split(sql)
366 if statement
367 ]
368
369 def return_insert_columns(
370 self, fields: list[Field]
371 ) -> tuple[str, Sequence[Any]] | None:
372 """
373 For backends that support returning columns as part of an insert query,
374 return the SQL and params to append to the INSERT query. The returned
375 fragment should contain a format string to hold the appropriate column.
376 """
377 return None
378
    @abstractmethod
    def bulk_insert_sql(
        self, fields: list[Field], placeholder_rows: list[list[str]]
    ) -> str:
        """
        Return the SQL for bulk inserting rows.

        `fields` are the fields being inserted and `placeholder_rows` holds
        one list of placeholder strings per row (presumably one placeholder
        per field -- confirm against callers). Abstract: every backend must
        implement this.
        """
        ...
387
    @abstractmethod
    def fetch_returned_insert_rows(self, cursor: CursorWrapper) -> list[Any]:
        """
        Given a cursor object that has just performed an INSERT...RETURNING
        statement into a table, return the list of returned data.

        This is the multi-row counterpart of fetch_returned_insert_columns(),
        which reads a single row by default. Abstract: every backend must
        implement this.
        """
        ...
395
396 @cached_property
397 def compilers(self) -> dict[type[Query], type[SQLCompiler]]:
398 """
399 Return a mapping of Query types to their SQLCompiler implementations.
400 Subclasses can override this to provide custom compiler implementations.
401 """
402 from plain.models.sql.compiler import (
403 SQLAggregateCompiler,
404 SQLCompiler,
405 SQLDeleteCompiler,
406 SQLInsertCompiler,
407 SQLUpdateCompiler,
408 )
409 from plain.models.sql.query import Query
410 from plain.models.sql.subqueries import (
411 AggregateQuery,
412 DeleteQuery,
413 InsertQuery,
414 UpdateQuery,
415 )
416
417 return {
418 Query: SQLCompiler,
419 DeleteQuery: SQLDeleteCompiler,
420 UpdateQuery: SQLUpdateCompiler,
421 InsertQuery: SQLInsertCompiler,
422 AggregateQuery: SQLAggregateCompiler,
423 }
424
425 def get_compiler_for(self, query: Query, elide_empty: bool = True) -> SQLCompiler:
426 """
427 Return a compiler instance for the given query.
428 Walks the query's MRO to find the appropriate compiler class.
429 """
430 for query_cls in type(query).__mro__:
431 if query_cls in self.compilers:
432 return self.compilers[query_cls](query, self.connection, elide_empty)
433 raise TypeError(f"No compiler registered for {type(query)}")
434
    @abstractmethod
    def quote_name(self, name: str) -> str:
        """
        Return a quoted version of the given table, index, or column name. Do
        not quote the given name if it's already been quoted.

        The savepoint_*_sql() methods also pass savepoint ids through this
        method. Abstract: every backend must implement this.
        """
        ...
442
443 def regex_lookup(self, lookup_type: str) -> str:
444 """
445 Return the string to use in a query when performing regular expression
446 lookups (using "regex" or "iregex"). It should contain a '%s'
447 placeholder for the column being searched against.
448
449 If the feature is not supported (or part of it is not supported), raise
450 NotImplementedError.
451 """
452 raise NotImplementedError(
453 "subclasses of BaseDatabaseOperations may require a regex_lookup() method"
454 )
455
456 def savepoint_create_sql(self, sid: str) -> str:
457 """
458 Return the SQL for starting a new savepoint. Only required if the
459 "uses_savepoints" feature is True. The "sid" parameter is a string
460 for the savepoint id.
461 """
462 return f"SAVEPOINT {self.quote_name(sid)}"
463
464 def savepoint_commit_sql(self, sid: str) -> str:
465 """
466 Return the SQL for committing the given savepoint.
467 """
468 return f"RELEASE SAVEPOINT {self.quote_name(sid)}"
469
470 def savepoint_rollback_sql(self, sid: str) -> str:
471 """
472 Return the SQL for rolling back the given savepoint.
473 """
474 return f"ROLLBACK TO SAVEPOINT {self.quote_name(sid)}"
475
476 def set_time_zone_sql(self) -> str:
477 """
478 Return the SQL that will set the connection's time zone.
479
480 Return '' if the backend doesn't support time zones.
481 """
482 return ""
483
484 def prep_for_like_query(self, x: str) -> str:
485 """Prepare a value for use in a LIKE query."""
486 return str(x).replace("\\", "\\\\").replace("%", r"\%").replace("_", r"\_")
487
488 # Same as prep_for_like_query(), but called for "iexact" matches, which
489 # need not necessarily be implemented using "LIKE" in the backend.
490 prep_for_iexact_query = prep_for_like_query
491
492 def validate_autopk_value(self, value: int) -> int:
493 """
494 Certain backends do not accept some values for "serial" fields
495 (for example zero in MySQL). Raise a ValueError if the value is
496 invalid, otherwise return the validated value.
497 """
498 return value
499
500 def adapt_unknown_value(self, value: Any) -> Any:
501 """
502 Transform a value to something compatible with the backend driver.
503
504 This method only depends on the type of the value. It's designed for
505 cases where the target type isn't known, such as .raw() SQL queries.
506 As a consequence it may not work perfectly in all circumstances.
507 """
508 if isinstance(value, datetime.datetime): # must be before date
509 return self.adapt_datetimefield_value(value)
510 elif isinstance(value, datetime.date):
511 return self.adapt_datefield_value(value)
512 elif isinstance(value, datetime.time):
513 return self.adapt_timefield_value(value)
514 elif isinstance(value, decimal.Decimal):
515 return self.adapt_decimalfield_value(value)
516 else:
517 return value
518
519 def adapt_integerfield_value(
520 self, value: int | None, internal_type: str
521 ) -> int | None:
522 return value
523
524 def adapt_datefield_value(self, value: datetime.date | None) -> str | None:
525 """
526 Transform a date value to an object compatible with what is expected
527 by the backend driver for date columns.
528 """
529 if value is None:
530 return None
531 return str(value)
532
533 def adapt_datetimefield_value(
534 self, value: datetime.datetime | Any | None
535 ) -> str | Any | None:
536 """
537 Transform a datetime value to an object compatible with what is expected
538 by the backend driver for datetime columns.
539 """
540 if value is None:
541 return None
542 # Expression values are adapted by the database.
543 if isinstance(value, ResolvableExpression):
544 return value
545
546 return str(value)
547
548 def adapt_timefield_value(
549 self, value: datetime.time | Any | None
550 ) -> str | Any | None:
551 """
552 Transform a time value to an object compatible with what is expected
553 by the backend driver for time columns.
554 """
555 if value is None:
556 return None
557 # Expression values are adapted by the database.
558 if isinstance(value, ResolvableExpression):
559 return value
560
561 if timezone.is_aware(value):
562 raise ValueError("Plain does not support timezone-aware times.")
563 return str(value)
564
565 def adapt_decimalfield_value(
566 self,
567 value: decimal.Decimal | None,
568 max_digits: int | None = None,
569 decimal_places: int | None = None,
570 ) -> str | None:
571 """
572 Transform a decimal.Decimal value to an object compatible with what is
573 expected by the backend driver for decimal (numeric) columns.
574 """
575 return utils.format_number(value, max_digits, decimal_places)
576
577 def adapt_ipaddressfield_value(
578 self, value: str | None
579 ) -> str | ipaddress.IPv4Address | ipaddress.IPv6Address | None:
580 """
581 Transform a string representation of an IP address into the expected
582 type for the backend driver.
583 """
584 return value or None
585
586 def adapt_json_value(
587 self, value: Any, encoder: type[json.JSONEncoder] | None
588 ) -> Any:
589 return json.dumps(value, cls=encoder)
590
591 def year_lookup_bounds_for_date_field(
592 self, value: int, iso_year: bool = False
593 ) -> list[str | None]:
594 """
595 Return a two-elements list with the lower and upper bound to be used
596 with a BETWEEN operator to query a DateField value using a year
597 lookup.
598
599 `value` is an int, containing the looked-up year.
600 If `iso_year` is True, return bounds for ISO-8601 week-numbering years.
601 """
602 if iso_year:
603 first = datetime.date.fromisocalendar(value, 1, 1)
604 second = datetime.date.fromisocalendar(
605 value + 1, 1, 1
606 ) - datetime.timedelta(days=1)
607 else:
608 first = datetime.date(value, 1, 1)
609 second = datetime.date(value, 12, 31)
610 first_adapted = self.adapt_datefield_value(first)
611 second_adapted = self.adapt_datefield_value(second)
612 return [first_adapted, second_adapted]
613
614 def year_lookup_bounds_for_datetime_field(
615 self, value: int, iso_year: bool = False
616 ) -> list[str | Any | None]:
617 """
618 Return a two-elements list with the lower and upper bound to be used
619 with a BETWEEN operator to query a DateTimeField value using a year
620 lookup.
621
622 `value` is an int, containing the looked-up year.
623 If `iso_year` is True, return bounds for ISO-8601 week-numbering years.
624 """
625 if iso_year:
626 first = datetime.datetime.fromisocalendar(value, 1, 1)
627 second = datetime.datetime.fromisocalendar(
628 value + 1, 1, 1
629 ) - datetime.timedelta(microseconds=1)
630 else:
631 first = datetime.datetime(value, 1, 1)
632 second = datetime.datetime(value, 12, 31, 23, 59, 59, 999999)
633
634 # Make sure that datetimes are aware in the current timezone
635 tz = timezone.get_current_timezone()
636 first = timezone.make_aware(first, tz)
637 second = timezone.make_aware(second, tz)
638
639 first_adapted = self.adapt_datetimefield_value(first)
640 second_adapted = self.adapt_datetimefield_value(second)
641 return [first_adapted, second_adapted]
642
643 def get_db_converters(self, expression: Any) -> list[Any]:
644 """
645 Return a list of functions needed to convert field data.
646
647 Some field types on some backends do not provide data in the correct
648 format, this is the hook for converter functions.
649 """
650 return []
651
652 def convert_durationfield_value(
653 self, value: int | None, expression: Any, connection: BaseDatabaseWrapper
654 ) -> datetime.timedelta | None:
655 if value is not None:
656 return datetime.timedelta(0, 0, value)
657 return None
658
659 def check_expression_support(self, expression: Any) -> None:
660 """
661 Check that the backend supports the provided expression.
662
663 This is used on specific backends to rule out known expressions
664 that have problematic or nonexistent implementations. If the
665 expression has a known problem, the backend should raise
666 NotSupportedError.
667 """
668 return None
669
670 def conditional_expression_supported_in_where_clause(self, expression: Any) -> bool:
671 """
672 Return True, if the conditional expression is supported in the WHERE
673 clause.
674 """
675 return True
676
677 def combine_expression(self, connector: str, sub_expressions: list[str]) -> str:
678 """
679 Combine a list of subexpressions into a single expression, using
680 the provided connecting operator. This is required because operators
681 can vary between backends (e.g., Oracle with %% and &) and between
682 subexpression types (e.g., date expressions).
683 """
684 conn = f" {connector} "
685 return conn.join(sub_expressions)
686
687 def combine_duration_expression(
688 self, connector: str, sub_expressions: list[str]
689 ) -> str:
690 return self.combine_expression(connector, sub_expressions)
691
692 def binary_placeholder_sql(self, value: Any) -> str:
693 """
694 Some backends require special syntax to insert binary content (MySQL
695 for example uses '_binary %s').
696 """
697 return "%s"
698
699 def integer_field_range(self, internal_type: str) -> tuple[int, int]:
700 """
701 Given an integer field internal type (e.g. 'PositiveIntegerField'),
702 return a tuple of the (min_value, max_value) form representing the
703 range of the column type bound to the field.
704 """
705 return self.integer_field_ranges[internal_type]
706
707 def subtract_temporals(
708 self,
709 internal_type: str,
710 lhs: tuple[str, list[Any] | tuple[Any, ...]],
711 rhs: tuple[str, list[Any] | tuple[Any, ...]],
712 ) -> tuple[str, tuple[Any, ...]]:
713 if self.connection.features.supports_temporal_subtraction:
714 lhs_sql, lhs_params = lhs
715 rhs_sql, rhs_params = rhs
716 return f"({lhs_sql} - {rhs_sql})", (*lhs_params, *rhs_params)
717 raise NotSupportedError(
718 f"This backend does not support {internal_type} subtraction."
719 )
720
721 def window_frame_start(self, start: int | None) -> str:
722 if isinstance(start, int):
723 if start < 0:
724 return "%d %s" % (abs(start), self.PRECEDING) # noqa: UP031
725 elif start == 0:
726 return self.CURRENT_ROW
727 elif start is None:
728 return self.UNBOUNDED_PRECEDING
729 raise ValueError(
730 f"start argument must be a negative integer, zero, or None, but got '{start}'."
731 )
732
733 def window_frame_end(self, end: int | None) -> str:
734 if isinstance(end, int):
735 if end == 0:
736 return self.CURRENT_ROW
737 elif end > 0:
738 return "%d %s" % (end, self.FOLLOWING) # noqa: UP031
739 elif end is None:
740 return self.UNBOUNDED_FOLLOWING
741 raise ValueError(
742 f"end argument must be a positive integer, zero, or None, but got '{end}'."
743 )
744
745 def window_frame_rows_start_end(
746 self, start: int | None = None, end: int | None = None
747 ) -> tuple[str, str]:
748 """
749 Return SQL for start and end points in an OVER clause window frame.
750 """
751 if not self.connection.features.supports_over_clause:
752 raise NotSupportedError("This backend does not support window expressions.")
753 return self.window_frame_start(start), self.window_frame_end(end)
754
755 def window_frame_range_start_end(
756 self, start: int | None = None, end: int | None = None
757 ) -> tuple[str, str]:
758 start_, end_ = self.window_frame_rows_start_end(start, end)
759 features = self.connection.features
760 if features.only_supports_unbounded_with_preceding_and_following and (
761 (start and start < 0) or (end and end > 0)
762 ):
763 raise NotSupportedError(
764 f"{self.connection.display_name} only supports UNBOUNDED together with PRECEDING and "
765 "FOLLOWING."
766 )
767 return start_, end_
768
769 def explain_query_prefix(self, format: str | None = None, **options: Any) -> str:
770 if format:
771 supported_formats = self.connection.features.supported_explain_formats
772 normalized_format = format.upper()
773 if normalized_format not in supported_formats:
774 msg = f"{normalized_format} is not a recognized format."
775 if supported_formats:
776 msg += " Allowed formats: {}".format(
777 ", ".join(sorted(supported_formats))
778 )
779 else:
780 msg += (
781 f" {self.connection.display_name} does not support any formats."
782 )
783 raise ValueError(msg)
784 if options:
785 raise ValueError(
786 "Unknown options: {}".format(", ".join(sorted(options.keys())))
787 )
788 return self.explain_prefix
789
790 def insert_statement(self, on_conflict: Any = None) -> str:
791 return "INSERT INTO"
792
793 def on_conflict_suffix_sql(
794 self,
795 fields: list[Field],
796 on_conflict: Any,
797 update_fields: Iterable[str],
798 unique_fields: Iterable[str],
799 ) -> str:
800 return ""