plain.dev

A single command that runs everything you need for local development.

The plain.dev package can be installed from PyPI, and does not need to be added to INSTALLED_PACKAGES.

plain dev

The plain dev command does several things:

  • Sets PLAIN_CSRF_TRUSTED_ORIGINS to localhost by default
  • Runs plain preflight to check for any issues
  • Executes any pending model migrations
  • Starts gunicorn with --reload
  • Runs plain tailwind compile --watch, if plain.tailwind is installed
  • Runs any custom processes defined in pyproject.toml at tool.plain.dev.run
  • Starts any necessary services (e.g. Postgres) defined in pyproject.toml at tool.plain.dev.services

Services

Use services to define databases or other processes that your app needs to be functional. The services will be started automatically in plain dev, but also in plain pre-commit (so preflight and tests have a database).

Ultimately, how you run your development database is up to you. But a recommended starting point is to use Docker:

# pyproject.toml
[tool.plain.dev.services]
postgres = {cmd = "docker run --name app-postgres --rm -p 54321:5432 -v $(pwd)/.plain/dev/pgdata:/var/lib/postgresql/data -e POSTGRES_PASSWORD=postgres postgres:15 postgres"}

Custom processes

Unlike services, custom processes run only during plain dev. This is a good place for something like ngrok or a Plain worker: processes you may need while using your local site, but not while running tests.

# pyproject.toml
[tool.plain.dev.run]
ngrok = {command = "ngrok http $PORT"}
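
The $PORT placeholder in that command is presumably filled in from the environment the process is started with. As a rough illustration, the sketch below expands it using string.Template; the helper name expand_command and the port value 8000 are assumptions made for the example, not values taken from plain dev.

```python
import string


def expand_command(command: str, env: dict[str, str]) -> str:
    """Substitute $NAME placeholders from env, leaving unknown ones untouched."""
    return string.Template(command).safe_substitute(env)


# 8000 is an assumed port, used only for illustration.
expanded = expand_command("ngrok http $PORT", {"PORT": "8000"})
print(expanded)  # prints: ngrok http 8000
```

In practice a shell would normally do this expansion itself when the command runs with PORT set in its environment.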

plain dev services

Starts your services by themselves.

plain pre-commit

A built-in pre-commit hook that can be installed with plain pre-commit --install.

Runs:

  • Custom commands defined in pyproject.toml at tool.plain.pre-commit.run
  • plain code check, if plain.code is installed
  • poetry check --lock, if using Poetry
  • plain preflight --database default
  • plain migrate --check
  • plain makemigrations --dry-run --check
  • plain compile
  • plain test

VS Code debugging

Since plain dev runs multiple processes at once, the regular pdb debuggers don't quite work.

Instead, we include microsoft/debugpy and an attach function to make it even easier to use VS Code's debugger.

First, import and run the debug.attach() function:

class HomeView(TemplateView):
    template_name = "home.html"

    def get_template_context(self):
        context = super().get_template_context()

        # Make sure the debugger is attached (it must re-attach whenever the dev server reloads)
        from plain.dev import debug; debug.attach()

        # Add a breakpoint (or use the gutter in VS Code to add one)
        breakpoint()

        return context

When you load the page, you'll see "Waiting for debugger to attach...".

You can then run the VS Code debugger and attach to an existing Python process, at localhost:5678.
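
For a sense of what an attach helper does, here is a minimal sketch built on debugpy's public listen, wait_for_client, and is_client_connected functions. The name attach_sketch is hypothetical, and this is not the actual plain.dev.debug.attach implementation. The host and port defaults mirror the localhost:5678 address mentioned above.

```python
def attach_sketch(host: str = "localhost", port: int = 5678) -> bool:
    """Block until a debugger client connects.

    Returns True if this call had to wait, or False if a client was
    already attached. Illustrative only: it does not handle the case
    where listen() was already called but no client has connected yet.
    """
    # Imported lazily so this module still loads without debugpy installed.
    import debugpy

    if debugpy.is_client_connected():
        return False

    debugpy.listen((host, port))
    print("Waiting for debugger to attach...")
    debugpy.wait_for_client()
    return True
```

Checking for an existing client first means the helper can be called on every request without blocking again after the debugger has connected.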

import datetime
import json
import os
import sys
import traceback

import requests

from plain.runtime import settings
from plain.signals import got_request_exception


class RequestLog:
    """A captured request/response pair, stored as a JSON file on disk."""

    def __init__(self, *, request, response, exception=None):
        self.request = request
        self.response = response
        self.exception = exception

    @staticmethod
    def storage_path():
        return str(settings.PLAIN_TEMP_PATH / "requestlog")

    @classmethod
    def replay_request(cls, name):
        """Re-send a previously logged request using the requests library."""
        path = os.path.join(cls.storage_path(), f"{name}.json")
        with open(path) as f:
            data = json.load(f)

        method = data["request"]["method"]

        # For GET, the params are already in the absolute URI, so only the
        # raw body is sent. Form-style methods prefer the parsed querydict.
        # Every other method (DELETE, HEAD, ...) falls back to the raw body,
        # so request_data is always defined.
        if method in ("POST", "PUT", "PATCH") and data["request"]["querydict"]:
            request_data = data["request"]["querydict"]
        else:
            request_data = data["request"]["body"].encode("utf-8")

        # Cookies need to be passed as a dict, so that
        # they are passed through redirects
        data["request"]["headers"].pop("Cookie", None)

        # TODO??? Normalize a duplicated X-Forwarded-Proto value
        if data["request"]["headers"].get("X-Forwarded-Proto", "") == "https,https":
            data["request"]["headers"]["X-Forwarded-Proto"] = "https"

        response = requests.request(
            method,
            data["request"]["absolute_uri"],
            headers=data["request"]["headers"],
            cookies=data["request"]["cookies"],
            data=request_data,
            timeout=5,
        )
        print("Replayed request", response)

    @staticmethod
    def load_json_logs():
        """Load all stored logs, newest first."""
        storage_path = RequestLog.storage_path()
        if not os.path.exists(storage_path):
            return []

        logs = []
        filenames = os.listdir(storage_path)
        # Filenames are timestamps, so reverse order is newest first
        sorted_filenames = sorted(filenames, reverse=True)
        for filename in sorted_filenames:
            path = os.path.join(storage_path, filename)
            with open(path) as f:
                log = json.load(f)
                log["name"] = os.path.splitext(filename)[0]
                # Convert timestamp back to datetime
                log["timestamp"] = datetime.datetime.fromtimestamp(log["timestamp"])
                try:
                    # Pretty-print the body if it happens to be JSON
                    log["request"]["body_json"] = json.dumps(
                        json.loads(log["request"]["body"]), indent=2
                    )
                except json.JSONDecodeError:
                    pass
                logs.append(log)

        return logs

    @staticmethod
    def delete_old_logs():
        """Keep only the newest settings.DEV_REQUESTS_MAX logs."""
        storage_path = RequestLog.storage_path()
        if not os.path.exists(storage_path):
            return

        filenames = os.listdir(storage_path)
        sorted_filenames = sorted(filenames, reverse=True)
        for filename in sorted_filenames[settings.DEV_REQUESTS_MAX :]:
            path = os.path.join(storage_path, filename)
            try:
                os.remove(path)
            except FileNotFoundError:
                pass

    @staticmethod
    def clear():
        """Delete every stored log."""
        storage_path = RequestLog.storage_path()
        if not os.path.exists(storage_path):
            return

        filenames = os.listdir(storage_path)
        for filename in filenames:
            path = os.path.join(storage_path, filename)
            try:
                os.remove(path)
            except FileNotFoundError:
                pass

    def save(self):
        storage_path = self.storage_path()
        if not os.path.exists(storage_path):
            os.makedirs(storage_path)

        # The timestamp doubles as the filename, which keeps logs sortable
        timestamp = datetime.datetime.now().timestamp()
        filename = f"{timestamp}.json"
        path = os.path.join(storage_path, filename)
        with open(path, "w+") as f:
            json.dump(self.as_dict(), f, indent=2)

        self.delete_old_logs()

    def as_dict(self):
        return {
            "timestamp": datetime.datetime.now().timestamp(),
            "request": self.request_as_dict(self.request),
            "response": self.response_as_dict(self.response),
            "exception": self.exception_as_dict(self.exception),
        }

    @staticmethod
    def request_as_dict(request):
        return {
            "method": request.method,
            "path": request.path,
            "full_path": request.get_full_path(),
            "querydict": request.POST.dict()
            if request.method == "POST"
            else request.GET.dict(),
            "cookies": request.COOKIES,
            # files?
            "absolute_uri": request.build_absolute_uri(),
            "body": request.body.decode("utf-8"),
            "headers": dict(request.headers),
        }

    @staticmethod
    def response_as_dict(response):
        try:
            content = response.content.decode("utf-8")
        except AttributeError:
            # Streaming responses have no .content attribute
            content = "<streaming_content>"

        return {
            "status_code": response.status_code,
            "headers": dict(response.headers),
            "content": content,
        }

    @staticmethod
    def exception_as_dict(exception):
        if not exception:
            return None

        tb_string = "".join(traceback.format_tb(exception.__traceback__))

        try:
            args = json.dumps(exception.args)
        except TypeError:
            # Fall back to a string if the args are not JSON-serializable
            args = str(exception.args)

        return {
            "type": type(exception).__name__,
            "str": str(exception),
            "args": args,
            "traceback": tb_string,
        }


def should_capture_request(request):
    if not settings.DEBUG:
        return False

    if request.resolver_match and request.resolver_match.default_namespace == "dev":
        return False

    if request.path in settings.DEV_REQUESTS_IGNORE_PATHS:
        return False

    # This could be an attribute set on request or response
    # or something more dynamic
    if "querystats" in request.GET:
        return False

    return True


class RequestsMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response
        self.exception = None  # If an exception occurs, we want to remember it

        got_request_exception.connect(self.store_exception)

    def __call__(self, request):
        # Reset so an exception from an earlier request
        # is not attributed to this one
        self.exception = None

        # Process it first, so we know the resolver_match
        response = self.get_response(request)

        if should_capture_request(request):
            RequestLog(
                request=request, response=response, exception=self.exception
            ).save()

        return response

    def store_exception(self, **kwargs):
        """
        The signal calls this at the right time,
        so we can use sys.exception() (Python 3.11+) to capture it.
        """
        self.exception = sys.exception()
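
The retention logic in delete_old_logs depends on filenames being timestamps, so a reverse sort puts the newest logs first. The standalone sketch below reproduces that idea against a temporary directory. The function prune_oldest and the sample timestamps are made up for illustration, and keep plays the role of settings.DEV_REQUESTS_MAX.

```python
import os
import tempfile


def prune_oldest(directory: str, keep: int) -> list[str]:
    """Delete all but the `keep` newest files, judged by filename order."""
    newest_first = sorted(os.listdir(directory), reverse=True)
    removed = newest_first[keep:]
    for filename in removed:
        os.remove(os.path.join(directory, filename))
    return removed


with tempfile.TemporaryDirectory() as tmp:
    # Create three fake logs named after made-up timestamps.
    for ts in ("1700000001.0", "1700000002.0", "1700000003.0"):
        with open(os.path.join(tmp, f"{ts}.json"), "w") as f:
            f.write("{}")

    removed = prune_oldest(tmp, keep=2)
    remaining = sorted(os.listdir(tmp))

print("removed:", removed)
print("remaining:", remaining)
```

Sorting the names as strings works here because the timestamps have the same number of digits before the decimal point.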