1from __future__ import annotations
  2
  3import datetime
  4import itertools
  5import math
  6from collections.abc import Sequence
  7from functools import cached_property
  8from typing import TYPE_CHECKING, Any
  9
 10from plain.models.exceptions import EmptyResultSet, FullResultSet
 11from plain.models.expressions import Expression, Func, ResolvableExpression, Value
 12from plain.models.fields import (
 13    BooleanField,
 14    DateTimeField,
 15    Field,
 16    IntegerField,
 17    UUIDField,
 18)
 19from plain.models.postgres.sql import (
 20    INTEGER_FIELD_RANGES,
 21    OPERATORS,
 22    PATTERN_ESC,
 23    PATTERN_OPS,
 24    lookup_cast,
 25    prep_for_like_query,
 26    regex_lookup,
 27    year_lookup_bounds_for_date_field,
 28    year_lookup_bounds_for_datetime_field,
 29)
 30from plain.models.query_utils import RegisterLookupMixin
 31from plain.utils.datastructures import OrderedSet
 32from plain.utils.hashable import make_hashable
 33
 34if TYPE_CHECKING:
 35    from plain.models.postgres.wrapper import DatabaseWrapper
 36    from plain.models.sql.compiler import SQLCompiler
 37
 38
 39class Lookup(Expression):
 40    lookup_name: str | None = None
 41    prepare_rhs: bool = True
 42    can_use_none_as_rhs: bool = False
 43    lhs: Any
 44    rhs: Any
 45
 46    def __init__(self, lhs: Any, rhs: Any):
 47        self.lhs, self.rhs = lhs, rhs
 48        self.rhs = self.get_prep_lookup()
 49        self.lhs = self.get_prep_lhs()
 50        if hasattr(self.lhs, "get_bilateral_transforms"):
 51            bilateral_transforms = self.lhs.get_bilateral_transforms()
 52        else:
 53            bilateral_transforms = []
 54        if bilateral_transforms:
 55            # Warn the user as soon as possible if they are trying to apply
 56            # a bilateral transformation on a nested QuerySet: that won't work.
 57            from plain.models.sql.query import Query  # avoid circular import
 58
 59            if isinstance(rhs, Query):
 60                raise NotImplementedError(
 61                    "Bilateral transformations on nested querysets are not implemented."
 62                )
 63        self.bilateral_transforms = bilateral_transforms
 64
 65    def apply_bilateral_transforms(self, value: Any) -> Any:
 66        for transform in self.bilateral_transforms:
 67            value = transform(value)
 68        return value
 69
 70    def __repr__(self) -> str:
 71        return f"{self.__class__.__name__}({self.lhs!r}, {self.rhs!r})"
 72
 73    def batch_process_rhs(
 74        self, compiler: SQLCompiler, connection: DatabaseWrapper, rhs: Any = None
 75    ) -> tuple[list[str], list[Any]]:
 76        if rhs is None:
 77            rhs = self.rhs
 78        if self.bilateral_transforms:
 79            sqls: list[str] = []
 80            sqls_params: list[Any] = []
 81            for p in rhs:
 82                value = Value(p, output_field=self.lhs.output_field)
 83                value = self.apply_bilateral_transforms(value)
 84                value = value.resolve_expression(compiler.query)
 85                sql, sql_params = compiler.compile(value)
 86                sqls.append(sql)
 87                sqls_params.extend(sql_params)
 88        else:
 89            _, params = self.get_db_prep_lookup(rhs, connection)
 90            sqls = ["%s"] * len(params)
 91            sqls_params = list(params)
 92        return sqls, sqls_params
 93
 94    def get_source_expressions(self) -> list[Any]:
 95        if self.rhs_is_direct_value():
 96            return [self.lhs]
 97        return [self.lhs, self.rhs]
 98
 99    def set_source_expressions(self, exprs: Sequence[Any]) -> None:
100        exprs_list = list(exprs)
101        if len(exprs_list) == 1:
102            self.lhs = exprs_list[0]
103        else:
104            self.lhs, self.rhs = exprs_list
105
106    def get_prep_lookup(self) -> Any:
107        if not self.prepare_rhs or isinstance(self.rhs, ResolvableExpression):
108            return self.rhs
109        if output_field := getattr(self.lhs, "output_field", None):
110            if get_prep_value := getattr(output_field, "get_prep_value", None):
111                return get_prep_value(self.rhs)
112        elif self.rhs_is_direct_value():
113            return Value(self.rhs)
114        return self.rhs
115
116    def get_prep_lhs(self) -> Any:
117        if isinstance(self.lhs, ResolvableExpression):
118            return self.lhs
119        return Value(self.lhs)
120
121    def get_db_prep_lookup(
122        self, value: Any, connection: DatabaseWrapper
123    ) -> tuple[str, list[Any]]:
124        return ("%s", [value])
125
126    def process_lhs(
127        self, compiler: SQLCompiler, connection: DatabaseWrapper, lhs: Any = None
128    ) -> tuple[str, list[Any]]:
129        lhs = lhs or self.lhs
130        if isinstance(lhs, ResolvableExpression):
131            lhs = lhs.resolve_expression(compiler.query)
132        sql, params = compiler.compile(lhs)
133        if isinstance(lhs, Lookup):
134            # Wrapped in parentheses to respect operator precedence.
135            sql = f"({sql})"
136        return sql, list(params)
137
138    def process_rhs(
139        self, compiler: SQLCompiler, connection: DatabaseWrapper
140    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
141        value = self.rhs
142        if self.bilateral_transforms:
143            if self.rhs_is_direct_value():
144                # Do not call get_db_prep_lookup here as the value will be
145                # transformed before being used for lookup
146                value = Value(value, output_field=self.lhs.output_field)
147            value = self.apply_bilateral_transforms(value)
148            value = value.resolve_expression(compiler.query)
149        if hasattr(value, "as_sql"):
150            sql, params = compiler.compile(value)
151            # Ensure expression is wrapped in parentheses to respect operator
152            # precedence but avoid double wrapping.
153            if sql and sql[0] != "(":
154                sql = f"({sql})"
155            return sql, list(params)
156        else:
157            return self.get_db_prep_lookup(value, connection)
158
159    def rhs_is_direct_value(self) -> bool:
160        return not hasattr(self.rhs, "as_sql")
161
162    def get_group_by_cols(self) -> list[Any]:
163        cols = []
164        for source in self.get_source_expressions():
165            cols.extend(source.get_group_by_cols())
166        return cols
167
168    @cached_property
169    def output_field(self) -> BooleanField:
170        return BooleanField()
171
172    @property
173    def identity(self) -> tuple[type[Lookup], Any, Any]:
174        return self.__class__, self.lhs, self.rhs
175
176    def __eq__(self, other: object) -> bool:
177        if not isinstance(other, Lookup):
178            return NotImplemented
179        return self.identity == other.identity
180
181    def __hash__(self) -> int:
182        return hash(make_hashable(self.identity))
183
184    def resolve_expression(
185        self,
186        query: Any = None,
187        allow_joins: bool = True,
188        reuse: Any = None,
189        summarize: bool = False,
190        for_save: bool = False,
191    ) -> Lookup:
192        c = self.copy()
193        c.is_summary = summarize
194        c.lhs = self.lhs.resolve_expression(
195            query, allow_joins, reuse, summarize, for_save
196        )
197        if isinstance(self.rhs, ResolvableExpression):
198            c.rhs = self.rhs.resolve_expression(
199                query, allow_joins, reuse, summarize, for_save
200            )
201        return c
202
203    def select_format(
204        self, compiler: SQLCompiler, sql: str, params: Sequence[Any]
205    ) -> tuple[str, Sequence[Any]]:
206        # Boolean expressions work directly in SELECT
207        return sql, params
208
209
class Transform(RegisterLookupMixin, Func):
    """
    A single-argument function that can be chained in lookups.

    RegisterLookupMixin comes first in the bases so that get_lookup() and
    get_transform() examine this class before checking output_field.
    """

    bilateral: bool = False
    arity: int = 1

    @property
    def lhs(self) -> Any:
        """The first source expression."""
        sources = self.get_source_expressions()
        return sources[0]

    def get_bilateral_transforms(self) -> list[type[Transform]]:
        """
        Return the bilateral transform classes collected from the wrapped
        lhs, with this class appended when ``bilateral`` is set.
        """
        inner = self.lhs
        collected = (
            inner.get_bilateral_transforms()
            if hasattr(inner, "get_bilateral_transforms")
            else []
        )
        if self.bilateral:
            collected.append(self.__class__)
        return collected
231
232
class BuiltinLookup(Lookup):
    """Lookup whose SQL comes from the ``OPERATORS`` and ``lookup_cast`` tables."""

    def process_lhs(
        self, compiler: SQLCompiler, connection: DatabaseWrapper, lhs: Any = None
    ) -> tuple[str, list[Any]]:
        """Compile the lhs and apply the cast template for this lookup."""
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        compiled_sql, compiled_params = super().process_lhs(compiler, connection, lhs)
        # The cast is chosen from self.lhs's field type, even when a
        # different ``lhs`` argument was compiled above.
        internal_type = self.lhs.output_field.get_internal_type()
        cast_template = lookup_cast(self.lookup_name, internal_type)
        return cast_template % compiled_sql, list(compiled_params)

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]]:
        """Return ``"<lhs> <operator rhs>"`` with lhs params followed by rhs params."""
        lhs_sql, params = self.process_lhs(compiler, connection)
        rhs_sql, rhs_params = self.process_rhs(compiler, connection)
        params.extend(rhs_params)
        operator_sql = self.get_rhs_op(connection, rhs_sql)
        return "{} {}".format(lhs_sql, operator_sql), params

    def get_rhs_op(self, connection: DatabaseWrapper, rhs: str | list[str]) -> str:
        """Interpolate ``rhs`` into the ``OPERATORS`` template for this lookup."""
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        operator_template = OPERATORS[self.lookup_name]
        return operator_template % rhs
259
260
class FieldGetDbPrepValueMixin(Lookup):
    """
    Mixin for lookups whose inputs must be passed through
    Field.get_db_prep_value().
    """

    get_db_prep_lookup_value_is_iterable: bool = False
    lhs: Any
    rhs: Any

    def get_db_prep_lookup(
        self, value: Any, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]]:
        """
        Return ``("%s", params)`` where params are ``value`` (or each item of
        it, when ``get_db_prep_lookup_value_is_iterable`` is set) converted
        with ``get_db_prep_value(..., prepared=True)``.
        """
        output_field = self.lhs.output_field
        # Relational fields expose a 'target_field'; its converter wins when
        # it exists, otherwise the output field's own converter is used.
        target_field = getattr(output_field, "target_field", None)
        convert = getattr(target_field, "get_db_prep_value", None)
        if not convert:
            convert = output_field.get_db_prep_value

        if self.get_db_prep_lookup_value_is_iterable:
            prepared = [convert(item, connection, prepared=True) for item in value]
        else:
            prepared = [convert(value, connection, prepared=True)]
        return "%s", prepared
287
288
class FieldGetDbPrepValueIterableMixin(FieldGetDbPrepValueMixin):
    """
    Mixin for lookups that need Field.get_db_prep_value() applied to each
    value of an iterable rhs.
    """

    get_db_prep_lookup_value_is_iterable: bool = True
    prepare_rhs: bool

    def get_prep_lookup(self) -> Any:
        """
        Return the rhs with every plain item prepared.

        A resolvable rhs is returned unchanged. Otherwise a new list is
        built: expression items are kept as they are, other items go through
        the lhs output field's ``get_prep_value()`` when ``prepare_rhs`` is
        set and such a method is available.
        """
        if isinstance(self.rhs, ResolvableExpression):
            return self.rhs
        prepared = []
        for item in self.rhs:
            # Expressions are handled by the database and may sit alongside
            # real values, so only non-expression items are prepared.
            if not isinstance(item, ResolvableExpression) and self.prepare_rhs:
                output_field = getattr(self.lhs, "output_field", None)
                if output_field:
                    prep = getattr(output_field, "get_prep_value", None)
                    if prep:
                        item = prep(item)
            prepared.append(item)
        return prepared

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """Batch-process a direct (iterable) rhs; defer to the parent otherwise."""
        if not self.rhs_is_direct_value():
            return super().process_rhs(compiler, connection)
        # rhs should be an iterable of values; batch_process_rhs()
        # prepares/transforms them.
        return self.batch_process_rhs(compiler, connection)

    def resolve_expression_parameter(
        self,
        compiler: SQLCompiler,
        connection: DatabaseWrapper,
        sql: str,
        param: Any,
    ) -> tuple[str, list[Any]]:
        """
        Return the SQL/params pair for one rhs item.

        A plain value keeps the given ``sql`` and becomes its own single
        param; anything exposing ``as_sql`` is compiled instead.
        """
        supplied = param
        if isinstance(param, ResolvableExpression):
            param = param.resolve_expression(compiler.query)
        if not hasattr(param, "as_sql"):
            return sql, [supplied]
        compiled_sql, compiled_params = compiler.compile(param)
        return compiled_sql, list(compiled_params)

    def batch_process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseWrapper, rhs: Any = None
    ) -> tuple[list[str], list[Any]]:
        """
        Run the parent's batch processing, then compile any params that are
        themselves expressions.
        """
        placeholders, values = super().batch_process_rhs(compiler, connection, rhs)
        # The values may contain expressions that compile to a sql/params
        # pair. Walk placeholder and value together so each pair refers to the
        # same argument and can be replaced by its compiled form.
        resolved = [
            self.resolve_expression_parameter(compiler, connection, placeholder, value)
            for placeholder, value in zip(placeholders, values)
        ]
        sql_parts, param_groups = zip(*resolved)
        flat_params = list(itertools.chain.from_iterable(param_groups))
        return list(sql_parts), flat_params
355
356
class OperatorLookup(Lookup):
    """Lookup defined by a SQL operator."""

    operator: str | None = None

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, tuple[Any, ...]]:
        """Return ``"<lhs> <operator> <rhs>"`` and the combined params tuple."""
        lhs_sql, lhs_params = self.process_lhs(compiler, connection)
        rhs_sql, rhs_params = self.process_rhs(compiler, connection)
        combined = (*lhs_params, *rhs_params)
        return "{} {} {}".format(lhs_sql, self.operator, rhs_sql), combined
369
370
@Field.register_lookup
class Exact(FieldGetDbPrepValueMixin, BuiltinLookup):
    """The ``exact`` lookup."""

    lookup_name: str = "exact"

    def get_prep_lookup(self) -> Any:
        """
        Prepare the rhs, first normalizing a ``Query`` rhs.

        Raises:
            ValueError: if the rhs is a ``Query`` not limited to one result.
        """
        from plain.models.sql.query import Query  # avoid circular import

        subquery = self.rhs
        if isinstance(subquery, Query):
            if not subquery.has_limit_one():
                raise ValueError(
                    "The QuerySet value for an exact lookup must be limited to "
                    "one result using slicing."
                )
            if not subquery.has_select_fields:
                # No explicit select: compare against the "id" field.
                subquery.clear_select_clause()
                subquery.add_fields(["id"])
        return super().get_prep_lookup()

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]]:
        """Compile the lookup, short-cutting boolean comparisons."""
        # A conditional lhs compared against a literal bool is emitted as the
        # bare condition (or its negation): "boolfield__exact=True" becomes
        # "WHERE boolean_field" rather than "WHERE boolean_field = True".
        if not (isinstance(self.rhs, bool) and getattr(self.lhs, "conditional", False)):
            return super().as_sql(compiler, connection)
        lhs_sql, params = self.process_lhs(compiler, connection)
        if self.rhs:
            return "%s" % lhs_sql, params
        return "NOT %s" % lhs_sql, params
401
402
@Field.register_lookup
class IExact(BuiltinLookup):
    """The ``iexact`` lookup, registered on ``Field``."""

    lookup_name: str = "iexact"
    # With prepare_rhs off, Lookup.get_prep_lookup() returns the rhs unchanged.
    prepare_rhs: bool = False
407
408
@Field.register_lookup
class GreaterThan(FieldGetDbPrepValueMixin, BuiltinLookup):
    """The ``gt`` lookup, registered on ``Field``."""

    lookup_name: str = "gt"
412
413
@Field.register_lookup
class GreaterThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup):
    """The ``gte`` lookup, registered on ``Field``."""

    lookup_name: str = "gte"
417
418
@Field.register_lookup
class LessThan(FieldGetDbPrepValueMixin, BuiltinLookup):
    """The ``lt`` lookup, registered on ``Field``."""

    lookup_name: str = "lt"
422
423
@Field.register_lookup
class LessThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup):
    """The ``lte`` lookup, registered on ``Field``."""

    lookup_name: str = "lte"
427
428
class IntegerFieldOverflow:
    """
    Mixin that rejects integer rhs values outside the lhs field's range
    before delegating to the next ``process_rhs()`` in the MRO.

    Subclasses pick which exception an underflow or overflow raises.
    """

    underflow_exception: type[Exception] = EmptyResultSet
    overflow_exception: type[Exception] = EmptyResultSet
    lhs: Any
    rhs: Any

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]]:
        """
        Check an ``int`` rhs against ``INTEGER_FIELD_RANGES`` for the lhs
        field's internal type, then defer to the parent implementation.

        Raises:
            underflow_exception: rhs is below the minimum (when one is set).
            overflow_exception: rhs is above the maximum (when one is set).
        """
        candidate = self.rhs
        if not isinstance(candidate, int):
            return super().process_rhs(compiler, connection)  # type: ignore[misc]

        internal_type = self.lhs.output_field.get_internal_type()
        lower, upper = INTEGER_FIELD_RANGES[internal_type]
        too_small = lower is not None and candidate < lower
        if too_small:
            raise self.underflow_exception
        too_large = upper is not None and candidate > upper
        if too_large:
            raise self.overflow_exception
        return super().process_rhs(compiler, connection)  # type: ignore[misc]
447
448
class IntegerFieldFloatRounding:
    """
    Let floats be used as query values for IntegerField. Without this, the
    decimal portion of the float would always be discarded.
    """

    rhs: Any

    def get_prep_lookup(self) -> Any:
        """Round a float rhs up to an integer, then run the parent preparation."""
        current = self.rhs
        if isinstance(current, float):
            self.rhs = math.ceil(current)
        return super().get_prep_lookup()  # type: ignore[misc]
461
462
@IntegerField.register_lookup
class IntegerFieldExact(IntegerFieldOverflow, Exact):
    """``Exact`` for ``IntegerField`` with the ``IntegerFieldOverflow`` range check."""

    pass
466
467
@IntegerField.register_lookup
class IntegerGreaterThan(IntegerFieldOverflow, GreaterThan):
    """``GreaterThan`` for ``IntegerField`` with the ``IntegerFieldOverflow`` range check."""

    # An rhs below the field's minimum raises FullResultSet rather than the
    # mixin's default EmptyResultSet.
    underflow_exception = FullResultSet
471
472
@IntegerField.register_lookup
class IntegerGreaterThanOrEqual(
    IntegerFieldOverflow, IntegerFieldFloatRounding, GreaterThanOrEqual
):
    """
    ``GreaterThanOrEqual`` for ``IntegerField`` with the range check and
    float rounding mixins.
    """

    # An rhs below the field's minimum raises FullResultSet rather than the
    # mixin's default EmptyResultSet.
    underflow_exception = FullResultSet
478
479
@IntegerField.register_lookup
class IntegerLessThan(IntegerFieldOverflow, IntegerFieldFloatRounding, LessThan):
    """``LessThan`` for ``IntegerField`` with the range check and float rounding mixins."""

    # An rhs above the field's maximum raises FullResultSet rather than the
    # mixin's default EmptyResultSet.
    overflow_exception = FullResultSet
483
484
@IntegerField.register_lookup
class IntegerLessThanOrEqual(IntegerFieldOverflow, LessThanOrEqual):
    """``LessThanOrEqual`` for ``IntegerField`` with the ``IntegerFieldOverflow`` range check."""

    # An rhs above the field's maximum raises FullResultSet rather than the
    # mixin's default EmptyResultSet.
    overflow_exception = FullResultSet
488
489
@Field.register_lookup
class In(FieldGetDbPrepValueIterableMixin, BuiltinLookup):
    """The ``in`` lookup: membership in a list of values or a subquery."""

    lookup_name: str = "in"

    def get_prep_lookup(self) -> Any:
        """Prepare the rhs, first normalizing a ``Query`` rhs."""
        from plain.models.sql.query import Query  # avoid circular import

        subquery = self.rhs
        if isinstance(subquery, Query):
            subquery.clear_ordering(clear_default=True)
            if not subquery.has_select_fields:
                # No explicit select: compare against the "id" field.
                subquery.clear_select_clause()
                subquery.add_fields(["id"])
        return super().get_prep_lookup()

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """
        Build the parenthesized placeholder list for a direct rhs.

        Raises:
            EmptyResultSet: if nothing is left once ``None`` is removed.
        """
        if not self.rhs_is_direct_value():
            return super().process_rhs(compiler, connection)

        # Drop None from the candidates: NULL is never equal to anything.
        try:
            candidates = OrderedSet(self.rhs)
            candidates.discard(None)
        except TypeError:  # Unhashable items in self.rhs
            candidates = [item for item in self.rhs if item is not None]

        if not candidates:
            raise EmptyResultSet

        # The candidates are an iterable; batch_process_rhs()
        # prepares/transforms each value.
        sqls, sqls_params = self.batch_process_rhs(compiler, connection, candidates)
        return f"({', '.join(sqls)})", sqls_params

    def get_rhs_op(self, connection: DatabaseWrapper, rhs: str | list[str]) -> str:
        return "IN {}".format(rhs)

    # PostgreSQL has no limit on IN clause size, so no need to override as_sql()
529
530
class PatternLookup(BuiltinLookup):
    """
    Base for the pattern-matching lookups (contains / startswith / endswith
    and their case-insensitive variants).
    """

    param_pattern: str = "%%%s%%"
    prepare_rhs: bool = False
    bilateral_transforms: list[Any]

    def get_rhs_op(self, connection: DatabaseWrapper, rhs: str | list[str]) -> str:
        """Return the operator SQL, using ``PATTERN_OPS`` for SQL-side rhs values."""
        # Taking startswith as the example, the goal is SQL like:
        #     col LIKE %s, ['thevalue%']
        # A Python value gets its pattern added in Python (see process_rhs),
        # so the plain operator is enough. When the rhs is a reference to
        # another column or a SQL transformation, the pattern has to be added
        # in SQL instead, by something like:
        #     col LIKE othercol || '%%'
        rhs_lives_in_sql = hasattr(self.rhs, "as_sql") or self.bilateral_transforms
        if not rhs_lives_in_sql:
            return super().get_rhs_op(connection, rhs)
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        template = PATTERN_OPS[self.lookup_name].format(PATTERN_ESC)
        return template.format(rhs)

    def process_rhs(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]] | tuple[list[str], list[Any]]:
        """
        Process the rhs and, for a direct value without bilateral transforms,
        rewrite the first param as ``param_pattern`` applied to the value
        returned by ``prep_for_like_query()``.
        """
        rhs_sql, params = super().process_rhs(compiler, connection)
        pattern_in_python = (
            isinstance(rhs_sql, str)
            and self.rhs_is_direct_value()
            and params
            and not self.bilateral_transforms
        )
        if pattern_in_python:
            params[0] = self.param_pattern % prep_for_like_query(params[0])
        return rhs_sql, params
565
566
@Field.register_lookup
class Contains(PatternLookup):
    """The ``contains`` lookup; keeps ``PatternLookup``'s default ``param_pattern``."""

    lookup_name: str = "contains"
570
571
@Field.register_lookup
class IContains(Contains):
    """The ``icontains`` lookup; same behavior as ``Contains`` under another name."""

    lookup_name: str = "icontains"
575
576
@Field.register_lookup
class StartsWith(PatternLookup):
    """The ``startswith`` lookup."""

    lookup_name: str = "startswith"
    # %-formatted with the value: yields the value followed by a literal "%".
    param_pattern: str = "%s%%"
581
582
@Field.register_lookup
class IStartsWith(StartsWith):
    """The ``istartswith`` lookup; same behavior as ``StartsWith`` under another name."""

    lookup_name: str = "istartswith"
586
587
@Field.register_lookup
class EndsWith(PatternLookup):
    """The ``endswith`` lookup."""

    lookup_name: str = "endswith"
    # %-formatted with the value: yields a literal "%" followed by the value.
    param_pattern: str = "%%%s"
592
593
@Field.register_lookup
class IEndsWith(EndsWith):
    """The ``iendswith`` lookup; same behavior as ``EndsWith`` under another name."""

    lookup_name: str = "iendswith"
597
598
@Field.register_lookup
class Range(FieldGetDbPrepValueIterableMixin, BuiltinLookup):
    """The ``range`` lookup: ``lhs BETWEEN lower AND upper``."""

    lookup_name: str = "range"

    def get_rhs_op(self, connection: DatabaseWrapper, rhs: str | list[str]) -> str:
        """
        Return the ``BETWEEN ... AND ...`` fragment for the two bounds.

        Args:
            connection: Database wrapper (unused here).
            rhs: SQL fragments for the bounds, as produced by process_rhs().

        Raises:
            TypeError: if ``rhs`` is not a list of fragments.
            ValueError: if ``rhs`` does not hold exactly two fragments.
        """
        # Checked with real exceptions rather than ``assert`` so the guard
        # survives ``python -O``.
        if not isinstance(rhs, list):
            raise TypeError(f"Range lookup expects list, got {type(rhs)}")
        # Only two fragments are interpolated below, while process_rhs()
        # returns a param for every rhs item, so any other count is rejected
        # here instead of reaching the database.
        if len(rhs) != 2:
            raise ValueError(
                f"Range lookup requires exactly two bounds, got {len(rhs)}."
            )
        lower, upper = rhs
        return f"BETWEEN {lower} AND {upper}"
607
608
@Field.register_lookup
class IsNull(BuiltinLookup):
    """The ``isnull`` lookup: ``IS NULL`` / ``IS NOT NULL`` on the lhs."""

    lookup_name: str = "isnull"
    prepare_rhs: bool = False

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]]:
        """
        Return the null test for the compiled lhs.

        Raises:
            ValueError: if the rhs is not a ``bool``.
        """
        if not isinstance(self.rhs, bool):
            raise ValueError(
                "The QuerySet value for an isnull lookup must be True or False."
            )
        lhs_sql, params = self.process_lhs(compiler, connection)
        null_test = "IS NULL" if self.rhs else "IS NOT NULL"
        return f"{lhs_sql} {null_test}", params
626
627
@Field.register_lookup
class Regex(BuiltinLookup):
    """The ``regex`` lookup."""

    lookup_name: str = "regex"
    prepare_rhs: bool = False

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, list[Any]]:
        """
        Compile via ``OPERATORS`` when it has an entry for this lookup,
        otherwise via the template returned by ``regex_lookup()``.
        """
        if self.lookup_name not in OPERATORS:
            lhs_sql, lhs_params = self.process_lhs(compiler, connection)
            rhs_sql, rhs_params = self.process_rhs(compiler, connection)
            template = regex_lookup(self.lookup_name)
            return template % (lhs_sql, rhs_sql), lhs_params + rhs_params
        return super().as_sql(compiler, connection)
643
644
@Field.register_lookup
class IRegex(Regex):
    """The ``iregex`` lookup; same behavior as ``Regex`` under another name."""

    lookup_name: str = "iregex"
648
649
class YearLookup(Lookup):
    """
    Base for year comparisons that, for direct values, compare the
    originating field against year bounds instead of extracting the year.
    """

    def year_lookup_bounds(
        self, connection: DatabaseWrapper, year: int
    ) -> list[datetime.date] | list[datetime.datetime]:
        """
        Return the bounds for ``year``, computed with the datetime helper
        when the originating field is a ``DateTimeField`` and with the date
        helper otherwise.
        """
        from plain.models.functions import ExtractIsoYear

        uses_iso_year = isinstance(self.lhs, ExtractIsoYear)
        source_field = self.lhs.lhs.output_field
        compute_bounds = (
            year_lookup_bounds_for_datetime_field
            if isinstance(source_field, DateTimeField)
            else year_lookup_bounds_for_date_field
        )
        return compute_bounds(year, iso_year=uses_iso_year)

    def as_sql(
        self, compiler: SQLCompiler, connection: DatabaseWrapper
    ) -> tuple[str, Sequence[Any]]:
        """Compile the lookup, bypassing the extract for direct rhs values."""
        if not self.rhs_is_direct_value():
            return super().as_sql(compiler, connection)

        # Skipping the extract operation lets indexes be used: the comparison
        # is made directly on the originating field, i.e. self.lhs.lhs.
        lhs_sql, params = self.process_lhs(compiler, connection, self.lhs.lhs)
        rhs_sql, _ = self.process_rhs(compiler, connection)
        # rhs_sql should be a string for year lookups
        assert isinstance(rhs_sql, str), f"Expected str, got {type(rhs_sql)}"
        comparison_sql = self.get_direct_rhs_sql(connection, rhs_sql)
        start, finish = self.year_lookup_bounds(connection, self.rhs)
        params.extend(self.get_bound_params(start, finish))
        return f"{lhs_sql} {comparison_sql}", params

    def get_direct_rhs_sql(self, connection: DatabaseWrapper, rhs: str) -> str:
        """Interpolate ``rhs`` into the ``OPERATORS`` template for this lookup."""
        assert self.lookup_name is not None, (
            "lookup_name must be set on Lookup subclass"
        )
        operator_template = OPERATORS[self.lookup_name]
        return operator_template % rhs

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any, ...]:
        """Return the bound(s) to bind as params; subclasses must override."""
        raise NotImplementedError("Subclasses must implement get_bound_params()")
691
692
class YearExact(YearLookup, Exact):
    """Year ``exact``: direct values compile to ``BETWEEN`` over both bounds."""

    def get_direct_rhs_sql(self, connection: DatabaseWrapper, rhs: str) -> str:
        # The incoming rhs SQL is ignored; two placeholders take the bounds.
        return "BETWEEN %s AND %s"

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any, Any]:
        """Bind both bounds, in order."""
        return (start, finish)
699
700
class YearGt(YearLookup, GreaterThan):
    """Year ``gt``: direct values are compared against the ``finish`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        return (finish,)
704
705
class YearGte(YearLookup, GreaterThanOrEqual):
    """Year ``gte``: direct values are compared against the ``start`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        return (start,)
709
710
class YearLt(YearLookup, LessThan):
    """Year ``lt``: direct values are compared against the ``start`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        return (start,)
714
715
class YearLte(YearLookup, LessThanOrEqual):
    """Year ``lte``: direct values are compared against the ``finish`` bound."""

    def get_bound_params(self, start: Any, finish: Any) -> tuple[Any]:
        return (finish,)
719
720
721# UUID lookups - PostgreSQL has native UUID support so these inherit directly
722# from their base classes without any special processing.
723
724
@UUIDField.register_lookup
class UUIDIExact(IExact):
    """``IExact`` registered on ``UUIDField``; nothing is overridden."""

    pass
728
729
@UUIDField.register_lookup
class UUIDContains(Contains):
    """``Contains`` registered on ``UUIDField``; nothing is overridden."""

    pass
733
734
@UUIDField.register_lookup
class UUIDIContains(IContains):
    """``IContains`` registered on ``UUIDField``; nothing is overridden."""

    pass
738
739
@UUIDField.register_lookup
class UUIDStartsWith(StartsWith):
    """``StartsWith`` registered on ``UUIDField``; nothing is overridden."""

    pass
743
744
@UUIDField.register_lookup
class UUIDIStartsWith(IStartsWith):
    """``IStartsWith`` registered on ``UUIDField``; nothing is overridden."""

    pass
748
749
@UUIDField.register_lookup
class UUIDEndsWith(EndsWith):
    """``EndsWith`` registered on ``UUIDField``; nothing is overridden."""

    pass
753
754
@UUIDField.register_lookup
class UUIDIEndsWith(IEndsWith):
    """``IEndsWith`` registered on ``UUIDField``; nothing is overridden."""

    pass