1from __future__ import annotations
2
3import datetime
4import itertools
5import math
6from collections.abc import Sequence
7from functools import cached_property
8from typing import TYPE_CHECKING, Any
9
10from plain.postgres.dialect import (
11 OPERATORS,
12 PATTERN_ESC,
13 PATTERN_OPS,
14 lookup_cast,
15 prep_for_like_query,
16 regex_lookup,
17 year_lookup_bounds_for_date_field,
18 year_lookup_bounds_for_datetime_field,
19)
20from plain.postgres.exceptions import EmptyResultSet, FullResultSet
21from plain.postgres.expressions import Expression, Func, ResolvableExpression, Value
22from plain.postgres.fields import (
23 BooleanField,
24 DateTimeField,
25 Field,
26 IntegerField,
27 UUIDField,
28)
29from plain.postgres.query_utils import RegisterLookupMixin
30from plain.utils.datastructures import OrderedSet
31from plain.utils.hashable import make_hashable
32
33if TYPE_CHECKING:
34 from plain.postgres.connection import DatabaseConnection
35 from plain.postgres.sql.compiler import SQLCompiler
36
37
class Lookup(Expression):
    """
    Boolean expression that compares a left-hand side with a right-hand side.

    On construction the rhs is passed through get_prep_lookup() and the lhs
    through get_prep_lhs(). Any bilateral transforms exposed by the lhs are
    recorded so they can also be applied to the rhs when SQL is produced.
    This class does not define as_sql() itself.
    """

    # Name the lookup is registered under; concrete subclasses set it.
    lookup_name: str | None = None
    # When False, get_prep_lookup() hands the rhs back untouched.
    prepare_rhs: bool = True
    can_use_none_as_rhs: bool = False
    lhs: Any
    rhs: Any

    def __init__(self, lhs: Any, rhs: Any):
        self.lhs, self.rhs = lhs, rhs
        # Order matters: the rhs is prepared while self.lhs still holds the
        # value that was passed in; only afterwards is the lhs prepared.
        self.rhs = self.get_prep_lookup()
        self.lhs = self.get_prep_lhs()
        bilateral_transforms = (
            self.lhs.get_bilateral_transforms()
            if hasattr(self.lhs, "get_bilateral_transforms")
            else []
        )
        if bilateral_transforms:
            # Fail early: a bilateral transformation cannot be applied to a
            # nested QuerySet. The check uses the rhs exactly as passed in.
            from plain.postgres.sql.query import Query  # avoid circular import

            if isinstance(rhs, Query):
                raise NotImplementedError(
                    "Bilateral transformations on nested querysets are not implemented."
                )
        self.bilateral_transforms = bilateral_transforms

    def apply_bilateral_transforms(self, value: Any) -> Any:
        """Wrap ``value`` in each recorded bilateral transform, in order."""
        wrapped = value
        for transform in self.bilateral_transforms:
            wrapped = transform(wrapped)
        return wrapped

    def __repr__(self) -> str:
        return "{}({!r}, {!r})".format(self.__class__.__name__, self.lhs, self.rhs)

    def batch_process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection, rhs: Any = None
    ) -> tuple[list[str], list[Any]]:
        """
        Turn an iterable rhs into a list of SQL fragments plus a params list.

        ``rhs`` defaults to self.rhs. Without bilateral transforms the values
        go through get_db_prep_lookup() and each resulting param gets a "%s"
        placeholder; with them, every value is wrapped in a Value, transformed,
        resolved against compiler.query and compiled individually.
        """
        values = self.rhs if rhs is None else rhs
        if not self.bilateral_transforms:
            _, params = self.get_db_prep_lookup(values, connection)
            return ["%s"] * len(params), list(params)

        sqls: list[str] = []
        sqls_params: list[Any] = []
        for item in values:
            expr = Value(item, output_field=self.lhs.output_field)
            expr = self.apply_bilateral_transforms(expr)
            expr = expr.resolve_expression(compiler.query)
            item_sql, item_params = compiler.compile(expr)
            sqls.append(item_sql)
            sqls_params.extend(item_params)
        return sqls, sqls_params

    def get_source_expressions(self) -> list[Any]:
        """Return [lhs], plus the rhs when the rhs is itself an expression."""
        sources = [self.lhs]
        if not self.rhs_is_direct_value():
            sources.append(self.rhs)
        return sources

    def set_source_expressions(self, exprs: Sequence[Any]) -> None:
        """Assign lhs (one expression given) or lhs and rhs (otherwise)."""
        new_sources = list(exprs)
        if len(new_sources) != 1:
            self.lhs, self.rhs = new_sources
            return
        (self.lhs,) = new_sources

    def get_prep_lookup(self) -> Any:
        """
        Return the rhs in the form the lookup should store.

        Expressions, and any rhs when prepare_rhs is False, are returned
        unchanged. Otherwise the lhs output field's get_prep_value() is used
        when available; if the lhs has no (truthy) output_field, a direct
        value is wrapped in Value.
        """
        raw = self.rhs
        if not self.prepare_rhs or isinstance(raw, ResolvableExpression):
            return raw
        output_field = getattr(self.lhs, "output_field", None)
        if output_field:
            prep = getattr(output_field, "get_prep_value", None)
            if prep:
                return prep(raw)
            return raw
        if self.rhs_is_direct_value():
            return Value(raw)
        return raw

    def get_prep_lhs(self) -> Any:
        """Return the lhs as an expression, wrapping plain values in Value."""
        current = self.lhs
        return current if isinstance(current, ResolvableExpression) else Value(current)

    def get_db_prep_lookup(
        self, value: Any, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """Return a single "%s" placeholder with ``value`` as its only param."""
        return "%s", [value]

    def process_lhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection, lhs: Any = None
    ) -> tuple[str, list[Any]]:
        """
        Compile the lhs (or the given ``lhs`` override) to SQL and params.

        A falsy ``lhs`` argument falls back to self.lhs.
        """
        target = lhs or self.lhs
        if isinstance(target, ResolvableExpression):
            target = target.resolve_expression(compiler.query)
        sql, params = compiler.compile(target)
        if isinstance(target, Lookup):
            # Wrapped in parentheses to respect operator precedence.
            sql = f"({sql})"
        return sql, list(params)

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """
        Produce the SQL and params for the rhs.

        Values exposing as_sql() are compiled and parenthesised; anything else
        is delegated to get_db_prep_lookup().
        """
        value = self.rhs
        if self.bilateral_transforms:
            if self.rhs_is_direct_value():
                # get_db_prep_lookup() is deliberately skipped here: the value
                # is transformed before it is used for the lookup.
                value = Value(value, output_field=self.lhs.output_field)
            value = self.apply_bilateral_transforms(value)
            value = value.resolve_expression(compiler.query)
        if not hasattr(value, "as_sql"):
            return self.get_db_prep_lookup(value, connection)
        sql, params = compiler.compile(value)
        # Parenthesise for operator precedence, but never double-wrap.
        if sql and not sql.startswith("("):
            sql = f"({sql})"
        return sql, list(params)

    def rhs_is_direct_value(self) -> bool:
        """Return True when the rhs has no as_sql(), i.e. is a plain value."""
        return not hasattr(self.rhs, "as_sql")

    def get_group_by_cols(self) -> list[Any]:
        """Concatenate the GROUP BY columns of every source expression."""
        return list(
            itertools.chain.from_iterable(
                source.get_group_by_cols()
                for source in self.get_source_expressions()
            )
        )

    @cached_property
    def output_field(self) -> BooleanField:
        return BooleanField()

    @property
    def identity(self) -> tuple[type[Lookup], Any, Any]:
        """Tuple used by __eq__ and __hash__: (class, lhs, rhs)."""
        return (self.__class__, self.lhs, self.rhs)

    def __eq__(self, other: object) -> bool:
        if isinstance(other, Lookup):
            return self.identity == other.identity
        return NotImplemented

    def __hash__(self) -> int:
        return hash(make_hashable(self.identity))

    def resolve_expression(
        self,
        query: Any = None,
        allow_joins: bool = True,
        reuse: Any = None,
        summarize: bool = False,
        for_save: bool = False,
    ) -> Lookup:
        """
        Return a copy whose lhs (and rhs, if it is an expression) is resolved.
        """
        clone = self.copy()
        clone.is_summary = summarize
        resolve_args = (query, allow_joins, reuse, summarize, for_save)
        clone.lhs = self.lhs.resolve_expression(*resolve_args)
        if isinstance(self.rhs, ResolvableExpression):
            clone.rhs = self.rhs.resolve_expression(*resolve_args)
        return clone

    def select_format(
        self, compiler: SQLCompiler, sql: str, params: Sequence[Any]
    ) -> tuple[str, Sequence[Any]]:
        # Boolean expressions work directly in SELECT
        return sql, params
207
208
class Transform(RegisterLookupMixin, Func):
    """
    Single-argument function usable as a step in a lookup chain.

    RegisterLookupMixin() comes first in the bases so that get_lookup() and
    get_transform() first examine self and then check output_field.
    """

    lookup_name: str | None = None
    # When True, this transform class is reported by get_bilateral_transforms().
    bilateral: bool = False
    arity: int = 1

    @property
    def lhs(self) -> Any:
        """The first source expression."""
        sources = self.get_source_expressions()
        return sources[0]

    def get_bilateral_transforms(self) -> list[type[Transform]]:
        """
        Collect the bilateral transform classes from the lhs chain, appending
        this class last when it is itself bilateral.
        """
        inner = self.lhs
        collected = (
            inner.get_bilateral_transforms()
            if hasattr(inner, "get_bilateral_transforms")
            else []
        )
        if self.bilateral:
            collected.append(self.__class__)
        return collected
231
232
class BuiltinLookup(Lookup):
    """
    Lookup whose SQL is assembled from the dialect tables: the lhs is passed
    through lookup_cast() and the rhs through the OPERATORS template selected
    by lookup_name.
    """

    def process_lhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection, lhs: Any = None
    ) -> tuple[str, list[Any]]:
        """Compile the lhs, then apply the cast template for this lookup."""
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        inner_sql, inner_params = super().process_lhs(compiler, connection, lhs)
        # The cast is always chosen from self.lhs's output field, even when a
        # different ``lhs`` was compiled above.
        cast_template = lookup_cast(self.lookup_name, self.lhs.output_field)
        return cast_template % inner_sql, list(inner_params)

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """Return "<lhs> <operator + rhs>" with lhs params followed by rhs params."""
        lhs_sql, params = self.process_lhs(compiler, connection)
        rhs_sql, rhs_params = self.process_rhs(compiler, connection)
        params.extend(rhs_params)
        comparison = self.get_rhs_op(connection, rhs_sql)
        return "{} {}".format(lhs_sql, comparison), params

    def get_rhs_op(self, connection: DatabaseConnection, rhs: str | list[str]) -> str:
        """Interpolate ``rhs`` into the OPERATORS template for lookup_name."""
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        operator_template = OPERATORS[self.lookup_name]
        return operator_template % rhs
258
259
class FieldGetDbPrepValueMixin(Lookup):
    """
    Mixin for lookups whose inputs must be passed through
    Field.get_db_prep_value() before being sent as query parameters.
    """

    # Subclasses set this to True when the rhs is a collection whose items
    # must each be prepared individually.
    get_db_prep_lookup_value_is_iterable: bool = False
    lhs: Any
    rhs: Any

    def get_db_prep_lookup(
        self, value: Any, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """
        Return a "%s" placeholder and the db-prepared value(s) as a list.
        """
        output_field = self.lhs.output_field
        # For relational fields, prefer the 'target_field' of the output
        # field; fall back to the output field itself otherwise.
        target = getattr(output_field, "target_field", None)
        get_db_prep_value = (
            getattr(target, "get_db_prep_value", None)
            or output_field.get_db_prep_value
        )
        if self.get_db_prep_lookup_value_is_iterable:
            db_values = [
                get_db_prep_value(item, connection, prepared=True) for item in value
            ]
        else:
            db_values = [get_db_prep_value(value, connection, prepared=True)]
        return "%s", db_values
286
287
class FieldGetDbPrepValueIterableMixin(FieldGetDbPrepValueMixin):
    """
    Variant of FieldGetDbPrepValueMixin for lookups whose rhs is an iterable:
    Field.get_db_prep_value() is called on each value in it.
    """

    get_db_prep_lookup_value_is_iterable: bool = True
    prepare_rhs: bool

    def get_prep_lookup(self) -> Any:
        """
        Return the rhs with each plain item run through get_prep_value().

        An rhs that is itself an expression is returned unchanged. Items that
        are expressions are kept as they are.
        """
        if isinstance(self.rhs, ResolvableExpression):
            return self.rhs

        def prepare(item: Any) -> Any:
            # An expression will be handled by the database but can coexist
            # alongside real values.
            if isinstance(item, ResolvableExpression) or not self.prepare_rhs:
                return item
            output_field = getattr(self.lhs, "output_field", None)
            prep = (
                getattr(output_field, "get_prep_value", None) if output_field else None
            )
            return prep(item) if prep else item

        return [prepare(item) for item in self.rhs]

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """Batch-process a direct (iterable) rhs; defer expressions to the parent."""
        if not self.rhs_is_direct_value():
            return super().process_rhs(compiler, connection)
        # rhs should be an iterable of values. Use batch_process_rhs()
        # to prepare/transform those values.
        return self.batch_process_rhs(compiler, connection)

    def resolve_expression_parameter(
        self,
        compiler: SQLCompiler,
        connection: DatabaseConnection,
        sql: str,
        param: Any,
    ) -> tuple[str, list[Any]]:
        """
        Compile ``param`` if it is an expression; otherwise return the given
        ``sql`` together with the parameter as originally passed in.
        """
        original = param
        if isinstance(param, ResolvableExpression):
            param = param.resolve_expression(compiler.query)
        if not hasattr(param, "as_sql"):
            return sql, [original]
        compiled_sql, compiled_params = compiler.compile(param)
        return compiled_sql, list(compiled_params)

    def batch_process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection, rhs: Any = None
    ) -> tuple[list[str], list[Any]]:
        """
        Run the parent's batch processing, then compile any params that are
        themselves expressions and flatten the resulting params.
        """
        placeholders, values = super().batch_process_rhs(compiler, connection, rhs)
        # The params list may contain expressions which compile to a sql/param
        # pair. Pair every placeholder with its param and replace both with
        # the result of compiling the param.
        resolved = [
            self.resolve_expression_parameter(compiler, connection, placeholder, value)
            for placeholder, value in zip(placeholders, values)
        ]
        sqls, param_groups = zip(*resolved)
        return list(sqls), list(itertools.chain.from_iterable(param_groups))
354
355
class OperatorLookup(Lookup):
    """Lookup defined by a SQL operator."""

    # The SQL operator placed between lhs and rhs; subclasses set it.
    operator: str | None = None

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, tuple[Any, ...]]:
        """Return "<lhs> <operator> <rhs>" and the combined params as a tuple."""
        lhs_sql, lhs_params = self.process_lhs(compiler, connection)
        rhs_sql, rhs_params = self.process_rhs(compiler, connection)
        combined = (*lhs_params, *rhs_params)
        return "{} {} {}".format(lhs_sql, self.operator, rhs_sql), combined
368
369
@Field.register_lookup
class Exact(FieldGetDbPrepValueMixin, BuiltinLookup):
    """Lookup registered on Field under the name ``exact``."""

    lookup_name: str = "exact"

    def get_prep_lookup(self) -> Any:
        """
        Validate a Query rhs before the regular rhs preparation.

        Raises ValueError when the rhs is a Query that is not limited to one
        result. A Query without explicit select fields is narrowed to "id".
        """
        from plain.postgres.sql.query import Query  # avoid circular import

        subquery = self.rhs
        if isinstance(subquery, Query):
            if not subquery.has_limit_one():
                raise ValueError(
                    "The QuerySet value for an exact lookup must be limited to "
                    "one result using slicing."
                )
            if not subquery.has_select_fields:
                subquery.clear_select_clause()
                subquery.add_fields(["id"])
        return super().get_prep_lookup()

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """Build the comparison SQL, with a shortcut for boolean conditions."""
        # Avoid comparison against direct rhs if lhs is a boolean value. That
        # turns "boolfield__exact=True" into "WHERE boolean_field" instead of
        # "WHERE boolean_field = True" when allowed.
        use_bare_condition = isinstance(self.rhs, bool) and getattr(
            self.lhs, "conditional", False
        )
        if not use_bare_condition:
            return super().as_sql(compiler, connection)
        lhs_sql, params = self.process_lhs(compiler, connection)
        template = "NOT %s" if not self.rhs else "%s"
        return template % lhs_sql, params
400
401
@Field.register_lookup
class IExact(BuiltinLookup):
    """
    Lookup registered on Field under the name ``iexact``.

    prepare_rhs is disabled, so Lookup.get_prep_lookup() hands the rhs back
    without running it through the field's get_prep_value().
    """

    lookup_name: str = "iexact"
    prepare_rhs: bool = False
406
407
@Field.register_lookup
class GreaterThan(FieldGetDbPrepValueMixin, BuiltinLookup):
    """Lookup registered on Field under the name ``gt``."""

    lookup_name: str = "gt"
411
412
@Field.register_lookup
class GreaterThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup):
    """Lookup registered on Field under the name ``gte``."""

    lookup_name: str = "gte"
416
417
@Field.register_lookup
class LessThan(FieldGetDbPrepValueMixin, BuiltinLookup):
    """Lookup registered on Field under the name ``lt``."""

    lookup_name: str = "lt"
421
422
@Field.register_lookup
class LessThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup):
    """Lookup registered on Field under the name ``lte``."""

    lookup_name: str = "lte"
426
427
class IntegerFieldOverflow:
    """
    Mixin that short-circuits integer comparisons whose rhs lies outside the
    lhs field's integer_range, by raising instead of producing SQL.
    """

    # Raised when the rhs is below the field's minimum / above its maximum.
    # Subclasses override these per comparison operator.
    underflow_exception: type[Exception] = EmptyResultSet
    overflow_exception: type[Exception] = EmptyResultSet
    lhs: Any
    rhs: Any

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """
        Range-check an int rhs, then defer to the next process_rhs() in the MRO.

        Raises underflow_exception / overflow_exception when the rhs is out of
        range. A bound of None disables the corresponding check.
        """
        candidate = self.rhs
        if isinstance(candidate, int):
            lower, upper = self.lhs.output_field.integer_range
            if lower is not None and candidate < lower:
                raise self.underflow_exception
            if upper is not None and candidate > upper:
                raise self.overflow_exception
        return super().process_rhs(compiler, connection)  # ty: ignore[unresolved-attribute]
445
446
class IntegerFieldFloatRounding:
    """
    Mixin letting float query values be compared against an IntegerField.

    The float is rounded up to the next integer before the regular rhs
    preparation runs; without this, the decimal portion of the float would
    always be discarded.
    """

    rhs: Any

    def get_prep_lookup(self) -> Any:
        """Replace a float rhs with its ceiling, then defer to the next class."""
        candidate = self.rhs
        if isinstance(candidate, float):
            self.rhs = math.ceil(candidate)
        return super().get_prep_lookup()  # ty: ignore[unresolved-attribute]
459
460
@IntegerField.register_lookup
class IntegerFieldExact(IntegerFieldOverflow, Exact):
    """
    ``exact`` for IntegerField with the IntegerFieldOverflow range check; an
    out-of-range rhs raises the default EmptyResultSet in both directions.
    """
464
465
@IntegerField.register_lookup
class IntegerGreaterThan(IntegerFieldOverflow, GreaterThan):
    """
    ``gt`` for IntegerField with the IntegerFieldOverflow range check; an rhs
    below the field's minimum raises FullResultSet rather than EmptyResultSet.
    """

    underflow_exception = FullResultSet
469
470
@IntegerField.register_lookup
class IntegerGreaterThanOrEqual(
    IntegerFieldOverflow, IntegerFieldFloatRounding, GreaterThanOrEqual
):
    """
    ``gte`` for IntegerField: a float rhs is rounded up first, and an rhs below
    the field's minimum raises FullResultSet rather than EmptyResultSet.
    """

    underflow_exception = FullResultSet
476
477
@IntegerField.register_lookup
class IntegerLessThan(IntegerFieldOverflow, IntegerFieldFloatRounding, LessThan):
    """
    ``lt`` for IntegerField: a float rhs is rounded up first, and an rhs above
    the field's maximum raises FullResultSet rather than EmptyResultSet.
    """

    overflow_exception = FullResultSet
481
482
@IntegerField.register_lookup
class IntegerLessThanOrEqual(IntegerFieldOverflow, LessThanOrEqual):
    """
    ``lte`` for IntegerField with the IntegerFieldOverflow range check; an rhs
    above the field's maximum raises FullResultSet rather than EmptyResultSet.
    """

    overflow_exception = FullResultSet
486
487
@Field.register_lookup
class In(FieldGetDbPrepValueIterableMixin, BuiltinLookup):
    """Lookup registered on Field under the name ``in``; renders ``IN (...)``."""

    lookup_name: str = "in"

    def get_prep_lookup(self) -> Any:
        """
        Normalise a Query rhs before the regular rhs preparation: its ordering
        is cleared and, without explicit select fields, it selects only "id".
        """
        from plain.postgres.sql.query import Query  # avoid circular import

        subquery = self.rhs
        if isinstance(subquery, Query):
            subquery.clear_ordering(clear_default=True)
            if not subquery.has_select_fields:
                subquery.clear_select_clause()
                subquery.add_fields(["id"])
        return super().get_prep_lookup()

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """
        Render a direct rhs as a parenthesised, comma-separated placeholder
        list; expressions are deferred to the parent implementation.

        Raises EmptyResultSet when nothing is left after dropping None.
        """
        if not self.rhs_is_direct_value():
            return super().process_rhs(compiler, connection)

        # Remove None from the list as NULL is never equal to anything.
        try:
            candidates = OrderedSet(self.rhs)
            candidates.discard(None)
        except TypeError:  # Unhashable items in self.rhs
            candidates = [item for item in self.rhs if item is not None]

        if not candidates:
            raise EmptyResultSet

        # rhs should be an iterable; use batch_process_rhs() to
        # prepare/transform those values.
        sqls, sqls_params = self.batch_process_rhs(compiler, connection, candidates)
        return f"({', '.join(sqls)})", sqls_params

    def get_rhs_op(self, connection: DatabaseConnection, rhs: str | list[str]) -> str:
        return "IN {}".format(rhs)

    # PostgreSQL has no limit on IN clause size, so no need to override as_sql()
527
528
class PatternLookup(BuiltinLookup):
    """
    Base for the LIKE-style lookups (contains / startswith / endswith).

    A plain Python rhs is escaped and wrapped with % wildcards in Python via
    param_pattern; an rhs that is SQL (or is bilaterally transformed) gets its
    wildcards added in SQL through PATTERN_OPS instead.
    """

    # %-format template applied to the escaped Python value; the default
    # yields "%value%".
    param_pattern: str = "%%%s%%"
    prepare_rhs: bool = False
    bilateral_transforms: list[Any]

    def get_rhs_op(self, connection: DatabaseConnection, rhs: str | list[str]) -> str:
        """Return the operator SQL, adding SQL-side wildcards when required."""
        # Taking startswith as the example, the target SQL looks like:
        #     col LIKE %s, ['thevalue%']
        # For Python values the wildcard is added to the parameter directly in
        # Python (see process_rhs), so no special pattern is needed here. If
        # the value is, say, a reference to another column, the % has to be
        # appended in SQL, along the lines of:
        #     col LIKE othercol || '%%'
        # so SQL references and SQL transformations need the dedicated pattern.
        needs_sql_pattern = hasattr(self.rhs, "as_sql") or self.bilateral_transforms
        if not needs_sql_pattern:
            return super().get_rhs_op(connection, rhs)
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        template = PATTERN_OPS[self.lookup_name].format(PATTERN_ESC)
        return template.format(rhs)

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """
        Process the rhs as usual, then wrap the first param in param_pattern
        when the rhs is a plain Python value without bilateral transforms.
        """
        rhs_sql, rhs_params = super().process_rhs(compiler, connection)
        wrap_in_python = (
            isinstance(rhs_sql, str)
            and self.rhs_is_direct_value()
            and rhs_params
            and not self.bilateral_transforms
        )
        if wrap_in_python:
            rhs_params[0] = self.param_pattern % prep_for_like_query(rhs_params[0])
        return rhs_sql, rhs_params
563
564
@Field.register_lookup
class Contains(PatternLookup):
    """
    Lookup registered on Field under the name ``contains``.

    Uses PatternLookup's default param_pattern, so a Python rhs is wrapped
    in % on both sides.
    """

    lookup_name: str = "contains"
568
569
@Field.register_lookup
class IContains(Contains):
    """Same as Contains, registered under the name ``icontains``."""

    lookup_name: str = "icontains"
573
574
@Field.register_lookup
class StartsWith(PatternLookup):
    """
    Lookup registered on Field under the name ``startswith``.

    A Python rhs gets a trailing % only.
    """

    lookup_name: str = "startswith"
    param_pattern: str = "%s%%"
579
580
@Field.register_lookup
class IStartsWith(StartsWith):
    """Same as StartsWith, registered under the name ``istartswith``."""

    lookup_name: str = "istartswith"
584
585
@Field.register_lookup
class EndsWith(PatternLookup):
    """
    Lookup registered on Field under the name ``endswith``.

    A Python rhs gets a leading % only.
    """

    lookup_name: str = "endswith"
    param_pattern: str = "%%%s"
590
591
@Field.register_lookup
class IEndsWith(EndsWith):
    """Same as EndsWith, registered under the name ``iendswith``."""

    lookup_name: str = "iendswith"
595
596
@Field.register_lookup
class Range(FieldGetDbPrepValueIterableMixin, BuiltinLookup):
    """Lookup registered on Field under the name ``range``; renders BETWEEN."""

    lookup_name: str = "range"

    def get_rhs_op(self, connection: DatabaseConnection, rhs: str | list[str]) -> str:
        """Return "BETWEEN <first> AND <second>" from the first two rhs items."""
        # Range lookup always receives a list of two elements from process_rhs
        assert isinstance(rhs, list), f"Range lookup expects list, got {type(rhs)}"
        lower, upper = rhs[0], rhs[1]
        return "BETWEEN {} AND {}".format(lower, upper)
605
606
@Field.register_lookup
class IsNull(BuiltinLookup):
    """Lookup registered on Field under the name ``isnull``."""

    lookup_name: str = "isnull"
    prepare_rhs: bool = False

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """
        Return "<lhs> IS NULL" for a True rhs and "<lhs> IS NOT NULL" for False.

        Raises ValueError when the rhs is not a bool.
        """
        if not isinstance(self.rhs, bool):
            raise ValueError(
                "The QuerySet value for an isnull lookup must be True or False."
            )
        lhs_sql, params = self.process_lhs(compiler, connection)
        predicate = "IS NULL" if self.rhs else "IS NOT NULL"
        return f"{lhs_sql} {predicate}", params
624
625
@Field.register_lookup
class Regex(BuiltinLookup):
    """Lookup registered on Field under the name ``regex``."""

    lookup_name: str = "regex"
    prepare_rhs: bool = False

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """
        Use the OPERATORS entry for this lookup when there is one; otherwise
        fill the two-slot template returned by regex_lookup() with lhs and rhs.
        """
        if self.lookup_name not in OPERATORS:
            lhs_sql, lhs_params = self.process_lhs(compiler, connection)
            rhs_sql, rhs_params = self.process_rhs(compiler, connection)
            template = regex_lookup(self.lookup_name)
            return template % (lhs_sql, rhs_sql), lhs_params + rhs_params
        return super().as_sql(compiler, connection)
641
642
@Field.register_lookup
class IRegex(Regex):
    """Same as Regex, registered under the name ``iregex``."""

    lookup_name: str = "iregex"
646
647
class YearLookup(Lookup):
    """
    Base for comparisons against an extracted year.

    When the rhs is a direct value, the comparison is rewritten against the
    originating field (self.lhs.lhs) using year bounds, so the extract
    operation is avoided and indexes can be used.
    """

    def year_lookup_bounds(
        self, connection: DatabaseConnection, year: int
    ) -> list[datetime.date] | list[datetime.datetime]:
        """
        Return the bounds for ``year``, using the datetime variant when the
        originating field is a DateTimeField and the date variant otherwise.
        """
        from plain.postgres.functions import ExtractIsoYear

        source_field = self.lhs.lhs.output_field
        compute_bounds = (
            year_lookup_bounds_for_datetime_field
            if isinstance(source_field, DateTimeField)
            else year_lookup_bounds_for_date_field
        )
        return compute_bounds(year, iso_year=isinstance(self.lhs, ExtractIsoYear))

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, Sequence[Any]]:
        """Build the bounds-based SQL for a direct rhs; defer otherwise."""
        if not self.rhs_is_direct_value():
            return super().as_sql(compiler, connection)

        # Skip the extract part by directly using the originating field,
        # that is self.lhs.lhs.
        lhs_sql, params = self.process_lhs(compiler, connection, self.lhs.lhs)
        # Only the rhs SQL is kept; its params are replaced by the bounds.
        rhs_sql, _ = self.process_rhs(compiler, connection)
        # rhs_sql should be a string for year lookups
        assert isinstance(rhs_sql, str), f"Expected str, got {type(rhs_sql)}"
        comparison = self.get_direct_rhs_sql(connection, rhs_sql)
        start, finish = self.year_lookup_bounds(connection, self.rhs)
        params.extend(self.get_bound_params(start, finish))
        return "{} {}".format(lhs_sql, comparison), params

    def get_direct_rhs_sql(self, connection: DatabaseConnection, rhs: str) -> str:
        """Interpolate ``rhs`` into the OPERATORS template for lookup_name."""
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        operator_template = OPERATORS[self.lookup_name]
        return operator_template % rhs

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any, ...]:
        """Return bound parameters for the year lookup."""
        raise NotImplementedError("Subclasses must implement get_bound_params()")
689
690
class YearExact(YearLookup, Exact):
    """Year equality: a direct rhs becomes a BETWEEN over both year bounds."""

    def get_direct_rhs_sql(self, connection: DatabaseConnection, rhs: str) -> str:
        # Two placeholders, filled by the pair from get_bound_params(); the
        # incoming ``rhs`` SQL is not used.
        return "BETWEEN %s AND %s"

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any, Any]:
        """Bind both bounds, in order."""
        return start, finish
697
698
class YearGt(YearLookup, GreaterThan):
    """Year ``gt``: a direct rhs is compared against the ``finish`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        return (finish,)
702
703
class YearGte(YearLookup, GreaterThanOrEqual):
    """Year ``gte``: a direct rhs is compared against the ``start`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        return (start,)
707
708
class YearLt(YearLookup, LessThan):
    """Year ``lt``: a direct rhs is compared against the ``start`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        return (start,)
712
713
class YearLte(YearLookup, LessThanOrEqual):
    """Year ``lte``: a direct rhs is compared against the ``finish`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        return (finish,)
717
718
719# UUID lookups - PostgreSQL has native UUID support so these inherit directly
720# from their base classes without any special processing.
721
722
@UUIDField.register_lookup
class UUIDIExact(IExact):
    """IExact registered on UUIDField, with no changes."""
726
727
@UUIDField.register_lookup
class UUIDContains(Contains):
    """Contains registered on UUIDField, with no changes."""
731
732
@UUIDField.register_lookup
class UUIDIContains(IContains):
    """IContains registered on UUIDField, with no changes."""
736
737
@UUIDField.register_lookup
class UUIDStartsWith(StartsWith):
    """StartsWith registered on UUIDField, with no changes."""
741
742
@UUIDField.register_lookup
class UUIDIStartsWith(IStartsWith):
    """IStartsWith registered on UUIDField, with no changes."""
746
747
@UUIDField.register_lookup
class UUIDEndsWith(EndsWith):
    """EndsWith registered on UUIDField, with no changes."""
751
752
@UUIDField.register_lookup
class UUIDIEndsWith(IEndsWith):
    """IEndsWith registered on UUIDField, with no changes."""