Below is all of the documentation and abbreviated source code for the Plain web framework. Your job is to read and understand it, and then act as the Plain Framework Assistant and help the developer accomplish whatever they want to do next. --- # Plain **Plain is a web framework for building products with Python.** The core `plain` package provides the backbone of a Python web application (similar to [Flask](https://flask.palletsprojects.com/en/stable/)), while the additional first-party packages can power a more fully-featured database-backed app (similar to [Django](https://www.djangoproject.com/)). All Plain packages are designed to work together and use [PEP 420](https://peps.python.org/pep-0420/) to share the `plain` namespace. To quickly get started with Plain, visit [plainframework.com/start/](https://plainframework.com/start/). ## Core Modules The `plain` package includes everything you need to start handling web requests with Python: - [assets](./assets/README.md) - Serve static files and assets. - [cli](./cli/README.md) - The `plain` CLI, powered by Click. - [csrf](./csrf/README.md) - Cross-Site Request Forgery protection. - [forms](./forms/README.md) - HTML forms and form validation. - [http](./http/README.md) - HTTP request and response handling. - [logs](./logs/README.md) - Logging configuration and utilities. - [preflight](./preflight/README.md) - Preflight checks for your app. - [runtime](./runtime/README.md) - Runtime settings and configuration. - [templates](./templates/README.md) - Jinja2 templates and rendering. - [test](./test/README.md) - Test utilities and fixtures. - [urls](./urls/README.md) - URL routing and request dispatching. - [views](./views/README.md) - Class-based views and request handlers. ## Foundational Packages - [plain.models](/plain-models/plain/models/README.md) - Define and interact with your database models. - [plain.cache](/plain-cache/plain/cache/README.md) - A database-driven general purpose cache. - [plain.email](/plain-email/plain/email/README.md) - Send emails with SMTP or custom backends. - [plain.sessions](/plain-sessions/plain/sessions/README.md) - User sessions and cookies. - [plain.worker](/plain-worker/plain/worker/README.md) - Backgrounb jobs stored in the database. - [plain.api](/plain-api/plain/api/README.md) - Build APIs with Plain views. ## Auth Packages - [plain.auth](/plain-auth/plain/auth/README.md) - User authentication and authorization. - [plain.oauth](/plain-oauth/plain/oauth/README.md) - OAuth authentication and API access. - [plain.passwords](/plain-passwords/plain/passwords/README.md) - Password-based login and registration. - [plain.loginlink](/plain-loginlink/plain/loginlink/README.md) - Login links for passwordless authentication. ## Admin Packages - [plain.admin](/plain-admin/plain/admin/README.md) - An admin interface for back-office tasks. - [plain.flags](/plain-flags/plain/flags/README.md) - Feature flags. - [plain.support](/plain-support/plain/support/README.md) - Customer support forms. - [plain.redirection](/plain-redirection/plain/redirection/README.md) - Redirects managed in the database. - [plain.pageviews](/plain-pageviews/plain/pageviews/README.md) - Basic self-hosted page view tracking and reporting. ## Dev Packages - [plain.dev](/plain-dev/plain/dev/README.md) - A single command for local development. - [plain.pytest](/plain-pytest/plain/pytest/README.md) - Pytest fixtures and helpers. - [plain.code](/plain-code/plain/code/README.md) - Code formatting and linting. - [plain.tunnel](/plain-tunnel/plain/tunnel/README.md) - Expose your local server to the internet. ## Frontend Packages - [plain.tailwind](/plain-tailwind/plain/tailwind/README.md) - Tailwind CSS integration without Node.js. - [plain.htmx](/plain-htmx/plain/htmx/README.md) - HTMX integrated into views and templates. - [plain.elements](/plain-elements/plain/elements/README.md) - Server-side HTML components. - [plain.pages](/plain-pages/plain/pages/README.md) - Static pages with Markdown and Jinja2. - [plain.esbuild](/plain-esbuild/plain/esbuild/README.md) - Simple JavaScript bundling and minification. - [plain.vendor](/plain-vendor/plain/vendor/README.md) - Vendor JavaScript and CSS libraries. # Assets **Serve static assets (CSS, JS, images, etc.) directly or from a CDN.** ## Usage To serve assets, put them in `app/assets` or `app/{package}/assets`. Then include the `AssetsRouter` in your own router, typically under the `assets/` path. ```python # app/urls.py from plain.assets.urls import AssetsRouter from plain.urls import include, Router class AppRouter(Router): namespace = "" urls = [ include("assets/", AssetsRouter), # your other routes here... ] ``` Now in your template you can use the `asset()` function to get the URL, which will output the fully compiled and fingerprinted URL. ```html ``` ## Local development When you're working with `settings.DEBUG = True`, the assets will be served directly from their original location. You don't need to run `plain build` or configure anything else. ## Production deployment In production, one of your deployment steps should be to compile the assets. ```bash plain build ``` By default, this [generates "fingerprinted" and compressed versions of the assets](fingerprints.py#get_file_fingerprint), which are then served by your app. This means that a file like `main.css` will result in two new files, like `main.d0db67b.css` and `main.d0db67b.css.gz`. The purpose of fingerprinting the assets is to allow the browser to cache them indefinitely. When the content of the file changes, the fingerprint will change, and the browser will use the newer file. This cuts down on the number of requests that your app has to handle related to assets. ## Using `AssetView` directly In some situations you may want to use the `AssetView` at a custom URL, for example to serve a `favicon.ico`. You can do this quickly by using the `AssetView.as_view()` class method. ```python from plain.assets.views import AssetView from plain.urls import path, Router class AppRouter(Router): namespace = "" urls = [ path("favicon.ico", AssetView.as_view(asset_path="favicon.ico")), ] ``` ## FAQs ### How do you reference assets in Python code? ```python from plain.assets.urls import get_asset_url url = get_asset_url("css/style.css") ``` ### What if I need the files in a different location? The generated/copied files are stored in `{repo}/.plain/assets/compiled`. If you need them to be somewhere else, try simply moving them after compilation. ```bash plain build mv .plain/assets/compiled /path/to/your/static ``` ### How do I upload the assets to a CDN? The steps for this will vary, but the general idea is to compile them, and then upload the compiled assets from their [compiled location](compile.py#get_compiled_path). ```bash # Compile the assets plain build # List the newly compiled files ls .plain/assets/compiled # Upload the files to your CDN ./example-upload-to-cdn-script ``` Use the `ASSETS_BASE_URL` setting to tell the `{{ asset() }}` template function where to point. ```python # app/settings.py ASSETS_BASE_URL = "https://cdn.example.com/" ``` ### Why aren't the originals copied to the compiled directory? The default behavior is to fingerprint assets, which is an exact copy of the original file but with a different filename. The originals aren't copied over because you should generally always use this fingerprinted path (that automatically uses longer-lived caching). If you need the originals for any reason, you can use `plain build --keep-original`, though this will typically be combined with `--no-fingerprint` otherwise the fingerprinted files will still get priority in `{{ asset() }}` template calls. Note that by default, the `ASSETS_REDIRECT_ORIGINAL` setting is `True`, which will redirect requests for the original file to the fingerprinted file. # Chores **Routine maintenance tasks.** Chores are registered functions that can be run at any time to keep an app in a desirable state. ![](https://assets.plainframework.com/docs/plain-chores-run.png) A good example is the clearing of expired sessions in [`plain.sessions`](/plain-sessions/plain/sessions/chores.py) — since the sessions are stored in the database, occasionally you will want to delete any sessions that are expired and no longer in use. ```python # plain/sessions/chores.py from plain.chores import register_chore from plain.utils import timezone from .models import Session @register_chore("sessions") def clear_expired(): """ Delete sessions that have expired. """ result = Session.objects.filter(expires_at__lt=timezone.now()).delete() return f"{result[0]} expired sessions deleted" ``` ## Running chores The `plain chores run` command will execute all registered chores. When and how to run this is up to the user, but running them hourly is a safe assumption in most cases (assuming you have any chores — `plain chores list`). There are several ways you can run chores depending on your needs: - on deploy - as a [`plain.worker` scheduled job](/plain-worker/plain/worker/README.md#scheduled-jobs) - as a cron job (using any cron-like system where your app is hosted) - manually as needed ## Writing chores A chore is a function decorated with `@register_chore(chore_group_name)`. It can write a description as a docstring, and it can return a value that will be printed when the chore is run. ```python # app/chores.py from plain.chores import register_chore @register_chore("app") def chore_name(): """ A chore description can go here """ # Do a thing! return "We did it!" ``` A good chore is: - Fast - Idempotent - Recurring - Stateless If chores are written in `app/chores.py` or `{pkg}/chores.py`, then they will be imported automatically and registered. # CLI **The `plain` CLI and how to add your own commands to it.** Commands are written using [Click](https://click.palletsprojects.com/en/8.1.x/) (one of Plain's few dependencies), which has been one of those most popular CLI frameworks in Python for a long time. ## Built-in commands ### `plain build` Compile static assets (used in the deploy/production process). Automatically runs `plain tailwind build` if [plain.tailwind](/plain-tailwind/) is installed. ### `plain create` Create a new local package. ### `plain preflight` Run preflight checks to ensure your app is ready to run. ### `plain run` Run a Python script in the context of your app. ### `plain setting` View the runtime value of a named setting. ### `plain shell` Open a Python shell with the Plain loaded. To auto-load models or run other code at shell launch, create an `app/shell.py` and it will be imported automatically. ```python # app/shell.py from app.organizations.models import Organization __all__ = [ "Organization", ] ``` ### `plain urls` List all the URL patterns in your app. ### `plain utils generate-secret-key` Generate a new secret key for your app, to be used in `settings.SECRET_KEY`. ## Adding commands The `register_cli` decorator can be used to add your own commands to the `plain` CLI. ```python import click from plain.cli import register_cli @register_cli("example-subgroup-name") @click.group() def cli(): """Custom example commands""" pass @cli.command() def example_command(): click.echo("An example command!") ``` Then you can run the command with `plain`. ```bash $ plain example-subgroup-name example-command An example command! ``` Technically you can register a CLI from anywhere, but typically you will do it in either `app/cli.py` or a package's `/cli.py`, as those modules will be autoloaded by Plain. # CSRF **Cross-Site Request Forgery (CSRF) protection.** Plain protects against [CSRF attacks](https://en.wikipedia.org/wiki/Cross-site_request_forgery) through a [middleware](middleware.py) that compares the generated `csrftoken` cookie with the CSRF token from the request (either `_csrftoken` in form data or the `CSRF-Token` header). ## Usage The `CsrfViewMiddleware` is [automatically installed](../internal/handlers/base.py#BUILTIN_BEFORE_MIDDLEWARE), so you don't need to add it to your `settings.MIDDLEWARE`. When you use HTML forms, you should include the CSRF token in the form data via a hidden input: ```html
{{ csrf_input }}
```
# Forms **HTML form handling and validation.** The `Form` and `Field` classes help output, parse, and validate form data from an HTTP request. Unlike other frameworks, the HTML inputs are not rendered automatically, though there are some helpers for you to do your own rendering. With forms, you will typically use one of the built-in view classes to tie everything together. ```python from plain import forms from plain.views import FormView class ContactForm(forms.Form): email = forms.EmailField() message = forms.CharField() class ContactView(FormView): form_class = ContactForm template_name = "contact.html" ``` Then in your template, you can render the form fields. ```html {% extends "base.html" %} {% block content %}
{{ csrf_input }} {% for error in form.non_field_errors %}
{{ error }}
{% endfor %}
{% if form.email.errors %}
{{ form.email.errors|join(', ') }}
{% endif %}
{% if form.message.errors %}
{{ form.message.errors|join(', ') }}
{% endif %}
{% endblock %} ``` With manual form rendering, you have full control over the HTML classes, attributes, and JS behavior. But in large applications the form rendering can become repetitive. You will often end up re-using certain patterns in your HTML which can be abstracted away using Jinja [includes](https://jinja.palletsprojects.com/en/stable/templates/#include), [macros](https://jinja.palletsprojects.com/en/stable/templates/#macros), or [plain.elements](/plain-elements/README.md).
# HTTP **HTTP request and response handling.** Typically you will interact with [request](request.py#HttpRequest) and [response](response.py#ResponseBase) objects in your views and middleware. ```python from plain.views import View from plain.http import Response class ExampleView(View): def get(self): # Accessing a request header print(self.request.headers.get("Example-Header")) # Accessing a query parameter print(self.request.query_params.get("example")) # Creating a response response = Response("Hello, world!", status_code=200) # Setting a response header response.headers["Example-Header"] = "Example Value" return response ``` # Logs **Logging configuration and utilities.** In Python, configuring logging can be surprisingly complex. For most use cases, Plain provides a [default configuration](./configure.py) that "just works". By default, both the `plain` and `app` loggers are set to the `INFO` level. You can quickly change this by using the `PLAIN_LOG_LEVEL` and `APP_LOG_LEVEL` environment variables. ## `app_logger` The `app_logger` is a pre-configured logger you can use inside your app code. ```python from plain.logs import app_logger def example_function(): app_logger.info("Hey!") ``` ## `app_logger.kv` The key-value logging format is popular for outputting more structured logs that are still human-readable. ```python from plain.logs import app_logger def example_function(): app_logger.kv("Example log line with", example_key="example_value") ``` ## Logging settings You can further configure your logging with `settings.LOGGING`. ```python # app/settings.py LOGGING = { "version": 1, "disable_existing_loggers": False, "handlers": { "console": { "class": "logging.StreamHandler", }, }, "loggers": { "mylogger": { "handlers": ["console"], "level": "DEBUG", }, }, } ``` # Packages **Install Python modules as Plain packages.** Most Python modules that you use with Plain will need to be installed via `settings.INSTALLED_PACKAGES`. This is what enables template detection, per-package settings, database models, and other features. A package can either be a local module inside of your `app`, or a third-party package from PyPI. ```python # app/settings.py INSTALLED_PACKAGES = [ "plain.models", "plain.tailwind", "plain.auth", "plain.passwords", "plain.sessions", "plain.htmx", "plain.admin", "plain.elements", # Local packages "app.users", ] ``` ## Creating app packages It often makes sense to conceptually split your app across multiple local packages. For example, we find it typically works better to have separate `users`, `teams`, and `projects` packages, than a single `core` package that contains all the code for users, teams, and projects together (just as an example). If you find yourself creating a package with a generic name like `core` or `base`, you might want to consider splitting it up into smaller packages. You can quickly create a new package by running `plain create `. Make sure to add it to `settings.INSTALLED_PACKAGES` if it uses templates, models, or other Plain-specific features. ## Package settings An installed package can optionally define it's own settings. These could be default settings for how the package behaves, or required settings that must be configured by the developer. ```python # /default_settings.py # A default setting EXAMPLE_SETTING: str = "example" # A required setting (type annotation with no default value) REQUIRED_SETTING: str ``` Settings can then be accessed at runtime through the `settings` object. ```python # /models.py from plain.runtime import settings def example_function(): print(settings.EXAMPLE_SETTING) ``` It is strongly recommended to "namespace" your settings to your package. So if your package is named "teams", you might want to prefix all your settings with `TEAMS_`. ```python # teams/default_settings.py TEAMS_EXAMPLE_SETTING: str = "example" ``` ## Package `ready()` method To run setup code when your package is loaded, you can define a package configuration and the `ready()` method. ```python # /config.py from plain.packages import PackageConfig, register_config @register_config class TeamsConfig(PackageConfig): def ready(self): print("Teams package is ready!") ``` # Preflight **System checks for Plain applications.** Preflight checks help identify issues with your settings or environment before running your application. ```bash plain preflight ``` ## Development If you use [`plain.dev`](/plain-dev/README.md) for local development, the Plain preflight command is run automatically when you run `plain dev`. ## Deployment The `plain preflight` command should often be part of your deployment process. Make sure to add the `--deploy` flag to the command to run checks that are only relevant in a production environment. ```bash plain preflight --deploy ``` ## Custom preflight checks Use the `@register_check` decorator to add your own preflight check to the system. Just make sure that particular Python module is somehow imported so the check registration runs. ```python from plain.preflight import register_check, Error @register_check def custom_check(package_configs, **kwargs): return Error("This is a custom error message.", id="custom.C001") ``` For deployment-specific checks, add the `deploy` argument to the decorator. ```python @register_check(deploy=True) def custom_deploy_check(package_configs, **kwargs): return Error("This is a custom error message for deployment.", id="custom.D001") ``` ## Silencing preflight checks The `settings.PREFLIGHT_SILENCED_CHECKS` setting can be used to silence individual checks by their ID (ex. `security.W020`). ```python # app/settings.py PREFLIGHT_SILENCED_CHECKS = [ "security.W020", ] ``` # Runtime **Access app and package settings at runtime.** Plain is configured by "settings", which are ultimately just Python variables. Most settings have default values which can be overidden either by your `app/settings.py` file or by environment variables. ```python # app/settings.py URLS_ROUTER = "app.urls.AppRouter" TIME_ZONE = "America/Chicago" INSTALLED_PACKAGES = [ "plain.models", "plain.tailwind", "plain.auth", "plain.passwords", "plain.sessions", "plain.htmx", "plain.admin", "plain.elements", # Local packages "app.users", ] AUTH_USER_MODEL = "users.User" AUTH_LOGIN_URL = "login" MIDDLEWARE = [ "plain.sessions.middleware.SessionMiddleware", "plain.auth.middleware.AuthenticationMiddleware", "plain.admin.AdminMiddleware", ] ``` While working inside a Plain application or package, you can access settings at runtime via `plain.runtime.settings`. ```python from plain.runtime import settings print(settings.AN_EXAMPLE_SETTING) ``` The Plain core settings are defined in [`plain/runtime/global_settings.py`](global_settings.py) and you should look at that for reference. Each installed package can also define its own settings in a `default_settings.py` file. ## Environment variables It's common in both development and production to use environment variables to manage settings. To handle this, any type-annotated setting can be loaded from the env with a `PLAIN_` prefix. For example, to set the `SECRET_KEY` setting is defined with a type annotation. ```python SECRET_KEY: str ``` And can be set by an environment variable. ```bash PLAIN_SECRET_KEY=supersecret ``` For more complex types like lists or dictionaries, just use the `list` or `dict` type annotation and JSON-compatible types. ```python LIST_EXAMPLE: list[str] ``` And set the environment variable with a JSON-encoded string. ```bash PLAIN_LIST_EXAMPLE='["one", "two", "three"]' ``` Custom behavior can always be supported by checking the environment directly. ```python # plain/models/default_settings.py from os import environ from . import database_url # Make DATABASES a required setting DATABASES: dict # Automatically configure DATABASES if a DATABASE_URL was given in the environment if "DATABASE_URL" in environ: DATABASES = { "default": database_url.parse( environ["DATABASE_URL"], # Enable persistent connections by default conn_max_age=int(environ.get("DATABASE_CONN_MAX_AGE", 600)), conn_health_checks=environ.get( "DATABASE_CONN_HEALTH_CHECKS", "true" ).lower() in [ "true", "1", ], ) } # Classes used to implement DB routing behavior. DATABASE_ROUTERS = [] ``` ### .env files Plain itself does not load `.env` files automatically, except in development if you use [`plain.dev`](/plain-dev/README.md). If you use `.env` files in production then you will need to load them yourself. ## Package settings An installed package can provide a `default_settings.py` file. It is strongly recommended to prefix any defined settings with the package name to avoid conflicts. ```python # app/users/default_settings.py USERS_DEFAULT_ROLE = "user" ``` The way you define these settings can impact the runtime behavior. For example, a required setting should be defined with a type annotation but no default value. ```python # app/users/default_settings.py USERS_DEFAULT_ROLE: str ``` Type annotations are only required for settings that don't provide a default value (to enable the environment variable loading). But generally type annotations are recommended as they also provide basic validation at runtime — if a setting is defined as a `str` but the user sets it to an `int`, an error will be raised. ```python # app/users/default_settings.py USERS_DEFAULT_ROLE: str = "user" ``` ## Custom app-wide settings At times it can be useful to create your own settings that are used across your application. When you define these in `app/settings.py`, you simply prefix them with `APP_` which marks them as a custom setting. ```python # app/settings.py # A required env setting APP_STRIPE_SECRET_KEY = os.environ["STRIPE_SECRET_KEY"] # An optional env setting APP_GIT_SHA = os.environ.get("HEROKU_SLUG_COMMIT", "dev")[:7] # A setting populated by Python code with open("app/secret_key.txt") as f: APP_EXAMPLE_KEY = f.read().strip() ``` ## Using Plain in other environments There may be some situations where you want to manually invoke Plain, like in a Python script. To get everything set up, you can call the `plain.runtime.setup()` function. ```python import plain.runtime plain.runtime.setup() ``` # Signals **Run code when certain events happen.** ```python from plain.signals import request_finished def on_request_finished(sender, **kwargs): print("Request finished!") request_finished.connect(on_request_finished) ``` # Templates **Render HTML templates using Jinja.** Plain uses Jinja2 for template rendering. You can refer to the [Jinja documentation](https://jinja.palletsprojects.com/en/stable/api/) for all of the features available. In general, templates are used in combination with `TemplateView` or a more specific subclass of it. ```python from plain.views import TemplateView class ExampleView(TemplateView): template_name = "example.html" def get_template_context(self): context = super().get_template_context() context["message"] = "Hello, world!" return context ``` ```html {% extends "base.html" %} {% block content %}

{{ message }}

{% endblock %} ``` ## Template files Template files can be located in either a root `app/templates`, or the `/templates` directory of any installed package. All template directories are "merged" together, allowing you to override templates from other packages. The `app/templates` will take priority, followed by `INSTALLED_PACKAGES` in the order they are defined. ## Extending Jinja Plain includes a set of default [global variables](jinja/globals.py) and [filters](jinja/filters.py). You can register additional extensions, globals, or filters either in a package or in your app. Typically this will be in `app/templates.py` or `/templates.py`, which are automatically imported. ```python # app/templates.py from plain.templates import register_template_filter, register_template_global, register_template_extension from plain.templates.jinja.extensions import InclusionTagExtension from plain.runtime import settings @register_template_filter def camel_case(value): return value.replace("_", " ").title().replace(" ", "") @register_template_global def app_version(): return "1.0.0" @register_template_extension class HTMXJSExtension(InclusionTagExtension): tags = {"htmx_js"} template_name = "htmx/js.html" def get_context(self, context, *args, **kwargs): return { "csrf_token": context["csrf_token"], "DEBUG": settings.DEBUG, "extensions": kwargs.get("extensions", []), } ``` ## Rendering templates manually Templates can also be rendered manually using the [`Template` class](core.py#Template). ```python from plain.templates import Template comment_body = Template("comment.md").render({"message": "Hello, world!",}) ```
# Test **Testing utilities for Plain.** This module provides a the [`Client`](client.py#Client) and [`RequestFactory`](client.py#RequestFactory) classes to facilitate testing requests and responses. ```python from plain.test import Client def test_client_example(): client = Client() # Getting responses response = client.get("/") assert response.status_code == 200 # Modifying sessions client.session["example"] = "value" assert client.session["example"] == "value" # Logging in user = User.objects.first() client.force_login(user) response = client.get("/protected/") assert response.status_code == 200 # Logging out client.logout() response = client.get("/protected/") assert response.status_code == 302 def test_request_factory_example(): request = RequestFactory().get("/") assert request.method == "GET" ``` More complete testing utilities are provided by the [`plain.pytest`](/plain-pytest/README.md) package. The [`plain.models`](/plain-models/README.md) package also provides pytest fixtures for database testing. # URLs **Route requests to views.** URLs are typically the "entrypoint" to your app. Virtually all request handling up to this point happens behind the scenes, and then you decide how to route specific URL patterns to your views. The `URLS_ROUTER` is the primary router that handles all incoming requests. It is defined in your `app/settings.py` file. This will typically point to a `Router` class in your `app.urls` module. ```python # app/settings.py URLS_ROUTER = "app.urls.AppRouter" ``` The root router often has an empty namespace (`""`) and some combination of individual paths and sub-routers. ```python # app/urls.py from plain.urls import Router, path, include from plain.admin.urls import AdminRouter from . import views class AppRouter(Router): namespace = "" urls = [ include("admin/", AdminRouter), path("about/", views.AboutView, name="about"), # A named URL path("", views.HomeView), # An unnamed URL ] ``` ## Reversing URLs In templates, you will use the `{{ url("") }}` function to look up full URLs by name. ```html About ``` And the same can be done in Python code with the `reverse` (or `reverse_lazy`) function. ```python from plain.urls import reverse url = reverse("about") ``` A URL path has to include a `name` attribute if you want to reverse it. The router's `namespace` will be used as a prefix to the URL name. ```python from plain.urls import reverse url = reverse("admin:dashboard") ``` ## URL args and kwargs URL patterns can include arguments and keyword arguments. ```python # app/urls.py from plain.urls import Router, path from . import views class AppRouter(Router): namespace = "" urls = [ path("user//", views.UserView, name="user"), path("search//", views.SearchView, name="search"), ] ``` These will be accessible inside the view as `self.url_args` and `self.url_kwargs`. ```python # app/views.py from plain.views import View class SearchView(View): def get(self): query = self.url_kwargs["query"] print(f"Searching for {query}") # ... ``` To reverse a URL with args or kwargs, simply pass them in the `reverse` function. ```python from plain.urls import reverse url = reverse("search", query="example") ``` There are a handful of built-in [converters](converters.py#DEFAULT_CONVERTERS) that can be used in URL patterns. ```python from plain.urls import Router, path from . import views class AppRouter(Router): namespace = "" urls = [ path("user//", views.UserView, name="user"), path("search//", views.SearchView, name="search"), path("post//", views.PostView, name="post"), path("document//", views.DocumentView, name="document"), path("path//", views.PathView, name="path"), ] ``` ## Package routers Installed packages will often provide a URL router to include in your root URL router. ```python # plain/assets/urls.py from plain.urls import Router, path from .views import AssetView class AssetsRouter(Router): """ The router for serving static assets. Include this router in your app router if you are serving assets yourself. """ namespace = "assets" urls = [ path("", AssetView, name="asset"), ] ``` Import the package's router and `include` it at any path you choose. ```python from plain.urls import include, Router from plain.assets.urls import AssetsRouter class AppRouter(Router): namespace = "" urls = [ include("assets/", AssetsRouter), # Your other URLs here... ] ``` # Utilities **Various utilities for text manipulation, parsing, dates, and more.** The utilities aren't going to be documented in detail here. Take a look at the source code for more information. # Views **Take a request, return a response.** Plain views are written as classes, with a straightforward API that keeps simple views simple, but gives you the power of a full class to handle more complex cases. ```python from plain.views import View class ExampleView(View): def get(self): return "Hello, world!" ``` ## HTTP methods -> class methods The HTTP methd of the request will map to a class method of the same name on the view. If a request comes in and there isn't a matching method on the view, Plain will return a `405 Method Not Allowed` response. ```python from plain.views import View class ExampleView(View): def get(self): pass def post(self): pass def put(self): pass def patch(self): pass def delete(self): pass def trace(self): pass ``` The [base `View` class](./base.py) defines default `options` and `head` behavior, but you can override these too. ## Return types For simple JSON responses, HTML, or status code responses, you don't need to instantiate a `Response` object. ```python class JsonView(View): def get(self): return {"message": "Hello, world!"} class HtmlView(View): def get(self): return "Hello, world!" class StatusCodeView(View): def get(self): return 204 # No content ``` ## Template views The most common behavior for a view is to render a template. ```python from plain.views import TemplateView class ExampleView(TemplateView): template_name = "example.html" def get_template_context(self): context = super().get_template_context() context["message"] = "Hello, world!" return context ``` The `TemplateView` is also the base class for _most_ of the other built-in view classes. Template views that don't need any custom context can use `TemplateView.as_view()` direcly in the URL route. ```python from plain.views import TemplateView from plain.urls import path, Router class AppRouter(Router): routes = [ path("/example/", TemplateView.as_view(template_name="example.html")), ] ``` ## Form views Standard [forms](../forms) can be rendered and processed by a `FormView`. ```python from plain.views import FormView from .forms import ExampleForm class ExampleView(FormView): template_name = "example.html" form_class = ExampleForm success_url = "." # Redirect to the same page def form_valid(self, form): # Do other successfull form processing here return super().form_valid(form) ``` Rendering forms is done directly in the HTML. ```html {% extends "base.html" %} {% block content %}
{{ csrf_input }} {% for error in form.non_field_errors %}
{{ error }}
{% endfor %} {% if form.email.errors %}
{{ form.email.errors|join(', ') }}
{% endif %}
{% endblock %} ``` ## Object views The object views support the standard CRUD (create, read/detail, update, delete) operations, plus a list view. ```python from plain.views import DetailView, CreateView, UpdateView, DeleteView, ListView class ExampleDetailView(DetailView): template_name = "detail.html" def get_object(self): return MyObjectClass.objects.get( pk=self.url_kwargs["pk"], user=self.request.user, # Limit access ) class ExampleCreateView(CreateView): template_name = "create.html" form_class = CustomCreateForm success_url = "." class ExampleUpdateView(UpdateView): template_name = "update.html" form_class = CustomUpdateForm success_url = "." def get_object(self): return MyObjectClass.objects.get( pk=self.url_kwargs["pk"], user=self.request.user, # Limit access ) class ExampleDeleteView(DeleteView): template_name = "delete.html" success_url = "." # No form class necessary. # Just POST to this view to delete the object. def get_object(self): return MyObjectClass.objects.get( pk=self.url_kwargs["pk"], user=self.request.user, # Limit access ) class ExampleListView(ListView): template_name = "list.html" def get_objects(self): return MyObjectClass.objects.filter( user=self.request.user, # Limit access ) ``` ## Response exceptions At any point in the request handling, a view can raise a `ResponseException` to immediately exit and return the wrapped response. This isn't always necessary, but can be useful for raising rate limits or authorization errors when you're a couple layers deep in the view handling or helper functions. ```python from plain.views import DetailView from plain.views.exceptions import ResponseException from plain.http import Response class ExampleView(DetailView): def get_object(self): if self.request.user.exceeds_rate_limit: raise ResponseException( Response("Rate limit exceeded", status_code=429) ) return AnExpensiveObject() ``` ## Error views By default, HTTP errors will be rendered by `templates/.html` or `templates/error.html`. You can define your own error views by pointing the `HTTP_ERROR_VIEWS` setting to a dictionary of status codes and view classes. ```python # app/settings.py HTTP_ERROR_VIEWS = { 404: "errors.NotFoundView", } ``` ```python # app/errors.py from plain.views import View class NotFoundView(View): def get(self): # A custom implementation or error view handling pass ``` ## Redirect views ```python from plain.views import RedirectView class ExampleRedirectView(RedirectView): url = "/new-location/" permanent = True ``` Redirect views can also be used in the URL router. ```python from plain.views import RedirectView from plain.urls import path, Router class AppRouter(Router): routes = [ path("/old-location/", RedirectView.as_view(url="/new-location/", permanent=True)), ] ``` ## CSRF exempt views ```python from plain.views import View from plain.views.csrf import CsrfExemptViewMixin class ExemptView(CsrfExemptViewMixin, View): def post(self): return "Hello, world!" ```
# plain.admin **Manage your app with a backend interface.** The Plain Admin provides a combination of built-in views and the flexibility to create your own. You can use it to quickly get visibility into your app's data and to manage it. ![Plain Admin user example](https://assets.plainframework.com/docs/plain-pageviews-user.png) ## Installation Install the `plain.admin` package and its dependencies. ```console uv add plain.admin ``` The admin uses a combination of other Plain packages, most of which you will already have installed. Ultimately, your settings will look something like this: ```python # app/settings.py INSTALLED_PACKAGES = [ "plain.models", "plain.tailwind", "plain.auth", "plain.sessions", "plain.htmx", "plain.admin", "plain.elements", # other packages... ] AUTH_USER_MODEL = "users.User" AUTH_LOGIN_URL = "login" MIDDLEWARE = [ "plain.sessions.middleware.SessionMiddleware", "plain.auth.middleware.AuthenticationMiddleware", "plain.admin.AdminMiddleware", ] ``` Your User model is expected to have an `is_admin` field (or attribute) for checking who has permission to access the admin. ```python # app/users/models.py from plain import models @models.register_model class User(models.Model): is_admin = models.BooleanField(default=False) # other fields... ``` To make the admin accessible, add the `AdminRouter` to your root URLs. ```python # app/urls.py from plain.admin.urls import AdminRouter from plain.urls import Router, include, path from . import views class AppRouter(Router): namespace = "" urls = [ include("admin/", AdminRouter), path("login/", views.LoginView, name="login"), path("logout/", LogoutView, name="logout"), # other urls... ] ``` Optionally, you can add the admin toolbar to your base template. The toolbar will appear when `settings.DEBUG` or when `request.user.is_admin` (including in production!). ```html {{ html_title|default("My App") }} {% tailwind_css %} {% block content required %}{% endblock %} {% toolbar %} ``` ## Admin viewsets The most common use of the admin is to display and manage your `plain.models`. To do this, create a viewset with a set of inner views. ```python # app/users/admin.py from plain.admin.views import ( AdminModelDetailView, AdminModelListView, AdminModelUpdateView, AdminViewset, register_viewset, ) from plain.models.forms import ModelForm from .models import User class UserForm(ModelForm): class Meta: model = User fields = ["email"] @register_viewset class UserAdmin(AdminViewset): class ListView(AdminModelListView): model = User fields = [ "id", "email", "created_at__date", ] queryset_order = ["-created_at"] search_fields = [ "email", ] class DetailView(AdminModelDetailView): model = User class UpdateView(AdminModelUpdateView): template_name = "admin/users/user_form.html" model = User form_class = UserForm ``` The [`AdminViewset`](./views/viewsets.py) will automatically recognize inner views named `ListView`, `CreateView`, `DetailView`, `UpdateView`, and `DeleteView`. It will interlink these views automatically in the UI and form success URLs. You can define additional views too, but you will need to implement a couple methods to hook them up. ## Admin cards TODO ## Admin forms TODO ## Toolbar TODO ## Impersonate TODO ## Querystats TODO # plain.impersonate See what your users see. A key feature for providing customer support is to be able to view the site through their account. With `impersonate` installed, you can impersonate a user by finding them in the Django admin and clicking the "Impersonate" button. ![](/docs/img/impersonate-admin.png) Then with the [admin toolbar](/docs/plain-toolbar/) enabled, you'll get a notice of the impersonation and a button to exit: ![](/docs/img/impersonate-bar.png) ## Installation To impersonate users, you need the app, middleware, and URLs: ```python # settings.py INSTALLED_PACKAGES = INSTALLED_PACKAGES + [ "plain.admin.impersonate", ] MIDDLEWARE = MIDDLEWARE + [ "plain.admin.impersonate.ImpersonateMiddleware", ] ``` ```python # urls.py urlpatterns = [ # ... path("impersonate/", include("plain.admin.impersonate.urls")), ] ``` ## Settings By default, all admin users can impersonate other users. ```python # settings.py IMPERSONATE_ALLOWED = lambda user: user.is_admin ``` # plain.querystats On-page database query stats in development and production. On each page, the query stats will display how many database queries were performed and how long they took. [Watch on YouTube](https://www.youtube.com/watch?v=NX8VXxVJm08) Clicking the stats in the toolbar will show the full SQL query log with tracebacks and timings. This is even designed to work in production, making it much easier to discover and debug performance issues on production data! ![Django query stats](https://user-images.githubusercontent.com/649496/213781593-54197bb6-36a8-4c9d-8294-5b43bd86a4c9.png) It will also point out duplicate queries, which can typically be removed by using `select_related`, `prefetch_related`, or otherwise refactoring your code. ## Installation ```python # settings.py INSTALLED_PACKAGES = [ # ... "plain.admin.querystats", ] MIDDLEWARE = [ "plain.sessions.middleware.SessionMiddleware", "plain.auth.middleware.AuthenticationMiddleware", "plain.admin.querystats.QueryStatsMiddleware", # Put additional middleware below querystats # ... ] ``` We strongly recommend using the plain-toolbar along with this, but if you aren't, you can add the querystats to your frontend templates with this include: ```html {% include "querystats/button.html" %} ``` _Note that you will likely want to surround this with an if `DEBUG` or `is_admin` check._ To view querystats you need to send a POST request to `?querystats=store` (i.e. via a `
`), and the template include is the easiest way to do that. ## Tailwind CSS This package is styled with [Tailwind CSS](https://tailwindcss.com/), and pairs well with [`plain-tailwind`](https://github.com/plainpackages/plain-tailwind). If you are using your own Tailwind implementation, you can modify the "content" in your Tailwind config to include any Plain packages: ```js // tailwind.config.js module.exports = { content: [ // ... ".venv/lib/python*/site-packages/plain*/**/*.{html,js}", ], // ... } ``` If you aren't using Tailwind, and don't intend to, open an issue to discuss other options. # plain.toolbar The admin toolbar is enabled for every user who `is_admin`. ![Plain admin toolbar](https://user-images.githubusercontent.com/649496/213781915-a2094f54-99b8-4a05-a36e-dee107405229.png) ## Installation Add `plaintoolbar` to your `INSTALLED_PACKAGES`, and the `{% toolbar %}` to your base template: ```python # settings.py INSTALLED_PACKAGES += [ "plaintoolbar", ] ``` ```html {% load toolbar %} ... {% toolbar %} ... ``` More specific settings can be found below. ## Tailwind CSS This package is styled with [Tailwind CSS](https://tailwindcss.com/), and pairs well with [`plain-tailwind`](https://github.com/plainpackages/plain-tailwind). If you are using your own Tailwind implementation, you can modify the "content" in your Tailwind config to include any Plain packages: ```js // tailwind.config.js module.exports = { content: [ // ... ".venv/lib/python*/site-packages/plain*/**/*.{html,js}", ], // ... } ``` If you aren't using Tailwind, and don't intend to, open an issue to discuss other options. # plain.requestlog The request log stores a local history of HTTP requests and responses during `plain work` (Django runserver). The request history will make it easy to see redirects, 400 and 500 level errors, form submissions, API calls, webhooks, and more. [Watch on YouTube](https://www.youtube.com/watch?v=AwI7Pt5oZnM) Requests can be re-submitted by clicking the "replay" button. [![Django request log](https://user-images.githubusercontent.com/649496/213781414-417ad043-de67-4836-9ef1-2b91404336c3.png)](https://user-images.githubusercontent.com/649496/213781414-417ad043-de67-4836-9ef1-2b91404336c3.png) ## Installation ```python # settings.py INSTALLED_PACKAGES += [ "plainrequestlog", ] MIDDLEWARE = MIDDLEWARE + [ # ... "plainrequestlog.RequestLogMiddleware", ] ``` The default settings can be customized if needed: ```python # settings.py DEV_REQUESTS_IGNORE_PATHS = [ "/sw.js", "/favicon.ico", "/admin/jsi18n/", ] DEV_REQUESTS_MAX = 50 ``` ## Tailwind CSS This package is styled with [Tailwind CSS](https://tailwindcss.com/), and pairs well with [`plain-tailwind`](https://github.com/plainpackages/plain-tailwind). If you are using your own Tailwind implementation, you can modify the "content" in your Tailwind config to include any Plain packages: ```js // tailwind.config.js module.exports = { content: [ // ... ".venv/lib/python*/site-packages/plain*/**/*.{html,js}", ], // ... } ``` If you aren't using Tailwind, and don't intend to, open an issue to discuss other options. # plain.api **Build APIs using class-based views.** This package includes lightweight view classes for building APIs using the same patterns as regular HTML views. It also provides an [`APIKey` model](#api-keys) and support for generating [OpenAPI](#openapi) documents. Because [Views](/plain/plain/views/README.md) can convert built-in types to responses, an API view can simply return a dict or list to send a JSON response back to the client. More complex responses can use the [`JsonResponse`](/plain/plain/http/response.py#JsonResponse) class. ```python # app/api/views.py from plain.api.views import APIKeyView, APIView from plain.http import JsonResponse from plain.views.exeptions import ResponseException from app.users.models import User from app.pullrequests.models import PullRequest # An example base class that will be used across your custom API class BaseAPIView(APIView, APIKeyView): def use_api_key(self): super().use_api_key() if user := self.api_key.users.first(): self.request.user = user else: raise ResponseException( JsonResponse( {"error": "API key not associated with a user."}, status_code=403, ) ) # An endpoint that returns the current user class UserView(BaseAPIView): def get(self): return { "uuid": self.request.user.uuid, "username": self.request.user.username, "time_zone": str(self.request.user.time_zone), } # An endpoint that filters querysets based on the user class PullRequestView(BaseAPIView): def get(self): try: pull = ( PullRequest.objects.all() .visible_to_user(self.request.user) .get(uuid=self.url_kwargs["uuid"]) ) except PullRequest.DoesNotExist: return None return { "uuid": pull.uuid, "state": pull.state, "number": pull.number, "host_url": pull.host_url, "host_created_at": pull.host_created_at, "host_updated_at": pull.host_updated_at, "host_merged_at": pull.host_merged_at, "author": { "uuid": pull.author.uuid, "display_name": pull.author.display_name, }, } ``` URLs work like they do everywhere else, though it's generally recommended to put everything together into an `app.api` package and `api` namespace. ```python # app/api/urls.py from plain.urls import Router, path from . import views class APIRouter(Router): namespace = "api" urls = [ path("user/", views.UserView), path("pullrequests//", views.PullRequestView), ] ``` ## Authentication and authorization Handling authentication in the API is pretty straightforward. If you use [API keys](#api-keys), then the `APIKeyView` will parse the `Authorization: Bearer ` header and set `self.api_key`. You will then customize the `use_api_key` method to associate the request with a user (or team, for example), depending on how your app works. ```python class BaseAPIView(APIView, APIKeyView): def use_api_key(self): super().use_api_key() if user := self.api_key.users.first(): self.request.user = user else: raise ResponseException( JsonResponse( {"error": "API key not associated with a user."}, status_code=403, ) ) ``` When it comes to authorizing actions, typically you will factor this in to the queryset to only return objects that the user is allowed to see. If a response method (`get`, `post`, etc.) returns `None`, then the view will return a 404 response. Other status codes can be returned with an int (ex. `403`) or a `JsonResponse` object. ```python class PullRequestView(BaseAPIView): def get(self): try: pull = ( PullRequest.objects.all() .visible_to_user(self.request.user) .get(uuid=self.url_kwargs["uuid"]) ) except PullRequest.DoesNotExist: return None # ...return the authorized data here ``` ## `PUT`, `POST`, and `PATCH` One way to handle PUT, POST, and PATCH endpoints is to use standard [forms](/plain/plain/forms/README.md). This will use the same validation and error handling as an HTML form, but will parse the input from the JSON request instead of HTML form data. ```python class UserForm(ModelForm): class Meta: model = User fields = [ "username", "time_zone", ] class UserView(BaseAPIView): def patch(self): form = UserForm( request=self.request, instance=self.request.user, ) if form.is_valid(): user = form.save() return { "uuid": user.uuid, "username": user.username, "time_zone": str(user.time_zone), } else: return {"errors": form.errors} ``` If you don't want to use Plain's forms, you could also use a third-party schema/validation library like [Pydantic](https://docs.pydantic.dev/latest/) or [Marshmallow](https://marshmallow.readthedocs.io/en/3.x-line/). But depending on your use case, you may not need to use forms or fancy validation at all! ## `DELETE` Deletes can be handled in the `delete` method of the view. Most of the time this just means getting the object, deleting it, and returning a 204. ```python class PullRequestView(BaseAPIView): def delete(self): try: pull = ( PullRequest.objects.all() .visible_to_user(self.request.user) .get(uuid=self.url_kwargs["uuid"]) ) except PullRequest.DoesNotExist: return None pull.delete() return 204 ``` ## API keys The provided [`APIKey` model](./models.py) includes randomly generated, unique API tokens that are automatically parsed by `APIKeyView`. The tokens can optionally be named and include an `expires_at` date. Associating an `APIKey` with a user (or team, for example) is up to you. Most likely you will want to use a `ForeignKey` or a `ManyToManyField`. ```python # app/users/models.py from plain import models from plain.api.models import APIKey @models.register_model class User(models.Model): # other fields... api_key = models.ForeignKey( APIKey, on_delete=models.CASCADE, related_name="users", allow_null=True, required=False, ) class Meta: constraints = [ models.UniqueConstraint( fields=["api_key"], condition=models.Q(api_key__isnull=False), name="unique_user_api_key", ), ] ``` Generating API keys is something you will need to do in your own code, wherever it makes sense to do so. ```python user = User.objects.first() user.api_key = APIKey.objects.create() user.save() ``` To use API keys in your views, you can inherit from `APIKeyView` and customize the [`use_api_key` method](./views.py#use_api_key) to set the `request.user` attribute (or any other attribute) to the object associated with the API key. ```python # app/api/views.py from plain.api.views import APIKeyView, APIView class BaseAPIView(APIView, APIKeyView): def use_api_key(self): super().use_api_key() if user := self.api_key.users.first(): self.request.user = user else: raise ResponseException( JsonResponse( {"error": "API key not associated with a user."}, status_code=403, ) ) ``` ## OpenAPI You can use a combination of decorators to help generate an [OpenAPI](https://www.openapis.org/) document for your API. To define root level schema, use the `@openapi.schema` decorator on your `Router` class. ```python from plain.urls import Router, path from plain.api import openapi from plain.assets.views import AssetView from . import views @openapi.schema({ "openapi": "3.0.0", "info": { "title": "PullApprove API", "version": "4.0.0", }, "servers": [ { "url": "https://4.pullapprove.com/api/", "description": "PullApprove API", } ], }) class APIRouter(Router): namespace = "api" urls = [ # ...your API routes ] ``` You can then define additional schema on a view class, or a specific view method. ```python class CurrentUserAPIView(BaseAPIView): @openapi.schema({ "summary": "Get current user", }) def get(self): if self.request.user: user = self.request.user else: raise Http404 return schemas.UserSchema.from_user(user, self.request) ``` While you can attach any raw schema you like, there are a couple helpers to generate schema for API input (`@openapi.request_form`) and output (`@openapi.response_typed_dict`). These are intentionally specific, leaving room for custom decorators to be written for the input/output types of your choice. ```python class TeamAccountAPIView(BaseAPIView): @openapi.request_form(TeamAccountForm) @openapi.response_typed_dict(200, TeamAccountSchema) def patch(self): form = TeamAccountForm(request=self.request, instance=self.team_account) if form.is_valid(): team_account = form.save() return TeamAccountSchema.from_team_account( team_account, self.request ) else: return {"errors": form.errors} @cached_property def team_account(self): try: if self.organization: return TeamAccount.objects.get( team__organization=self.organization, uuid=self.url_kwargs["uuid"] ) if self.request.user: return TeamAccount.objects.get( team__organization__in=self.request.user.organizations.all(), uuid=self.url_kwargs["uuid"], ) except TeamAccount.DoesNotExist: raise Http404 class TeamAccountForm(ModelForm): class Meta: model = TeamAccount fields = ["is_reviewer", "is_admin"] class TeamAccountSchema(TypedDict): uuid: UUID account: AccountSchema is_admin: bool is_reviewer: bool api_url: str @classmethod def from_team_account(cls, team_account, request) -> "TeamAccountSchema": return cls( uuid=team_account.uuid, is_admin=team_account.is_admin, is_reviewer=team_account.is_reviewer, api_url=request.build_absolute_uri( reverse("api:team_account", uuid=team_account.uuid) ), account=AccountSchema.from_account(team_account.account, request), ) ``` To generate the OpenAPI JSON, run the following command (including swagger.io validation): ```bash plain api generate-openapi --validate ``` ### Deploying To build the JSON when you deploy, add a `build.run` command to your `pyproject.toml` file: ```toml [tool.plain.build.run] openapi = {cmd = "plain api generate-openapi --validate > app/assets/openapi.json"} ``` You will typically want `app/assets/openapi.json` to be included in your `.gitignore` file. Then you can use an [`AssetView`](/plain/plain/assets/views.py#AssetView) to serve the `openapi.json` file. ```python from plain.urls import Router, path from plain.assets.views import AssetView from . import views class APIRouter(Router): namespace = "api" urls = [ # ...your API routes path("openapi.json", AssetView.as_view(asset_path="openapi.json")), ] ``` # plain.auth Add users to your app and define which views they can access. To log a user in, you'll want to pair this package with: - `plain-passwords` - `plain-oauth` - `plain-passkeys` (TBD) - `plain-passlinks` (TBD) ## Installation ```python # app/settings.py INSTALLED_PACKAGES = [ # ... "plain.auth", "plain.sessions", "plain.passwords", ] MIDDLEWARE = [ "plain.sessions.middleware.SessionMiddleware", "plain.auth.middleware.AuthenticationMiddleware", ] AUTH_USER_MODEL = "users.User" AUTH_LOGIN_URL = "login" ``` Create your own user model (`plain create users`). ```python # app/users/models.py from plain import models from plain.passwords.models import PasswordField class User(models.Model): email = models.EmailField() password = PasswordField() is_admin = models.BooleanField(default=False) created_at = models.DateTimeField(auto_now_add=True) def __str__(self): return self.email ``` Define your URL/view where users can log in. ```python # app/urls.py from plain.auth.views import LoginView, LogoutView from plain.urls import include, path from plain.passwords.views import PasswordLoginView class LoginView(PasswordLoginView): template_name = "login.html" urlpatterns = [ path("logout/", LogoutView, name="logout"), path("login/", LoginView, name="login"), ] ``` ## Checking if a user is logged in A `request.user` will either be `None` or point to an instance of a your `AUTH_USER_MODEL`. So in templates you can do: ```html {% if request.user %}

Hello, {{ request.user.email }}!

{% else %}

You are not logged in.

{% endif %} ``` Or in Python: ```python if request.user: print(f"Hello, {request.user.email}!") else: print("You are not logged in.") ``` ## Restricting views Use the `AuthViewMixin` to restrict views to logged in users, admin users, or custom logic. ```python from plain.auth.views import AuthViewMixin from plain.exceptions import PermissionDenied from plain.views import View class LoggedInView(AuthViewMixin, View): login_required = True class AdminOnlyView(AuthViewMixin, View): login_required = True admin_required = True class CustomPermissionView(AuthViewMixin, View): def check_auth(self): super().check_auth() if not self.request.user.is_special: raise PermissionDenied("You're not special!") ```
# Cache A simple cache using the database. The Plain Cache stores JSON-serializable values in a `CachedItem` model. Cached data can be set to expire after a certain amount of time. Access to the cache is provided through the `Cached` class. ```python from plain.cache import Cached cached = Cached("my-cache-key") if cached.exists(): print("Cache hit and not expired!") print(cached.value) else: print("Cache miss!") cached.set("a JSON-serializable value", expiration=60) # Delete the item if you need to cached.delete() ``` Expired cache items can be cleared by [running chores](/plain/plain/chores/README.md). ## Installation Add `plain.cache` to your `INSTALLED_PACKAGES`: ```python # app/settings.py INSTALLED_PACKAGES = [ # ... "plain.cache", ] ``` ## CLI - `plain cache clear-expired` - Clear all expired cache items - `plain cache clear-all` - Clear all cache items - `plain cache stats` - Show cache statistics # plain.dev A single command that runs everything you need for local development. ![Plain dev command example](https://github.com/dropseed/plain/assets/649496/3643bb64-a99b-4a8e-adab-8c6b81791ea9) The `plain.dev` package can be [installed from PyPI](https://pypi.org/project/plain.dev/), and does _not_ need to be added to `INSTALLED_PACKAGES`. - [`plain dev`](#plain-dev) - [`plain dev services`](#plain-dev-services) - [`plain pre-commit`](#plain-pre-commit) - [`plain contrib`](#plain-contrib) - [VS Code debugging](#vscode-debugging) ## `plain dev` The `plain dev` command does several things: - Sets `PLAIN_CSRF_TRUSTED_ORIGINS` to localhost by default - Runs `plain preflight` to check for any issues - Executes any pending model migrations - Starts `gunicorn` with `--reload` - Runs `plain tailwind build --watch`, if `plain.tailwind` is installed - Any custom process defined in `pyproject.toml` at `tool.plain.dev.run` - Necessary services (ex. Postgres) defined in `pyproject.toml` at `tool.plain.dev.services` ### Services Use services to define databases or other processes that your app _needs_ to be functional. The services will be started automatically in `plain dev`, but also in `plain pre-commit` (so preflight and tests have a database). Ultimately, how you run your development database is up to you. But a recommended starting point is to use Docker: ```toml # pyproject.toml [tool.plain.dev.services] postgres = {cmd = "docker run --name app-postgres --rm -p 54321:5432 -v $(pwd)/.plain/dev/pgdata:/var/lib/postgresql/data -e POSTGRES_PASSWORD=postgres postgres:15 postgres"} ``` ### Custom processes Unlike [services](#services), custom processes are _only_ run during `plain dev`. This is a good place to run something like [ngrok](https://ngrok.com/) or a [Plain worker](../../../plain-worker), which you might need to use your local site, but don't need running for executing tests, for example. ```toml # pyproject.toml [tool.plain.dev.run] ngrok = {command = "ngrok http $PORT"} ``` ## `plain dev services` Starts your [services](#services) by themselves. ## `plain pre-commit` A built-in pre-commit hook that can be installed with `plain pre-commit --install`. Runs: - Custom commands defined in `pyproject.toml` at `tool.plain.pre-commit.run` - `plain code check`, if [`plain.code`](https://plainframework.com/docs/plain-code/plain/code/) is installed - `uv lock --locked`, if using uv - `plain preflight --database default` - `plain migrate --check` - `plain makemigrations --dry-run --check` - `plain build` - `plain test` ## VS Code debugging ![Debug Plain with VS Code](https://github.com/dropseed/plain-public/assets/649496/250138b6-7702-4ab6-bf38-e0c8e3c56d06) Since `plain dev` runs multiple processes at once, the regular [pdb](https://docs.python.org/3/library/pdb.html) debuggers don't quite work. Instead, we include [microsoft/debugpy](https://github.com/microsoft/debugpy) and an `attach` function to make it even easier to use VS Code's debugger. First, import and run the `debug.attach()` function: ```python class HomeView(TemplateView): template_name = "home.html" def get_template_context(self): context = super().get_template_context() # Make sure the debugger is attached (will need to be if runserver reloads) from plain.dev import debug; debug.attach() # Add a breakpoint (or use the gutter in VS Code to add one) breakpoint() return context ``` When you load the page, you'll see "Waiting for debugger to attach...". You can then run the VS Code debugger and attach to an existing Python process, at localhost:5678. ## FAQs ### What if the plain cli isn't working? When working on packages locally you can sometimes end up in a weird state where Plain can't load. The `plain contrib` command is also available as `plain-contrib`, which won't go through any of the setup processes for Plain, so you can always run that directly if you need to. # plain.elements ## Installation ```python # settings.py INSTALLED_PACKAGES = [ # ... "plain.elements", ] ``` # plain.email Everything you need to send email. ## Installation Add `plain.email` to your `INSTALLED_APPS`: ```python # settings.py INSTALLED_APPS = [ # ... 'plain.email', ] ``` # plain.esbuild Build JavaScript files with esbuild. ## gitignore ``` **/assets/**/*.esbuilt.* ``` # plain.flags Local feature flags via database models. Custom flags are written as subclasses of [`Flag`](./flags.py). You define the flag's "key" and initial value, and the results will be stored in the database for future reference. ```python # app/flags.py from plain.flags import Flag class FooEnabled(Flag): def __init__(self, user): self.user = user def get_key(self): return self.user def get_value(self): # Initially all users will have this feature disabled # and we'll enable them manually in the admin return False ``` Use flags in HTML templates: ```html {% if flags.FooEnabled(request.user) %}

Foo is enabled for you!

{% else %}

Foo is disabled for you.

{% endif %} ``` Or in Python: ```python import flags print(flags.FooEnabled(user).value) ``` ## Installation ```python INSTALLED_PACKAGES = [ ... "plain.flags", ] ``` Create a `flags.py` at the top of your `app` (or point `settings.FLAGS_MODULE` to a different location). ## Advanced usage Ultimately you can do whatever you want inside of `get_key` and `get_value`. ```python class OrganizationFeature(Flag): url_param_name = "" def __init__(self, request=None, organization=None): # Both of these are optional, but will usually both be given self.request = request self.organization = organization def get_key(self): if ( self.url_param_name and self.request and self.url_param_name in self.request.query_params ): return None if not self.organization: # Don't save the flag result for PRs without an organization return None return self.organization def get_value(self): if self.url_param_name and self.request: if self.request.query_params.get(self.url_param_name) == "1": return True if self.request.query_params.get(self.url_param_name) == "0": return False if not self.organization: return False # All organizations will start with False, # and I'll override in the DB for the ones that should be True return False class AIEnabled(OrganizationFeature): pass ```
# HTMX Integrate HTMX with templates and views. The `plain-htmx` package adds a couple of unique features for working with HTMX. One is [template fragments](#template-fragments) and the other is [view actions](#view-actions). The combination of these features lets you build HTMX-powered views that focus on server-side rendering and avoid overly complicated URL structures or REST APIs that you may not otherwise need. The `HTMXViewMixin` is the starting point for the server-side HTMX behavior. To use these feaures on a view, simply inherit from the class (yes, this is designed to work with class-based views). ```python from plain.views import TemplateView from plain.htmx.views import HTMXViewMixin class HomeView(HTMXViewMixin, TemplateView): template_name = "home.html" ``` In your `base.html` template (or wherever need the HTMX scripts), you can use the `{% htmx_js %}` template tag: ```html {% load htmx %} My Site {% htmx_js %} {% block content %}{% endblock %} ``` ## Installation ```python INSTALLED_PACKAGES = [ # ... "plain.htmx", ] ``` ## Template Fragments An `{% htmxfragment %}` can be used to render a specific part of your template in HTMX responses. When you use a fragment, all `hx-get`, `hx-post`, etc. elements inside that fragment will automatically send a request to the current URL, render _only_ the updated content for the fragment, and swap out the fragment. Here's an example: ```html {% extends "base.html" %} {% load htmx %} {% block content %}

Page title

{% htmxfragment main %}

The time is {% now "jS F Y H:i" %}

{% endhtmxfragment %}
{% endblock %} ``` Everything inside `{% htmxfragment %}` will automatically update when "Refresh" is clicked. ### Lazy template fragments If you want to render a fragment lazily, you can add the `lazy` attribute to the `{% htmxfragment %}` tag. ```html {% htmxfragment main lazy=True %} {% endhtmxfragment %} ``` This pairs nicely with passing a callable function or method as a context variable, which will only get invoked when the fragment actually gets rendered on the lazy load. ```python def fetch_items(): import time time.sleep(2) return ["foo", "bar", "baz"] class HomeView(HTMXViewMixin, TemplateView): def get_context(self, **kwargs): context = super().get_context(**kwargs) context["items"] = fetch_items # Missing () are on purpose! return context ``` ```html {% htmxfragment main lazy=True %}
    {% for item in items %}
  • {{ item }}
  • {% endfor %}
{% endhtmxfragment %} ``` ### How does it work? When you use the `{% htmxfragment %}` tag, a standard `div` is output that looks like this: ```html
{{ fragment_content }}
``` The `plain-hx-fragment` is a custom attribute that we've added ("F" is for "Forge"), but the rest are standard HTMX attributes. When Plain renders the response to an HTMX request, it will get the `Plain-HX-Fragment` header, find the fragment with that name in the template, and render that for the response. Then the response content is automatically swapped in to replace the content of your `{% htmxfragment %}` tag. Note that there is no URL specified on the `hx-get` attribute. By default, HTMX will send the request to the current URL for the page. When you're working with fragments, this is typically the behavior you want! (You're on a page and want to selectively re-render a part of that page.) The `{% htmxfragment %}` tag is somewhat similar to a `{% block %}` tag -- the fragments on a page should be named and unique, and you can't use it inside of loops. For fragment-like behavior inside of a for-loop, you'll most likely want to set up a dedicated URL that can handle a single instance of the looped items, and maybe leverage [dedicated templates](#dedicated-templates). ## View Actions View actions let you define multiple "actions" on a class-based view. This is an alternative to defining specific API endpoints or form views to handle basic button interactions. With view actions you can design a single view that renders a single template, and associate buttons in that template with class methods in the view. As an example, let's say we have a `PullRequest` model and we want users to be able to open, close, or merge it with a button. In our template, we would use the `plain-hx-action` attribute to name the action: ```html {% extends "base.html" %} {% load htmx %} {% block content %}

{{ pullrequest }}

{% htmxfragment pullrequest %}

State: {{ pullrequest.state }}

{% if pullrequest.state == "open" %} {% else if pullrequest.state == "closed" %} {% endif %} {% endhtmxfragment %}
{% endblock %} ``` Then in the view class, we can define methods for each HTTP method + `plain-hx-action`: ```python class PullRequestDetailView(HTMXViewMixin, DetailView): def get_queryset(self): # The queryset will apply to all actions on the view, so "permission" logic can be shared return super().get_queryset().filter(users=self.request.user) # Action handling methods follow this format: # htmx_{method}_{action} def htmx_post_open(self): self.object = self.get_object() if self.object.state != "closed": raise ValueError("Only a closed pull request can be opened") self.object.state = "closed" self.object.save() # Render the updated content the standard calls # (which will selectively render our fragment if applicable) context = self.get_context(object=self.object) return self.render_to_response(context) def htmx_post_close(self): self.object = self.get_object() if self.object.state != "open": raise ValueError("Only a open pull request can be closed") self.object.state = "open" self.object.save() context = self.get_context(object=self.object) return self.render_to_response(context) def htmx_post_merge(self): self.object = self.get_object() if self.object.state != "open": raise ValueError("Only a open pull request can be merged") self.object.state = "merged" self.object.save() context = self.get_context(object=self.object) return self.render_to_response(context) ``` This can be a matter of preference, but typically you may end up building out an entire form, API, or set of URLs to handle these behaviors. If you application is only going to handle these actions via HTMX, then a single View may be a simpler way to do it. Note that currently we don't have many helper-functions for parsing or returning HTMX responses -- this can basically all be done through standard request and response headers: ```python class PullRequestDetailView(HTMXViewMixin, DetailView): def get_queryset(self): # The queryset will apply to all actions on the view, so "permission" logic can be shared return super().get_queryset().filter(users=self.request.user) # You can also leave off the "plain-hx-action" attribute and just handle the HTTP method def htmx_delete(self): self.object = self.get_object() self.object.delete() # Tell HTMX to do a client-side redirect when it receives the response response = HttpResponse(status_code=204) response.headers["HX-Redirect"] = "/" return response ``` ## Dedicated Templates A small additional features of `plain-htmx` is that it will automatically find templates named `{template_name}_htmx.html` for HTMX requests. More than anything, this is just a nice way to formalize a naming scheme for template "partials" dedicated to HTMX. Because template fragments don't work inside of loops, for example, you'll often need to define dedicated URLs to handle the HTMX behaviors for individual items in a loop. You can sometimes think of these as "pages within a page". So if you have a template that renders a collection of items, you can do the initial render using a Django `{% include %}`: ```html {% extends "base.html" %} {% block content %} {% for pullrequest in pullrequests %}
{% include "pullrequests/pullrequest_detail_htmx.html" %}
{% endfor %} {% endblock %} ``` And then subsequent HTMX requests/actions on individual items can be handled by a separate URL/View: ```html

{{ pullrequest.title }}

``` _If_ you need a URL to render an individual item, you can simply include the same template fragment in most cases: ```html {% extends "base.html" %} {% block content %} {% include "pullrequests/pullrequest_detail_htmx.html" %} {% endblock %} ``` ```python # urls.py and views.py # urls.py default_namespace = "pullrequests" urlpatterns = [ path("/", views.PullRequestDetailView, name="detail"), ] # views.py class PullRequestDetailView(HTMXViewMixin, DetailView): def htmx_post_update(self): self.object = self.get_object() self.object.update() context = self.get_context(object=self.object) return self.render_to_response(context) ``` ## Tailwind CSS variant The standard behavior for `{% htmxfragment %}` is to set `hx-indicator="this"` on the rendered element. This tells HTMX to add the `htmx-request` class to the fragment element when it is loading. Since Plain emphasizes using Tailwind CSS, here's a simple variant you can add to your `tailwind.config.js` to easily style the loading state: ```js const plugin = require('tailwindcss/plugin') module.exports = { plugins: [ // Add variants for htmx-request class for loading states plugin(({addVariant}) => addVariant('htmx-request', ['&.htmx-request', '.htmx-request &'])) ], } ``` You can then prefix any class with `htmx-request:` to decide what it looks like while HTMX requests are being sent: ```html ``` ## CSRF tokens We configure CSRF tokens for you with the HTMX JS API. You don't have to put `hx-headers` on the `` tag, for example. ## Error classes This app also includes an HTMX extension for adding error classes for failed requests. - `htmx-error-response` for `htmx:responseError` - `htmx-error-response-{{ status_code }}` for `htmx:responseError` - `htmx-error-send` for `htmx:sendError` To enable them, use `hx-ext="error-classes"`. You can add the ones you want as Tailwind variants and use them to show error messages. ```js const plugin = require('tailwindcss/plugin') module.exports = { plugins: [ // Add variants for htmx-request class for loading states plugin(({addVariant}) => addVariant('htmx-error-response-429', ['&.htmx-error-response-429', '.htmx-error-response-429 &'])) ], } ``` ## CSP ``` ```
# Why Are These Files Here? These are legacy extensions for htmx 1.x and are **NOT** actively maintained or guaranteed to work with htmx 2.x. They are here because we unfortunately linked to unversioned unpkg URLs in the installation guides for them in 1.x, so we need to keep them here to preserve those URLs and not break existing users functionality. If you are looking for extensions for htmx 2.x, please see the [htmx 2.0 extensions site](https://htmx.org/extensions), which has links to the new extensions repos (They have all been moved to their own NPM projects and URLs, like they should have been from the start!) # plain.importmap Use import maps in templates. Heavily inspired by [rails/importmap-rails](https://github.com/rails/importmap-rails), this app adds a simple process for integrating [import maps](https://github.com/WICG/import-maps) into Django. This is a new project and it hasn't been used in production yet. But if you're looking to use import maps with Django, give it a try and tell us how it goes. The structure (and code) is pretty simple. Contributions are welcome! ## How to use it You'll need to do four things to use plain-importmap. The TL;DR is: - Add "importmap" to `INSTALLED_PACKAGES` - Create an `importmap.toml` - Run `python manage.py importmap_generate` - Use `{% importmap_js %}` in your template ### 1. Install it Do the equivalent of `pip install plain-importmap` and add it to your `INSTALLED_PACKAGES` list in your `settings.py` file. ```python # settings.py INSTALLED_PACKAGES = [ ... "importmap", ] ``` ### 2. Create an `importmap.toml` file This should live next to your `manage.py` file. Here you'll add a list of "packages" you want to use. The "name" can be anything, but should probably be the same as what it you would import from in typical bundling setups (i.e. `import React from "react"`). The "source" will get passed on to the [jspm.org generator](https://jspm.org/docs/api#install), but is basically the `@` you want to use. ```toml [[packages]] name = "react" source = "react@17.0.2" ``` ### 3. Run `importmap_generate` To resolve the import map, you'll need to run `python manage.py importmap_generate`. This will create `importmap.lock` (which you should save and commit to your repo) that contains the actual import map JSON (both for development and production). You don't need to look at this file yourself, but here is an example of what it will contain: ```json { "config_hash": "09d6237cdd891aad07de60f54689d130", "importmap": { "imports": { "react": "https://ga.jspm.io/npm:react@17.0.2/index.js" }, "scopes": { "https://ga.jspm.io/": { "object-assign": "https://ga.jspm.io/npm:object-assign@4.1.1/index.js" } } }, "importmap_dev": { "imports": { "react": "https://ga.jspm.io/npm:react@17.0.2/dev.index.js" }, "scopes": { "https://ga.jspm.io/": { "object-assign": "https://ga.jspm.io/npm:object-assign@4.1.1/index.js" } } } } ``` ### 4. Add the scripts to your template The import map itself gets added by using `{% load importmap %}` and then `{% importmap_js %}` in the head of your HTML. This will include the [es-module-shim](https://github.com/guybedford/es-module-shims). After that, you can include your own JavaScript! This could be inline or from `static`. Just be sure to use `type="module"` and the "name" you provided when doing your JS imports (i.e. "react"). ```html {% load importmap %} {% importmap_js %} ``` When it renders you should get something like this: ```html ``` ## Project status This is partly an experiment, but honestly it's so simple that I don't think there can be much wrong with how it works currently. Here's a list of things that would be nice to do (PRs welcome): - Command to add new importmap dependency (use `^` version automatically?) - Django check for comparing lock and config (at deploy time, etc.) - Use [deps](https://www.dependencies.io/) to update shim version - Preload option - Vendoring option (including shim) - More complete error handling (custom exceptions, etc.) # plain.loginlink Link-based authentication for Plain. # plain.models **Model your data and store it in a database.** ```python # app/users/models.py from plain import models from plain.passwords.models import PasswordField @models.register_model class User(models.Model): email = models.EmailField() password = PasswordField() is_admin = models.BooleanField(default=False) created_at = models.DateTimeField(auto_now_add=True) def __str__(self): return self.email ``` Create, update, and delete instances of your models: ```python from .models import User # Create a new user user = User.objects.create( email="test@example.com", password="password", ) # Update a user user.email = "new@example.com" user.save() # Delete a user user.delete() # Query for users admin_users = User.objects.filter(is_admin=True) ``` ## Installation Install `plain.models` from PyPI, then add it to your `INSTALLED_PACKAGES`. ```python # app/settings.py INSTALLED_PACKAGES = [ ... "plain.models", ] ``` To connect to a database, you can provide a `DATABASE_URL` environment variable. ```sh DATABASE_URL=postgresql://user:password@localhost:5432/dbname ``` Or you can manually define the `DATABASES` setting. ```python # app/settings.py DATABASES = { "default": { "ENGINE": "plain.models.backends.postgresql", "NAME": "dbname", "USER": "user", "PASSWORD": "password", "HOST": "localhost", "PORT": "5432", } } ``` Multiple backends are supported, including Postgres, MySQL, and SQLite. ## Querying TODO ## Migrations TODO ## Fields TODO ## Validation TODO ## Indexes and constraints TODO ## Managers TODO ## Forms TODO # plain.oauth Let users log in with OAuth providers. [Watch on YouTube (3 mins) →](https://www.youtube.com/watch?v=UxbxBa6AFsU) This library is intentionally minimal. It has no dependencies and a single database model. If you simply want users to log in with GitHub, Google, Twitter, etc. (and maybe use that access token for API calls), then this is the library for you. There are three OAuth flows that it makes possible: 1. Signup via OAuth (new user, new OAuth connection) 2. Login via OAuth (existing user, existing OAuth connection) 3. Connect/disconnect OAuth accounts to a user (existing user, new OAuth connection) ## Usage Install the package from PyPi: ```sh pip install plain-oauth ``` Add `plain.oauth` to your `INSTALLED_PACKAGES` in `settings.py`: ```python INSTALLED_PACKAGES = [ ... "plain.oauth", ] ``` In your `urls.py`, include `plain.oauth.urls`: ```python urlpatterns = [ path("oauth/", include("plain.oauth.urls")), ... ] ``` Then run migrations: ```sh python manage.py migrate plain.oauth ``` Create a new OAuth provider ([or copy one from our examples](https://github.com/forgepackages/plain-oauth/tree/master/provider_examples)): ```python # yourapp/oauth.py import requests from plain.oauth.providers import OAuthProvider, OAuthToken, OAuthUser class ExampleOAuthProvider(OAuthProvider): authorization_url = "https://example.com/login/oauth/authorize" def get_oauth_token(self, *, code, request): response = requests.post( "https://example.com/login/oauth/token", headers={ "Accept": "application/json", }, data={ "client_id": self.get_client_id(), "client_secret": self.get_client_secret(), "code": code, }, ) response.raise_for_status() data = response.json() return OAuthToken( access_token=data["access_token"], ) def get_oauth_user(self, *, oauth_token): response = requests.get( "https://example.com/api/user", headers={ "Accept": "application/json", "Authorization": f"token {oauth_token.access_token}", }, ) response.raise_for_status() data = response.json() return OAuthUser( # The provider ID is required id=data["id"], # And you can populate any of your User model fields with additional kwargs email=data["email"], username=data["username"], ) ``` Create your OAuth app/consumer on the provider's site (GitHub, Google, etc.). When setting it up, you'll likely need to give it a callback URL. In development this can be `http://localhost:8000/oauth/github/callback/` (if you name it `"github"` like in the example below). At the end you should get some sort of "client id" and "client secret" which you can then use in your `settings.py`: ```python OAUTH_LOGIN_PROVIDERS = { "github": { "class": "yourapp.oauth.GitHubOAuthProvider", "kwargs": { "client_id": environ["GITHUB_CLIENT_ID"], "client_secret": environ["GITHUB_CLIENT_SECRET"], # "scope" is optional, defaults to "" # You can add other fields if you have additional kwargs in your class __init__ # def __init__(self, *args, custom_arg="default", **kwargs): # self.custom_arg = custom_arg # super().__init__(*args, **kwargs) }, }, } ``` Then add a login button (which is a form using POST rather than a basic link, for security purposes): ```html

Login

{{ csrf_input }}
``` Depending on your URL and provider names, your OAuth callback will be something like `https://example.com/oauth/{provider}/callback/`. That's pretty much it! ## Advanced usage ### Handling OAuth errors The most common error you'll run into is if an existing user clicks a login button, but they haven't yet connected that provider to their account. For security reasons, the required flow here is that the user actually logs in with another method (however they signed up) and then _connects_ the OAuth provider from a settings page. For this error (and a couple others), there is an error template that is rendered. You can customize this by copying `oauth/error.html` to one of your own template directories: ```html {% extends "base.html" %} {% block content %}

OAuth Error

{{ oauth_error }}

{% endblock %} ``` ![Django OAuth duplicate email address error](https://user-images.githubusercontent.com/649496/159065848-b4ee6e63-9aa0-47b5-94e8-7bee9b509e60.png) ### Connecting and disconnecting OAuth accounts To connect and disconnect OAuth accounts, you can add a series of forms to a user/profile settings page. Here's an very basic example: ```html {% extends "base.html" %} {% block content %} Hello {{ request.user }}!

Existing connections

Add a connection

{% endblock %} ``` The `get_provider_keys` function can help populate the list of options: ```python from plain.oauth.providers import get_provider_keys class ExampleView(TemplateView): template_name = "index.html" def get_context(self, **kwargs): context = super().get_context(**kwargs) context["oauth_provider_keys"] = get_provider_keys() return context ``` ![Connecting and disconnecting Django OAuth accounts](https://user-images.githubusercontent.com/649496/159065096-30239a1f-62f6-4ee2-a944-45140f45af6f.png) ### Using a saved access token ```python import requests # Get the OAuth connection for a user connection = user.oauth_connections.get(provider_key="github") # If the token can expire, check and refresh it if connection.access_token_expired(): connection.refresh_access_token() # Use the token in an API call token = connection.access_token response = requests.get(...) ``` ### Using the Django system check This library comes with a Django system check to ensure you don't _remove_ a provider from `settings.py` that is still in use in your database. You do need to specify the `--database` for this to run when using the check command by itself: ```sh python manage.py check --database default ``` ## FAQs ### How is this different from [Django OAuth libraries](https://djangopackages.org/grids/g/oauth/)? The short answer is that _it does less_. In [django-allauth](https://github.com/pennersr/django-allauth) (maybe the most popular alternative) you get all kinds of other features like managing multiple email addresses, email verification, a long list of supported providers, and a whole suite of forms/urls/views/templates/signals/tags. And in my experience, it's too much. It often adds more complexity to your app than you actually need (or want) and honestly it can just be a lot to wrap your head around. Personally, I don't like the way that your OAuth settings are stored in the database vs when you use `settings.py`, and the implications for doing it one way or another. The other popular OAuth libraries have similar issues, and I think their _weight_ outweighs their usefulness for 80% of the use cases. ### Why aren't providers included in the library itself? One thing you'll notice is that we don't have a long list of pre-configured providers in this library. Instead, we have some examples (which you can usually just copy, paste, and use) and otherwise encourage you to wire up the provider yourself. Often times all this means is finding the two OAuth URLs ("oauth/authorize" and "oauth/token") in their docs, and writing two class methods that do the actual work of getting the user's data (which is often customized anyway). We've written examples for the following providers: - [GitHub](https://github.com/forgepackages/plain-oauth/tree/master/provider_examples/github.py) - [GitLab](https://github.com/forgepackages/plain-oauth/tree/master/provider_examples/gitlab.py) - [Bitbucket](https://github.com/forgepackages/plain-oauth/tree/master/provider_examples/bitbucket.py) Just copy that code and paste it in your project. Tweak as necessary! This might sound strange at first. But in the long run we think it's actually _much_ more maintainable for both us (as library authors) and you (as app author). If something breaks with a provider, you can fix it immediately! You don't need to try to run changes through us or wait for an upstream update. You're welcome to contribute an example to this repo, and there won't be an expectation that it "works perfectly for every use case until the end of time". ### Redirect/callback URL mismatch in local development? If you're doing local development through a proxy/tunnel like [ngrok](https://ngrok.com/), then the callback URL might be automatically built as `http` instead of `https`. This is the Django setting you're probably looking for: ```python HTTPS_PROXY_HEADER = ("HTTP_X_FORWARDED_PROTO", "https") ```
# Pages # plain-pageviews Track pageviews from the client-side. ## Installation Install `plain.pageviews` and add it to `INSTALLED_PACKAGES`. Add `PageviewsRouter` to your urls. Add `{% pageviews_js %}` to your `base.html` template to include the tracking code on the client side. ## Admin integration ```python from plain.pageviews.admin import UserPageviewsCard @register_viewset class UserAdmin(AdminViewset): class DetailView(AdminModelDetailView): model = User cards = [UserPageviewsCard] ``` ## FAQs ### Why not use server-side middleware? Originally this was the idea. It turns out that tracking from the backend, while powerful, also means you have to identify all kinds of requests _not_ to track (assets, files, API calls, etc.). In the end, a simple client-side tracking script naturally accomplishes what we're looking for in a more straightforward way. # plain.password Password authentication for Plain. ## Usage To enable password authentication in your Plain application, add the `PasswordLoginView` to your `urls.py`: ```python # app/urls.py from plain.urls import path from plain.passwords.views import PasswordLoginView urlpatterns = [ path('login/', PasswordLoginView.as_view(), name='login'), # ... ] ``` This sets up a basic login view where users can authenticate using their username and password. For password resets to work, you also need to install [plain.email](/plain-email/README.md). ## FAQs ### How do I customize the login form? To customize the login form, you can subclass `PasswordLoginForm` and override its fields or methods as needed. Then, set the `form_class` attribute in your `PasswordLoginView` to use your custom form. ```python # app/forms.py from plain.passwords.forms import PasswordLoginForm class MyCustomLoginForm(PasswordLoginForm): # Add custom fields or override methods here pass ``` ```python # app/views.py from plain.passwords.views import PasswordLoginView from .forms import MyCustomLoginForm class MyPasswordLoginView(PasswordLoginView): form_class = MyCustomLoginForm ``` Update your `urls.py` to use your custom view: ```python # app/urls.py from plain.urls import path from .views import MyPasswordLoginView urlpatterns = [ path('login/', MyPasswordLoginView.as_view(), name='login'), # ... ] ``` ## Testing - pytest Write and run tests with pytest. Django includes its own test runner and [unittest](https://docs.python.org/3/library/unittest.html#module-unittest) classes. But a lot of people (myself included) prefer [pytest](https://docs.pytest.org/en/latest/contents.html). In Plain I've removed the Django test runner and a lot of the implications that come with it. There are a few utilities that remain to make testing easier, and `plain test` is a wrapper around `pytest`. ## Usage To run your tests with pytest, use the `plain test` command: ```bash plain test ``` This will execute all your tests using pytest. # plain-redirection ## Sessions - db backed Manage sessions and save them in the database. - associate with users? - devices? ## Usage To use sessions in your views, access the `request.session` object: ```python # Example view using sessions class MyView(View): def get(self): # Store a value in the session self.request.session['key'] = 'value' # Retrieve a value from the session value = self.request.session.get('key') ``` # plain-support Provides support forms for your application. ## Usage Include the support URLs in your `urls.py`: ```python # app/urls.py from plain.urls import include, path import plain.support.urls urlpatterns = [ path("support/", include(plain.support.urls)), # ... ] ``` ## Security considerations Most support forms allow you to type in an email address. Be careful, because anybody can pretend to be anybody else at this point. Conversations either need to continue over email (which confirms they have access to the email account), or include a verification step (emailing a code to the email address, for example). # plain.tailwind Integrate Tailwind CSS without JavaScript or npm. Made possible by the [Tailwind standalone CLI](https://tailwindcss.com/blog/standalone-cli), which is installed for you. ```console $ plain tailwind Usage: plain tailwind [OPTIONS] COMMAND [ARGS]... Tailwind CSS Options: --help Show this message and exit. Commands: build Compile a Tailwind CSS file init Install Tailwind, create a tailwind.config.js... update Update the Tailwind CSS version ``` ## Installation Add `plain.tailwind` to your `INSTALLED_PACKAGES`: ```python # settings.py INSTALLED_PACKAGES = [ # ... "plain.tailwind", ] ``` Create a new `tailwind.config.js` file in your project root: ```sh plain tailwind init ``` This will also create a `tailwind.css` file at `static/src/tailwind.css` where additional CSS can be added. You can customize where these files are located if you need to, but this is the default (requires `STATICFILES_DIR = BASE_DIR / "static"`). The `src/tailwind.css` file is then compiled into `dist/tailwind.css` by running `tailwind build`: ```sh plain tailwind build ``` When you're working locally, add `--watch` to automatically compile as changes are made: ```sh plain tailwind build --watch ``` Then include the compiled CSS in your base template ``: ```html {% tailwind_css %} ``` In your repo you will notice a new `.plain` directory that contains `tailwind` (the standalone CLI binary) and `tailwind.version` (to track the version currently installed). You should add `.plain` to your `.gitignore` file. ## Updating Tailwind This package manages the Tailwind versioning by comparing the value in your `pyproject.toml` to `.plain/tailwind.version`. ```toml # pyproject.toml [tool.plain.tailwind] version = "3.4.1" ``` When you run `tailwind compile`, it will automatically check whether your local installation needs to be updated and will update it if necessary. You can use the `update` command to update your project to the latest version of Tailwind: ```sh plain tailwind update ``` ## Adding custom CSS If you need to actually write some CSS, it should be done in `app/static/src/tailwind.css`. ```css @tailwind base; @tailwind components; /* Add your own "components" here */ .btn { @apply bg-blue-500 hover:bg-blue-700 text-white; } @tailwind utilities; /* Add your own "utilities" here */ .bg-pattern-stars { background-image: url("/static/images/stars.png"); } ``` [Read the Tailwind docs for more about using custom styles →](https://tailwindcss.com/docs/adding-custom-styles) ## Deployment If possible, you should add `static/dist/tailwind.css` to your `.gitignore` and run the `plain tailwind build --minify` command as a part of your deployment pipeline. When you run `plain tailwind build`, it will automatically check whether the Tailwind standalone CLI has been installed, and install it if it isn't. When using Plain on Heroku, we do this for you automatically in our [Plain buildpack](https://github.com/plainpackages/heroku-buildpack-plain/blob/master/bin/files/post_compile). # plain.tunnel **Connect to your local development server remotely.** The Plain Tunnel is a hosted service, like [ngrok](https://ngrok.com/) or [Cloudflare Tunnel](https://developers.cloudflare.com/cloudflare-one/connections/connect-networks/), that is specifically designed to work with Plain and provide the minimum set of features you need to get your local development server connected to the internet. It will provision a subdomain of plaintunnel.com for you, and forward traffic to your local development server. This is especially useful for testing webhooks, doing mobile styling on a real device, or temporarily sharing your local development URL with someone. _Note: In the future this will likely require a small subscription to use custom subdomains (vs randomly generated ones)._ ## Usage The simplest way to use `plain.tunnel` is to install it from PyPI (`uv add plain.tunnel --dev`), then add it to your `plain.dev` configuration. ```toml [tool.plain.dev.run] tunnel = {cmd = "plain tunnel $PLAIN_DEV_URL --subdomain myappname --quiet"} ``` To show a tunnel URL (whether you are using `plain.tunnel` or not), you can add `PLAIN_DEV_TUNNEL_URL` to your local `.env` file. ```bash PLAIN_DEV_TUNNEL_URL=https://myappname.plaintunnel.com ``` ![](https://assets.plainframework.com/docs/plain-dev-tunnel.png) Depending on your setup, you may need to add your tunnel to the `settings.ALLOWED_HOSTS`, which can be done in `settings.py` or in your dev `.env`. ```bash PLAIN_ALLOWED_HOSTS='["*"]' ``` ## CLI To use `plain.tunnel` manually, you can use the `plain tunnel` command (or even use it as a one-off with something like `uvx plain-tunnel`). ```console plain tunnel https://app.localhost:8443 ``` # plain.vendor Download those CDN scripts and styles. ## What about source maps? It's fairly common right now to get an error during `plain build` that says it can't find the source map for one of your vendored files. Right now, the fix is add the source map itself to your vendored dependencies too. In the future `plain vendor` might discover those during the vendoring process and download them automatically with the compiled files. # Worker **Process background jobs with a database-driven worker.** Jobs are defined using the `Job` base class and the `run()` method at a minimum. ```python from plain.worker import Job, register_job from plain.email import send_mail @register_job class WelcomeUserJob(Job): def __init__(self, user): self.user = user def run(self): send_mail( subject="Welcome!", message=f"Hello from Plain, {self.user}", from_email="welcome@plainframework.com", recipient_list=[self.user.email], ) ``` You can then create an instance of the job and call `run_in_worker()` to enqueue it for a background worker to pick up. ```python user = User.objects.get(pk=1) WelcomeUserJob(user).run_in_worker() ``` Workers are run using the `plain worker run` command. ## Installation Install `plain.worker` and add it to your `INSTALLED_PACKAGES`. ```python # app/settings.py INSTALLED_PACKAGES = [ ... "plain.worker", ] ``` Jobs can be defined in any Python file, but it is suggested to use `app/jobs.py` or `app/{pkg}/jobs.py` as those will be imported automatically so the `@register_job` will fire. ## Local development In development, you will typically want to run the worker alongside your app. With [`plain.dev`](/plain-dev/plain/dev/README.md) you can do this by adding it to the `[tool.plain.dev.run]` section of your `pyproject.toml` file. Currently, you will need to use something like [watchfiles](https://pypi.org/project/watchfiles/) to add auto-reloading to the worker. ```toml # pyproject.toml [tool.plain.dev.run] worker = {cmd = "watchfiles --filter python \"plain worker run --stats-every 0 --max-processes 2\" ."} worker-slow = {cmd = "watchfiles --filter python \"plain worker run --queue slow --stats-every 0 --max-processes 2\" ."} ``` ## Job parameters TODO ## Admin TODO ## Job history TODO ## Scheduled jobs TODO ## Monitoring TODO SKIP_COMPRESS_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.gif', '.webp', '.zip', '.gz', '.tgz', '.bz2', '.tbz', '.xz', '.br', '.woff', '.woff2', '.3gp', '.3gpp', '.asf', '.avi', '.m4v', '.mov', '.mp4', '.mpeg', '.mpg', '.webm', '.wmv') def get_compiled_path() def compile_assets(*, target_dir, keep_original, fingerprint, compress) def compile_asset(*, asset, target_dir, keep_original, fingerprint, compress) APP_ASSETS_DIR = APP_PATH / 'assets' SKIP_ASSETS = ('.DS_Store', '.gitignore') def iter_assets() def iter_asset_dirs() FINGERPRINT_LENGTH = 7 class AssetsFingerprintsManifest(dict) def load(self) def save(self) def get_fingerprinted_url_path(url_path) def get_file_fingerprint(file_path) class AssetsRouter(Router) namespace = 'assets' urls = [path('', AssetView, name='asset')] def get_asset_url(url_path) class AssetView(View) def get_url_path(self) def get(self) def get_asset_path(self, path) def get_debug_asset_path(self, path) def check_asset_path(self, path) @functools.cache def get_last_modified(self, path) @functools.cache def get_etag(self, path) @functools.cache def get_size(self, path) def update_headers(self, headers, path) def is_immutable(self, path) def get_encoded_path(self, path) def get_redirect_response(self, path) def get_conditional_response(self, path) def get_range_response(self, path) class Chore() def run(self) class ChoresRegistry() def register_chore(self, chore) def import_modules(self) def get_chores(self) chores_registry = ChoresRegistry() def register_chore(group) @click.command() @click.option('--keep-original/--no-keep-original', 'keep_original', is_flag=True, default=False, help='Keep the original assets') @click.option('--fingerprint/--no-fingerprint', 'fingerprint', is_flag=True, default=True, help='Fingerprint the assets') @click.option('--compress/--no-compress', 'compress', is_flag=True, default=True, help='Compress the assets') def build(keep_original, fingerprint, compress) logger = logging.getLogger('plain.chores') @click.group() def chores() @chores.command('list') @click.option('--group', default=None, type=str, help='Group to run', multiple=True) @click.option('--name', default=None, type=str, help='Name of the chore to run', multiple=True) def list_chores(group, name) @chores.command('run') @click.option('--group', default=None, type=str, help='Group to run', multiple=True) @click.option('--name', default=None, type=str, help='Name of the chore to run', multiple=True) @click.option('--dry-run', is_flag=True, help='Show what would be done without executing') def run_chores(group, name, dry_run) @click.group() def plain_cli() class CLIRegistryGroup(click.Group) def list_commands(self, ctx) def get_command(self, ctx, name) class PlainCommandCollection(click.CommandCollection) context_class = PlainContext def get_command(self, ctx: Context, cmd_name: str) cli = PlainCommandCollection() @click.command() @click.option('--llm', 'llm', is_flag=True) @click.option('--open') @click.argument('module', default='') def docs(module, llm, open) class LLMDocs() preamble = 'Below is all of the documentation and abbreviated source code for the Plain web framework. Your job is to read and understand it, and then act as the Plain Framework Assistant and help the developer accomplish whatever they want to do next.\n\n---\n\n' def load(self) def display_path(self, path) def print(self, relative_to=None) @staticmethod def symbolicate(file_path: Path) class PlainHelpFormatter(click.HelpFormatter) def write_heading(self, heading) def write_usage(self, prog, args, prefix='Usage: ') def write_dl(self, rows, col_max=30, col_spacing=2) class PlainContext(click.Context) formatter_class = PlainHelpFormatter @click.command('preflight') @click.argument('package_label', nargs=-1) @click.option('--deploy', is_flag=True, help='Check deployment settings.') @click.option('--fail-level', default='ERROR', type=click.Choice(['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']), help='Message level that will cause the command to exit with a non-zero status. Default is ERROR.') @click.option('--database', 'databases', multiple=True, help='Run database related checks against these aliases.') def preflight_checks(package_label, deploy, fail_level, databases) def print_event(msg, newline=True) class CLIRegistry() def register_command(self, cmd, name) def import_modules(self) def get_commands(self) cli_registry = CLIRegistry() def register_cli(name) @click.command() @click.argument('package_name') def create(package_name) @click.command() @click.argument('setting_name') def setting(setting_name) @click.command() @click.option('-i', '--interface', type=click.Choice(['ipython', 'bpython', 'python']), help='Specify an interactive interpreter interface.') def shell(interface) @click.command() @click.argument('script', nargs=1, type=click.Path(exists=True)) def run(script) def print_bold(s) def print_italic(s) def print_dim(s) @click.command() @click.option('--flat', is_flag=True, help='List all URLs in a flat list') def urls(flat) @click.group() def utils() @utils.command() def generate_secret_key() logger = logging.getLogger('plain.security.csrf') invalid_token_chars_re = _lazy_re_compile('[^a-zA-Z0-9]') REASON_BAD_ORIGIN = 'Origin checking failed - %s does not match any trusted origins.' REASON_NO_REFERER = 'Referer checking failed - no Referer.' REASON_BAD_REFERER = 'Referer checking failed - %s does not match any trusted origins.' REASON_NO_CSRF_COOKIE = 'CSRF cookie not set.' REASON_CSRF_TOKEN_MISSING = 'CSRF token missing.' REASON_MALFORMED_REFERER = 'Referer checking failed - Referer is malformed.' REASON_INSECURE_REFERER = 'Referer checking failed - Referer is insecure while host is secure.' REASON_INCORRECT_LENGTH = 'has incorrect length' REASON_INVALID_CHARACTERS = 'has invalid characters' CSRF_SECRET_LENGTH = 32 CSRF_TOKEN_LENGTH = 2 * CSRF_SECRET_LENGTH CSRF_ALLOWED_CHARS = string.ascii_letters + string.digits def get_token(request) def rotate_token(request) class InvalidTokenFormat(Exception) class RejectRequest(Exception) class CsrfViewMiddleware() @cached_property def csrf_trusted_origins_hosts(self) @cached_property def allowed_origins_exact(self) @cached_property def allowed_origin_subdomains(self) class CsrfFailureView(TemplateView) template_name = '403.html' def get_response(self) def post(self) def put(self) def patch(self) def delete(self) def head(self) def options(self) def trace(self) def dd(*objs) class FieldDoesNotExist(Exception) class PackageRegistryNotReady(Exception) class ObjectDoesNotExist(Exception) class MultipleObjectsReturned(Exception) class SuspiciousOperation(Exception) class SuspiciousMultipartForm(SuspiciousOperation) class SuspiciousFileOperation(SuspiciousOperation) class DisallowedHost(SuspiciousOperation) class DisallowedRedirect(SuspiciousOperation) class TooManyFieldsSent(SuspiciousOperation) class TooManyFilesSent(SuspiciousOperation) class RequestDataTooBig(SuspiciousOperation) class BadRequest(Exception) class PermissionDenied(Exception) class ImproperlyConfigured(Exception) class FieldError(Exception) NON_FIELD_ERRORS = '__all__' class ValidationError(Exception) @property def messages(self) def update_error_dict(self, error_dict) class EmptyResultSet(Exception) class FullResultSet(Exception) class BoundField() @property def errors(self) def value(self) @cached_property def initial(self) class FormFieldMissingError(Exception) FILE_INPUT_CONTRADICTION = object() class Field() default_validators = [] default_error_messages = {'required': 'This field is required.'} empty_values = list(validators.EMPTY_VALUES) def prepare_value(self, value) def to_python(self, value) def validate(self, value) def run_validators(self, value) def clean(self, value) def bound_data(self, data, initial) def has_changed(self, initial, data) def get_bound_field(self, form, field_name) def value_from_form_data(self, data, files, html_name) def value_from_json_data(self, data, files, html_name) class CharField(Field) def to_python(self, value) class IntegerField(Field) default_error_messages = {'invalid': 'Enter a whole number.'} re_decimal = _lazy_re_compile('\\.0*\\s*$') def to_python(self, value) class FloatField(IntegerField) default_error_messages = {'invalid': 'Enter a number.'} def to_python(self, value) def validate(self, value) class DecimalField(IntegerField) default_error_messages = {'invalid': 'Enter a number.'} def to_python(self, value) def validate(self, value) class BaseTemporalField(Field) DATE_INPUT_FORMATS = ['%Y-%m-%d', '%m/%d/%Y', '%m/%d/%y', '%b %d %Y', '%b %d, %Y', '%d %b %Y', '%d %b, %Y', '%B %d %Y', '%B %d, %Y', '%d %B %Y', '%d %B, %Y'] TIME_INPUT_FORMATS = ['%H:%M:%S', '%H:%M:%S.%f', '%H:%M'] DATETIME_INPUT_FORMATS = ['%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M', '%m/%d/%Y %H:%M:%S', '%m/%d/%Y %H:%M:%S.%f', '%m/%d/%Y %H:%M', '%m/%d/%y %H:%M:%S', '%m/%d/%y %H:%M:%S.%f', '%m/%d/%y %H:%M'] def to_python(self, value) def strptime(self, value, format) class DateField(BaseTemporalField) input_formats = BaseTemporalField.DATE_INPUT_FORMATS default_error_messages = {'invalid': 'Enter a valid date.'} def to_python(self, value) def strptime(self, value, format) class TimeField(BaseTemporalField) input_formats = BaseTemporalField.TIME_INPUT_FORMATS default_error_messages = {'invalid': 'Enter a valid time.'} def to_python(self, value) def strptime(self, value, format) class DateTimeFormatsIterator() class DateTimeField(BaseTemporalField) input_formats = DateTimeFormatsIterator() default_error_messages = {'invalid': 'Enter a valid date/time.'} def prepare_value(self, value) def to_python(self, value) def strptime(self, value, format) class DurationField(Field) default_error_messages = {'invalid': 'Enter a valid duration.', 'overflow': 'The number of days must be between {min_days} and {max_days}.'} def prepare_value(self, value) def to_python(self, value) class RegexField(CharField) regex = property(_get_regex, _set_regex) class EmailField(CharField) default_validators = [validators.validate_email] class FileField(Field) default_error_messages = {'invalid': 'No file was submitted. Check the encoding type on the form.', 'missing': 'No file was submitted.', 'empty': 'The submitted file is empty.', 'text': pluralize_lazy('Ensure this filename has at most %(max)d character (it has %(length)d).', 'Ensure this filename has at most %(max)d characters (it has %(length)d).', 'max'), 'contradiction': 'Please either submit a file or check the clear checkbox, not both.'} def to_python(self, data) def clean(self, data, initial=None) def bound_data(self, _, initial) def has_changed(self, initial, data) def value_from_form_data(self, data, files, html_name) def value_from_json_data(self, data, files, html_name) class ImageField(FileField) default_validators = [validators.validate_image_file_extension] default_error_messages = {'invalid_image': 'Upload a valid image. The file you uploaded was either not an image or a corrupted image.'} def to_python(self, data) class URLField(CharField) default_error_messages = {'invalid': 'Enter a valid URL.'} default_validators = [validators.URLValidator()] def to_python(self, value) class BooleanField(Field) def to_python(self, value) def validate(self, value) def has_changed(self, initial, data) def value_from_form_data(self, data, files, html_name) def value_from_json_data(self, data, files, html_name) class NullBooleanField(BooleanField) def to_python(self, value) def validate(self, value) class CallableChoiceIterator() class ChoiceField(Field) default_error_messages = {'invalid_choice': 'Select a valid choice. %(value)s is not one of the available choices.'} choices = property(_get_choices, _set_choices) def to_python(self, value) def validate(self, value) def valid_value(self, value) class TypedChoiceField(ChoiceField) def clean(self, value) class MultipleChoiceField(ChoiceField) default_error_messages = {'invalid_choice': 'Select a valid choice. %(value)s is not one of the available choices.', 'invalid_list': 'Enter a list of values.'} def to_python(self, value) def validate(self, value) def has_changed(self, initial, data) def value_from_form_data(self, data, files, html_name) class UUIDField(CharField) default_error_messages = {'invalid': 'Enter a valid UUID.'} def prepare_value(self, value) def to_python(self, value) class InvalidJSONInput(str) class JSONString(str) class JSONField(CharField) default_error_messages = {'invalid': 'Enter a valid JSON.'} def to_python(self, value) def bound_data(self, data, initial) def prepare_value(self, value) def has_changed(self, initial, data) def from_current_timezone(value) def to_current_timezone(value) class DeclarativeFieldsMetaclass(type) class BaseForm() prefix = None @property def errors(self) def is_valid(self) def add_prefix(self, field_name) @property def non_field_errors(self) def add_error(self, field, error) def full_clean(self) def clean(self) @cached_property def changed_data(self) def get_initial_for_field(self, field, field_name) class Form(BaseForm) def parse_cookie(cookie) class MultiPartParserError(Exception) class InputStreamExhausted(Exception) RAW = 'raw' FILE = 'file' FIELD = 'field' FIELD_TYPES = frozenset([FIELD, RAW]) class MultiPartParser() boundary_re = _lazy_re_compile('[ -~]{0,200}[!-~]') def parse(self) def handle_file_complete(self, old_field_name, counters) def sanitize_file_name(self, file_name) class LazyStream() def tell(self) def read(self, size=None) def close(self) def unget(self, bytes) class ChunkIter() class InterBoundaryIter() class BoundaryIter() def exhaust(stream_or_iterable) def parse_boundary_stream(stream, max_header_size) class Parser() host_validation_re = _lazy_re_compile('^([a-z0-9.-]+|\\[[a-f0-9]*:[a-f0-9\\.:]+\\])(:[0-9]+)?$') class UnreadablePostError(OSError) class RawPostDataException(Exception) class HttpRequest() non_picklable_attrs = frozenset(['resolver_match', '_stream']) @cached_property def headers(self) @cached_property def accepted_types(self) def accepts(self, media_type) def get_host(self) def get_port(self) def get_full_path(self, force_append_slash=False) def build_absolute_uri(self, location=None) @property def scheme(self) def is_https(self) @property def encoding(self) @encoding.setter def encoding(self, val) @property def upload_handlers(self) @upload_handlers.setter def upload_handlers(self, upload_handlers) def parse_file_upload(self, meta, post_data) @property def body(self) def close(self) def read(self, *args, **kwargs) def readline(self, *args, **kwargs) def readlines(self) class HttpHeaders(CaseInsensitiveMapping) HTTP_PREFIX = 'HTTP_' UNPREFIXED_HEADERS = {'CONTENT_TYPE', 'CONTENT_LENGTH'} @classmethod def parse_header_name(cls, header) @classmethod def to_wsgi_name(cls, header) @classmethod def to_wsgi_names(cls, headers) class QueryDict(MultiValueDict) @classmethod def fromkeys(cls, iterable, value='', mutable=False, encoding=None) @property def encoding(self) @encoding.setter def encoding(self, value) def setlist(self, key, list_) def setlistdefault(self, key, default_list=None) def appendlist(self, key, value) def pop(self, key, *args) def popitem(self) def clear(self) def setdefault(self, key, default=None) def copy(self) def urlencode(self, safe=None) class MediaType() @property def is_all_types(self) def match(self, other) def bytes_to_text(s, encoding) def split_domain_port(host) def validate_host(host, allowed_hosts) def parse_accept_header(header) class ResponseHeaders(CaseInsensitiveMapping) def pop(self, key, default=None) def setdefault(self, key, value) class BadHeaderError(ValueError) class ResponseBase() status_code = 200 @property def reason_phrase(self) @reason_phrase.setter def reason_phrase(self, value) @property def charset(self) @charset.setter def charset(self, value) def serialize_headers(self) def set_cookie(self, key, value='', max_age=None, expires=None, path='/', domain=None, secure=False, httponly=False, samesite=None) def set_signed_cookie(self, key, value, salt='', **kwargs) def delete_cookie(self, key, path='/', domain=None, samesite=None) def make_bytes(self, value) def close(self) def write(self, content) def flush(self) def tell(self) def readable(self) def seekable(self) def writable(self) def writelines(self, lines) class Response(ResponseBase) streaming = False non_picklable_attrs = frozenset(['resolver_match', 'client', 'context', 'json', 'templates']) def serialize(self) @property def content(self) @content.setter def content(self, value) @cached_property def text(self) def write(self, content) def tell(self) def getvalue(self) def writable(self) def writelines(self, lines) class StreamingResponse(ResponseBase) streaming = True @property def content(self) @property def streaming_content(self) @streaming_content.setter def streaming_content(self, value) def getvalue(self) class FileResponse(StreamingResponse) block_size = 4096 def set_headers(self, filelike) class ResponseRedirectBase(Response) allowed_schemes = ['http', 'https', 'ftp'] url = property(lambda self: self.headers['Location']) class ResponseRedirect(ResponseRedirectBase) status_code = 302 class ResponsePermanentRedirect(ResponseRedirectBase) status_code = 301 class ResponseNotModified(Response) status_code = 304 @Response.content.setter def content(self, value) class ResponseBadRequest(Response) status_code = 400 class ResponseNotFound(Response) status_code = 404 class ResponseForbidden(Response) status_code = 403 class ResponseNotAllowed(Response) status_code = 405 class ResponseGone(Response) status_code = 410 class ResponseServerError(Response) status_code = 500 class Http404(Exception) class JsonResponse(Response) class PlainJSONEncoder(json.JSONEncoder) def default(self, o) def configure_logging(logging_settings) app_logger = logging.getLogger('app') class KVLogger() def log(self, level, message, **kwargs) def info(self, message, **kwargs) def debug(self, message, **kwargs) def warning(self, message, **kwargs) def error(self, message, **kwargs) def critical(self, message, **kwargs) request_logger = logging.getLogger('plain.request') def log_response(message, *args, response=None, request=None, logger=request_logger, level=None, exception=None) CONFIG_MODULE_NAME = 'config' class PackageConfig() @cached_property def path(self) def ready(self) CONFIG_MODULE_NAME = 'config' class PackagesRegistry() def populate(self, installed_packages=None) def check_packages_ready(self) def get_package_configs(self) def get_package_config(self, package_label) def get_containing_package_config(self, object_name) def register_config(self, package_config) packages_registry = PackagesRegistry(installed_packages=None) def register_config(package_config_class) class UnorderedObjectListWarning(RuntimeWarning) class InvalidPage(Exception) class PageNotAnInteger(InvalidPage) class EmptyPage(InvalidPage) class Paginator() def validate_number(self, number) def get_page(self, number) def page(self, number) @cached_property def count(self) @cached_property def num_pages(self) @property def page_range(self) class Page(collections.abc.Sequence) def has_next(self) def has_previous(self) def has_other_pages(self) def next_page_number(self) def previous_page_number(self) def start_index(self) def end_index(self) @register_check def check_setting_file_upload_temp_dir(package_configs, **kwargs) DEBUG = 10 INFO = 20 WARNING = 30 ERROR = 40 CRITICAL = 50 class CheckMessage() def is_serious(self, level=ERROR) def is_silenced(self) class Debug(CheckMessage) class Info(CheckMessage) class Warning(CheckMessage) class Error(CheckMessage) class Critical(CheckMessage) class CheckRegistry() def register(self, check=None, deploy=False) def run_checks(self, package_configs=None, include_deployment_checks=False, databases=None) def get_checks(self, include_deployment_checks=False) checks_registry = CheckRegistry() register_check = checks_registry.register run_checks = checks_registry.run_checks SECRET_KEY_MIN_LENGTH = 50 SECRET_KEY_MIN_UNIQUE_CHARACTERS = 5 SECRET_KEY_WARNING_MSG = f"Your %s has less than {SECRET_KEY_MIN_LENGTH} characters or less than {SECRET_KEY_MIN_UNIQUE_CHARACTERS} unique characters. Please generate a long and random value, otherwise many of Plain's security-critical features will be vulnerable to attack." W025 = Warning(SECRET_KEY_WARNING_MSG, id='security.W025') @register_check(deploy=True) def check_secret_key(package_configs, **kwargs) @register_check(deploy=True) def check_secret_key_fallbacks(package_configs, **kwargs) @register_check(deploy=True) def check_debug(package_configs, **kwargs) @register_check(deploy=True) def check_allowed_hosts(package_configs, **kwargs) @register_check def check_url_config(package_configs, **kwargs) def check_resolver(resolver) def get_warning_for_invalid_pattern(pattern) APP_PATH = Path.cwd() / 'app' settings = Settings() class AppPathNotFound(RuntimeError) def setup() class SettingsReference(str) DEFAULT_CHARSET = 'utf-8' APPEND_SLASH = True DEFAULT_RESPONSE_HEADERS = {'Cross-Origin-Opener-Policy': 'same-origin', 'Referrer-Policy': 'same-origin', 'X-Content-Type-Options': 'nosniff', 'X-Frame-Options': 'DENY'} HTTPS_REDIRECT_ENABLED = True HTTPS_REDIRECT_EXEMPT = [] HTTPS_REDIRECT_HOST = None HTTPS_PROXY_HEADER = None USE_X_FORWARDED_HOST = False USE_X_FORWARDED_PORT = False FILE_UPLOAD_HANDLERS = ['plain.internal.files.uploadhandler.MemoryFileUploadHandler', 'plain.internal.files.uploadhandler.TemporaryFileUploadHandler'] FILE_UPLOAD_MAX_MEMORY_SIZE = 2621440 DATA_UPLOAD_MAX_MEMORY_SIZE = 2621440 DATA_UPLOAD_MAX_NUMBER_FIELDS = 1000 DATA_UPLOAD_MAX_NUMBER_FILES = 100 FILE_UPLOAD_TEMP_DIR = None COOKIE_SIGNING_BACKEND = 'plain.signing.TimestampSigner' CSRF_COOKIE_NAME = 'csrftoken' CSRF_COOKIE_AGE = 60 * 60 * 24 * 7 * 52 CSRF_COOKIE_DOMAIN = None CSRF_COOKIE_PATH = '/' CSRF_COOKIE_SECURE = True CSRF_COOKIE_HTTPONLY = False CSRF_COOKIE_SAMESITE = 'Lax' CSRF_HEADER_NAME = 'CSRF-Token' CSRF_FIELD_NAME = '_csrftoken' LOGGING = {} ASSETS_REDIRECT_ORIGINAL = True PREFLIGHT_SILENCED_CHECKS = [] TEMPLATES_JINJA_ENVIRONMENT = 'plain.templates.jinja.DefaultEnvironment' ENVIRONMENT_VARIABLE = 'PLAIN_SETTINGS_MODULE' ENV_SETTINGS_PREFIX = 'PLAIN_' CUSTOM_SETTINGS_PREFIX = 'APP_' class Settings() class SettingDefinition() def set_value(self, value, source) def check_type(self, obj) request_started = Signal() request_finished = Signal() got_request_exception = Signal() logger = logging.getLogger('plain.signals.dispatch') NONE_ID = _make_id(None) NO_RECEIVERS = object() class Signal() def connect(self, receiver, sender=None, weak=True, dispatch_uid=None) def disconnect(self, receiver=None, sender=None, dispatch_uid=None) def has_listeners(self, sender=None) def send(self, sender, **named) def send_robust(self, sender, **named) def receiver(signal, **kwargs) BASE62_ALPHABET = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' class BadSignature(Exception) class SignatureExpired(BadSignature) def b62_encode(s) def b62_decode(s) def b64_encode(s) def b64_decode(s) def base64_hmac(salt, value, key, algorithm='sha1') def get_cookie_signer(salt='plain.signing.get_cookie_signer') class JSONSerializer() def dumps(self, obj) def loads(self, data) def dumps(obj, key=None, salt='plain.signing', serializer=JSONSerializer, compress=False) def loads(s, key=None, salt='plain.signing', serializer=JSONSerializer, max_age=None, fallback_keys=None) class Signer() def signature(self, value, key=None) def sign(self, value) def unsign(self, signed_value) def sign_object(self, obj, serializer=JSONSerializer, compress=False) def unsign_object(self, signed_obj, serializer=JSONSerializer, **kwargs) class TimestampSigner(Signer) def timestamp(self) def sign(self, value) def unsign(self, value, max_age=None) class TemplateFileMissing(Exception) class Template() def render(self, context: dict) class JinjaEnvironment(LazyObject) environment = JinjaEnvironment() def register_template_extension(extension_class) def register_template_global(value, name=None) def register_template_filter(func, name=None) def finalize_callable_error(obj) def get_template_dirs() class DefaultEnvironment(Environment) class InclusionTagExtension(Extension) def parse(self, parser) def get_context(self, context, *args, **kwargs) def localtime_filter(value, timezone=None) default_filters = {'strftime': datetime.datetime.strftime, 'strptime': datetime.datetime.strptime, 'localtime': localtime_filter, 'timeuntil': timeuntil, 'timesince': timesince, 'json_script': json_script, 'islice': islice} def asset(url_path) default_globals = {'asset': asset, 'url': reverse, 'Paginator': Paginator, 'now': timezone.now, 'timedelta': timedelta, 'localtime': timezone.localtime} class ClientHandler(BaseHandler) class RequestFactory() def request(self, **request) def get(self, path, data=None, secure=True, *, headers=None, **extra) def post(self, path, data=None, content_type=_MULTIPART_CONTENT, secure=True, *, headers=None, **extra) def head(self, path, data=None, secure=True, *, headers=None, **extra) def trace(self, path, secure=True, *, headers=None, **extra) def options(self, path, data='', content_type='application/octet-stream', secure=True, *, headers=None, **extra) def put(self, path, data='', content_type='application/octet-stream', secure=True, *, headers=None, **extra) def patch(self, path, data='', content_type='application/octet-stream', secure=True, *, headers=None, **extra) def delete(self, path, data='', content_type='application/octet-stream', secure=True, *, headers=None, **extra) def generic(self, method, path, data='', content_type='application/octet-stream', secure=True, *, headers=None, **extra) class Client(RequestFactory) def request(self, **request) def get(self, path, data=None, follow=False, secure=True, *, headers=None, **extra) def post(self, path, data=None, content_type=_MULTIPART_CONTENT, follow=False, secure=True, *, headers=None, **extra) def head(self, path, data=None, follow=False, secure=True, *, headers=None, **extra) def options(self, path, data='', content_type='application/octet-stream', follow=False, secure=True, *, headers=None, **extra) def put(self, path, data='', content_type='application/octet-stream', follow=False, secure=True, *, headers=None, **extra) def patch(self, path, data='', content_type='application/octet-stream', follow=False, secure=True, *, headers=None, **extra) def delete(self, path, data='', content_type='application/octet-stream', follow=False, secure=True, *, headers=None, **extra) def trace(self, path, data='', follow=False, secure=True, *, headers=None, **extra) def store_exc_info(self, **kwargs) def check_exception(self, response) @property def session(self) def force_login(self, user) def logout(self) def encode_multipart(boundary, data) def encode_file(boundary, key, file) class RedirectCycleError(Exception) class IntConverter() regex = '[0-9]+' def to_python(self, value) def to_url(self, value) class StringConverter() regex = '[^/]+' def to_python(self, value) def to_url(self, value) class UUIDConverter() regex = '[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}' def to_python(self, value) def to_url(self, value) class SlugConverter(StringConverter) regex = '[-a-zA-Z0-9_]+' class PathConverter(StringConverter) regex = '.+' DEFAULT_CONVERTERS = {'int': IntConverter(), 'path': PathConverter(), 'slug': SlugConverter(), 'str': StringConverter(), 'uuid': UUIDConverter()} REGISTERED_CONVERTERS = {} def register_converter(converter, type_name) @functools.cache def get_converters() def get_converter(raw_converter) class Resolver404(Http404) class NoReverseMatch(Exception) class RegexPattern(CheckURLMixin) def match(self, path) def check(self) class RoutePattern(CheckURLMixin) def match(self, path) def check(self) class URLPattern() def check(self) def resolve(self, path) @cached_property def lookup_str(self) class ResolverMatch() def get_resolver(router=None) @functools.cache def get_ns_resolver(ns_pattern, resolver, converters) class URLResolver() def check(self) @property def reverse_dict(self) @property def namespace_dict(self) @property def app_dict(self) def resolve(self, path) def reverse(self, lookup_view, *args, **kwargs) class Router() def include(route: str | re.Pattern, router_or_urls: list | tuple | str | Router) def path(route: str | re.Pattern, view: 'View', *, name: str='') def reverse(viewname, *args, **kwargs) reverse_lazy = lazy(reverse, str) cc_delim_re = _lazy_re_compile('\\s*,\\s*') def patch_response_headers(response, cache_timeout) def add_never_cache_headers(response) def patch_cache_control(response, **kwargs) def patch_vary_headers(response, newheaders) class ConnectionProxy() class ConnectionDoesNotExist(Exception) class BaseConnectionHandler() settings_name = None exception_class = ConnectionDoesNotExist @cached_property def settings(self) def configure_settings(self, settings) def create_connection(self, alias) def all(self, initialized_only=False) def close_all(self) class InvalidAlgorithm(ValueError) def salted_hmac(key_salt, value, secret=None, *, algorithm='sha1') RANDOM_STRING_CHARS = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' def get_random_string(length, allowed_chars=RANDOM_STRING_CHARS) def constant_time_compare(val1, val2) def pbkdf2(password, salt, iterations, dklen=0, digest=None) class OrderedSet() def add(self, item) def remove(self, item) def discard(self, item) class MultiValueDictKeyError(KeyError) class MultiValueDict(dict) def get(self, key, default=None) def getlist(self, key, default=None) def setlist(self, key, list_) def setdefault(self, key, default=None) def setlistdefault(self, key, default_list=None) def appendlist(self, key, value) def items(self) def lists(self) def values(self) def copy(self) def update(self, *args, **kwargs) def dict(self) class ImmutableList(tuple) def complain(self, *args, **kwargs) append = complain extend = complain insert = complain pop = complain remove = complain sort = complain reverse = complain class DictWrapper(dict) class CaseInsensitiveMapping(Mapping) def copy(self) date_re = _lazy_re_compile('(?P\\d{4})-(?P\\d{1,2})-(?P\\d{1,2})$') time_re = _lazy_re_compile('(?P\\d{1,2}):(?P\\d{1,2})(?::(?P\\d{1,2})(?:[\\.,](?P\\d{1,6})\\d{0,6})?)?$') datetime_re = _lazy_re_compile('(?P\\d{4})-(?P\\d{1,2})-(?P\\d{1,2})[T ](?P\\d{1,2}):(?P\\d{1,2})(?::(?P\\d{1,2})(?:[\\.,](?P\\d{1,6})\\d{0,6})?)?\\s*(?PZ|[+-]\\d{2}(?::?\\d{2})?)?$') standard_duration_re = _lazy_re_compile('^(?:(?P-?\\d+) (days?, )?)?(?P-?)((?:(?P\\d+):)(?=\\d+:\\d+))?(?:(?P\\d+):)?(?P\\d+)(?:[\\.,](?P\\d{1,6})\\d{0,6})?$') iso8601_duration_re = _lazy_re_compile('^(?P[-+]?)P(?:(?P\\d+([\\.,]\\d+)?)D)?(?:T(?:(?P\\d+([\\.,]\\d+)?)H)?(?:(?P\\d+([\\.,]\\d+)?)M)?(?:(?P\\d+([\\.,]\\d+)?)S)?)?$') postgres_interval_re = _lazy_re_compile('^(?:(?P-?\\d+) (days? ?))?(?:(?P[-+])?(?P\\d+):(?P\\d\\d):(?P\\d\\d)(?:\\.(?P\\d{1,6}))?)?$') def parse_date(value) def parse_time(value) def parse_datetime(value) def parse_duration(value) def deconstructible(*args, path=None) class classonlymethod(classmethod) def duration_string(duration) def duration_iso_string(duration) def duration_microseconds(delta) class PlainUnicodeDecodeError(UnicodeDecodeError) def is_protected_type(obj) def force_str(s, encoding='utf-8', strings_only=False, errors='strict') def force_bytes(s, encoding='utf-8', strings_only=False, errors='strict') def iri_to_uri(iri) def punycode(domain) class cached_property() name = None @staticmethod def func(instance) class classproperty() def getter(self, method) class Promise() def lazy(func, *resultclasses) def keep_lazy(*resultclasses) def keep_lazy_text(func) empty = object() def new_method_proxy(func) class LazyObject() def unpickle_lazyobject(wrapped) class SimpleLazyObject(LazyObject) def partition(predicate, values) def make_hashable(value) @keep_lazy(SafeString) def escape(text) def json_script(value, element_id=None, encoder=None) def conditional_escape(text) def format_html(format_string, *args, **kwargs) class MLStripper(HTMLParser) def handle_data(self, d) def handle_entityref(self, name) def handle_charref(self, name) def get_data(self) @keep_lazy_text def strip_tags(value) def avoid_wrapping(value) RFC3986_SUBDELIMS = "!$&'()*+,;=" def urlencode(query, doseq=False) def http_date(epoch_seconds=None) def base36_to_int(s) def int_to_base36(i) def is_same_domain(host, pattern) def escape_leading_slashes(url) def parse_header_parameters(line) def content_disposition_header(as_attachment, filename) def get_func_args(func) def func_accepts_kwargs(func) def method_has_no_args(meth) def clean_ipv6_address(ip_str, unpack_ipv4=False, error_message='This is not a valid IPv6 address.') def is_valid_ipv6_address(ip_str) def is_iterable(x) def cached_import(module_path, class_name) def import_string(dotted_path) def module_dir(module) ESCAPE_MAPPINGS = {'A': None, 'b': None, 'B': None, 'd': '0', 'D': 'x', 's': ' ', 'S': 'x', 'w': 'x', 'W': '!', 'Z': None} class Choice(list) class Group(list) class NonCapture(list) def normalize(pattern) def next_char(input_iter) def walk_to_end(ch, input_iter) def get_quantifier(ch, input_iter) def contains(source, inst) def flatten_result(source) class SafeData() class SafeString(str, SafeData) @keep_lazy(SafeString) def mark_safe(s) re_words = _lazy_re_compile('<[^>]+?>|([^<>\\s]+)', re.S) re_chars = _lazy_re_compile('<[^>]+?>|(.)', re.S) re_tag = _lazy_re_compile('<(/)?(\\S+?)(?:(\\s*/)|\\s.*?)?>', re.S) class Truncator(SimpleLazyObject) def add_truncation_text(self, text, truncate=None) def chars(self, num, truncate=None, html=False) def words(self, num, truncate=None, html=False) @keep_lazy_text def slugify(value, allow_unicode=False) def pluralize(singular, plural, number) def pluralize_lazy(singular, plural, number) def timesince(d: datetime.datetime, *, now: datetime.datetime | None=None, reversed: bool=False, format: str | dict[str, str]='verbose', depth: int=2) def timeuntil(d: datetime.datetime, now: datetime.datetime | None=None, format: str | dict[str, str]='verbose', depth: int=2) def get_fixed_timezone(offset) @functools.lru_cache def get_default_timezone() def get_default_timezone_name() def get_current_timezone() def get_current_timezone_name() def activate(timezone) def deactivate() class override(ContextDecorator) def localtime(value=None, timezone=None) def now() def is_aware(value) def is_naive(value) def make_aware(value, timezone=None) def make_naive(value, timezone=None) class Node() default = 'DEFAULT' @classmethod def create(cls, children=None, connector=None, negated=False) copy = __copy__ def add(self, data, conn_type) def negate(self) EMPTY_VALUES = (None, '', [], (), {}) @deconstructible class RegexValidator() regex = '' message = 'Enter a valid value.' code = 'invalid' inverse_match = False flags = 0 @deconstructible class URLValidator(RegexValidator) ul = '¡-\uffff' ipv4_re = '(?:0|25[0-5]|2[0-4][0-9]|1[0-9]?[0-9]?|[1-9][0-9]?)(?:\\.(?:0|25[0-5]|2[0-4][0-9]|1[0-9]?[0-9]?|[1-9][0-9]?)){3}' ipv6_re = '\\[[0-9a-f:.]+\\]' hostname_re = '[a-z' + ul + '0-9](?:[a-z' + ul + '0-9-]{0,61}[a-z' + ul + '0-9])?' domain_re = '(?:\\.(?!-)[a-z' + ul + '0-9-]{1,63}(? logger = logging.getLogger('plain.request') class View() def setup(self, request: HttpRequest, *url_args, **url_kwargs) @classonlymethod def as_view(cls, *init_args, **init_kwargs) def get_request_handler(self) def get_response(self) def convert_value_to_response(self, value) def options(self) class CsrfExemptViewMixin() def setup(self, *args, **kwargs) class ErrorView(TemplateView) def get_template_context(self) def get_template_names(self) def get_request_handler(self) def get_response(self) def get(self) class ResponseException(Exception) class FormView(TemplateView) def get_form(self) def get_form_kwargs(self) def get_success_url(self, form: 'BaseForm') def form_valid(self, form: 'BaseForm') def form_invalid(self, form: 'BaseForm') def get_template_context(self) def post(self) class ObjectTemplateViewMixin() context_object_name = '' def get(self) def load_object(self) def get_object(self) def get_template_context(self) class DetailView(ObjectTemplateViewMixin, TemplateView) class CreateView(ObjectTemplateViewMixin, FormView) def post(self) def load_object(self) def get_success_url(self, form) def form_valid(self, form) class UpdateView(ObjectTemplateViewMixin, FormView) def post(self) def get_success_url(self, form) def form_valid(self, form) def get_form_kwargs(self) class DeleteView(ObjectTemplateViewMixin, FormView) class EmptyDeleteForm(Form) def save(self) form_class = EmptyDeleteForm def post(self) def get_form_kwargs(self) def form_valid(self, form) class ListView(TemplateView) context_object_name = '' def get(self) def get_objects(self) def get_template_context(self) logger = logging.getLogger('plain.request') class RedirectView(View) permanent = False query_string = False def get_redirect_url(self) def get(self) def head(self) def post(self) def options(self) def delete(self) def put(self) def patch(self) def csrf_input(request) csrf_input_lazy = lazy(csrf_input, SafeString, str) csrf_token_lazy = lazy(get_token, str) class TemplateView(View) def get_template_context(self) def get_template_names(self) def get_template(self) def render_template(self) def get(self) app = _get_wsgi_application() class Card() class Sizes(Enum) SMALL = 1 MEDIUM = 2 LARGE = 3 FULL = 4 template_name = 'admin/cards/card.html' def render(self, view, request) @classmethod def view_name(cls) def get_template_context(self) def get_title(self) @classmethod def get_slug(cls) def get_description(self) def get_number(self) def get_text(self) def get_link(self) def get_current_display(self) def get_displays(self) class ChartCard(Card) template_name = 'admin/cards/chart.html' def get_template_context(self) def get_chart_data(self) class TrendCard(ChartCard) model = None datetime_field = None default_display = DatetimeRangeAliases.SINCE_30_DAYS_AGO displays = DatetimeRangeAliases def get_description(self) def get_current_display(self) def get_trend_data(self) def get_chart_data(self) class TableCard(Card) template_name = 'admin/cards/table.html' size = Card.Sizes.FULL headers = [] rows = [] footers = [] def get_template_context(self) def get_headers(self) def get_rows(self) def get_footers(self) @register_config class Config(PackageConfig) package_label = 'plainadmin' def ready(self) class DatetimeRangeAliases(Enum) TODAY = 'Today' THIS_WEEK = 'This Week' THIS_WEEK_TO_DATE = 'This Week-to-date' THIS_MONTH = 'This Month' THIS_MONTH_TO_DATE = 'This Month-to-date' THIS_QUARTER = 'This Quarter' THIS_QUARTER_TO_DATE = 'This Quarter-to-date' THIS_YEAR = 'This Year' THIS_YEAR_TO_DATE = 'This Year-to-date' LAST_WEEK = 'Last Week' LAST_WEEK_TO_DATE = 'Last Week-to-date' LAST_MONTH = 'Last Month' LAST_MONTH_TO_DATE = 'Last Month-to-date' LAST_QUARTER = 'Last Quarter' LAST_QUARTER_TO_DATE = 'Last Quarter-to-date' LAST_YEAR = 'Last Year' LAST_YEAR_TO_DATE = 'Last Year-to-date' SINCE_30_DAYS_AGO = 'Since 30 Days Ago' SINCE_60_DAYS_AGO = 'Since 60 Days Ago' SINCE_90_DAYS_AGO = 'Since 90 Days Ago' SINCE_365_DAYS_AGO = 'Since 365 Days Ago' NEXT_WEEK = 'Next Week' NEXT_4_WEEKS = 'Next 4 Weeks' NEXT_MONTH = 'Next Month' NEXT_QUARTER = 'Next Quarter' NEXT_YEAR = 'Next Year' @classmethod def from_value(cls, value) @classmethod def to_range(cls, value: str) class DatetimeRange() def as_tuple(self) def total_days(self) def iter_days(self) def iter_weeks(self) def iter_months(self) def iter_quarters(self) def iter_years(self) ADMIN_TOOLBAR_CLASS = 'plain.admin.toolbar.Toolbar' def get_user_by_pk(pk) class ImpersonateMiddleware() def can_be_impersonator(user) def can_impersonate_user(impersonator, target_user) def IMPERSONATE_ALLOWED(user) class ImpersonateRouter(Router) namespace = 'impersonate' urls = [path('stop/', ImpersonateStopView, name='stop'), path('start//', ImpersonateStartView, name='start')] IMPERSONATE_KEY = 'impersonate' class ImpersonateStartView(View) def get(self) class ImpersonateStopView(View) def get(self) class AdminMiddleware() IGNORE_STACK_FILES = ['threading', 'concurrent/futures', 'functools.py', 'socketserver', 'wsgiref', 'gunicorn', 'whitenoise', 'sentry_sdk', 'querystats/core', 'plain/template/base', 'plain/models', 'plain/internal'] def pretty_print_sql(sql) def get_stack() def tidy_stack(stack) class QueryStats() @cached_property def total_time(self) @staticmethod def get_time_display(seconds) @cached_property def total_time_display(self) @cached_property def num_queries(self) @cached_property def duplicate_queries(self) @cached_property def num_duplicate_queries(self) def as_summary_dict(self) def as_context_dict(self, request) def as_server_timing(self) logger = logging.getLogger(__name__) class QueryStatsJSONEncoder(PlainJSONEncoder) def default(self, obj) class QueryStatsMiddleware() def should_ignore_request(self, request) @staticmethod def is_admin_request(request) class QuerystatsRouter(Router) namespace = 'querystats' urls = [path('', views.QuerystatsView, name='querystats')] class QuerystatsView(AuthViewMixin, TemplateView) template_name = 'querystats/querystats.html' admin_required = True def check_auth(self) def get_response(self) def get(self) def get_template_context(self) def post(self) @register_template_extension class ToolbarExtension(InclusionTagExtension) tags = {'toolbar'} template_name = 'toolbar/toolbar.html' def get_context(self, context, *args, **kwargs) @register_template_filter def get_admin_model_detail_url(obj) class Toolbar() def should_render(self) def request_exception(self) class AdminIndexView(AdminView) template_name = 'admin/index.html' title = 'Dashboard' def get(self) class AdminSearchView(AdminView) template_name = 'admin/search.html' title = 'Search' def get_template_context(self) class AdminRouter(Router) namespace = 'admin' urls = [path('search/', AdminSearchView, name='search'), include('impersonate/', ImpersonateRouter), include('querystats/', QuerystatsRouter), include('', registry.get_urls()), path('', AdminIndexView, name='index')] URL_NAMESPACE = 'admin' class AdminView(AuthViewMixin, TemplateView) admin_required = True nav_section = 'App' nav_title = '' template_name = 'admin/page.html' def get_response(self) def get_template_context(self) @classmethod def view_name(cls) @classmethod def get_slug(cls) def get_title(self) def get_image(self) def get_description(self) @classmethod def get_path(cls) @classmethod def get_parent_view_classes(cls) @classmethod def get_nav_section(cls) @classmethod def get_nav_title(cls) @classmethod def get_view_url(cls, obj=None) def get_links(self) def get_cards(self) def get_model_field(instance, field) class AdminModelListView(AdminListView) show_search = True allow_global_search = True queryset_order = [] def get_title(self) @classmethod def get_nav_title(cls) @classmethod def get_path(cls) def get_template_context(self) def get_objects(self) def get_initial_queryset(self) def order_queryset(self, queryset) def search_queryset(self, queryset) def get_field_value(self, obj, field: str) def get_field_value_template(self, obj, field: str, value) class AdminModelDetailView(AdminDetailView) def get_title(self) @classmethod def get_path(cls) def get_fields(self) def get_field_value(self, obj, field: str) def get_object(self) class AdminModelCreateView(AdminCreateView) form_class = None def get_title(self) @classmethod def get_path(cls) class AdminModelUpdateView(AdminUpdateView) form_class = None success_url = '.' def get_title(self) @classmethod def get_path(cls) def get_object(self) class AdminModelDeleteView(AdminDeleteView) def get_title(self) @classmethod def get_path(cls) def get_object(self) class AdminListView(HTMXViewMixin, AdminView) template_name = 'admin/list.html' page_size = 100 show_search = False allow_global_search = False def get_template_context(self) def get(self) def post(self) def perform_action(self, action: str, target_pks: list) def get_objects(self) def get_fields(self) def get_actions(self) def get_displays(self) def get_field_value(self, obj, field: str) def get_object_pk(self, obj) def get_field_value_template(self, obj, field: str, value) def get_list_url(self) def get_create_url(self) def get_detail_url(self, obj) def get_update_url(self, obj) def get_delete_url(self, obj) def get_object_url(self, obj) def get_object_links(self, obj) def get_links(self) class AdminCreateView(AdminView, CreateView) template_name = None def get_list_url(self) def get_create_url(self) def get_detail_url(self, obj) def get_update_url(self, obj) def get_delete_url(self, obj) def get_success_url(self, form) class AdminDetailView(AdminView, DetailView) template_name = None nav_section = '' def get_template_context(self) def get_template_names(self) def get_description(self) def get_field_value(self, obj, field: str) def get_field_value_template(self, obj, field: str, value) def get_list_url(self) def get_create_url(self) def get_detail_url(self, obj) def get_update_url(self, obj) def get_delete_url(self, obj) def get_fields(self) def get_links(self) class AdminUpdateView(AdminView, UpdateView) template_name = None nav_section = '' def get_list_url(self) def get_create_url(self) def get_detail_url(self, obj) def get_update_url(self, obj) def get_delete_url(self, obj) def get_description(self) def get_links(self) def get_success_url(self, form) class AdminDeleteView(AdminView, DeleteView) template_name = 'admin/delete.html' nav_section = '' def get_description(self) def get_list_url(self) def get_create_url(self) def get_detail_url(self, obj) def get_update_url(self, obj) def get_delete_url(self, obj) def get_links(self) def get_success_url(self, form) class AdminViewRegistry() def register_view(self, view=None) def register_viewset(self, viewset=None) def get_nav_sections(self) def get_urls(self) def get_searchable_views(self) def get_model_detail_url(self, instance) registry = AdminViewRegistry() register_view = registry.register_view register_viewset = registry.register_viewset get_model_detail_url = registry.get_model_detail_url class Img() class AdminViewset() @classmethod def get_views(cls) @register_cli('api') @click.group() def cli() @cli.command() @click.option('--validate', is_flag=True, help='Validate the OpenAPI schema.') @click.option('--indent', default=2, help='Indentation level for JSON and YAML output.') @click.option('--format', default='json', help='Output format (json or yaml).', type=click.Choice(['json', 'yaml'])) def generate_openapi(validate, indent, format) @register_config class Config(PackageConfig) package_label = 'plainapi' def generate_token() @models.register_model class APIKey(models.Model) uuid = models.UUIDField(default=uuid.uuid4) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) expires_at = models.DateTimeField(required=False, allow_null=True) last_used_at = models.DateTimeField(required=False, allow_null=True) name = models.CharField(max_length=255, required=False) token = models.CharField(max_length=40, default=generate_token) api_version = models.CharField(max_length=255, required=False) class Meta() constraints = [models.UniqueConstraint(fields=['uuid'], name='plainapi_apikey_unique_uuid'), models.UniqueConstraint(fields=['token'], name='plainapi_apikey_unique_token')] def response_typed_dict(status_code: int | HTTPStatus | str, return_type, *, description='', component_name='') def request_form(form_class: BaseForm) def schema(data) class OpenAPISchemaGenerator() def as_json(self, indent) def as_yaml(self, indent) def get_paths(self, urls) def path_from_url_pattern(self, url_pattern, root_path) def extract_components(self, obj) def operations_for_url_pattern(self, url_pattern) def parameters_from_url_patterns(self, url_patterns) def merge_data(data1, data2) def schema_from_type(t) class ErrorSchema(TypedDict) class APIVersionChange() def transform_request_forward(self, request, data) def transform_response_backward(self, response, data) class VersionedAPIView(View) api_version_header = 'API-Version' @cached_property def api_version(self) def get_api_version(self) def get_default_api_version(self) def get_response(self) def transform_request(self, request) def transform_response(self, response) logger = logging.getLogger('plain.api') class APIKeyView(View) api_key_required = True @cached_property def api_key(self) def get_response(self) def use_api_key(self) def get_api_key(self) @openapi.response_typed_dict(400, ErrorSchema, component_name='BadRequest') @openapi.response_typed_dict(401, ErrorSchema, component_name='Unauthorized') @openapi.response_typed_dict(403, ErrorSchema, component_name='Forbidden') @openapi.response_typed_dict(404, ErrorSchema, component_name='NotFound') @openapi.response_typed_dict('5XX', ErrorSchema, description='Unexpected Error', component_name='ServerError') class APIView(CsrfExemptViewMixin, View) def get_response(self) def get_user(request) class AuthenticationMiddleware() USER_ID_SESSION_KEY = '_auth_user_id' USER_HASH_SESSION_KEY = '_auth_user_hash' def get_session_auth_hash(user) def update_session_auth_hash(request, user) def get_session_auth_fallback_hash(user) def login(request, user) def logout(request) def get_user_model() def get_user(request) def resolve_url(to, *args, **kwargs) class LoginRequired(Exception) class AuthViewMixin() login_required = True admin_required = False login_url = settings.AUTH_LOGIN_URL def check_auth(self) def get_response(self) class LogoutView(View) def post(self) def redirect_to_login(next, login_url=None, redirect_field_name='next') @register_viewset class CachedItemViewset(AdminViewset) class ListView(AdminModelListView) nav_section = 'Cache' model = CachedItem title = 'Cached items' fields = ['key', 'created_at', 'expires_at', 'updated_at'] queryset_order = ['-pk'] allow_global_search = False def get_objects(self) class DetailView(AdminModelDetailView) model = CachedItem title = 'Cached item' @register_chore('cache') def clear_expired() @register_cli('cache') @click.group() def cli() @cli.command() def clear_expired() @cli.command() @click.option('--force', is_flag=True) def clear_all(force) @cli.command() def stats() @register_config class Config(PackageConfig) package_label = 'plaincache' class Cached() def reload(self) def exists(self) @property def value(self) def set(self, value, expiration: datetime | timedelta | int | float | None=None) def delete(self) class CachedItemQuerySet(models.QuerySet) def expired(self) def unexpired(self) def forever(self) @models.register_model class CachedItem(models.Model) key = models.CharField(max_length=255) value = models.JSONField(required=False, allow_null=True) expires_at = models.DateTimeField(required=False, allow_null=True) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) objects = CachedItemQuerySet.as_manager() class Meta() indexes = [models.Index(fields=['expires_at'])] constraints = [models.UniqueConstraint(fields=['key'], name='plaincache_cacheditem_unique_key')] DEFAULT_RUFF_CONFIG = Path(__file__).parent / 'ruff_defaults.toml' @register_cli('code') @click.group() def cli() @cli.command() @click.argument('path', default='.') def check(path) @register_cli('fix') @cli.command() @click.argument('path', default='.') @click.option('--unsafe-fixes', is_flag=True, help='Apply ruff unsafe fixes') @click.option('--add-noqa', is_flag=True, help='Add noqa comments to suppress errors') def fix(path, unsafe_fixes, add_noqa) def get_code_config() def setup() ENTRYPOINT_GROUP = 'plain.dev' @register_cli('dev') @click.group(invoke_without_command=True) @click.pass_context @click.option('--port', '-p', default=8443, type=int, help='Port to run the web server on') @click.option('--hostname', '-h', default=None, type=str, help='Hostname to run the web server on') @click.option('--log-level', '-l', default='info', type=click.Choice(['debug', 'info', 'warning', 'error', 'critical']), help='Log level') def cli(ctx, port, hostname, log_level) @cli.command() def debug() @cli.command() def services() @cli.command() @click.option('--list', '-l', 'show_list', is_flag=True, help='List available entrypoints') @click.argument('entrypoint', required=False) def entrypoint(show_list, entrypoint) class Dev() def run(self) def start_app(self, signum, frame) def symlink_plain_src(self) def modify_hosts_file(self) def set_csrf_and_allowed_hosts(self) def run_preflight(self) def add_gunicorn(self) def add_entrypoints(self) def add_pyproject_run(self) @register_cli('contrib') @click.command('contribute', hidden=True) @click.option('--repo', default='../plain', help='Path to the plain repo') @click.option('--reset', is_flag=True, help='Undo any changes to pyproject.toml and uv.lock') @click.option('--all', 'all_packages', is_flag=True, help='Link all installed plain packages') @click.argument('packages', nargs=-1) def cli(packages, repo, reset, all_packages) def set_breakpoint_hook() DEV_REQUESTS_IGNORE_PATHS = ['/favicon.ico'] DEV_REQUESTS_MAX = 50 def setup() class MkcertManager() def setup_mkcert(self, install_path) def is_mkcert_ca_installed(self) def generate_certs(self, domain, storage_path) log = logging.getLogger(__name__) def cry(message, stderr=sys.__stderr__) class LF2CRLF_FileWrapper() @property def encoding(self) def write(self, data, nl_rex=re.compile('\r?\n')) def writelines(self, lines, nl_rex=re.compile('\r?\n')) class DevPdb(Pdb) active_instance = None def do_quit(self, arg) do_q = do_quit do_exit = do_quit def set_trace(self, frame=None) def set_trace(frame=None, host='127.0.0.1', port=4444, patch_stdstreams=False, quiet=False) ANSI_COLOURS = ['grey', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white'] def get_colors() ON_WINDOWS = 'win32' in str(sys.platform).lower() class ProcessManager() KILL_WAIT = 5 SIGNALS = {signal.SIGINT: {'name': 'SIGINT', 'rc': 130}, signal.SIGTERM: {'name': 'SIGTERM', 'rc': 143}} SYSTEM_PRINTER_NAME = 'system' class Manager() returncode = None def add_process(self, name, cmd, quiet=False, env=None, cwd=None) def loop(self) def terminate(self) def kill(self) Message = namedtuple('Message', 'type data time name color') class Printer() def write(self, message) class Process() def run(self, events=None, ignore_signals=False) class Popen(subprocess.Popen) def install_git_hook() @register_cli('pre-commit') @click.command() @click.option('--install', is_flag=True) def cli(install) def plain_db_connected() def check_short(message, *args) class RequestLog() @staticmethod def storage_path() @classmethod def replay_request(cls, name) @staticmethod def load_json_logs() @staticmethod def delete_old_logs() @staticmethod def clear() def save(self) def as_dict(self) @staticmethod def request_as_dict(request) @staticmethod def response_as_dict(response) @staticmethod def exception_as_dict(exception) def should_capture_request(request) class RequestsMiddleware() def store_exception(self, **kwargs) class ServicesPid() def write(self) def rm(self) def exists(self) class Services() @staticmethod def get_services(root) @staticmethod def are_running() def run(self) class DevRequestsRouter(Router) namespace = 'dev' urls = [path('', views.RequestsView, name='requests')] def has_pyproject_toml(target_path) class RequestsView(TemplateView) template_name = 'dev/requests.html' def get_template_context(self) def post(self) @register_template_extension class ElementsExtension(Extension) def preprocess(self, source, name, filename=None) @cached_property def template_elements(self) def replace_template_element_tags(self, contents: str) def get_connection(backend=None, fail_silently=False, **kwds) def send_mail(subject, message, from_email, recipient_list, fail_silently=False, auth_user=None, auth_password=None, connection=None, html_message=None) def send_mass_mail(datatuple, fail_silently=False, auth_user=None, auth_password=None, connection=None) class BaseEmailBackend() def open(self) def close(self) def send_messages(self, email_messages) class EmailBackend(BaseEmailBackend) def write_message(self, message) def send_messages(self, email_messages) class EmailBackend(ConsoleEmailBackend) def write_message(self, message) def open(self) def close(self) class EmailBackend(BaseEmailBackend) @property def connection_class(self) @cached_property def ssl_context(self) def open(self) def close(self) def send_messages(self, email_messages) utf8_charset = Charset.Charset('utf-8') utf8_charset_qp = Charset.Charset('utf-8') DEFAULT_ATTACHMENT_MIME_TYPE = 'application/octet-stream' RFC5322_EMAIL_LINE_LENGTH_LIMIT = 998 class BadHeaderError(ValueError) ADDRESS_HEADERS = {'from', 'sender', 'reply-to', 'to', 'cc', 'bcc', 'resent-from', 'resent-sender', 'resent-to', 'resent-cc', 'resent-bcc'} def forbid_multi_line_headers(name, val, encoding) def sanitize_address(addr, encoding) class MIMEMixin() def as_string(self, unixfrom=False, linesep='\n') def as_bytes(self, unixfrom=False, linesep='\n') class SafeMIMEMessage(MIMEMixin, MIMEMessage) class SafeMIMEText(MIMEMixin, MIMEText) def set_payload(self, payload, charset=None) class SafeMIMEMultipart(MIMEMixin, MIMEMultipart) class EmailMessage() content_subtype = 'plain' mixed_subtype = 'mixed' encoding = None def get_connection(self, fail_silently=False) def message(self) def recipients(self) def send(self, fail_silently=False) def attach(self, filename=None, content=None, mimetype=None) def attach_file(self, path, mimetype=None) class EmailMultiAlternatives(EmailMessage) alternative_subtype = 'alternative' def attach_alternative(self, content, mimetype) class TemplateEmail(EmailMultiAlternatives) def get_template_context(self) def render_content(self, context) def render_plain(self, context) def render_html(self, context) def render_subject(self, context) def get_plain_template_name(self) def get_html_template_name(self) def get_subject_template_name(self) class CachedDnsName() def get_fqdn(self) DNS_NAME = CachedDnsName() @register_cli('esbuild') @click.group('esbuild') def cli() @cli.command() @click.option('--minify', is_flag=True, default=True) def build(minify) @cli.command() @click.pass_context def dev(ctx) def esbuild(input_path, output_path, *, minify=True) def get_esbuilt_path(input_path) def run_dev_build() def run_build() class UnusedFlagsCard(Card) title = 'Unused Flags' @cached_property def flag_errors(self) def get_number(self) def get_text(self) @register_viewset class FlagAdmin(AdminViewset) class ListView(AdminModelListView) model = Flag fields = ['name', 'enabled', 'created_at__date', 'used_at__date', 'uuid'] search_fields = ['name', 'description'] cards = [UnusedFlagsCard] nav_section = 'Feature flags' class DetailView(AdminModelDetailView) model = Flag class FlagResultForm(ModelForm) class Meta() model = FlagResult fields = ['key', 'value'] @register_viewset class FlagResultAdmin(AdminViewset) class ListView(AdminModelListView) model = FlagResult title = 'Flag results' fields = ['flag', 'key', 'value', 'created_at__date', 'updated_at__date', 'uuid'] search_fields = ['flag__name', 'key'] nav_section = 'Feature flags' def get_initial_queryset(self) class DetailView(AdminModelDetailView) model = FlagResult title = 'Flag result' class UpdateView(AdminModelUpdateView) model = FlagResult title = 'Update flag result' form_class = FlagResultForm def get_flags_module() def get_flag_class(flag_name: str) @register_config class Config(PackageConfig) package_label = 'plainflags' class FlagError(Exception) class FlagDisabled(FlagError) class FlagImportError(FlagError) logger = logging.getLogger(__name__) class Flag() def get_key(self) def get_value(self) def get_db_name(self) def retrieve_or_compute_value(self) @cached_property def value(self) def validate_flag_name(value) @models.register_model class FlagResult(models.Model) uuid = models.UUIDField(default=uuid.uuid4) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) flag = models.ForeignKey('Flag', on_delete=models.CASCADE) key = models.CharField(max_length=255) value = models.JSONField() class Meta() constraints = [models.UniqueConstraint(fields=['flag', 'key'], name='plainflags_flagresult_unique_key'), models.UniqueConstraint(fields=['uuid'], name='plainflags_flagresult_unique_uuid')] @models.register_model class Flag(models.Model) uuid = models.UUIDField(default=uuid.uuid4) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) name = models.CharField(max_length=255, validators=[validate_flag_name]) description = models.TextField(required=False) enabled = models.BooleanField(default=True) used_at = models.DateTimeField(required=False, allow_null=True) class Meta() constraints = [models.UniqueConstraint(fields=['name'], name='plainflags_flag_unique_name'), models.UniqueConstraint(fields=['uuid'], name='plainflags_flag_unique_uuid')] @classmethod def check(cls, **kwargs) def coerce_key(key: Any) @register_template_extension class HTMXJSExtension(InclusionTagExtension) tags = {'htmx_js'} template_name = 'htmx/js.html' def get_context(self, context, *args, **kwargs) @register_template_extension class HTMXFragmentExtension(Extension) tags = {'htmxfragment'} def parse(self, parser) def render_template_fragment(*, template, fragment_name, context) def find_template_fragment(template: jinja2.Template, fragment_name: str) class HTMXViewMixin() def render_template(self) def get_response(self) def get_request_handler(self) def is_htmx_request(self) def get_htmx_fragment_name(self) def get_htmx_action_name(self) @register_cli('importmap') @click.group() def cli() @cli.command() def generate() logger = logging.getLogger(__name__) DEFAULT_CONFIG_FILENAME = 'importmap.toml' DEFAULT_LOCK_FILENAME = 'importmap.lock' class PackageSchema(Schema) name = fields.String(required=True) source = fields.String(required=True) class ConfigSchema(Schema) packages = fields.List(fields.Nested(PackageSchema), required=True) class LockfileSchema(Schema) config_hash = fields.String(required=True) importmap = fields.Dict(required=True) importmap_dev = fields.Dict(required=True) def hash_for_data(data) class Importmap() def load(self) def load_config(self) def load_lockfile(self) def save_lockfile(self, lockfile) def delete_lockfile(self) def generate_map(self, *args, **kwargs) logger = logging.getLogger(__name__) class ImportmapGeneratorError(Exception) class ImportmapGenerator() @classmethod def from_config(cls, config, *args, **kwargs) def get_env(self) def generate(self) @register_template_extension class ImportmapJSExtension(InclusionTagExtension) tags = {'importmap_js'} template_name = 'importmap/js.html' def get_context(self, context, *args, **kwargs) class LoginLinkForm(forms.Form) email = forms.EmailField() next = forms.CharField(required=False) def maybe_send_link(self, request, expires_in=60 * 60) def get_template_email(self, *, email, context) class LoginLinkExpired(Exception) class LoginLinkInvalid(Exception) class LoginLinkChanged(Exception) def generate_link_url(*, request, user, email, expires_in) def get_link_token_user(token) class ExpiringSigner(Signer) def sign(self, value, expires_in) def unsign(self, value) def sign_object(self, obj, serializer=JSONSerializer, compress=False, expires_in=None) def unsign_object(self, signed_obj, serializer=JSONSerializer) def dumps(obj, key=None, salt='plain.loginlink', serializer=JSONSerializer, compress=False, expires_in=None) def loads(s, key=None, salt='plain.loginlink', serializer=JSONSerializer, fallback_keys=None) class LoginlinkRouter(Router) namespace = 'loginlink' urls = [path('sent/', views.LoginLinkSentView.as_view(), name='sent'), path('failed/', views.LoginLinkFailedView.as_view(), name='failed'), path('token//', views.LoginLinkLoginView.as_view(), name='login')] class LoginLinkFormView(FormView) form_class = LoginLinkForm success_url = reverse_lazy('loginlink:sent') def get(self) def form_valid(self, form) def get_success_url(self, form) class LoginLinkSentView(TemplateView) template_name = 'loginlink/sent.html' def get(self) class LoginLinkFailedView(TemplateView) template_name = 'loginlink/failed.html' def get_template_context(self) class LoginLinkLoginView(View) success_url = '/' def get(self) class Aggregate(Func) template = '%(function)s(%(distinct)s%(expressions)s)' contains_aggregate = True name = None filter_template = '%s FILTER (WHERE %%(filter)s)' window_compatible = True allow_distinct = False empty_result_set_value = None def get_source_fields(self) def get_source_expressions(self) def set_source_expressions(self, exprs) def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False) @property def default_alias(self) def get_group_by_cols(self) def as_sql(self, compiler, connection, **extra_context) class Avg(FixDurationInputMixin, NumericOutputFieldMixin, Aggregate) function = 'AVG' name = 'Avg' allow_distinct = True class Count(Aggregate) function = 'COUNT' name = 'Count' output_field = IntegerField() allow_distinct = True empty_result_set_value = 0 class Max(Aggregate) function = 'MAX' name = 'Max' class Min(Aggregate) function = 'MIN' name = 'Min' class StdDev(NumericOutputFieldMixin, Aggregate) name = 'StdDev' class Sum(FixDurationInputMixin, Aggregate) function = 'SUM' name = 'Sum' allow_distinct = True class Variance(NumericOutputFieldMixin, Aggregate) name = 'Variance' NO_DB_ALIAS = '__no_db__' RAN_DB_VERSION_CHECK = set() logger = logging.getLogger('plain.models.backends.base') class BaseDatabaseWrapper() data_types = {} data_types_suffix = {} data_type_check_constraints = {} ops = None vendor = 'unknown' display_name = 'unknown' SchemaEditorClass = None client_class = None creation_class = None features_class = None introspection_class = None ops_class = None validation_class = BaseDatabaseValidation queries_limit = 9000 def ensure_timezone(self) @cached_property def timezone(self) @cached_property def timezone_name(self) @property def queries_logged(self) @property def queries(self) def get_database_version(self) def check_database_version_supported(self) def get_connection_params(self) def get_new_connection(self, conn_params) def init_connection_state(self) def create_cursor(self, name=None) def connect(self) def ensure_connection(self) def cursor(self) def commit(self) def rollback(self) def close(self) def savepoint(self) def savepoint_rollback(self, sid) def savepoint_commit(self, sid) def clean_savepoints(self) def get_autocommit(self) def set_autocommit(self, autocommit, force_begin_transaction_with_broken_autocommit=False) def get_rollback(self) def set_rollback(self, rollback) def validate_no_atomic_block(self) def validate_no_broken_transaction(self) def disable_constraint_checking(self) def enable_constraint_checking(self) def check_constraints(self, table_names=None) def is_usable(self) def close_if_health_check_failed(self) def close_if_unusable_or_obsolete(self) @property def allow_thread_sharing(self) def validate_thread_sharing(self) def prepare_database(self) @cached_property def wrap_database_errors(self) def chunked_cursor(self) def make_debug_cursor(self, cursor) def make_cursor(self, cursor) @contextmanager def temporary_connection(self) def schema_editor(self, *args, **kwargs) def on_commit(self, func, robust=False) def run_and_clear_commit_hooks(self) @contextmanager def execute_wrapper(self, wrapper) def copy(self, alias=None) class BaseDatabaseClient() executable_name = None @classmethod def settings_to_cmd_args_env(cls, settings_dict, parameters) def runshell(self, parameters) TEST_DATABASE_PREFIX = 'test_' class BaseDatabaseCreation() def log(self, msg) def create_test_db(self, verbosity=1, autoclobber=False, serialize=True, keepdb=False) def set_as_test_mirror(self, primary_settings_dict) def get_test_db_clone_settings(self, suffix) def destroy_test_db(self, old_database_name=None, verbosity=1, keepdb=False, suffix=None) def sql_table_creation_suffix(self) def test_db_signature(self) class BaseDatabaseFeatures() minimum_database_version = None allows_group_by_selected_pks = False allows_group_by_select_index = True empty_fetchmany_value = [] update_can_self_select = True interprets_empty_strings_as_nulls = False supports_deferrable_unique_constraints = False can_use_chunked_reads = True can_return_columns_from_insert = False can_return_rows_from_bulk_insert = False has_bulk_insert = True uses_savepoints = True related_fields_match_type = False has_select_for_update = False has_select_for_update_nowait = False has_select_for_update_skip_locked = False has_select_for_update_of = False has_select_for_no_key_update = False select_for_update_of_column = False truncates_names = False ignores_unnecessary_order_by_in_subqueries = True has_native_uuid_field = False has_native_duration_field = False supports_temporal_subtraction = False has_zoneinfo_database = True supports_order_by_nulls_modifier = True order_by_nulls_first = False max_query_params = None allows_auto_pk_0 = True can_defer_constraint_checks = False supports_index_column_ordering = True can_rollback_ddl = False supports_atomic_references_rename = True supports_combined_alters = False supports_foreign_keys = True can_rename_index = False supports_column_check_constraints = True supports_table_check_constraints = True can_introspect_check_constraints = True requires_literal_defaults = False connection_persists_old_columns = False bare_select_suffix = '' implied_column_null = False supports_select_for_update_with_limit = True ignores_table_name_case = False for_update_after_from = False supports_select_union = True supports_select_intersection = True supports_select_difference = True supports_slicing_ordering_in_compound = False supports_parentheses_in_compound = True requires_compound_order_by_subquery = False supports_aggregate_filter_clause = False supports_over_clause = False only_supports_unbounded_with_preceding_and_following = False supports_callproc_kwargs = False supported_explain_formats = set() supports_update_conflicts = False supports_update_conflicts_with_target = False requires_casted_case_in_updates = False supports_partial_indexes = True supports_covering_indexes = False supports_expression_indexes = True collate_as_index_expression = False supports_boolean_expr_in_select_clause = True supports_comparing_boolean_expr = True supports_json_field = True can_introspect_json_field = True has_native_json_field = False supports_json_field_contains = True has_json_object_function = True supports_collation_on_charfield = True supports_collation_on_textfield = True supports_comments = False supports_comments_inline = False supports_logical_xor = False supports_unlimited_charfield = False @cached_property def supports_explaining_query_execution(self) @cached_property def supports_transactions(self) def allows_group_by_selected_pks_on_model(self, model) TableInfo = namedtuple('TableInfo', ['name', 'type']) FieldInfo = namedtuple('FieldInfo', 'name type_code display_size internal_size precision scale null_ok default collation') class BaseDatabaseIntrospection() data_types_reverse = {} def get_field_type(self, data_type, description) def identifier_converter(self, name) def table_names(self, cursor=None, include_views=False) def get_table_list(self, cursor) def get_table_description(self, cursor, table_name) def get_migratable_models(self) def plain_table_names(self, only_existing=False, include_views=True) def sequence_list(self) def get_sequences(self, cursor, table_name, table_fields=()) def get_relations(self, cursor, table_name) def get_primary_key_column(self, cursor, table_name) def get_primary_key_columns(self, cursor, table_name) def get_constraints(self, cursor, table_name) class BaseDatabaseOperations() compiler_module = 'plain.models.sql.compiler' integer_field_ranges = {'SmallIntegerField': (-32768, 32767), 'IntegerField': (-2147483648, 2147483647), 'BigIntegerField': (-9223372036854775808, 9223372036854775807), 'PositiveBigIntegerField': (0, 9223372036854775807), 'PositiveSmallIntegerField': (0, 32767), 'PositiveIntegerField': (0, 2147483647), 'SmallAutoField': (-32768, 32767), 'AutoField': (-2147483648, 2147483647), 'BigAutoField': (-9223372036854775808, 9223372036854775807)} set_operators = {'union': 'UNION', 'intersection': 'INTERSECT', 'difference': 'EXCEPT'} cast_data_types = {} cast_char_field_without_max_length = None PRECEDING = 'PRECEDING' FOLLOWING = 'FOLLOWING' UNBOUNDED_PRECEDING = 'UNBOUNDED ' + PRECEDING UNBOUNDED_FOLLOWING = 'UNBOUNDED ' + FOLLOWING CURRENT_ROW = 'CURRENT ROW' explain_prefix = None def autoinc_sql(self, table, column) def bulk_batch_size(self, fields, objs) def format_for_duration_arithmetic(self, sql) def unification_cast_sql(self, output_field) def date_extract_sql(self, lookup_type, sql, params) def date_trunc_sql(self, lookup_type, sql, params, tzname=None) def datetime_cast_date_sql(self, sql, params, tzname) def datetime_cast_time_sql(self, sql, params, tzname) def datetime_extract_sql(self, lookup_type, sql, params, tzname) def datetime_trunc_sql(self, lookup_type, sql, params, tzname) def time_trunc_sql(self, lookup_type, sql, params, tzname=None) def time_extract_sql(self, lookup_type, sql, params) def deferrable_sql(self) def distinct_sql(self, fields, params) def fetch_returned_insert_columns(self, cursor, returning_params) def field_cast_sql(self, db_type, internal_type) def force_no_ordering(self) def for_update_sql(self, nowait=False, skip_locked=False, of=(), no_key=False) def limit_offset_sql(self, low_mark, high_mark) def last_executed_query(self, cursor, sql, params) def last_insert_id(self, cursor, table_name, pk_name) def lookup_cast(self, lookup_type, internal_type=None) def max_in_list_size(self) def max_name_length(self) def no_limit_value(self) def pk_default_value(self) def prepare_sql_script(self, sql) def return_insert_columns(self, fields) def compiler(self, compiler_name) def quote_name(self, name) def regex_lookup(self, lookup_type) def savepoint_create_sql(self, sid) def savepoint_commit_sql(self, sid) def savepoint_rollback_sql(self, sid) def set_time_zone_sql(self) def prep_for_like_query(self, x) prep_for_iexact_query = prep_for_like_query def validate_autopk_value(self, value) def adapt_unknown_value(self, value) def adapt_integerfield_value(self, value, internal_type) def adapt_datefield_value(self, value) def adapt_datetimefield_value(self, value) def adapt_timefield_value(self, value) def adapt_decimalfield_value(self, value, max_digits=None, decimal_places=None) def adapt_ipaddressfield_value(self, value) def adapt_json_value(self, value, encoder) def year_lookup_bounds_for_date_field(self, value, iso_year=False) def year_lookup_bounds_for_datetime_field(self, value, iso_year=False) def get_db_converters(self, expression) def convert_durationfield_value(self, value, expression, connection) def check_expression_support(self, expression) def conditional_expression_supported_in_where_clause(self, expression) def combine_expression(self, connector, sub_expressions) def combine_duration_expression(self, connector, sub_expressions) def binary_placeholder_sql(self, value) def modify_insert_params(self, placeholder, params) def integer_field_range(self, internal_type) def subtract_temporals(self, internal_type, lhs, rhs) def window_frame_start(self, start) def window_frame_end(self, end) def window_frame_rows_start_end(self, start=None, end=None) def window_frame_range_start_end(self, start=None, end=None) def explain_query_prefix(self, format=None, **options) def insert_statement(self, on_conflict=None) def on_conflict_suffix_sql(self, fields, on_conflict, update_fields, unique_fields) logger = logging.getLogger('plain.models.backends.schema') class BaseDatabaseSchemaEditor() sql_create_table = 'CREATE TABLE %(table)s (%(definition)s)' sql_rename_table = 'ALTER TABLE %(old_table)s RENAME TO %(new_table)s' sql_delete_table = 'DROP TABLE %(table)s CASCADE' sql_create_column = 'ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s' sql_alter_column = 'ALTER TABLE %(table)s %(changes)s' sql_alter_column_type = 'ALTER COLUMN %(column)s TYPE %(type)s%(collation)s' sql_alter_column_null = 'ALTER COLUMN %(column)s DROP NOT NULL' sql_alter_column_not_null = 'ALTER COLUMN %(column)s SET NOT NULL' sql_alter_column_default = 'ALTER COLUMN %(column)s SET DEFAULT %(default)s' sql_alter_column_no_default = 'ALTER COLUMN %(column)s DROP DEFAULT' sql_alter_column_no_default_null = sql_alter_column_no_default sql_delete_column = 'ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE' sql_rename_column = 'ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s' sql_update_with_default = 'UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL' sql_unique_constraint = 'UNIQUE (%(columns)s)%(deferrable)s' sql_check_constraint = 'CHECK (%(check)s)' sql_delete_constraint = 'ALTER TABLE %(table)s DROP CONSTRAINT %(name)s' sql_constraint = 'CONSTRAINT %(name)s %(constraint)s' sql_create_check = 'ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)' sql_delete_check = sql_delete_constraint sql_create_unique = 'ALTER TABLE %(table)s ADD CONSTRAINT %(name)s UNIQUE (%(columns)s)%(deferrable)s' sql_delete_unique = sql_delete_constraint sql_create_fk = 'ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s' sql_create_inline_fk = None sql_create_column_inline_fk = None sql_delete_fk = sql_delete_constraint sql_create_index = 'CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(include)s%(extra)s%(condition)s' sql_create_unique_index = 'CREATE UNIQUE INDEX %(name)s ON %(table)s (%(columns)s)%(include)s%(condition)s' sql_rename_index = 'ALTER INDEX %(old_name)s RENAME TO %(new_name)s' sql_delete_index = 'DROP INDEX %(name)s' sql_create_pk = 'ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)' sql_delete_pk = sql_delete_constraint sql_alter_table_comment = 'COMMENT ON TABLE %(table)s IS %(comment)s' sql_alter_column_comment = 'COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s' def execute(self, sql, params=()) def quote_name(self, name) def table_sql(self, model) def column_sql(self, model, field, include_default=False) def skip_default(self, field) def skip_default_on_alter(self, field) def prepare_default(self, value) def effective_default(self, field) def quote_value(self, value) def create_model(self, model) def delete_model(self, model) def add_index(self, model, index) def remove_index(self, model, index) def rename_index(self, model, old_index, new_index) def add_constraint(self, model, constraint) def remove_constraint(self, model, constraint) def alter_db_table(self, model, old_db_table, new_db_table) def alter_db_table_comment(self, model, old_db_table_comment, new_db_table_comment) def add_field(self, model, field) def remove_field(self, model, field) def alter_field(self, model, old_field, new_field, strict=False) class BaseDatabaseValidation() def check(self, **kwargs) def check_field(self, field, **kwargs) class Reference() def references_table(self, table) def references_column(self, table, column) def rename_table_references(self, old_table, new_table) def rename_column_references(self, table, old_column, new_column) class Table(Reference) def references_table(self, table) def rename_table_references(self, old_table, new_table) class TableColumns(Table) def references_column(self, table, column) def rename_column_references(self, table, old_column, new_column) class Columns(TableColumns) class IndexName(TableColumns) class IndexColumns(Columns) class ForeignKeyName(TableColumns) def references_table(self, table) def references_column(self, table, column) def rename_table_references(self, old_table, new_table) def rename_column_references(self, table, old_column, new_column) class Statement(Reference) def references_table(self, table) def references_column(self, table, column) def rename_table_references(self, old_table, new_table) def rename_column_references(self, table, old_column, new_column) class Expressions(TableColumns) def rename_table_references(self, old_table, new_table) def rename_column_references(self, table, old_column, new_column) version = Database.version_info plain_conversions = {**conversions, **{FIELD_TYPE.TIME: backend_utils.typecast_time}} server_version_re = _lazy_re_compile('(\\d{1,2})\\.(\\d{1,2})\\.(\\d{1,2})') class CursorWrapper() codes_for_integrityerror = (1048, 1690, 3819, 4025) def execute(self, query, args=None) def executemany(self, query, args) class DatabaseWrapper(BaseDatabaseWrapper) vendor = 'mysql' data_types = {'AutoField': 'integer AUTO_INCREMENT', 'BigAutoField': 'bigint AUTO_INCREMENT', 'BinaryField': 'longblob', 'BooleanField': 'bool', 'CharField': 'varchar(%(max_length)s)', 'DateField': 'date', 'DateTimeField': 'datetime(6)', 'DecimalField': 'numeric(%(max_digits)s, %(decimal_places)s)', 'DurationField': 'bigint', 'FloatField': 'double precision', 'IntegerField': 'integer', 'BigIntegerField': 'bigint', 'IPAddressField': 'char(15)', 'GenericIPAddressField': 'char(39)', 'JSONField': 'json', 'PositiveBigIntegerField': 'bigint UNSIGNED', 'PositiveIntegerField': 'integer UNSIGNED', 'PositiveSmallIntegerField': 'smallint UNSIGNED', 'SmallAutoField': 'smallint AUTO_INCREMENT', 'SmallIntegerField': 'smallint', 'TextField': 'longtext', 'TimeField': 'time(6)', 'UUIDField': 'char(32)'} operators = {'exact': '= %s', 'iexact': 'LIKE %s', 'contains': 'LIKE BINARY %s', 'icontains': 'LIKE %s', 'gt': '> %s', 'gte': '>= %s', 'lt': '< %s', 'lte': '<= %s', 'startswith': 'LIKE BINARY %s', 'endswith': 'LIKE BINARY %s', 'istartswith': 'LIKE %s', 'iendswith': 'LIKE %s'} pattern_esc = "REPLACE(REPLACE(REPLACE({}, '\\\\', '\\\\\\\\'), '%%', '\\%%'), '_', '\\_')" pattern_ops = {'contains': "LIKE BINARY CONCAT('%%', {}, '%%')", 'icontains': "LIKE CONCAT('%%', {}, '%%')", 'startswith': "LIKE BINARY CONCAT({}, '%%')", 'istartswith': "LIKE CONCAT({}, '%%')", 'endswith': "LIKE BINARY CONCAT('%%', {})", 'iendswith': "LIKE CONCAT('%%', {})"} isolation_levels = {'read uncommitted', 'read committed', 'repeatable read', 'serializable'} Database = Database SchemaEditorClass = DatabaseSchemaEditor client_class = DatabaseClient creation_class = DatabaseCreation features_class = DatabaseFeatures introspection_class = DatabaseIntrospection ops_class = DatabaseOperations validation_class = DatabaseValidation def get_database_version(self) def get_connection_params(self) def get_new_connection(self, conn_params) def init_connection_state(self) def create_cursor(self, name=None) def disable_constraint_checking(self) def enable_constraint_checking(self) def check_constraints(self, table_names=None) def is_usable(self) @cached_property def display_name(self) @cached_property def data_type_check_constraints(self) @cached_property def mysql_server_data(self) @cached_property def mysql_server_info(self) @cached_property def mysql_version(self) @cached_property def mysql_is_mariadb(self) @cached_property def sql_mode(self) class DatabaseClient(BaseDatabaseClient) executable_name = 'mysql' @classmethod def settings_to_cmd_args_env(cls, settings_dict, parameters) def runshell(self, parameters) class SQLCompiler(compiler.SQLCompiler) def as_subquery_condition(self, alias, columns, compiler) class SQLInsertCompiler(compiler.SQLInsertCompiler, SQLCompiler) class SQLDeleteCompiler(compiler.SQLDeleteCompiler, SQLCompiler) def as_sql(self) class SQLUpdateCompiler(compiler.SQLUpdateCompiler, SQLCompiler) def as_sql(self) class SQLAggregateCompiler(compiler.SQLAggregateCompiler, SQLCompiler) class DatabaseCreation(BaseDatabaseCreation) def sql_table_creation_suffix(self) class DatabaseFeatures(BaseDatabaseFeatures) empty_fetchmany_value = () allows_group_by_selected_pks = True related_fields_match_type = True has_select_for_update = True supports_comments = True supports_comments_inline = True supports_temporal_subtraction = True supports_slicing_ordering_in_compound = True supports_update_conflicts = True supports_partial_indexes = False collate_as_index_expression = True supports_order_by_nulls_modifier = False order_by_nulls_first = True supports_logical_xor = True @cached_property def minimum_database_version(self) @cached_property def allows_auto_pk_0(self) @cached_property def update_can_self_select(self) @cached_property def can_return_columns_from_insert(self) can_return_rows_from_bulk_insert = property(operator.attrgetter('can_return_columns_from_insert')) @cached_property def has_zoneinfo_database(self) @cached_property def is_sql_auto_is_null_enabled(self) @cached_property def supports_over_clause(self) @cached_property def supports_column_check_constraints(self) supports_table_check_constraints = property(operator.attrgetter('supports_column_check_constraints')) @cached_property def can_introspect_check_constraints(self) @cached_property def has_select_for_update_skip_locked(self) @cached_property def has_select_for_update_nowait(self) @cached_property def has_select_for_update_of(self) @cached_property def supports_explain_analyze(self) @cached_property def supported_explain_formats(self) @cached_property def supports_transactions(self) uses_savepoints = property(operator.attrgetter('supports_transactions')) @cached_property def ignores_table_name_case(self) @cached_property def can_introspect_json_field(self) @cached_property def supports_index_column_ordering(self) @cached_property def supports_expression_indexes(self) @cached_property def supports_select_intersection(self) supports_select_difference = property(operator.attrgetter('supports_select_intersection')) @cached_property def can_rename_index(self) FieldInfo = namedtuple('FieldInfo', BaseFieldInfo._fields + ('extra', 'is_unsigned', 'has_json_constraint', 'comment')) InfoLine = namedtuple('InfoLine', 'col_name data_type max_len num_prec num_scale extra column_default collation is_unsigned comment') TableInfo = namedtuple('TableInfo', BaseTableInfo._fields + ('comment',)) class DatabaseIntrospection(BaseDatabaseIntrospection) data_types_reverse = {FIELD_TYPE.BLOB: 'TextField', FIELD_TYPE.CHAR: 'CharField', FIELD_TYPE.DECIMAL: 'DecimalField', FIELD_TYPE.NEWDECIMAL: 'DecimalField', FIELD_TYPE.DATE: 'DateField', FIELD_TYPE.DATETIME: 'DateTimeField', FIELD_TYPE.DOUBLE: 'FloatField', FIELD_TYPE.FLOAT: 'FloatField', FIELD_TYPE.INT24: 'IntegerField', FIELD_TYPE.JSON: 'JSONField', FIELD_TYPE.LONG: 'IntegerField', FIELD_TYPE.LONGLONG: 'BigIntegerField', FIELD_TYPE.SHORT: 'SmallIntegerField', FIELD_TYPE.STRING: 'CharField', FIELD_TYPE.TIME: 'TimeField', FIELD_TYPE.TIMESTAMP: 'DateTimeField', FIELD_TYPE.TINY: 'IntegerField', FIELD_TYPE.TINY_BLOB: 'TextField', FIELD_TYPE.MEDIUM_BLOB: 'TextField', FIELD_TYPE.LONG_BLOB: 'TextField', FIELD_TYPE.VAR_STRING: 'CharField'} def get_field_type(self, data_type, description) def get_table_list(self, cursor) def get_table_description(self, cursor, table_name) def get_sequences(self, cursor, table_name, table_fields=()) def get_relations(self, cursor, table_name) def get_storage_engine(self, cursor, table_name) def get_constraints(self, cursor, table_name) class DatabaseOperations(BaseDatabaseOperations) compiler_module = 'plain.models.backends.mysql.compiler' integer_field_ranges = {**BaseDatabaseOperations.integer_field_ranges, 'PositiveSmallIntegerField': (0, 65535), 'PositiveIntegerField': (0, 4294967295), 'PositiveBigIntegerField': (0, 18446744073709551615)} cast_data_types = {'AutoField': 'signed integer', 'BigAutoField': 'signed integer', 'SmallAutoField': 'signed integer', 'CharField': 'char(%(max_length)s)', 'DecimalField': 'decimal(%(max_digits)s, %(decimal_places)s)', 'TextField': 'char', 'IntegerField': 'signed integer', 'BigIntegerField': 'signed integer', 'SmallIntegerField': 'signed integer', 'PositiveBigIntegerField': 'unsigned integer', 'PositiveIntegerField': 'unsigned integer', 'PositiveSmallIntegerField': 'unsigned integer', 'DurationField': 'signed integer'} cast_char_field_without_max_length = 'char' explain_prefix = 'EXPLAIN' def date_extract_sql(self, lookup_type, sql, params) def date_trunc_sql(self, lookup_type, sql, params, tzname=None) def datetime_cast_date_sql(self, sql, params, tzname) def datetime_cast_time_sql(self, sql, params, tzname) def datetime_extract_sql(self, lookup_type, sql, params, tzname) def datetime_trunc_sql(self, lookup_type, sql, params, tzname) def time_trunc_sql(self, lookup_type, sql, params, tzname=None) def fetch_returned_insert_rows(self, cursor) def format_for_duration_arithmetic(self, sql) def force_no_ordering(self) def adapt_decimalfield_value(self, value, max_digits=None, decimal_places=None) def last_executed_query(self, cursor, sql, params) def no_limit_value(self) def quote_name(self, name) def return_insert_columns(self, fields) def validate_autopk_value(self, value) def adapt_datetimefield_value(self, value) def adapt_timefield_value(self, value) def max_name_length(self) def pk_default_value(self) def bulk_insert_sql(self, fields, placeholder_rows) def combine_expression(self, connector, sub_expressions) def get_db_converters(self, expression) def convert_booleanfield_value(self, value, expression, connection) def convert_datetimefield_value(self, value, expression, connection) def convert_uuidfield_value(self, value, expression, connection) def binary_placeholder_sql(self, value) def subtract_temporals(self, internal_type, lhs, rhs) def explain_query_prefix(self, format=None, **options) def regex_lookup(self, lookup_type) def insert_statement(self, on_conflict=None) def lookup_cast(self, lookup_type, internal_type=None) def conditional_expression_supported_in_where_clause(self, expression) def on_conflict_suffix_sql(self, fields, on_conflict, update_fields, unique_fields) class DatabaseSchemaEditor(BaseDatabaseSchemaEditor) sql_rename_table = 'RENAME TABLE %(old_table)s TO %(new_table)s' sql_alter_column_null = 'MODIFY %(column)s %(type)s NULL' sql_alter_column_not_null = 'MODIFY %(column)s %(type)s NOT NULL' sql_alter_column_type = 'MODIFY %(column)s %(type)s%(collation)s%(comment)s' sql_alter_column_no_default_null = 'ALTER COLUMN %(column)s SET DEFAULT NULL' sql_delete_column = 'ALTER TABLE %(table)s DROP COLUMN %(column)s' sql_delete_unique = 'ALTER TABLE %(table)s DROP INDEX %(name)s' sql_create_column_inline_fk = ', ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) REFERENCES %(to_table)s(%(to_column)s)' sql_delete_fk = 'ALTER TABLE %(table)s DROP FOREIGN KEY %(name)s' sql_delete_index = 'DROP INDEX %(name)s ON %(table)s' sql_rename_index = 'ALTER TABLE %(table)s RENAME INDEX %(old_name)s TO %(new_name)s' sql_create_pk = 'ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)' sql_delete_pk = 'ALTER TABLE %(table)s DROP PRIMARY KEY' sql_create_index = 'CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s' sql_alter_table_comment = 'ALTER TABLE %(table)s COMMENT = %(comment)s' sql_alter_column_comment = None @property def sql_delete_check(self) @property def sql_rename_column(self) def quote_value(self, value) def skip_default(self, field) def skip_default_on_alter(self, field) def add_field(self, model, field) def remove_constraint(self, model, constraint) def remove_index(self, model, index) class DatabaseValidation(BaseDatabaseValidation) def check(self, **kwargs) def check_field_type(self, field, field_type) TIMESTAMPTZ_OID = adapters.types['timestamptz'].oid TSRANGE_OID = pg_types['tsrange'].oid TSTZRANGE_OID = pg_types['tstzrange'].oid class BaseTzLoader(TimestamptzLoader) timezone = None def load(self, data) def register_tzloader(tz, context) class PlainRangeDumper(RangeDumper) def upgrade(self, obj, format) @lru_cache def get_adapters_template(timezone) class DatabaseWrapper(BaseDatabaseWrapper) vendor = 'postgresql' display_name = 'PostgreSQL' data_types = {'AutoField': 'integer', 'BigAutoField': 'bigint', 'BinaryField': 'bytea', 'BooleanField': 'boolean', 'CharField': _get_varchar_column, 'DateField': 'date', 'DateTimeField': 'timestamp with time zone', 'DecimalField': 'numeric(%(max_digits)s, %(decimal_places)s)', 'DurationField': 'interval', 'FloatField': 'double precision', 'IntegerField': 'integer', 'BigIntegerField': 'bigint', 'IPAddressField': 'inet', 'GenericIPAddressField': 'inet', 'JSONField': 'jsonb', 'PositiveBigIntegerField': 'bigint', 'PositiveIntegerField': 'integer', 'PositiveSmallIntegerField': 'smallint', 'SmallAutoField': 'smallint', 'SmallIntegerField': 'smallint', 'TextField': 'text', 'TimeField': 'time', 'UUIDField': 'uuid'} data_type_check_constraints = {'PositiveBigIntegerField': '"%(column)s" >= 0', 'PositiveIntegerField': '"%(column)s" >= 0', 'PositiveSmallIntegerField': '"%(column)s" >= 0'} data_types_suffix = {'AutoField': 'GENERATED BY DEFAULT AS IDENTITY', 'BigAutoField': 'GENERATED BY DEFAULT AS IDENTITY', 'SmallAutoField': 'GENERATED BY DEFAULT AS IDENTITY'} operators = {'exact': '= %s', 'iexact': '= UPPER(%s)', 'contains': 'LIKE %s', 'icontains': 'LIKE UPPER(%s)', 'regex': '~ %s', 'iregex': '~* %s', 'gt': '> %s', 'gte': '>= %s', 'lt': '< %s', 'lte': '<= %s', 'startswith': 'LIKE %s', 'endswith': 'LIKE %s', 'istartswith': 'LIKE UPPER(%s)', 'iendswith': 'LIKE UPPER(%s)'} pattern_esc = "REPLACE(REPLACE(REPLACE({}, E'\\\\', E'\\\\\\\\'), E'%%', E'\\\\%%'), E'_', E'\\\\_')" pattern_ops = {'contains': "LIKE '%%' || {} || '%%'", 'icontains': "LIKE '%%' || UPPER({}) || '%%'", 'startswith': "LIKE {} || '%%'", 'istartswith': "LIKE UPPER({}) || '%%'", 'endswith': "LIKE '%%' || {}", 'iendswith': "LIKE '%%' || UPPER({})"} Database = Database SchemaEditorClass = DatabaseSchemaEditor client_class = DatabaseClient creation_class = DatabaseCreation features_class = DatabaseFeatures introspection_class = DatabaseIntrospection ops_class = DatabaseOperations def get_database_version(self) def get_connection_params(self) def get_new_connection(self, conn_params) def ensure_timezone(self) def ensure_role(self) def init_connection_state(self) def create_cursor(self, name=None) def tzinfo_factory(self, offset) def chunked_cursor(self) def check_constraints(self, table_names=None) def is_usable(self) @cached_property def pg_version(self) def make_debug_cursor(self, cursor) class CursorMixin() def callproc(self, name, args=None) class ServerBindingCursor(CursorMixin, Database.Cursor) class Cursor(CursorMixin, Database.ClientCursor) class CursorDebugWrapper(BaseCursorDebugWrapper) def copy(self, statement) class DatabaseClient(BaseDatabaseClient) executable_name = 'psql' @classmethod def settings_to_cmd_args_env(cls, settings_dict, parameters) def runshell(self, parameters) class DatabaseCreation(BaseDatabaseCreation) def sql_table_creation_suffix(self) class DatabaseFeatures(BaseDatabaseFeatures) minimum_database_version = (12,) allows_group_by_selected_pks = True can_return_columns_from_insert = True can_return_rows_from_bulk_insert = True has_native_uuid_field = True has_native_duration_field = True has_native_json_field = True can_defer_constraint_checks = True has_select_for_update = True has_select_for_update_nowait = True has_select_for_update_of = True has_select_for_update_skip_locked = True has_select_for_no_key_update = True supports_comments = True supports_transactions = True can_rollback_ddl = True supports_combined_alters = True supports_temporal_subtraction = True supports_slicing_ordering_in_compound = True requires_casted_case_in_updates = True supports_over_clause = True only_supports_unbounded_with_preceding_and_following = True supports_aggregate_filter_clause = True supported_explain_formats = {'JSON', 'TEXT', 'XML', 'YAML'} supports_deferrable_unique_constraints = True supports_update_conflicts = True supports_update_conflicts_with_target = True supports_covering_indexes = True can_rename_index = True supports_unlimited_charfield = True FieldInfo = namedtuple('FieldInfo', BaseFieldInfo._fields + ('is_autofield', 'comment')) TableInfo = namedtuple('TableInfo', BaseTableInfo._fields + ('comment',)) class DatabaseIntrospection(BaseDatabaseIntrospection) data_types_reverse = {16: 'BooleanField', 17: 'BinaryField', 20: 'BigIntegerField', 21: 'SmallIntegerField', 23: 'IntegerField', 25: 'TextField', 700: 'FloatField', 701: 'FloatField', 869: 'GenericIPAddressField', 1042: 'CharField', 1043: 'CharField', 1082: 'DateField', 1083: 'TimeField', 1114: 'DateTimeField', 1184: 'DateTimeField', 1186: 'DurationField', 1266: 'TimeField', 1700: 'DecimalField', 2950: 'UUIDField', 3802: 'JSONField'} index_default_access_method = 'btree' ignored_tables = [] def get_field_type(self, data_type, description) def get_table_list(self, cursor) def get_table_description(self, cursor, table_name) def get_sequences(self, cursor, table_name, table_fields=()) def get_relations(self, cursor, table_name) def get_constraints(self, cursor, table_name) @lru_cache def get_json_dumps(encoder) class DatabaseOperations(BaseDatabaseOperations) cast_char_field_without_max_length = 'varchar' explain_prefix = 'EXPLAIN' explain_options = frozenset(['ANALYZE', 'BUFFERS', 'COSTS', 'SETTINGS', 'SUMMARY', 'TIMING', 'VERBOSE', 'WAL']) cast_data_types = {'AutoField': 'integer', 'BigAutoField': 'bigint', 'SmallAutoField': 'smallint'} integerfield_type_map = {'SmallIntegerField': numeric.Int2, 'IntegerField': numeric.Int4, 'BigIntegerField': numeric.Int8, 'PositiveSmallIntegerField': numeric.Int2, 'PositiveIntegerField': numeric.Int4, 'PositiveBigIntegerField': numeric.Int8} def unification_cast_sql(self, output_field) def date_extract_sql(self, lookup_type, sql, params) def date_trunc_sql(self, lookup_type, sql, params, tzname=None) def datetime_cast_date_sql(self, sql, params, tzname) def datetime_cast_time_sql(self, sql, params, tzname) def datetime_extract_sql(self, lookup_type, sql, params, tzname) def datetime_trunc_sql(self, lookup_type, sql, params, tzname) def time_extract_sql(self, lookup_type, sql, params) def time_trunc_sql(self, lookup_type, sql, params, tzname=None) def deferrable_sql(self) def fetch_returned_insert_rows(self, cursor) def lookup_cast(self, lookup_type, internal_type=None) def no_limit_value(self) def prepare_sql_script(self, sql) def quote_name(self, name) def compose_sql(self, sql, params) def set_time_zone_sql(self) def prep_for_iexact_query(self, x) def max_name_length(self) def distinct_sql(self, fields, params) def last_executed_query(self, cursor, sql, params) def return_insert_columns(self, fields) def bulk_insert_sql(self, fields, placeholder_rows) def adapt_integerfield_value(self, value, internal_type) def adapt_datefield_value(self, value) def adapt_datetimefield_value(self, value) def adapt_timefield_value(self, value) def adapt_decimalfield_value(self, value, max_digits=None, decimal_places=None) def adapt_ipaddressfield_value(self, value) def adapt_json_value(self, value, encoder) def subtract_temporals(self, internal_type, lhs, rhs) def explain_query_prefix(self, format=None, **options) def on_conflict_suffix_sql(self, fields, on_conflict, update_fields, unique_fields) class DatabaseSchemaEditor(BaseDatabaseSchemaEditor) sql_update_with_default = 'UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL; SET CONSTRAINTS ALL IMMEDIATE' sql_alter_sequence_type = 'ALTER SEQUENCE IF EXISTS %(sequence)s AS %(type)s' sql_delete_sequence = 'DROP SEQUENCE IF EXISTS %(sequence)s CASCADE' sql_create_index = 'CREATE INDEX %(name)s ON %(table)s%(using)s (%(columns)s)%(include)s%(extra)s%(condition)s' sql_create_index_concurrently = 'CREATE INDEX CONCURRENTLY %(name)s ON %(table)s%(using)s (%(columns)s)%(include)s%(extra)s%(condition)s' sql_delete_index = 'DROP INDEX IF EXISTS %(name)s' sql_delete_index_concurrently = 'DROP INDEX CONCURRENTLY IF EXISTS %(name)s' sql_create_column_inline_fk = 'CONSTRAINT %(name)s REFERENCES %(to_table)s(%(to_column)s)%(deferrable)s; SET CONSTRAINTS %(namespace)s%(name)s IMMEDIATE' sql_delete_fk = 'SET CONSTRAINTS %(name)s IMMEDIATE; ALTER TABLE %(table)s DROP CONSTRAINT %(name)s' def execute(self, sql, params=()) sql_add_identity = 'ALTER TABLE %(table)s ALTER COLUMN %(column)s ADD GENERATED BY DEFAULT AS IDENTITY' sql_drop_indentity = 'ALTER TABLE %(table)s ALTER COLUMN %(column)s DROP IDENTITY IF EXISTS' def quote_value(self, value) def add_index(self, model, index, concurrently=False) def remove_index(self, model, index, concurrently=False) def register(connection) class ListAggregate(list) step = list.append class StdDevPop(ListAggregate) finalize = statistics.pstdev class StdDevSamp(ListAggregate) finalize = statistics.stdev class VarPop(ListAggregate) finalize = statistics.pvariance class VarSamp(ListAggregate) finalize = statistics.variance def decoder(conv_func) def adapt_date(val) def adapt_datetime(val) class DatabaseWrapper(BaseDatabaseWrapper) vendor = 'sqlite' display_name = 'SQLite' data_types = {'AutoField': 'integer', 'BigAutoField': 'integer', 'BinaryField': 'BLOB', 'BooleanField': 'bool', 'CharField': 'varchar(%(max_length)s)', 'DateField': 'date', 'DateTimeField': 'datetime', 'DecimalField': 'decimal', 'DurationField': 'bigint', 'FloatField': 'real', 'IntegerField': 'integer', 'BigIntegerField': 'bigint', 'IPAddressField': 'char(15)', 'GenericIPAddressField': 'char(39)', 'JSONField': 'text', 'PositiveBigIntegerField': 'bigint unsigned', 'PositiveIntegerField': 'integer unsigned', 'PositiveSmallIntegerField': 'smallint unsigned', 'SmallAutoField': 'integer', 'SmallIntegerField': 'smallint', 'TextField': 'text', 'TimeField': 'time', 'UUIDField': 'char(32)'} data_type_check_constraints = {'PositiveBigIntegerField': '"%(column)s" >= 0', 'JSONField': '(JSON_VALID("%(column)s") OR "%(column)s" IS NULL)', 'PositiveIntegerField': '"%(column)s" >= 0', 'PositiveSmallIntegerField': '"%(column)s" >= 0'} data_types_suffix = {'AutoField': 'AUTOINCREMENT', 'BigAutoField': 'AUTOINCREMENT', 'SmallAutoField': 'AUTOINCREMENT'} operators = {'exact': '= %s', 'iexact': "LIKE %s ESCAPE '\\'", 'contains': "LIKE %s ESCAPE '\\'", 'icontains': "LIKE %s ESCAPE '\\'", 'regex': 'REGEXP %s', 'iregex': "REGEXP '(?i)' || %s", 'gt': '> %s', 'gte': '>= %s', 'lt': '< %s', 'lte': '<= %s', 'startswith': "LIKE %s ESCAPE '\\'", 'endswith': "LIKE %s ESCAPE '\\'", 'istartswith': "LIKE %s ESCAPE '\\'", 'iendswith': "LIKE %s ESCAPE '\\'"} pattern_esc = "REPLACE(REPLACE(REPLACE({}, '\\', '\\\\'), '%%', '\\%%'), '_', '\\_')" pattern_ops = {'contains': "LIKE '%%' || {} || '%%' ESCAPE '\\'", 'icontains': "LIKE '%%' || UPPER({}) || '%%' ESCAPE '\\'", 'startswith': "LIKE {} || '%%' ESCAPE '\\'", 'istartswith': "LIKE UPPER({}) || '%%' ESCAPE '\\'", 'endswith': "LIKE '%%' || {} ESCAPE '\\'", 'iendswith': "LIKE '%%' || UPPER({}) ESCAPE '\\'"} Database = Database SchemaEditorClass = DatabaseSchemaEditor client_class = DatabaseClient creation_class = DatabaseCreation features_class = DatabaseFeatures introspection_class = DatabaseIntrospection ops_class = DatabaseOperations def get_connection_params(self) def get_database_version(self) def get_new_connection(self, conn_params) def create_cursor(self, name=None) def close(self) def disable_constraint_checking(self) def enable_constraint_checking(self) def check_constraints(self, table_names=None) def is_usable(self) def is_in_memory_db(self) FORMAT_QMARK_REGEX = _lazy_re_compile('(? class DatabaseClient(BaseDatabaseClient) executable_name = 'sqlite3' @classmethod def settings_to_cmd_args_env(cls, settings_dict, parameters) class DatabaseCreation(BaseDatabaseCreation) @staticmethod def is_in_memory_db(database_name) def get_test_db_clone_settings(self, suffix) def test_db_signature(self) class DatabaseFeatures(BaseDatabaseFeatures) minimum_database_version = (3, 21) max_query_params = 999 supports_transactions = True can_rollback_ddl = True requires_literal_defaults = True supports_temporal_subtraction = True ignores_table_name_case = True can_alter_table_rename_column = Database.sqlite_version_info >= (3, 25, 0) can_alter_table_drop_column = Database.sqlite_version_info >= (3, 35, 5) supports_parentheses_in_compound = False can_defer_constraint_checks = True supports_over_clause = Database.sqlite_version_info >= (3, 25, 0) supports_aggregate_filter_clause = Database.sqlite_version_info >= (3, 30, 1) supports_order_by_nulls_modifier = Database.sqlite_version_info >= (3, 30, 0) requires_compound_order_by_subquery = Database.sqlite_version_info < (3, 30) order_by_nulls_first = True supports_json_field_contains = False supports_update_conflicts = Database.sqlite_version_info >= (3, 24, 0) supports_update_conflicts_with_target = supports_update_conflicts @cached_property def supports_atomic_references_rename(self) @cached_property def supports_json_field(self) can_introspect_json_field = property(operator.attrgetter('supports_json_field')) has_json_object_function = property(operator.attrgetter('supports_json_field')) @cached_property def can_return_columns_from_insert(self) can_return_rows_from_bulk_insert = property(operator.attrgetter('can_return_columns_from_insert')) FieldInfo = namedtuple('FieldInfo', BaseFieldInfo._fields + ('pk', 'has_json_constraint')) field_size_re = _lazy_re_compile('^\\s*(?:var)?char\\s*\\(\\s*(\\d+)\\s*\\)\\s*$') def get_field_size(name) class FlexibleFieldLookupDict() base_data_types_reverse = {'bool': 'BooleanField', 'boolean': 'BooleanField', 'smallint': 'SmallIntegerField', 'smallint unsigned': 'PositiveSmallIntegerField', 'smallinteger': 'SmallIntegerField', 'int': 'IntegerField', 'integer': 'IntegerField', 'bigint': 'BigIntegerField', 'integer unsigned': 'PositiveIntegerField', 'bigint unsigned': 'PositiveBigIntegerField', 'decimal': 'DecimalField', 'real': 'FloatField', 'text': 'TextField', 'char': 'CharField', 'varchar': 'CharField', 'blob': 'BinaryField', 'date': 'DateField', 'datetime': 'DateTimeField', 'time': 'TimeField'} class DatabaseIntrospection(BaseDatabaseIntrospection) data_types_reverse = FlexibleFieldLookupDict() def get_field_type(self, data_type, description) def get_table_list(self, cursor) def get_table_description(self, cursor, table_name) def get_sequences(self, cursor, table_name, table_fields=()) def get_relations(self, cursor, table_name) def get_primary_key_columns(self, cursor, table_name) def get_constraints(self, cursor, table_name) class DatabaseOperations(BaseDatabaseOperations) cast_char_field_without_max_length = 'text' cast_data_types = {'DateField': 'TEXT', 'DateTimeField': 'TEXT'} explain_prefix = 'EXPLAIN QUERY PLAN' jsonfield_datatype_values = frozenset(['null', 'false', 'true']) def bulk_batch_size(self, fields, objs) def check_expression_support(self, expression) def date_extract_sql(self, lookup_type, sql, params) def fetch_returned_insert_rows(self, cursor) def format_for_duration_arithmetic(self, sql) def date_trunc_sql(self, lookup_type, sql, params, tzname=None) def time_trunc_sql(self, lookup_type, sql, params, tzname=None) def datetime_cast_date_sql(self, sql, params, tzname) def datetime_cast_time_sql(self, sql, params, tzname) def datetime_extract_sql(self, lookup_type, sql, params, tzname) def datetime_trunc_sql(self, lookup_type, sql, params, tzname) def time_extract_sql(self, lookup_type, sql, params) def pk_default_value(self) def last_executed_query(self, cursor, sql, params) def quote_name(self, name) def no_limit_value(self) def adapt_datetimefield_value(self, value) def adapt_timefield_value(self, value) def get_db_converters(self, expression) def convert_datetimefield_value(self, value, expression, connection) def convert_datefield_value(self, value, expression, connection) def convert_timefield_value(self, value, expression, connection) def get_decimalfield_converter(self, expression) def convert_uuidfield_value(self, value, expression, connection) def convert_booleanfield_value(self, value, expression, connection) def bulk_insert_sql(self, fields, placeholder_rows) def combine_expression(self, connector, sub_expressions) def combine_duration_expression(self, connector, sub_expressions) def integer_field_range(self, internal_type) def subtract_temporals(self, internal_type, lhs, rhs) def insert_statement(self, on_conflict=None) def return_insert_columns(self, fields) def on_conflict_suffix_sql(self, fields, on_conflict, update_fields, unique_fields) class DatabaseSchemaEditor(BaseDatabaseSchemaEditor) sql_delete_table = 'DROP TABLE %(table)s' sql_create_fk = None sql_create_inline_fk = 'REFERENCES %(to_table)s (%(to_column)s) DEFERRABLE INITIALLY DEFERRED' sql_create_column_inline_fk = sql_create_inline_fk sql_delete_column = 'ALTER TABLE %(table)s DROP COLUMN %(column)s' sql_create_unique = 'CREATE UNIQUE INDEX %(name)s ON %(table)s (%(columns)s)' sql_delete_unique = 'DROP INDEX %(name)s' def quote_value(self, value) def prepare_default(self, value) def alter_db_table(self, model, old_db_table, new_db_table, disable_constraints=True) def alter_field(self, model, old_field, new_field, strict=False) def delete_model(self, model, handle_autom2m=True) def add_field(self, model, field) def remove_field(self, model, field) def add_constraint(self, model, constraint) def remove_constraint(self, model, constraint) logger = logging.getLogger('plain.models.backends') class CursorWrapper() WRAP_ERROR_ATTRS = frozenset(['fetchone', 'fetchmany', 'fetchall', 'nextset']) def callproc(self, procname, params=None, kparams=None) def execute(self, sql, params=None) def executemany(self, sql, param_list) class CursorDebugWrapper(CursorWrapper) def execute(self, sql, params=None) def executemany(self, sql, param_list) @contextmanager def debug_sql(self, sql=None, params=None, use_last_executed_query=False, many=False) @contextmanager def debug_transaction(connection, sql) def split_tzname_delta(tzname) def typecast_date(s) def typecast_time(s) def typecast_timestamp(s) def split_identifier(identifier) def truncate_name(identifier, length=None, hash_len=4) def names_digest(*args, length) def format_number(value, max_digits, decimal_places) def strip_quotes(table_name) @register_cli('backups') @click.group('backups') def cli() @cli.command('list') def list_backups() @cli.command('create') @click.option('--pg-dump', default='pg_dump', envvar='PG_DUMP') @click.argument('backup_name', default='') def create_backup(backup_name, pg_dump) @cli.command('restore') @click.option('--latest', is_flag=True) @click.option('--pg-restore', default='pg_restore', envvar='PG_RESTORE') @click.argument('backup_name', default='') def restore_backup(backup_name, latest, pg_restore) @cli.command('delete') @click.argument('backup_name') def delete_backup(backup_name) @cli.command('clear') @click.confirmation_option(prompt='Are you sure you want to delete all backups?') def clear_backups() class PostgresBackupClient() def get_env(self) def create_backup(self, backup_path, *, pg_dump='pg_dump') def restore_backup(self, backup_path, *, pg_restore='pg_restore') class DatabaseBackups() def find_backups(self) def create(self, name, **create_kwargs) def restore(self, name, **restore_kwargs) def delete(self, name) class DatabaseBackup() def exists(self) def create(self, **create_kwargs) def iter_files(self) def restore(self, **restore_kwargs) def delete(self) def updated_at(self) class Deferred() DEFERRED = Deferred() class ModelBase(type) def add_to_class(cls, name, value) class ModelStateFieldsCacheDescriptor() class ModelState() db = None adding = True fields_cache = ModelStateFieldsCacheDescriptor() class Model() @classmethod def from_db(cls, db, field_names, values) pk = property(_get_pk_val, _set_pk_val) def get_deferred_fields(self) def refresh_from_db(self, using=None, fields=None) def serializable_value(self, field_name) def save(self, *, clean_and_validate=True, force_insert=False, force_update=False, using=None, update_fields=None) def save_base(self, *, raw=False, force_insert=False, force_update=False, using=None, update_fields=None) def delete(self, using=None) def prepare_database_save(self, field) def clean(self) def validate_unique(self, exclude=None) def unique_error_message(self, model_class, unique_check) def get_constraints(self) def validate_constraints(self, exclude=None) def full_clean(self, *, exclude=None, validate_unique=True, validate_constraints=True) def clean_fields(self, exclude=None) @classmethod def check(cls, **kwargs) def model_unpickle(model_id) @register_cli('models') @click.group() def cli() @cli.command() @click.option('--database', default=DEFAULT_DB_ALIAS, help='Nominates a database onto which to open a shell. Defaults to the "default" database.') @click.argument('parameters', nargs=-1) def db_shell(database, parameters) @cli.command() def db_wait() @register_cli('makemigrations') @cli.command() @click.argument('package_labels', nargs=-1) @click.option('--dry-run', is_flag=True, help="Just show what migrations would be made; don't actually write them.") @click.option('--merge', is_flag=True, help='Enable fixing of migration conflicts.') @click.option('--empty', is_flag=True, help='Create an empty migration.') @click.option('--noinput', '--no-input', 'no_input', is_flag=True, help='Tells Plain to NOT prompt the user for input of any kind.') @click.option('-n', '--name', help='Use this name for migration file(s).') @click.option('--check', is_flag=True, help="Exit with a non-zero status if model changes are missing migrations and don't actually write them.") @click.option('-v', '--verbosity', type=int, default=1, help='Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output') def makemigrations(package_labels, dry_run, merge, empty, no_input, name, check, verbosity) @register_cli('migrate') @cli.command() @click.argument('package_label', required=False) @click.argument('migration_name', required=False) @click.option('--database', default=DEFAULT_DB_ALIAS, help="Nominates a database to synchronize. Defaults to the 'default' database.") @click.option('--fake', is_flag=True, help='Mark migrations as run without actually running them.') @click.option('--fake-initial', is_flag=True, help='Detect if tables already exist and fake-apply initial migrations if so. Make sure that the current database schema matches your initial migration before using this flag. Plain will only check for an existing table name.') @click.option('--plan', is_flag=True, help='Shows a list of the migration actions that will be performed.') @click.option('--check', 'check_unapplied', is_flag=True, help='Exits with a non-zero status if unapplied migrations exist and does not actually apply migrations.') @click.option('--backup/--no-backup', 'backup', is_flag=True, default=None, help='Explicitly enable/disable pre-migration backups.') @click.option('--prune', is_flag=True, help='Delete nonexistent migrations from the plainmigrations table.') @click.option('-v', '--verbosity', type=int, default=1, help='Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output') def migrate(package_label, migration_name, database, fake, fake_initial, plan, check_unapplied, backup, prune, verbosity) @cli.command() @click.argument('package_label') @click.argument('migration_name') @click.option('--check', is_flag=True, help='Exit with a non-zero status if the migration can be optimized.') @click.option('-v', '--verbosity', type=int, default=1, help='Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output') def optimize_migration(package_label, migration_name, check, verbosity) @cli.command() @click.argument('package_labels', nargs=-1) @click.option('--database', default=DEFAULT_DB_ALIAS, help="Nominates a database to show migrations for. Defaults to the 'default' database.") @click.option('--format', type=click.Choice(['list', 'plan']), default='list', help='Output format.') @click.option('-v', '--verbosity', type=int, default=1, help='Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output') def show_migrations(package_labels, database, format, verbosity) @cli.command() @click.argument('package_label') @click.argument('start_migration_name', required=False) @click.argument('migration_name') @click.option('--no-optimize', is_flag=True, help='Do not try to optimize the squashed operations.') @click.option('--noinput', '--no-input', 'no_input', is_flag=True, help='Tells Plain to NOT prompt the user for input of any kind.') @click.option('--squashed-name', help='Sets the name of the new squashed migration.') @click.option('-v', '--verbosity', type=int, default=1, help='Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output') def squash_migrations(package_label, start_migration_name, migration_name, no_optimize, no_input, squashed_name, verbosity) MODELS_MODULE_NAME = 'models' @register_config class Config(PackageConfig) def ready(self) LOOKUP_SEP = '__' class OnConflict(Enum) IGNORE = 'ignore' UPDATE = 'update' class BaseConstraint() default_violation_error_message = 'Constraint “%(name)s” is violated.' violation_error_code = None violation_error_message = None @property def contains_expressions(self) def constraint_sql(self, model, schema_editor) def create_sql(self, model, schema_editor) def remove_sql(self, model, schema_editor) def validate(self, model, instance, exclude=None, using=DEFAULT_DB_ALIAS) def get_violation_error_message(self) def deconstruct(self) def clone(self) class CheckConstraint(BaseConstraint) def constraint_sql(self, model, schema_editor) def create_sql(self, model, schema_editor) def remove_sql(self, model, schema_editor) def validate(self, model, instance, exclude=None, using=DEFAULT_DB_ALIAS) def deconstruct(self) class Deferrable(Enum) DEFERRED = 'deferred' IMMEDIATE = 'immediate' class UniqueConstraint(BaseConstraint) @property def contains_expressions(self) def constraint_sql(self, model, schema_editor) def create_sql(self, model, schema_editor) def remove_sql(self, model, schema_editor) def deconstruct(self) def validate(self, model, instance, exclude=None, using=DEFAULT_DB_ALIAS) DEFAULT_ENV = 'DATABASE_URL' SCHEMES = {'postgres': 'plain.models.backends.postgresql', 'postgresql': 'plain.models.backends.postgresql', 'pgsql': 'plain.models.backends.postgresql', 'mysql': 'plain.models.backends.mysql', 'mysql2': 'plain.models.backends.mysql', 'sqlite': 'plain.models.backends.sqlite3'} class DBConfig(TypedDict) def config(env: str=DEFAULT_ENV, default: str | None=None, engine: str | None=None, conn_max_age: int | None=0, conn_health_checks: bool=False, ssl_require: bool=False, test_options: dict | None=None) def parse(url: str, engine: str | None=None, conn_max_age: int | None=0, conn_health_checks: bool=False, ssl_require: bool=False, test_options: dict | None=None) DEFAULT_DB_ALIAS = 'default' PLAIN_VERSION_PICKLE_KEY = '_plain_version' class Error(Exception) class InterfaceError(Error) class DatabaseError(Error) class DataError(DatabaseError) class OperationalError(DatabaseError) class IntegrityError(DatabaseError) class InternalError(DatabaseError) class ProgrammingError(DatabaseError) class NotSupportedError(DatabaseError) class DatabaseErrorWrapper() def load_backend(backend_name) class ConnectionHandler(BaseConnectionHandler) settings_name = 'DATABASES' def configure_settings(self, databases) @property def databases(self) def create_connection(self, alias) class ConnectionRouter() @cached_property def routers(self) db_for_read = _router_func('db_for_read') db_for_write = _router_func('db_for_write') def allow_relation(self, obj1, obj2, **hints) def allow_migrate(self, db, package_label, **hints) def allow_migrate_model(self, db, model) def get_migratable_models(self, models_registry, package_label, db) connections = ConnectionHandler() router = ConnectionRouter() connection = ConnectionProxy(connections, DEFAULT_DB_ALIAS) def reset_queries(**kwargs) def close_old_connections(**kwargs) DATABASE_ROUTERS = [] class ProtectedError(IntegrityError) class RestrictedError(IntegrityError) def CASCADE(collector, field, sub_objs, using) def PROTECT(collector, field, sub_objs, using) def RESTRICT(collector, field, sub_objs, using) def SET(value) def SET_NULL(collector, field, sub_objs, using) def SET_DEFAULT(collector, field, sub_objs, using) def DO_NOTHING(collector, field, sub_objs, using) def get_candidate_relations_to_delete(opts) class Collector() def add(self, objs, source=None, nullable=False, reverse_dependency=False) def add_dependency(self, model, dependency, reverse_dependency=False) def add_field_update(self, field, value, objs) def add_restricted_objects(self, field, objs) def clear_restricted_objects_from_set(self, model, objs) def clear_restricted_objects_from_queryset(self, model, qs) def can_fast_delete(self, objs, from_field=None) def get_del_batches(self, objs, fields) def collect(self, objs, source=None, nullable=False, collect_related=True, reverse_dependency=False, fail_on_restricted=True) def related_objects(self, related_model, related_fields, objs) def sort(self) def delete(self) def setup() class ChoicesMeta(enum.EnumMeta) @property def names(cls) @property def choices(cls) @property def labels(cls) @property def values(cls) class Choices(enum.Enum) @DynamicClassAttribute def label(self) class IntegerChoices(int, Choices) class TextChoices(str, Choices) class SQLiteNumericMixin() def as_sqlite(self, compiler, connection, **extra_context) class Combinable() ADD = '+' SUB = '-' MUL = '*' DIV = '/' POW = '^' MOD = '%%' BITAND = '&' BITOR = '|' BITLEFTSHIFT = '<<' BITRIGHTSHIFT = '>>' BITXOR = '#' def bitand(self, other) def bitleftshift(self, other) def bitrightshift(self, other) def bitxor(self, other) def bitor(self, other) class BaseExpression() empty_result_set_value = NotImplemented is_summary = False filterable = True window_compatible = False def get_db_converters(self, connection) def get_source_expressions(self) def set_source_expressions(self, exprs) def as_sql(self, compiler, connection) @cached_property def contains_aggregate(self) @cached_property def contains_over_clause(self) @cached_property def contains_column_references(self) def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False) @property def conditional(self) @property def field(self) @cached_property def output_field(self) @cached_property def convert_value(self) def get_lookup(self, lookup) def get_transform(self, name) def relabeled_clone(self, change_map) def replace_expressions(self, replacements) def get_refs(self) def copy(self) def prefix_references(self, prefix) def get_group_by_cols(self) def get_source_fields(self) def asc(self, **kwargs) def desc(self, **kwargs) def reverse_ordering(self) def flatten(self) def select_format(self, compiler, sql, params) @deconstructible class Expression(BaseExpression, Combinable) @cached_property def identity(self) def register_combinable_fields(lhs, connector, rhs, result) class CombinedExpression(SQLiteNumericMixin, Expression) def get_source_expressions(self) def set_source_expressions(self, exprs) def as_sql(self, compiler, connection) def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False) class DurationExpression(CombinedExpression) def compile(self, side, compiler, connection) def as_sql(self, compiler, connection) def as_sqlite(self, compiler, connection, **extra_context) class TemporalSubtraction(CombinedExpression) output_field = fields.DurationField() def as_sql(self, compiler, connection) @deconstructible(path='plain.models.F') class F(Combinable) def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False) def replace_expressions(self, replacements) def asc(self, **kwargs) def desc(self, **kwargs) def copy(self) class ResolvedOuterRef(F) contains_aggregate = False contains_over_clause = False def as_sql(self, *args, **kwargs) def resolve_expression(self, *args, **kwargs) def relabeled_clone(self, relabels) def get_group_by_cols(self) class OuterRef(F) contains_aggregate = False def resolve_expression(self, *args, **kwargs) def relabeled_clone(self, relabels) @deconstructible(path='plain.models.Func') class Func(SQLiteNumericMixin, Expression) function = None template = '%(function)s(%(expressions)s)' arg_joiner = ', ' arity = None def get_source_expressions(self) def set_source_expressions(self, exprs) def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False) def as_sql(self, compiler, connection, function=None, template=None, arg_joiner=None, **extra_context) def copy(self) @deconstructible(path='plain.models.Value') class Value(SQLiteNumericMixin, Expression) for_save = False def as_sql(self, compiler, connection) def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False) def get_group_by_cols(self) @property def empty_result_set_value(self) class RawSQL(Expression) def as_sql(self, compiler, connection) def get_group_by_cols(self) class Star(Expression) def as_sql(self, compiler, connection) class Col(Expression) contains_column_references = True possibly_multivalued = False def as_sql(self, compiler, connection) def relabeled_clone(self, relabels) def get_group_by_cols(self) def get_db_converters(self, connection) class Ref(Expression) def get_source_expressions(self) def set_source_expressions(self, exprs) def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False) def get_refs(self) def relabeled_clone(self, relabels) def as_sql(self, compiler, connection) def get_group_by_cols(self) class ExpressionList(Func) template = '%(expressions)s' def as_sqlite(self, compiler, connection, **extra_context) class OrderByList(Func) template = 'ORDER BY %(expressions)s' def as_sql(self, *args, **kwargs) def get_group_by_cols(self) @deconstructible(path='plain.models.ExpressionWrapper') class ExpressionWrapper(SQLiteNumericMixin, Expression) def set_source_expressions(self, exprs) def get_source_expressions(self) def get_group_by_cols(self) def as_sql(self, compiler, connection) class NegatedExpression(ExpressionWrapper) def as_sql(self, compiler, connection) def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False) def select_format(self, compiler, sql, params) @deconstructible(path='plain.models.When') class When(Expression) template = 'WHEN %(condition)s THEN %(result)s' conditional = False def get_source_expressions(self) def set_source_expressions(self, exprs) def get_source_fields(self) def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False) def as_sql(self, compiler, connection, template=None, **extra_context) def get_group_by_cols(self) @deconstructible(path='plain.models.Case') class Case(SQLiteNumericMixin, Expression) template = 'CASE %(cases)s ELSE %(default)s END' case_joiner = ' ' def get_source_expressions(self) def set_source_expressions(self, exprs) def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False) def copy(self) def as_sql(self, compiler, connection, template=None, case_joiner=None, **extra_context) def get_group_by_cols(self) class Subquery(BaseExpression, Combinable) template = '(%(subquery)s)' contains_aggregate = False empty_result_set_value = None def get_source_expressions(self) def set_source_expressions(self, exprs) def copy(self) @property def external_aliases(self) def get_external_cols(self) def as_sql(self, compiler, connection, template=None, **extra_context) def get_group_by_cols(self) class Exists(Subquery) template = 'EXISTS(%(subquery)s)' output_field = fields.BooleanField() empty_result_set_value = False def select_format(self, compiler, sql, params) @deconstructible(path='plain.models.OrderBy') class OrderBy(Expression) template = '%(expression)s %(ordering)s' conditional = False def set_source_expressions(self, exprs) def get_source_expressions(self) def as_sql(self, compiler, connection, template=None, **extra_context) def get_group_by_cols(self) def reverse_ordering(self) def asc(self) def desc(self) class Window(SQLiteNumericMixin, Expression) template = '%(expression)s OVER (%(window)s)' contains_aggregate = False contains_over_clause = True def get_source_expressions(self) def set_source_expressions(self, exprs) def as_sql(self, compiler, connection, template=None) def as_sqlite(self, compiler, connection) def get_group_by_cols(self) class WindowFrame(Expression) template = '%(frame_type)s BETWEEN %(start)s AND %(end)s' def set_source_expressions(self, exprs) def get_source_expressions(self) def as_sql(self, compiler, connection) def get_group_by_cols(self) def window_frame_start_end(self, connection, start, end) class RowRange(WindowFrame) frame_type = 'ROWS' def window_frame_start_end(self, connection, start, end) class ValueRange(WindowFrame) frame_type = 'RANGE' def window_frame_start_end(self, connection, start, end) class Empty() class NOT_PROVIDED() BLANK_CHOICE_DASH = [('', '---------')] def return_None() @total_ordering class Field(RegisterLookupMixin) empty_strings_allowed = True empty_values = list(validators.EMPTY_VALUES) creation_counter = 0 auto_creation_counter = -1 default_validators = [] default_error_messages = {'invalid_choice': 'Value %(value)r is not a valid choice.', 'allow_null': 'This field cannot be null.', 'required': 'This field is be required.', 'unique': 'A %(model_name)s with this %(field_label)s already exists.'} system_check_deprecated_details = None system_check_removed_details = None non_db_attrs = ('required', 'choices', 'db_column', 'error_messages', 'limit_choices_to', 'on_delete', 'related_name', 'related_query_name', 'validators') hidden = False many_to_many = None many_to_one = None one_to_many = None related_model = None descriptor_class = DeferredAttribute description = property(_description) def check(self, **kwargs) def get_col(self, alias, output_field=None) @cached_property def cached_col(self) def select_format(self, compiler, sql, params) def deconstruct(self) def clone(self) def get_pk_value_on_save(self, instance) def to_python(self, value) @cached_property def error_messages(self) @cached_property def validators(self) def run_validators(self, value) def validate(self, value, model_instance) def clean(self, value, model_instance) def db_type_parameters(self, connection) def db_check(self, connection) def db_type(self, connection) def rel_db_type(self, connection) def cast_db_type(self, connection) def db_parameters(self, connection) def db_type_suffix(self, connection) def get_db_converters(self, connection) @property def db_returning(self) def set_attributes_from_name(self, name) def contribute_to_class(self, cls, name, private_only=False) def get_attname(self) def get_attname_column(self) def get_internal_type(self) def pre_save(self, model_instance, add) def get_prep_value(self, value) def get_db_prep_value(self, value, connection, prepared=False) def get_db_prep_save(self, value, connection) def has_default(self) def get_default(self) def get_choices(self, include_blank=True, blank_choice=BLANK_CHOICE_DASH, limit_choices_to=None, ordering=()) def value_to_string(self, obj) flatchoices = property(_get_flatchoices) def save_form_data(self, instance, data) def value_from_object(self, obj) class BooleanField(Field) empty_strings_allowed = False default_error_messages = {'invalid': '“%(value)s” value must be either True or False.', 'invalid_nullable': '“%(value)s” value must be either True, False, or None.'} description = 'Boolean (Either True or False)' def get_internal_type(self) def to_python(self, value) def get_prep_value(self, value) class CharField(Field) @property def description(self) def check(self, **kwargs) def cast_db_type(self, connection) def db_parameters(self, connection) def get_internal_type(self) def to_python(self, value) def get_prep_value(self, value) def deconstruct(self) class CommaSeparatedIntegerField(CharField) default_validators = [validators.validate_comma_separated_integer_list] description = 'Comma-separated integers' system_check_removed_details = {'msg': 'CommaSeparatedIntegerField is removed except for support in historical migrations.', 'hint': 'Use CharField(validators=[validate_comma_separated_integer_list]) instead.', 'id': 'fields.E901'} class DateTimeCheckMixin() def check(self, **kwargs) class DateField(DateTimeCheckMixin, Field) empty_strings_allowed = False default_error_messages = {'invalid': '“%(value)s” value has an invalid date format. It must be in YYYY-MM-DD format.', 'invalid_date': '“%(value)s” value has the correct format (YYYY-MM-DD) but it is an invalid date.'} description = 'Date (without time)' def deconstruct(self) def get_internal_type(self) def to_python(self, value) def pre_save(self, model_instance, add) def contribute_to_class(self, cls, name, **kwargs) def get_prep_value(self, value) def get_db_prep_value(self, value, connection, prepared=False) def value_to_string(self, obj) class DateTimeField(DateField) empty_strings_allowed = False default_error_messages = {'invalid': '“%(value)s” value has an invalid format. It must be in YYYY-MM-DD HH:MM[:ss[.uuuuuu]][TZ] format.', 'invalid_date': '“%(value)s” value has the correct format (YYYY-MM-DD) but it is an invalid date.', 'invalid_datetime': '“%(value)s” value has the correct format (YYYY-MM-DD HH:MM[:ss[.uuuuuu]][TZ]) but it is an invalid date/time.'} description = 'Date (with time)' def get_internal_type(self) def to_python(self, value) def pre_save(self, model_instance, add) def get_prep_value(self, value) def get_db_prep_value(self, value, connection, prepared=False) def value_to_string(self, obj) class DecimalField(Field) empty_strings_allowed = False default_error_messages = {'invalid': '“%(value)s” value must be a decimal number.'} description = 'Decimal number' def check(self, **kwargs) @cached_property def validators(self) @cached_property def context(self) def deconstruct(self) def get_internal_type(self) def to_python(self, value) def get_db_prep_value(self, value, connection, prepared=False) def get_prep_value(self, value) class DurationField(Field) empty_strings_allowed = False default_error_messages = {'invalid': '“%(value)s” value has an invalid format. It must be in [DD] [[HH:]MM:]ss[.uuuuuu] format.'} description = 'Duration' def get_internal_type(self) def to_python(self, value) def get_db_prep_value(self, value, connection, prepared=False) def get_db_converters(self, connection) def value_to_string(self, obj) class EmailField(CharField) default_validators = [validators.validate_email] description = 'Email address' def deconstruct(self) class FloatField(Field) empty_strings_allowed = False default_error_messages = {'invalid': '“%(value)s” value must be a float.'} description = 'Floating point number' def get_prep_value(self, value) def get_internal_type(self) def to_python(self, value) class IntegerField(Field) empty_strings_allowed = False default_error_messages = {'invalid': '“%(value)s” value must be an integer.'} description = 'Integer' def check(self, **kwargs) @cached_property def validators(self) def get_prep_value(self, value) def get_db_prep_value(self, value, connection, prepared=False) def get_internal_type(self) def to_python(self, value) class BigIntegerField(IntegerField) description = 'Big (8 byte) integer' def get_internal_type(self) class SmallIntegerField(IntegerField) description = 'Small integer' def get_internal_type(self) class IPAddressField(Field) empty_strings_allowed = False description = 'IPv4 address' system_check_removed_details = {'msg': 'IPAddressField has been removed except for support in historical migrations.', 'hint': 'Use GenericIPAddressField instead.', 'id': 'fields.E900'} def deconstruct(self) def get_prep_value(self, value) def get_internal_type(self) class GenericIPAddressField(Field) empty_strings_allowed = False description = 'IP address' default_error_messages = {} def check(self, **kwargs) def deconstruct(self) def get_internal_type(self) def to_python(self, value) def get_db_prep_value(self, value, connection, prepared=False) def get_prep_value(self, value) class NullBooleanField(BooleanField) default_error_messages = {'invalid': '“%(value)s” value must be either None, True or False.', 'invalid_nullable': '“%(value)s” value must be either None, True or False.'} description = 'Boolean (Either True, False or None)' system_check_removed_details = {'msg': 'NullBooleanField is removed except for support in historical migrations.', 'hint': 'Use BooleanField(allow_null=True) instead.', 'id': 'fields.E903'} def deconstruct(self) class PositiveIntegerRelDbTypeMixin() def rel_db_type(self, connection) class PositiveBigIntegerField(PositiveIntegerRelDbTypeMixin, BigIntegerField) description = 'Positive big integer' def get_internal_type(self) class PositiveIntegerField(PositiveIntegerRelDbTypeMixin, IntegerField) description = 'Positive integer' def get_internal_type(self) class PositiveSmallIntegerField(PositiveIntegerRelDbTypeMixin, SmallIntegerField) description = 'Positive small integer' def get_internal_type(self) class TextField(Field) description = 'Text' def check(self, **kwargs) def db_parameters(self, connection) def get_internal_type(self) def to_python(self, value) def get_prep_value(self, value) def deconstruct(self) class TimeField(DateTimeCheckMixin, Field) empty_strings_allowed = False default_error_messages = {'invalid': '“%(value)s” value has an invalid format. It must be in HH:MM[:ss[.uuuuuu]] format.', 'invalid_time': '“%(value)s” value has the correct format (HH:MM[:ss[.uuuuuu]]) but it is an invalid time.'} description = 'Time' def deconstruct(self) def get_internal_type(self) def to_python(self, value) def pre_save(self, model_instance, add) def get_prep_value(self, value) def get_db_prep_value(self, value, connection, prepared=False) def value_to_string(self, obj) class URLField(CharField) default_validators = [validators.URLValidator()] description = 'URL' def deconstruct(self) class BinaryField(Field) description = 'Raw binary data' empty_values = [None, b''] def check(self, **kwargs) def get_internal_type(self) def get_placeholder(self, value, compiler, connection) def get_default(self) def get_db_prep_value(self, value, connection, prepared=False) def value_to_string(self, obj) def to_python(self, value) class UUIDField(Field) default_error_messages = {'invalid': '“%(value)s” is not a valid UUID.'} description = 'Universally unique identifier' empty_strings_allowed = False def deconstruct(self) def get_internal_type(self) def get_prep_value(self, value) def get_db_prep_value(self, value, connection, prepared=False) def to_python(self, value) class AutoFieldMixin() db_returning = True def check(self, **kwargs) def deconstruct(self) def validate(self, value, model_instance) def get_db_prep_value(self, value, connection, prepared=False) def contribute_to_class(self, cls, name, **kwargs) class AutoFieldMeta(type) class AutoField(AutoFieldMixin, IntegerField) def get_internal_type(self) def rel_db_type(self, connection) class BigAutoField(AutoFieldMixin, BigIntegerField) def get_internal_type(self) def rel_db_type(self, connection) class SmallAutoField(AutoFieldMixin, SmallIntegerField) def get_internal_type(self) def rel_db_type(self, connection) class JSONField(CheckFieldDefaultMixin, Field) empty_strings_allowed = False description = 'A JSON object' default_error_messages = {'invalid': 'Value must be valid JSON.'} def check(self, **kwargs) def deconstruct(self) def from_db_value(self, value, expression, connection) def get_internal_type(self) def get_db_prep_value(self, value, connection, prepared=False) def get_db_prep_save(self, value, connection) def get_transform(self, name) def validate(self, value, model_instance) def value_to_string(self, obj) def compile_json_path(key_transforms, include_root=True) class DataContains(FieldGetDbPrepValueMixin, PostgresOperatorLookup) lookup_name = 'contains' postgres_operator = '@>' def as_sql(self, compiler, connection) class ContainedBy(FieldGetDbPrepValueMixin, PostgresOperatorLookup) lookup_name = 'contained_by' postgres_operator = '<@' def as_sql(self, compiler, connection) class HasKeyLookup(PostgresOperatorLookup) logical_operator = None def compile_json_path_final_key(self, key_transform) def as_sql(self, compiler, connection, template=None) def as_mysql(self, compiler, connection) def as_postgresql(self, compiler, connection) def as_sqlite(self, compiler, connection) class HasKey(HasKeyLookup) lookup_name = 'has_key' postgres_operator = '?' prepare_rhs = False class HasKeys(HasKeyLookup) lookup_name = 'has_keys' postgres_operator = '?&' logical_operator = ' AND ' def get_prep_lookup(self) class HasAnyKeys(HasKeys) lookup_name = 'has_any_keys' postgres_operator = '?|' logical_operator = ' OR ' class HasKeyOrArrayIndex(HasKey) def compile_json_path_final_key(self, key_transform) class CaseInsensitiveMixin() def process_lhs(self, compiler, connection) def process_rhs(self, compiler, connection) class JSONExact(lookups.Exact) can_use_none_as_rhs = True def process_rhs(self, compiler, connection) class JSONIContains(CaseInsensitiveMixin, lookups.IContains) class KeyTransform(Transform) postgres_operator = '->' postgres_nested_operator = '#>' def preprocess_lhs(self, compiler, connection) def as_mysql(self, compiler, connection) def as_postgresql(self, compiler, connection) def as_sqlite(self, compiler, connection) class KeyTextTransform(KeyTransform) postgres_operator = '->>' postgres_nested_operator = '#>>' output_field = TextField() def as_mysql(self, compiler, connection) @classmethod def from_lookup(cls, lookup) KT = KeyTextTransform.from_lookup class KeyTransformTextLookupMixin() class KeyTransformIsNull(lookups.IsNull) def as_sqlite(self, compiler, connection) class KeyTransformIn(lookups.In) def resolve_expression_parameter(self, compiler, connection, sql, param) class KeyTransformExact(JSONExact) def process_rhs(self, compiler, connection) class KeyTransformIExact(CaseInsensitiveMixin, KeyTransformTextLookupMixin, lookups.IExact) class KeyTransformIContains(CaseInsensitiveMixin, KeyTransformTextLookupMixin, lookups.IContains) class KeyTransformStartsWith(KeyTransformTextLookupMixin, lookups.StartsWith) class KeyTransformIStartsWith(CaseInsensitiveMixin, KeyTransformTextLookupMixin, lookups.IStartsWith) class KeyTransformEndsWith(KeyTransformTextLookupMixin, lookups.EndsWith) class KeyTransformIEndsWith(CaseInsensitiveMixin, KeyTransformTextLookupMixin, lookups.IEndsWith) class KeyTransformRegex(KeyTransformTextLookupMixin, lookups.Regex) class KeyTransformIRegex(CaseInsensitiveMixin, KeyTransformTextLookupMixin, lookups.IRegex) class KeyTransformNumericLookupMixin() def process_rhs(self, compiler, connection) class KeyTransformLt(KeyTransformNumericLookupMixin, lookups.LessThan) class KeyTransformLte(KeyTransformNumericLookupMixin, lookups.LessThanOrEqual) class KeyTransformGt(KeyTransformNumericLookupMixin, lookups.GreaterThan) class KeyTransformGte(KeyTransformNumericLookupMixin, lookups.GreaterThanOrEqual) class KeyTransformFactory() NOT_PROVIDED = object() class FieldCacheMixin() def get_cache_name(self) def get_cached_value(self, instance, default=NOT_PROVIDED) def is_cached(self, instance) def set_cached_value(self, instance, value) def delete_cached_value(self, instance) class CheckFieldDefaultMixin() def check(self, **kwargs) RECURSIVE_RELATIONSHIP_CONSTANT = 'self' def resolve_relation(scope_model, relation) def lazy_related_operation(function, model, *related_models, **kwargs) class RelatedField(FieldCacheMixin, Field) one_to_many = False many_to_many = False many_to_one = False @cached_property def related_model(self) def check(self, **kwargs) def db_type(self, connection) def contribute_to_class(self, cls, name, private_only=False, **kwargs) def deconstruct(self) def get_forward_related_filter(self, obj) def get_reverse_related_filter(self, obj) def set_attributes_from_rel(self) def do_related_class(self, other, cls) def get_limit_choices_to(self) def related_query_name(self) @property def target_field(self) def get_cache_name(self) class ForeignObject(RelatedField) many_to_many = False many_to_one = True one_to_many = False requires_unique_target = True related_accessor_class = ReverseManyToOneDescriptor forward_related_accessor_class = ForwardManyToOneDescriptor rel_class = ForeignObjectRel def check(self, **kwargs) def deconstruct(self) def resolve_related_fields(self) @cached_property def related_fields(self) @cached_property def reverse_related_fields(self) @cached_property def local_related_fields(self) @cached_property def foreign_related_fields(self) def get_local_related_value(self, instance) def get_foreign_related_value(self, instance) @staticmethod def get_instance_value_for_fields(instance, fields) def get_attname_column(self) def get_joining_columns(self, reverse_join=False) def get_reverse_joining_columns(self) def get_extra_descriptor_filter(self, instance) def get_extra_restriction(self, alias, related_alias) def get_path_info(self, filtered_relation=None) @cached_property def path_infos(self) def get_reverse_path_info(self, filtered_relation=None) @cached_property def reverse_path_infos(self) @classmethod @functools.cache def get_class_lookups(cls) def contribute_to_class(self, cls, name, private_only=False, **kwargs) def contribute_to_related_class(self, cls, related) class ForeignKey(ForeignObject) descriptor_class = ForeignKeyDeferredAttribute many_to_many = False many_to_one = True one_to_many = False rel_class = ManyToOneRel empty_strings_allowed = False default_error_messages = {'invalid': '%(model)s instance with %(field)s %(value)r does not exist.'} description = 'Foreign Key (type determined by related field)' def check(self, **kwargs) def deconstruct(self) def to_python(self, value) @property def target_field(self) def validate(self, value, model_instance) def resolve_related_fields(self) def get_attname(self) def get_attname_column(self) def get_default(self) def get_db_prep_save(self, value, connection) def get_db_prep_value(self, value, connection, prepared=False) def get_prep_value(self, value) def contribute_to_related_class(self, cls, related) def db_check(self, connection) def db_type(self, connection) def cast_db_type(self, connection) def db_parameters(self, connection) def convert_empty_strings(self, value, expression, connection) def get_db_converters(self, connection) def get_col(self, alias, output_field=None) class ManyToManyField(RelatedField) many_to_many = True many_to_one = False one_to_many = False rel_class = ManyToManyRel description = 'Many-to-many relationship' def check(self, **kwargs) def deconstruct(self) def get_path_info(self, filtered_relation=None) @cached_property def path_infos(self) def get_reverse_path_info(self, filtered_relation=None) @cached_property def reverse_path_infos(self) def contribute_to_class(self, cls, name, **kwargs) def contribute_to_related_class(self, cls, related) def set_attributes_from_rel(self) def value_from_object(self, obj) def save_form_data(self, instance, data) def db_check(self, connection) def db_type(self, connection) def db_parameters(self, connection) class ForeignKeyDeferredAttribute(DeferredAttribute) class ForwardManyToOneDescriptor() @cached_property def RelatedObjectDoesNotExist(self) def is_cached(self, instance) def get_queryset(self, **hints) def get_prefetch_queryset(self, instances, queryset=None) def get_object(self, instance) class ReverseManyToOneDescriptor() @cached_property def related_manager_cls(self) def create_reverse_many_to_one_manager(superclass, rel) class ManyToManyDescriptor(ReverseManyToOneDescriptor) @property def through(self) @cached_property def related_manager_cls(self) def create_forward_many_to_many_manager(superclass, rel, reverse) class MultiColSource() contains_aggregate = False contains_over_clause = False def relabeled_clone(self, relabels) def get_lookup(self, lookup) def resolve_expression(self, *args, **kwargs) def get_normalized_value(value, lhs) class RelatedIn(In) def get_prep_lookup(self) def as_sql(self, compiler, connection) class RelatedLookupMixin() def get_prep_lookup(self) def as_sql(self, compiler, connection) class RelatedExact(RelatedLookupMixin, Exact) class RelatedLessThan(RelatedLookupMixin, LessThan) class RelatedGreaterThan(RelatedLookupMixin, GreaterThan) class RelatedGreaterThanOrEqual(RelatedLookupMixin, GreaterThanOrEqual) class RelatedLessThanOrEqual(RelatedLookupMixin, LessThanOrEqual) class RelatedIsNull(RelatedLookupMixin, IsNull) class ForeignObjectRel(FieldCacheMixin) auto_created = True concrete = False is_relation = True allow_null = True empty_strings_allowed = False @cached_property def hidden(self) @cached_property def name(self) @property def remote_field(self) @property def target_field(self) @cached_property def related_model(self) @cached_property def many_to_many(self) @cached_property def many_to_one(self) @cached_property def one_to_many(self) def get_lookup(self, lookup_name) def get_internal_type(self) @property def db_type(self) @property def identity(self) def get_choices(self, include_blank=True, blank_choice=BLANK_CHOICE_DASH, limit_choices_to=None, ordering=()) def is_hidden(self) def get_joining_columns(self) def get_extra_restriction(self, alias, related_alias) def set_field_name(self) def get_accessor_name(self, model=None) def get_path_info(self, filtered_relation=None) @cached_property def path_infos(self) def get_cache_name(self) class ManyToOneRel(ForeignObjectRel) @property def identity(self) def get_related_field(self) def set_field_name(self) class ManyToManyRel(ForeignObjectRel) @property def identity(self) def get_related_field(self) def construct_instance(form, instance, fields=None) def model_to_dict(instance, fields=None) def fields_for_model(model, fields=None, formfield_callback=None, error_messages=None, field_classes=None) class ModelFormOptions() class ModelFormMetaclass(DeclarativeFieldsMetaclass) class BaseModelForm(BaseForm) def clean(self) def validate_unique(self) def save(self, commit=True) class ModelForm(BaseModelForm) class InlineForeignKeyField(Field) default_error_messages = {'invalid_choice': 'The inline value did not match the parent instance.'} def clean(self, value) def has_changed(self, initial, data) class ModelChoiceIteratorValue() class ModelChoiceIterator() def choice(self, obj) class ModelChoiceField(ChoiceField) default_error_messages = {'invalid_choice': 'Select a valid choice. That choice is not one of the available choices.'} iterator = ModelChoiceIterator queryset = property(_get_queryset, _set_queryset) choices = property(_get_choices, ChoiceField._set_choices) def prepare_value(self, value) def to_python(self, value) def validate(self, value) def has_changed(self, initial, data) class ModelMultipleChoiceField(ModelChoiceField) default_error_messages = {'invalid_list': 'Enter a list of values.', 'invalid_choice': 'Select a valid choice. %(value)s is not one of the available choices.', 'invalid_pk_value': '“%(pk)s” is not a valid value.'} def to_python(self, value) def clean(self, value) def prepare_value(self, value) def has_changed(self, initial, data) def value_from_form_data(self, data, files, html_name) def modelfield_to_formfield(modelfield, form_class=None, choices_form_class=None, **kwargs) class Cast(Func) function = 'CAST' template = '%(function)s(%(expressions)s AS %(db_type)s)' def as_sql(self, compiler, connection, **extra_context) def as_sqlite(self, compiler, connection, **extra_context) def as_mysql(self, compiler, connection, **extra_context) def as_postgresql(self, compiler, connection, **extra_context) class Coalesce(Func) function = 'COALESCE' @property def empty_result_set_value(self) class Collate(Func) function = 'COLLATE' template = '%(expressions)s %(function)s %(collation)s' collation_re = _lazy_re_compile('^[\\w\\-]+$') def as_sql(self, compiler, connection, **extra_context) class Greatest(Func) function = 'GREATEST' def as_sqlite(self, compiler, connection, **extra_context) class JSONObject(Func) function = 'JSON_OBJECT' output_field = JSONField() def as_sql(self, compiler, connection, **extra_context) def as_postgresql(self, compiler, connection, **extra_context) class Least(Func) function = 'LEAST' def as_sqlite(self, compiler, connection, **extra_context) class NullIf(Func) function = 'NULLIF' arity = 2 class TimezoneMixin() tzinfo = None def get_tzname(self) class Extract(TimezoneMixin, Transform) lookup_name = None output_field = IntegerField() def as_sql(self, compiler, connection) def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False) class ExtractYear(Extract) lookup_name = 'year' class ExtractIsoYear(Extract) lookup_name = 'iso_year' class ExtractMonth(Extract) lookup_name = 'month' class ExtractDay(Extract) lookup_name = 'day' class ExtractWeek(Extract) lookup_name = 'week' class ExtractWeekDay(Extract) lookup_name = 'week_day' class ExtractIsoWeekDay(Extract) lookup_name = 'iso_week_day' class ExtractQuarter(Extract) lookup_name = 'quarter' class ExtractHour(Extract) lookup_name = 'hour' class ExtractMinute(Extract) lookup_name = 'minute' class ExtractSecond(Extract) lookup_name = 'second' class Now(Func) template = 'CURRENT_TIMESTAMP' output_field = DateTimeField() def as_postgresql(self, compiler, connection, **extra_context) def as_mysql(self, compiler, connection, **extra_context) def as_sqlite(self, compiler, connection, **extra_context) class TruncBase(TimezoneMixin, Transform) kind = None tzinfo = None def as_sql(self, compiler, connection) def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False) def convert_value(self, value, expression, connection) class Trunc(TruncBase) class TruncYear(TruncBase) kind = 'year' class TruncQuarter(TruncBase) kind = 'quarter' class TruncMonth(TruncBase) kind = 'month' class TruncWeek(TruncBase) kind = 'week' class TruncDay(TruncBase) kind = 'day' class TruncDate(TruncBase) kind = 'date' lookup_name = 'date' output_field = DateField() def as_sql(self, compiler, connection) class TruncTime(TruncBase) kind = 'time' lookup_name = 'time' output_field = TimeField() def as_sql(self, compiler, connection) class TruncHour(TruncBase) kind = 'hour' class TruncMinute(TruncBase) kind = 'minute' class TruncSecond(TruncBase) kind = 'second' class Abs(Transform) function = 'ABS' lookup_name = 'abs' class ACos(NumericOutputFieldMixin, Transform) function = 'ACOS' lookup_name = 'acos' class ASin(NumericOutputFieldMixin, Transform) function = 'ASIN' lookup_name = 'asin' class ATan(NumericOutputFieldMixin, Transform) function = 'ATAN' lookup_name = 'atan' class ATan2(NumericOutputFieldMixin, Func) function = 'ATAN2' arity = 2 def as_sqlite(self, compiler, connection, **extra_context) class Ceil(Transform) function = 'CEILING' lookup_name = 'ceil' class Cos(NumericOutputFieldMixin, Transform) function = 'COS' lookup_name = 'cos' class Cot(NumericOutputFieldMixin, Transform) function = 'COT' lookup_name = 'cot' class Degrees(NumericOutputFieldMixin, Transform) function = 'DEGREES' lookup_name = 'degrees' class Exp(NumericOutputFieldMixin, Transform) function = 'EXP' lookup_name = 'exp' class Floor(Transform) function = 'FLOOR' lookup_name = 'floor' class Ln(NumericOutputFieldMixin, Transform) function = 'LN' lookup_name = 'ln' class Log(FixDecimalInputMixin, NumericOutputFieldMixin, Func) function = 'LOG' arity = 2 def as_sqlite(self, compiler, connection, **extra_context) class Mod(FixDecimalInputMixin, NumericOutputFieldMixin, Func) function = 'MOD' arity = 2 class Pi(NumericOutputFieldMixin, Func) function = 'PI' arity = 0 class Power(NumericOutputFieldMixin, Func) function = 'POWER' arity = 2 class Radians(NumericOutputFieldMixin, Transform) function = 'RADIANS' lookup_name = 'radians' class Random(NumericOutputFieldMixin, Func) function = 'RANDOM' arity = 0 def as_mysql(self, compiler, connection, **extra_context) def as_sqlite(self, compiler, connection, **extra_context) def get_group_by_cols(self) class Round(FixDecimalInputMixin, Transform) function = 'ROUND' lookup_name = 'round' arity = None def as_sqlite(self, compiler, connection, **extra_context) class Sign(Transform) function = 'SIGN' lookup_name = 'sign' class Sin(NumericOutputFieldMixin, Transform) function = 'SIN' lookup_name = 'sin' class Sqrt(NumericOutputFieldMixin, Transform) function = 'SQRT' lookup_name = 'sqrt' class Tan(NumericOutputFieldMixin, Transform) function = 'TAN' lookup_name = 'tan' class FixDecimalInputMixin() def as_postgresql(self, compiler, connection, **extra_context) class FixDurationInputMixin() def as_mysql(self, compiler, connection, **extra_context) class NumericOutputFieldMixin() class MySQLSHA2Mixin() def as_mysql(self, compiler, connection, **extra_context) class PostgreSQLSHAMixin() def as_postgresql(self, compiler, connection, **extra_context) class Chr(Transform) function = 'CHR' lookup_name = 'chr' def as_mysql(self, compiler, connection, **extra_context) def as_sqlite(self, compiler, connection, **extra_context) class ConcatPair(Func) function = 'CONCAT' def as_sqlite(self, compiler, connection, **extra_context) def as_postgresql(self, compiler, connection, **extra_context) def as_mysql(self, compiler, connection, **extra_context) def coalesce(self) class Concat(Func) function = None template = '%(expressions)s' class Left(Func) function = 'LEFT' arity = 2 output_field = CharField() def get_substr(self) def as_sqlite(self, compiler, connection, **extra_context) class Length(Transform) function = 'LENGTH' lookup_name = 'length' output_field = IntegerField() def as_mysql(self, compiler, connection, **extra_context) class Lower(Transform) function = 'LOWER' lookup_name = 'lower' class LPad(Func) function = 'LPAD' output_field = CharField() class LTrim(Transform) function = 'LTRIM' lookup_name = 'ltrim' class MD5(Transform) function = 'MD5' lookup_name = 'md5' class Ord(Transform) function = 'ASCII' lookup_name = 'ord' output_field = IntegerField() def as_mysql(self, compiler, connection, **extra_context) def as_sqlite(self, compiler, connection, **extra_context) class Repeat(Func) function = 'REPEAT' output_field = CharField() class Replace(Func) function = 'REPLACE' class Reverse(Transform) function = 'REVERSE' lookup_name = 'reverse' class Right(Left) function = 'RIGHT' def get_substr(self) class RPad(LPad) function = 'RPAD' class RTrim(Transform) function = 'RTRIM' lookup_name = 'rtrim' class SHA1(PostgreSQLSHAMixin, Transform) function = 'SHA1' lookup_name = 'sha1' class SHA224(MySQLSHA2Mixin, PostgreSQLSHAMixin, Transform) function = 'SHA224' lookup_name = 'sha224' class SHA256(MySQLSHA2Mixin, PostgreSQLSHAMixin, Transform) function = 'SHA256' lookup_name = 'sha256' class SHA384(MySQLSHA2Mixin, PostgreSQLSHAMixin, Transform) function = 'SHA384' lookup_name = 'sha384' class SHA512(MySQLSHA2Mixin, PostgreSQLSHAMixin, Transform) function = 'SHA512' lookup_name = 'sha512' class StrIndex(Func) function = 'INSTR' arity = 2 output_field = IntegerField() def as_postgresql(self, compiler, connection, **extra_context) class Substr(Func) function = 'SUBSTRING' output_field = CharField() def as_sqlite(self, compiler, connection, **extra_context) class Trim(Transform) function = 'TRIM' lookup_name = 'trim' class Upper(Transform) function = 'UPPER' lookup_name = 'upper' class CumeDist(Func) function = 'CUME_DIST' output_field = FloatField() window_compatible = True class DenseRank(Func) function = 'DENSE_RANK' output_field = IntegerField() window_compatible = True class FirstValue(Func) arity = 1 function = 'FIRST_VALUE' window_compatible = True class LagLeadFunction(Func) window_compatible = True class Lag(LagLeadFunction) function = 'LAG' class LastValue(Func) arity = 1 function = 'LAST_VALUE' window_compatible = True class Lead(LagLeadFunction) function = 'LEAD' class NthValue(Func) function = 'NTH_VALUE' window_compatible = True class Ntile(Func) function = 'NTILE' output_field = IntegerField() window_compatible = True class PercentRank(Func) function = 'PERCENT_RANK' output_field = FloatField() window_compatible = True class Rank(Func) function = 'RANK' output_field = IntegerField() window_compatible = True class RowNumber(Func) function = 'ROW_NUMBER' output_field = IntegerField() window_compatible = True class Index() suffix = 'idx' max_name_length = 30 @property def contains_expressions(self) def create_sql(self, model, schema_editor, using='', **kwargs) def remove_sql(self, model, schema_editor, **kwargs) def deconstruct(self) def clone(self) def set_name_with_model(self, model) class IndexExpression(Func) template = '%(expressions)s' wrapper_classes = (OrderBy, Collate) def set_wrapper_classes(self, connection=None) def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False) def as_sqlite(self, compiler, connection, **extra_context) class Lookup(Expression) lookup_name = None prepare_rhs = True can_use_none_as_rhs = False def apply_bilateral_transforms(self, value) def batch_process_rhs(self, compiler, connection, rhs=None) def get_source_expressions(self) def set_source_expressions(self, new_exprs) def get_prep_lookup(self) def get_prep_lhs(self) def get_db_prep_lookup(self, value, connection) def process_lhs(self, compiler, connection, lhs=None) def process_rhs(self, compiler, connection) def rhs_is_direct_value(self) def get_group_by_cols(self) @cached_property def output_field(self) @property def identity(self) def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False) def select_format(self, compiler, sql, params) class Transform(RegisterLookupMixin, Func) bilateral = False arity = 1 @property def lhs(self) def get_bilateral_transforms(self) class BuiltinLookup(Lookup) def process_lhs(self, compiler, connection, lhs=None) def as_sql(self, compiler, connection) def get_rhs_op(self, connection, rhs) class FieldGetDbPrepValueMixin() get_db_prep_lookup_value_is_iterable = False def get_db_prep_lookup(self, value, connection) class FieldGetDbPrepValueIterableMixin(FieldGetDbPrepValueMixin) get_db_prep_lookup_value_is_iterable = True def get_prep_lookup(self) def process_rhs(self, compiler, connection) def resolve_expression_parameter(self, compiler, connection, sql, param) def batch_process_rhs(self, compiler, connection, rhs=None) class PostgresOperatorLookup(Lookup) postgres_operator = None def as_postgresql(self, compiler, connection) @Field.register_lookup class Exact(FieldGetDbPrepValueMixin, BuiltinLookup) lookup_name = 'exact' def get_prep_lookup(self) def as_sql(self, compiler, connection) @Field.register_lookup class IExact(BuiltinLookup) lookup_name = 'iexact' prepare_rhs = False def process_rhs(self, qn, connection) @Field.register_lookup class GreaterThan(FieldGetDbPrepValueMixin, BuiltinLookup) lookup_name = 'gt' @Field.register_lookup class GreaterThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup) lookup_name = 'gte' @Field.register_lookup class LessThan(FieldGetDbPrepValueMixin, BuiltinLookup) lookup_name = 'lt' @Field.register_lookup class LessThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup) lookup_name = 'lte' class IntegerFieldOverflow() underflow_exception = EmptyResultSet overflow_exception = EmptyResultSet def process_rhs(self, compiler, connection) class IntegerFieldFloatRounding() def get_prep_lookup(self) @IntegerField.register_lookup class IntegerFieldExact(IntegerFieldOverflow, Exact) @IntegerField.register_lookup class IntegerGreaterThan(IntegerFieldOverflow, GreaterThan) underflow_exception = FullResultSet @IntegerField.register_lookup class IntegerGreaterThanOrEqual(IntegerFieldOverflow, IntegerFieldFloatRounding, GreaterThanOrEqual) underflow_exception = FullResultSet @IntegerField.register_lookup class IntegerLessThan(IntegerFieldOverflow, IntegerFieldFloatRounding, LessThan) overflow_exception = FullResultSet @IntegerField.register_lookup class IntegerLessThanOrEqual(IntegerFieldOverflow, LessThanOrEqual) overflow_exception = FullResultSet @Field.register_lookup class In(FieldGetDbPrepValueIterableMixin, BuiltinLookup) lookup_name = 'in' def get_prep_lookup(self) def process_rhs(self, compiler, connection) def get_rhs_op(self, connection, rhs) def as_sql(self, compiler, connection) def split_parameter_list_as_sql(self, compiler, connection) class PatternLookup(BuiltinLookup) param_pattern = '%%%s%%' prepare_rhs = False def get_rhs_op(self, connection, rhs) def process_rhs(self, qn, connection) @Field.register_lookup class Contains(PatternLookup) lookup_name = 'contains' @Field.register_lookup class IContains(Contains) lookup_name = 'icontains' @Field.register_lookup class StartsWith(PatternLookup) lookup_name = 'startswith' param_pattern = '%s%%' @Field.register_lookup class IStartsWith(StartsWith) lookup_name = 'istartswith' @Field.register_lookup class EndsWith(PatternLookup) lookup_name = 'endswith' param_pattern = '%%%s' @Field.register_lookup class IEndsWith(EndsWith) lookup_name = 'iendswith' @Field.register_lookup class Range(FieldGetDbPrepValueIterableMixin, BuiltinLookup) lookup_name = 'range' def get_rhs_op(self, connection, rhs) @Field.register_lookup class IsNull(BuiltinLookup) lookup_name = 'isnull' prepare_rhs = False def as_sql(self, compiler, connection) @Field.register_lookup class Regex(BuiltinLookup) lookup_name = 'regex' prepare_rhs = False def as_sql(self, compiler, connection) @Field.register_lookup class IRegex(Regex) lookup_name = 'iregex' class YearLookup(Lookup) def year_lookup_bounds(self, connection, year) def as_sql(self, compiler, connection) def get_direct_rhs_sql(self, connection, rhs) def get_bound_params(self, start, finish) class YearExact(YearLookup, Exact) def get_direct_rhs_sql(self, connection, rhs) def get_bound_params(self, start, finish) class YearGt(YearLookup, GreaterThan) def get_bound_params(self, start, finish) class YearGte(YearLookup, GreaterThanOrEqual) def get_bound_params(self, start, finish) class YearLt(YearLookup, LessThan) def get_bound_params(self, start, finish) class YearLte(YearLookup, LessThanOrEqual) def get_bound_params(self, start, finish) class UUIDTextMixin() def process_rhs(self, qn, connection) @UUIDField.register_lookup class UUIDIExact(UUIDTextMixin, IExact) @UUIDField.register_lookup class UUIDContains(UUIDTextMixin, Contains) @UUIDField.register_lookup class UUIDIContains(UUIDTextMixin, IContains) @UUIDField.register_lookup class UUIDStartsWith(UUIDTextMixin, StartsWith) @UUIDField.register_lookup class UUIDIStartsWith(UUIDTextMixin, IStartsWith) @UUIDField.register_lookup class UUIDEndsWith(UUIDTextMixin, EndsWith) @UUIDField.register_lookup class UUIDIEndsWith(UUIDTextMixin, IEndsWith) class BaseManager() creation_counter = 0 auto_created = False use_in_migrations = False def deconstruct(self) def check(self, **kwargs) @classmethod def from_queryset(cls, queryset_class, class_name=None) def contribute_to_class(self, cls, name) def db_manager(self, using=None, hints=None) @property def db(self) def get_queryset(self) def all(self) class Manager(BaseManager.from_queryset(QuerySet)) class ManagerDescriptor() class MigrationAutodetector() def changes(self, graph, trim_to_packages=None, convert_packages=None, migration_name=None) def deep_deconstruct(self, obj) def only_relation_agnostic_fields(self, fields) def check_dependency(self, operation, dependency) def add_operation(self, package_label, operation, dependencies=None, beginning=False) def generate_renamed_models(self) def generate_created_models(self) def generate_deleted_models(self) def create_renamed_fields(self) def generate_renamed_fields(self) def generate_added_fields(self) def generate_removed_fields(self) def generate_altered_fields(self) def create_altered_indexes(self) def generate_added_indexes(self) def generate_removed_indexes(self) def generate_renamed_indexes(self) def create_altered_constraints(self) def generate_added_constraints(self) def generate_removed_constraints(self) def generate_altered_db_table(self) def generate_altered_db_table_comment(self) def generate_altered_options(self) def generate_altered_managers(self) def arrange_for_graph(self, changes, graph, migration_name=None) @classmethod def parse_number(cls, name) class AmbiguityError(Exception) class BadMigrationError(Exception) class CircularDependencyError(Exception) class InconsistentMigrationHistory(Exception) class InvalidBasesError(ValueError) class NodeNotFoundError(LookupError) class MigrationSchemaMissing(DatabaseError) class MigrationExecutor() def migration_plan(self, targets, clean_start=False) def migrate(self, targets, plan=None, state=None, fake=False, fake_initial=False) def apply_migration(self, state, migration, fake=False, fake_initial=False) def record_migration(self, migration) def check_replacements(self) def detect_soft_applied(self, project_state, migration) @total_ordering class Node() def add_child(self, child) def add_parent(self, parent) class DummyNode(Node) def raise_error(self) class MigrationGraph() def add_node(self, key, migration) def add_dummy_node(self, key, origin, error_message) def add_dependency(self, migration, child, parent, skip_validation=False) def remove_replaced_nodes(self, replacement, replaced) def remove_replacement_node(self, replacement, replaced) def validate_consistency(self) def forwards_plan(self, target) def iterative_dfs(self, start, forwards=True) def root_nodes(self, app=None) def leaf_nodes(self, app=None) def ensure_not_cyclic(self) def make_state(self, nodes=None, at_end=True, real_packages=None) MIGRATIONS_MODULE_NAME = 'migrations' class MigrationLoader() @classmethod def migrations_module(cls, package_label) def load_disk(self) def get_migration(self, package_label, name_prefix) def get_migration_by_prefix(self, package_label, name_prefix) def check_key(self, key, current_package) def add_internal_dependencies(self, key, migration) def add_external_dependencies(self, key, migration) def build_graph(self) def check_consistent_history(self, connection) def detect_conflicts(self) def project_state(self, nodes=None, at_end=True) def collect_sql(self, plan) class Migration() operations = [] dependencies = [] replaces = [] initial = None atomic = True def mutate_state(self, project_state, preserve=True) def apply(self, project_state, schema_editor, collect_sql=False) def suggest_name(self) class SettingsTuple(tuple) def settings_dependency(value) class Operation() reduces_to_sql = True atomic = False elidable = False serialization_expand_args = [] def deconstruct(self) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def describe(self) @property def migration_name_fragment(self) def references_model(self, name, package_label) def references_field(self, model_name, name, package_label) def allow_migrate_model(self, connection_alias, model) def reduce(self, operation, package_label) class FieldOperation(Operation) @cached_property def model_name_lower(self) @cached_property def name_lower(self) def is_same_model_operation(self, operation) def is_same_field_operation(self, operation) def references_model(self, name, package_label) def references_field(self, model_name, name, package_label) def reduce(self, operation, package_label) class AddField(FieldOperation) def deconstruct(self) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def describe(self) @property def migration_name_fragment(self) def reduce(self, operation, package_label) class RemoveField(FieldOperation) def deconstruct(self) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def describe(self) @property def migration_name_fragment(self) def reduce(self, operation, package_label) class AlterField(FieldOperation) def deconstruct(self) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def describe(self) @property def migration_name_fragment(self) def reduce(self, operation, package_label) class RenameField(FieldOperation) @cached_property def old_name_lower(self) @cached_property def new_name_lower(self) def deconstruct(self) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def describe(self) @property def migration_name_fragment(self) def references_field(self, model_name, name, package_label) def reduce(self, operation, package_label) class ModelOperation(Operation) @cached_property def name_lower(self) def references_model(self, name, package_label) def reduce(self, operation, package_label) def can_reduce_through(self, operation, package_label) class CreateModel(ModelOperation) serialization_expand_args = ['fields', 'options', 'managers'] def deconstruct(self) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def describe(self) @property def migration_name_fragment(self) def references_model(self, name, package_label) def reduce(self, operation, package_label) class DeleteModel(ModelOperation) def deconstruct(self) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def references_model(self, name, package_label) def describe(self) @property def migration_name_fragment(self) class RenameModel(ModelOperation) @cached_property def old_name_lower(self) @cached_property def new_name_lower(self) def deconstruct(self) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def references_model(self, name, package_label) def describe(self) @property def migration_name_fragment(self) def reduce(self, operation, package_label) class ModelOptionOperation(ModelOperation) def reduce(self, operation, package_label) class AlterModelTable(ModelOptionOperation) def deconstruct(self) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def describe(self) @property def migration_name_fragment(self) class AlterModelTableComment(ModelOptionOperation) def deconstruct(self) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def describe(self) @property def migration_name_fragment(self) class AlterModelOptions(ModelOptionOperation) ALTER_OPTION_KEYS = ['base_manager_name', 'default_manager_name', 'default_related_name', 'get_latest_by', 'ordering'] def deconstruct(self) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def describe(self) @property def migration_name_fragment(self) class AlterModelManagers(ModelOptionOperation) serialization_expand_args = ['managers'] def deconstruct(self) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def describe(self) @property def migration_name_fragment(self) class IndexOperation(Operation) option_name = 'indexes' @cached_property def model_name_lower(self) class AddIndex(IndexOperation) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def deconstruct(self) def describe(self) @property def migration_name_fragment(self) class RemoveIndex(IndexOperation) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def deconstruct(self) def describe(self) @property def migration_name_fragment(self) class RenameIndex(IndexOperation) @cached_property def old_name_lower(self) @cached_property def new_name_lower(self) def deconstruct(self) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def describe(self) @property def migration_name_fragment(self) def reduce(self, operation, package_label) class AddConstraint(IndexOperation) option_name = 'constraints' def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def deconstruct(self) def describe(self) @property def migration_name_fragment(self) class RemoveConstraint(IndexOperation) option_name = 'constraints' def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def deconstruct(self) def describe(self) @property def migration_name_fragment(self) class SeparateDatabaseAndState(Operation) serialization_expand_args = ['database_operations', 'state_operations'] def deconstruct(self) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def describe(self) class RunSQL(Operation) def deconstruct(self) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def describe(self) class RunPython(Operation) reduces_to_sql = False def deconstruct(self) def state_forwards(self, package_label, state) def database_forwards(self, package_label, schema_editor, from_state, to_state) def describe(self) class MigrationOptimizer() def optimize(self, operations, package_label) def optimize_inner(self, operations, package_label) class MigrationQuestioner() def ask_initial(self, package_label) def ask_not_null_addition(self, field_name, model_name) def ask_not_null_alteration(self, field_name, model_name) def ask_rename(self, model_name, old_name, new_name, field_instance) def ask_rename_model(self, old_model_state, new_model_state) def ask_merge(self, package_label) def ask_auto_now_add_addition(self, field_name, model_name) def ask_unique_callable_default_addition(self, field_name, model_name) class InteractiveMigrationQuestioner(MigrationQuestioner) def ask_not_null_addition(self, field_name, model_name) def ask_not_null_alteration(self, field_name, model_name) def ask_rename(self, model_name, old_name, new_name, field_instance) def ask_rename_model(self, old_model_state, new_model_state) def ask_merge(self, package_label) def ask_auto_now_add_addition(self, field_name, model_name) def ask_unique_callable_default_addition(self, field_name, model_name) class NonInteractiveMigrationQuestioner(MigrationQuestioner) def log_lack_of_migration(self, field_name, model_name, reason) def ask_not_null_addition(self, field_name, model_name) def ask_not_null_alteration(self, field_name, model_name) def ask_auto_now_add_addition(self, field_name, model_name) class MigrationRecorder() @classproperty def Migration(cls) @property def migration_qs(self) def has_table(self) def ensure_schema(self) def applied_migrations(self) def record_applied(self, app, name) def record_unapplied(self, app, name) def flush(self) class BaseSerializer() def serialize(self) class BaseSequenceSerializer(BaseSerializer) def serialize(self) class BaseSimpleSerializer(BaseSerializer) def serialize(self) class ChoicesSerializer(BaseSerializer) def serialize(self) class DateTimeSerializer(BaseSerializer) def serialize(self) class DatetimeDatetimeSerializer(BaseSerializer) def serialize(self) class DecimalSerializer(BaseSerializer) def serialize(self) class DeconstructableSerializer(BaseSerializer) @staticmethod def serialize_deconstructed(path, args, kwargs) def serialize(self) class DictionarySerializer(BaseSerializer) def serialize(self) class EnumSerializer(BaseSerializer) def serialize(self) class FloatSerializer(BaseSimpleSerializer) def serialize(self) class FrozensetSerializer(BaseSequenceSerializer) class FunctionTypeSerializer(BaseSerializer) def serialize(self) class FunctoolsPartialSerializer(BaseSerializer) def serialize(self) class IterableSerializer(BaseSerializer) def serialize(self) class ModelFieldSerializer(DeconstructableSerializer) def serialize(self) class ModelManagerSerializer(DeconstructableSerializer) def serialize(self) class OperationSerializer(BaseSerializer) def serialize(self) class PathLikeSerializer(BaseSerializer) def serialize(self) class PathSerializer(BaseSerializer) def serialize(self) class RegexSerializer(BaseSerializer) def serialize(self) class SequenceSerializer(BaseSequenceSerializer) class SetSerializer(BaseSequenceSerializer) class SettingsReferenceSerializer(BaseSerializer) def serialize(self) class TupleSerializer(BaseSequenceSerializer) class TypeSerializer(BaseSerializer) def serialize(self) class UUIDSerializer(BaseSerializer) def serialize(self) class Serializer() @classmethod def register(cls, type_, serializer) def serializer_factory(value) def get_related_models_tuples(model) def get_related_models_recursive(model) class ProjectState() @property def relations(self) def add_model(self, model_state) def remove_model(self, package_label, model_name) def rename_model(self, package_label, old_name, new_name) def alter_model_options(self, package_label, model_name, options, option_keys=None) def alter_model_managers(self, package_label, model_name, managers) def add_index(self, package_label, model_name, index) def remove_index(self, package_label, model_name, index_name) def rename_index(self, package_label, model_name, old_index_name, new_index_name) def add_constraint(self, package_label, model_name, constraint) def remove_constraint(self, package_label, model_name, constraint_name) def add_field(self, package_label, model_name, name, field, preserve_default) def remove_field(self, package_label, model_name, name) def alter_field(self, package_label, model_name, name, field, preserve_default) def rename_field(self, package_label, model_name, old_name, new_name) def reload_model(self, package_label, model_name, delay=False) def reload_models(self, models, delay=True) def update_model_field_relation(self, model, model_key, field_name, field, concretes) def resolve_model_field_relations(self, model_key, field_name, field, concretes=None) def resolve_model_relations(self, model_key, concretes=None) def resolve_fields_and_relations(self) def clone(self) def clear_delayed_models_cache(self) @cached_property def models_registry(self) @classmethod def from_models_registry(cls, models_registry) class StateModelsRegistry(ModelsRegistry) @contextmanager def bulk_update(self) def render_multiple(self, model_states) def clone(self) def register_model(self, package_label, model) def unregister_model(self, package_label, model_name) class ModelState() @cached_property def name_lower(self) def get_field(self, field_name) @classmethod def from_model(cls, model, exclude_rels=False) def construct_managers(self) def clone(self) def render(self, models_registry) def get_index_by_name(self, name) def get_constraint_by_name(self, name) FieldReference = namedtuple('FieldReference', 'to through') COMPILED_REGEX_TYPE = type(re.compile('')) class RegexObject() def get_migration_name_timestamp() def resolve_relation(model, package_label=None, model_name=None) def field_references(model_tuple, field, reference_model_tuple, reference_field_name=None, reference_field=None) def get_references(state, model_tuple, field_tuple=()) def field_is_referenced(state, model_tuple, field_tuple) class OperationWriter() def serialize(self) def indent(self) def unindent(self) def feed(self, line) def render(self) class MigrationWriter() def as_string(self) @property def basedir(self) @property def filename(self) @property def path(self) @classmethod def serialize(cls, value) MIGRATION_HEADER_TEMPLATE = '# Generated by Plain %(version)s on %(timestamp)s\n\n' MIGRATION_TEMPLATE = '%(migration_header)s%(imports)s\n\nclass Migration(migrations.Migration):\n%(replaces_str)s%(initial_str)s\n dependencies = [\n%(dependencies)s ]\n\n operations = [\n%(operations)s ]\n' PROXY_PARENTS = object() EMPTY_RELATION_TREE = () IMMUTABLE_WARNING = "The return type of '%s' should never be mutated. If you want to manipulate this list for your own use, make a copy first." DEFAULT_NAMES = ('db_table', 'db_table_comment', 'ordering', 'get_latest_by', 'package_label', 'models_registry', 'default_related_name', 'required_db_features', 'required_db_vendor', 'base_manager_name', 'default_manager_name', 'indexes', 'constraints') def make_immutable_fields_list(name, data) class Options() FORWARD_PROPERTIES = {'fields', 'many_to_many', 'concrete_fields', 'local_concrete_fields', '_non_pk_concrete_field_names', '_forward_fields_map', 'managers', 'managers_map', 'base_manager', 'default_manager'} REVERSE_PROPERTIES = {'related_objects', 'fields_map', '_relation_tree'} default_models_registry = models_registry @property def label(self) @property def label_lower(self) def contribute_to_class(self, cls, name) def add_manager(self, manager) def add_field(self, field, private=False) def setup_pk(self, field) def can_migrate(self, connection) @cached_property def managers(self) @cached_property def managers_map(self) @cached_property def base_manager(self) @cached_property def default_manager(self) @cached_property def fields(self) @cached_property def concrete_fields(self) @cached_property def local_concrete_fields(self) @cached_property def many_to_many(self) @cached_property def related_objects(self) @cached_property def fields_map(self) def get_field(self, field_name) def get_fields(self, include_hidden=False) @cached_property def total_unique_constraints(self) @cached_property def db_returning_fields(self) @register_check def check_database_backends(databases=None, **kwargs) @register_check def check_all_models(package_configs=None, **kwargs) @register_check def check_lazy_references(package_configs=None, **kwargs) @register_check def check_database_tables(package_configs, **kwargs) MAX_GET_RESULTS = 21 REPR_OUTPUT_SIZE = 20 class BaseIterable() class ModelIterable(BaseIterable) class RawModelIterable(BaseIterable) class ValuesIterable(BaseIterable) class ValuesListIterable(BaseIterable) class NamedValuesListIterable(ValuesListIterable) class FlatValuesListIterable(BaseIterable) class QuerySet() @property def query(self) @query.setter def query(self, value) def as_manager(cls) as_manager = classmethod(as_manager) def iterator(self, chunk_size=None) def aggregate(self, *args, **kwargs) def count(self) def get(self, *args, **kwargs) def create(self, **kwargs) def bulk_create(self, objs, batch_size=None, update_conflicts=False, update_fields=None, unique_fields=None) def bulk_update(self, objs, fields, batch_size=None) def get_or_create(self, defaults=None, **kwargs) def update_or_create(self, defaults=None, create_defaults=None, **kwargs) def earliest(self, *fields) def latest(self, *fields) def first(self) def last(self) def in_bulk(self, id_list=None, *, field_name='pk') def delete(self) def update(self, **kwargs) def exists(self) def contains(self, obj) def explain(self, *, format=None, **options) def raw(self, raw_query, params=(), translations=None, using=None) def values(self, *fields, **expressions) def values_list(self, *fields, flat=False, named=False) def dates(self, field_name, kind, order='ASC') def datetimes(self, field_name, kind, order='ASC', tzinfo=None) def none(self) def all(self) def filter(self, *args, **kwargs) def exclude(self, *args, **kwargs) def complex_filter(self, filter_obj) def union(self, *other_qs, all=False) def intersection(self, *other_qs) def difference(self, *other_qs) def select_for_update(self, nowait=False, skip_locked=False, of=(), no_key=False) def select_related(self, *fields) def prefetch_related(self, *lookups) def annotate(self, *args, **kwargs) def alias(self, *args, **kwargs) def order_by(self, *field_names) def distinct(self, *field_names) def extra(self, select=None, where=None, params=None, tables=None, order_by=None, select_params=None) def reverse(self) def defer(self, *fields) def only(self, *fields) def using(self, alias) @property def ordered(self) @property def db(self) def resolve_expression(self, *args, **kwargs) class InstanceCheckMeta(type) class EmptyQuerySet() class RawQuerySet() def resolve_model_init_order(self) def prefetch_related(self, *lookups) def iterator(self) @property def db(self) def using(self, alias) @cached_property def columns(self) @cached_property def model_fields(self) class Prefetch() def add_prefix(self, prefix) def get_current_prefetch_to(self, level) def get_current_to_attr(self, level) def get_current_queryset(self, level) def normalize_prefetch_lookups(lookups, prefix=None) def prefetch_related_objects(model_instances, *related_lookups) def get_prefetcher(instance, through_attr, to_attr) def prefetch_one_level(instances, prefetcher, lookup, level) class RelatedPopulator() def populate(self, row, from_obj) def get_related_populators(klass_info, select, db) logger = logging.getLogger('plain.models') PathInfo = namedtuple('PathInfo', 'from_opts to_opts target_fields join_field m2m direct filtered_relation') def subclasses(cls) class Q(tree.Node) AND = 'AND' OR = 'OR' XOR = 'XOR' default = AND conditional = True def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False) def flatten(self) def check(self, against, using=DEFAULT_DB_ALIAS) def deconstruct(self) class DeferredAttribute() class class_or_instance_method() class RegisterLookupMixin() @functools.cache def get_class_lookups(cls) def get_instance_lookups(self) get_lookups = class_or_instance_method(get_class_lookups, get_instance_lookups) get_class_lookups = classmethod(get_class_lookups) def get_lookup(self, lookup_name) def get_transform(self, lookup_name) @staticmethod def merge_dicts(dicts) def register_class_lookup(cls, lookup, lookup_name=None) def register_instance_lookup(self, lookup, lookup_name=None) register_lookup = class_or_instance_method(register_class_lookup, register_instance_lookup) register_class_lookup = classmethod(register_class_lookup) def select_related_descend(field, restricted, requested, select_mask, reverse=False) def refs_expression(lookup_parts, annotations) def check_rel_lookup_compatibility(model, target_opts, field) class FilteredRelation() def clone(self) def resolve_expression(self, *args, **kwargs) def as_sql(self, compiler, connection) class ModelsRegistryNotReady(Exception) class ModelsRegistry() def check_ready(self) @functools.cache def get_models(self, *, package_label='') def get_model(self, package_label, model_name=None, require_ready=True) def register_model(self, package_label, model) def clear_cache(self) def lazy_model_operation(self, function, *model_keys) def do_pending_operations(self, model) models_registry = ModelsRegistry() def register_model(model_class) class PositionRef(Ref) def as_sql(self, compiler, connection) class SQLCompiler() ordering_parts = _lazy_re_compile('^(.*)\\s(?:ASC|DESC).*', re.MULTILINE | re.DOTALL) def setup_query(self, with_col_aliases=False) def pre_sql_setup(self, with_col_aliases=False) def get_group_by(self, select, order_by) def collapse_group_by(self, expressions, having) def get_select(self, with_col_aliases=False) def get_order_by(self) def get_extra_select(self, order_by, select) def quote_name_unless_alias(self, name) def compile(self, node) def get_combinator_sql(self, combinator, all) def get_qualify_sql(self) def as_sql(self, with_limits=True, with_col_aliases=False) def get_default_columns(self, select_mask, start_alias=None, opts=None) def get_distinct(self) def find_ordering_name(self, name, opts, alias=None, default_order='ASC', already_seen=None) def get_from_clause(self) def get_related_selections(self, select, select_mask, opts=None, root_alias=None, cur_depth=1, requested=None, restricted=None) def get_select_for_update_of_arguments(self) def get_converters(self, expressions) def apply_converters(self, rows, converters) def results_iter(self, results=None, tuple_expected=False, chunked_fetch=False, chunk_size=GET_ITERATOR_CHUNK_SIZE) def has_results(self) def execute_sql(self, result_type=MULTI, chunked_fetch=False, chunk_size=GET_ITERATOR_CHUNK_SIZE) def as_subquery_condition(self, alias, columns, compiler) def explain_query(self) class SQLInsertCompiler(SQLCompiler) returning_fields = None returning_params = () def field_as_sql(self, field, val) def prepare_value(self, field, value) def pre_save_val(self, field, obj) def assemble_as_sql(self, fields, value_rows) def as_sql(self) def execute_sql(self, returning_fields=None) class SQLDeleteCompiler(SQLCompiler) @cached_property def single_alias(self) @cached_property def contains_self_reference_subquery(self) def as_sql(self) class SQLUpdateCompiler(SQLCompiler) def as_sql(self) def execute_sql(self, result_type) def pre_sql_setup(self) class SQLAggregateCompiler(SQLCompiler) def as_sql(self) def cursor_iter(cursor, sentinel, col_count, itersize) GET_ITERATOR_CHUNK_SIZE = 100 MULTI = 'multi' SINGLE = 'single' CURSOR = 'cursor' NO_RESULTS = 'no results' ORDER_DIR = {'ASC': ('ASC', 'DESC'), 'DESC': ('DESC', 'ASC')} INNER = 'INNER JOIN' LOUTER = 'LEFT OUTER JOIN' class MultiJoin(Exception) class Empty() class Join() def as_sql(self, compiler, connection) def relabeled_clone(self, change_map) @property def identity(self) def equals(self, other) def demote(self) def promote(self) class BaseTable() join_type = None parent_alias = None filtered_relation = None def as_sql(self, compiler, connection) def relabeled_clone(self, change_map) @property def identity(self) def equals(self, other) FORBIDDEN_ALIAS_PATTERN = _lazy_re_compile('[\'`\\"\\]\\[;\\s]|--|/\\*|\\*/') EXPLAIN_OPTIONS_PATTERN = _lazy_re_compile('[\\w\\-]+') def get_field_names_from_opts(opts) def get_children_from_q(q) JoinInfo = namedtuple('JoinInfo', ('final_field', 'targets', 'opts', 'joins', 'path', 'transform_function')) class RawQuery() def chain(self, using) def clone(self, using) def get_columns(self) @property def params_type(self) ExplainInfo = namedtuple('ExplainInfo', ('format', 'options')) class Query(BaseExpression) alias_prefix = 'T' empty_result_set_value = None subq_aliases = frozenset([alias_prefix]) compiler = 'SQLCompiler' base_table_class = BaseTable join_class = Join default_cols = True default_ordering = True standard_ordering = True filter_is_sticky = False subquery = False select = () group_by = None order_by = () low_mark = 0 high_mark = None distinct = False distinct_fields = () select_for_update = False select_for_update_nowait = False select_for_update_skip_locked = False select_for_update_of = () select_for_no_key_update = False select_related = False has_select_fields = False max_depth = 5 values_select = () annotation_select_mask = None combinator = None combinator_all = False combined_queries = () extra_select_mask = None extra_tables = () extra_order_by = () deferred_loading = (frozenset(), True) explain_info = None @property def output_field(self) @cached_property def base_table(self) def sql_with_params(self) def get_compiler(self, using=None, connection=None, elide_empty=True) def get_meta(self) def clone(self) def chain(self, klass=None) def relabeled_clone(self, change_map) def get_aggregation(self, using, aggregate_exprs) def get_count(self, using) def has_filters(self) def exists(self, limit=True) def has_results(self, using) def explain(self, using, format=None, **options) def combine(self, rhs, connector) def get_select_mask(self) def table_alias(self, table_name, create=False, filtered_relation=None) def ref_alias(self, alias) def unref_alias(self, alias, amount=1) def promote_joins(self, aliases) def demote_joins(self, aliases) def reset_refcounts(self, to_counts) def change_aliases(self, change_map) def bump_prefix(self, other_query, exclude=None) def get_initial_alias(self) def count_active_tables(self) def join(self, join, reuse=None, reuse_with_filtered_relation=False) def check_alias(self, alias) def add_annotation(self, annotation, alias, select=True) def resolve_expression(self, query, *args, **kwargs) def get_external_cols(self) def get_group_by_cols(self, wrapper=None) def as_sql(self, compiler, connection) def resolve_lookup_value(self, value, can_reuse, allow_joins) def solve_lookup_type(self, lookup, summarize=False) def check_query_object_type(self, value, opts, field) def check_related_objects(self, field, value, opts) def check_filterable(self, expression) def build_lookup(self, lookups, lhs, rhs) def try_transform(self, lhs, name) def build_filter(self, filter_expr, branch_negated=False, current_negated=False, can_reuse=None, allow_joins=True, split_subq=True, reuse_with_filtered_relation=False, check_filterable=True, summarize=False) def add_filter(self, filter_lhs, filter_rhs) def add_q(self, q_object) def build_where(self, filter_expr) def clear_where(self) def build_filtered_relation_q(self, q_object, reuse, branch_negated=False, current_negated=False) def add_filtered_relation(self, filtered_relation, alias) def names_to_path(self, names, opts, allow_many=True, fail_on_missing=False) def setup_joins(self, names, opts, alias, can_reuse=None, allow_many=True, reuse_with_filtered_relation=False) def trim_joins(self, targets, joins, path) def resolve_ref(self, name, allow_joins=True, reuse=None, summarize=False) def split_exclude(self, filter_expr, can_reuse, names_with_path) def set_empty(self) def is_empty(self) def set_limits(self, low=None, high=None) def clear_limits(self) @property def is_sliced(self) def has_limit_one(self) def can_filter(self) def clear_select_clause(self) def clear_select_fields(self) def add_select_col(self, col, name) def set_select(self, cols) def add_distinct_fields(self, *field_names) def add_fields(self, field_names, allow_m2m=True) def add_ordering(self, *ordering) def clear_ordering(self, force=False, clear_default=True) def set_group_by(self, allow_aliases=True) def add_select_related(self, fields) def add_extra(self, select, select_params, where, params, tables, order_by) def clear_deferred_loading(self) def add_deferred_loading(self, field_names) def add_immediate_loading(self, field_names) def set_annotation_mask(self, names) def append_annotation_mask(self, names) def set_extra_mask(self, names) def set_values(self, fields) @property def annotation_select(self) @property def extra_select(self) def trim_start(self, names_with_path) def is_nullable(self, field) def get_order_dir(field, default='ASC') class JoinPromoter() def add_votes(self, votes) def update_join_types(self, query) class DeleteQuery(Query) compiler = 'SQLDeleteCompiler' def do_query(self, table, where, using) def delete_batch(self, pk_list, using) class UpdateQuery(Query) compiler = 'SQLUpdateCompiler' def clone(self) def update_batch(self, pk_list, values, using) def add_update_values(self, values) def add_update_fields(self, values_seq) def add_related_update(self, model, field, value) def get_related_updates(self) class InsertQuery(Query) compiler = 'SQLInsertCompiler' def insert_values(self, fields, objs, raw=False) class AggregateQuery(Query) compiler = 'SQLAggregateCompiler' AND = 'AND' OR = 'OR' XOR = 'XOR' class WhereNode(tree.Node) default = AND resolved = False conditional = True def split_having_qualify(self, negated=False, must_group_by=False) def as_sql(self, compiler, connection) def get_group_by_cols(self) def get_source_expressions(self) def set_source_expressions(self, children) def relabel_aliases(self, change_map) def clone(self) def relabeled_clone(self, change_map) def replace_expressions(self, replacements) def get_refs(self) @cached_property def contains_aggregate(self) @cached_property def contains_over_clause(self) @property def is_summary(self) def resolve_expression(self, *args, **kwargs) @cached_property def output_field(self) def select_format(self, compiler, sql, params) def get_db_converters(self, connection) def get_lookup(self, lookup) def leaves(self) class NothingNode() contains_aggregate = False contains_over_clause = False def as_sql(self, compiler=None, connection=None) class ExtraWhere() contains_aggregate = False contains_over_clause = False def as_sql(self, compiler=None, connection=None) class SubqueryConstraint() contains_aggregate = False contains_over_clause = False def as_sql(self, compiler, connection) @pytest.fixture(scope='session') def setup_db(request) @pytest.fixture def db(setup_db) def setup_databases(verbosity, *, keepdb=False, debug_sql=False, aliases=None, serialized_aliases=None, **kwargs) def get_unique_databases_and_mirrors(aliases=None) def teardown_databases(old_config, verbosity, keepdb=False) def dependency_ordered(test_databases, dependencies) class TransactionManagementError(ProgrammingError) def get_connection(using=None) def get_autocommit(using=None) def set_autocommit(autocommit, using=None) def commit(using=None) def rollback(using=None) def savepoint(using=None) def savepoint_rollback(sid, using=None) def savepoint_commit(sid, using=None) def clean_savepoints(using=None) def get_rollback(using=None) def set_rollback(rollback, using=None) @contextmanager def mark_for_rollback_on_error(using=None) def on_commit(func, using=None, robust=False) class Atomic(ContextDecorator) def atomic(using=None, savepoint=True, durable=False) def make_model_tuple(model) def resolve_callables(mapping) def unpickle_named_row(names, values) @functools.lru_cache def create_namedtuple_class(*names) class ProvidersChartCard(ChartCard) title = 'Providers' def get_chart_data(self) @register_viewset class OAuthConnectionViewset(AdminViewset) class ListView(AdminModelListView) nav_section = 'OAuth' model = OAuthConnection title = 'Connections' fields = ['id', 'user', 'provider_key', 'provider_user_id'] cards = [ProvidersChartCard] class DetailView(AdminModelDetailView) model = OAuthConnection title = 'OAuth connection' @register_config class Config(PackageConfig) package_label = 'plainoauth' class OAuthError(Exception) class OAuthStateMismatchError(OAuthError) class OAuthUserAlreadyExistsError(OAuthError) @models.register_model class OAuthConnection(models.Model) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) user = models.ForeignKey(SettingsReference('AUTH_USER_MODEL'), on_delete=models.CASCADE, related_name='oauth_connections') provider_key = models.CharField(max_length=100) provider_user_id = models.CharField(max_length=100) access_token = models.CharField(max_length=2000) refresh_token = models.CharField(max_length=2000, required=False) access_token_expires_at = models.DateTimeField(required=False, allow_null=True) refresh_token_expires_at = models.DateTimeField(required=False, allow_null=True) class Meta() constraints = [models.UniqueConstraint(fields=['provider_key', 'provider_user_id'], name='plainoauth_oauthconnection_unique_provider_key_user_id')] ordering = ('provider_key',) def refresh_access_token(self) def set_token_fields(self, oauth_token: 'OAuthToken') def set_user_fields(self, oauth_user: 'OAuthUser') def access_token_expired(self) def refresh_token_expired(self) @classmethod def get_or_create_user(cls, *, provider_key: str, oauth_token: 'OAuthToken', oauth_user: 'OAuthUser') @classmethod def connect(cls, *, user, provider_key: str, oauth_token: 'OAuthToken', oauth_user: 'OAuthUser') @classmethod def check(cls, **kwargs) SESSION_STATE_KEY = 'plainoauth_state' SESSION_NEXT_KEY = 'plainoauth_next' class OAuthToken() class OAuthUser() class OAuthProvider() authorization_url = '' def get_authorization_url_params(self, *, request: HttpRequest) def refresh_oauth_token(self, *, oauth_token: OAuthToken) def get_oauth_token(self, *, code: str, request: HttpRequest) def get_oauth_user(self, *, oauth_token: OAuthToken) def get_authorization_url(self, *, request: HttpRequest) def get_client_id(self) def get_client_secret(self) def get_scope(self) def get_callback_url(self, *, request: HttpRequest) def generate_state(self) def check_request_state(self, *, request: HttpRequest) def handle_login_request(self, *, request: HttpRequest, redirect_to: str='') def handle_connect_request(self, *, request: HttpRequest, redirect_to: str='') def handle_disconnect_request(self, *, request: HttpRequest) def handle_callback_request(self, *, request: HttpRequest) def login(self, *, request: HttpRequest, user: Any) def get_login_redirect_url(self, *, request: HttpRequest) def get_disconnect_redirect_url(self, *, request: HttpRequest) def get_oauth_provider_instance(*, provider_key: str) def get_provider_keys() class OAuthRouter(Router) namespace = 'oauth' urls = [include('/', [path('login/', views.OAuthLoginView, name='login'), path('connect/', views.OAuthConnectView, name='connect'), path('disconnect/', views.OAuthDisconnectView, name='disconnect'), path('callback/', views.OAuthCallbackView, name='callback')])] logger = logging.getLogger(__name__) class OAuthLoginView(View) def post(self) class OAuthCallbackView(View) def get(self) class OAuthConnectView(AuthViewMixin, View) def post(self) class OAuthDisconnectView(AuthViewMixin, View) def post(self) class PageNotFoundError(Exception) class RedirectPageError(Exception) class PagesRenderer(mistune.HTMLRenderer) def heading(self, text, level, **attrs) def block_code(self, code, info=None) def render_markdown(content) class InnerTextParser(HTMLParser) def handle_data(self, data) def get_inner_text(html_content) class PageRenderError(Exception) class Page() def set_template_context(self, context) @cached_property def vars(self) @cached_property def title(self) @cached_property def content(self) def is_markdown(self) def is_template(self) def is_asset(self) def is_redirect(self) def get_url_path(self) def get_url_name(self) def get_template_name(self) def get_view_class(self) pages_registry = PagesRegistry() class PagesRouter(Router) namespace = 'pages' urls = pages_registry.get_page_urls() class PageViewMixin() @cached_property def page(self) class PageView(PageViewMixin, TemplateView) template_name = 'page.html' def get_template_names(self) def get_template_context(self) class PageRedirectView(PageViewMixin, View) def get(self) class PageAssetView(PageViewMixin, AssetView) def get_url_path(self) def get_asset_path(self, path) def get_debug_asset_path(self, path) class PageviewsTrendCard(TrendCard) title = 'Pageviews trend' model = Pageview datetime_field = 'timestamp' size = TrendCard.Sizes.FULL @register_viewset class PageviewAdmin(AdminViewset) class ListView(AdminModelListView) model = Pageview nav_section = 'Pageviews' title = 'Pageviews' fields = ['user_id', 'url', 'timestamp', 'session_key'] search_fields = ['pk', 'user_id', 'url', 'session_key'] cards = [PageviewsTrendCard] class DetailView(AdminModelDetailView) model = Pageview class UserPageviewsCard(Card) title = 'Recent pageviews' template_name = 'pageviews/card.html' def get_template_context(self) @register_chore('pageviews') def clear_old_pageviews() @register_config class Config(PackageConfig) package_label = 'plainpageviews' @models.register_model class Pageview(models.Model) uuid = models.UUIDField(default=uuid.uuid4) url = models.URLField(max_length=1024) timestamp = models.DateTimeField(auto_now_add=True) title = models.CharField(max_length=512, required=False) referrer = models.URLField(max_length=1024, required=False) user_id = models.CharField(max_length=255, required=False) session_key = models.CharField(max_length=255, required=False) class Meta() ordering = ['-timestamp'] indexes = [models.Index(fields=['timestamp']), models.Index(fields=['user_id']), models.Index(fields=['session_key']), models.Index(fields=['url'])] constraints = [models.UniqueConstraint(fields=['uuid'], name='plainpageviews_pageview_unique_uuid')] @register_template_extension class PageviewsJSExtension(InclusionTagExtension) tags = {'pageviews_js'} template_name = 'pageviews/js.html' def get_context(self, context, *args, **kwargs) class PageviewsRouter(Router) namespace = 'pageviews' urls = [path('track/', views.TrackView, name='track')] class TrackView(CsrfExemptViewMixin, View) def post(self) def check_user_password(user, password) class PasswordResetForm(forms.Form) email = forms.EmailField(max_length=254) def send_mail(self, *, template_name: str, context: dict, from_email: str, to_email: str) def get_users(self, email) def save(self, *, generate_reset_url: callable, email_template_name: str='password_reset', from_email: str='', extra_email_context: dict | None=None) class PasswordSetForm(forms.Form) new_password1 = forms.CharField(strip=False) new_password2 = forms.CharField(strip=False) def clean_new_password2(self) def save(self, commit=True) class PasswordChangeForm(PasswordSetForm) current_password = forms.CharField(strip=False) def clean_current_password(self) class PasswordLoginForm(forms.Form) email = forms.EmailField(max_length=150) password = forms.CharField(strip=False) def clean(self) def get_user(self) class PasswordSignupForm(ModelForm) confirm_password = forms.CharField(strip=False) class Meta() model = get_user_model() fields = ('email', 'password') def clean(self) def check_password(password, encoded, setter=None, preferred='default') def hash_password(password, salt=None, hasher='default') @functools.lru_cache def get_hashers() @functools.lru_cache def get_hashers_by_algorithm() def get_hasher(algorithm='default') def identify_hasher(encoded) def mask_hash(hash, show=6, char='*') def must_update_salt(salt, expected_entropy) class BasePasswordHasher() algorithm = None library = None salt_entropy = 128 def salt(self) def verify(self, password, encoded) def encode(self, password, salt) def decode(self, encoded) def safe_summary(self, encoded) def must_update(self, encoded) def harden_runtime(self, password, encoded) class PBKDF2PasswordHasher(BasePasswordHasher) algorithm = 'pbkdf2_sha256' iterations = 720000 digest = hashlib.sha256 def encode(self, password, salt, iterations=None) def decode(self, encoded) def verify(self, password, encoded) def safe_summary(self, encoded) def must_update(self, encoded) def harden_runtime(self, password, encoded) class PBKDF2SHA1PasswordHasher(PBKDF2PasswordHasher) algorithm = 'pbkdf2_sha1' digest = hashlib.sha1 class Argon2PasswordHasher(BasePasswordHasher) algorithm = 'argon2' library = 'argon2' time_cost = 2 memory_cost = 102400 parallelism = 8 def encode(self, password, salt) def decode(self, encoded) def verify(self, password, encoded) def safe_summary(self, encoded) def must_update(self, encoded) def harden_runtime(self, password, encoded) def params(self) class BCryptSHA256PasswordHasher(BasePasswordHasher) algorithm = 'bcrypt_sha256' digest = hashlib.sha256 library = ('bcrypt', 'bcrypt') rounds = 12 def salt(self) def encode(self, password, salt) def decode(self, encoded) def verify(self, password, encoded) def safe_summary(self, encoded) def must_update(self, encoded) def harden_runtime(self, password, encoded) class BCryptPasswordHasher(BCryptSHA256PasswordHasher) algorithm = 'bcrypt' digest = None class ScryptPasswordHasher(BasePasswordHasher) algorithm = 'scrypt' block_size = 8 maxmem = 0 parallelism = 1 work_factor = 2 ** 14 def encode(self, password, salt, n=None, r=None, p=None) def decode(self, encoded) def verify(self, password, encoded) def safe_summary(self, encoded) def must_update(self, encoded) def harden_runtime(self, password, encoded) class PasswordField(models.CharField) def deconstruct(self) def pre_save(self, model_instance, add) @deconstructible class MinimumLengthValidator() @deconstructible class CommonPasswordValidator() @cached_property def DEFAULT_PASSWORD_LIST_PATH(self) @deconstructible class NumericPasswordValidator() class PasswordForgotView(FormView) form_class = PasswordResetForm def generate_password_reset_token(self, user) def generate_password_reset_url(self, user) def form_valid(self, form) class PasswordResetView(FormView) form_class = PasswordSetForm reset_token_max_age = 60 * 60 def check_password_reset_token(self, token) def get(self) def get_user(self) def get_form_kwargs(self) def form_valid(self, form) class PasswordChangeView(FormView) form_class = PasswordChangeForm def get_form_kwargs(self) def form_valid(self, form) class PasswordLoginView(FormView) form_class = PasswordLoginForm success_url = '/' def get(self) def form_valid(self, form) class PasswordSignupView(CreateView) form_class = PasswordSignupForm success_url = '/' def form_valid(self, form) @register_cli('test') @click.command(context_settings={'ignore_unknown_options': True}) @click.argument('pytest_args', nargs=-1, type=click.UNPROCESSED) def cli(pytest_args) def setup() def pytest_configure(config) @pytest.fixture def settings() class RedirectForm(ModelForm) class Meta() model = Redirect fields = ['from_pattern', 'to_pattern', 'http_status', 'order', 'enabled', 'is_regex'] @register_viewset class RedirectAdmin(AdminViewset) class ListView(AdminModelListView) model = Redirect nav_section = 'Redirection' title = 'Redirects' fields = ['from_pattern', 'to_pattern', 'http_status', 'order', 'enabled'] search_fields = ['from_pattern', 'to_pattern'] allow_global_search = False class DetailView(AdminModelDetailView) model = Redirect class CreateView(AdminModelCreateView) model = Redirect form_class = RedirectForm class UpdateView(AdminModelUpdateView) model = Redirect form_class = RedirectForm class DeleteView(AdminModelDeleteView) model = Redirect @register_viewset class RedirectLogAdmin(AdminViewset) class ListView(AdminModelListView) model = RedirectLog nav_section = 'Redirection' title = 'Redirect logs' fields = ['created_at', 'from_url', 'to_url', 'http_status', 'user_agent', 'ip_address', 'referrer'] search_fields = ['from_url', 'to_url', 'user_agent', 'ip_address', 'referrer'] allow_global_search = False class DetailView(AdminModelDetailView) model = RedirectLog @register_viewset class NotFoundLogAdmin(AdminViewset) class ListView(AdminModelListView) model = NotFoundLog nav_section = 'Redirection' title = '404 logs' fields = ['created_at', 'url', 'user_agent', 'ip_address', 'referrer'] search_fields = ['url', 'user_agent', 'ip_address', 'referrer'] allow_global_search = False class DetailView(AdminModelDetailView) model = NotFoundLog @register_chore('redirection') def delete_logs() @register_config class Config(PackageConfig) package_label = 'plainredirection' class RedirectionMiddleware() @models.register_model class Redirect(models.Model) from_pattern = models.CharField(max_length=255) to_pattern = models.CharField(max_length=255) http_status = models.PositiveSmallIntegerField(default=301) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) order = models.PositiveSmallIntegerField(default=0) enabled = models.BooleanField(default=True) is_regex = models.BooleanField(default=False) class Meta() ordering = ['order', '-created_at'] indexes = [models.Index(fields=['order'])] constraints = [models.UniqueConstraint(fields=['from_pattern'], name='plainredirects_redirect_unique_from_pattern')] def matches_request(self, request) def get_redirect_url(self, request) @models.register_model class RedirectLog(models.Model) redirect = models.ForeignKey(Redirect, on_delete=models.CASCADE) from_url = models.URLField(max_length=512) to_url = models.URLField(max_length=512) http_status = models.PositiveSmallIntegerField(default=301) ip_address = models.GenericIPAddressField() user_agent = models.CharField(required=False, max_length=512) referrer = models.CharField(required=False, max_length=512) created_at = models.DateTimeField(auto_now_add=True) class Meta() ordering = ['-created_at'] @classmethod def from_redirect(cls, redirect, request) @models.register_model class NotFoundLog(models.Model) url = models.URLField(max_length=512) ip_address = models.GenericIPAddressField() user_agent = models.CharField(required=False, max_length=512) referrer = models.CharField(required=False, max_length=512) created_at = models.DateTimeField(auto_now_add=True) class Meta() ordering = ['-created_at'] @classmethod def from_request(cls, request) @register_chore('sessions') def clear_expired() @register_config class Config(PackageConfig) package_label = 'plainsessions' class CreateError(Exception) class UpdateError(Exception) class SessionStore() @property def key_salt(self) def get(self, key, default=None) def pop(self, key, default=__not_given) def setdefault(self, key, value) def update(self, dict_) def has_key(self, key) def keys(self) def values(self) def items(self) def clear(self) def is_empty(self) def flush(self) def cycle_key(self) def create(self) def save(self, must_create=False) SESSION_COOKIE_NAME = 'sessionid' SESSION_COOKIE_AGE = 60 * 60 * 24 * 7 * 2 SESSION_COOKIE_DOMAIN = None SESSION_COOKIE_SECURE = True SESSION_COOKIE_PATH = '/' SESSION_COOKIE_HTTPONLY = True SESSION_COOKIE_SAMESITE = 'Lax' SESSION_SAVE_EVERY_REQUEST = False SESSION_EXPIRE_AT_BROWSER_CLOSE = False class SessionInterrupted(BadRequest) class SessionMiddleware() @models.register_model class Session(models.Model) session_key = models.CharField(max_length=40, primary_key=True) session_data = models.TextField() expires_at = models.DateTimeField() class Meta() indexes = [models.Index(fields=['expires_at'])] def decoded_data(self) def add_session_cookie_message(message) W010 = Warning(add_session_cookie_message("You have 'plain.sessions' in your INSTALLED_PACKAGES, but you have not set SESSION_COOKIE_SECURE to True."), id='security.W010') W011 = Warning(add_session_cookie_message("You have 'plain.sessions.middleware.SessionMiddleware' in your MIDDLEWARE, but you have not set SESSION_COOKIE_SECURE to True."), id='security.W011') W012 = Warning(add_session_cookie_message('SESSION_COOKIE_SECURE is not set to True.'), id='security.W012') def add_httponly_message(message) W013 = Warning(add_httponly_message("You have 'plain.sessions' in your INSTALLED_PACKAGES, but you have not set SESSION_COOKIE_HTTPONLY to True."), id='security.W013') W014 = Warning(add_httponly_message("You have 'plain.sessions.middleware.SessionMiddleware' in your MIDDLEWARE, but you have not set SESSION_COOKIE_HTTPONLY to True."), id='security.W014') W015 = Warning(add_httponly_message('SESSION_COOKIE_HTTPONLY is not set to True.'), id='security.W015') @register_check(deploy=True) def check_session_cookie_secure(package_configs, **kwargs) @register_check(deploy=True) def check_session_cookie_httponly(package_configs, **kwargs) @register_viewset class SupportFormEntryAdmin(AdminViewset) class ListView(AdminModelListView) model = SupportFormEntry nav_section = 'Support' title = 'Form entries' fields = ['user', 'email', 'name', 'form_slug', 'created_at'] class DetailView(AdminModelDetailView) model = SupportFormEntry class UserSupportFormEntriesCard(Card) title = 'Recent support' template_name = 'support/card.html' def get_template_context(self) @register_config class Config(PackageConfig) package_label = 'plainsupport' SUPPORT_FORMS = {'default': 'plain.support.forms.SupportForm'} class SupportForm(ModelForm) class Meta() model = SupportFormEntry fields = ['name', 'email', 'message'] def find_user(self) def save(self, commit=True) def notify(self, instance) @models.register_model class SupportFormEntry(models.Model) uuid = models.UUIDField(default=uuid.uuid4) user = models.ForeignKey(SettingsReference('AUTH_USER_MODEL'), on_delete=models.SET_NULL, related_name='support_form_entries', allow_null=True, required=False) name = models.CharField(max_length=255) email = models.EmailField() message = models.TextField() created_at = models.DateTimeField(auto_now_add=True) form_slug = models.CharField(max_length=255) class Meta() ordering = ['-created_at'] constraints = [models.UniqueConstraint(fields=['uuid'], name='plainsupport_supportformentry_unique_uuid')] class SupportRouter(Router) namespace = 'support' urls = [path('form/.js', views.SupportFormJSView), path('form//iframe/', views.SupportIFrameView, name='iframe'), path('form//', views.SupportFormView, name='form')] class SupportFormView(FormView) template_name = 'support/page.html' def get_form(self) def get_template_context(self) def get_form_kwargs(self) def form_valid(self, form) def get_success_url(self, form) class SupportIFrameView(CsrfExemptViewMixin, SupportFormView) template_name = 'support/iframe.html' def get_response(self) class SupportFormJSView(View) def get(self) @register_cli('tailwind') @click.group('tailwind') def cli() @cli.command() @click.pass_context def init(ctx) @cli.command() @click.pass_context def install(ctx) @cli.command() def update() @cli.command() @click.option('--watch', is_flag=True) @click.option('--minify', is_flag=True) @click.pass_context def build(ctx, watch, minify) TAILWIND_SRC_PATH = APP_PATH.parent / 'tailwind.css' TAILWIND_DIST_PATH = APP_ASSETS_DIR / 'tailwind.min.css' @register_cli('tunnel') @click.command() @click.argument('destination') @click.option('--subdomain', help='The subdomain to use for the tunnel.', envvar='PLAIN_TUNNEL_SUBDOMAIN') @click.option('--tunnel-host', envvar='PLAIN_TUNNEL_HOST', hidden=True, default='plaintunnel.com') @click.option('--debug', 'log_level', flag_value='DEBUG', help='Enable debug logging.') @click.option('--quiet', 'log_level', flag_value='WARNING', help='Only log warnings and errors.') def cli(destination, subdomain, tunnel_host, log_level) class TunnelClient() def run(self) def setup() VENDOR_DIR = APP_ASSETS_DIR / 'vendor' @register_cli('vendor') @click.group() def cli() @cli.command() def sync() @cli.command() @click.argument('name', nargs=-1, default=None) def update(name) @cli.command() @click.argument('url') @click.option('--name', help='Name of the dependency') @click.option('--sourcemap/--no-sourcemap', default=True, help='Download sourcemap') def add(url, name, sourcemap) VENDOR_DIR = APP_ASSETS_DIR / 'vendor' def iter_next_version(version) class Dependency() @staticmethod def parse_version_from_url(url) def download(self, version) def install(self) def update(self) def save_config(self) def vendor(self, response) def get_deps() def setup() class DependencyError(Exception) class UnknownVersionError(DependencyError) class UnknownContentTypeError(DependencyError) class VersionMismatchError(DependencyError) class SuccessfulJobsCard(Card) title = 'Successful Jobs' text = 'View' def get_number(self) def get_link(self) class ErroredJobsCard(Card) title = 'Errored Jobs' text = 'View' def get_number(self) def get_link(self) class LostJobsCard(Card) title = 'Lost Jobs' text = 'View' def get_description(self) def get_number(self) def get_link(self) class RetriedJobsCard(Card) title = 'Retried Jobs' text = 'View' def get_number(self) def get_link(self) class WaitingJobsCard(Card) title = 'Waiting Jobs' def get_number(self) class RunningJobsCard(Card) title = 'Running Jobs' def get_number(self) @register_viewset class JobRequestViewset(AdminViewset) class ListView(AdminModelListView) nav_section = 'Worker' model = JobRequest title = 'Job requests' fields = ['id', 'job_class', 'priority', 'created_at', 'start_at', 'unique_key'] actions = ['Delete'] def perform_action(self, action: str, target_pks: list) class DetailView(AdminModelDetailView) model = JobRequest title = 'Job Request' @register_viewset class JobViewset(AdminViewset) class ListView(AdminModelListView) nav_section = 'Worker' model = Job fields = ['id', 'job_class', 'priority', 'created_at', 'started_at', 'unique_key'] actions = ['Delete'] cards = [WaitingJobsCard, RunningJobsCard] def perform_action(self, action: str, target_pks: list) class DetailView(AdminModelDetailView) model = Job @register_viewset class JobResultViewset(AdminViewset) class ListView(AdminModelListView) nav_section = 'Worker' model = JobResult title = 'Job results' fields = ['id', 'job_class', 'priority', 'created_at', 'status', 'retried', 'is_retry'] search_fields = ['uuid', 'job_uuid', 'job_request_uuid', 'job_class'] cards = [SuccessfulJobsCard, ErroredJobsCard, LostJobsCard, RetriedJobsCard] filters = ['Successful', 'Errored', 'Cancelled', 'Lost', 'Retried'] actions = ['Retry'] allow_global_search = False def get_description(self) def get_initial_queryset(self) def get_fields(self) def perform_action(self, action: str, target_pks: list) class DetailView(AdminModelDetailView) model = JobResult title = 'Job result' def post(self) @register_chore('worker') def clear_completed() logger = logging.getLogger('plain.worker') @register_cli('worker') @click.group() def cli() @cli.command() @click.option('queues', '--queue', default=['default'], multiple=True, type=str, help='Queue to process') @click.option('--max-processes', 'max_processes', default=None, type=int, envvar='PLAIN_WORKER_MAX_PROCESSES') @click.option('--max-jobs-per-process', 'max_jobs_per_process', default=None, type=int, envvar='PLAIN_WORKER_MAX_JOBS_PER_PROCESS') @click.option('--max-pending-per-process', 'max_pending_per_process', default=10, type=int, envvar='PLAIN_WORKER_MAX_PENDING_PER_PROCESS') @click.option('--stats-every', 'stats_every', default=60, type=int, envvar='PLAIN_WORKER_STATS_EVERY') def run(queues, max_processes, max_jobs_per_process, max_pending_per_process, stats_every) @cli.command() def clear_completed() @cli.command() def stats() @cli.command() def purge_processing() @cli.command() @click.argument('job_class_name', type=str) def run_job(job_class_name) @cli.command() def registered_jobs() JOBS_MODULE_NAME = 'jobs' @register_config class Config(PackageConfig) package_label = 'plainworker' def ready(self) logger = logging.getLogger(__name__) class JobType(type) class Job() def run(self) def run_in_worker(self, *, queue: str | None=None, delay: int | datetime.timedelta | datetime.datetime | None=None, priority: int | None=None, retries: int | None=None, retry_attempt: int=0, unique_key: str | None=None) def get_unique_key(self) def get_queue(self) def get_priority(self) def get_retries(self) def get_retry_delay(self, attempt: int) class AppLoggerMiddleware() logger = logging.getLogger('plain.worker') @models.register_model class JobRequest(models.Model) created_at = models.DateTimeField(auto_now_add=True) uuid = models.UUIDField(default=uuid.uuid4) job_class = models.CharField(max_length=255) parameters = models.JSONField(required=False, allow_null=True) priority = models.IntegerField(default=0) source = models.TextField(required=False) queue = models.CharField(default='default', max_length=255) retries = models.IntegerField(default=0) retry_attempt = models.IntegerField(default=0) unique_key = models.CharField(max_length=255, required=False) start_at = models.DateTimeField(required=False, allow_null=True) class Meta() ordering = ['priority', '-created_at'] indexes = [models.Index(fields=['priority']), models.Index(fields=['created_at']), models.Index(fields=['queue']), models.Index(fields=['start_at']), models.Index(fields=['unique_key']), models.Index(fields=['job_class']), models.Index(name='job_request_class_unique_key', fields=['job_class', 'unique_key'])] constraints = [models.UniqueConstraint(fields=['job_class', 'unique_key'], condition=models.Q(unique_key__gt='', retry_attempt=0), name='plainworker_jobrequest_unique_job_class_key'), models.UniqueConstraint(fields=['uuid'], name='plainworker_jobrequest_unique_uuid')] def convert_to_job(self) class JobQuerySet(models.QuerySet) def running(self) def waiting(self) def mark_lost_jobs(self) @models.register_model class Job(models.Model) uuid = models.UUIDField(default=uuid.uuid4) created_at = models.DateTimeField(auto_now_add=True) started_at = models.DateTimeField(required=False, allow_null=True) job_request_uuid = models.UUIDField() job_class = models.CharField(max_length=255) parameters = models.JSONField(required=False, allow_null=True) priority = models.IntegerField(default=0) source = models.TextField(required=False) queue = models.CharField(default='default', max_length=255) retries = models.IntegerField(default=0) retry_attempt = models.IntegerField(default=0) unique_key = models.CharField(max_length=255, required=False) objects = JobQuerySet.as_manager() class Meta() ordering = ['-created_at'] indexes = [models.Index(fields=['created_at']), models.Index(fields=['queue']), models.Index(fields=['unique_key']), models.Index(fields=['started_at']), models.Index(fields=['job_class']), models.Index(fields=['job_request_uuid']), models.Index(name='job_class_unique_key', fields=['job_class', 'unique_key'])] constraints = [models.UniqueConstraint(fields=['uuid'], name='plainworker_job_unique_uuid')] def run(self) def convert_to_result(self, *, status, error='') def as_json(self) class JobResultQuerySet(models.QuerySet) def successful(self) def cancelled(self) def lost(self) def errored(self) def retried(self) def failed(self) def retryable(self) def retry_failed_jobs(self) class JobResultStatuses(models.TextChoices) SUCCESSFUL = ('SUCCESSFUL', 'Successful') ERRORED = ('ERRORED', 'Errored') CANCELLED = ('CANCELLED', 'Cancelled') LOST = ('LOST', 'Lost') @models.register_model class JobResult(models.Model) uuid = models.UUIDField(default=uuid.uuid4) created_at = models.DateTimeField(auto_now_add=True) job_uuid = models.UUIDField() started_at = models.DateTimeField(required=False, allow_null=True) ended_at = models.DateTimeField(required=False, allow_null=True) error = models.TextField(required=False) status = models.CharField(max_length=20, choices=JobResultStatuses.choices) job_request_uuid = models.UUIDField() job_class = models.CharField(max_length=255) parameters = models.JSONField(required=False, allow_null=True) priority = models.IntegerField(default=0) source = models.TextField(required=False) queue = models.CharField(default='default', max_length=255) retries = models.IntegerField(default=0) retry_attempt = models.IntegerField(default=0) unique_key = models.CharField(max_length=255, required=False) retry_job_request_uuid = models.UUIDField(required=False, allow_null=True) objects = JobResultQuerySet.as_manager() class Meta() ordering = ['-created_at'] indexes = [models.Index(fields=['created_at']), models.Index(fields=['job_uuid']), models.Index(fields=['started_at']), models.Index(fields=['ended_at']), models.Index(fields=['status']), models.Index(fields=['job_request_uuid']), models.Index(fields=['job_class']), models.Index(fields=['queue'])] constraints = [models.UniqueConstraint(fields=['uuid'], name='plainworker_jobresult_unique_uuid')] def retry_job(self, delay: int | None=None) class JobParameters() @staticmethod def to_json(args, kwargs) @staticmethod def from_json(data) class ModelInstanceParameter() @staticmethod def from_instance(instance) @staticmethod def to_instance(s) @staticmethod def is_gid(x) class JobsRegistry() def register_job(self, job_class, alias='') def get_job_class_name(self, job_class) def get_job_class(self, name: str) def load_job(self, job_class_name: str, parameters) jobs_registry = JobsRegistry() def register_job(job_class=None, *, alias='') class Schedule() @classmethod def from_cron(cls, cron) def next(self, now=None) @register_job class ScheduledCommand(Job) def run(self) def get_unique_key(self) def load_schedule(schedules) logger = logging.getLogger('plain.worker') class Worker() def run(self) def shutdown(self) def maybe_log_stats(self) def maybe_check_job_results(self) def maybe_schedule_jobs(self) def log_stats(self) def rescue_job_results(self) def future_finished_callback(job_uuid: str, future: Future) def process_job(job_uuid)