1from __future__ import annotations
2
3import datetime
4import itertools
5import math
6from collections.abc import Sequence
7from functools import cached_property
8from typing import TYPE_CHECKING, Any
9
10from plain.postgres.dialect import (
11 INTEGER_FIELD_RANGES,
12 OPERATORS,
13 PATTERN_ESC,
14 PATTERN_OPS,
15 lookup_cast,
16 prep_for_like_query,
17 regex_lookup,
18 year_lookup_bounds_for_date_field,
19 year_lookup_bounds_for_datetime_field,
20)
21from plain.postgres.exceptions import EmptyResultSet, FullResultSet
22from plain.postgres.expressions import Expression, Func, ResolvableExpression, Value
23from plain.postgres.fields import (
24 BooleanField,
25 DateTimeField,
26 Field,
27 IntegerField,
28 UUIDField,
29)
30from plain.postgres.query_utils import RegisterLookupMixin
31from plain.utils.datastructures import OrderedSet
32from plain.utils.hashable import make_hashable
33
34if TYPE_CHECKING:
35 from plain.postgres.connection import DatabaseConnection
36 from plain.postgres.sql.compiler import SQLCompiler
37
38
class Lookup(Expression):
    """
    Boolean expression comparing a left-hand side with a right-hand side.

    After construction ``lhs`` is always an expression (plain values are
    wrapped in ``Value``), while ``rhs`` is either a prepared value or an
    expression. This base class defines no ``as_sql()`` of its own in this
    module; subclasses such as ``BuiltinLookup`` and ``OperatorLookup`` add it.
    """

    lookup_name: str | None = None
    # When False, get_prep_lookup() hands the rhs back untouched.
    prepare_rhs: bool = True
    can_use_none_as_rhs: bool = False
    lhs: Any
    rhs: Any

    def __init__(self, lhs: Any, rhs: Any):
        """
        Store and prepare both sides and collect the lhs' bilateral transforms.

        Raises:
            NotImplementedError: if bilateral transforms are present and the
                rhs that was passed in is a nested ``Query``.
        """
        self.lhs, self.rhs = lhs, rhs
        # get_prep_lookup() runs first, so it inspects the lhs exactly as it
        # was passed in (before get_prep_lhs() may wrap it in Value).
        self.rhs = self.get_prep_lookup()
        self.lhs = self.get_prep_lhs()
        if hasattr(self.lhs, "get_bilateral_transforms"):
            bilateral_transforms = self.lhs.get_bilateral_transforms()
        else:
            bilateral_transforms = []
        if bilateral_transforms:
            # Warn the user as soon as possible if they are trying to apply
            # a bilateral transformation on a nested QuerySet: that won't work.
            from plain.postgres.sql.query import Query  # avoid circular import

            if isinstance(rhs, Query):
                raise NotImplementedError(
                    "Bilateral transformations on nested querysets are not implemented."
                )
        self.bilateral_transforms = bilateral_transforms

    def apply_bilateral_transforms(self, value: Any) -> Any:
        """Wrap ``value`` in each bilateral transform, in collection order."""
        for transform in self.bilateral_transforms:
            value = transform(value)
        return value

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.lhs!r}, {self.rhs!r})"

    def batch_process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection, rhs: Any = None
    ) -> tuple[list[str], list[Any]]:
        """
        Compile an iterable rhs into parallel lists of SQL fragments and params.

        ``rhs`` defaults to ``self.rhs``. With bilateral transforms every item
        is wrapped in ``Value``, transformed and compiled on its own; otherwise
        the items go through ``get_db_prep_lookup()`` and each resulting param
        gets a ``%s`` placeholder.
        """
        if rhs is None:
            rhs = self.rhs
        if self.bilateral_transforms:
            sqls: list[str] = []
            sqls_params: list[Any] = []
            for p in rhs:
                value = Value(p, output_field=self.lhs.output_field)
                value = self.apply_bilateral_transforms(value)
                value = value.resolve_expression(compiler.query)
                sql, sql_params = compiler.compile(value)
                sqls.append(sql)
                sqls_params.extend(sql_params)
        else:
            _, params = self.get_db_prep_lookup(rhs, connection)
            sqls = ["%s"] * len(params)
            sqls_params = list(params)
        return sqls, sqls_params

    def get_source_expressions(self) -> list[Any]:
        """Return ``[lhs]`` for a direct-value rhs, otherwise ``[lhs, rhs]``."""
        if self.rhs_is_direct_value():
            return [self.lhs]
        return [self.lhs, self.rhs]

    def set_source_expressions(self, exprs: Sequence[Any]) -> None:
        """Assign ``lhs`` (one expression given) or ``lhs`` and ``rhs`` (two)."""
        exprs_list = list(exprs)
        if len(exprs_list) == 1:
            self.lhs = exprs_list[0]
        else:
            self.lhs, self.rhs = exprs_list

    def get_prep_lookup(self) -> Any:
        """
        Return the rhs prepared for use in the lookup.

        Expressions, and any rhs when ``prepare_rhs`` is False, are returned
        unchanged. Otherwise the lhs' ``output_field.get_prep_value()`` is
        applied when available; if the lhs has no (truthy) ``output_field``, a
        direct-value rhs is wrapped in ``Value``.
        """
        if not self.prepare_rhs or isinstance(self.rhs, ResolvableExpression):
            return self.rhs
        if output_field := getattr(self.lhs, "output_field", None):
            if get_prep_value := getattr(output_field, "get_prep_value", None):
                return get_prep_value(self.rhs)
        elif self.rhs_is_direct_value():
            return Value(self.rhs)
        return self.rhs

    def get_prep_lhs(self) -> Any:
        """Return the lhs as an expression, wrapping plain values in ``Value``."""
        if isinstance(self.lhs, ResolvableExpression):
            return self.lhs
        return Value(self.lhs)

    def get_db_prep_lookup(
        self, value: Any, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """Return a ``%s`` placeholder and ``[value]`` as its parameter list."""
        return ("%s", [value])

    def process_lhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection, lhs: Any = None
    ) -> tuple[str, list[Any]]:
        """
        Compile the left-hand side into SQL and a list of params.

        ``lhs`` overrides ``self.lhs`` when given. Only ``None`` selects the
        default: a truthiness test would silently discard an explicitly passed
        expression whose type defines a falsy ``__bool__``/``__len__``.
        """
        if lhs is None:
            lhs = self.lhs
        if isinstance(lhs, ResolvableExpression):
            lhs = lhs.resolve_expression(compiler.query)
        sql, params = compiler.compile(lhs)
        if isinstance(lhs, Lookup):
            # Wrapped in parentheses to respect operator precedence.
            sql = f"({sql})"
        return sql, list(params)

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """
        Compile the right-hand side into SQL and params.

        Anything exposing ``as_sql`` is compiled and parenthesised; plain
        values are delegated to ``get_db_prep_lookup()``.
        """
        value = self.rhs
        if self.bilateral_transforms:
            if self.rhs_is_direct_value():
                # Do not call get_db_prep_lookup here as the value will be
                # transformed before being used for lookup
                value = Value(value, output_field=self.lhs.output_field)
            value = self.apply_bilateral_transforms(value)
            value = value.resolve_expression(compiler.query)
        if hasattr(value, "as_sql"):
            sql, params = compiler.compile(value)
            # Ensure expression is wrapped in parentheses to respect operator
            # precedence but avoid double wrapping.
            if sql and sql[0] != "(":
                sql = f"({sql})"
            return sql, list(params)
        else:
            return self.get_db_prep_lookup(value, connection)

    def rhs_is_direct_value(self) -> bool:
        """Return True when the rhs has no ``as_sql`` attribute."""
        return not hasattr(self.rhs, "as_sql")

    def get_group_by_cols(self) -> list[Any]:
        """Concatenate the GROUP BY columns of every source expression."""
        cols = []
        for source in self.get_source_expressions():
            cols.extend(source.get_group_by_cols())
        return cols

    @cached_property
    def output_field(self) -> BooleanField:
        return BooleanField()

    @property
    def identity(self) -> tuple[type[Lookup], Any, Any]:
        """The ``(class, lhs, rhs)`` triple used for equality and hashing."""
        return self.__class__, self.lhs, self.rhs

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Lookup):
            return NotImplemented
        return self.identity == other.identity

    def __hash__(self) -> int:
        return hash(make_hashable(self.identity))

    def resolve_expression(
        self,
        query: Any = None,
        allow_joins: bool = True,
        reuse: Any = None,
        summarize: bool = False,
        for_save: bool = False,
    ) -> Lookup:
        """
        Return a copy whose lhs (and rhs, when it is an expression) has been
        resolved against ``query``; ``is_summary`` is set from ``summarize``.
        """
        c = self.copy()
        c.is_summary = summarize
        c.lhs = self.lhs.resolve_expression(
            query, allow_joins, reuse, summarize, for_save
        )
        if isinstance(self.rhs, ResolvableExpression):
            c.rhs = self.rhs.resolve_expression(
                query, allow_joins, reuse, summarize, for_save
            )
        return c

    def select_format(
        self, compiler: SQLCompiler, sql: str, params: Sequence[Any]
    ) -> tuple[str, Sequence[Any]]:
        # Boolean expressions work directly in SELECT
        return sql, params
208
209
class Transform(RegisterLookupMixin, Func):
    """
    Single-argument function that can be chained in lookups.

    RegisterLookupMixin() comes first in the bases so that get_lookup() and
    get_transform() examine this class before checking output_field.
    """

    lookup_name: str | None = None
    bilateral: bool = False
    arity: int = 1

    @property
    def lhs(self) -> Any:
        """The first source expression of this transform."""
        sources = self.get_source_expressions()
        return sources[0]

    def get_bilateral_transforms(self) -> list[type[Transform]]:
        """
        Return the bilateral transform classes of the lhs chain, followed by
        this transform's own class when ``bilateral`` is set.
        """
        transforms = (
            self.lhs.get_bilateral_transforms()
            if hasattr(self.lhs, "get_bilateral_transforms")
            else []
        )
        if self.bilateral:
            transforms.append(self.__class__)
        return transforms
232
233
class BuiltinLookup(Lookup):
    """
    Lookup whose SQL is driven by the dialect tables: ``lookup_cast()``
    supplies the lhs template and ``OPERATORS`` the rhs operator template.
    """

    def process_lhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection, lhs: Any = None
    ) -> tuple[str, list[Any]]:
        """Compile the lhs and apply the cast template for this lookup."""
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        compiled_lhs, lhs_params = super().process_lhs(compiler, connection, lhs)
        internal_type = self.lhs.output_field.get_internal_type()
        cast_template = lookup_cast(self.lookup_name, internal_type)
        return cast_template % compiled_lhs, list(lhs_params)

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """Return ``"<lhs> <operator applied to rhs>"`` with lhs params first."""
        left_sql, all_params = self.process_lhs(compiler, connection)
        right_sql, right_params = self.process_rhs(compiler, connection)
        all_params.extend(right_params)
        comparison = self.get_rhs_op(connection, right_sql)
        return f"{left_sql} {comparison}", all_params

    def get_rhs_op(self, connection: DatabaseConnection, rhs: str | list[str]) -> str:
        """Interpolate ``rhs`` into the ``OPERATORS`` template of this lookup."""
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        operator_template = OPERATORS[self.lookup_name]
        return operator_template % rhs
260
261
class FieldGetDbPrepValueMixin(Lookup):
    """
    Mixin for lookups whose inputs must be passed through
    Field.get_db_prep_value().
    """

    # Subclasses set this to True when the lookup value is an iterable whose
    # items are prepared one by one.
    get_db_prep_lookup_value_is_iterable: bool = False
    lhs: Any
    rhs: Any

    def get_db_prep_lookup(
        self, value: Any, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """
        Return a ``%s`` placeholder and the db-prepared parameter list.

        The preparing function is taken from the output field's
        ``target_field`` when that exists (relational fields) and from the
        output field itself otherwise.
        """
        target_field = getattr(self.lhs.output_field, "target_field", None)
        prepare = (
            getattr(target_field, "get_db_prep_value", None)
            or self.lhs.output_field.get_db_prep_value
        )
        if self.get_db_prep_lookup_value_is_iterable:
            prepared = [prepare(item, connection, prepared=True) for item in value]
        else:
            prepared = [prepare(value, connection, prepared=True)]
        return "%s", prepared
288
289
class FieldGetDbPrepValueIterableMixin(FieldGetDbPrepValueMixin):
    """
    Some lookups require Field.get_db_prep_value() to be called on each value
    in an iterable.
    """

    get_db_prep_lookup_value_is_iterable: bool = True
    prepare_rhs: bool

    def get_prep_lookup(self) -> Any:
        """
        Return the rhs with each plain item prepared individually.

        An rhs that is itself an expression is returned unchanged. Otherwise
        the rhs is iterated: expression items are kept as they are, and other
        items go through the lhs' ``output_field.get_prep_value()`` when
        ``prepare_rhs`` is set and such a method is available.
        """
        if isinstance(self.rhs, ResolvableExpression):
            return self.rhs
        prepared_values = []
        for rhs_value in self.rhs:
            if isinstance(rhs_value, ResolvableExpression):
                # An expression will be handled by the database but can coexist
                # alongside real values.
                pass
            elif self.prepare_rhs:
                if output_field := getattr(self.lhs, "output_field", None):
                    if get_prep_value := getattr(output_field, "get_prep_value", None):
                        rhs_value = get_prep_value(rhs_value)
            prepared_values.append(rhs_value)
        return prepared_values

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """
        Compile the rhs: a list of SQL fragments for a direct iterable of
        values, or a single SQL string when the rhs is an expression.
        """
        if self.rhs_is_direct_value():
            # rhs should be an iterable of values. Use batch_process_rhs()
            # to prepare/transform those values.
            return self.batch_process_rhs(compiler, connection)
        else:
            return super().process_rhs(compiler, connection)

    def resolve_expression_parameter(
        self,
        compiler: SQLCompiler,
        connection: DatabaseConnection,
        sql: str,
        param: Any,
    ) -> tuple[str, list[Any]]:
        """
        Return the SQL/params pair for one rhs item.

        A plain value keeps the given ``sql`` placeholder with itself as the
        only param; an item exposing ``as_sql`` is compiled and contributes
        its own SQL and params instead.
        """
        params: list[Any] = [param]
        if isinstance(param, ResolvableExpression):
            param = param.resolve_expression(compiler.query)
        if hasattr(param, "as_sql"):
            sql, compiled_params = compiler.compile(param)
            params = list(compiled_params)
        return sql, params

    def batch_process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection, rhs: Any = None
    ) -> tuple[list[str], list[Any]]:
        """
        Compile every rhs item, letting expression items supply their own SQL.

        Returns parallel data: the list of SQL fragments and the flattened
        list of all params. An empty rhs yields ``([], [])`` rather than
        failing while unpacking an empty ``zip``.
        """
        pre_processed = super().batch_process_rhs(compiler, connection, rhs)
        # The params list may contain expressions which compile to a
        # sql/param pair. Pair each placeholder with its param and attempt to
        # replace both with the result of compiling the param.
        resolved = [
            self.resolve_expression_parameter(compiler, connection, sql, param)
            for sql, param in zip(*pre_processed)
        ]
        sqls = [item_sql for item_sql, _ in resolved]
        params_list = list(
            itertools.chain.from_iterable(item_params for _, item_params in resolved)
        )
        return sqls, params_list
356
357
class OperatorLookup(Lookup):
    """Lookup rendered as ``<lhs> <operator> <rhs>`` using a SQL operator."""

    # SQL operator placed between both sides; subclasses are expected to set it.
    operator: str | None = None

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, tuple[Any, ...]]:
        """Compile both sides and join them with ``self.operator``."""
        left_sql, left_params = self.process_lhs(compiler, connection)
        right_sql, right_params = self.process_rhs(compiler, connection)
        combined_params = (*left_params, *right_params)
        return f"{left_sql} {self.operator} {right_sql}", combined_params
370
371
@Field.register_lookup
class Exact(FieldGetDbPrepValueMixin, BuiltinLookup):
    """The ``exact`` lookup, registered on ``Field``."""

    lookup_name: str = "exact"

    def get_prep_lookup(self) -> Any:
        """
        Prepare the rhs; a ``Query`` rhs must be limited to one result.

        A one-row query without explicit select fields is narrowed to its
        ``id`` column.

        Raises:
            ValueError: if the rhs is a ``Query`` that is not limited to one
                result.
        """
        from plain.postgres.sql.query import Query  # avoid circular import

        if isinstance(self.rhs, Query):
            if not self.rhs.has_limit_one():
                raise ValueError(
                    "The QuerySet value for an exact lookup must be limited to "
                    "one result using slicing."
                )
            if not self.rhs.has_select_fields:
                self.rhs.clear_select_clause()
                self.rhs.add_fields(["id"])
        return super().get_prep_lookup()

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """
        Render the comparison, short-circuiting boolean comparisons.

        With a ``bool`` rhs and a conditional lhs, "boolfield__exact=True"
        becomes "WHERE boolean_field" (or "NOT boolean_field" for False)
        instead of "WHERE boolean_field = True".
        """
        bare_boolean = isinstance(self.rhs, bool) and getattr(
            self.lhs, "conditional", False
        )
        if not bare_boolean:
            return super().as_sql(compiler, connection)
        lhs_sql, params = self.process_lhs(compiler, connection)
        negation = "" if self.rhs else "NOT "
        return f"{negation}{lhs_sql}", params
402
403
@Field.register_lookup
class IExact(BuiltinLookup):
    """
    The ``iexact`` lookup, registered on ``Field``.

    The comparison SQL is the ``OPERATORS["iexact"]`` template applied by
    ``BuiltinLookup.get_rhs_op()``.
    """

    lookup_name: str = "iexact"
    # With prepare_rhs off, Lookup.get_prep_lookup() returns the rhs as given.
    prepare_rhs: bool = False
408
409
@Field.register_lookup
class GreaterThan(FieldGetDbPrepValueMixin, BuiltinLookup):
    """
    The ``gt`` lookup, registered on ``Field``.

    The rhs is db-prepared through ``FieldGetDbPrepValueMixin`` and the SQL
    operator comes from ``OPERATORS["gt"]``.
    """

    lookup_name: str = "gt"
413
414
@Field.register_lookup
class GreaterThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup):
    """
    The ``gte`` lookup, registered on ``Field``.

    The rhs is db-prepared through ``FieldGetDbPrepValueMixin`` and the SQL
    operator comes from ``OPERATORS["gte"]``.
    """

    lookup_name: str = "gte"
418
419
@Field.register_lookup
class LessThan(FieldGetDbPrepValueMixin, BuiltinLookup):
    """
    The ``lt`` lookup, registered on ``Field``.

    The rhs is db-prepared through ``FieldGetDbPrepValueMixin`` and the SQL
    operator comes from ``OPERATORS["lt"]``.
    """

    lookup_name: str = "lt"
423
424
@Field.register_lookup
class LessThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup):
    """
    The ``lte`` lookup, registered on ``Field``.

    The rhs is db-prepared through ``FieldGetDbPrepValueMixin`` and the SQL
    operator comes from ``OPERATORS["lte"]``.
    """

    lookup_name: str = "lte"
428
429
class IntegerFieldOverflow:
    """
    Mixin that short-circuits integer comparisons whose rhs lies outside the
    range listed in ``INTEGER_FIELD_RANGES`` for the lhs field type.

    Subclasses choose which exception signals each direction by overriding
    ``underflow_exception`` / ``overflow_exception``.
    """

    underflow_exception: type[Exception] = EmptyResultSet
    overflow_exception: type[Exception] = EmptyResultSet
    lhs: Any
    rhs: Any

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """
        Range-check an ``int`` rhs, then defer to the next ``process_rhs``.

        Raises:
            underflow_exception: when the rhs is below the field's minimum.
            overflow_exception: when the rhs is above the field's maximum.
        """
        candidate = self.rhs
        if isinstance(candidate, int):
            internal_type = self.lhs.output_field.get_internal_type()
            lower_bound, upper_bound = INTEGER_FIELD_RANGES[internal_type]
            # A bound of None means that side is not checked.
            if lower_bound is not None and candidate < lower_bound:
                raise self.underflow_exception
            if upper_bound is not None and candidate > upper_bound:
                raise self.overflow_exception
        return super().process_rhs(compiler, connection)  # type: ignore[misc]
448
449
class IntegerFieldFloatRounding:
    """
    Let float query values work against IntegerField by rounding them up
    before preparation. Without this, the decimal portion of the float would
    always be discarded.
    """

    rhs: Any

    def get_prep_lookup(self) -> Any:
        """Replace a float rhs with its ceiling, then continue preparation."""
        value = self.rhs
        if isinstance(value, float):
            self.rhs = math.ceil(value)
        return super().get_prep_lookup()  # type: ignore[misc]
462
463
@IntegerField.register_lookup
class IntegerFieldExact(IntegerFieldOverflow, Exact):
    """
    ``exact`` for ``IntegerField`` with rhs range checking.

    Keeps the ``IntegerFieldOverflow`` defaults, so an rhs outside the field
    range raises ``EmptyResultSet`` in either direction.
    """

    pass
467
468
@IntegerField.register_lookup
class IntegerGreaterThan(IntegerFieldOverflow, GreaterThan):
    """``gt`` for ``IntegerField`` with rhs range checking."""

    # An rhs below the field minimum raises FullResultSet instead of
    # EmptyResultSet -- presumably because every storable value is greater
    # than such an rhs.
    underflow_exception = FullResultSet
472
473
@IntegerField.register_lookup
class IntegerGreaterThanOrEqual(
    IntegerFieldOverflow, IntegerFieldFloatRounding, GreaterThanOrEqual
):
    """
    ``gte`` for ``IntegerField`` with rhs range checking; a float rhs is
    rounded up by ``IntegerFieldFloatRounding`` before preparation.
    """

    # An rhs below the field minimum raises FullResultSet instead of
    # EmptyResultSet -- presumably because every storable value satisfies
    # the comparison.
    underflow_exception = FullResultSet
479
480
@IntegerField.register_lookup
class IntegerLessThan(IntegerFieldOverflow, IntegerFieldFloatRounding, LessThan):
    """
    ``lt`` for ``IntegerField`` with rhs range checking; a float rhs is
    rounded up by ``IntegerFieldFloatRounding`` before preparation.
    """

    # An rhs above the field maximum raises FullResultSet instead of
    # EmptyResultSet -- presumably because every storable value is smaller
    # than such an rhs.
    overflow_exception = FullResultSet
484
485
@IntegerField.register_lookup
class IntegerLessThanOrEqual(IntegerFieldOverflow, LessThanOrEqual):
    """``lte`` for ``IntegerField`` with rhs range checking."""

    # An rhs above the field maximum raises FullResultSet instead of
    # EmptyResultSet -- presumably because every storable value satisfies
    # the comparison.
    overflow_exception = FullResultSet
489
490
@Field.register_lookup
class In(FieldGetDbPrepValueIterableMixin, BuiltinLookup):
    """The ``in`` lookup, registered on ``Field``."""

    lookup_name: str = "in"

    def get_prep_lookup(self) -> Any:
        """
        Prepare the rhs; a ``Query`` rhs has its ordering cleared and, when
        it has no explicit select fields, is narrowed to its ``id`` column.
        """
        from plain.postgres.sql.query import Query  # avoid circular import

        subquery = self.rhs
        if isinstance(subquery, Query):
            subquery.clear_ordering(clear_default=True)
            if not subquery.has_select_fields:
                subquery.clear_select_clause()
                subquery.add_fields(["id"])
        return super().get_prep_lookup()

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """
        Compile the rhs into a parenthesised placeholder list and its params.

        Raises:
            EmptyResultSet: if no values remain once ``None`` is removed.
        """
        if not self.rhs_is_direct_value():
            return super().process_rhs(compiler, connection)

        # Remove None from the list as NULL is never equal to anything.
        try:
            candidates = OrderedSet(self.rhs)
            candidates.discard(None)
        except TypeError:  # Unhashable items in self.rhs
            candidates = [item for item in self.rhs if item is not None]

        if not candidates:
            raise EmptyResultSet

        # The values form an iterable; batch_process_rhs() prepares and
        # transforms each of them.
        sqls, sqls_params = self.batch_process_rhs(compiler, connection, candidates)
        return f"({', '.join(sqls)})", sqls_params

    def get_rhs_op(self, connection: DatabaseConnection, rhs: str | list[str]) -> str:
        """Return the ``IN <rhs>`` SQL fragment."""
        return f"IN {rhs}"

    # PostgreSQL has no limit on IN clause size, so no need to override as_sql()
530
531
class PatternLookup(BuiltinLookup):
    """
    Base for LIKE-style lookups.

    ``param_pattern`` is the %-format template that wraps a Python rhs value
    with the wildcard characters (the default surrounds it with ``%``).
    """

    param_pattern: str = "%%%s%%"
    prepare_rhs: bool = False
    bilateral_transforms: list[Any]

    def get_rhs_op(self, connection: DatabaseConnection, rhs: str | list[str]) -> str:
        """Return the operator SQL, adding the pattern in SQL when needed."""
        # Assume we are in startswith. We need to produce SQL like:
        #     col LIKE %s, ['thevalue%']
        # For python values we can (and should) do that directly in Python,
        # but if the value is for example reference to other column, then
        # we need to add the % pattern match to the lookup by something like
        #     col LIKE othercol || '%%'
        # So, for Python values we don't need any special pattern, but for
        # SQL reference values or SQL transformations we need the correct
        # pattern added.
        pattern_in_sql = hasattr(self.rhs, "as_sql") or self.bilateral_transforms
        if not pattern_in_sql:
            return super().get_rhs_op(connection, rhs)
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        sql_pattern = PATTERN_OPS[self.lookup_name].format(PATTERN_ESC)
        return sql_pattern.format(rhs)

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """
        Compile the rhs; for a plain Python value without bilateral
        transforms, the first param is escaped with ``prep_for_like_query()``
        and wrapped with ``param_pattern``.
        """
        rhs, params = super().process_rhs(compiler, connection)
        wrap_in_python = (
            isinstance(rhs, str)
            and self.rhs_is_direct_value()
            and params
            and not self.bilateral_transforms
        )
        if wrap_in_python:
            params[0] = self.param_pattern % prep_for_like_query(params[0])
        return rhs, params
566
567
@Field.register_lookup
class Contains(PatternLookup):
    """
    The ``contains`` lookup, registered on ``Field``.

    Uses the default ``PatternLookup.param_pattern``, so a Python rhs value
    is sent as ``%value%``.
    """

    lookup_name: str = "contains"
571
572
@Field.register_lookup
class IContains(Contains):
    """
    The ``icontains`` lookup, registered on ``Field``.

    Identical to ``Contains`` except for the lookup name, which selects the
    ``icontains`` entries of the dialect operator tables.
    """

    lookup_name: str = "icontains"
576
577
@Field.register_lookup
class StartsWith(PatternLookup):
    """The ``startswith`` lookup, registered on ``Field``."""

    lookup_name: str = "startswith"
    # A Python rhs value is sent as ``value%`` (wildcard only at the end).
    param_pattern: str = "%s%%"
582
583
@Field.register_lookup
class IStartsWith(StartsWith):
    """
    The ``istartswith`` lookup, registered on ``Field``.

    Identical to ``StartsWith`` except for the lookup name, which selects the
    ``istartswith`` entries of the dialect operator tables.
    """

    lookup_name: str = "istartswith"
587
588
@Field.register_lookup
class EndsWith(PatternLookup):
    """The ``endswith`` lookup, registered on ``Field``."""

    lookup_name: str = "endswith"
    # A Python rhs value is sent as ``%value`` (wildcard only at the start).
    param_pattern: str = "%%%s"
593
594
@Field.register_lookup
class IEndsWith(EndsWith):
    """
    The ``iendswith`` lookup, registered on ``Field``.

    Identical to ``EndsWith`` except for the lookup name, which selects the
    ``iendswith`` entries of the dialect operator tables.
    """

    lookup_name: str = "iendswith"
598
599
@Field.register_lookup
class Range(FieldGetDbPrepValueIterableMixin, BuiltinLookup):
    """The ``range`` lookup, registered on ``Field``; renders ``BETWEEN``."""

    lookup_name: str = "range"

    def get_rhs_op(self, connection: DatabaseConnection, rhs: str | list[str]) -> str:
        """
        Return ``BETWEEN <lower> AND <upper>`` for the two compiled bounds.

        ``rhs`` is the list of SQL fragments that ``process_rhs()`` produces
        for a direct iterable of values. The checks are explicit exceptions
        rather than ``assert`` so they still run under ``python -O``: a string
        rhs (the expression case) would otherwise be indexed character by
        character into broken SQL.

        Raises:
            TypeError: if ``rhs`` is not a list.
            ValueError: if ``rhs`` does not hold exactly two bounds.
        """
        if not isinstance(rhs, list):
            raise TypeError(f"Range lookup expects list, got {type(rhs)}")
        if len(rhs) != 2:
            raise ValueError(
                f"Range lookup expects exactly two bounds, got {len(rhs)}"
            )
        lower, upper = rhs
        return f"BETWEEN {lower} AND {upper}"
608
609
@Field.register_lookup
class IsNull(BuiltinLookup):
    """The ``isnull`` lookup, registered on ``Field``."""

    lookup_name: str = "isnull"
    prepare_rhs: bool = False

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """
        Render ``<lhs> IS NULL`` for a True rhs and ``<lhs> IS NOT NULL`` for
        a False rhs.

        Raises:
            ValueError: if the rhs is not a ``bool``.
        """
        if not isinstance(self.rhs, bool):
            raise ValueError(
                "The QuerySet value for an isnull lookup must be True or False."
            )
        lhs_sql, lhs_params = self.process_lhs(compiler, connection)
        null_test = "IS NULL" if self.rhs else "IS NOT NULL"
        return f"{lhs_sql} {null_test}", lhs_params
627
628
@Field.register_lookup
class Regex(BuiltinLookup):
    """The ``regex`` lookup, registered on ``Field``."""

    lookup_name: str = "regex"
    prepare_rhs: bool = False

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """
        Render the match through ``OPERATORS`` when the lookup name is listed
        there, otherwise through the template from ``regex_lookup()``.
        """
        if self.lookup_name in OPERATORS:
            return super().as_sql(compiler, connection)
        subject_sql, subject_params = self.process_lhs(compiler, connection)
        pattern_sql, pattern_params = self.process_rhs(compiler, connection)
        template = regex_lookup(self.lookup_name)
        return template % (subject_sql, pattern_sql), subject_params + pattern_params
644
645
@Field.register_lookup
class IRegex(Regex):
    """
    The ``iregex`` lookup, registered on ``Field``.

    Identical to ``Regex`` except for the lookup name that is looked up in
    ``OPERATORS`` / passed to ``regex_lookup()``.
    """

    lookup_name: str = "iregex"
649
650
class YearLookup(Lookup):
    """
    Base for comparisons on an extracted year.

    For a direct rhs value the extraction is bypassed: the originating field
    (``self.lhs.lhs``) is compared against the bounds of the requested year.
    """

    def year_lookup_bounds(
        self, connection: DatabaseConnection, year: int
    ) -> list[datetime.date] | list[datetime.datetime]:
        """
        Return the bounds of ``year`` for the originating field, using the
        datetime variant for ``DateTimeField`` and the date variant otherwise.
        """
        from plain.postgres.functions import ExtractIsoYear

        iso_year = isinstance(self.lhs, ExtractIsoYear)
        source_field = self.lhs.lhs.output_field
        bounds_for = (
            year_lookup_bounds_for_datetime_field
            if isinstance(source_field, DateTimeField)
            else year_lookup_bounds_for_date_field
        )
        return bounds_for(year, iso_year=iso_year)

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, Sequence[Any]]:
        """Render the comparison, skipping the extract for direct values."""
        # Only a direct rhs value allows avoiding the extract operation,
        # which in turn allows indexes to be used.
        if not self.rhs_is_direct_value():
            return super().as_sql(compiler, connection)

        # Skip the extract part by directly using the originating field,
        # that is self.lhs.lhs.
        lhs_sql, params = self.process_lhs(compiler, connection, self.lhs.lhs)
        rhs_sql, _ = self.process_rhs(compiler, connection)
        # rhs_sql should be a string for year lookups
        assert isinstance(rhs_sql, str), f"Expected str, got {type(rhs_sql)}"
        rhs_sql = self.get_direct_rhs_sql(connection, rhs_sql)
        year_start, year_finish = self.year_lookup_bounds(connection, self.rhs)
        params.extend(self.get_bound_params(year_start, year_finish))
        return f"{lhs_sql} {rhs_sql}", params

    def get_direct_rhs_sql(self, connection: DatabaseConnection, rhs: str) -> str:
        """Interpolate ``rhs`` into the ``OPERATORS`` template of this lookup."""
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        operator_template = OPERATORS[self.lookup_name]
        return operator_template % rhs

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any, ...]:
        """Return bound parameters for the year lookup."""
        raise NotImplementedError("Subclasses must implement get_bound_params()")
692
693
class YearExact(YearLookup, Exact):
    """Year equality on a direct value, rendered as a BETWEEN over the year."""

    def get_direct_rhs_sql(self, connection: DatabaseConnection, rhs: str) -> str:
        """Return the two-placeholder BETWEEN fragment; ``rhs`` is not used."""
        return "BETWEEN %s AND %s"

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any, Any]:
        """Return both year bounds, matching the two BETWEEN placeholders."""
        return (start, finish)
700
701
class YearGt(YearLookup, GreaterThan):
    """Year ``gt`` comparison; a direct value is compared to the year's end."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        """Return only the upper bound of the year."""
        return (finish,)
705
706
class YearGte(YearLookup, GreaterThanOrEqual):
    """Year ``gte`` comparison; a direct value is compared to the year's start."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        """Return only the lower bound of the year."""
        return (start,)
710
711
class YearLt(YearLookup, LessThan):
    """Year ``lt`` comparison; a direct value is compared to the year's start."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        """Return only the lower bound of the year."""
        return (start,)
715
716
class YearLte(YearLookup, LessThanOrEqual):
    """Year ``lte`` comparison; a direct value is compared to the year's end."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        """Return only the upper bound of the year."""
        return (finish,)
720
721
722# UUID lookups - PostgreSQL has native UUID support so these inherit directly
723# from their base classes without any special processing.
724
725
@UUIDField.register_lookup
class UUIDIExact(IExact):
    """``iexact`` registered on ``UUIDField``; inherits ``IExact`` unchanged."""

    pass
729
730
@UUIDField.register_lookup
class UUIDContains(Contains):
    """``contains`` registered on ``UUIDField``; inherits ``Contains`` unchanged."""

    pass
734
735
@UUIDField.register_lookup
class UUIDIContains(IContains):
    """``icontains`` registered on ``UUIDField``; inherits ``IContains`` unchanged."""

    pass
739
740
@UUIDField.register_lookup
class UUIDStartsWith(StartsWith):
    """``startswith`` registered on ``UUIDField``; inherits ``StartsWith`` unchanged."""

    pass
744
745
@UUIDField.register_lookup
class UUIDIStartsWith(IStartsWith):
    """``istartswith`` registered on ``UUIDField``; inherits ``IStartsWith`` unchanged."""

    pass
749
750
@UUIDField.register_lookup
class UUIDEndsWith(EndsWith):
    """``endswith`` registered on ``UUIDField``; inherits ``EndsWith`` unchanged."""

    pass
754
755
@UUIDField.register_lookup
class UUIDIEndsWith(IEndsWith):
    """``iendswith`` registered on ``UUIDField``; inherits ``IEndsWith`` unchanged."""

    pass