# v0.146.0
  1from __future__ import annotations
  2
  3import datetime
  4import itertools
  5import math
  6from collections.abc import Sequence
  7from functools import cached_property
  8from typing import TYPE_CHECKING, Any
  9
 10from plain.postgres.dialect import (
 11    OPERATORS,
 12    PATTERN_ESC,
 13    PATTERN_OPS,
 14    lookup_cast,
 15    prep_for_like_query,
 16    regex_lookup,
 17    year_lookup_bounds_for_date_field,
 18    year_lookup_bounds_for_datetime_field,
 19)
 20from plain.postgres.exceptions import EmptyResultSet, FullResultSet
 21from plain.postgres.expressions import Expression, Func, ResolvableExpression, Value
 22from plain.postgres.fields import (
 23    BooleanField,
 24    DateTimeField,
 25    Field,
 26    IntegerField,
 27    UUIDField,
 28)
 29from plain.postgres.query_utils import RegisterLookupMixin
 30from plain.utils.datastructures import OrderedSet
 31from plain.utils.hashable import make_hashable
 32
 33if TYPE_CHECKING:
 34    from plain.postgres.connection import DatabaseConnection
 35    from plain.postgres.sql.compiler import SQLCompiler
 36
 37
 38class Lookup(Expression):
 39    lookup_name: str | None = None
 40    prepare_rhs: bool = True
 41    can_use_none_as_rhs: bool = False
 42    lhs: Any
 43    rhs: Any
 44
 45    def __init__(self, lhs: Any, rhs: Any):
 46        self.lhs, self.rhs = lhs, rhs
 47        self.rhs = self.get_prep_lookup()
 48        self.lhs = self.get_prep_lhs()
 49        if hasattr(self.lhs, "get_bilateral_transforms"):
 50            bilateral_transforms = self.lhs.get_bilateral_transforms()
 51        else:
 52            bilateral_transforms = []
 53        if bilateral_transforms:
 54            # Warn the user as soon as possible if they are trying to apply
 55            # a bilateral transformation on a nested QuerySet: that won't work.
 56            from plain.postgres.sql.query import Query  # avoid circular import
 57
 58            if isinstance(rhs, Query):
 59                raise NotImplementedError(
 60                    "Bilateral transformations on nested querysets are not implemented."
 61                )
 62        self.bilateral_transforms = bilateral_transforms
 63
 64    def apply_bilateral_transforms(self, value: Any) -> Any:
 65        for transform in self.bilateral_transforms:
 66            value = transform(value)
 67        return value
 68
 69    def __repr__(self) -> str:
 70        return f"{self.__class__.__name__}({self.lhs!r}, {self.rhs!r})"
 71
 72    def batch_process_rhs(
 73        self, compiler: SQLCompiler, connection: DatabaseConnection, rhs: Any = None
 74    ) -> tuple[list[str], list[Any]]:
 75        if rhs is None:
 76            rhs = self.rhs
 77        if self.bilateral_transforms:
 78            sqls: list[str] = []
 79            sqls_params: list[Any] = []
 80            for p in rhs:
 81                value = Value(p, output_field=self.lhs.output_field)
 82                value = self.apply_bilateral_transforms(value)
 83                value = value.resolve_expression(compiler.query)
 84                sql, sql_params = compiler.compile(value)
 85                sqls.append(sql)
 86                sqls_params.extend(sql_params)
 87        else:
 88            _, params = self.get_db_prep_lookup(rhs, connection)
 89            sqls = ["%s"] * len(params)
 90            sqls_params = list(params)
 91        return sqls, sqls_params
 92
 93    def get_source_expressions(self) -> list[Any]:
 94        if self.rhs_is_direct_value():
 95            return [self.lhs]
 96        return [self.lhs, self.rhs]
 97
 98    def set_source_expressions(self, exprs: Sequence[Any]) -> None:
 99        exprs_list = list(exprs)
100        if len(exprs_list) == 1:
101            self.lhs = exprs_list[0]
102        else:
103            self.lhs, self.rhs = exprs_list
104
105    def get_prep_lookup(self) -> Any:
106        if not self.prepare_rhs or isinstance(self.rhs, ResolvableExpression):
107            return self.rhs
108        if output_field := getattr(self.lhs, "output_field", None):
109            if get_prep_value := getattr(output_field, "get_prep_value", None):
110                return get_prep_value(self.rhs)
111        elif self.rhs_is_direct_value():
112            return Value(self.rhs)
113        return self.rhs
114
115    def get_prep_lhs(self) -> Any:
116        if isinstance(self.lhs, ResolvableExpression):
117            return self.lhs
118        return Value(self.lhs)
119
120    def get_db_prep_lookup(
121        self, value: Any, connection: DatabaseConnection
122    ) -> tuple[str, list[Any]]:
123        return ("%s", [value])
124
125    def process_lhs(
126        self, compiler: SQLCompiler, connection: DatabaseConnection, lhs: Any = None
127    ) -> tuple[str, list[Any]]:
128        lhs = lhs or self.lhs
129        if isinstance(lhs, ResolvableExpression):
130            lhs = lhs.resolve_expression(compiler.query)
131        sql, params = compiler.compile(lhs)
132        if isinstance(lhs, Lookup):
133            # Wrapped in parentheses to respect operator precedence.
134            sql = f"({sql})"
135        return sql, list(params)
136
137    def process_rhs(
138        self, compiler: SQLCompiler, connection: DatabaseConnection
139    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
140        value = self.rhs
141        if self.bilateral_transforms:
142            if self.rhs_is_direct_value():
143                # Do not call get_db_prep_lookup here as the value will be
144                # transformed before being used for lookup
145                value = Value(value, output_field=self.lhs.output_field)
146            value = self.apply_bilateral_transforms(value)
147            value = value.resolve_expression(compiler.query)
148        if hasattr(value, "as_sql"):
149            sql, params = compiler.compile(value)
150            # Ensure expression is wrapped in parentheses to respect operator
151            # precedence but avoid double wrapping.
152            if sql and sql[0] != "(":
153                sql = f"({sql})"
154            return sql, list(params)
155        else:
156            return self.get_db_prep_lookup(value, connection)
157
158    def rhs_is_direct_value(self) -> bool:
159        return not hasattr(self.rhs, "as_sql")
160
161    def get_group_by_cols(self) -> list[Any]:
162        cols = []
163        for source in self.get_source_expressions():
164            cols.extend(source.get_group_by_cols())
165        return cols
166
167    @cached_property
168    def output_field(self) -> BooleanField:
169        return BooleanField()
170
171    @property
172    def identity(self) -> tuple[type[Lookup], Any, Any]:
173        return self.__class__, self.lhs, self.rhs
174
175    def __eq__(self, other: object) -> bool:
176        if not isinstance(other, Lookup):
177            return NotImplemented
178        return self.identity == other.identity
179
180    def __hash__(self) -> int:
181        return hash(make_hashable(self.identity))
182
183    def resolve_expression(
184        self,
185        query: Any = None,
186        allow_joins: bool = True,
187        reuse: Any = None,
188        summarize: bool = False,
189        for_save: bool = False,
190    ) -> Lookup:
191        c = self.copy()
192        c.is_summary = summarize
193        c.lhs = self.lhs.resolve_expression(
194            query, allow_joins, reuse, summarize, for_save
195        )
196        if isinstance(self.rhs, ResolvableExpression):
197            c.rhs = self.rhs.resolve_expression(
198                query, allow_joins, reuse, summarize, for_save
199            )
200        return c
201
202    def select_format(
203        self, compiler: SQLCompiler, sql: str, params: Sequence[Any]
204    ) -> tuple[str, Sequence[Any]]:
205        # Boolean expressions work directly in SELECT
206        return sql, params
207
208
class Transform(RegisterLookupMixin, Func):
    """
    Function expression that can be chained in front of a lookup.

    RegisterLookupMixin is listed first so that get_lookup() and
    get_transform() examine this class before checking output_field.
    """

    lookup_name: str | None = None
    bilateral: bool = False
    arity: int = 1

    @property
    def lhs(self) -> Any:
        """The first source expression."""
        return self.get_source_expressions()[0]

    def get_bilateral_transforms(self) -> list[type[Transform]]:
        """
        Return the bilateral transform classes from the lhs chain.

        This class is appended to the list when ``bilateral`` is set.
        """
        source = self.lhs
        collected = (
            source.get_bilateral_transforms()
            if hasattr(source, "get_bilateral_transforms")
            else []
        )
        if self.bilateral:
            collected.append(self.__class__)
        return collected
231
232
class BuiltinLookup(Lookup):
    """
    Lookup whose SQL is assembled from the dialect helpers.

    The compiled left-hand side is substituted into the template returned by
    ``lookup_cast()``, and the right-hand side into the ``OPERATORS`` entry
    for ``lookup_name``.
    """

    def process_lhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection, lhs: Any = None
    ) -> tuple[str, list[Any]]:
        """Compile the lhs and apply the ``lookup_cast()`` template to its SQL."""
        name = self.lookup_name
        assert name is not None, "lookup_name must be set on Lookup subclass"
        inner_sql, inner_params = super().process_lhs(compiler, connection, lhs)
        cast_template = lookup_cast(name, self.lhs.output_field)
        return cast_template % inner_sql, list(inner_params)

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """Return ``"<lhs> <operator applied to rhs>"`` with lhs params first."""
        lhs_sql, params = self.process_lhs(compiler, connection)
        rhs_sql, rhs_params = self.process_rhs(compiler, connection)
        params.extend(rhs_params)
        rhs_sql = self.get_rhs_op(connection, rhs_sql)
        return f"{lhs_sql} {rhs_sql}", params

    def get_rhs_op(self, connection: DatabaseConnection, rhs: str | list[str]) -> str:
        """Substitute ``rhs`` into the ``OPERATORS`` template for this lookup."""
        name = self.lookup_name
        assert name is not None, "lookup_name must be set on Lookup subclass"
        return OPERATORS[name] % rhs
258
259
class FieldGetDbPrepValueMixin(Lookup):
    """
    Mixin for lookups whose inputs must go through
    Field.get_db_prep_value() before being sent to the database.
    """

    # When True, ``value`` is iterated and each element is prepared on its own.
    get_db_prep_lookup_value_is_iterable: bool = False
    lhs: Any
    rhs: Any

    def get_db_prep_lookup(
        self, value: Any, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """
        Return a ``%s`` placeholder and the db-prepared parameter list.

        The preparer is the ``get_db_prep_value`` of the output field's
        ``target_field`` when that attribute provides one (relational
        fields), otherwise that of the output field itself.
        """
        output_field = self.lhs.output_field
        target = getattr(output_field, "target_field", None)
        prep = (
            getattr(target, "get_db_prep_value", None)
            or output_field.get_db_prep_value
        )
        if self.get_db_prep_lookup_value_is_iterable:
            prepared = [prep(item, connection, prepared=True) for item in value]
        else:
            prepared = [prep(value, connection, prepared=True)]
        return "%s", prepared
286
287
class FieldGetDbPrepValueIterableMixin(FieldGetDbPrepValueMixin):
    """
    Mixin for lookups whose right-hand side is an iterable and where
    Field.get_db_prep_value() must be called on every element.
    """

    get_db_prep_lookup_value_is_iterable: bool = True
    prepare_rhs: bool

    def get_prep_lookup(self) -> Any:
        """
        Return rhs itself if it is an expression, else a list of its items.

        Items that are expressions are kept as they are; other items are
        passed through the lhs output field's ``get_prep_value`` when
        ``prepare_rhs`` is set and such a method is available.
        """
        if isinstance(self.rhs, ResolvableExpression):
            return self.rhs
        result = []
        for item in self.rhs:
            # Expressions are handled by the database and may sit alongside
            # plain values, so only non-expressions are prepared here.
            if not isinstance(item, ResolvableExpression) and self.prepare_rhs:
                field = getattr(self.lhs, "output_field", None)
                prep = getattr(field, "get_prep_value", None) if field else None
                if prep:
                    item = prep(item)
            result.append(item)
        return result

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """Batch-process a direct iterable rhs; defer to the parent otherwise."""
        if not self.rhs_is_direct_value():
            return super().process_rhs(compiler, connection)
        # rhs should be an iterable of values; batch_process_rhs()
        # prepares/transforms them.
        return self.batch_process_rhs(compiler, connection)

    def resolve_expression_parameter(
        self,
        compiler: SQLCompiler,
        connection: DatabaseConnection,
        sql: str,
        param: Any,
    ) -> tuple[str, list[Any]]:
        """
        Compile ``param`` if it is an expression.

        Returns the compiled SQL and parameters when ``param`` (after
        resolution) exposes ``as_sql``; otherwise returns the given ``sql``
        with ``param`` as the only parameter.
        """
        resolved = param
        if isinstance(resolved, ResolvableExpression):
            resolved = resolved.resolve_expression(compiler.query)
        if not hasattr(resolved, "as_sql"):
            return sql, [param]
        compiled_sql, compiled_params = compiler.compile(resolved)
        return compiled_sql, list(compiled_params)

    def batch_process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection, rhs: Any = None
    ) -> tuple[list[str], list[Any]]:
        """
        Run the parent batch processing, then compile expression parameters.

        The parent's parameter list may contain expressions. Each
        placeholder/parameter pair is passed to
        ``resolve_expression_parameter()`` and the per-item parameter lists
        are flattened into one list.
        """
        pre_processed = super().batch_process_rhs(compiler, connection, rhs)
        resolved_pairs = [
            self.resolve_expression_parameter(compiler, connection, piece, param)
            for piece, param in zip(*pre_processed)
        ]
        sql, params = zip(*resolved_pairs)
        return list(sql), list(itertools.chain.from_iterable(params))
354
355
class OperatorLookup(Lookup):
    """Lookup rendered as ``<lhs> <operator> <rhs>`` using ``operator``."""

    # SQL operator placed between the two sides; set by subclasses.
    operator: str | None = None

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, tuple[Any, ...]]:
        """Return the SQL string and a tuple of lhs params followed by rhs params."""
        lhs, lhs_params = self.process_lhs(compiler, connection)
        rhs, rhs_params = self.process_rhs(compiler, connection)
        return f"{lhs} {self.operator} {rhs}", (*lhs_params, *rhs_params)
368
369
@Field.register_lookup
class Exact(FieldGetDbPrepValueMixin, BuiltinLookup):
    """The ``exact`` lookup, registered on ``Field``."""

    lookup_name: str = "exact"

    def get_prep_lookup(self) -> Any:
        """
        Prepare rhs, validating a ``Query`` rhs first.

        A ``Query`` rhs must be limited to one result; if it has no select
        fields its select clause is replaced with ``id``.

        Raises:
            ValueError: if rhs is a ``Query`` that is not limited to one row.
        """
        from plain.postgres.sql.query import Query  # avoid circular import

        subquery = self.rhs
        if isinstance(subquery, Query):
            if not subquery.has_limit_one():
                raise ValueError(
                    "The QuerySet value for an exact lookup must be limited to "
                    "one result using slicing."
                )
            if not subquery.has_select_fields:
                subquery.clear_select_clause()
                subquery.add_fields(["id"])
        return super().get_prep_lookup()

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """
        Return SQL for the comparison.

        When rhs is a bool and lhs is conditional, the direct comparison is
        skipped: "boolfield__exact=True" becomes "WHERE boolean_field"
        rather than "WHERE boolean_field = True", and False becomes
        "NOT boolean_field".
        """
        compares_conditional = isinstance(self.rhs, bool) and getattr(
            self.lhs, "conditional", False
        )
        if not compares_conditional:
            return super().as_sql(compiler, connection)
        lhs_sql, params = self.process_lhs(compiler, connection)
        template = "NOT %s" if not self.rhs else "%s"
        return template % lhs_sql, params
400
401
@Field.register_lookup
class IExact(BuiltinLookup):
    """The ``iexact`` lookup, registered on ``Field``."""

    lookup_name: str = "iexact"
    # rhs is used as given; Lookup.get_prep_lookup() skips preparation.
    prepare_rhs: bool = False
406
407
@Field.register_lookup
class GreaterThan(FieldGetDbPrepValueMixin, BuiltinLookup):
    """The ``gt`` lookup, registered on ``Field``."""

    lookup_name: str = "gt"
411
412
@Field.register_lookup
class GreaterThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup):
    """The ``gte`` lookup, registered on ``Field``."""

    lookup_name: str = "gte"
416
417
@Field.register_lookup
class LessThan(FieldGetDbPrepValueMixin, BuiltinLookup):
    """The ``lt`` lookup, registered on ``Field``."""

    lookup_name: str = "lt"
421
422
@Field.register_lookup
class LessThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup):
    """The ``lte`` lookup, registered on ``Field``."""

    lookup_name: str = "lte"
426
427
class IntegerFieldOverflow:
    """
    Mixin that short-circuits integer comparisons outside the field's range.

    An ``int`` rhs is checked against ``lhs.output_field.integer_range``
    before the SQL is built; a value below the minimum raises
    ``underflow_exception`` and a value above the maximum raises
    ``overflow_exception``.
    """

    underflow_exception: type[Exception] = EmptyResultSet
    overflow_exception: type[Exception] = EmptyResultSet
    lhs: Any
    rhs: Any

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """Range-check an ``int`` rhs, then defer to the next ``process_rhs``."""
        candidate = self.rhs
        if isinstance(candidate, int):
            lower, upper = self.lhs.output_field.integer_range
            # A bound of None means that side is not checked.
            if lower is not None and candidate < lower:
                raise self.underflow_exception
            if upper is not None and candidate > upper:
                raise self.overflow_exception
        return super().process_rhs(compiler, connection)  # ty: ignore[unresolved-attribute]
445
446
class IntegerFieldFloatRounding:
    """
    Mixin that lets floats be used as query values for IntegerField.

    A float rhs is rounded up with ``math.ceil`` before the regular
    preparation runs; otherwise its decimal portion would always be
    discarded.
    """

    rhs: Any

    def get_prep_lookup(self) -> Any:
        """Replace a float rhs with its ceiling, then defer to the next class."""
        value = self.rhs
        if isinstance(value, float):
            self.rhs = math.ceil(value)
        return super().get_prep_lookup()  # ty: ignore[unresolved-attribute]
459
460
@IntegerField.register_lookup
class IntegerFieldExact(IntegerFieldOverflow, Exact):
    """``Exact`` for IntegerField, with the IntegerFieldOverflow range check."""
464
465
@IntegerField.register_lookup
class IntegerGreaterThan(IntegerFieldOverflow, GreaterThan):
    """``GreaterThan`` for IntegerField, with the range check."""

    # An rhs below the field's minimum raises FullResultSet.
    underflow_exception = FullResultSet
469
470
@IntegerField.register_lookup
class IntegerGreaterThanOrEqual(
    IntegerFieldOverflow, IntegerFieldFloatRounding, GreaterThanOrEqual
):
    """``GreaterThanOrEqual`` for IntegerField: range check and float rounding."""

    # An rhs below the field's minimum raises FullResultSet.
    underflow_exception = FullResultSet
476
477
@IntegerField.register_lookup
class IntegerLessThan(IntegerFieldOverflow, IntegerFieldFloatRounding, LessThan):
    """``LessThan`` for IntegerField: range check and float rounding."""

    # An rhs above the field's maximum raises FullResultSet.
    overflow_exception = FullResultSet
481
482
@IntegerField.register_lookup
class IntegerLessThanOrEqual(IntegerFieldOverflow, LessThanOrEqual):
    """``LessThanOrEqual`` for IntegerField, with the range check."""

    # An rhs above the field's maximum raises FullResultSet.
    overflow_exception = FullResultSet
486
487
@Field.register_lookup
class In(FieldGetDbPrepValueIterableMixin, BuiltinLookup):
    """The ``in`` lookup, registered on ``Field``."""

    lookup_name: str = "in"

    def get_prep_lookup(self) -> Any:
        """
        Prepare rhs, normalising a ``Query`` rhs first.

        A ``Query`` rhs has its ordering cleared and, if it has no select
        fields, its select clause replaced with ``id``.
        """
        from plain.postgres.sql.query import Query  # avoid circular import

        subquery = self.rhs
        if isinstance(subquery, Query):
            subquery.clear_ordering(clear_default=True)
            if not subquery.has_select_fields:
                subquery.clear_select_clause()
                subquery.add_fields(["id"])
        return super().get_prep_lookup()

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """
        Build the parenthesised placeholder list for a direct-value rhs.

        Raises:
            EmptyResultSet: if no values remain once None is removed.
        """
        if not self.rhs_is_direct_value():
            return super().process_rhs(compiler, connection)

        # Drop None from the values: NULL is never equal to anything.
        try:
            candidates = OrderedSet(self.rhs)
            candidates.discard(None)
        except TypeError:  # Unhashable items in self.rhs
            candidates = [item for item in self.rhs if item is not None]

        if not candidates:
            raise EmptyResultSet

        # The values are an iterable; batch_process_rhs() prepares and
        # transforms them.
        sqls, sqls_params = self.batch_process_rhs(compiler, connection, candidates)
        return "(" + ", ".join(sqls) + ")", sqls_params

    def get_rhs_op(self, connection: DatabaseConnection, rhs: str | list[str]) -> str:
        """Return ``IN`` followed by the already-built rhs SQL."""
        return f"IN {rhs}"

    # PostgreSQL has no limit on IN clause size, so no need to override as_sql()
527
528
class PatternLookup(BuiltinLookup):
    """
    Base for LIKE-style lookups.

    ``param_pattern`` is the %-format template applied to a plain Python
    value to add the wildcard(s).
    """

    param_pattern: str = "%%%s%%"
    prepare_rhs: bool = False
    bilateral_transforms: list[Any]

    def get_rhs_op(self, connection: DatabaseConnection, rhs: str | list[str]) -> str:
        """
        Return the operator SQL for the right-hand side.

        Taking startswith as the example, the SQL should look like:
            col LIKE %s, ['thevalue%']
        For Python values the wildcard is added in Python, so the plain
        operator is enough. When the value is a reference to another column
        or a SQL transformation, the wildcard has to be added in SQL,
        something like:
            col LIKE othercol || '%%'
        and the ``PATTERN_OPS`` template is used instead.
        """
        if not hasattr(self.rhs, "as_sql") and not self.bilateral_transforms:
            return super().get_rhs_op(connection, rhs)
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        pattern = PATTERN_OPS[self.lookup_name].format(PATTERN_ESC)
        return pattern.format(rhs)

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """
        Process rhs and add the wildcard pattern to a plain Python value.

        The first parameter is rewritten in place, but only when the rhs SQL
        is a string, rhs is a direct value, there is at least one parameter
        and no bilateral transforms apply.
        """
        rhs_sql, rhs_params = super().process_rhs(compiler, connection)
        wrap_in_python = (
            isinstance(rhs_sql, str)
            and self.rhs_is_direct_value()
            and rhs_params
            and not self.bilateral_transforms
        )
        if wrap_in_python:
            rhs_params[0] = self.param_pattern % prep_for_like_query(rhs_params[0])
        return rhs_sql, rhs_params
563
564
@Field.register_lookup
class Contains(PatternLookup):
    """The ``contains`` lookup; uses the inherited ``param_pattern``."""

    lookup_name: str = "contains"
568
569
@Field.register_lookup
class IContains(Contains):
    """The ``icontains`` lookup; same parameter pattern as ``Contains``."""

    lookup_name: str = "icontains"
573
574
@Field.register_lookup
class StartsWith(PatternLookup):
    """The ``startswith`` lookup."""

    lookup_name: str = "startswith"
    # Wildcard only after the value.
    param_pattern: str = "%s%%"
579
580
@Field.register_lookup
class IStartsWith(StartsWith):
    """The ``istartswith`` lookup; same parameter pattern as ``StartsWith``."""

    lookup_name: str = "istartswith"
584
585
@Field.register_lookup
class EndsWith(PatternLookup):
    """The ``endswith`` lookup."""

    lookup_name: str = "endswith"
    # Wildcard only before the value.
    param_pattern: str = "%%%s"
590
591
@Field.register_lookup
class IEndsWith(EndsWith):
    """The ``iendswith`` lookup; same parameter pattern as ``EndsWith``."""

    lookup_name: str = "iendswith"
595
596
@Field.register_lookup
class Range(FieldGetDbPrepValueIterableMixin, BuiltinLookup):
    """The ``range`` lookup, rendered as ``BETWEEN ... AND ...``."""

    lookup_name: str = "range"

    def get_rhs_op(self, connection: DatabaseConnection, rhs: str | list[str]) -> str:
        """Build the BETWEEN clause from the first two rhs SQL fragments."""
        # process_rhs is expected to hand over a list of two elements here.
        assert isinstance(rhs, list), f"Range lookup expects list, got {type(rhs)}"
        return f"BETWEEN {rhs[0]} AND {rhs[1]}"
605
606
@Field.register_lookup
class IsNull(BuiltinLookup):
    """The ``isnull`` lookup, rendered as ``IS NULL`` / ``IS NOT NULL``."""

    lookup_name: str = "isnull"
    prepare_rhs: bool = False

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """
        Return the NULL test for the left-hand side.

        Raises:
            ValueError: if rhs is not a ``bool``.
        """
        if not isinstance(self.rhs, bool):
            raise ValueError(
                "The QuerySet value for an isnull lookup must be True or False."
            )
        sql, params = self.process_lhs(compiler, connection)
        if self.rhs:
            return f"{sql} IS NULL", params
        return f"{sql} IS NOT NULL", params
624
625
@Field.register_lookup
class Regex(BuiltinLookup):
    """The ``regex`` lookup, registered on ``Field``."""

    lookup_name: str = "regex"
    prepare_rhs: bool = False

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """
        Return SQL for the regular-expression match.

        Uses the regular ``OPERATORS`` path when the lookup name has an
        entry there, and the template from ``regex_lookup()`` otherwise.
        """
        if self.lookup_name in OPERATORS:
            return super().as_sql(compiler, connection)
        lhs, lhs_params = self.process_lhs(compiler, connection)
        rhs, rhs_params = self.process_rhs(compiler, connection)
        sql_template = regex_lookup(self.lookup_name)
        return sql_template % (lhs, rhs), lhs_params + rhs_params
641
642
@Field.register_lookup
class IRegex(Regex):
    """The ``iregex`` lookup; reuses ``Regex.as_sql`` under its own name."""

    lookup_name: str = "iregex"
646
647
class YearLookup(Lookup):
    """
    Base for comparisons against a year extracted from a date or datetime.

    With a direct-value rhs the extract is skipped and the originating
    field (``self.lhs.lhs``) is compared against the year's bounds, so that
    indexes can be used.
    """

    def year_lookup_bounds(
        self, connection: DatabaseConnection, year: int
    ) -> list[datetime.date] | list[datetime.datetime]:
        """
        Return the bounds for ``year``.

        Datetime bounds are used when the originating field is a
        ``DateTimeField``, date bounds otherwise. ISO-year bounds are
        requested when lhs is an ``ExtractIsoYear``.
        """
        from plain.postgres.functions import ExtractIsoYear

        uses_iso_year = isinstance(self.lhs, ExtractIsoYear)
        source_field = self.lhs.lhs.output_field
        bounds_for = (
            year_lookup_bounds_for_datetime_field
            if isinstance(source_field, DateTimeField)
            else year_lookup_bounds_for_date_field
        )
        return bounds_for(year, iso_year=uses_iso_year)

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, Sequence[Any]]:
        """Return bound-based SQL for a direct rhs, the inherited SQL otherwise."""
        if not self.rhs_is_direct_value():
            return super().as_sql(compiler, connection)

        # Compile the originating field directly instead of the extract.
        lhs_sql, params = self.process_lhs(compiler, connection, self.lhs.lhs)
        rhs_sql, _ = self.process_rhs(compiler, connection)
        # rhs_sql should be a string for year lookups
        assert isinstance(rhs_sql, str), f"Expected str, got {type(rhs_sql)}"
        rhs_sql = self.get_direct_rhs_sql(connection, rhs_sql)
        start, finish = self.year_lookup_bounds(connection, self.rhs)
        params.extend(self.get_bound_params(start, finish))
        return f"{lhs_sql} {rhs_sql}", params

    def get_direct_rhs_sql(self, connection: DatabaseConnection, rhs: str) -> str:
        """Substitute ``rhs`` into the ``OPERATORS`` template for this lookup."""
        name = self.lookup_name
        assert name is not None, "lookup_name must be set on Lookup subclass"
        return OPERATORS[name] % rhs

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any, ...]:
        """Return bound parameters for the year lookup."""
        raise NotImplementedError("Subclasses must implement get_bound_params()")
689
690
class YearExact(YearLookup, Exact):
    """Year equality, expressed as a BETWEEN over both year bounds."""

    def get_direct_rhs_sql(self, connection: DatabaseConnection, rhs: str) -> str:
        """Return the fixed two-placeholder BETWEEN clause; ``rhs`` is unused."""
        return "BETWEEN %s AND %s"

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any, Any]:
        """Both bounds are bound, in order."""
        return start, finish
697
698
class YearGt(YearLookup, GreaterThan):
    """Year ``gt`` comparison against the ``finish`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        """Only the ``finish`` bound is bound."""
        bound = finish
        return (bound,)
702
703
class YearGte(YearLookup, GreaterThanOrEqual):
    """Year ``gte`` comparison against the ``start`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        """Only the ``start`` bound is bound."""
        bound = start
        return (bound,)
707
708
class YearLt(YearLookup, LessThan):
    """Year ``lt`` comparison against the ``start`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        """Only the ``start`` bound is bound."""
        bound = start
        return (bound,)
712
713
class YearLte(YearLookup, LessThanOrEqual):
    """Year ``lte`` comparison against the ``finish`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        """Only the ``finish`` bound is bound."""
        bound = finish
        return (bound,)
717
718
# UUID lookups - PostgreSQL has native UUID support, so these inherit directly
# from their base classes without any special processing.
721
722
@UUIDField.register_lookup
class UUIDIExact(IExact):
    """``IExact`` registered on UUIDField without modification."""
726
727
@UUIDField.register_lookup
class UUIDContains(Contains):
    """``Contains`` registered on UUIDField without modification."""
731
732
@UUIDField.register_lookup
class UUIDIContains(IContains):
    """``IContains`` registered on UUIDField without modification."""
736
737
@UUIDField.register_lookup
class UUIDStartsWith(StartsWith):
    """``StartsWith`` registered on UUIDField without modification."""
741
742
@UUIDField.register_lookup
class UUIDIStartsWith(IStartsWith):
    """``IStartsWith`` registered on UUIDField without modification."""
746
747
@UUIDField.register_lookup
class UUIDEndsWith(EndsWith):
    """``EndsWith`` registered on UUIDField without modification."""
751
752
@UUIDField.register_lookup
class UUIDIEndsWith(IEndsWith):
    """``IEndsWith`` registered on UUIDField without modification."""