Plain is headed towards 1.0! Subscribe for development updates →

  1import datetime
  2import json
  3import os
  4import sys
  5import traceback
  6
  7import requests
  8
  9from plain.runtime import settings
 10from plain.signals import got_request_exception
 11
 12
 13class RequestLog:
 14    def __init__(self, *, request, response, exception=None):
 15        self.request = request
 16        self.response = response
 17        self.exception = exception
 18
 19    @staticmethod
 20    def storage_path():
 21        return str(settings.PLAIN_TEMP_PATH / "dev" / "requestlog")
 22
 23    @classmethod
 24    def replay_request(cls, name):
 25        path = os.path.join(cls.storage_path(), f"{name}.json")
 26        with open(path) as f:
 27            data = json.load(f)
 28
 29        method = data["request"]["method"]
 30
 31        if method == "GET":
 32            # Params are in absolute uri
 33            request_data = data["request"]["body"].encode("utf-8")
 34        elif method in ("POST", "PUT", "PATCH"):
 35            if data["request"]["querydict"]:
 36                request_data = data["request"]["querydict"]
 37            else:
 38                request_data = data["request"]["body"].encode("utf-8")
 39
 40        # Cookies need to be passed as a dict, so that
 41        # they are passed through redirects
 42        data["request"]["headers"].pop("Cookie", None)
 43
 44        # TODO???
 45        if data["request"]["headers"].get("X-Forwarded-Proto", "") == "https,https":
 46            data["request"]["headers"]["X-Forwarded-Proto"] = "https"
 47
 48        response = requests.request(
 49            method,
 50            data["request"]["absolute_uri"],
 51            headers=data["request"]["headers"],
 52            cookies=data["request"]["cookies"],
 53            data=request_data,
 54            timeout=5,
 55        )
 56        print("Replayed request", response)
 57
 58    @staticmethod
 59    def load_json_logs():
 60        storage_path = RequestLog.storage_path()
 61        if not os.path.exists(storage_path):
 62            return []
 63
 64        logs = []
 65        filenames = os.listdir(storage_path)
 66        sorted_filenames = sorted(filenames, reverse=True)
 67        for filename in sorted_filenames:
 68            path = os.path.join(storage_path, filename)
 69            with open(path) as f:
 70                log = json.load(f)
 71                log["name"] = os.path.splitext(filename)[0]
 72                # Convert timestamp back to datetime
 73                log["timestamp"] = datetime.datetime.fromtimestamp(log["timestamp"])
 74                try:
 75                    log["request"]["body_json"] = json.dumps(
 76                        json.loads(log["request"]["body"]), indent=2
 77                    )
 78                except json.JSONDecodeError:
 79                    pass
 80                logs.append(log)
 81
 82        return logs
 83
 84    @staticmethod
 85    def delete_old_logs():
 86        storage_path = RequestLog.storage_path()
 87        if not os.path.exists(storage_path):
 88            return
 89
 90        filenames = os.listdir(storage_path)
 91        sorted_filenames = sorted(filenames, reverse=True)
 92        for filename in sorted_filenames[settings.DEV_REQUESTS_MAX :]:
 93            path = os.path.join(storage_path, filename)
 94            try:
 95                os.remove(path)
 96            except FileNotFoundError:
 97                pass
 98
 99    @staticmethod
100    def clear():
101        storage_path = RequestLog.storage_path()
102        if not os.path.exists(storage_path):
103            return
104
105        filenames = os.listdir(storage_path)
106        for filename in filenames:
107            path = os.path.join(storage_path, filename)
108            try:
109                os.remove(path)
110            except FileNotFoundError:
111                pass
112
113    def save(self):
114        storage_path = self.storage_path()
115        if not os.path.exists(storage_path):
116            os.makedirs(storage_path)
117
118        timestamp = datetime.datetime.now().timestamp()
119        filename = f"{timestamp}.json"
120        path = os.path.join(storage_path, filename)
121        with open(path, "w+") as f:
122            json.dump(self.as_dict(), f, indent=2)
123
124        self.delete_old_logs()
125
126    def as_dict(self):
127        return {
128            "timestamp": datetime.datetime.now().timestamp(),
129            "request": self.request_as_dict(self.request),
130            "response": self.response_as_dict(self.response),
131            "exception": self.exception_as_dict(self.exception),
132        }
133
134    @staticmethod
135    def request_as_dict(request):
136        return {
137            "method": request.method,
138            "path": request.path,
139            "full_path": request.get_full_path(),
140            "querydict": request.data.dict()
141            if request.method == "POST"
142            else request.query_params.dict(),
143            "cookies": request.cookies,
144            # files?
145            "absolute_uri": request.build_absolute_uri(),
146            "body": request.body.decode("utf-8"),
147            "headers": dict(request.headers),
148        }
149
150    @staticmethod
151    def response_as_dict(response):
152        try:
153            content = response.content.decode("utf-8")
154        except AttributeError:
155            content = "<streaming_content>"
156
157        return {
158            "status_code": response.status_code,
159            "headers": dict(response.headers),
160            "content": content,
161        }
162
163    @staticmethod
164    def exception_as_dict(exception):
165        if not exception:
166            return None
167
168        tb_string = "".join(traceback.format_tb(exception.__traceback__))
169
170        try:
171            args = json.dumps(exception.args)
172        except TypeError:
173            args = str(exception.args)
174
175        return {
176            "type": type(exception).__name__,
177            "str": str(exception),
178            "args": args,
179            "traceback": tb_string,
180        }
181
182
183def should_capture_request(request):
184    if not settings.DEBUG:
185        return False
186
187    if request.resolver_match and request.resolver_match.namespace == "dev":
188        return False
189
190    if request.path in settings.DEV_REQUESTS_IGNORE_PATHS:
191        return False
192
193    # This could be an attribute set on request or response
194    # or something more dynamic
195    if "querystats" in request.query_params:
196        return False
197
198    return True
199
200
201class RequestsMiddleware:
202    def __init__(self, get_response):
203        self.get_response = get_response
204        self.exception = None  # If an exception occurs, we want to remember it
205
206        got_request_exception.connect(self.store_exception)
207
208    def __call__(self, request):
209        # Process it first, so we know the resolver_match
210        response = self.get_response(request)
211
212        if should_capture_request(request):
213            RequestLog(
214                request=request, response=response, exception=self.exception
215            ).save()
216
217        return response
218
219    def store_exception(self, **kwargs):
220        """
221        The signal calls this at the right time,
222        so we can use sys.exxception to capture.
223        """
224        self.exception = sys.exception()