1from __future__ import annotations
  2
  3import datetime
  4import itertools
  5import math
  6from collections.abc import Sequence
  7from functools import cached_property
  8from typing import TYPE_CHECKING, Any
  9
 10from plain.postgres.dialect import (
 11    INTEGER_FIELD_RANGES,
 12    OPERATORS,
 13    PATTERN_ESC,
 14    PATTERN_OPS,
 15    lookup_cast,
 16    prep_for_like_query,
 17    regex_lookup,
 18    year_lookup_bounds_for_date_field,
 19    year_lookup_bounds_for_datetime_field,
 20)
 21from plain.postgres.exceptions import EmptyResultSet, FullResultSet
 22from plain.postgres.expressions import Expression, Func, ResolvableExpression, Value
 23from plain.postgres.fields import (
 24    BooleanField,
 25    DateTimeField,
 26    Field,
 27    IntegerField,
 28    UUIDField,
 29)
 30from plain.postgres.query_utils import RegisterLookupMixin
 31from plain.utils.datastructures import OrderedSet
 32from plain.utils.hashable import make_hashable
 33
 34if TYPE_CHECKING:
 35    from plain.postgres.connection import DatabaseConnection
 36    from plain.postgres.sql.compiler import SQLCompiler
 37
 38
 39class Lookup(Expression):
 40    lookup_name: str | None = None
 41    prepare_rhs: bool = True
 42    can_use_none_as_rhs: bool = False
 43    lhs: Any
 44    rhs: Any
 45
 46    def __init__(self, lhs: Any, rhs: Any):
 47        self.lhs, self.rhs = lhs, rhs
 48        self.rhs = self.get_prep_lookup()
 49        self.lhs = self.get_prep_lhs()
 50        if hasattr(self.lhs, "get_bilateral_transforms"):
 51            bilateral_transforms = self.lhs.get_bilateral_transforms()
 52        else:
 53            bilateral_transforms = []
 54        if bilateral_transforms:
 55            # Warn the user as soon as possible if they are trying to apply
 56            # a bilateral transformation on a nested QuerySet: that won't work.
 57            from plain.postgres.sql.query import Query  # avoid circular import
 58
 59            if isinstance(rhs, Query):
 60                raise NotImplementedError(
 61                    "Bilateral transformations on nested querysets are not implemented."
 62                )
 63        self.bilateral_transforms = bilateral_transforms
 64
 65    def apply_bilateral_transforms(self, value: Any) -> Any:
 66        for transform in self.bilateral_transforms:
 67            value = transform(value)
 68        return value
 69
 70    def __repr__(self) -> str:
 71        return f"{self.__class__.__name__}({self.lhs!r}, {self.rhs!r})"
 72
 73    def batch_process_rhs(
 74        self, compiler: SQLCompiler, connection: DatabaseConnection, rhs: Any = None
 75    ) -> tuple[list[str], list[Any]]:
 76        if rhs is None:
 77            rhs = self.rhs
 78        if self.bilateral_transforms:
 79            sqls: list[str] = []
 80            sqls_params: list[Any] = []
 81            for p in rhs:
 82                value = Value(p, output_field=self.lhs.output_field)
 83                value = self.apply_bilateral_transforms(value)
 84                value = value.resolve_expression(compiler.query)
 85                sql, sql_params = compiler.compile(value)
 86                sqls.append(sql)
 87                sqls_params.extend(sql_params)
 88        else:
 89            _, params = self.get_db_prep_lookup(rhs, connection)
 90            sqls = ["%s"] * len(params)
 91            sqls_params = list(params)
 92        return sqls, sqls_params
 93
 94    def get_source_expressions(self) -> list[Any]:
 95        if self.rhs_is_direct_value():
 96            return [self.lhs]
 97        return [self.lhs, self.rhs]
 98
 99    def set_source_expressions(self, exprs: Sequence[Any]) -> None:
100        exprs_list = list(exprs)
101        if len(exprs_list) == 1:
102            self.lhs = exprs_list[0]
103        else:
104            self.lhs, self.rhs = exprs_list
105
106    def get_prep_lookup(self) -> Any:
107        if not self.prepare_rhs or isinstance(self.rhs, ResolvableExpression):
108            return self.rhs
109        if output_field := getattr(self.lhs, "output_field", None):
110            if get_prep_value := getattr(output_field, "get_prep_value", None):
111                return get_prep_value(self.rhs)
112        elif self.rhs_is_direct_value():
113            return Value(self.rhs)
114        return self.rhs
115
116    def get_prep_lhs(self) -> Any:
117        if isinstance(self.lhs, ResolvableExpression):
118            return self.lhs
119        return Value(self.lhs)
120
121    def get_db_prep_lookup(
122        self, value: Any, connection: DatabaseConnection
123    ) -> tuple[str, list[Any]]:
124        return ("%s", [value])
125
126    def process_lhs(
127        self, compiler: SQLCompiler, connection: DatabaseConnection, lhs: Any = None
128    ) -> tuple[str, list[Any]]:
129        lhs = lhs or self.lhs
130        if isinstance(lhs, ResolvableExpression):
131            lhs = lhs.resolve_expression(compiler.query)
132        sql, params = compiler.compile(lhs)
133        if isinstance(lhs, Lookup):
134            # Wrapped in parentheses to respect operator precedence.
135            sql = f"({sql})"
136        return sql, list(params)
137
138    def process_rhs(
139        self, compiler: SQLCompiler, connection: DatabaseConnection
140    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
141        value = self.rhs
142        if self.bilateral_transforms:
143            if self.rhs_is_direct_value():
144                # Do not call get_db_prep_lookup here as the value will be
145                # transformed before being used for lookup
146                value = Value(value, output_field=self.lhs.output_field)
147            value = self.apply_bilateral_transforms(value)
148            value = value.resolve_expression(compiler.query)
149        if hasattr(value, "as_sql"):
150            sql, params = compiler.compile(value)
151            # Ensure expression is wrapped in parentheses to respect operator
152            # precedence but avoid double wrapping.
153            if sql and sql[0] != "(":
154                sql = f"({sql})"
155            return sql, list(params)
156        else:
157            return self.get_db_prep_lookup(value, connection)
158
159    def rhs_is_direct_value(self) -> bool:
160        return not hasattr(self.rhs, "as_sql")
161
162    def get_group_by_cols(self) -> list[Any]:
163        cols = []
164        for source in self.get_source_expressions():
165            cols.extend(source.get_group_by_cols())
166        return cols
167
168    @cached_property
169    def output_field(self) -> BooleanField:
170        return BooleanField()
171
172    @property
173    def identity(self) -> tuple[type[Lookup], Any, Any]:
174        return self.__class__, self.lhs, self.rhs
175
176    def __eq__(self, other: object) -> bool:
177        if not isinstance(other, Lookup):
178            return NotImplemented
179        return self.identity == other.identity
180
181    def __hash__(self) -> int:
182        return hash(make_hashable(self.identity))
183
184    def resolve_expression(
185        self,
186        query: Any = None,
187        allow_joins: bool = True,
188        reuse: Any = None,
189        summarize: bool = False,
190        for_save: bool = False,
191    ) -> Lookup:
192        c = self.copy()
193        c.is_summary = summarize
194        c.lhs = self.lhs.resolve_expression(
195            query, allow_joins, reuse, summarize, for_save
196        )
197        if isinstance(self.rhs, ResolvableExpression):
198            c.rhs = self.rhs.resolve_expression(
199                query, allow_joins, reuse, summarize, for_save
200            )
201        return c
202
203    def select_format(
204        self, compiler: SQLCompiler, sql: str, params: Sequence[Any]
205    ) -> tuple[str, Sequence[Any]]:
206        # Boolean expressions work directly in SELECT
207        return sql, params
208
209
class Transform(RegisterLookupMixin, Func):
    """
    Single-argument function usable as a step in a lookup chain.

    RegisterLookupMixin comes first in the bases so that get_lookup() and
    get_transform() inspect this class before falling back to output_field.
    """

    lookup_name: str | None = None
    # When True, this transform is reported by get_bilateral_transforms().
    bilateral: bool = False
    arity: int = 1

    @property
    def lhs(self) -> Any:
        """The first source expression."""
        return self.get_source_expressions()[0]

    def get_bilateral_transforms(self) -> list[type[Transform]]:
        """
        Return the bilateral transform classes of the wrapped expression,
        followed by this class when it is bilateral itself.
        """
        inner = self.lhs
        transforms = (
            inner.get_bilateral_transforms()
            if hasattr(inner, "get_bilateral_transforms")
            else []
        )
        if self.bilateral:
            transforms.append(self.__class__)
        return transforms
232
233
class BuiltinLookup(Lookup):
    """
    Lookup rendered as ``<lhs> <operator template % rhs>``, where the
    operator template comes from ``OPERATORS[lookup_name]``.
    """

    def process_lhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection, lhs: Any = None
    ) -> tuple[str, list[Any]]:
        """Compile the lhs and apply the cast template for this lookup."""
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        base_sql, params = super().process_lhs(compiler, connection, lhs)
        # The cast is chosen from self.lhs's field type, even when an
        # explicit ``lhs`` argument was compiled.
        internal_type = self.lhs.output_field.get_internal_type()
        cast_template = lookup_cast(self.lookup_name, internal_type)
        return cast_template % base_sql, list(params)

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """Return the full comparison SQL with lhs params before rhs params."""
        lhs_sql, params = self.process_lhs(compiler, connection)
        rhs_sql, rhs_params = self.process_rhs(compiler, connection)
        params.extend(rhs_params)
        operator_sql = self.get_rhs_op(connection, rhs_sql)
        return f"{lhs_sql} {operator_sql}", params

    def get_rhs_op(self, connection: DatabaseConnection, rhs: str | list[str]) -> str:
        """Interpolate ``rhs`` into the operator template for this lookup."""
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        operator_template = OPERATORS[self.lookup_name]
        return operator_template % rhs
260
261
class FieldGetDbPrepValueMixin(Lookup):
    """
    Mixin for lookups whose rhs must be passed through
    Field.get_db_prep_value() before being sent as a parameter.
    """

    # When True, ``value`` is iterated and each element is converted.
    get_db_prep_lookup_value_is_iterable: bool = False
    lhs: Any
    rhs: Any

    def get_db_prep_lookup(
        self, value: Any, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """Return ``"%s"`` and the value(s) converted by the field."""
        output_field = self.lhs.output_field
        # Relational fields expose 'target_field'; its converter is preferred
        # when present, otherwise the output field's own converter is used.
        target_field = getattr(output_field, "target_field", None)
        convert = (
            getattr(target_field, "get_db_prep_value", None)
            or output_field.get_db_prep_value
        )
        if self.get_db_prep_lookup_value_is_iterable:
            db_values = [convert(item, connection, prepared=True) for item in value]
        else:
            db_values = [convert(value, connection, prepared=True)]
        return "%s", db_values
288
289
class FieldGetDbPrepValueIterableMixin(FieldGetDbPrepValueMixin):
    """
    Mixin for lookups whose rhs is an iterable, each element of which must
    go through Field.get_db_prep_value().
    """

    get_db_prep_lookup_value_is_iterable: bool = True
    prepare_rhs: bool

    def get_prep_lookup(self) -> Any:
        """
        Return an expression rhs unchanged; otherwise return a list in which
        every non-expression element has been prepared by the lhs output
        field (when ``prepare_rhs`` is set and the field supports it).
        """
        if isinstance(self.rhs, ResolvableExpression):
            return self.rhs
        prepared_values = []
        for item in self.rhs:
            # Expressions are left for the database to evaluate and may sit
            # next to plain values in the same iterable.
            if not isinstance(item, ResolvableExpression) and self.prepare_rhs:
                output_field = getattr(self.lhs, "output_field", None)
                if output_field:
                    get_prep_value = getattr(output_field, "get_prep_value", None)
                    if get_prep_value:
                        item = get_prep_value(item)
            prepared_values.append(item)
        return prepared_values

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """Batch-process a direct iterable rhs; defer to the parent otherwise."""
        if not self.rhs_is_direct_value():
            return super().process_rhs(compiler, connection)
        # A direct rhs is an iterable of values, prepared one by one.
        return self.batch_process_rhs(compiler, connection)

    def resolve_expression_parameter(
        self,
        compiler: SQLCompiler,
        connection: DatabaseConnection,
        sql: str,
        param: Any,
    ) -> tuple[str, list[Any]]:
        """
        Return ``(sql, [param])`` for a plain value, or the compiled SQL and
        parameters when ``param`` turns out to be an expression.
        """
        original_param = param
        if isinstance(param, ResolvableExpression):
            param = param.resolve_expression(compiler.query)
        if not hasattr(param, "as_sql"):
            return sql, [original_param]
        compiled_sql, compiled_params = compiler.compile(param)
        return compiled_sql, list(compiled_params)

    def batch_process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection, rhs: Any = None
    ) -> tuple[list[str], list[Any]]:
        """
        Run the parent batch processing, then compile any parameters that
        are expressions and flatten the resulting parameter lists.
        """
        pre_processed = super().batch_process_rhs(compiler, connection, rhs)
        # Pair each placeholder with its parameter so that an expression
        # parameter can replace both with its compiled SQL and params.
        resolved_pairs = [
            self.resolve_expression_parameter(compiler, connection, item_sql, param)
            for item_sql, param in zip(*pre_processed)
        ]
        sqls, nested_params = zip(*resolved_pairs)
        flat_params = [param for group in nested_params for param in group]
        return list(sqls), flat_params
356
357
class OperatorLookup(Lookup):
    """Lookup rendered as ``<lhs> <operator> <rhs>`` with a fixed SQL operator."""

    # SQL operator text placed between the two sides; set by subclasses.
    operator: str | None = None

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, tuple[Any, ...]]:
        """Return the SQL and a tuple of lhs parameters followed by rhs ones."""
        lhs_sql, lhs_params = self.process_lhs(compiler, connection)
        rhs_sql, rhs_params = self.process_rhs(compiler, connection)
        sql = f"{lhs_sql} {self.operator} {rhs_sql}"
        return sql, (*lhs_params, *rhs_params)
370
371
@Field.register_lookup
class Exact(FieldGetDbPrepValueMixin, BuiltinLookup):
    """``exact`` lookup, registered on ``Field``."""

    lookup_name: str = "exact"

    def get_prep_lookup(self) -> Any:
        """
        Validate a ``Query`` rhs before the standard preparation.

        The query must be limited to one result; if it selects no fields,
        its select clause is replaced by ``id``.

        Raises:
            ValueError: if the rhs is a ``Query`` not limited to one result.
        """
        from plain.postgres.sql.query import Query  # avoid circular import

        subquery = self.rhs
        if isinstance(subquery, Query):
            if not subquery.has_limit_one():
                raise ValueError(
                    "The QuerySet value for an exact lookup must be limited to "
                    "one result using slicing."
                )
            if not subquery.has_select_fields:
                subquery.clear_select_clause()
                subquery.add_fields(["id"])
        return super().get_prep_lookup()

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """Return the comparison SQL, shortened for conditional lhs vs bool."""
        # A conditional lhs compared with a Python bool needs no "= true":
        # "boolfield__exact=True" renders as "WHERE boolean_field" and the
        # False case as "WHERE NOT boolean_field".
        is_bare_condition = isinstance(self.rhs, bool) and getattr(
            self.lhs, "conditional", False
        )
        if not is_bare_condition:
            return super().as_sql(compiler, connection)
        lhs_sql, params = self.process_lhs(compiler, connection)
        if self.rhs:
            return lhs_sql, params
        return f"NOT {lhs_sql}", params
402
403
@Field.register_lookup
class IExact(BuiltinLookup):
    """
    ``iexact`` lookup, registered on ``Field``.

    The SQL operator template is read from ``OPERATORS["iexact"]`` by
    ``BuiltinLookup.get_rhs_op()``.
    """

    lookup_name: str = "iexact"
    # The rhs is used as given; the field's get_prep_value() is not applied.
    prepare_rhs: bool = False
408
409
@Field.register_lookup
class GreaterThan(FieldGetDbPrepValueMixin, BuiltinLookup):
    """
    ``gt`` lookup, registered on ``Field``.

    The rhs is converted with the field's ``get_db_prep_value()`` and the
    operator template is read from ``OPERATORS["gt"]``.
    """

    lookup_name: str = "gt"
413
414
@Field.register_lookup
class GreaterThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup):
    """
    ``gte`` lookup, registered on ``Field``.

    The rhs is converted with the field's ``get_db_prep_value()`` and the
    operator template is read from ``OPERATORS["gte"]``.
    """

    lookup_name: str = "gte"
418
419
@Field.register_lookup
class LessThan(FieldGetDbPrepValueMixin, BuiltinLookup):
    """
    ``lt`` lookup, registered on ``Field``.

    The rhs is converted with the field's ``get_db_prep_value()`` and the
    operator template is read from ``OPERATORS["lt"]``.
    """

    lookup_name: str = "lt"
423
424
@Field.register_lookup
class LessThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup):
    """
    ``lte`` lookup, registered on ``Field``.

    The rhs is converted with the field's ``get_db_prep_value()`` and the
    operator template is read from ``OPERATORS["lte"]``.
    """

    lookup_name: str = "lte"
428
429
class IntegerFieldOverflow:
    """
    Mixin that short-circuits a lookup whose int rhs lies outside the range
    that ``INTEGER_FIELD_RANGES`` lists for the lhs field type.
    """

    # Raised when the rhs is below the minimum / above the maximum.
    # Subclasses override these per comparison operator.
    underflow_exception: type[Exception] = EmptyResultSet
    overflow_exception: type[Exception] = EmptyResultSet
    lhs: Any
    rhs: Any

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """
        Range-check an int rhs, then defer to the next ``process_rhs``.

        Raises:
            underflow_exception: rhs is below the field's minimum.
            overflow_exception: rhs is above the field's maximum.
        """
        value = self.rhs
        if isinstance(value, int):
            internal_type = self.lhs.output_field.get_internal_type()
            lower, upper = INTEGER_FIELD_RANGES[internal_type]
            # A bound of None means that side is not checked.
            too_small = lower is not None and value < lower
            if too_small:
                raise self.underflow_exception
            too_large = upper is not None and value > upper
            if too_large:
                raise self.overflow_exception
        return super().process_rhs(compiler, connection)  # type: ignore[misc]
448
449
class IntegerFieldFloatRounding:
    """
    Mixin that lets a float be used as the query value of an integer lookup.

    A float rhs is rounded up with ``math.ceil`` before the regular
    preparation runs, so that its fractional part is not simply dropped.
    """

    rhs: Any

    def get_prep_lookup(self) -> Any:
        """Replace a float rhs by its ceiling, then defer to the next class."""
        raw = self.rhs
        if isinstance(raw, float):
            self.rhs = math.ceil(raw)
        return super().get_prep_lookup()  # type: ignore[misc]
462
463
@IntegerField.register_lookup
class IntegerFieldExact(IntegerFieldOverflow, Exact):
    """
    ``exact`` lookup registered on ``IntegerField``.

    An int rhs outside the field's range raises ``EmptyResultSet`` on either
    side (the defaults inherited from ``IntegerFieldOverflow``).
    """

    pass
467
468
@IntegerField.register_lookup
class IntegerGreaterThan(IntegerFieldOverflow, GreaterThan):
    """
    ``gt`` lookup registered on ``IntegerField`` with range checking.

    An rhs above the field's maximum raises the inherited ``EmptyResultSet``.
    """

    # An rhs below the field's minimum is signalled with FullResultSet
    # instead of the default EmptyResultSet.
    underflow_exception = FullResultSet
472
473
@IntegerField.register_lookup
class IntegerGreaterThanOrEqual(
    IntegerFieldOverflow, IntegerFieldFloatRounding, GreaterThanOrEqual
):
    """
    ``gte`` lookup registered on ``IntegerField`` with range checking.

    A float rhs is first rounded up by ``IntegerFieldFloatRounding``. An rhs
    above the field's maximum raises the inherited ``EmptyResultSet``.
    """

    # An rhs below the field's minimum is signalled with FullResultSet
    # instead of the default EmptyResultSet.
    underflow_exception = FullResultSet
479
480
@IntegerField.register_lookup
class IntegerLessThan(IntegerFieldOverflow, IntegerFieldFloatRounding, LessThan):
    """
    ``lt`` lookup registered on ``IntegerField`` with range checking.

    A float rhs is first rounded up by ``IntegerFieldFloatRounding``. An rhs
    below the field's minimum raises the inherited ``EmptyResultSet``.
    """

    # An rhs above the field's maximum is signalled with FullResultSet
    # instead of the default EmptyResultSet.
    overflow_exception = FullResultSet
484
485
@IntegerField.register_lookup
class IntegerLessThanOrEqual(IntegerFieldOverflow, LessThanOrEqual):
    """
    ``lte`` lookup registered on ``IntegerField`` with range checking.

    An rhs below the field's minimum raises the inherited ``EmptyResultSet``.
    """

    # An rhs above the field's maximum is signalled with FullResultSet
    # instead of the default EmptyResultSet.
    overflow_exception = FullResultSet
489
490
@Field.register_lookup
class In(FieldGetDbPrepValueIterableMixin, BuiltinLookup):
    """``in`` lookup, registered on ``Field``; renders ``<lhs> IN <rhs>``."""

    lookup_name: str = "in"

    def get_prep_lookup(self) -> Any:
        """
        Normalize a ``Query`` rhs before the standard preparation: its
        ordering is cleared and, if it selects no fields, its select clause
        is replaced by ``id``.
        """
        from plain.postgres.sql.query import Query  # avoid circular import

        subquery = self.rhs
        if isinstance(subquery, Query):
            subquery.clear_ordering(clear_default=True)
            if not subquery.has_select_fields:
                subquery.clear_select_clause()
                subquery.add_fields(["id"])
        return super().get_prep_lookup()

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """
        Render a direct rhs as a parenthesised placeholder list.

        Raises:
            EmptyResultSet: if no values remain once ``None`` is removed.
        """
        if not self.rhs_is_direct_value():
            return super().process_rhs(compiler, connection)

        # None is dropped because NULL never compares equal to anything.
        try:
            candidates = OrderedSet(self.rhs)
            candidates.discard(None)
        except TypeError:  # self.rhs holds unhashable items
            candidates = [item for item in self.rhs if item is not None]

        if not candidates:
            raise EmptyResultSet

        # The remaining values are prepared/transformed one by one.
        sqls, sqls_params = self.batch_process_rhs(compiler, connection, candidates)
        joined = ", ".join(sqls)
        return f"({joined})", sqls_params

    def get_rhs_op(self, connection: DatabaseConnection, rhs: str | list[str]) -> str:
        return f"IN {rhs}"

    # PostgreSQL has no limit on IN clause size, so no need to override as_sql()
530
531
class PatternLookup(BuiltinLookup):
    """
    Base for pattern lookups. ``param_pattern`` is the %-format template
    that adds the wildcards around a direct Python value.
    """

    param_pattern: str = "%%%s%%"
    prepare_rhs: bool = False
    bilateral_transforms: list[Any]

    def get_rhs_op(self, connection: DatabaseConnection, rhs: str | list[str]) -> str:
        """Return the operator SQL, choosing how the wildcard is supplied."""
        # Taking startswith as the example, the target SQL is
        #     col LIKE %s, ['thevalue%']
        # A Python value gets its wildcard added in Python (process_rhs), so
        # the plain operator is enough. When the rhs is SQL -- a column
        # reference or a transformed value -- the wildcard has to be added
        # in SQL, along the lines of
        #     col LIKE othercol || '%%'
        # which is what the PATTERN_OPS templates are used for.
        rhs_is_sql = hasattr(self.rhs, "as_sql") or self.bilateral_transforms
        if not rhs_is_sql:
            return super().get_rhs_op(connection, rhs)
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        sql_pattern = PATTERN_OPS[self.lookup_name].format(PATTERN_ESC)
        return sql_pattern.format(rhs)

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """
        Compile the rhs; for a direct value without bilateral transforms,
        the first parameter is escaped and wrapped in ``param_pattern``.
        """
        rhs, params = super().process_rhs(compiler, connection)
        wrap_in_python = (
            isinstance(rhs, str)
            and self.rhs_is_direct_value()
            and params
            and not self.bilateral_transforms
        )
        if wrap_in_python:
            params[0] = self.param_pattern % prep_for_like_query(params[0])
        return rhs, params
566
567
@Field.register_lookup
class Contains(PatternLookup):
    """
    ``contains`` lookup, registered on ``Field``.

    Uses the inherited ``param_pattern`` (``"%%%s%%"``), which puts a ``%``
    wildcard on both sides of a direct value.
    """

    lookup_name: str = "contains"
571
572
@Field.register_lookup
class IContains(Contains):
    """
    ``icontains`` lookup, registered on ``Field``.

    Differs from ``Contains`` only in ``lookup_name``, which selects the
    operator and pattern templates.
    """

    lookup_name: str = "icontains"
576
577
@Field.register_lookup
class StartsWith(PatternLookup):
    """``startswith`` lookup, registered on ``Field``."""

    lookup_name: str = "startswith"
    # Appends a single trailing % wildcard to a direct value.
    param_pattern: str = "%s%%"
582
583
@Field.register_lookup
class IStartsWith(StartsWith):
    """
    ``istartswith`` lookup, registered on ``Field``.

    Differs from ``StartsWith`` only in ``lookup_name``, which selects the
    operator and pattern templates.
    """

    lookup_name: str = "istartswith"
587
588
@Field.register_lookup
class EndsWith(PatternLookup):
    """``endswith`` lookup, registered on ``Field``."""

    lookup_name: str = "endswith"
    # Prepends a single leading % wildcard to a direct value.
    param_pattern: str = "%%%s"
593
594
@Field.register_lookup
class IEndsWith(EndsWith):
    """
    ``iendswith`` lookup, registered on ``Field``.

    Differs from ``EndsWith`` only in ``lookup_name``, which selects the
    operator and pattern templates.
    """

    lookup_name: str = "iendswith"
598
599
@Field.register_lookup
class Range(FieldGetDbPrepValueIterableMixin, BuiltinLookup):
    """``range`` lookup, registered on ``Field``; renders ``BETWEEN a AND b``."""

    lookup_name: str = "range"

    def get_rhs_op(self, connection: DatabaseConnection, rhs: str | list[str]) -> str:
        """Build the BETWEEN clause from the first two rhs SQL fragments."""
        # process_rhs is expected to hand over a list of SQL fragments here.
        assert isinstance(rhs, list), f"Range lookup expects list, got {type(rhs)}"
        lower_sql, upper_sql = rhs[0], rhs[1]
        return f"BETWEEN {lower_sql} AND {upper_sql}"
608
609
@Field.register_lookup
class IsNull(BuiltinLookup):
    """``isnull`` lookup, registered on ``Field``."""

    lookup_name: str = "isnull"
    prepare_rhs: bool = False

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """
        Render ``<lhs> IS NULL`` for a True rhs, ``<lhs> IS NOT NULL`` for
        a False one.

        Raises:
            ValueError: if the rhs is not a bool.
        """
        if not isinstance(self.rhs, bool):
            raise ValueError(
                "The QuerySet value for an isnull lookup must be True or False."
            )
        sql, params = self.process_lhs(compiler, connection)
        null_test = "IS NULL" if self.rhs else "IS NOT NULL"
        return f"{sql} {null_test}", params
627
628
@Field.register_lookup
class Regex(BuiltinLookup):
    """``regex`` lookup, registered on ``Field``."""

    lookup_name: str = "regex"
    prepare_rhs: bool = False

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, list[Any]]:
        """
        Use the standard operator rendering when ``OPERATORS`` has an entry
        for this lookup; otherwise fill the ``regex_lookup()`` template.
        """
        if self.lookup_name in OPERATORS:
            return super().as_sql(compiler, connection)
        lhs_sql, lhs_params = self.process_lhs(compiler, connection)
        rhs_sql, rhs_params = self.process_rhs(compiler, connection)
        template = regex_lookup(self.lookup_name)
        return template % (lhs_sql, rhs_sql), lhs_params + rhs_params
644
645
@Field.register_lookup
class IRegex(Regex):
    """
    ``iregex`` lookup, registered on ``Field``.

    Differs from ``Regex`` only in ``lookup_name``, which selects the
    operator or ``regex_lookup()`` template.
    """

    lookup_name: str = "iregex"
649
650
class YearLookup(Lookup):
    """
    Base for comparisons on an extracted year. With a direct rhs, the
    comparison is rewritten against the original column using year bounds.
    """

    def year_lookup_bounds(
        self, connection: DatabaseConnection, year: int
    ) -> list[datetime.date] | list[datetime.datetime]:
        """Return the bounds for ``year`` that match the underlying field type."""
        from plain.postgres.functions import ExtractIsoYear

        iso_year = isinstance(self.lhs, ExtractIsoYear)
        # self.lhs is the extract expression; self.lhs.lhs is the source field.
        if isinstance(self.lhs.lhs.output_field, DateTimeField):
            return year_lookup_bounds_for_datetime_field(year, iso_year=iso_year)
        return year_lookup_bounds_for_date_field(year, iso_year=iso_year)

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseConnection
    ) -> tuple[str, Sequence[Any]]:
        """Render the lookup, bypassing the extract for a direct rhs."""
        if not self.rhs_is_direct_value():
            return super().as_sql(compiler, connection)

        # With a direct value the extract is skipped and the originating
        # field (self.lhs.lhs) is compared instead, so indexes stay usable.
        lhs_sql, params = self.process_lhs(compiler, connection, self.lhs.lhs)
        # Only the placeholder SQL is kept; the parameters are the bounds.
        placeholder, _ = self.process_rhs(compiler, connection)
        assert isinstance(placeholder, str), f"Expected str, got {type(placeholder)}"
        comparison_sql = self.get_direct_rhs_sql(connection, placeholder)
        lower, upper = self.year_lookup_bounds(connection, self.rhs)
        params.extend(self.get_bound_params(lower, upper))
        return f"{lhs_sql} {comparison_sql}", params

    def get_direct_rhs_sql(self, connection: DatabaseConnection, rhs: str) -> str:
        """Interpolate ``rhs`` into the operator template for this lookup."""
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        operator_template = OPERATORS[self.lookup_name]
        return operator_template % rhs

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any, ...]:
        """Return the bound(s) to use as parameters; subclasses must override."""
        raise NotImplementedError("Subclasses must implement get_bound_params()")
692
693
class YearExact(YearLookup, Exact):
    """Year ``exact`` lookup: a direct rhs becomes ``BETWEEN start AND finish``."""

    def get_direct_rhs_sql(self, connection: DatabaseConnection, rhs: str) -> str:
        # The compiled rhs placeholder is ignored; both bounds are bound instead.
        return "BETWEEN %s AND %s"

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any, Any]:
        # Both bounds are needed, in the order of the BETWEEN placeholders.
        return (start, finish)
700
701
class YearGt(YearLookup, GreaterThan):
    """Year ``gt`` lookup: a direct rhs is compared against the ``finish`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        return (finish,)
705
706
class YearGte(YearLookup, GreaterThanOrEqual):
    """Year ``gte`` lookup: a direct rhs is compared against the ``start`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        return (start,)
710
711
class YearLt(YearLookup, LessThan):
    """Year ``lt`` lookup: a direct rhs is compared against the ``start`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        return (start,)
715
716
class YearLte(YearLookup, LessThanOrEqual):
    """Year ``lte`` lookup: a direct rhs is compared against the ``finish`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        return (finish,)
720
721
722# UUID lookups - PostgreSQL has native UUID support so these inherit directly
723# from their base classes without any special processing.
724
725
@UUIDField.register_lookup
class UUIDIExact(IExact):
    """Registers the unmodified ``iexact`` lookup on ``UUIDField``."""

    pass
729
730
@UUIDField.register_lookup
class UUIDContains(Contains):
    """Registers the unmodified ``contains`` lookup on ``UUIDField``."""

    pass
734
735
@UUIDField.register_lookup
class UUIDIContains(IContains):
    """Registers the unmodified ``icontains`` lookup on ``UUIDField``."""

    pass
739
740
@UUIDField.register_lookup
class UUIDStartsWith(StartsWith):
    """Registers the unmodified ``startswith`` lookup on ``UUIDField``."""

    pass
744
745
@UUIDField.register_lookup
class UUIDIStartsWith(IStartsWith):
    """Registers the unmodified ``istartswith`` lookup on ``UUIDField``."""

    pass
749
750
@UUIDField.register_lookup
class UUIDEndsWith(EndsWith):
    """Registers the unmodified ``endswith`` lookup on ``UUIDField``."""

    pass
754
755
@UUIDField.register_lookup
class UUIDIEndsWith(IEndsWith):
    """Registers the unmodified ``iendswith`` lookup on ``UUIDField``."""

    pass