1"""
2Create SQL statements for QuerySets.
3
4The code in here encapsulates all of the SQL construction so that QuerySets
5themselves do not have to (and could be backed by things other than SQL
6databases). The abstraction barrier only works one way: this module has to know
7all about the internals of models in order to get the information it needs.
8"""
9
10from __future__ import annotations
11
12import copy
13import difflib
14import functools
15import sys
16from collections import Counter
17from collections.abc import Callable, Iterable, Iterator, Mapping
18from collections.abc import Iterator as TypingIterator
19from functools import cached_property
20from itertools import chain, count, product
21from string import ascii_uppercase
22from typing import (
23 TYPE_CHECKING,
24 Any,
25 Literal,
26 NamedTuple,
27 Self,
28 TypeVar,
29 cast,
30 overload,
31)
32
33from plain.models.aggregates import Count
34from plain.models.constants import LOOKUP_SEP
35from plain.models.db import NotSupportedError, db_connection
36from plain.models.exceptions import FieldDoesNotExist, FieldError
37from plain.models.expressions import (
38 BaseExpression,
39 Col,
40 Exists,
41 F,
42 OuterRef,
43 Ref,
44 ResolvableExpression,
45 ResolvedOuterRef,
46 Value,
47)
48from plain.models.fields import Field
49from plain.models.fields.related_lookups import MultiColSource
50from plain.models.lookups import Lookup
51from plain.models.query_utils import (
52 PathInfo,
53 Q,
54 check_rel_lookup_compatibility,
55 refs_expression,
56)
57from plain.models.sql.constants import INNER, LOUTER, ORDER_DIR, SINGLE
58from plain.models.sql.datastructures import BaseTable, Empty, Join, MultiJoin
59from plain.models.sql.where import AND, OR, ExtraWhere, NothingNode, WhereNode
60from plain.utils.regex_helper import _lazy_re_compile
61from plain.utils.tree import Node
62
63if TYPE_CHECKING:
64 from plain.models import Model
65 from plain.models.backends.base.base import BaseDatabaseWrapper
66 from plain.models.fields.related import RelatedField
67 from plain.models.fields.reverse_related import ForeignObjectRel
68 from plain.models.meta import Meta
69 from plain.models.sql.compiler import SQLCompiler
70
71
72__all__ = ["Query", "RawQuery"]
73
# Quotation marks ('"`[]), whitespace characters, semicolons, or inline
# SQL comments are forbidden in column aliases.
FORBIDDEN_ALIAS_PATTERN = _lazy_re_compile(r"['`\"\]\[;\s]|--|/\*|\*/")

# Inspired from
# https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
# Matches a single EXPLAIN option name: word characters and dashes only.
EXPLAIN_OPTIONS_PATTERN = _lazy_re_compile(r"[\w\-]+")
81
82
def get_field_names_from_opts(meta: Meta | None) -> set[str]:
    """Return every field name (and attname, for concrete fields) on *meta*."""
    if meta is None:
        return set()
    names: set[str] = set()
    for field in meta.get_fields():
        names.add(field.name)
        if field.concrete:
            names.add(field.attname)
    return names
91
92
def get_children_from_q(q: Q) -> TypingIterator[tuple[str, Any]]:
    """Yield the leaf (lookup, value) pairs of *q*, depth-first, in order."""
    # Iterative depth-first traversal; pushing children in reverse keeps the
    # pop() order identical to the recursive left-to-right walk.
    stack = list(reversed(q.children))
    while stack:
        child = stack.pop()
        if isinstance(child, Node):
            stack.extend(reversed(child.children))
        else:
            yield child
99
100
class JoinInfo(NamedTuple):
    """Information about a join operation in a query."""

    # The last field traversed in the lookup path.
    final_field: Field[Any]
    # The target fields the lookup ultimately resolves to.
    targets: tuple[Field[Any], ...]
    # Meta of the model the final field belongs to.
    meta: Meta
    # Table aliases for the joins needed to reach the final field.
    joins: list[str]
    # PathInfo entries describing the relational path traversed.
    path: list[PathInfo]
    # Applied to (field, alias) to produce the final column expression.
    transform_function: Callable[[Field[Any], str | None], BaseExpression]
110
111
class RawQuery:
    """A single raw SQL query.

    Holds the SQL string plus its parameters (a positional tuple, a mapping
    of named parameters, or None for no parameter substitution) and lazily
    executes against the shared db_connection.
    """

    def __init__(
        self,
        sql: str,
        params: tuple[Any, ...] | dict[str, Any] | None = (),
    ):
        # NOTE: None is a supported params value (see params_type), so the
        # annotation includes it explicitly.
        self.params = params
        self.sql = sql
        self.cursor: Any = None

        # Mirror some properties of a normal query so that
        # the compiler can be used to process results.
        self.low_mark, self.high_mark = 0, None  # Used for offset/limit
        self.extra_select = {}
        self.annotation_select = {}

    def chain(self) -> RawQuery:
        """Return a copy ready for further use (same semantics as clone())."""
        return self.clone()

    def clone(self) -> RawQuery:
        """Return a fresh RawQuery with the same SQL and params (no cursor)."""
        return RawQuery(self.sql, params=self.params)

    def get_columns(self) -> list[str]:
        """Return the result set's column names, identifier-converted."""
        if self.cursor is None:
            self._execute_query()
        converter = db_connection.introspection.identifier_converter
        return [
            converter(column_model_meta[0])
            for column_model_meta in self.cursor.description
        ]

    def __iter__(self) -> TypingIterator[Any]:
        # Always execute a new query for a new iterator.
        # This could be optimized with a cache at the expense of RAM.
        self._execute_query()
        if not db_connection.features.can_use_chunked_reads:
            # If the database can't use chunked reads we need to make sure we
            # evaluate the entire query up front.
            result = list(self.cursor)
        else:
            result = self.cursor
        return iter(result)

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}: {self}>"

    @property
    def params_type(self) -> type[dict] | type[tuple] | None:
        """Return dict, tuple, or None describing how params substitute."""
        if self.params is None:
            return None
        return dict if isinstance(self.params, Mapping) else tuple

    def __str__(self) -> str:
        if self.params_type is None:
            return self.sql
        return self.sql % self.params_type(self.params)

    def _execute_query(self) -> None:
        # Adapt parameters to the database, as much as possible considering
        # that the target type isn't known. See #17755.
        params_type = self.params_type
        adapter = db_connection.ops.adapt_unknown_value
        if params_type is tuple:
            assert isinstance(self.params, tuple)
            params = tuple(adapter(val) for val in self.params)
        elif params_type is dict:
            assert isinstance(self.params, dict)
            params = {key: adapter(val) for key, val in self.params.items()}
        elif params_type is None:
            params = None
        else:
            raise RuntimeError(f"Unexpected params type: {params_type}")

        self.cursor = db_connection.cursor()
        self.cursor.execute(self.sql, params)
185
186
class ExplainInfo(NamedTuple):
    """Information about an EXPLAIN query."""

    # Requested EXPLAIN output format, or None for the backend default.
    format: str | None
    # Extra EXPLAIN options; names are validated by Query.explain().
    options: dict[str, Any]
192
193
class TransformWrapper:
    """Wrapper for transform functions that supports the has_transforms attribute.

    This replaces functools.partial for transform functions, allowing proper
    type checking while supporting dynamic attribute assignment.
    """

    def __init__(
        self,
        func: Callable[..., BaseExpression],
        **kwargs: Any,
    ):
        # Store the callable and its fixed keyword arguments directly; the
        # call below is equivalent to functools.partial(func, **kwargs).
        self._func = func
        self._kwargs = kwargs
        self.has_transforms: bool = False

    def __call__(self, field: Field[Any], alias: str | None) -> BaseExpression:
        return self._func(field, alias, **self._kwargs)
211
212
213QueryType = TypeVar("QueryType", bound="Query")
214
215
216class Query(BaseExpression):
217 """A single SQL query."""
218
    # Prefix used for generated table aliases (T1, T2, ...).
    alias_prefix = "T"
    # Value used for annotations when the result set is known to be empty.
    empty_result_set_value = None
    # Alias prefixes of this query and combined subqueries (see combine()).
    subq_aliases = frozenset([alias_prefix])

    # Name of the compiler class, resolved via db_connection.ops.compiler().
    compiler = "SQLCompiler"

    # Datastructure classes used when building the FROM clause.
    base_table_class = BaseTable
    join_class = Join

    # Whether the model's fields are selected by default (no explicit select).
    default_cols = True
    default_ordering = True
    standard_ordering = True

    # When True, used_aliases is preserved by the next chain() call.
    filter_is_sticky = False
    # Whether this query is used as a subquery of another query.
    subquery = False

    # SQL-related attributes.
    # Select and related select clauses are expressions to use in the SELECT
    # clause of the query. The select is used for cases where we want to set up
    # the select clause to contain other than default fields (values(),
    # subqueries...). Note that annotations go to annotations dictionary.
    select = ()
    # The group_by attribute can have one of the following forms:
    # - None: no group by at all in the query
    # - A tuple of expressions: group by (at least) those expressions.
    #   String refs are also allowed for now.
    # - True: group by all select fields of the model
    # See compiler.get_group_by() for details.
    group_by = None
    order_by = ()
    low_mark = 0  # Used for offset/limit.
    high_mark = None  # Used for offset/limit.
    distinct = False
    distinct_fields = ()
    select_for_update = False
    select_for_update_nowait = False
    select_for_update_skip_locked = False
    select_for_update_of = ()
    select_for_no_key_update = False
    select_related: bool | dict[str, Any] = False
    has_select_fields = False
    # Arbitrary limit for select_related to prevent infinite recursion.
    max_depth = 5
    # Holds the selects defined by a call to values() or values_list()
    # excluding annotation_select and extra_select.
    values_select = ()

    # SQL annotation-related attributes.
    # Mask restricting which annotations are selected; None means no mask.
    annotation_select_mask = None
    _annotation_select_cache = None

    # Set combination attributes.
    combinator = None
    combinator_all = False
    combined_queries = ()

    # These are for extensions. The contents are more or less appended verbatim
    # to the appropriate clause.
    extra_select_mask = None
    _extra_select_cache = None

    extra_tables = ()
    extra_order_by = ()

    # A tuple that is a set of model field names and either True, if these are
    # the fields to defer, or False if these are the only fields to load.
    deferred_loading = (frozenset(), True)

    # Set by explain() to an ExplainInfo; None for normal queries.
    explain_info = None
288
289 def __init__(self, model: type[Model] | None, alias_cols: bool = True):
290 self.model = model
291 self.alias_refcount = {}
292 # alias_map is the most important data structure regarding joins.
293 # It's used for recording which joins exist in the query and what
294 # types they are. The key is the alias of the joined table (possibly
295 # the table name) and the value is a Join-like object (see
296 # sql.datastructures.Join for more information).
297 self.alias_map = {}
298 # Whether to provide alias to columns during reference resolving.
299 self.alias_cols = alias_cols
300 # Sometimes the query contains references to aliases in outer queries (as
301 # a result of split_exclude). Correct alias quoting needs to know these
302 # aliases too.
303 # Map external tables to whether they are aliased.
304 self.external_aliases = {}
305 self.table_map = {} # Maps table names to list of aliases.
306 self.used_aliases = set()
307
308 self.where = WhereNode()
309 # Maps alias -> Annotation Expression.
310 self.annotations = {}
311 # These are for extensions. The contents are more or less appended
312 # verbatim to the appropriate clause.
313 self.extra = {} # Maps col_alias -> (col_sql, params).
314
315 self._filtered_relations = {}
316
317 @property
318 def output_field(self) -> Field | None:
319 if len(self.select) == 1:
320 select = self.select[0]
321 return getattr(select, "target", None) or select.field
322 elif len(self.annotation_select) == 1:
323 return next(iter(self.annotation_select.values())).output_field
324
325 @cached_property
326 def base_table(self) -> str | None:
327 for alias in self.alias_map:
328 return alias
329
330 def __str__(self) -> str:
331 """
332 Return the query as a string of SQL with the parameter values
333 substituted in (use sql_with_params() to see the unsubstituted string).
334
335 Parameter values won't necessarily be quoted correctly, since that is
336 done by the database interface at execution time.
337 """
338 sql, params = self.sql_with_params()
339 return sql % params
340
341 def sql_with_params(self) -> tuple[str, tuple[Any, ...]]:
342 """
343 Return the query as an SQL string and the parameters that will be
344 substituted into the query.
345 """
346 return self.get_compiler().as_sql()
347
348 def __deepcopy__(self, memo: dict[int, Any]) -> Self:
349 """Limit the amount of work when a Query is deepcopied."""
350 result = self.clone()
351 memo[id(self)] = result
352 return result
353
354 def get_compiler(self, *, elide_empty: bool = True) -> Any:
355 return db_connection.ops.compiler(self.compiler)(
356 self, db_connection, elide_empty
357 )
358
    def clone(self) -> Self:
        """
        Return a copy of the current Query. A lightweight alternative to
        deepcopy().

        Container attributes (alias maps, annotations, extra, masks) are
        copied one level deep; everything else is shared with the original
        via the shallow __dict__ copy below.
        """
        # Instantiating Empty and reassigning __class__ skips running
        # __init__; state is copied from self instead.
        obj = Empty()
        obj.__class__ = self.__class__
        obj = cast(Self, obj)  # Type checker doesn't understand __class__ reassignment
        # Copy references to everything.
        obj.__dict__ = self.__dict__.copy()
        # Clone attributes that can't use shallow copy.
        obj.alias_refcount = self.alias_refcount.copy()
        obj.alias_map = self.alias_map.copy()
        obj.external_aliases = self.external_aliases.copy()
        obj.table_map = self.table_map.copy()
        obj.where = self.where.clone()
        obj.annotations = self.annotations.copy()
        if self.annotation_select_mask is not None:
            obj.annotation_select_mask = self.annotation_select_mask.copy()
        if self.combined_queries:
            obj.combined_queries = tuple(
                [query.clone() for query in self.combined_queries]
            )
        # _annotation_select_cache cannot be copied, as doing so breaks the
        # (necessary) state in which both annotations and
        # _annotation_select_cache point to the same underlying objects.
        # It will get re-populated in the cloned queryset the next time it's
        # used.
        obj._annotation_select_cache = None
        obj.extra = self.extra.copy()
        if self.extra_select_mask is not None:
            obj.extra_select_mask = self.extra_select_mask.copy()
        if self._extra_select_cache is not None:
            obj._extra_select_cache = self._extra_select_cache.copy()
        if self.select_related is not False:
            # Use deepcopy because select_related stores fields in nested
            # dicts.
            obj.select_related = copy.deepcopy(obj.select_related)
        if "subq_aliases" in self.__dict__:
            # Only copy when set on the instance; otherwise the class-level
            # default stays shared.
            obj.subq_aliases = self.subq_aliases.copy()
        obj.used_aliases = self.used_aliases.copy()
        obj._filtered_relations = self._filtered_relations.copy()
        # Clear the cached_property, if it exists.
        obj.__dict__.pop("base_table", None)
        return obj
404
405 @overload
406 def chain(self, klass: None = None) -> Self: ...
407
408 @overload
409 def chain(self, klass: type[QueryType]) -> QueryType: ...
410
411 def chain(self, klass: type[Query] | None = None) -> Query:
412 """
413 Return a copy of the current Query that's ready for another operation.
414 The klass argument changes the type of the Query, e.g. UpdateQuery.
415 """
416 obj = self.clone()
417 if klass and obj.__class__ != klass:
418 obj.__class__ = klass
419 if not obj.filter_is_sticky:
420 obj.used_aliases = set()
421 obj.filter_is_sticky = False
422 if hasattr(obj, "_setup_query"):
423 obj._setup_query() # type: ignore[call-non-callable]
424 return obj
425
426 def relabeled_clone(self, change_map: dict[str, str]) -> Self:
427 clone = self.clone()
428 clone.change_aliases(change_map)
429 return clone
430
431 def _get_col(self, target: Any, field: Field, alias: str | None) -> Col:
432 if not self.alias_cols:
433 alias = None
434 return target.get_col(alias, field)
435
    def get_aggregation(self, aggregate_exprs: dict[str, Any]) -> dict[str, Any]:
        """
        Return the dictionary with the values of the existing aggregations.

        Executes the query and maps each alias in aggregate_exprs to its
        computed value (or the expression's empty_result_set_value when the
        query returns no row).
        """
        if not aggregate_exprs:
            return {}
        aggregates = {}
        # Resolve and validate each requested aggregate against this query.
        for alias, aggregate_expr in aggregate_exprs.items():
            self.check_alias(alias)
            aggregate = aggregate_expr.resolve_expression(
                self, allow_joins=True, reuse=None, summarize=True
            )
            if not aggregate.contains_aggregate:
                raise TypeError(f"{alias} is not an aggregate expression")
            aggregates[alias] = aggregate
        # Existing usage of aggregation can be determined by the presence of
        # selected aggregates but also by filters against aliased aggregates.
        _, having, qualify = self.where.split_having_qualify()
        has_existing_aggregation = (
            any(
                getattr(annotation, "contains_aggregate", True)
                for annotation in self.annotations.values()
            )
            or having
        )
        # Decide if we need to use a subquery.
        #
        # Existing aggregations would cause incorrect results as
        # get_aggregation() must produce just one result and thus must not use
        # GROUP BY.
        #
        # If the query has limit or distinct, or uses set operations, then
        # those operations must be done in a subquery so that the query
        # aggregates on the limit and/or distinct results instead of applying
        # the distinct and limit after the aggregation.
        if (
            isinstance(self.group_by, tuple)
            or self.is_sliced
            or has_existing_aggregation
            or qualify
            or self.distinct
            or self.combinator
        ):
            from plain.models.sql.subqueries import AggregateQuery

            # Wrap the current query as the inner query of an AggregateQuery.
            inner_query = self.clone()
            inner_query.subquery = True
            outer_query = AggregateQuery(self.model, inner_query)
            inner_query.select_for_update = False
            inner_query.select_related = False
            inner_query.set_annotation_mask(self.annotation_select)
            # Queries with distinct_fields need ordering and when a limit is
            # applied we must take the slice from the ordered query. Otherwise
            # no need for ordering.
            inner_query.clear_ordering(force=False)
            if not inner_query.distinct:
                # If the inner query uses default select and it has some
                # aggregate annotations, then we must make sure the inner
                # query is grouped by the main model's primary key. However,
                # clearing the select clause can alter results if distinct is
                # used.
                if inner_query.default_cols and has_existing_aggregation:
                    assert self.model is not None, "Aggregation requires a model"
                    inner_query.group_by = (
                        self.model._model_meta.get_forward_field("id").get_col(
                            inner_query.get_initial_alias()
                        ),
                    )
                inner_query.default_cols = False
                if not qualify:
                    # Mask existing annotations that are not referenced by
                    # aggregates to be pushed to the outer query unless
                    # filtering against window functions is involved as it
                    # requires complex realising.
                    annotation_mask = set()
                    for aggregate in aggregates.values():
                        annotation_mask |= aggregate.get_refs()
                    inner_query.set_annotation_mask(annotation_mask)

            # Add aggregates to the outer AggregateQuery. This requires making
            # sure all columns referenced by the aggregates are selected in the
            # inner query. It is achieved by retrieving all column references
            # by the aggregates, explicitly selecting them in the inner query,
            # and making sure the aggregates are repointed to them.
            col_refs = {}
            for alias, aggregate in aggregates.items():
                replacements = {}
                for col in self._gen_cols([aggregate], resolve_refs=False):
                    if not (col_ref := col_refs.get(col)):
                        index = len(col_refs) + 1
                        col_alias = f"__col{index}"
                        col_ref = Ref(col_alias, col)
                        col_refs[col] = col_ref
                        inner_query.annotations[col_alias] = col
                        inner_query.append_annotation_mask([col_alias])
                    replacements[col] = col_ref
                outer_query.annotations[alias] = aggregate.replace_expressions(
                    replacements
                )
            if (
                inner_query.select == ()
                and not inner_query.default_cols
                and not inner_query.annotation_select_mask
            ):
                # In case of Model.objects[0:3].count(), there would be no
                # field selected in the inner query, yet we must use a subquery.
                # So, make sure at least one field is selected.
                assert self.model is not None, "Count with slicing requires a model"
                inner_query.select = (
                    self.model._model_meta.get_forward_field("id").get_col(
                        inner_query.get_initial_alias()
                    ),
                )
        else:
            # No subquery needed: aggregate directly on this query, replacing
            # its selection with the requested aggregates.
            outer_query = self
            self.select = ()
            self.default_cols = False
            self.extra = {}
            if self.annotations:
                # Inline reference to existing annotations and mask them as
                # they are unnecessary given only the summarized aggregations
                # are requested.
                replacements = {
                    Ref(alias, annotation): annotation
                    for alias, annotation in self.annotations.items()
                }
                self.annotations = {
                    alias: aggregate.replace_expressions(replacements)
                    for alias, aggregate in aggregates.items()
                }
            else:
                self.annotations = aggregates
            self.set_annotation_mask(aggregates)

        empty_set_result = [
            expression.empty_result_set_value
            for expression in outer_query.annotation_select.values()
        ]
        elide_empty = not any(result is NotImplemented for result in empty_set_result)
        outer_query.clear_ordering(force=True)
        outer_query.clear_limits()
        outer_query.select_for_update = False
        outer_query.select_related = False
        compiler = outer_query.get_compiler(elide_empty=elide_empty)
        result = compiler.execute_sql(SINGLE)
        if result is None:
            result = empty_set_result
        else:
            converters = compiler.get_converters(outer_query.annotation_select.values())
            result = next(compiler.apply_converters((result,), converters))

        return dict(zip(outer_query.annotation_select, result))
588
589 def get_count(self) -> int:
590 """
591 Perform a COUNT() query using the current filter constraints.
592 """
593 obj = self.clone()
594 return obj.get_aggregation({"__count": Count("*")})["__count"]
595
    def has_filters(self) -> bool:
        # Truthiness of the WhereNode reflects whether any conditions have
        # been added to this query's WHERE clause.
        return bool(self.where)
598
599 def exists(self, limit: bool = True) -> Self:
600 q = self.clone()
601 if not (q.distinct and q.is_sliced):
602 if q.group_by is True:
603 assert self.model is not None, "GROUP BY requires a model"
604 q.add_fields(
605 (f.attname for f in self.model._model_meta.concrete_fields), False
606 )
607 # Disable GROUP BY aliases to avoid orphaning references to the
608 # SELECT clause which is about to be cleared.
609 q.set_group_by(allow_aliases=False)
610 q.clear_select_clause()
611 if q.combined_queries and q.combinator == "union":
612 q.combined_queries = tuple(
613 combined_query.exists(limit=False)
614 for combined_query in q.combined_queries
615 )
616 q.clear_ordering(force=True)
617 if limit:
618 q.set_limits(high=1)
619 q.add_annotation(Value(1), "a")
620 return q
621
622 def has_results(self) -> bool:
623 q = self.exists()
624 compiler = q.get_compiler()
625 return compiler.has_results()
626
627 def explain(self, format: str | None = None, **options: Any) -> str:
628 q = self.clone()
629 for option_name in options:
630 if (
631 not EXPLAIN_OPTIONS_PATTERN.fullmatch(option_name)
632 or "--" in option_name
633 ):
634 raise ValueError(f"Invalid option name: {option_name!r}.")
635 q.explain_info = ExplainInfo(format, options)
636 compiler = q.get_compiler()
637 return "\n".join(compiler.explain_query())
638
    def combine(self, rhs: Query, connector: str) -> None:
        """
        Merge the 'rhs' query into the current one (with any 'rhs' effects
        being applied *after* (that is, "to the right of") anything in the
        current query. 'rhs' is not modified during a call to this function.

        The 'connector' parameter describes how to connect filters from the
        'rhs' query.

        Mutates self in place; returns nothing.
        """
        if self.model != rhs.model:
            raise TypeError("Cannot combine queries on two different base models.")
        if self.is_sliced:
            raise TypeError("Cannot combine queries once a slice has been taken.")
        if self.distinct != rhs.distinct:
            raise TypeError("Cannot combine a unique query with a non-unique query.")
        if self.distinct_fields != rhs.distinct_fields:
            raise TypeError("Cannot combine queries with different distinct fields.")

        # If lhs and rhs shares the same alias prefix, it is possible to have
        # conflicting alias changes like T4 -> T5, T5 -> T6, which might end up
        # as T4 -> T6 while combining two querysets. To prevent this, change an
        # alias prefix of the rhs and update current aliases accordingly,
        # except if the alias is the base table since it must be present in the
        # query on both sides.
        initial_alias = self.get_initial_alias()
        assert initial_alias is not None
        rhs.bump_prefix(self, exclude={initial_alias})

        # Work out how to relabel the rhs aliases, if necessary.
        change_map = {}
        conjunction = connector == AND

        # Determine which existing joins can be reused. When combining the
        # query with AND we must recreate all joins for m2m filters. When
        # combining with OR we can reuse joins. The reason is that in AND
        # case a single row can't fulfill a condition like:
        #     revrel__col=1 & revrel__col=2
        # But, there might be two different related rows matching this
        # condition. In OR case a single True is enough, so single row is
        # enough, too.
        #
        # Note that we will be creating duplicate joins for non-m2m joins in
        # the AND case. The results will be correct but this creates too many
        # joins. This is something that could be fixed later on.
        reuse = set() if conjunction else set(self.alias_map)
        joinpromoter = JoinPromoter(connector, 2, False)
        joinpromoter.add_votes(
            j for j in self.alias_map if self.alias_map[j].join_type == INNER
        )
        rhs_votes = set()
        # Now, add the joins from rhs query into the new query (skipping base
        # table).
        rhs_tables = list(rhs.alias_map)[1:]
        for alias in rhs_tables:
            join = rhs.alias_map[alias]
            # If the left side of the join was already relabeled, use the
            # updated alias.
            join = join.relabeled_clone(change_map)
            new_alias = self.join(join, reuse=reuse)
            if join.join_type == INNER:
                rhs_votes.add(new_alias)
            # We can't reuse the same join again in the query. If we have two
            # distinct joins for the same connection in rhs query, then the
            # combined query must have two joins, too.
            reuse.discard(new_alias)
            if alias != new_alias:
                change_map[alias] = new_alias
            if not rhs.alias_refcount[alias]:
                # The alias was unused in the rhs query. Unref it so that it
                # will be unused in the new query, too. We have to add and
                # unref the alias so that join promotion has information of
                # the join type for the unused alias.
                self.unref_alias(new_alias)
        joinpromoter.add_votes(rhs_votes)
        joinpromoter.update_join_types(self)

        # Combine subqueries aliases to ensure aliases relabelling properly
        # handle subqueries when combining where and select clauses.
        self.subq_aliases |= rhs.subq_aliases

        # Now relabel a copy of the rhs where-clause and add it to the current
        # one.
        w = rhs.where.clone()
        w.relabel_aliases(change_map)
        self.where.add(w, connector)

        # Selection columns and extra extensions are those provided by 'rhs'.
        if rhs.select:
            self.set_select([col.relabeled_clone(change_map) for col in rhs.select])
        else:
            self.select = ()

        if connector == OR:
            # It would be nice to be able to handle this, but the queries don't
            # really make sense (or return consistent value sets). Not worth
            # the extra complexity when you can write a real query instead.
            if self.extra and rhs.extra:
                raise ValueError(
                    "When merging querysets using 'or', you cannot have "
                    "extra(select=...) on both sides."
                )
        self.extra.update(rhs.extra)
        extra_select_mask = set()
        if self.extra_select_mask is not None:
            extra_select_mask.update(self.extra_select_mask)
        if rhs.extra_select_mask is not None:
            extra_select_mask.update(rhs.extra_select_mask)
        if extra_select_mask:
            self.set_extra_mask(extra_select_mask)
        self.extra_tables += rhs.extra_tables

        # Ordering uses the 'rhs' ordering, unless it has none, in which case
        # the current ordering is used.
        self.order_by = rhs.order_by or self.order_by
        self.extra_order_by = rhs.extra_order_by or self.extra_order_by
754
    def _get_defer_select_mask(
        self,
        meta: Meta,
        mask: dict[str, Any],
        select_mask: dict[Any, Any] | None = None,
    ) -> dict[Any, Any]:
        """
        Build the select mask for defer()-style loading.

        Every concrete field of `meta` that is not named in `mask` is
        selected; masked relations recurse into the related model. Returns
        `select_mask`, a dict mapping fields to nested masks. Raises
        FieldError for malformed defer entries.
        """
        from plain.models.fields.related import RelatedField

        if select_mask is None:
            select_mask = {}
        # The "id" field is always loaded.
        select_mask[meta.get_forward_field("id")] = {}
        # All concrete fields that are not part of the defer mask must be
        # loaded. If a relational field is encountered it gets added to the
        # mask for it be considered if `select_related` and the cycle continues
        # by recursively calling this function.
        for field in meta.concrete_fields:
            field_mask = mask.pop(field.name, None)
            if field_mask is None:
                select_mask.setdefault(field, {})
            elif field_mask:
                if not isinstance(field, RelatedField):
                    raise FieldError(next(iter(field_mask)))
                field_select_mask = select_mask.setdefault(field, {})
                related_model = field.remote_field.model
                self._get_defer_select_mask(
                    related_model._model_meta, field_mask, field_select_mask
                )
        # Remaining defer entries must be references to reverse relationships.
        # The following code is expected to raise FieldError if it encounters
        # a malformed defer entry.
        for field_name, field_mask in mask.items():
            if filtered_relation := self._filtered_relations.get(field_name):
                relation = meta.get_reverse_relation(filtered_relation.relation_name)
                field_select_mask = select_mask.setdefault((field_name, relation), {})
                field = relation.field
            else:
                field = meta.get_reverse_relation(field_name).field
                field_select_mask = select_mask.setdefault(field, {})
            related_model = field.model
            self._get_defer_select_mask(
                related_model._model_meta, field_mask, field_select_mask
            )
        return select_mask
798
799 def _get_only_select_mask(
800 self,
801 meta: Meta,
802 mask: dict[str, Any],
803 select_mask: dict[Any, Any] | None = None,
804 ) -> dict[Any, Any]:
805 from plain.models.fields.related import RelatedField
806
807 if select_mask is None:
808 select_mask = {}
809 select_mask[meta.get_forward_field("id")] = {}
810 # Only include fields mentioned in the mask.
811 for field_name, field_mask in mask.items():
812 field = meta.get_field(field_name)
813 field_select_mask = select_mask.setdefault(field, {})
814 if field_mask:
815 if not isinstance(field, RelatedField):
816 raise FieldError(next(iter(field_mask)))
817 related_model = field.remote_field.model
818 self._get_only_select_mask(
819 related_model._model_meta, field_mask, field_select_mask
820 )
821 return select_mask
822
823 def get_select_mask(self) -> dict[Any, Any]:
824 """
825 Convert the self.deferred_loading data structure to an alternate data
826 structure, describing the field that *will* be loaded. This is used to
827 compute the columns to select from the database and also by the
828 QuerySet class to work out which fields are being initialized on each
829 model. Models that have all their fields included aren't mentioned in
830 the result, only those that have field restrictions in place.
831 """
832 field_names, defer = self.deferred_loading
833 if not field_names:
834 return {}
835 mask = {}
836 for field_name in field_names:
837 part_mask = mask
838 for part in field_name.split(LOOKUP_SEP):
839 part_mask = part_mask.setdefault(part, {})
840 assert self.model is not None, "Deferred/only field loading requires a model"
841 meta = self.model._model_meta
842 if defer:
843 return self._get_defer_select_mask(meta, mask)
844 return self._get_only_select_mask(meta, mask)
845
846 def table_alias(
847 self, table_name: str, create: bool = False, filtered_relation: Any = None
848 ) -> tuple[str, bool]:
849 """
850 Return a table alias for the given table_name and whether this is a
851 new alias or not.
852
853 If 'create' is true, a new alias is always created. Otherwise, the
854 most recently created alias for the table (if one exists) is reused.
855 """
856 alias_list = self.table_map.get(table_name)
857 if not create and alias_list:
858 alias = alias_list[0]
859 self.alias_refcount[alias] += 1
860 return alias, False
861
862 # Create a new alias for this table.
863 if alias_list:
864 alias = "%s%d" % (self.alias_prefix, len(self.alias_map) + 1) # noqa: UP031
865 alias_list.append(alias)
866 else:
867 # The first occurrence of a table uses the table name directly.
868 alias = (
869 filtered_relation.alias if filtered_relation is not None else table_name
870 )
871 self.table_map[table_name] = [alias]
872 self.alias_refcount[alias] = 1
873 return alias, True
874
875 def ref_alias(self, alias: str) -> None:
876 """Increases the reference count for this alias."""
877 self.alias_refcount[alias] += 1
878
879 def unref_alias(self, alias: str, amount: int = 1) -> None:
880 """Decreases the reference count for this alias."""
881 self.alias_refcount[alias] -= amount
882
    def promote_joins(self, aliases: set[str] | list[str]) -> None:
        """
        Promote recursively the join type of given aliases and its children to
        an outer join. Only joins that are nullable, or whose parent join is
        already an outer join, are promoted.

        The children promotion is done to avoid join chains that contain a LOUTER
        b INNER c. So, if we have currently a INNER b INNER c and a->b is promoted,
        then we must also promote b->c automatically, or otherwise the promotion
        of a->b doesn't actually change anything in the query results.
        """
        # Worklist algorithm: pop one alias at a time; whenever a join type
        # actually changes, re-queue every alias that joins against it.
        aliases = list(aliases)
        while aliases:
            alias = aliases.pop(0)
            if self.alias_map[alias].join_type is None:
                # This is the base table (first FROM entry) - this table
                # isn't really joined at all in the query, so we should not
                # alter its join type.
                continue
            # Only the first alias (skipped above) should have None join_type
            assert self.alias_map[alias].join_type is not None
            parent_alias = self.alias_map[alias].parent_alias
            parent_louter = (
                parent_alias and self.alias_map[parent_alias].join_type == LOUTER
            )
            already_louter = self.alias_map[alias].join_type == LOUTER
            if (self.alias_map[alias].nullable or parent_louter) and not already_louter:
                self.alias_map[alias] = self.alias_map[alias].promote()
                # Join type of 'alias' changed, so re-examine all aliases that
                # refer to this one.
                aliases.extend(
                    join
                    for join in self.alias_map
                    if self.alias_map[join].parent_alias == alias
                    and join not in aliases
                )
919
    def demote_joins(self, aliases: set[str] | list[str]) -> None:
        """
        Change join type from LOUTER to INNER for all joins in aliases.

        Similarly to promote_joins(), this method must ensure no join chains
        containing first an outer, then an inner join are generated. If we
        are demoting b->c join in chain a LOUTER b LOUTER c then we must
        demote a->b automatically, or otherwise the demotion of b->c doesn't
        actually change anything in the query results.
        """
        # Worklist algorithm: demoting a join may require demoting its parent
        # too, so parents with an INNER join type are queued for a re-check.
        aliases = list(aliases)
        while aliases:
            alias = aliases.pop(0)
            if self.alias_map[alias].join_type == LOUTER:
                self.alias_map[alias] = self.alias_map[alias].demote()
                parent_alias = self.alias_map[alias].parent_alias
                if self.alias_map[parent_alias].join_type == INNER:
                    aliases.append(parent_alias)
938
939 def reset_refcounts(self, to_counts: dict[str, int]) -> None:
940 """
941 Reset reference counts for aliases so that they match the value passed
942 in `to_counts`.
943 """
944 for alias, cur_refcount in self.alias_refcount.copy().items():
945 unref_amount = cur_refcount - to_counts.get(alias, 0)
946 self.unref_alias(alias, unref_amount)
947
    def change_aliases(self, change_map: dict[str, str]) -> None:
        """
        Change the aliases in change_map (which maps old-alias -> new-alias),
        relabelling any references to them in select columns and the where
        clause.
        """
        # If keys and values of change_map were to intersect, an alias might be
        # updated twice (e.g. T4 -> T5, T5 -> T6, so also T4 -> T6) depending
        # on their order in change_map.
        assert set(change_map).isdisjoint(change_map.values())

        # 1. Update references in "select" (normal columns plus aliases),
        # "group by" and "where".
        self.where.relabel_aliases(change_map)
        if isinstance(self.group_by, tuple):
            self.group_by = tuple(
                [col.relabeled_clone(change_map) for col in self.group_by]
            )
        self.select = tuple([col.relabeled_clone(change_map) for col in self.select])
        # self.annotations may be empty/falsy; only rebuild it when present.
        self.annotations = self.annotations and {
            key: col.relabeled_clone(change_map)
            for key, col in self.annotations.items()
        }

        # 2. Rename the alias in the internal table/alias datastructures.
        for old_alias, new_alias in change_map.items():
            if old_alias not in self.alias_map:
                continue
            alias_data = self.alias_map[old_alias].relabeled_clone(change_map)
            self.alias_map[new_alias] = alias_data
            self.alias_refcount[new_alias] = self.alias_refcount[old_alias]
            del self.alias_refcount[old_alias]
            del self.alias_map[old_alias]

            # Keep table_map in sync: swap the old alias for the new one in
            # the per-table alias list.
            table_aliases = self.table_map[alias_data.table_name]
            for pos, alias in enumerate(table_aliases):
                if alias == old_alias:
                    table_aliases[pos] = new_alias
                    break
        # 3. External-alias bookkeeping must follow the rename as well; a
        # renamed table is by definition aliased.
        self.external_aliases = {
            # Table is aliased or it's being changed and thus is aliased.
            change_map.get(alias, alias): (aliased or alias in change_map)
            for alias, aliased in self.external_aliases.items()
        }
992
    def bump_prefix(
        self, other_query: Query, exclude: set[str] | dict[str, str] | None = None
    ) -> None:
        """
        Change the alias prefix to the next letter in the alphabet in a way
        that the other query's aliases and this query's aliases will not
        conflict. Even tables that previously had no alias will get an alias
        after this call. To prevent changing aliases use the exclude parameter.
        """

        def prefix_gen() -> TypingIterator[str]:
            """
            Generate a sequence of characters in alphabetical order:
            -> 'A', 'B', 'C', ...

            When the alphabet is finished, the sequence will continue with the
            Cartesian product:
            -> 'AA', 'AB', 'AC', ...
            """
            alphabet = ascii_uppercase
            prefix = chr(ord(self.alias_prefix) + 1)
            yield prefix
            for n in count(1):
                # Only the first round starts from the current prefix; later
                # rounds cover the whole alphabet (prefix is cleared below).
                seq = alphabet[alphabet.index(prefix) :] if prefix else alphabet
                for s in product(seq, repeat=n):
                    yield "".join(s)
                prefix = None

        if self.alias_prefix != other_query.alias_prefix:
            # No clashes between self and outer query should be possible.
            return

        # Explicitly avoid infinite loop. The constant divider is based on how
        # much depth recursive subquery references add to the stack. This value
        # might need to be adjusted when adding or removing function calls from
        # the code path in charge of performing these operations.
        local_recursion_limit = sys.getrecursionlimit() // 16
        for pos, prefix in enumerate(prefix_gen()):
            if prefix not in self.subq_aliases:
                self.alias_prefix = prefix
                break
            if pos > local_recursion_limit:
                raise RecursionError(
                    "Maximum recursion depth exceeded: too many subqueries."
                )
        # Both queries record every prefix in use so future bumps keep
        # avoiding collisions.
        self.subq_aliases = self.subq_aliases.union([self.alias_prefix])
        other_query.subq_aliases = other_query.subq_aliases.union(self.subq_aliases)
        if exclude is None:
            exclude = {}
        # Rename all (non-excluded) aliases to use the new prefix.
        self.change_aliases(
            {
                alias: "%s%d" % (self.alias_prefix, pos)  # noqa: UP031
                for pos, alias in enumerate(self.alias_map)
                if alias not in exclude
            }
        )
1049
1050 def get_initial_alias(self) -> str | None:
1051 """
1052 Return the first alias for this query, after increasing its reference
1053 count.
1054 """
1055 if self.alias_map:
1056 alias = self.base_table
1057 self.ref_alias(alias)
1058 elif self.model:
1059 alias = self.join(
1060 self.base_table_class(self.model.model_options.db_table, None)
1061 )
1062 else:
1063 alias = None
1064 return alias
1065
1066 def count_active_tables(self) -> int:
1067 """
1068 Return the number of tables in this query with a non-zero reference
1069 count. After execution, the reference counts are zeroed, so tables
1070 added in compiler will not be seen by this method.
1071 """
1072 return len([1 for count in self.alias_refcount.values() if count])
1073
    def join(
        self,
        join: BaseTable | Join,
        reuse: set[str] | None = None,
        reuse_with_filtered_relation: bool = False,
    ) -> str:
        """
        Return an alias for the 'join', either reusing an existing alias for
        that join or creating a new one. 'join' is either a base_table_class or
        join_class.

        The 'reuse' parameter can be either None which means all joins are
        reusable, or it can be a set containing the aliases that can be reused.

        The 'reuse_with_filtered_relation' parameter is used when computing
        FilteredRelation instances.

        A join is always created as LOUTER if the lhs alias is LOUTER to make
        sure chains like t1 LOUTER t2 INNER t3 aren't generated. All new
        joins are created as LOUTER if the join is nullable.
        """
        if reuse_with_filtered_relation and reuse:
            # Filtered relations only reuse explicitly allowed aliases,
            # compared via equals() rather than ==.
            reuse_aliases = [
                a for a, j in self.alias_map.items() if a in reuse and j.equals(join)
            ]
        else:
            reuse_aliases = [
                a
                for a, j in self.alias_map.items()
                if (reuse is None or a in reuse) and j == join
            ]
        if reuse_aliases:
            if join.table_alias in reuse_aliases:
                reuse_alias = join.table_alias
            else:
                # Reuse the most recent alias of the joined table
                # (a many-to-many relation may be joined multiple times).
                reuse_alias = reuse_aliases[-1]
            self.ref_alias(reuse_alias)
            return reuse_alias

        # No reuse is possible, so we need a new alias.
        alias, _ = self.table_alias(
            join.table_name, create=True, filtered_relation=join.filtered_relation
        )
        if isinstance(join, Join):
            # Propagate LOUTER down the chain; nullable joins also start out
            # as LOUTER.
            if self.alias_map[join.parent_alias].join_type == LOUTER or join.nullable:
                join_type = LOUTER
            else:
                join_type = INNER
            join.join_type = join_type
        join.table_alias = alias
        self.alias_map[alias] = join
        return alias
1128
1129 def check_alias(self, alias: str) -> None:
1130 if FORBIDDEN_ALIAS_PATTERN.search(alias):
1131 raise ValueError(
1132 "Column aliases cannot contain whitespace characters, quotation marks, "
1133 "semicolons, or SQL comments."
1134 )
1135
1136 def add_annotation(
1137 self, annotation: BaseExpression, alias: str, select: bool = True
1138 ) -> None:
1139 """Add a single annotation expression to the Query."""
1140 self.check_alias(alias)
1141 annotation = annotation.resolve_expression(self, allow_joins=True, reuse=None)
1142 if select:
1143 self.append_annotation_mask([alias])
1144 else:
1145 self.set_annotation_mask(set(self.annotation_select).difference({alias}))
1146 self.annotations[alias] = annotation
1147
    def resolve_expression(self, query: Query, *args: Any, **kwargs: Any) -> Self:
        """
        Resolve this query for use as a subquery expression inside `query`.

        Return a clone marked as a subquery whose aliases cannot clash with
        the outer query's aliases.
        """
        clone = self.clone()
        # Subqueries need to use a different set of aliases than the outer query.
        clone.bump_prefix(query)
        clone.subquery = True
        # Resolve the where clause against the outer query.
        clone.where.resolve_expression(query, *args, **kwargs)
        # Resolve combined queries.
        if clone.combinator:
            clone.combined_queries = tuple(
                [
                    combined_query.resolve_expression(query, *args, **kwargs)
                    for combined_query in clone.combined_queries
                ]
            )
        for key, value in clone.annotations.items():
            resolved = value.resolve_expression(query, *args, **kwargs)
            if hasattr(resolved, "external_aliases"):
                # Nested subqueries must treat this clone's external aliases
                # as external too.
                resolved.external_aliases.update(clone.external_aliases)
            clone.annotations[key] = resolved
        # Outer query's aliases are considered external.
        for alias, table in query.alias_map.items():
            clone.external_aliases[alias] = (
                isinstance(table, Join)
                and table.join_field.related_model.model_options.db_table != alias
            ) or (
                isinstance(table, BaseTable) and table.table_name != table.table_alias
            )
        return clone
1176
1177 def get_external_cols(self) -> list[Col]:
1178 exprs = chain(self.annotations.values(), self.where.children)
1179 return [
1180 col
1181 for col in self._gen_cols(exprs, include_external=True)
1182 if col.alias in self.external_aliases
1183 ]
1184
1185 def get_group_by_cols(
1186 self, wrapper: BaseExpression | None = None
1187 ) -> list[Col] | list[BaseExpression]:
1188 # If wrapper is referenced by an alias for an explicit GROUP BY through
1189 # values() a reference to this expression and not the self must be
1190 # returned to ensure external column references are not grouped against
1191 # as well.
1192 external_cols = self.get_external_cols()
1193 if any(col.possibly_multivalued for col in external_cols):
1194 return [wrapper or self]
1195 return external_cols
1196
    def as_sql(
        self, compiler: SQLCompiler, connection: BaseDatabaseWrapper
    ) -> tuple[str, tuple[Any, ...]]:
        """
        Compile this query to SQL and parameters.

        `compiler` and `connection` are part of the expression protocol;
        compilation actually goes through this query's own compiler.
        """
        # Some backends (e.g. Oracle) raise an error when a subquery contains
        # unnecessary ORDER BY clause.
        if (
            self.subquery
            and not db_connection.features.ignores_unnecessary_order_by_in_subqueries
        ):
            self.clear_ordering(force=False)
            for query in self.combined_queries:
                query.clear_ordering(force=False)
        sql, params = self.get_compiler().as_sql()
        if self.subquery:
            # Parenthesize so the query embeds safely in an outer statement.
            sql = f"({sql})"
        return sql, params
1213
1214 def resolve_lookup_value(
1215 self, value: Any, can_reuse: set[str] | None, allow_joins: bool
1216 ) -> Any:
1217 if isinstance(value, ResolvableExpression):
1218 value = value.resolve_expression(
1219 self,
1220 reuse=can_reuse,
1221 allow_joins=allow_joins,
1222 )
1223 elif isinstance(value, list | tuple):
1224 # The items of the iterable may be expressions and therefore need
1225 # to be resolved independently.
1226 values = (
1227 self.resolve_lookup_value(sub_value, can_reuse, allow_joins)
1228 for sub_value in value
1229 )
1230 type_ = type(value)
1231 if hasattr(type_, "_make"): # namedtuple
1232 return type_(*values)
1233 return type_(values)
1234 return value
1235
1236 def solve_lookup_type(
1237 self, lookup: str, summarize: bool = False
1238 ) -> tuple[
1239 list[str] | tuple[str, ...], tuple[str, ...], BaseExpression | Literal[False]
1240 ]:
1241 """
1242 Solve the lookup type from the lookup (e.g.: 'foobar__id__icontains').
1243 """
1244 lookup_splitted = lookup.split(LOOKUP_SEP)
1245 if self.annotations:
1246 annotation, expression_lookups = refs_expression(
1247 lookup_splitted, self.annotations
1248 )
1249 if annotation:
1250 expression = self.annotations[annotation]
1251 if summarize:
1252 expression = Ref(annotation, expression)
1253 return expression_lookups, (), expression
1254 assert self.model is not None, "Field lookups require a model"
1255 meta = self.model._model_meta
1256 _, field, _, lookup_parts = self.names_to_path(lookup_splitted, meta)
1257 field_parts = lookup_splitted[0 : len(lookup_splitted) - len(lookup_parts)]
1258 if len(lookup_parts) > 1 and not field_parts:
1259 raise FieldError(
1260 f'Invalid lookup "{lookup}" for model {meta.model.__name__}".'
1261 )
1262 return lookup_parts, tuple(field_parts), False
1263
1264 def check_query_object_type(
1265 self, value: Any, meta: Meta, field: Field | ForeignObjectRel
1266 ) -> None:
1267 """
1268 Check whether the object passed while querying is of the correct type.
1269 If not, raise a ValueError specifying the wrong object.
1270 """
1271 from plain.models import Model
1272
1273 if isinstance(value, Model):
1274 if not check_rel_lookup_compatibility(value._model_meta.model, meta, field):
1275 raise ValueError(
1276 f'Cannot query "{value}": Must be "{meta.model.model_options.object_name}" instance.'
1277 )
1278
    def check_related_objects(
        self, field: RelatedField | ForeignObjectRel, value: Any, meta: Meta
    ) -> None:
        """Check the type of object passed to query relations."""
        from plain.models import Model

        # Check that the field and the queryset use the same model in a
        # query like .filter(author=Author.query.all()). For example, the
        # meta would be Author's (from the author field) and value.model
        # would be Author.query.all() queryset's .model (Author also).
        # The field is the related field on the lhs side.
        if (
            isinstance(value, Query)
            and not value.has_select_fields
            and not check_rel_lookup_compatibility(value.model, meta, field)
        ):
            raise ValueError(
                f'Cannot use QuerySet for "{value.model.model_options.object_name}": Use a QuerySet for "{meta.model.model_options.object_name}".'
            )
        elif isinstance(value, Model):
            # A single model instance on the rhs.
            self.check_query_object_type(value, meta, field)
        elif isinstance(value, Iterable):
            # An iterable of instances: validate each element.
            for v in value:
                self.check_query_object_type(v, meta, field)
1303
1304 def check_filterable(self, expression: Any) -> None:
1305 """Raise an error if expression cannot be used in a WHERE clause."""
1306 if isinstance(expression, ResolvableExpression) and not getattr(
1307 expression, "filterable", True
1308 ):
1309 raise NotSupportedError(
1310 expression.__class__.__name__ + " is disallowed in the filter clause."
1311 )
1312 if hasattr(expression, "get_source_expressions"):
1313 for expr in expression.get_source_expressions():
1314 self.check_filterable(expr)
1315
    def build_lookup(
        self, lookups: list[str], lhs: BaseExpression | MultiColSource, rhs: Any
    ) -> Lookup | None:
        """
        Try to extract transforms and lookup from given lhs.

        The lhs value is something that works like SQLExpression.
        The rhs value is what the lookup is going to compare against.
        The lookups is a list of names to extract using get_lookup()
        and get_transform().

        Return None when no lookup class can be resolved for the final name.
        """
        # __exact is the default lookup if one isn't given.
        *transforms, lookup_name = lookups or ["exact"]
        if transforms:
            if isinstance(lhs, MultiColSource):
                raise FieldError(
                    "Transforms are not supported on multi-column relations."
                )
            # At this point, lhs must be BaseExpression
            for name in transforms:
                lhs = self.try_transform(lhs, name)
        # First try get_lookup() so that the lookup takes precedence if the lhs
        # supports both transform and lookup for the name.
        lookup_class = lhs.get_lookup(lookup_name)
        if not lookup_class:
            # A lookup wasn't found. Try to interpret the name as a transform
            # and do an Exact lookup against it.
            if isinstance(lhs, MultiColSource):
                raise FieldError(
                    "Transforms are not supported on multi-column relations."
                )
            lhs = self.try_transform(lhs, lookup_name)
            lookup_name = "exact"
            lookup_class = lhs.get_lookup(lookup_name)
            if not lookup_class:
                return

        lookup = lookup_class(lhs, rhs)
        # Interpret '__exact=None' as the sql 'is NULL'; otherwise, reject all
        # uses of None as a query value unless the lookup supports it.
        if lookup.rhs is None and not lookup.can_use_none_as_rhs:
            if lookup_name not in ("exact", "iexact"):
                raise ValueError("Cannot use None as a query value")
            # field=None becomes field__isnull=True.
            isnull_lookup = lhs.get_lookup("isnull")
            assert isnull_lookup is not None
            return isnull_lookup(lhs, True)

        return lookup
1364
1365 def try_transform(self, lhs: BaseExpression, name: str) -> BaseExpression:
1366 """
1367 Helper method for build_lookup(). Try to fetch and initialize
1368 a transform for name parameter from lhs.
1369 """
1370 transform_class = lhs.get_transform(name)
1371 if transform_class:
1372 return transform_class(lhs)
1373 else:
1374 output_field = lhs.output_field.__class__
1375 suggested_lookups = difflib.get_close_matches(
1376 name, output_field.get_lookups()
1377 )
1378 if suggested_lookups:
1379 suggestion = ", perhaps you meant {}?".format(
1380 " or ".join(suggested_lookups)
1381 )
1382 else:
1383 suggestion = "."
1384 raise FieldError(
1385 f"Unsupported lookup '{name}' for {output_field.__name__} or join on the field not "
1386 f"permitted{suggestion}"
1387 )
1388
1389 def build_filter(
1390 self,
1391 filter_expr: tuple[str, Any] | Q | BaseExpression,
1392 branch_negated: bool = False,
1393 current_negated: bool = False,
1394 can_reuse: set[str] | None = None,
1395 allow_joins: bool = True,
1396 split_subq: bool = True,
1397 reuse_with_filtered_relation: bool = False,
1398 check_filterable: bool = True,
1399 summarize: bool = False,
1400 ) -> tuple[WhereNode, set[str] | tuple[()]]:
1401 from plain.models.fields.related import RelatedField
1402
1403 """
1404 Build a WhereNode for a single filter clause but don't add it
1405 to this Query. Query.add_q() will then add this filter to the where
1406 Node.
1407
1408 The 'branch_negated' tells us if the current branch contains any
1409 negations. This will be used to determine if subqueries are needed.
1410
1411 The 'current_negated' is used to determine if the current filter is
1412 negated or not and this will be used to determine if IS NULL filtering
1413 is needed.
1414
1415 The difference between current_negated and branch_negated is that
1416 branch_negated is set on first negation, but current_negated is
1417 flipped for each negation.
1418
1419 Note that add_filter will not do any negating itself, that is done
1420 upper in the code by add_q().
1421
1422 The 'can_reuse' is a set of reusable joins for multijoins.
1423
1424 If 'reuse_with_filtered_relation' is True, then only joins in can_reuse
1425 will be reused.
1426
1427 The method will create a filter clause that can be added to the current
1428 query. However, if the filter isn't added to the query then the caller
1429 is responsible for unreffing the joins used.
1430 """
1431 if isinstance(filter_expr, dict):
1432 raise FieldError("Cannot parse keyword query as dict")
1433 if isinstance(filter_expr, Q):
1434 return self._add_q(
1435 filter_expr,
1436 branch_negated=branch_negated,
1437 current_negated=current_negated,
1438 used_aliases=can_reuse,
1439 allow_joins=allow_joins,
1440 split_subq=split_subq,
1441 check_filterable=check_filterable,
1442 summarize=summarize,
1443 )
1444 if isinstance(filter_expr, ResolvableExpression):
1445 if not getattr(filter_expr, "conditional", False):
1446 raise TypeError("Cannot filter against a non-conditional expression.")
1447 condition = filter_expr.resolve_expression( # type: ignore[call-non-callable]
1448 self, allow_joins=allow_joins, summarize=summarize
1449 )
1450 if not isinstance(condition, Lookup):
1451 condition = self.build_lookup(["exact"], condition, True)
1452 return WhereNode([condition], connector=AND), set()
1453 if isinstance(filter_expr, BaseExpression):
1454 raise TypeError(f"Unexpected BaseExpression type: {type(filter_expr)}")
1455 arg, value = filter_expr
1456 if not arg:
1457 raise FieldError(f"Cannot parse keyword query {arg!r}")
1458 lookups, parts, reffed_expression = self.solve_lookup_type(arg, summarize)
1459
1460 if check_filterable:
1461 self.check_filterable(reffed_expression)
1462
1463 if not allow_joins and len(parts) > 1:
1464 raise FieldError("Joined field references are not permitted in this query")
1465
1466 pre_joins = self.alias_refcount.copy()
1467 value = self.resolve_lookup_value(value, can_reuse, allow_joins)
1468 used_joins = {
1469 k for k, v in self.alias_refcount.items() if v > pre_joins.get(k, 0)
1470 }
1471
1472 if check_filterable:
1473 self.check_filterable(value)
1474
1475 if reffed_expression:
1476 condition = self.build_lookup(list(lookups), reffed_expression, value)
1477 return WhereNode([condition], connector=AND), set()
1478
1479 assert self.model is not None, "Building filters requires a model"
1480 meta = self.model._model_meta
1481 alias = self.get_initial_alias()
1482 assert alias is not None
1483 allow_many = not branch_negated or not split_subq
1484
1485 try:
1486 join_info = self.setup_joins(
1487 list(parts),
1488 meta,
1489 alias,
1490 can_reuse=can_reuse,
1491 allow_many=allow_many,
1492 reuse_with_filtered_relation=reuse_with_filtered_relation,
1493 )
1494
1495 # Prevent iterator from being consumed by check_related_objects()
1496 if isinstance(value, Iterator):
1497 value = list(value)
1498 from plain.models.fields.related import RelatedField
1499 from plain.models.fields.reverse_related import ForeignObjectRel
1500
1501 if isinstance(join_info.final_field, RelatedField | ForeignObjectRel):
1502 self.check_related_objects(join_info.final_field, value, join_info.meta)
1503
1504 # split_exclude() needs to know which joins were generated for the
1505 # lookup parts
1506 self._lookup_joins = join_info.joins
1507 except MultiJoin as e:
1508 return self.split_exclude(
1509 filter_expr,
1510 can_reuse or set(),
1511 e.names_with_path,
1512 )
1513
1514 # Update used_joins before trimming since they are reused to determine
1515 # which joins could be later promoted to INNER.
1516 used_joins.update(join_info.joins)
1517 targets, alias, join_list = self.trim_joins(
1518 join_info.targets, join_info.joins, join_info.path
1519 )
1520 if can_reuse is not None:
1521 can_reuse.update(join_list)
1522
1523 if isinstance(join_info.final_field, RelatedField | ForeignObjectRel):
1524 if len(targets) == 1:
1525 col = self._get_col(targets[0], join_info.final_field, alias)
1526 else:
1527 col = MultiColSource(
1528 alias, targets, join_info.targets, join_info.final_field
1529 )
1530 else:
1531 col = self._get_col(targets[0], join_info.final_field, alias)
1532
1533 condition = self.build_lookup(list(lookups), col, value)
1534 assert condition is not None
1535 lookup_type = condition.lookup_name
1536 clause = WhereNode([condition], connector=AND)
1537
1538 require_outer = (
1539 lookup_type == "isnull" and condition.rhs is True and not current_negated
1540 )
1541 if (
1542 current_negated
1543 and (lookup_type != "isnull" or condition.rhs is False)
1544 and condition.rhs is not None
1545 ):
1546 require_outer = True
1547 if lookup_type != "isnull":
1548 # The condition added here will be SQL like this:
1549 # NOT (col IS NOT NULL), where the first NOT is added in
1550 # upper layers of code. The reason for addition is that if col
1551 # is null, then col != someval will result in SQL "unknown"
1552 # which isn't the same as in Python. The Python None handling
1553 # is wanted, and it can be gotten by
1554 # (col IS NULL OR col != someval)
1555 # <=>
1556 # NOT (col IS NOT NULL AND col = someval).
1557 if (
1558 self.is_nullable(targets[0])
1559 or self.alias_map[join_list[-1]].join_type == LOUTER
1560 ):
1561 lookup_class = targets[0].get_lookup("isnull")
1562 assert lookup_class is not None
1563 col = self._get_col(targets[0], join_info.targets[0], alias)
1564 clause.add(lookup_class(col, False), AND)
1565 # If someval is a nullable column, someval IS NOT NULL is
1566 # added.
1567 if isinstance(value, Col) and self.is_nullable(value.target):
1568 lookup_class = value.target.get_lookup("isnull")
1569 assert lookup_class is not None
1570 clause.add(lookup_class(value, False), AND)
1571 return clause, used_joins if not require_outer else ()
1572
1573 def add_filter(self, filter_lhs: str, filter_rhs: Any) -> None:
1574 self.add_q(Q((filter_lhs, filter_rhs)))
1575
1576 def add_q(self, q_object: Q) -> None:
1577 """
1578 A preprocessor for the internal _add_q(). Responsible for doing final
1579 join promotion.
1580 """
1581 # For join promotion this case is doing an AND for the added q_object
1582 # and existing conditions. So, any existing inner join forces the join
1583 # type to remain inner. Existing outer joins can however be demoted.
1584 # (Consider case where rel_a is LOUTER and rel_a__col=1 is added - if
1585 # rel_a doesn't produce any rows, then the whole condition must fail.
1586 # So, demotion is OK.
1587 existing_inner = {
1588 a for a in self.alias_map if self.alias_map[a].join_type == INNER
1589 }
1590 clause, _ = self._add_q(q_object, self.used_aliases)
1591 if clause:
1592 self.where.add(clause, AND)
1593 self.demote_joins(existing_inner)
1594
1595 def build_where(
1596 self, filter_expr: tuple[str, Any] | Q | BaseExpression
1597 ) -> WhereNode:
1598 return self.build_filter(filter_expr, allow_joins=False)[0]
1599
    def clear_where(self) -> None:
        """Remove all filtering by replacing the where clause with an empty one."""
        self.where = WhereNode()
1602
    def _add_q(
        self,
        q_object: Q,
        used_aliases: set[str] | None,
        branch_negated: bool = False,
        current_negated: bool = False,
        allow_joins: bool = True,
        split_subq: bool = True,
        check_filterable: bool = True,
        summarize: bool = False,
    ) -> tuple[WhereNode, set[str] | tuple[()]]:
        """
        Add a Q-object to the current filter.

        Return the built WhereNode plus the set of joins that may stay INNER.
        """
        connector = q_object.connector
        # current_negated flips on every negation level; branch_negated
        # latches once any enclosing negation has been seen.
        current_negated ^= q_object.negated
        branch_negated = branch_negated or q_object.negated
        target_clause = WhereNode(connector=connector, negated=q_object.negated)
        # The promoter collects per-child "votes" about which joins must stay
        # INNER and applies the decision after all children are built.
        joinpromoter = JoinPromoter(
            q_object.connector, len(q_object.children), current_negated
        )
        for child in q_object.children:
            child_clause, needed_inner = self.build_filter(
                child,
                can_reuse=used_aliases,
                branch_negated=branch_negated,
                current_negated=current_negated,
                allow_joins=allow_joins,
                split_subq=split_subq,
                check_filterable=check_filterable,
                summarize=summarize,
            )
            joinpromoter.add_votes(needed_inner)
            if child_clause:
                target_clause.add(child_clause, connector)
        needed_inner = joinpromoter.update_join_types(self)
        return target_clause, needed_inner
1638
    def build_filtered_relation_q(
        self,
        q_object: Q,
        reuse: set[str],
        branch_negated: bool = False,
        current_negated: bool = False,
    ) -> WhereNode:
        """
        Add a FilteredRelation object to the current filter.

        Recursively build a WhereNode mirroring the Q tree; leaf conditions
        are built with join reuse restricted to the `reuse` set.
        """
        connector = q_object.connector
        current_negated ^= q_object.negated
        branch_negated = branch_negated or q_object.negated
        target_clause = WhereNode(connector=connector, negated=q_object.negated)
        for child in q_object.children:
            if isinstance(child, Node):
                # Nested Q object: recurse, carrying the negation state along.
                child_clause = self.build_filtered_relation_q(
                    child,
                    reuse=reuse,
                    branch_negated=branch_negated,
                    current_negated=current_negated,
                )
            else:
                # Leaf condition: only the relation's own joins may be reused.
                child_clause, _ = self.build_filter(
                    child,
                    can_reuse=reuse,
                    branch_negated=branch_negated,
                    current_negated=current_negated,
                    allow_joins=True,
                    split_subq=False,
                    reuse_with_filtered_relation=True,
                )
            target_clause.add(child_clause, connector)
        return target_clause
1671
1672 def add_filtered_relation(self, filtered_relation: Any, alias: str) -> None:
1673 filtered_relation.alias = alias
1674 lookups = dict(get_children_from_q(filtered_relation.condition))
1675 relation_lookup_parts, relation_field_parts, _ = self.solve_lookup_type(
1676 filtered_relation.relation_name
1677 )
1678 if relation_lookup_parts:
1679 raise ValueError(
1680 "FilteredRelation's relation_name cannot contain lookups "
1681 f"(got {filtered_relation.relation_name!r})."
1682 )
1683 for lookup in chain(lookups):
1684 lookup_parts, lookup_field_parts, _ = self.solve_lookup_type(lookup)
1685 shift = 2 if not lookup_parts else 1
1686 lookup_field_path = lookup_field_parts[:-shift]
1687 for idx, lookup_field_part in enumerate(lookup_field_path):
1688 if len(relation_field_parts) > idx:
1689 if relation_field_parts[idx] != lookup_field_part:
1690 raise ValueError(
1691 "FilteredRelation's condition doesn't support "
1692 f"relations outside the {filtered_relation.relation_name!r} (got {lookup!r})."
1693 )
1694 else:
1695 raise ValueError(
1696 "FilteredRelation's condition doesn't support nested "
1697 f"relations deeper than the relation_name (got {lookup!r} for "
1698 f"{filtered_relation.relation_name!r})."
1699 )
1700 self._filtered_relations[filtered_relation.alias] = filtered_relation
1701
1702 def names_to_path(
1703 self,
1704 names: list[str],
1705 meta: Meta,
1706 allow_many: bool = True,
1707 fail_on_missing: bool = False,
1708 ) -> tuple[list[Any], Field | ForeignObjectRel, tuple[Field, ...], list[str]]:
1709 from plain.models.fields.related import RelatedField
1710
1711 """
1712 Walk the list of names and turns them into PathInfo tuples. A single
1713 name in 'names' can generate multiple PathInfos (m2m, for example).
1714
1715 'names' is the path of names to travel, 'meta' is the Meta we
1716 start the name resolving from, 'allow_many' is as for setup_joins().
1717 If fail_on_missing is set to True, then a name that can't be resolved
1718 will generate a FieldError.
1719
1720 Return a list of PathInfo tuples. In addition return the final field
1721 (the last used join field) and target (which is a field guaranteed to
1722 contain the same value as the final field). Finally, return those names
1723 that weren't found (which are likely transforms and the final lookup).
1724 """
1725 path, names_with_path = [], []
1726 for pos, name in enumerate(names):
1727 cur_names_with_path = (name, [])
1728
1729 field = None
1730 filtered_relation = None
1731 try:
1732 if meta is None:
1733 raise FieldDoesNotExist
1734 field = meta.get_field(name)
1735 except FieldDoesNotExist:
1736 if name in self.annotation_select:
1737 field = self.annotation_select[name].output_field
1738 elif name in self._filtered_relations and pos == 0:
1739 filtered_relation = self._filtered_relations[name]
1740 if LOOKUP_SEP in filtered_relation.relation_name:
1741 parts = filtered_relation.relation_name.split(LOOKUP_SEP)
1742 filtered_relation_path, field, _, _ = self.names_to_path(
1743 parts,
1744 meta,
1745 allow_many,
1746 fail_on_missing,
1747 )
1748 path.extend(filtered_relation_path[:-1])
1749 else:
1750 field = meta.get_field(filtered_relation.relation_name)
1751 if field is not None:
1752 # Fields that contain one-to-many relations with a generic
1753 # model (like a GenericForeignKey) cannot generate reverse
1754 # relations and therefore cannot be used for reverse querying.
1755 if isinstance(field, RelatedField) and not field.related_model:
1756 raise FieldError(
1757 f"Field {name!r} does not generate an automatic reverse "
1758 "relation and therefore cannot be used for reverse "
1759 "querying. If it is a GenericForeignKey, consider "
1760 "adding a GenericRelation."
1761 )
1762 else:
1763 # We didn't find the current field, so move position back
1764 # one step.
1765 pos -= 1
1766 if pos == -1 or fail_on_missing:
1767 available = sorted(
1768 [
1769 *get_field_names_from_opts(meta),
1770 *self.annotation_select,
1771 *self._filtered_relations,
1772 ]
1773 )
1774 raise FieldError(
1775 "Cannot resolve keyword '{}' into field. "
1776 "Choices are: {}".format(name, ", ".join(available))
1777 )
1778 break
1779
1780 # Lazy import to avoid circular dependency
1781 from plain.models.fields.related import ForeignKeyField as FK
1782 from plain.models.fields.related import ManyToManyField as M2M
1783 from plain.models.fields.reverse_related import ForeignObjectRel as FORel
1784
1785 if isinstance(field, FK | M2M | FORel):
1786 pathinfos: list[PathInfo]
1787 if filtered_relation:
1788 pathinfos = field.get_path_info(filtered_relation)
1789 else:
1790 pathinfos = field.path_infos
1791 if not allow_many:
1792 for inner_pos, p in enumerate(pathinfos):
1793 if p.m2m:
1794 cur_names_with_path[1].extend(pathinfos[0 : inner_pos + 1])
1795 names_with_path.append(cur_names_with_path)
1796 raise MultiJoin(pos + 1, names_with_path)
1797 last = pathinfos[-1]
1798 path.extend(pathinfos)
1799 final_field = last.join_field
1800 meta = last.to_meta
1801 targets = last.target_fields
1802 cur_names_with_path[1].extend(pathinfos)
1803 names_with_path.append(cur_names_with_path)
1804 else:
1805 # Local non-relational field.
1806 final_field = field
1807 targets = (field,)
1808 if fail_on_missing and pos + 1 != len(names):
1809 raise FieldError(
1810 f"Cannot resolve keyword {names[pos + 1]!r} into field. Join on '{name}'"
1811 " not permitted."
1812 )
1813 break
1814 return path, final_field, targets, names[pos + 1 :]
1815
    def setup_joins(
        self,
        names: list[str],
        meta: Meta,
        alias: str,
        can_reuse: set[str] | None = None,
        allow_many: bool = True,
        reuse_with_filtered_relation: bool = False,
    ) -> JoinInfo:
        """
        Compute the necessary table joins for the passage through the fields
        given in 'names'. 'meta' is the Meta for the current model
        (which gives the table we are starting from), 'alias' is the alias for
        the table to start the joining from.

        The 'can_reuse' defines the reverse foreign key joins we can reuse. It
        can be None in which case all joins are reusable or a set of aliases
        that can be reused. Note that non-reverse foreign keys are always
        reusable when using setup_joins().

        The 'reuse_with_filtered_relation' can be used to force 'can_reuse'
        parameter and force the relation on the given connections.

        If 'allow_many' is False, then any reverse foreign key seen will
        generate a MultiJoin exception.

        Return the final field involved in the joins, the target field (used
        for any 'where' constraint), the final 'opts' value, the joins, the
        field path traveled to generate the joins, and a transform function
        that takes a field and alias and is equivalent to `field.get_col(alias)`
        in the simple case but wraps field transforms if they were included in
        names.

        The target field is the field containing the concrete value. Final
        field can be something different, for example foreign key pointing to
        that value. Final field is needed for example in some value
        conversions (convert 'obj' in fk__id=obj to pk val using the foreign
        key field for example).
        """
        joins = [alias]
        # The transform can't be applied yet, as joins must be trimmed later.
        # To avoid making every caller of this method look up transforms
        # directly, compute transforms here and create a partial that converts
        # fields to the appropriate wrapped version.

        def _base_transformer(field: Field, alias: str | None) -> Col:
            # Identity transform: resolve a field to a plain column reference.
            if not self.alias_cols:
                alias = None
            return field.get_col(alias)

        final_transformer: TransformWrapper | Callable[[Field, str | None], Col] = (
            _base_transformer
        )

        # Try resolving all the names as fields first. If there's an error,
        # treat trailing names as lookups until a field can be resolved.
        last_field_exception = None
        for pivot in range(len(names), 0, -1):
            try:
                path, final_field, targets, rest = self.names_to_path(
                    names[:pivot],
                    meta,
                    allow_many,
                    fail_on_missing=True,
                )
            except FieldError as exc:
                if pivot == 1:
                    # The first item cannot be a lookup, so it's safe
                    # to raise the field error here.
                    raise
                else:
                    last_field_exception = exc
            else:
                # The transforms are the remaining items that couldn't be
                # resolved into fields.
                transforms = names[pivot:]
                break
        # Chain one wrapper per unresolved trailing name; each wrapper applies
        # its transform on top of the previously built transformer.
        for name in transforms:

            def transform(
                field: Field, alias: str | None, *, name: str, previous: Any
            ) -> BaseExpression:
                try:
                    wrapped = previous(field, alias)
                    return self.try_transform(wrapped, name)
                except FieldError:
                    # FieldError is raised if the transform doesn't exist.
                    if isinstance(final_field, Field) and last_field_exception:
                        raise last_field_exception
                    else:
                        raise

            # name/previous are bound as keyword defaults by TransformWrapper,
            # avoiding the late-binding-closure pitfall.
            final_transformer = TransformWrapper(
                transform, name=name, previous=final_transformer
            )
            final_transformer.has_transforms = True
        # Then, add the path to the query's joins. Note that we can't trim
        # joins at this stage - we will need the information about join type
        # of the trimmed joins.
        for join in path:
            if join.filtered_relation:
                filtered_relation = join.filtered_relation.clone()
                table_alias = filtered_relation.alias
            else:
                filtered_relation = None
                table_alias = None
            meta = join.to_meta
            if join.direct:
                nullable = self.is_nullable(join.join_field)
            else:
                # Reverse joins are always treated as nullable.
                nullable = True
            connection = self.join_class(
                meta.model.model_options.db_table,
                alias,
                table_alias,
                INNER,
                join.join_field,
                nullable,
                filtered_relation=filtered_relation,
            )
            # Only m2m (or explicitly forced) joins may reuse existing aliases.
            reuse = can_reuse if join.m2m or reuse_with_filtered_relation else None
            alias = self.join(
                connection,
                reuse=reuse,
                reuse_with_filtered_relation=reuse_with_filtered_relation,
            )
            joins.append(alias)
            if filtered_relation:
                # Record the alias chain so the relation's condition can be
                # resolved against the right joins later.
                filtered_relation.path = joins[:]
        return JoinInfo(final_field, targets, meta, joins, path, final_transformer)
1946
1947 def trim_joins(
1948 self, targets: tuple[Field, ...], joins: list[str], path: list[Any]
1949 ) -> tuple[tuple[Field, ...], str, list[str]]:
1950 """
1951 The 'target' parameter is the final field being joined to, 'joins'
1952 is the full list of join aliases. The 'path' contain the PathInfos
1953 used to create the joins.
1954
1955 Return the final target field and table alias and the new active
1956 joins.
1957
1958 Always trim any direct join if the target column is already in the
1959 previous table. Can't trim reverse joins as it's unknown if there's
1960 anything on the other side of the join.
1961 """
1962 joins = joins[:]
1963 for pos, info in enumerate(reversed(path)):
1964 if len(joins) == 1 or not info.direct:
1965 break
1966 if info.filtered_relation:
1967 break
1968 join_targets = {t.column for t in info.join_field.foreign_related_fields}
1969 cur_targets = {t.column for t in targets}
1970 if not cur_targets.issubset(join_targets):
1971 break
1972 targets_dict = {
1973 r[1].column: r[0]
1974 for r in info.join_field.related_fields
1975 if r[1].column in cur_targets
1976 }
1977 targets = tuple(targets_dict[t.column] for t in targets)
1978 self.unref_alias(joins.pop())
1979 return targets, joins[-1], joins
1980
1981 @classmethod
1982 def _gen_cols(
1983 cls,
1984 exprs: Iterable[Any],
1985 include_external: bool = False,
1986 resolve_refs: bool = True,
1987 ) -> TypingIterator[Col]:
1988 for expr in exprs:
1989 if isinstance(expr, Col):
1990 yield expr
1991 elif include_external and callable(
1992 getattr(expr, "get_external_cols", None)
1993 ):
1994 yield from expr.get_external_cols()
1995 elif hasattr(expr, "get_source_expressions"):
1996 if not resolve_refs and isinstance(expr, Ref):
1997 continue
1998 yield from cls._gen_cols(
1999 expr.get_source_expressions(),
2000 include_external=include_external,
2001 resolve_refs=resolve_refs,
2002 )
2003
2004 @classmethod
2005 def _gen_col_aliases(cls, exprs: Iterable[Any]) -> TypingIterator[str]:
2006 yield from (expr.alias for expr in cls._gen_cols(exprs))
2007
    def resolve_ref(
        self,
        name: str,
        allow_joins: bool = True,
        reuse: set[str] | None = None,
        summarize: bool = False,
    ) -> BaseExpression:
        """
        Resolve a name referenced by an expression (e.g. inside F() or an
        aggregate) into an annotation, a transformed annotation, or a column
        on the model, setting up any joins the reference requires.

        'allow_joins' forbids references that would require a join; 'reuse'
        is the set of reusable join aliases (updated in place with the joins
        actually used); 'summarize' indicates an aggregate() over an existing
        annotation, which must be returned as a Ref into the subquery.

        Raise FieldError for disallowed joins, unpromoted aliases, or
        multi-column field references.
        """
        annotation = self.annotations.get(name)
        if annotation is not None:
            if not allow_joins:
                # Reject annotations that reference joined tables.
                for alias in self._gen_col_aliases([annotation]):
                    if isinstance(self.alias_map[alias], Join):
                        raise FieldError(
                            "Joined field references are not permitted in this query"
                        )
            if summarize:
                # Summarize currently means we are doing an aggregate() query
                # which is executed as a wrapped subquery if any of the
                # aggregate() elements reference an existing annotation. In
                # that case we need to return a Ref to the subquery's annotation.
                if name not in self.annotation_select:
                    raise FieldError(
                        f"Cannot aggregate over the '{name}' alias. Use annotate() "
                        "to promote it."
                    )
                return Ref(name, self.annotation_select[name])
            else:
                return annotation
        else:
            field_list = name.split(LOOKUP_SEP)
            annotation = self.annotations.get(field_list[0])
            if annotation is not None:
                # Annotation followed by transforms (e.g. "alias__lower").
                for transform in field_list[1:]:
                    annotation = self.try_transform(annotation, transform)
                return annotation
            initial_alias = self.get_initial_alias()
            assert initial_alias is not None
            assert self.model is not None, "Resolving field references requires a model"
            meta = self.model._model_meta
            join_info = self.setup_joins(
                field_list,
                meta,
                initial_alias,
                can_reuse=reuse,
            )
            # Drop joins whose target columns already exist on an earlier table.
            targets, final_alias, join_list = self.trim_joins(
                join_info.targets, join_info.joins, join_info.path
            )
            if not allow_joins and len(join_list) > 1:
                raise FieldError(
                    "Joined field references are not permitted in this query"
                )
            if len(targets) > 1:
                raise FieldError(
                    "Referencing multicolumn fields with F() objects isn't supported"
                )
            # Verify that the last lookup in name is a field or a transform:
            # transform_function() raises FieldError if not.
            transform = join_info.transform_function(targets[0], final_alias)
            if reuse is not None:
                # Report back which joins were used so the caller can reuse them.
                reuse.update(join_list)
            return transform
2070
    def split_exclude(
        self,
        filter_expr: tuple[str, Any],
        can_reuse: set[str],
        names_with_path: list[tuple[str, list[Any]]],
    ) -> tuple[WhereNode, set[str] | tuple[()]]:
        """
        When doing an exclude against any kind of N-to-many relation, we need
        to use a subquery. This method constructs the nested query, given the
        original exclude filter (filter_expr) and the portion up to the first
        N-to-many relation field.

        For example, if the origin filter is ~Q(child__name='foo'), filter_expr
        is ('child__name', 'foo') and can_reuse is a set of joins usable for
        filters in the original query.

        We will turn this into equivalent of:
            WHERE NOT EXISTS(
                SELECT 1
                FROM child
                WHERE name = 'foo' AND child.parent_id = parent.id
                LIMIT 1
            )
        """
        # Generate the inner query.
        query = self.__class__(self.model)
        # Share filtered relations so the inner query can resolve the same
        # aliases as the outer one.
        query._filtered_relations = self._filtered_relations
        filter_lhs, filter_rhs = filter_expr
        if isinstance(filter_rhs, OuterRef):
            # Re-wrap: the extra level is consumed when the inner query is
            # resolved against the outer one.
            filter_rhs = OuterRef(filter_rhs)
        elif isinstance(filter_rhs, F):
            # An F() in the outer query becomes an OuterRef in the subquery.
            filter_rhs = OuterRef(filter_rhs.name)
        query.add_filter(filter_lhs, filter_rhs)
        query.clear_ordering(force=True)
        # Try to have as simple as possible subquery -> trim leading joins from
        # the subquery.
        trimmed_prefix, contains_louter = query.trim_start(names_with_path)

        # trim_start() set the subquery's select column to match the join.
        col = query.select[0]
        select_field = col.target
        alias = col.alias
        if alias in can_reuse:
            id_field = select_field.model._model_meta.get_forward_field("id")
            # Need to add a restriction so that outer query's filters are in effect for
            # the subquery, too.
            query.bump_prefix(self)
            lookup_class = select_field.get_lookup("exact")
            # Note that the query.select[0].alias is different from alias
            # due to bump_prefix above.
            lookup = lookup_class(
                id_field.get_col(query.select[0].alias), id_field.get_col(alias)
            )
            query.where.add(lookup, AND)
            query.external_aliases[alias] = True

        # Correlate the subquery with the outer query via the trimmed prefix.
        lookup_class = select_field.get_lookup("exact")
        lookup = lookup_class(col, ResolvedOuterRef(trimmed_prefix))
        query.where.add(lookup, AND)
        condition, needed_inner = self.build_filter(Exists(query))

        if contains_louter:
            or_null_condition, _ = self.build_filter(
                (f"{trimmed_prefix}__isnull", True),
                current_negated=True,
                branch_negated=True,
                can_reuse=can_reuse,
            )
            condition.add(or_null_condition, OR)
            # Note that the end result will be:
            # (outercol NOT IN innerq AND outercol IS NOT NULL) OR outercol IS NULL.
            # This might look crazy but due to how IN works, this seems to be
            # correct. If the IS NOT NULL check is removed then outercol NOT
            # IN will return UNKNOWN. If the IS NULL check is removed, then if
            # outercol IS NULL we will not match the row.
        return condition, needed_inner
2146
2147 def set_empty(self) -> None:
2148 self.where.add(NothingNode(), AND)
2149 for query in self.combined_queries:
2150 query.set_empty()
2151
2152 def is_empty(self) -> bool:
2153 return any(isinstance(c, NothingNode) for c in self.where.children)
2154
2155 def set_limits(self, low: int | None = None, high: int | None = None) -> None:
2156 """
2157 Adjust the limits on the rows retrieved. Use low/high to set these,
2158 as it makes it more Pythonic to read and write. When the SQL query is
2159 created, convert them to the appropriate offset and limit values.
2160
2161 Apply any limits passed in here to the existing constraints. Add low
2162 to the current low value and clamp both to any existing high value.
2163 """
2164 if high is not None:
2165 if self.high_mark is not None:
2166 self.high_mark = min(self.high_mark, self.low_mark + high)
2167 else:
2168 self.high_mark = self.low_mark + high
2169 if low is not None:
2170 if self.high_mark is not None:
2171 self.low_mark = min(self.high_mark, self.low_mark + low)
2172 else:
2173 self.low_mark = self.low_mark + low
2174
2175 if self.low_mark == self.high_mark:
2176 self.set_empty()
2177
2178 def clear_limits(self) -> None:
2179 """Clear any existing limits."""
2180 self.low_mark, self.high_mark = 0, None
2181
2182 @property
2183 def is_sliced(self) -> bool:
2184 return self.low_mark != 0 or self.high_mark is not None
2185
2186 def has_limit_one(self) -> bool:
2187 return self.high_mark is not None and (self.high_mark - self.low_mark) == 1
2188
2189 def can_filter(self) -> bool:
2190 """
2191 Return True if adding filters to this instance is still possible.
2192
2193 Typically, this means no limits or offsets have been put on the results.
2194 """
2195 return not self.is_sliced
2196
2197 def clear_select_clause(self) -> None:
2198 """Remove all fields from SELECT clause."""
2199 self.select = ()
2200 self.default_cols = False
2201 self.select_related = False
2202 self.set_extra_mask(())
2203 self.set_annotation_mask(())
2204
2205 def clear_select_fields(self) -> None:
2206 """
2207 Clear the list of fields to select (but not extra_select columns).
2208 Some queryset types completely replace any existing list of select
2209 columns.
2210 """
2211 self.select = ()
2212 self.values_select = ()
2213
2214 def add_select_col(self, col: BaseExpression, name: str) -> None:
2215 self.select += (col,)
2216 self.values_select += (name,)
2217
2218 def set_select(self, cols: list[Col] | tuple[Col, ...]) -> None:
2219 self.default_cols = False
2220 self.select = tuple(cols)
2221
2222 def add_distinct_fields(self, *field_names: str) -> None:
2223 """
2224 Add and resolve the given fields to the query's "distinct on" clause.
2225 """
2226 self.distinct_fields = field_names
2227 self.distinct = True
2228
    def add_fields(
        self, field_names: list[str] | TypingIterator[str], allow_m2m: bool = True
    ) -> None:
        """
        Add the given (model) fields to the select set. Add the field names in
        the order specified.

        Each name may span relations (foo__bar); the required joins are set
        up and trimmed. Raise FieldError for unresolvable names, for aliases
        that were not promoted via annotate(), and (when allow_m2m is False)
        for names crossing a many-to-many relation.
        """
        alias = self.get_initial_alias()
        assert alias is not None
        assert self.model is not None, "add_fields() requires a model"
        meta = self.model._model_meta

        try:
            cols = []
            for name in field_names:
                # Join promotion note - we must not remove any rows here, so
                # if there is no existing joins, use outer join.
                join_info = self.setup_joins(
                    name.split(LOOKUP_SEP), meta, alias, allow_many=allow_m2m
                )
                targets, final_alias, joins = self.trim_joins(
                    join_info.targets,
                    join_info.joins,
                    join_info.path,
                )
                for target in targets:
                    cols.append(join_info.transform_function(target, final_alias))
            if cols:
                self.set_select(cols)
        # NOTE: both handlers below read `name`, i.e. the loop variable from
        # the iteration that raised — valid because the loop body is the only
        # code that can raise here.
        except MultiJoin:
            raise FieldError(f"Invalid field name: '{name}'")
        except FieldError:
            if LOOKUP_SEP in name:
                # For lookups spanning over relationships, show the error
                # from the model on which the lookup failed.
                raise
            elif name in self.annotations:
                raise FieldError(
                    f"Cannot select the '{name}' alias. Use annotate() to promote it."
                )
            else:
                # Single unknown name: build a helpful list of valid choices.
                names = sorted(
                    [
                        *get_field_names_from_opts(meta),
                        *self.extra,
                        *self.annotation_select,
                        *self._filtered_relations,
                    ]
                )
                raise FieldError(
                    "Cannot resolve keyword {!r} into field. Choices are: {}".format(
                        name, ", ".join(names)
                    )
                )
2283
2284 def add_ordering(self, *ordering: str | BaseExpression) -> None:
2285 """
2286 Add items from the 'ordering' sequence to the query's "order by"
2287 clause. These items are either field names (not column names) --
2288 possibly with a direction prefix ('-' or '?') -- or OrderBy
2289 expressions.
2290
2291 If 'ordering' is empty, clear all ordering from the query.
2292 """
2293 errors = []
2294 for item in ordering:
2295 if isinstance(item, str):
2296 if item == "?":
2297 continue
2298 item = item.removeprefix("-")
2299 if item in self.annotations:
2300 continue
2301 if self.extra and item in self.extra:
2302 continue
2303 # names_to_path() validates the lookup. A descriptive
2304 # FieldError will be raise if it's not.
2305 assert self.model is not None, "ORDER BY field names require a model"
2306 self.names_to_path(item.split(LOOKUP_SEP), self.model._model_meta)
2307 elif not isinstance(item, ResolvableExpression):
2308 errors.append(item)
2309 if getattr(item, "contains_aggregate", False):
2310 raise FieldError(
2311 "Using an aggregate in order_by() without also including "
2312 f"it in annotate() is not allowed: {item}"
2313 )
2314 if errors:
2315 raise FieldError(f"Invalid order_by arguments: {errors}")
2316 if ordering:
2317 self.order_by += ordering
2318 else:
2319 self.default_ordering = False
2320
2321 def clear_ordering(self, force: bool = False, clear_default: bool = True) -> None:
2322 """
2323 Remove any ordering settings if the current query allows it without
2324 side effects, set 'force' to True to clear the ordering regardless.
2325 If 'clear_default' is True, there will be no ordering in the resulting
2326 query (not even the model's default).
2327 """
2328 if not force and (
2329 self.is_sliced or self.distinct_fields or self.select_for_update
2330 ):
2331 return
2332 self.order_by = ()
2333 self.extra_order_by = ()
2334 if clear_default:
2335 self.default_ordering = False
2336
2337 def set_group_by(self, allow_aliases: bool = True) -> None:
2338 """
2339 Expand the GROUP BY clause required by the query.
2340
2341 This will usually be the set of all non-aggregate fields in the
2342 return data. If the database backend supports grouping by the
2343 primary key, and the query would be equivalent, the optimization
2344 will be made automatically.
2345 """
2346 if allow_aliases and self.values_select:
2347 # If grouping by aliases is allowed assign selected value aliases
2348 # by moving them to annotations.
2349 group_by_annotations = {}
2350 values_select = {}
2351 for alias, expr in zip(self.values_select, self.select):
2352 if isinstance(expr, Col):
2353 values_select[alias] = expr
2354 else:
2355 group_by_annotations[alias] = expr
2356 self.annotations = {**group_by_annotations, **self.annotations}
2357 self.append_annotation_mask(group_by_annotations)
2358 self.select = tuple(values_select.values())
2359 self.values_select = tuple(values_select)
2360 group_by = list(self.select)
2361 for alias, annotation in self.annotation_select.items():
2362 if not (group_by_cols := annotation.get_group_by_cols()):
2363 continue
2364 if allow_aliases and not annotation.contains_aggregate:
2365 group_by.append(Ref(alias, annotation))
2366 else:
2367 group_by.extend(group_by_cols)
2368 self.group_by = tuple(group_by)
2369
2370 def add_select_related(self, fields: list[str]) -> None:
2371 """
2372 Set up the select_related data structure so that we only select
2373 certain related models (as opposed to all models, when
2374 self.select_related=True).
2375 """
2376 if isinstance(self.select_related, bool):
2377 field_dict: dict[str, Any] = {}
2378 else:
2379 field_dict = self.select_related
2380 for field in fields:
2381 d = field_dict
2382 for part in field.split(LOOKUP_SEP):
2383 d = d.setdefault(part, {})
2384 self.select_related = field_dict
2385
2386 def add_extra(
2387 self,
2388 select: dict[str, str],
2389 select_params: list[Any] | None,
2390 where: list[str],
2391 params: list[Any],
2392 tables: list[str],
2393 order_by: tuple[str, ...],
2394 ) -> None:
2395 """
2396 Add data to the various extra_* attributes for user-created additions
2397 to the query.
2398 """
2399 if select:
2400 # We need to pair any placeholder markers in the 'select'
2401 # dictionary with their parameters in 'select_params' so that
2402 # subsequent updates to the select dictionary also adjust the
2403 # parameters appropriately.
2404 select_pairs = {}
2405 if select_params:
2406 param_iter = iter(select_params)
2407 else:
2408 param_iter = iter([])
2409 for name, entry in select.items():
2410 self.check_alias(name)
2411 entry = str(entry)
2412 entry_params = []
2413 pos = entry.find("%s")
2414 while pos != -1:
2415 if pos == 0 or entry[pos - 1] != "%":
2416 entry_params.append(next(param_iter))
2417 pos = entry.find("%s", pos + 2)
2418 select_pairs[name] = (entry, entry_params)
2419 self.extra.update(select_pairs)
2420 if where or params:
2421 self.where.add(ExtraWhere(where, params), AND)
2422 if tables:
2423 self.extra_tables += tuple(tables)
2424 if order_by:
2425 self.extra_order_by = order_by
2426
2427 def clear_deferred_loading(self) -> None:
2428 """Remove any fields from the deferred loading set."""
2429 self.deferred_loading = (frozenset(), True)
2430
2431 def add_deferred_loading(self, field_names: frozenset[str]) -> None:
2432 """
2433 Add the given list of model field names to the set of fields to
2434 exclude from loading from the database when automatic column selection
2435 is done. Add the new field names to any existing field names that
2436 are deferred (or removed from any existing field names that are marked
2437 as the only ones for immediate loading).
2438 """
2439 # Fields on related models are stored in the literal double-underscore
2440 # format, so that we can use a set datastructure. We do the foo__bar
2441 # splitting and handling when computing the SQL column names (as part of
2442 # get_columns()).
2443 existing, defer = self.deferred_loading
2444 existing_set = set(existing)
2445 if defer:
2446 # Add to existing deferred names.
2447 self.deferred_loading = frozenset(existing_set.union(field_names)), True
2448 else:
2449 # Remove names from the set of any existing "immediate load" names.
2450 if new_existing := existing_set.difference(field_names):
2451 self.deferred_loading = frozenset(new_existing), False
2452 else:
2453 self.clear_deferred_loading()
2454 if new_only := set(field_names).difference(existing_set):
2455 self.deferred_loading = frozenset(new_only), True
2456
2457 def add_immediate_loading(self, field_names: list[str] | set[str]) -> None:
2458 """
2459 Add the given list of model field names to the set of fields to
2460 retrieve when the SQL is executed ("immediate loading" fields). The
2461 field names replace any existing immediate loading field names. If
2462 there are field names already specified for deferred loading, remove
2463 those names from the new field_names before storing the new names
2464 for immediate loading. (That is, immediate loading overrides any
2465 existing immediate values, but respects existing deferrals.)
2466 """
2467 existing, defer = self.deferred_loading
2468 field_names_set = set(field_names)
2469
2470 if defer:
2471 # Remove any existing deferred names from the current set before
2472 # setting the new names.
2473 self.deferred_loading = (
2474 frozenset(field_names_set.difference(existing)),
2475 False,
2476 )
2477 else:
2478 # Replace any existing "immediate load" field names.
2479 self.deferred_loading = frozenset(field_names_set), False
2480
2481 def set_annotation_mask(
2482 self,
2483 names: set[str]
2484 | frozenset[str]
2485 | list[str]
2486 | tuple[str, ...]
2487 | dict[str, Any]
2488 | None,
2489 ) -> None:
2490 """Set the mask of annotations that will be returned by the SELECT."""
2491 if names is None:
2492 self.annotation_select_mask = None
2493 else:
2494 self.annotation_select_mask = set(names)
2495 self._annotation_select_cache = None
2496
2497 def append_annotation_mask(self, names: list[str] | dict[str, Any]) -> None:
2498 if self.annotation_select_mask is not None:
2499 self.set_annotation_mask(self.annotation_select_mask.union(names))
2500
2501 def set_extra_mask(
2502 self, names: set[str] | list[str] | tuple[str, ...] | None
2503 ) -> None:
2504 """
2505 Set the mask of extra select items that will be returned by SELECT.
2506 Don't remove them from the Query since they might be used later.
2507 """
2508 if names is None:
2509 self.extra_select_mask = None
2510 else:
2511 self.extra_select_mask = set(names)
2512 self._extra_select_cache = None
2513
    def set_values(self, fields: list[str]) -> None:
        """
        Configure the query for a values()/values_list() style SELECT.

        Partition the requested names into model fields, extra-select
        entries, and annotations, set the corresponding masks, and rebuild
        the select list. With no fields, select every concrete model field.
        Existing GROUP BY state is re-expanded to stay consistent with the
        new select list.
        """
        self.select_related = False
        self.clear_deferred_loading()
        self.clear_select_fields()
        self.has_select_fields = True

        if fields:
            field_names = []
            extra_names = []
            annotation_names = []
            if not self.extra and not self.annotations:
                # Shortcut - if there are no extra or annotations, then
                # the values() clause must be just field names.
                field_names = list(fields)
            else:
                self.default_cols = False
                for f in fields:
                    if f in self.extra_select:
                        extra_names.append(f)
                    elif f in self.annotation_select:
                        annotation_names.append(f)
                    else:
                        field_names.append(f)
            self.set_extra_mask(extra_names)
            self.set_annotation_mask(annotation_names)
            selected = frozenset(field_names + extra_names + annotation_names)
        else:
            assert self.model is not None, "Default values query requires a model"
            field_names = [f.attname for f in self.model._model_meta.concrete_fields]
            selected = frozenset(field_names)
        # Selected annotations must be known before setting the GROUP BY
        # clause.
        if self.group_by is True:
            assert self.model is not None, "GROUP BY True requires a model"
            self.add_fields(
                (f.attname for f in self.model._model_meta.concrete_fields), False
            )
            # Disable GROUP BY aliases to avoid orphaning references to the
            # SELECT clause which is about to be cleared.
            self.set_group_by(allow_aliases=False)
            self.clear_select_fields()
        elif self.group_by:
            # Resolve GROUP BY annotation references if they are not part of
            # the selected fields anymore.
            group_by = []
            for expr in self.group_by:
                if isinstance(expr, Ref) and expr.refs not in selected:
                    # Inline the annotation the Ref pointed at.
                    expr = self.annotations[expr.refs]
                group_by.append(expr)
            self.group_by = tuple(group_by)

        self.values_select = tuple(field_names)
        self.add_fields(field_names, True)
2567
2568 @property
2569 def annotation_select(self) -> dict[str, BaseExpression]:
2570 """
2571 Return the dictionary of aggregate columns that are not masked and
2572 should be used in the SELECT clause. Cache this result for performance.
2573 """
2574 if self._annotation_select_cache is not None:
2575 return self._annotation_select_cache
2576 elif not self.annotations:
2577 return {}
2578 elif self.annotation_select_mask is not None:
2579 self._annotation_select_cache = {
2580 k: v
2581 for k, v in self.annotations.items()
2582 if k in self.annotation_select_mask
2583 }
2584 return self._annotation_select_cache
2585 else:
2586 return self.annotations
2587
2588 @property
2589 def extra_select(self) -> dict[str, tuple[str, list[Any]]]:
2590 if self._extra_select_cache is not None:
2591 return self._extra_select_cache
2592 if not self.extra:
2593 return {}
2594 elif self.extra_select_mask is not None:
2595 self._extra_select_cache = {
2596 k: v for k, v in self.extra.items() if k in self.extra_select_mask
2597 }
2598 return self._extra_select_cache
2599 else:
2600 return self.extra
2601
    def trim_start(
        self, names_with_path: list[tuple[str, list[Any]]]
    ) -> tuple[str, bool]:
        """
        Trim joins from the start of the join path. The candidates for trim
        are the PathInfos in names_with_path structure that are m2m joins.

        Also set the select column so the start matches the join.

        This method is meant to be used for generating the subquery joins &
        cols in split_exclude().

        Return a lookup usable for doing outerq.filter(lookup=self) and a
        boolean indicating if the joins in the prefix contain a LEFT OUTER join.
        """
        # Flatten the per-name PathInfo lists into a single join sequence.
        all_paths = []
        for _, paths in names_with_path:
            all_paths.extend(paths)
        contains_louter = False
        # Trim and operate only on tables that were generated for
        # the lookup part of the query. That is, avoid trimming
        # joins generated for F() expressions.
        lookup_tables = [
            t for t in self.alias_map if t in self._lookup_joins or t == self.base_table
        ]
        # Walk the joins from the start up to (but excluding) the first m2m
        # join, unreffing each trimmed alias. After the loop, trimmed_paths is
        # the count of trimmed joins and path is the first m2m PathInfo.
        for trimmed_paths, path in enumerate(all_paths):
            if path.m2m:
                break
            if self.alias_map[lookup_tables[trimmed_paths + 1]].join_type == LOUTER:
                contains_louter = True
            alias = lookup_tables[trimmed_paths]
            self.unref_alias(alias)
        # The path.join_field is a Rel, lets get the other side's field
        join_field = path.join_field.field
        # Build the filter prefix: the lookup names whose paths fit entirely
        # within the trimmed region. NOTE: this loop rebinds ``path``;
        # join_field was already captured above.
        paths_in_prefix = trimmed_paths
        trimmed_prefix = []
        for name, path in names_with_path:
            if paths_in_prefix - len(path) < 0:
                break
            trimmed_prefix.append(name)
            paths_in_prefix -= len(path)
        trimmed_prefix.append(join_field.foreign_related_fields[0].name)
        trimmed_prefix = LOOKUP_SEP.join(trimmed_prefix)
        # Lets still see if we can trim the first join from the inner query
        # (that is, self). We can't do this for:
        # - LEFT JOINs because we would miss those rows that have nothing on
        #   the outer side,
        # - INNER JOINs from filtered relations because we would miss their
        #   filters.
        first_join = self.alias_map[lookup_tables[trimmed_paths + 1]]
        if first_join.join_type != LOUTER and not first_join.filtered_relation:
            select_fields = [r[0] for r in join_field.related_fields]
            select_alias = lookup_tables[trimmed_paths + 1]
            self.unref_alias(lookup_tables[trimmed_paths])
        else:
            # TODO: It might be possible to trim more joins from the start of the
            # inner query if it happens to have a longer join chain containing the
            # values in select_fields. Lets punt this one for now.
            select_fields = [r[1] for r in join_field.related_fields]
            select_alias = lookup_tables[trimmed_paths]
        # The found starting point is likely a join_class instead of a
        # base_table_class reference. But the first entry in the query's FROM
        # clause must not be a JOIN.
        for table in self.alias_map:
            if self.alias_refcount[table] > 0:
                self.alias_map[table] = self.base_table_class(
                    self.alias_map[table].table_name,
                    table,
                )
                break
        self.set_select([f.get_col(select_alias) for f in select_fields])
        return trimmed_prefix, contains_louter
2675
2676 def is_nullable(self, field: Field) -> bool:
2677 """Check if the given field should be treated as nullable."""
2678 # QuerySet does not have knowledge of which connection is going to be
2679 # used. For the single-database setup we always reference the default
2680 # connection here.
2681 return field.allow_null
2682
2683
def get_order_dir(field: str, default: str = "ASC") -> tuple[str, str]:
    """
    Return the field name and direction for an order specification. For
    example, '-foo' is returned as ('foo', 'DESC').

    The 'default' param is used to indicate which way no prefix (or a '+'
    prefix) should sort. The '-' prefix always sorts the opposite way.

    Raises KeyError if 'default' is not a key of ORDER_DIR.
    """
    dirn = ORDER_DIR[default]
    # str.startswith is safe for an empty field name, unlike field[0]
    # which raised IndexError.
    if field.startswith("-"):
        return field[1:], dirn[1]
    return field, dirn[0]
2696
2697
class JoinPromoter:
    """
    A class to abstract away join promotion problems for complex filter
    conditions.
    """

    def __init__(self, connector: str, num_children: int, negated: bool):
        self.connector = connector
        self.negated = negated
        # Under negation, AND behaves like OR (and vice versa) for join
        # promotion purposes (De Morgan).
        if negated:
            self.effective_connector = OR if connector == AND else AND
        else:
            self.effective_connector = connector
        self.num_children = num_children
        # Maps of table alias to how many times it is seen as required for
        # inner and/or outer joins.
        self.votes = Counter()

    def __repr__(self) -> str:
        cls_name = self.__class__.__qualname__
        return (
            f"{cls_name}(connector={self.connector!r}, "
            f"num_children={self.num_children!r}, negated={self.negated!r})"
        )

    def add_votes(self, votes: Any) -> None:
        """
        Add single vote per item to self.votes. Parameter can be any
        iterable.
        """
        self.votes.update(votes)

    def update_join_types(self, query: Query) -> set[str]:
        """
        Change join types so that the generated query is as efficient as
        possible, but still correct. So, change as many joins as possible
        to INNER, but don't make OUTER joins INNER if that could remove
        results from the query. Return the set of aliases demoted to INNER.
        """
        promotions: set[str] = set()
        demotions: set[str] = set()
        # The effective_connector is used so that NOT (a AND b) is treated
        # similarly to (a OR b) for join promotion.
        conn = self.effective_connector
        for alias, vote_count in self.votes.items():
            # OR case: a join that is not required by every child condition
            # must stay (or become) LOUTER. Otherwise an INNER join could
            # itself remove rows that a sibling condition (e.g.
            # rel_a__col=1 | rel_b__col=2) would have matched. Any such
            # promotion may still be demoted again below.
            if conn == OR and vote_count < self.num_children:
                promotions.add(alias)
            # AND case: a single vote is enough to use INNER — if the join
            # yields NULL, the corresponding filter (NULL = anything) cannot
            # match, so the whole conjunction is false anyway.
            # OR case: when *every* child voted for the join (e.g.
            # rel_a__col__icontains=Alex | rel_a__col__icontains=Russell),
            # no row can match without a joinable row, so INNER is safe.
            if conn == AND or (conn == OR and vote_count == self.num_children):
                demotions.add(alias)
            # Mixed case, e.g. (rel_a__col=1|rel_b__col=2) & rel_a__col__gte=0:
            # the OR clause promotes rel_a and rel_b to LOUTER, then the AND
            # clause's single vote for rel_a demotes it back to INNER. That is
            # correct: if rel_a produces no rows, rel_a__col__gte=0 is false
            # and the whole condition fails, so INNER cannot drop valid rows.
            # The same reasoning holds with the clauses swapped, or with the
            # __gte clause replaced by rel_a__col=1|rel_a__col=2.
        query.promote_joins(promotions)
        query.demote_joins(demotions)
        return demotions