1import json
2import logging
3import re
4
5from plain.json import PlainJSONEncoder
6from plain.models import db_connection
7from plain.runtime import settings
8
9from .core import QueryStats
10
11try:
12 import psycopg
13except ImportError:
14 psycopg = None
15
16logger = logging.getLogger(__name__)
17
18
19class QueryStatsJSONEncoder(PlainJSONEncoder):
20 def default(self, obj):
21 try:
22 return super().default(obj)
23 except TypeError:
24 if psycopg and isinstance(obj, psycopg.types.json.Json):
25 return obj.obj
26 elif psycopg and isinstance(obj, psycopg.types.json.Jsonb):
27 return obj.obj
28 else:
29 raise
30
31
32class QueryStatsMiddleware:
33 def __init__(self, get_response):
34 self.get_response = get_response
35 self.ignore_url_patterns = [
36 re.compile(url) for url in settings.ADMIN_QUERYSTATS_IGNORE_URLS
37 ]
38
39 def should_ignore_request(self, request):
40 for url in self.ignore_url_patterns:
41 if url.match(request.path):
42 return True
43
44 return False
45
46 def __call__(self, request):
47 """
48 Enables querystats for the current request.
49
50 If DEBUG or an admin, then Server-Timing headers are always added to the response.
51 Full querystats are only stored in the session if they are manually enabled.
52 """
53
54 if self.should_ignore_request(request):
55 return self.get_response(request)
56
57 def is_tracking():
58 return "querystats" in request.session
59
60 querystats = QueryStats(include_tracebacks=is_tracking())
61
62 with db_connection.execute_wrapper(querystats):
63 is_admin = self.is_admin_request(request)
64
65 if settings.DEBUG or is_admin:
66 with db_connection.execute_wrapper(querystats):
67 response = self.get_response(request)
68
69 if settings.DEBUG:
70 # TODO logging settings
71 logger.debug("Querystats: %s", querystats)
72
73 # Make current querystats available on the current page
74 # by using the server timing API which can be parsed client-side
75 response.headers["Server-Timing"] = querystats.as_server_timing()
76
77 if is_tracking() and querystats.num_queries > 0:
78 request.session["querystats"][request.unique_id] = json.dumps(
79 querystats.as_context_dict(request), cls=QueryStatsJSONEncoder
80 )
81
82 # Keep 30 requests max, in case it is left on by accident
83 if len(request.session["querystats"]) > 30:
84 del request.session["querystats"][
85 list(request.session["querystats"])[0]
86 ]
87
88 # Did a deeper modification to the session dict...
89 request.session.modified = True
90
91 return response
92
93 else:
94 return self.get_response(request)
95
96 @staticmethod
97 def is_admin_request(request):
98 if getattr(request, "impersonator", None):
99 # Support for impersonation (still want the real admin user to see the querystats)
100 return request.impersonator and request.impersonator.is_admin
101
102 return hasattr(request, "user") and request.user and request.user.is_admin