plain.dev
A single command that runs everything you need for local development.
The plain.dev
package can be installed from PyPI, and does not need to be added to INSTALLED_PACKAGES
.
plain dev
The plain dev
command does several things:
- Sets
PLAIN_CSRF_TRUSTED_ORIGINS
to localhost by default - Runs
plain preflight
to check for any issues - Executes any pending model migrations
- Starts
gunicorn
with--reload
- Runs
plain tailwind compile --watch
, ifplain.tailwind
is installed - Any custom process defined in
pyproject.toml
attool.plain.dev.run
- Necessary services (ex. Postgres) defined in
pyproject.toml
attool.plain.dev.services
Services
Use services to define databases or other processes that your app needs to be functional. The services will be started automatically in plain dev
, but also in plain pre-commit
(so preflight and tests have a database).
Ultimately, how you run your development database is up to you. But a recommended starting point is to use Docker:
# pyproject.toml
[tool.plain.dev.services]
postgres = {cmd = "docker run --name app-postgres --rm -p 54321:5432 -v $(pwd)/.plain/dev/pgdata:/var/lib/postgresql/data -e POSTGRES_PASSWORD=postgres postgres:15 postgres"}
Custom processes
Unlike services, custom processes are only run during plain dev
. This is a good place to run something like ngrok or a Plain worker, which you might need to use your local site, but don't need running for executing tests, for example.
# pyproject.toml
[tool.plain.dev.run]
ngrok = {command = "ngrok http $PORT"}
plain dev services
Starts your services by themselves.
plain pre-commit
A built-in pre-commit hook that can be installed with plain pre-commit --install
.
Runs:
- Custom commands defined in
pyproject.toml
attool.plain.pre-commit.run
plain code check
, ifplain.code
is installedpoetry check --lock
, if using Poetryplain preflight --database default
plain migrate --check
plain makemigrations --dry-run --check
plain compile
plain test
VS Code debugging
Since plain dev
runs multiple processes at once, the regular pdb debuggers don't quite work.
Instead, we include microsoft/debugpy and an attach
function to make it even easier to use VS Code's debugger.
First, import and run the debug.attach()
function:
class HomeView(TemplateView):
template_name = "home.html"
def get_template_context(self):
context = super().get_template_context()
# Make sure the debugger is attached (will need to be if runserver reloads)
from plain.dev import debug; debug.attach()
# Add a breakpoint (or use the gutter in VS Code to add one)
breakpoint()
return context
When you load the page, you'll see "Waiting for debugger to attach...".
You can then run the VS Code debugger and attach to an existing Python process, at localhost:5678.
1import datetime
2import json
3import os
4import sys
5import traceback
6
7import requests
8
9from plain.runtime import settings
10from plain.signals import got_request_exception
11
12
13class RequestLog:
14 def __init__(self, *, request, response, exception=None):
15 self.request = request
16 self.response = response
17 self.exception = exception
18
19 @staticmethod
20 def storage_path():
21 return str(settings.PLAIN_TEMP_PATH / "requestlog")
22
23 @classmethod
24 def replay_request(cls, name):
25 path = os.path.join(cls.storage_path(), f"{name}.json")
26 with open(path) as f:
27 data = json.load(f)
28
29 method = data["request"]["method"]
30
31 if method == "GET":
32 # Params are in absolute uri
33 request_data = data["request"]["body"].encode("utf-8")
34 elif method in ("POST", "PUT", "PATCH"):
35 if data["request"]["querydict"]:
36 request_data = data["request"]["querydict"]
37 else:
38 request_data = data["request"]["body"].encode("utf-8")
39
40 # Cookies need to be passed as a dict, so that
41 # they are passed through redirects
42 data["request"]["headers"].pop("Cookie", None)
43
44 # TODO???
45 if data["request"]["headers"].get("X-Forwarded-Proto", "") == "https,https":
46 data["request"]["headers"]["X-Forwarded-Proto"] = "https"
47
48 response = requests.request(
49 method,
50 data["request"]["absolute_uri"],
51 headers=data["request"]["headers"],
52 cookies=data["request"]["cookies"],
53 data=request_data,
54 timeout=5,
55 )
56 print("Replayed request", response)
57
58 @staticmethod
59 def load_json_logs():
60 storage_path = RequestLog.storage_path()
61 if not os.path.exists(storage_path):
62 return []
63
64 logs = []
65 filenames = os.listdir(storage_path)
66 sorted_filenames = sorted(filenames, reverse=True)
67 for filename in sorted_filenames:
68 path = os.path.join(storage_path, filename)
69 with open(path) as f:
70 log = json.load(f)
71 log["name"] = os.path.splitext(filename)[0]
72 # Convert timestamp back to datetime
73 log["timestamp"] = datetime.datetime.fromtimestamp(log["timestamp"])
74 try:
75 log["request"]["body_json"] = json.dumps(
76 json.loads(log["request"]["body"]), indent=2
77 )
78 except json.JSONDecodeError:
79 pass
80 logs.append(log)
81
82 return logs
83
84 @staticmethod
85 def delete_old_logs():
86 storage_path = RequestLog.storage_path()
87 if not os.path.exists(storage_path):
88 return
89
90 filenames = os.listdir(storage_path)
91 sorted_filenames = sorted(filenames, reverse=True)
92 for filename in sorted_filenames[settings.DEV_REQUESTS_MAX :]:
93 path = os.path.join(storage_path, filename)
94 try:
95 os.remove(path)
96 except FileNotFoundError:
97 pass
98
99 @staticmethod
100 def clear():
101 storage_path = RequestLog.storage_path()
102 if not os.path.exists(storage_path):
103 return
104
105 filenames = os.listdir(storage_path)
106 for filename in filenames:
107 path = os.path.join(storage_path, filename)
108 try:
109 os.remove(path)
110 except FileNotFoundError:
111 pass
112
113 def save(self):
114 storage_path = self.storage_path()
115 if not os.path.exists(storage_path):
116 os.makedirs(storage_path)
117
118 timestamp = datetime.datetime.now().timestamp()
119 filename = f"{timestamp}.json"
120 path = os.path.join(storage_path, filename)
121 with open(path, "w+") as f:
122 json.dump(self.as_dict(), f, indent=2)
123
124 self.delete_old_logs()
125
126 def as_dict(self):
127 return {
128 "timestamp": datetime.datetime.now().timestamp(),
129 "request": self.request_as_dict(self.request),
130 "response": self.response_as_dict(self.response),
131 "exception": self.exception_as_dict(self.exception),
132 }
133
134 @staticmethod
135 def request_as_dict(request):
136 return {
137 "method": request.method,
138 "path": request.path,
139 "full_path": request.get_full_path(),
140 "querydict": request.POST.dict()
141 if request.method == "POST"
142 else request.GET.dict(),
143 "cookies": request.COOKIES,
144 # files?
145 "absolute_uri": request.build_absolute_uri(),
146 "body": request.body.decode("utf-8"),
147 "headers": dict(request.headers),
148 }
149
150 @staticmethod
151 def response_as_dict(response):
152 try:
153 content = response.content.decode("utf-8")
154 except AttributeError:
155 content = "<streaming_content>"
156
157 return {
158 "status_code": response.status_code,
159 "headers": dict(response.headers),
160 "content": content,
161 }
162
163 @staticmethod
164 def exception_as_dict(exception):
165 if not exception:
166 return None
167
168 tb_string = "".join(traceback.format_tb(exception.__traceback__))
169
170 try:
171 args = json.dumps(exception.args)
172 except TypeError:
173 args = str(exception.args)
174
175 return {
176 "type": type(exception).__name__,
177 "str": str(exception),
178 "args": args,
179 "traceback": tb_string,
180 }
181
182
183def should_capture_request(request):
184 if not settings.DEBUG:
185 return False
186
187 if request.resolver_match and request.resolver_match.default_namespace == "dev":
188 return False
189
190 if request.path in settings.DEV_REQUESTS_IGNORE_PATHS:
191 return False
192
193 # This could be an attribute set on request or response
194 # or something more dynamic
195 if "querystats" in request.GET:
196 return False
197
198 return True
199
200
201class RequestsMiddleware:
202 def __init__(self, get_response):
203 self.get_response = get_response
204 self.exception = None # If an exception occurs, we want to remember it
205
206 got_request_exception.connect(self.store_exception)
207
208 def __call__(self, request):
209 # Process it first, so we know the resolver_match
210 response = self.get_response(request)
211
212 if should_capture_request(request):
213 RequestLog(
214 request=request, response=response, exception=self.exception
215 ).save()
216
217 return response
218
219 def store_exception(self, **kwargs):
220 """
221 The signal calls this at the right time,
222 so we can use sys.exxception to capture.
223 """
224 self.exception = sys.exception()