1from __future__ import annotations
2
3import datetime
4import itertools
5import math
6from collections.abc import Sequence
7from functools import cached_property
8from typing import TYPE_CHECKING, Any
9
10from plain.models.exceptions import EmptyResultSet, FullResultSet
11from plain.models.expressions import Expression, Func, ResolvableExpression, Value
12from plain.models.fields import (
13 BooleanField,
14 DateTimeField,
15 Field,
16 IntegerField,
17 UUIDField,
18)
19from plain.models.postgres.sql import (
20 INTEGER_FIELD_RANGES,
21 OPERATORS,
22 PATTERN_ESC,
23 PATTERN_OPS,
24 lookup_cast,
25 prep_for_like_query,
26 regex_lookup,
27 year_lookup_bounds_for_date_field,
28 year_lookup_bounds_for_datetime_field,
29)
30from plain.models.query_utils import RegisterLookupMixin
31from plain.utils.datastructures import OrderedSet
32from plain.utils.hashable import make_hashable
33
34if TYPE_CHECKING:
35 from plain.models.postgres.wrapper import DatabaseWrapper
36 from plain.models.sql.compiler import SQLCompiler
37
38
class Lookup(Expression):
    """
    Boolean expression comparing a left-hand side with a right-hand side.

    On construction the rhs is run through get_prep_lookup() and the lhs
    through get_prep_lhs(). Any bilateral transforms reported by the lhs are
    stored on the instance so they can be applied to the rhs when it is
    compiled.
    """

    lookup_name: str | None = None
    # When False, get_prep_lookup() hands the rhs back unprepared.
    prepare_rhs: bool = True
    can_use_none_as_rhs: bool = False
    lhs: Any
    rhs: Any

    def __init__(self, lhs: Any, rhs: Any):
        """
        Store and prepare both sides of the lookup.

        Raises NotImplementedError when the lhs carries bilateral transforms
        and the raw rhs is a Query.
        """
        self.lhs, self.rhs = lhs, rhs
        # The rhs is prepared first, while self.lhs still holds the raw
        # argument; only afterwards is the lhs wrapped by get_prep_lhs().
        self.rhs = self.get_prep_lookup()
        self.lhs = self.get_prep_lhs()
        if hasattr(self.lhs, "get_bilateral_transforms"):
            bilateral_transforms = self.lhs.get_bilateral_transforms()
        else:
            bilateral_transforms = []
        if bilateral_transforms:
            # Warn the user as soon as possible if they are trying to apply
            # a bilateral transformation on a nested QuerySet: that won't work.
            from plain.models.sql.query import Query  # avoid circular import

            if isinstance(rhs, Query):
                raise NotImplementedError(
                    "Bilateral transformations on nested querysets are not implemented."
                )
        self.bilateral_transforms = bilateral_transforms

    def apply_bilateral_transforms(self, value: Any) -> Any:
        """Wrap ``value`` in each stored bilateral transform, in order."""
        for transform in self.bilateral_transforms:
            value = transform(value)
        return value

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.lhs!r}, {self.rhs!r})"

    def batch_process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseWrapper, rhs: Any = None
    ) -> tuple[list[str], list[Any]]:
        """
        Process an iterable rhs into SQL fragments and a flat parameter list.

        ``rhs`` defaults to ``self.rhs`` when None. With bilateral transforms
        each item is wrapped in a Value, transformed, resolved and compiled.
        Otherwise the params come from get_db_prep_lookup() and each one gets
        a ``%s`` placeholder.
        """
        if rhs is None:
            rhs = self.rhs
        if self.bilateral_transforms:
            sqls: list[str] = []
            sqls_params: list[Any] = []
            for p in rhs:
                value = Value(p, output_field=self.lhs.output_field)
                value = self.apply_bilateral_transforms(value)
                value = value.resolve_expression(compiler.query)
                sql, sql_params = compiler.compile(value)
                sqls.append(sql)
                sqls_params.extend(sql_params)
        else:
            _, params = self.get_db_prep_lookup(rhs, connection)
            sqls = ["%s"] * len(params)
            sqls_params = list(params)
        return sqls, sqls_params

    def get_source_expressions(self) -> list[Any]:
        """Return ``[lhs]`` for a direct-value rhs, otherwise ``[lhs, rhs]``."""
        if self.rhs_is_direct_value():
            return [self.lhs]
        return [self.lhs, self.rhs]

    def set_source_expressions(self, exprs: Sequence[Any]) -> None:
        """Assign the lhs (one expression) or the lhs and rhs (two expressions)."""
        exprs_list = list(exprs)
        if len(exprs_list) == 1:
            self.lhs = exprs_list[0]
        else:
            self.lhs, self.rhs = exprs_list

    def get_prep_lookup(self) -> Any:
        """
        Return the rhs in the form the lookup should store.

        The rhs is returned unchanged when prepare_rhs is False or the rhs is
        a ResolvableExpression. Otherwise it goes through the lhs output
        field's get_prep_value() when that is available. If the lhs has no
        output_field, a direct-value rhs is wrapped in Value.
        """
        if not self.prepare_rhs or isinstance(self.rhs, ResolvableExpression):
            return self.rhs
        if output_field := getattr(self.lhs, "output_field", None):
            if get_prep_value := getattr(output_field, "get_prep_value", None):
                return get_prep_value(self.rhs)
        elif self.rhs_is_direct_value():
            return Value(self.rhs)
        return self.rhs

    def get_prep_lhs(self) -> Any:
        """Return the lhs as an expression, wrapping plain values in Value."""
        if isinstance(self.lhs, ResolvableExpression):
            return self.lhs
        return Value(self.lhs)

    def get_db_prep_lookup(
        self, value: Any, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]]:
        """Return a ``%s`` placeholder and ``[value]`` as its parameters."""
        return ("%s", [value])

    def process_lhs(
        self, compiler: SQLCompiler, connection: DatabaseWrapper, lhs: Any = None
    ) -> tuple[str, list[Any]]:
        """
        Compile the left-hand side to SQL and a parameter list.

        ``lhs`` defaults to ``self.lhs`` only when it is None.
        """
        # Explicit None check (consistent with batch_process_rhs): an
        # explicitly passed expression that happens to be falsy must not be
        # silently replaced by self.lhs.
        if lhs is None:
            lhs = self.lhs
        if isinstance(lhs, ResolvableExpression):
            lhs = lhs.resolve_expression(compiler.query)
        sql, params = compiler.compile(lhs)
        if isinstance(lhs, Lookup):
            # Wrapped in parentheses to respect operator precedence.
            sql = f"({sql})"
        return sql, list(params)

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """
        Compile the right-hand side to SQL and a parameter list.

        Values exposing ``as_sql`` are compiled and parenthesised. Anything
        else is delegated to get_db_prep_lookup().
        """
        value = self.rhs
        if self.bilateral_transforms:
            if self.rhs_is_direct_value():
                # Do not call get_db_prep_lookup here as the value will be
                # transformed before being used for lookup
                value = Value(value, output_field=self.lhs.output_field)
            value = self.apply_bilateral_transforms(value)
            value = value.resolve_expression(compiler.query)
        if hasattr(value, "as_sql"):
            sql, params = compiler.compile(value)
            # Ensure expression is wrapped in parentheses to respect operator
            # precedence but avoid double wrapping.
            if sql and sql[0] != "(":
                sql = f"({sql})"
            return sql, list(params)
        else:
            return self.get_db_prep_lookup(value, connection)

    def rhs_is_direct_value(self) -> bool:
        """Return True when the rhs has no ``as_sql`` attribute."""
        return not hasattr(self.rhs, "as_sql")

    def get_group_by_cols(self) -> list[Any]:
        """Concatenate the group-by columns of every source expression."""
        cols = []
        for source in self.get_source_expressions():
            cols.extend(source.get_group_by_cols())
        return cols

    @cached_property
    def output_field(self) -> BooleanField:
        return BooleanField()

    @property
    def identity(self) -> tuple[type[Lookup], Any, Any]:
        """Tuple of (class, lhs, rhs) used for equality and hashing."""
        return self.__class__, self.lhs, self.rhs

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Lookup):
            return NotImplemented
        return self.identity == other.identity

    def __hash__(self) -> int:
        return hash(make_hashable(self.identity))

    def resolve_expression(
        self,
        query: Any = None,
        allow_joins: bool = True,
        reuse: Any = None,
        summarize: bool = False,
        for_save: bool = False,
    ) -> Lookup:
        """
        Return a copy whose lhs is resolved against ``query``.

        The rhs is resolved as well when it is a ResolvableExpression.
        """
        c = self.copy()
        c.is_summary = summarize
        c.lhs = self.lhs.resolve_expression(
            query, allow_joins, reuse, summarize, for_save
        )
        if isinstance(self.rhs, ResolvableExpression):
            c.rhs = self.rhs.resolve_expression(
                query, allow_joins, reuse, summarize, for_save
            )
        return c

    def select_format(
        self, compiler: SQLCompiler, sql: str, params: Sequence[Any]
    ) -> tuple[str, Sequence[Any]]:
        # Boolean expressions work directly in SELECT
        return sql, params
208
209
class Transform(RegisterLookupMixin, Func):
    """
    Function expression with an arity of 1 that can appear in lookup chains.

    RegisterLookupMixin() is listed first so that get_lookup() and
    get_transform() first examine self and then check output_field.
    """

    bilateral: bool = False
    arity: int = 1

    @property
    def lhs(self) -> Any:
        """The first source expression of this transform."""
        source_expressions = self.get_source_expressions()
        return source_expressions[0]

    def get_bilateral_transforms(self) -> list[type[Transform]]:
        """
        Return the bilateral transform classes reported by the lhs, with
        this class appended when ``bilateral`` is set.
        """
        inner = self.lhs
        collected = (
            inner.get_bilateral_transforms()
            if hasattr(inner, "get_bilateral_transforms")
            else []
        )
        if self.bilateral:
            collected.append(self.__class__)
        return collected
231
232
class BuiltinLookup(Lookup):
    """Lookup whose SQL is built from lookup_cast() and the OPERATORS table."""

    def process_lhs(
        self, compiler: SQLCompiler, connection: DatabaseWrapper, lhs: Any = None
    ) -> tuple[str, list[Any]]:
        """Compile the lhs and apply the cast template for this lookup."""
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        compiled_sql, compiled_params = super().process_lhs(compiler, connection, lhs)
        internal_type = self.lhs.output_field.get_internal_type()
        cast_template = lookup_cast(self.lookup_name, internal_type)
        return cast_template % compiled_sql, list(compiled_params)

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]]:
        """Return ``"<lhs> <operator + rhs>"`` with the lhs params followed by the rhs params."""
        lhs_sql, params = self.process_lhs(compiler, connection)
        rhs_sql, rhs_params = self.process_rhs(compiler, connection)
        params.extend(rhs_params)
        comparison = self.get_rhs_op(connection, rhs_sql)
        return f"{lhs_sql} {comparison}", params

    def get_rhs_op(self, connection: DatabaseWrapper, rhs: str | list[str]) -> str:
        """Interpolate the rhs SQL into the OPERATORS entry for lookup_name."""
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        operator_template = OPERATORS[self.lookup_name]
        return operator_template % rhs
259
260
class FieldGetDbPrepValueMixin(Lookup):
    """
    Mixin for lookups whose inputs must pass through
    Field.get_db_prep_value() before being sent to the database.
    """

    get_db_prep_lookup_value_is_iterable: bool = False
    lhs: Any
    rhs: Any

    def get_db_prep_lookup(
        self, value: Any, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]]:
        """
        Return a ``%s`` placeholder and the db-prepared parameter list.

        When get_db_prep_lookup_value_is_iterable is set, every item of
        ``value`` is prepared individually. Otherwise ``value`` is prepared
        as a single parameter.
        """
        output_field = self.lhs.output_field
        # For relational fields, use the 'target_field' attribute of the
        # output_field.
        target_field = getattr(output_field, "target_field", None)
        prepare = (
            getattr(target_field, "get_db_prep_value", None)
            or output_field.get_db_prep_value
        )
        if self.get_db_prep_lookup_value_is_iterable:
            prepared = [prepare(item, connection, prepared=True) for item in value]
        else:
            prepared = [prepare(value, connection, prepared=True)]
        return "%s", prepared
287
288
class FieldGetDbPrepValueIterableMixin(FieldGetDbPrepValueMixin):
    """
    Mixin for lookups whose rhs is an iterable and which need
    Field.get_db_prep_value() applied to each value in it.
    """

    get_db_prep_lookup_value_is_iterable: bool = True
    prepare_rhs: bool

    def get_prep_lookup(self) -> Any:
        """
        Return the rhs itself when it is an expression, otherwise a list of
        its items with plain values passed through get_prep_value().
        """
        if isinstance(self.rhs, ResolvableExpression):
            return self.rhs
        prepared = []
        for item in self.rhs:
            # An expression will be handled by the database but can coexist
            # alongside real values, so only plain values are prepared.
            if not isinstance(item, ResolvableExpression) and self.prepare_rhs:
                field = getattr(self.lhs, "output_field", None)
                prep = getattr(field, "get_prep_value", None) if field else None
                if prep:
                    item = prep(item)
            prepared.append(item)
        return prepared

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """Batch-process a direct-value rhs; defer to the parent for expressions."""
        if not self.rhs_is_direct_value():
            return super().process_rhs(compiler, connection)
        # rhs should be an iterable of values. Use batch_process_rhs()
        # to prepare/transform those values.
        return self.batch_process_rhs(compiler, connection)

    def resolve_expression_parameter(
        self,
        compiler: SQLCompiler,
        connection: DatabaseWrapper,
        sql: str,
        param: Any,
    ) -> tuple[str, list[Any]]:
        """
        Return the SQL and params for a single rhs item.

        An item that exposes ``as_sql`` after resolution is compiled. Any
        other item keeps the given ``sql`` with ``[param]`` as its params.
        """
        candidate = param
        if isinstance(candidate, ResolvableExpression):
            candidate = candidate.resolve_expression(compiler.query)
        if not hasattr(candidate, "as_sql"):
            return sql, [param]
        compiled_sql, compiled_params = compiler.compile(candidate)
        return compiled_sql, list(compiled_params)

    def batch_process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseWrapper, rhs: Any = None
    ) -> tuple[list[str], list[Any]]:
        """
        Run the parent batch processing, then compile any expressions that
        are still sitting in the parameter list.
        """
        placeholders, values = super().batch_process_rhs(compiler, connection, rhs)
        # The params list may contain expressions which compile to a
        # sql/param pair. Pair every placeholder with its param and try to
        # replace both with the result of compiling the param.
        resolved = [
            self.resolve_expression_parameter(compiler, connection, placeholder, value)
            for placeholder, value in zip(placeholders, values)
        ]
        # NOTE: with no items at all this unpacking raises ValueError.
        sqls, param_groups = zip(*resolved)
        return list(sqls), list(itertools.chain.from_iterable(param_groups))
355
356
class OperatorLookup(Lookup):
    """Lookup rendered as ``<lhs> <operator> <rhs>`` using the class-level operator."""

    operator: str | None = None

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, tuple[Any, ...]]:
        """Compile both sides and join them with ``self.operator``."""
        lhs_sql, lhs_params = self.process_lhs(compiler, connection)
        rhs_sql, rhs_params = self.process_rhs(compiler, connection)
        combined_params = (*lhs_params, *rhs_params)
        return f"{lhs_sql} {self.operator} {rhs_sql}", combined_params
369
370
@Field.register_lookup
class Exact(FieldGetDbPrepValueMixin, BuiltinLookup):
    """The ``exact`` lookup, registered on Field."""

    lookup_name: str = "exact"

    def get_prep_lookup(self) -> Any:
        """
        Validate a Query rhs before the normal preparation.

        A Query must be limited to one result, otherwise ValueError is
        raised. When it has no select fields, its select clause is replaced
        by ``id``.
        """
        from plain.models.sql.query import Query  # avoid circular import

        subquery = self.rhs
        if isinstance(subquery, Query):
            if not subquery.has_limit_one():
                raise ValueError(
                    "The QuerySet value for an exact lookup must be limited to "
                    "one result using slicing."
                )
            if not subquery.has_select_fields:
                subquery.clear_select_clause()
                subquery.add_fields(["id"])
        return super().get_prep_lookup()

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]]:
        """
        Compile the lookup, rendering a bool rhs against a conditional lhs
        as the bare lhs or ``NOT`` lhs.
        """
        # Avoid comparison against direct rhs if lhs is a boolean value. That
        # turns "boolfield__exact=True" into "WHERE boolean_field" instead of
        # "WHERE boolean_field = True" when allowed.
        bare_boolean = isinstance(self.rhs, bool) and getattr(
            self.lhs, "conditional", False
        )
        if not bare_boolean:
            return super().as_sql(compiler, connection)
        lhs_sql, params = self.process_lhs(compiler, connection)
        prefix = "" if self.rhs else "NOT "
        return f"{prefix}{lhs_sql}", params
401
402
@Field.register_lookup
class IExact(BuiltinLookup):
    """The ``iexact`` lookup; the rhs skips get_prep_value() (prepare_rhs is False)."""

    lookup_name: str = "iexact"
    prepare_rhs: bool = False
407
408
@Field.register_lookup
class GreaterThan(FieldGetDbPrepValueMixin, BuiltinLookup):
    """The ``gt`` lookup, registered on Field."""

    lookup_name: str = "gt"
412
413
@Field.register_lookup
class GreaterThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup):
    """The ``gte`` lookup, registered on Field."""

    lookup_name: str = "gte"
417
418
@Field.register_lookup
class LessThan(FieldGetDbPrepValueMixin, BuiltinLookup):
    """The ``lt`` lookup, registered on Field."""

    lookup_name: str = "lt"
422
423
@Field.register_lookup
class LessThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup):
    """The ``lte`` lookup, registered on Field."""

    lookup_name: str = "lte"
427
428
class IntegerFieldOverflow:
    """
    Mixin that short-circuits integer comparisons whose rhs lies outside the
    range listed in INTEGER_FIELD_RANGES for the lhs field type.
    """

    # Raised when the rhs is below the minimum / above the maximum.
    underflow_exception: type[Exception] = EmptyResultSet
    overflow_exception: type[Exception] = EmptyResultSet
    lhs: Any
    rhs: Any

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]]:
        """
        Raise the configured exception for an out-of-range int rhs;
        otherwise defer to the next process_rhs() in the MRO.
        """
        candidate = self.rhs
        if isinstance(candidate, int):
            internal_type = self.lhs.output_field.get_internal_type()
            lower, upper = INTEGER_FIELD_RANGES[internal_type]
            # A bound of None means that side is unchecked.
            if lower is not None and candidate < lower:
                raise self.underflow_exception
            if upper is not None and candidate > upper:
                raise self.overflow_exception
        return super().process_rhs(compiler, connection)  # type: ignore[misc]
447
448
class IntegerFieldFloatRounding:
    """
    Let float query values work against an IntegerField by rounding them up
    with math.ceil() before the regular preparation, instead of having the
    decimal portion discarded.
    """

    rhs: Any

    def get_prep_lookup(self) -> Any:
        """Replace a float rhs by its ceiling, then defer to the next class in the MRO."""
        current = self.rhs
        if isinstance(current, float):
            self.rhs = math.ceil(current)
        return super().get_prep_lookup()  # type: ignore[misc]
461
462
@IntegerField.register_lookup
class IntegerFieldExact(IntegerFieldOverflow, Exact):
    """Exact lookup for IntegerField with the default out-of-range exceptions (EmptyResultSet)."""
466
467
@IntegerField.register_lookup
class IntegerGreaterThan(IntegerFieldOverflow, GreaterThan):
    """GreaterThan for IntegerField; an rhs below the minimum raises FullResultSet."""

    underflow_exception = FullResultSet
471
472
@IntegerField.register_lookup
class IntegerGreaterThanOrEqual(
    IntegerFieldOverflow, IntegerFieldFloatRounding, GreaterThanOrEqual
):
    """
    GreaterThanOrEqual for IntegerField: float rhs values are rounded up and
    an rhs below the minimum raises FullResultSet.
    """

    underflow_exception = FullResultSet
478
479
@IntegerField.register_lookup
class IntegerLessThan(IntegerFieldOverflow, IntegerFieldFloatRounding, LessThan):
    """
    LessThan for IntegerField: float rhs values are rounded up and an rhs
    above the maximum raises FullResultSet.
    """

    overflow_exception = FullResultSet
483
484
@IntegerField.register_lookup
class IntegerLessThanOrEqual(IntegerFieldOverflow, LessThanOrEqual):
    """LessThanOrEqual for IntegerField; an rhs above the maximum raises FullResultSet."""

    overflow_exception = FullResultSet
488
489
@Field.register_lookup
class In(FieldGetDbPrepValueIterableMixin, BuiltinLookup):
    """The ``in`` lookup, registered on Field."""

    lookup_name: str = "in"

    def get_prep_lookup(self) -> Any:
        """
        Normalise a Query rhs before the regular preparation: its ordering is
        cleared and, when it has no select fields, it selects ``id``.
        """
        from plain.models.sql.query import Query  # avoid circular import

        subquery = self.rhs
        if isinstance(subquery, Query):
            subquery.clear_ordering(clear_default=True)
            if not subquery.has_select_fields:
                subquery.clear_select_clause()
                subquery.add_fields(["id"])
        return super().get_prep_lookup()

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """
        Render a direct-value rhs as a parenthesised placeholder list.

        Raises EmptyResultSet when nothing remains after dropping None.
        Expression rhs values are handled by the parent implementation.
        """
        if not self.rhs_is_direct_value():
            return super().process_rhs(compiler, connection)

        # Remove None from the list as NULL is never equal to anything.
        try:
            candidates = OrderedSet(self.rhs)
            candidates.discard(None)
        except TypeError:  # Unhashable items in self.rhs
            candidates = [item for item in self.rhs if item is not None]

        if not candidates:
            raise EmptyResultSet

        # rhs should be an iterable; use batch_process_rhs() to
        # prepare/transform those values.
        sqls, sqls_params = self.batch_process_rhs(compiler, connection, candidates)
        return f"({', '.join(sqls)})", sqls_params

    def get_rhs_op(self, connection: DatabaseWrapper, rhs: str | list[str]) -> str:
        """Prefix the rhs SQL with ``IN``."""
        return "IN {}".format(rhs)

    # PostgreSQL has no limit on IN clause size, so no need to override as_sql()
529
530
class PatternLookup(BuiltinLookup):
    """
    Base class for the pattern-matching lookups.

    ``param_pattern`` is the %-template applied to a plain Python rhs.
    """

    param_pattern: str = "%%%s%%"
    prepare_rhs: bool = False
    bilateral_transforms: list[Any]

    def get_rhs_op(self, connection: DatabaseWrapper, rhs: str | list[str]) -> str:
        """
        Return the operator SQL for the rhs.

        Taking startswith as the example, the SQL should look like
        ``col LIKE %s`` with params ``['thevalue%']``. For Python values the
        pattern is added to the parameter in Python, so the plain operator
        is enough. When the rhs is SQL (for example a reference to another
        column) or bilateral transforms apply, the pattern has to be part of
        the SQL, as in ``col LIKE othercol || '%%'``.
        """
        pattern_in_sql = hasattr(self.rhs, "as_sql") or self.bilateral_transforms
        if not pattern_in_sql:
            return super().get_rhs_op(connection, rhs)
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        template = PATTERN_OPS[self.lookup_name].format(PATTERN_ESC)
        return template.format(rhs)

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """
        Process the rhs and, for a plain value without bilateral transforms,
        escape the first param and wrap it in ``param_pattern``.
        """
        rhs, params = super().process_rhs(compiler, connection)
        wrap_in_python = (
            isinstance(rhs, str)
            and self.rhs_is_direct_value()
            and params
            and not self.bilateral_transforms
        )
        if wrap_in_python:
            params[0] = self.param_pattern % prep_for_like_query(params[0])
        return rhs, params
565
566
@Field.register_lookup
class Contains(PatternLookup):
    """The ``contains`` lookup; uses the default ``%value%`` param pattern."""

    lookup_name: str = "contains"
570
571
@Field.register_lookup
class IContains(Contains):
    """The ``icontains`` lookup; same param pattern as Contains."""

    lookup_name: str = "icontains"
575
576
@Field.register_lookup
class StartsWith(PatternLookup):
    """The ``startswith`` lookup; a plain rhs becomes ``value%``."""

    lookup_name: str = "startswith"
    param_pattern: str = "%s%%"
581
582
@Field.register_lookup
class IStartsWith(StartsWith):
    """The ``istartswith`` lookup; same param pattern as StartsWith."""

    lookup_name: str = "istartswith"
586
587
@Field.register_lookup
class EndsWith(PatternLookup):
    """The ``endswith`` lookup; a plain rhs becomes ``%value``."""

    lookup_name: str = "endswith"
    param_pattern: str = "%%%s"
592
593
@Field.register_lookup
class IEndsWith(EndsWith):
    """The ``iendswith`` lookup; same param pattern as EndsWith."""

    lookup_name: str = "iendswith"
597
598
@Field.register_lookup
class Range(FieldGetDbPrepValueIterableMixin, BuiltinLookup):
    """The ``range`` lookup, rendered as ``BETWEEN ... AND ...``."""

    lookup_name: str = "range"

    def get_rhs_op(self, connection: DatabaseWrapper, rhs: str | list[str]) -> str:
        """Build the BETWEEN clause from the first two rhs SQL fragments."""
        # Range lookup always receives a list of two elements from process_rhs
        assert isinstance(rhs, list), f"Range lookup expects list, got {type(rhs)}"
        lower, upper = rhs[0], rhs[1]
        return f"BETWEEN {lower} AND {upper}"
607
608
@Field.register_lookup
class IsNull(BuiltinLookup):
    """The ``isnull`` lookup; the rhs must be a bool."""

    lookup_name: str = "isnull"
    prepare_rhs: bool = False

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]]:
        """
        Return ``<lhs> IS NULL`` for a True rhs and ``<lhs> IS NOT NULL`` for
        False. Raises ValueError for any non-bool rhs.
        """
        if not isinstance(self.rhs, bool):
            raise ValueError(
                "The QuerySet value for an isnull lookup must be True or False."
            )
        lhs_sql, params = self.process_lhs(compiler, connection)
        predicate = "IS NULL" if self.rhs else "IS NOT NULL"
        return f"{lhs_sql} {predicate}", params
626
627
@Field.register_lookup
class Regex(BuiltinLookup):
    """The ``regex`` lookup, registered on Field."""

    lookup_name: str = "regex"
    prepare_rhs: bool = False

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]]:
        """
        Use the OPERATORS entry when one exists for lookup_name; otherwise
        fill the template returned by regex_lookup() with both sides.
        """
        if self.lookup_name in OPERATORS:
            return super().as_sql(compiler, connection)
        lhs_sql, lhs_params = self.process_lhs(compiler, connection)
        rhs_sql, rhs_params = self.process_rhs(compiler, connection)
        template = regex_lookup(self.lookup_name)
        return template % (lhs_sql, rhs_sql), lhs_params + rhs_params
643
644
@Field.register_lookup
class IRegex(Regex):
    """The ``iregex`` lookup; shares Regex.as_sql()."""

    lookup_name: str = "iregex"
648
649
class YearLookup(Lookup):
    """
    Base for year comparisons. With a direct-value rhs the lookup is
    compiled against the originating field (``self.lhs.lhs``) and the bounds
    of the year, instead of against the extract expression.
    """

    def year_lookup_bounds(
        self, connection: DatabaseWrapper, year: int
    ) -> list[datetime.date] | list[datetime.datetime]:
        """
        Return the bounds for ``year``, using the datetime variant when the
        originating field is a DateTimeField and the date variant otherwise.
        """
        from plain.models.functions import ExtractIsoYear

        uses_iso_year = isinstance(self.lhs, ExtractIsoYear)
        source_field = self.lhs.lhs.output_field
        bounds_for = (
            year_lookup_bounds_for_datetime_field
            if isinstance(source_field, DateTimeField)
            else year_lookup_bounds_for_date_field
        )
        return bounds_for(year, iso_year=uses_iso_year)

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, Sequence[Any]]:
        """
        Compile the lookup. An expression rhs goes through the next as_sql()
        in the MRO; a direct value is compared against the year bounds.
        """
        if not self.rhs_is_direct_value():
            return super().as_sql(compiler, connection)
        # Avoid the extract operation if the rhs is a direct value to allow
        # indexes to be used: skip the extract part by directly using the
        # originating field, that is self.lhs.lhs.
        field_sql, params = self.process_lhs(compiler, connection, self.lhs.lhs)
        placeholder, _ = self.process_rhs(compiler, connection)
        # rhs_sql should be a string for year lookups
        assert isinstance(placeholder, str), f"Expected str, got {type(placeholder)}"
        comparison = self.get_direct_rhs_sql(connection, placeholder)
        start, finish = self.year_lookup_bounds(connection, self.rhs)
        params.extend(self.get_bound_params(start, finish))
        return f"{field_sql} {comparison}", params

    def get_direct_rhs_sql(self, connection: DatabaseWrapper, rhs: str) -> str:
        """Interpolate the rhs SQL into the OPERATORS entry for lookup_name."""
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        operator_template = OPERATORS[self.lookup_name]
        return operator_template % rhs

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any, ...]:
        """Return bound parameters for the year lookup."""
        raise NotImplementedError("Subclasses must implement get_bound_params()")
691
692
class YearExact(YearLookup, Exact):
    """Year equality, rendered as a BETWEEN over both year bounds."""

    def get_direct_rhs_sql(self, connection: DatabaseWrapper, rhs: str) -> str:
        """Return the fixed two-placeholder BETWEEN clause."""
        return "BETWEEN %s AND %s"

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any, Any]:
        """Bind both bounds, ``start`` first."""
        return start, finish
699
700
class YearGt(YearLookup, GreaterThan):
    """Year ``gt`` comparison; binds only the ``finish`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        bound = finish
        return (bound,)
704
705
class YearGte(YearLookup, GreaterThanOrEqual):
    """Year ``gte`` comparison; binds only the ``start`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        bound = start
        return (bound,)
709
710
class YearLt(YearLookup, LessThan):
    """Year ``lt`` comparison; binds only the ``start`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        bound = start
        return (bound,)
714
715
class YearLte(YearLookup, LessThanOrEqual):
    """Year ``lte`` comparison; binds only the ``finish`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        bound = finish
        return (bound,)
719
720
721# UUID lookups - PostgreSQL has native UUID support so these inherit directly
722# from their base classes without any special processing.
723
724
@UUIDField.register_lookup
class UUIDIExact(IExact):
    """IExact registered on UUIDField, with no changes to its behaviour."""
728
729
@UUIDField.register_lookup
class UUIDContains(Contains):
    """Contains registered on UUIDField, with no changes to its behaviour."""
733
734
@UUIDField.register_lookup
class UUIDIContains(IContains):
    """IContains registered on UUIDField, with no changes to its behaviour."""
738
739
@UUIDField.register_lookup
class UUIDStartsWith(StartsWith):
    """StartsWith registered on UUIDField, with no changes to its behaviour."""
743
744
@UUIDField.register_lookup
class UUIDIStartsWith(IStartsWith):
    """IStartsWith registered on UUIDField, with no changes to its behaviour."""
748
749
@UUIDField.register_lookup
class UUIDEndsWith(EndsWith):
    """EndsWith registered on UUIDField, with no changes to its behaviour."""
753
754
@UUIDField.register_lookup
class UUIDIEndsWith(IEndsWith):
    """IEndsWith registered on UUIDField, with no changes to its behaviour."""