Plain is headed towards 1.0! Subscribe for development updates →

  1import json
  2import logging
  3import re
  4
  5from plain.json import PlainJSONEncoder
  6from plain.models import db_connection
  7from plain.runtime import settings
  8
  9from .core import QueryStats
 10
# psycopg is an optional dependency: when it cannot be imported the name is
# bound to None, and QueryStatsJSONEncoder checks for that before using it.
try:
    import psycopg
except ImportError:
    psycopg = None

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
 17
 18
 19class QueryStatsJSONEncoder(PlainJSONEncoder):
 20    def default(self, obj):
 21        try:
 22            return super().default(obj)
 23        except TypeError:
 24            if psycopg and isinstance(obj, psycopg.types.json.Json):
 25                return obj.obj
 26            elif psycopg and isinstance(obj, psycopg.types.json.Jsonb):
 27                return obj.obj
 28            else:
 29                raise
 30
 31
 32class QueryStatsMiddleware:
 33    def __init__(self, get_response):
 34        self.get_response = get_response
 35        self.ignore_url_patterns = [
 36            re.compile(url) for url in settings.ADMIN_QUERYSTATS_IGNORE_URLS
 37        ]
 38
 39    def should_ignore_request(self, request):
 40        for url in self.ignore_url_patterns:
 41            if url.match(request.path):
 42                return True
 43
 44        return False
 45
 46    def __call__(self, request):
 47        """
 48        Enables querystats for the current request.
 49
 50        If DEBUG or an admin, then Server-Timing headers are always added to the response.
 51        Full querystats are only stored in the session if they are manually enabled.
 52        """
 53
 54        if self.should_ignore_request(request):
 55            return self.get_response(request)
 56
 57        def is_tracking():
 58            return "querystats" in request.session
 59
 60        querystats = QueryStats(include_tracebacks=is_tracking())
 61
 62        with db_connection.execute_wrapper(querystats):
 63            is_admin = self.is_admin_request(request)
 64
 65        if settings.DEBUG or is_admin:
 66            with db_connection.execute_wrapper(querystats):
 67                response = self.get_response(request)
 68
 69            if settings.DEBUG:
 70                # TODO logging settings
 71                logger.debug("Querystats: %s", querystats)
 72
 73            # Make current querystats available on the current page
 74            # by using the server timing API which can be parsed client-side
 75            response.headers["Server-Timing"] = querystats.as_server_timing()
 76
 77            if is_tracking() and querystats.num_queries > 0:
 78                request.session["querystats"][request.unique_id] = json.dumps(
 79                    querystats.as_context_dict(request), cls=QueryStatsJSONEncoder
 80                )
 81
 82                # Keep 30 requests max, in case it is left on by accident
 83                if len(request.session["querystats"]) > 30:
 84                    del request.session["querystats"][
 85                        list(request.session["querystats"])[0]
 86                    ]
 87
 88                # Did a deeper modification to the session dict...
 89                request.session.modified = True
 90
 91            return response
 92
 93        else:
 94            return self.get_response(request)
 95
 96    @staticmethod
 97    def is_admin_request(request):
 98        if getattr(request, "impersonator", None):
 99            # Support for impersonation (still want the real admin user to see the querystats)
100            return request.impersonator and request.impersonator.is_admin
101
102        return hasattr(request, "user") and request.user and request.user.is_admin