1import datetime
2import json
3import os
4import sys
5import traceback
6
7import requests
8
9from plain.runtime import settings
10from plain.signals import got_request_exception
11
12
class RequestLog:
    """A request/response pair (plus optional exception) persisted as JSON.

    Every saved log lives in its own ``<timestamp>.json`` file inside
    ``storage_path()``. Listing and pruning both sort the filenames in
    reverse order.
    """

    def __init__(self, *, request, response, exception=None):
        """Keep references to the objects that ``save()`` will serialize.

        ``exception`` is an exception associated with the request, or None.
        """
        self.request = request
        self.response = response
        self.exception = exception

    @staticmethod
    def storage_path():
        """Return the log directory (under ``settings.PLAIN_TEMP_PATH``) as a str."""
        return str(settings.PLAIN_TEMP_PATH / "dev" / "requestlog")

    @classmethod
    def replay_request(cls, name):
        """Re-send the stored request called ``name`` and print the response.

        ``name`` is the log filename without its ``.json`` extension.
        Errors from opening/parsing the log file and from the HTTP call
        itself are not caught here.
        """
        path = os.path.join(cls.storage_path(), f"{name}.json")
        with open(path, encoding="utf-8") as f:
            stored = json.load(f)["request"]

        method = stored["method"]
        headers = stored["headers"]

        # request_as_dict() stores the submitted form data in "querydict"
        # only for POST; for every other method that key holds the URL query
        # params, which are already part of absolute_uri. So only POST may
        # replay from "querydict" -- everything else (including DELETE,
        # HEAD, ..., which previously left request_data undefined) replays
        # the raw body.
        if method == "POST" and stored["querydict"]:
            request_data = stored["querydict"]
        else:
            request_data = stored["body"].encode("utf-8")

        # Cookies need to be passed as a dict, so that
        # they are passed through redirects
        headers.pop("Cookie", None)

        # TODO: it is unclear why the forwarded proto is sometimes recorded
        # twice; collapse the duplicate so the replayed header is valid.
        if headers.get("X-Forwarded-Proto", "") == "https,https":
            headers["X-Forwarded-Proto"] = "https"

        response = requests.request(
            method,
            stored["absolute_uri"],
            headers=headers,
            cookies=stored["cookies"],
            data=request_data,
            timeout=5,
        )
        print("Replayed request", response)

    @staticmethod
    def load_json_logs():
        """Return every stored log as a dict, sorted by filename in reverse.

        Each dict gets a ``name`` (filename without extension), has its
        ``timestamp`` converted to a ``datetime``, and -- when the request
        body is valid JSON -- a pretty-printed ``request["body_json"]``.
        Returns an empty list if the storage directory does not exist.
        """
        storage_path = RequestLog.storage_path()
        if not os.path.exists(storage_path):
            return []

        logs = []
        for filename in sorted(os.listdir(storage_path), reverse=True):
            path = os.path.join(storage_path, filename)
            with open(path, encoding="utf-8") as f:
                log = json.load(f)

            log["name"] = os.path.splitext(filename)[0]
            # Convert timestamp back to datetime
            log["timestamp"] = datetime.datetime.fromtimestamp(log["timestamp"])
            try:
                log["request"]["body_json"] = json.dumps(
                    json.loads(log["request"]["body"]), indent=2
                )
            except json.JSONDecodeError:
                # Body is not JSON (or is empty); simply no pretty version.
                pass
            logs.append(log)

        return logs

    @staticmethod
    def delete_old_logs():
        """Delete all but the first ``settings.DEV_REQUESTS_MAX`` log files.

        Files are ranked by filename in reverse order; everything past the
        limit is removed.
        """
        storage_path = RequestLog.storage_path()
        if not os.path.exists(storage_path):
            return

        filenames = sorted(os.listdir(storage_path), reverse=True)
        for filename in filenames[settings.DEV_REQUESTS_MAX :]:
            try:
                os.remove(os.path.join(storage_path, filename))
            except FileNotFoundError:
                # Already gone (e.g. removed by another request); fine.
                pass

    @staticmethod
    def clear():
        """Delete every file in the storage directory, if it exists."""
        storage_path = RequestLog.storage_path()
        if not os.path.exists(storage_path):
            return

        for filename in os.listdir(storage_path):
            try:
                os.remove(os.path.join(storage_path, filename))
            except FileNotFoundError:
                # Already gone; nothing left to do for this file.
                pass

    def save(self):
        """Write this log to ``<timestamp>.json`` and prune old logs."""
        storage_path = self.storage_path()
        # exist_ok avoids failing when another request creates the directory
        # between an existence check and the mkdir.
        os.makedirs(storage_path, exist_ok=True)

        # Serialize before opening the file: if anything in the payload is
        # not JSON-serializable we raise here instead of leaving behind a
        # truncated file that load_json_logs() could not parse.
        payload = self.as_dict()
        serialized = json.dumps(payload, indent=2)

        # Name the file after the timestamp stored inside it so the two
        # always agree.
        path = os.path.join(storage_path, f"{payload['timestamp']}.json")
        with open(path, "w", encoding="utf-8") as f:
            f.write(serialized)

        self.delete_old_logs()

    def as_dict(self):
        """Return the JSON-ready dict for this log, stamped with the current time."""
        return {
            "timestamp": datetime.datetime.now().timestamp(),
            "request": self.request_as_dict(self.request),
            "response": self.response_as_dict(self.response),
            "exception": self.exception_as_dict(self.exception),
        }

    @staticmethod
    def request_as_dict(request):
        """Return the fields of ``request`` that are stored in the log.

        ``querydict`` is ``request.data`` for POST and ``request.query_params``
        for every other method.
        """
        if request.method == "POST":
            querydict = request.data.dict()
        else:
            querydict = request.query_params.dict()

        return {
            "method": request.method,
            "path": request.path,
            "full_path": request.get_full_path(),
            "querydict": querydict,
            "cookies": request.cookies,
            # files?
            "absolute_uri": request.build_absolute_uri(),
            # errors="replace": a non-UTF-8 body (e.g. a binary upload) must
            # not make logging raise; such bodies are stored lossily.
            "body": request.body.decode("utf-8", errors="replace"),
            "headers": dict(request.headers),
        }

    @staticmethod
    def response_as_dict(response):
        """Return status code, headers and decoded content of ``response``.

        A response without a decodable ``content`` attribute is recorded
        with the placeholder ``"<streaming_content>"``.
        """
        try:
            # errors="replace": binary responses (images, archives, ...)
            # must not make logging raise.
            content = response.content.decode("utf-8", errors="replace")
        except AttributeError:
            content = "<streaming_content>"

        return {
            "status_code": response.status_code,
            "headers": dict(response.headers),
            "content": content,
        }

    @staticmethod
    def exception_as_dict(exception):
        """Return a JSON-ready description of ``exception``, or None if falsy.

        ``args`` is JSON-encoded when possible and falls back to ``str()``
        of the args tuple otherwise.
        """
        if not exception:
            return None

        tb_string = "".join(traceback.format_tb(exception.__traceback__))

        try:
            args = json.dumps(exception.args)
        except (TypeError, ValueError):
            # TypeError: unserializable arg; ValueError: circular reference.
            args = str(exception.args)

        return {
            "type": type(exception).__name__,
            "str": str(exception),
            "args": args,
            "traceback": tb_string,
        }
181
182
def should_capture_request(request):
    """Decide whether ``request`` should be written to the request log.

    Nothing is captured unless ``settings.DEBUG`` is on. With it on, a
    request is skipped when it resolved into the "dev" namespace, when its
    path is listed in ``settings.DEV_REQUESTS_IGNORE_PATHS``, or when it
    carries a ``querystats`` query parameter.
    """
    if not settings.DEBUG:
        return False

    match = request.resolver_match
    resolved_in_dev_namespace = match and match.namespace == "dev"

    # The querystats check could be an attribute set on request or
    # response, or something more dynamic.
    return not (
        resolved_in_dev_namespace
        or request.path in settings.DEV_REQUESTS_IGNORE_PATHS
        or "querystats" in request.query_params
    )
199
200
class RequestsMiddleware:
    """Middleware that saves a RequestLog for each capturable request.

    The exception of a failed request is picked up through the
    ``got_request_exception`` signal and attached to that request's log.
    """

    def __init__(self, get_response):
        """Store the next handler and subscribe to request exceptions."""
        self.get_response = get_response
        # Exception seen while handling the current request, if any.
        # NOTE(review): this lives on the middleware instance, so if one
        # instance serves requests concurrently an exception could be
        # attributed to the wrong log -- confirm whether that matters here.
        self.exception = None

        got_request_exception.connect(self.store_exception)

    def __call__(self, request):
        """Handle ``request``, log it when appropriate, and return the response."""
        # Drop anything left over from an earlier request; otherwise one
        # failure would be attached to every later log.
        self.exception = None

        # Process it first, so we know the resolver_match
        response = self.get_response(request)

        # Take the exception for this request and clear the slot right away.
        exception, self.exception = self.exception, None

        if should_capture_request(request):
            RequestLog(
                request=request, response=response, exception=exception
            ).save()

        return response

    def store_exception(self, **kwargs):
        """
        The signal calls this at the right time,
        so we can use sys.exception() to capture
        the exception currently being handled.
        """
        self.exception = sys.exception()