# plain.postgres

**Model your data and store it in a database.**

- [Overview](#overview)
- [Database connection](#database-connection)
  - [Middleware](#middleware)
  - [Bypassing a connection pooler for management operations](#bypassing-a-connection-pooler-for-management-operations)
- [Querying](#querying)
- [Schema management](#schema-management)
  - [Syncing](#syncing)
  - [Structural migrations](#structural-migrations)
  - [Data migrations](#data-migrations)
  - [Convergence](#convergence)
- [Fields](#fields)
- [Relationships](#relationships)
- [Constraints](#constraints)
- [Forms](#forms)
- [Architecture](#architecture)
- [Diagnostics](#diagnostics)
- [Settings](#settings)
- [FAQs](#faqs)
- [Installation](#installation)

## Overview

```python
# app/users/models.py
from datetime import datetime

from plain import postgres
from plain.postgres import types
from plain.passwords.models import PasswordField


@postgres.register_model
class User(postgres.Model):
    email: str = types.EmailField()
    password = PasswordField()
    is_admin: bool = types.BooleanField(default=False)
    created_at: datetime = types.DateTimeField(create_now=True)

    def __str__(self) -> str:
        return self.email
```

Every model automatically includes an `id` field which serves as the primary key. The name `id` is reserved and can't be used for other fields.

You can create, update, and delete instances of your models:

```python
from .models import User

# Create a new user
user = User.query.create(
    email="test@example.com",
    password="password",
)

# Update a user
user.email = "new@example.com"
user.save()

# Delete a user
user.delete()

# Query for users
admin_users = User.query.filter(is_admin=True)
```

## Database connection

Configure the database with a single URL. The canonical Plain setting is `POSTGRES_URL`:

```python
# app/settings.py
POSTGRES_URL = "postgresql://user:password@localhost:5432/dbname"
```

Or via environment variable:

```sh
PLAIN_POSTGRES_URL=postgresql://user:password@localhost:5432/dbname
```

Plain also reads the `DATABASE_URL` environment variable as a fallback — it's the widely-used convention for Postgres connection strings, so most hosting setups work without extra configuration:

```sh
DATABASE_URL=postgresql://user:password@localhost:5432/dbname
```

Precedence (highest to lowest): `PLAIN_POSTGRES_URL` → `POSTGRES_URL` in `settings.py` → `DATABASE_URL` environment variable.

The URL supports any libpq connection parameter as a query string — for example `?sslmode=require&application_name=web&connect_timeout=10`. These are parsed and passed through to the driver.

To explicitly disable the database (e.g. during Docker builds where no database is available), set the URL to the string `none`:

```sh
PLAIN_POSTGRES_URL=none
```

### Middleware

Connections are checked out lazily on first use and returned to the pool when the HTTP request finishes. That's handled by [`DatabaseConnectionMiddleware`](./middleware.py#DatabaseConnectionMiddleware) — add it to `MIDDLEWARE` once you install `plain.postgres`:

```python
# app/settings.py
MIDDLEWARE = [
    "plain.postgres.DatabaseConnectionMiddleware",
    # ...other middleware
]
```

Place it near the top so downstream middleware can use the database inside `before_request` / `after_response` and still have the connection returned cleanly at the end.

For `StreamingResponse` / `AsyncStreamingResponse`, the connection is returned after the body is fully drained (not when the view returns), so generators that lazily query the database — for example `Model.query.iterator()` or raw cursor loops — keep their cursor alive until the last chunk is sent.

Without the middleware, connections keep living on their thread until something explicitly calls `plain.postgres.db.return_database_connection()` (or the process exits). That's fine for short-lived scripts but wastes a connection per thread in long-running servers.

### Bypassing a connection pooler for management operations

Transaction-mode poolers (PlanetScale, Supabase's pooler, Neon's pooler, standalone pgbouncer in transaction mode) can't run DDL, long transactions, or `pg_dump`. To work around this, set a second URL that management commands use to reach Postgres directly:

```sh
PLAIN_POSTGRES_URL=postgresql://app@pooler:6432/myapp
PLAIN_POSTGRES_MANAGEMENT_URL=postgresql://app@postgres:5432/myapp
```

When `POSTGRES_MANAGEMENT_URL` is set, these commands connect through it instead of `POSTGRES_URL`:

- `plain migrations create`, `plain migrations apply`, `plain migrations list`, `plain migrations prune`, `plain migrations squash`
- `plain postgres sync`, `plain postgres converge`, `plain postgres schema`
- `plain postgres diagnose`, `plain postgres drop-unknown-tables`, `plain postgres shell`

When it's unset, all commands use `POSTGRES_URL` — there's no behavior change for existing apps.

To route custom code through the management connection, use the `use_management_connection()` context manager:

```python
from plain.postgres import use_management_connection

with use_management_connection():
    # Any get_connection() / ORM calls inside this block use POSTGRES_MANAGEMENT_URL.
    run_custom_schema_change()
```

You _can_ point the two URLs at different Postgres roles — e.g. a least-privilege DML role for runtime and a DDL-capable role for management. Plain does not currently automate the grant/ownership plumbing that split requires (default privileges for newly-created tables, ownership reassignment, preflight checks that the runtime role can see the schema). If you adopt that pattern, you're responsible for wiring those up yourself.

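
The URL selection rules in this section can be summarized in a small, self-contained sketch. Both helpers are hypothetical and written only for illustration (they are not part of Plain's API); they encode the documented precedence for the runtime URL and the fallback behavior for management commands. Treating an empty value as unset is an assumption of this sketch.

```python
from __future__ import annotations


def resolve_runtime_url(env: dict[str, str], settings_url: str | None = None) -> str | None:
    """Illustrative only: documented precedence for the runtime URL.

    PLAIN_POSTGRES_URL (env) wins over POSTGRES_URL (settings.py),
    which wins over DATABASE_URL (env). Empty strings count as unset
    here, which is an assumption of this sketch.
    """
    return env.get("PLAIN_POSTGRES_URL") or settings_url or env.get("DATABASE_URL")


def resolve_command_url(
    runtime_url: str | None,
    management_url: str | None,
    *,
    is_management_command: bool,
) -> str | None:
    """Illustrative only: management commands prefer the management URL when set."""
    if is_management_command and management_url:
        return management_url
    # Unset management URL: every command uses the runtime URL.
    return runtime_url
```

With only `DATABASE_URL` in the environment, `resolve_runtime_url` returns it. Once `PLAIN_POSTGRES_URL` is set, that value takes over regardless of `settings.py`.
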
## Querying

Models come with a powerful query API through their [`QuerySet`](./query.py#QuerySet) interface:

```python
# Get all users
all_users = User.query.all()

# Filter users
admin_users = User.query.filter(is_admin=True)
recent_users = User.query.filter(created_at__gte=datetime.now() - timedelta(days=7))

# Get a single user
user = User.query.get(email="test@example.com")

# Complex queries with Q objects
from plain.postgres import Q

users = User.query.filter(
    Q(is_admin=True) | Q(email__endswith="@example.com")
)

# Ordering
users = User.query.order_by("-created_at")

# Limiting results
first_10_users = User.query.all()[:10]
```

For more advanced querying options, see the [`QuerySet`](./query.py#QuerySet) class.

### Custom QuerySets

You can customize [`QuerySet`](./query.py#QuerySet) classes to provide specialized query methods. Define a custom QuerySet and assign it to your model's `query` attribute:

```python
from typing import Self

from plain import postgres
from plain.postgres import types


class PublishedQuerySet(postgres.QuerySet["Article"]):
    def published_only(self) -> Self:
        return self.filter(status="published")

    def draft_only(self) -> Self:
        return self.filter(status="draft")


@postgres.register_model
class Article(postgres.Model):
    title: str = types.TextField(max_length=200)
    status: str = types.TextField(max_length=20)

    query = PublishedQuerySet()


# Usage - all methods available on Article.query
all_articles = Article.query.all()
published_articles = Article.query.published_only()
draft_articles = Article.query.draft_only()

# Chaining works naturally
recent_published = Article.query.published_only().order_by("-created_at")[:10]
```

For internal code that needs to create QuerySet instances programmatically, use `from_model()`:

```python
special_qs = SpecialQuerySet.from_model(Article)
```

### Typing QuerySets

For better type checking of query results, you can explicitly type the `query` attribute:

```python
from __future__ import annotations

from plain import postgres
from plain.postgres import types


@postgres.register_model
class User(postgres.Model):
    email: str = types.EmailField()
    is_admin: bool = types.BooleanField(default=False)

    query: postgres.QuerySet[User] = postgres.QuerySet()
```

With this annotation, type checkers will know that `User.query.get()` returns a `User` instance and `User.query.filter()` returns `QuerySet[User]`. This is optional but improves IDE autocomplete and type checking.

### Raw SQL

For complex queries that can't be expressed with the ORM, you can use raw SQL.

Use `Model.query.raw()` to execute raw SQL and get model instances back:

```python
users = User.query.raw("""
    SELECT * FROM users
    WHERE created_at > %s
    ORDER BY created_at DESC
""", [some_date])

for user in users:
    print(user.email)  # Full model instance with all fields
```

Raw querysets support `prefetch_related()` for loading related objects:

```python
users = User.query.raw("SELECT * FROM users WHERE is_admin = %s", [True])
users = users.prefetch_related("posts")
```

For queries that don't map to a model, use the database cursor directly:

```python
from plain.postgres import get_connection

with get_connection().cursor() as cursor:
    cursor.execute("SELECT COUNT(*) FROM users WHERE is_admin = %s", [True])
    count = cursor.fetchone()[0]
```

For SQL set operations (UNION, INTERSECT, EXCEPT), use raw SQL. For simple cases, use Q objects instead:

```python
from plain.postgres import Q

# Equivalent to UNION (on same model)
users = User.query.filter(Q(is_admin=True) | Q(is_staff=True))
```

### Avoiding N+1 queries

#### Use `select_related` for ForeignKey access in loops

Accessing a FK in a loop without `select_related()` fires one query per row.

```python
# Bad — N+1 queries
for post in Post.query.all():
    print(post.author.name)

# Good — single JOIN
for post in Post.query.select_related("author").all():
    print(post.author.name)
```

#### Use `prefetch_related` for reverse/M2M access in loops

Reverse ForeignKey and ManyToMany relations need a separate prefetch query.

```python
# Bad — N+1 queries
for author in Author.query.all():
    print(author.posts.count())

# Good — one extra query
for author in Author.query.prefetch_related("posts").all():
    print(author.posts.count())
```

#### Annotate instead of per-row aggregations

Use database-level aggregation instead of calling `.count()` or similar per row.

```python
# Bad — N+1 queries
for category in Category.query.all():
    print(category.products.count())

# Good — single query with annotation
from plain.postgres.aggregates import Count

for category in Category.query.annotate(num_products=Count("products")).all():
    print(category.num_products)
```

#### Fetch all data in the view

Templates should only render data, never trigger queries. Prepare everything in the view.

```python
# Bad — template triggers lazy queries
def get_template_context(self):
    return {"posts": Post.query.all()}  # related lookups happen in template

# Good — eagerly load everything
def get_template_context(self):
    return {"posts": Post.query.select_related("author").prefetch_related("tags").all()}
```

### Query efficiency

#### Use `.values_list()` when you only need specific columns

```python
# Bad — loads entire model objects
emails = [u.email for u in User.query.all()]

# Good — single column, flat list
emails = list(User.query.values_list("email", flat=True))
```

#### Use `.exists()` instead of `.count() > 0`

`.exists()` stops at the first match; `.count()` scans all matching rows.

```python
# Bad
if User.query.filter(is_active=True).count() > 0:
    ...

# Good
if User.query.filter(is_active=True).exists():
    ...
```

#### Use `.count()` instead of `len(queryset)`

`len()` loads all objects into memory just to count them.

```python
# Bad
total = len(User.query.all())

# Good
total = User.query.count()
```

#### Use `bulk_create` / `bulk_update` for batch operations

Avoid calling `.save()` in a loop — each call is a separate query.

```python
# Bad — N INSERT statements
for name in names:
    Tag(name=name).save()

# Good — single INSERT
Tag.query.bulk_create([Tag(name=name) for name in names])
```

#### Use queryset `.update()` / `.delete()` for mass operations

```python
# Bad — N UPDATE statements
for user in User.query.filter(is_active=False):
    user.is_archived = True
    user.save()

# Good — single UPDATE statement
User.query.filter(is_active=False).update(is_archived=True)
```

#### Use `.only()` / `.defer()` for heavy columns

Skip large text or JSON fields when you don't need them.

```python
# Bad — loads large body text for a listing page
posts = Post.query.all()

# Good — defers heavy column
posts = Post.query.defer("body").all()
```

#### Use `.iterator()` for large result sets

Process rows in chunks instead of loading everything into memory.

```python
# Bad — entire table in memory
for row in HugeTable.query.all():
    process(row)

# Good — chunked iteration
for row in HugeTable.query.iterator(chunk_size=2000):
    process(row)
```

## Transactions

By default, each query runs in its own implicit transaction and is committed immediately (autocommit mode). When you need multiple queries to succeed or fail together — like creating a user and their profile — wrap them in an explicit transaction.

### Atomic blocks

Wrap multiple queries in a transaction with `transaction.atomic()`:

```python
from plain.postgres import transaction

with transaction.atomic():
    user = User(email="test@example.com")
    user.save()
    Profile(user=user).save()
    # Both saves commit together, or both roll back on error
```

Nesting `atomic()` creates savepoints:

```python
with transaction.atomic():
    user.save()
    try:
        with transaction.atomic():
            risky_operation()  # If this fails...
    except SomeError:
        pass  # ...only the inner block rolls back
    safe_operation()  # This still runs in the outer transaction
```

### Read-only transactions

Run a block of code in a read-only transaction using `read_only()`. Any write (INSERT, UPDATE, DELETE, DDL) raises `psycopg.errors.ReadOnlySqlTransaction`:

```python
from plain.postgres.db import read_only

with read_only():
    users = User.query.all()  # reads work
    User.query.create(name="x")  # raises psycopg.errors.ReadOnlySqlTransaction
```

`read_only()` opens a single `BEGIN READ ONLY` transaction for the block. Nested `atomic()` blocks inside become savepoints of the outer read-only transaction and inherit read-only. Because it opens its own transaction, `read_only()` cannot be entered inside an existing `atomic()` block — doing so raises `TransactionManagementError`.

Because the whole block is one transaction, catching a database error inside `read_only()` and trying to keep reading will fail — the transaction is aborted and any further query raises `TransactionManagementError`. Wrap the write in a nested `atomic()` savepoint if you need to recover and continue:

```python
import psycopg

from plain.postgres import transaction
from plain.postgres.db import read_only

with read_only():
    try:
        with transaction.atomic():
            User.query.create(name="x")  # raises, savepoint rolls back
    except psycopg.errors.ReadOnlySqlTransaction:
        pass
    User.query.count()  # still works — outer txn is healthy
```

## Schema management

Schema changes fall into three categories, each with a different author and apply model:

- **Convergence** — declarative properties like indexes, constraints, NOT NULL, and FK `on_delete`. Derived from model definitions and applied automatically using online-safe DDL (`CREATE INDEX CONCURRENTLY`, `NOT VALID` + `VALIDATE`, etc.). The framework owns the safe apply pattern.
- **Structural migrations** — tables, columns, renames, column type changes. Framework-generated from the model diff, but you review them and decide when to deploy (a column type change can rewrite the table; a column drop is destructive).
- **Data migrations** — backfills, transformations, one-time cleanup. Authored by you via `RunPython` or `RunSQL`. The framework only sequences them.

| Change                                | Category             | Safe apply pattern                                  |
| ------------------------------------- | -------------------- | --------------------------------------------------- |
| Add / drop index                      | Convergence          | `CREATE INDEX CONCURRENTLY`                         |
| Add / drop unique or check constraint | Convergence          | `ADD CONSTRAINT NOT VALID` + `VALIDATE`             |
| Add / remove NOT NULL                 | Convergence          | `CHECK NOT VALID` + `VALIDATE` + `SET NOT NULL`     |
| Change FK `on_delete` action          | Convergence          | drop + re-add with `NOT VALID` + `VALIDATE`         |
| Set / change / drop column `DEFAULT`  | Convergence          | catalog-only `ALTER COLUMN SET/DROP DEFAULT`        |
| Create / drop table                   | Structural migration | framework-generated, you review                     |
| Add / drop / rename column            | Structural migration | framework-generated, you review                     |
| Column type change (safe widening)    | Structural migration | framework-generated `ALTER TYPE` with implicit cast |
| Column type change (other)            | Data migration       | you author (explicit `RunSQL` with `USING`)         |
| Data backfill or transformation       | Data migration       | you author (`RunPython` / `RunSQL`)                 |
| One-time cleanup, seeding             | Data migration       | you author                                          |

**The principle: who authors the change, and can the framework guarantee safety?** If the framework can derive both the change and a universally-safe apply pattern from model definitions, it belongs to convergence. If the framework can generate the DDL but safety depends on context (table size, deploy timing, destructiveness), it's a structural migration — you review it before deploying. If only you know what to do, it's a data migration.

Many convergence-managed changes produce DB-enforced behavior — cascading deletes (`ON DELETE`), validation (`CHECK`, `NOT NULL`), default generation. Whether a change is "behavioral" doesn't determine the category; whether the framework can guarantee a safe apply does.

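
The categorization principle can be read as a two-question decision rule. The sketch below is only an illustration of that reasoning; `Category` and `categorize` are hypothetical names, not framework code.

```python
from enum import Enum


class Category(Enum):
    CONVERGENCE = "convergence"
    STRUCTURAL_MIGRATION = "structural migration"
    DATA_MIGRATION = "data migration"


def categorize(framework_can_derive_change: bool, apply_is_universally_safe: bool) -> Category:
    """Hypothetical helper: map the two questions from the principle to a category."""
    if framework_can_derive_change and apply_is_universally_safe:
        # e.g. adding an index via CREATE INDEX CONCURRENTLY
        return Category.CONVERGENCE
    if framework_can_derive_change:
        # e.g. dropping a column: the DDL is derivable, but safety depends on context
        return Category.STRUCTURAL_MIGRATION
    # e.g. a backfill: only you know what the data should become
    return Category.DATA_MIGRATION
```

For example, `categorize(True, False)` returns `Category.STRUCTURAL_MIGRATION`, which matches the "framework-generated, you review" rows in the table.
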
| Property                 | Convergence                                                           | Migrations                                                  |
| ------------------------ | --------------------------------------------------------------------- | ----------------------------------------------------------- |
| Authored by              | Framework (derived from models)                                       | Framework (structural) or you (data)                        |
| When it runs             | Every `sync`, by diffing models vs database                           | Once, in recorded timestamp order                           |
| Drift correction         | Yes — reverts undeclared DB changes on next sync                      | No — manual DB changes persist                              |
| Reversible (intentional) | Implicit — roll back code, re-sync re-derives                         | No — forward-only, fix-forward                              |
| Failure behavior         | Per-operation commits — partial progress on failure (re-run to retry) | Batch transaction — failure rolls back the entire migration |
| Files on disk            | None — derived from models live                                       | `.py` files in `migrations/`                                |
| Safe DDL                 | Framework-applied patterns (CONCURRENTLY, NOT VALID + VALIDATE)       | Generated DDL; you review before deploy                     |

**Drift correction is a convergence-only behavior.** Convergence re-runs on every `sync` and compares models against the database. An index created manually outside a model declaration will be dropped on the next run because models are the source of truth. Migrations don't behave this way — once applied, they're recorded and never re-applied.

**Caveats.** The safety promise isn't absolute. Structural migrations aren't lint-checked yet: adding a column with a volatile default (`gen_random_uuid()`, `now()`) on a large table will rewrite it without warning. Review structural migrations before deploying to production.

**Column type changes.** The autodetector only auto-generates `AlterField` for a small allowlist of lossless widenings (`smallint → integer`, `smallint → bigint`, `integer → bigint`) and for parameter-only changes like `max_length`. Every other base-type change rejects with guidance — arbitrary `USING col::newtype` casts either fail at apply time (e.g. timestamp → uuid) or silently corrupt data (e.g. bigint FK → text stringifies PKs), and migrations are forward-only. For anything outside the allowlist, scaffold `plain migrations create --empty --name alter___type` and author an explicit `RunSQL` with a `USING` expression you've reviewed.

"Safe" here means data-integrity safe, not operationally cheap. An `ALTER COLUMN ... TYPE` that changes on-disk width (any of the allowlisted widenings) takes `ACCESS EXCLUSIVE` and rewrites the table — on a large table this can block reads and writes for minutes. Deploy these during a maintenance window, not in the middle of traffic.

**Out of scope for convergence.** Triggers, views, stored procedures, and other non-standard DDL stay outside convergence — it won't create them from models, and it won't drop them if they exist. Manage them with `RunSQL` data migrations.

### Syncing

The primary command for all schema management is `postgres sync`. It runs migrations and convergence together:

```bash
plain postgres sync
```

```
plain postgres sync
│
├─ 1. Create migrations (development only)
│     Detects model changes, generates migration files
│
├─ 2. Apply migrations
│     Runs pending migrations in a single transaction
│
└─ 3. Converge
      Compares indexes, constraints, FKs, and nullability
      against model declarations — applies fixes independently
```

In development (`DEBUG=True`), sync auto-generates migrations before applying them. In production, it only applies existing migrations and converges.

| Command                        | Purpose                                                               |
| ------------------------------ | --------------------------------------------------------------------- |
| `plain postgres sync`          | Create + apply migrations + converge (the one command for everything) |
| `plain postgres sync --check`  | Exit non-zero if anything would change (for CI)                       |
| `plain postgres schema`        | Show schema state with drift detection                                |
| `plain postgres schema --json` | Machine-readable schema output                                        |
| `plain postgres converge`      | Run convergence alone (advanced)                                      |

### Structural migrations

Structural migrations are framework-generated from model changes — tables, columns, renames, column type changes. They are Python files stored in your app's `migrations/` directory. You don't author them by hand; you edit models and run `plain migrations create`.

```bash
plain migrations create
```

Key flags:

- `--dry-run` — Show what migrations would be created (with operations and SQL) without writing files
- `--check` — Exit non-zero if migrations are needed (for CI)
- `--name ` — Set the migration filename

Shared commands (apply equally to structural and data migrations):

| Command                          | Purpose                                              |
| -------------------------------- | ---------------------------------------------------- |
| `plain migrations apply`         | Apply pending migrations                             |
| `plain migrations apply --plan`  | Preview what would run                               |
| `plain migrations apply --check` | Exit non-zero if unapplied migrations exist (for CI) |
| `plain migrations apply --fake`  | Mark as applied without running SQL                  |
| `plain migrations list`          | View migration status by package                     |
| `plain migrations squash `       | Squash migrations into one                           |
| `plain migrations prune`         | Remove stale migration records                       |

#### Development workflow

During development, iterating on models often produces multiple small migrations (0002, 0003, 0004...). Clean these up before committing.

**Consolidating uncommitted migrations (delete-and-recreate):**

Use this when migrations exist only in your local dev environment and haven't been committed or deployed.

1. Delete the intermediate migration files (keep the initial 0001 and any previously committed migrations)
2. `plain migrations prune --yes` — removes stale DB records for the deleted files
3. `plain migrations create` — creates a single fresh migration with all the changes
4. `plain migrations apply --fake` — marks the new migration as applied (the schema is already correct from the old migrations)

**Consolidating committed migrations (squash):**

Use this when migrations have already been committed or deployed to other environments. `plain migrations squash ` creates a replacement migration with a `replaces` list. Keep the original files until all environments have migrated past the squash point, then delete them and run `migrations prune`.

**Which method to use:**

| Scenario                                  | Method                                                  |
| ----------------------------------------- | ------------------------------------------------------- |
| Migrations are local only (not committed) | Delete-and-recreate                                     |
| Migrations are committed but not deployed | Delete-and-recreate (if all developers reset) or squash |
| Migrations are deployed to production     | Squash or full reset                                    |

#### Resetting migrations

Over time a package can accumulate dozens of migrations. Once **every environment** (dev, staging, production) has applied all of them, you can replace the entire history with a single fresh `0001_initial`.

**Prerequisites:**

- Every environment (dev, staging, production) has applied all existing migrations. If any environment is behind, the reset will break it.
- The first migration is named `0001_initial` (the default). If it has a different name, this workflow won't work cleanly.

**Steps:**

1. Run `plain migrations list` locally and verify everything is applied.
2. Delete every file in the package's `migrations/` directory except `__init__.py`.
3. Run `plain migrations create` to generate a fresh `0001_initial`.
4. Run `plain migrations prune --yes` to remove stale DB records. The existing `0001_initial` record matches the new file, so the database is immediately up to date.
5. Verify with `plain postgres schema` (zero issues means the reset is clean) and `plain migrations create --check` (no pending changes).
6. Commit and deploy. On every other environment, run `plain migrations prune --yes`. No actual SQL runs — it only cleans up migration history records. If `migrations prune` is already in your deploy steps, no changes are needed.

**Things to keep in mind:**

- If resetting multiple packages, process depended-on packages first — the new `0001_initial` may have cross-package FK dependencies.
- Data migrations (`RunPython`) in the deleted history are gone, which is fine since they've already run everywhere.
- If CI runs `migrations create --check` or `migrations apply --check`, the reset PR must be merged and deployed before those checks pass in other branches.

### Data migrations

Data migrations are user-authored operations — backfills, transformations, seeding, cleanup. The framework has no way to derive these from models; you write the logic and it gets sequenced in timestamp order alongside structural migrations.

Create an empty migration to author one:

```bash
plain migrations create --empty
```

Add a `RunPython` or `RunSQL` operation inside:

```python
def forwards(models, schema_editor):
    User = models.get_model("users", "User")
    # Use .update() for batch SQL — a row-by-row save loop can lock a large table.
    User.query.filter(full_name="").update(full_name="pending")
```

For large tables, chunk the work (e.g. by ID range) and commit between batches so no single transaction holds locks for too long.

See [Structural migrations](#structural-migrations) for shared commands (`apply`, `list`, `squash`, `prune`).

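
The ID-range chunking suggested above for large tables can be sketched with a small pure-Python helper. `id_ranges` is a hypothetical name introduced for illustration. How each batch is committed depends on your migration setup and is not shown here.

```python
from collections.abc import Iterator


def id_ranges(min_id: int, max_id: int, batch_size: int) -> Iterator[tuple[int, int]]:
    """Yield inclusive (start, end) ID ranges that cover min_id..max_id."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    start = min_id
    while start <= max_id:
        end = min(start + batch_size - 1, max_id)
        yield start, end
        start = end + 1


# Each range would then drive one bounded UPDATE inside the data migration,
# for example (assuming `id__lte` is available alongside the `__gte` lookup):
#   User.query.filter(id__gte=start, id__lte=end, full_name="").update(full_name="pending")
batches = list(id_ranges(1, 10, 4))
```

Here `batches` is `[(1, 4), (5, 8), (9, 10)]`: the last range is clipped to `max_id`, so no batch touches IDs outside the requested span.
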
#### Cascading deletes inside data migrations

`Model.delete()` and `QuerySet.delete()` rely on Postgres `ON DELETE` clauses (`CASCADE`, `SET NULL`, `RESTRICT`) — Plain does not walk relationships in Python. Foreign key constraints are added by **convergence** (step 3 of `postgres sync`), not by migrations.

During a fresh `migrations apply` (before convergence has run), FK constraints don't exist yet. A `RunPython` data migration that calls `.delete()` on a model with cascading children will:

- Delete only the parent row — children become orphans
- Cause convergence's later `VALIDATE CONSTRAINT` step to fail because of the orphans

This only affects fresh applies. Existing databases keep their FK constraints across syncs, so incremental data migrations are unaffected.

**If your data migration needs to delete rows that have cascading children, handle the cascade explicitly:**

```python
def forwards(models, schema_editor):
    Parent = models.get_model("myapp", "Parent")
    Child = models.get_model("myapp", "Child")
    # Delete children first, then parent — explicit, no constraint reliance
    Child.query.filter(parent__name="old").delete()
    Parent.query.filter(name="old").delete()
```

Or use `RunSQL` with explicit cascade if the relationship is large.

### Convergence

Convergence compares the indexes, constraints, foreign keys, nullability, and [storage parameters](#storage-parameters) declared on your models against what actually exists in the database, then applies fixes to make them match. You don't need to create migrations for these — just declare them on your model and run `postgres sync`.

```python
@postgres.register_model
class User(postgres.Model):
    email: str = types.EmailField()
    username: str = types.TextField(max_length=150)
    age: int = types.IntegerField()

    model_options = postgres.Options(
        indexes=[
            postgres.Index(fields=["email"], name="users_email_idx"),
        ],
        constraints=[
            postgres.UniqueConstraint(fields=["email"], name="users_email_uniq"),
            postgres.CheckConstraint(check=postgres.Q(age__gte=0), name="users_age_positive"),
        ],
    )
```

When you run `postgres sync`, convergence detects that these indexes and constraints are missing and creates them — using non-blocking DDL operations where possible (e.g. `CREATE INDEX CONCURRENTLY`, `ADD CONSTRAINT ... NOT VALID` followed by `VALIDATE CONSTRAINT`).

**Key properties:**

- **Idempotent** — safe to run repeatedly. If the database already matches, nothing happens.
- **Non-blocking** — indexes are built with `CONCURRENTLY`, constraints use `NOT VALID` + `VALIDATE` to avoid locking writes.
- **Per-operation commits** — each fix is committed independently so a single failure doesn't roll back other fixes.
- **Self-healing** — detects and rebuilds `INVALID` indexes (e.g. from a previously failed `CREATE INDEX CONCURRENTLY`).
- **Rename-aware** — detects renamed indexes and constraints by matching their structure, avoiding unnecessary drop + recreate.

**Inspecting schema state:**

Use `postgres schema` to see what convergence would do. It shows every model's columns, indexes, and constraints compared against the database, with issues highlighted:

```bash
plain postgres schema          # all models
plain postgres schema User     # single model
plain postgres schema --json   # machine-readable output
```

**Staged rollouts:**

Some changes can't be applied automatically. For example, if you add `NOT NULL` to a column that has existing `NULL` rows, convergence will report this as a blocked change and tell you to backfill the data first. Run `postgres sync` again after the backfill.

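
As a minimal sketch of the backfill step in a staged rollout, a data migration could fill the `NULL` rows before the column is declared non-null. This mirrors the `forwards(models, schema_editor)` shape used for data migrations elsewhere in this README. The `nickname` column is hypothetical, and the `__isnull` lookup is an assumption that is not confirmed by this document.

```python
def forwards(models, schema_editor):
    # Hypothetical example: `nickname` is an illustrative nullable column on User.
    User = models.get_model("users", "User")
    # Assumes an `__isnull` lookup exists; replace NULLs with an empty string
    # in one UPDATE so convergence can apply NOT NULL on the next sync.
    User.query.filter(nickname__isnull=True).update(nickname="")
```

Once this migration has been applied, running `postgres sync` again lets convergence retry the previously blocked `NOT NULL` change.
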
**Cleanup:**

When you remove an index or constraint from a model, convergence automatically drops the undeclared database object on the next `postgres sync`. Models are the source of truth — if it's not declared, it gets removed.

### DDL timeouts

Every framework-issued DDL statement — both in migrations and in convergence — is wrapped with `lock_timeout` and `statement_timeout` so a deploy can't hang indefinitely waiting for a lock, and so a backfill against an unexpectedly large table fails fast instead of holding `ACCESS EXCLUSIVE` for minutes.

```python
# app/settings.py — defaults shown
POSTGRES_MIGRATION_LOCK_TIMEOUT = "3s"
POSTGRES_MIGRATION_STATEMENT_TIMEOUT = "3s"
POSTGRES_CONVERGENCE_LOCK_TIMEOUT = "3s"
POSTGRES_CONVERGENCE_STATEMENT_TIMEOUT = "3s"
```

`lock_timeout` applies to every DDL. `statement_timeout` applies only to statements that take `ACCESS EXCLUSIVE` — non-blocking operations (`CREATE INDEX CONCURRENTLY`, `VALIDATE CONSTRAINT`) run unbounded because they can't cascade the lock queue.

If a migration issues a row-touching UPDATE (e.g. a hand-written `RunPython` or `RunSQL` backfill), the 3s `statement_timeout` will kill it on any non-tiny table. That's intentional — the right fix is a batched data migration, not a single long-running UPDATE.

The common first-time failure mode is applying migrations against a pre-seeded dev or staging database: raise the ceiling for that one run, then lower it back for production deploys.

Use `RunSQL(no_timeout=True)` to opt out for a specific operation:

```python
from plain.postgres.migrations.operations import RunSQL

operations = [
    RunSQL(
        "UPDATE orders SET status = 'pending' WHERE status IS NULL",
        no_timeout=True,
    ),
]
```

Non-atomic migrations (`Migration.atomic = False`, used for `CREATE INDEX CONCURRENTLY` in a migration) skip the timeout prelude automatically — `SET LOCAL` is a no-op outside a transaction block. Manage timeouts inside your own `RunSQL` if you need them.

Environment overrides: every setting accepts `PLAIN_POSTGRES_*` env vars, so you can raise the ceiling for a specific deploy without a code change:

```bash
PLAIN_POSTGRES_MIGRATION_STATEMENT_TIMEOUT=30s plain migrations apply
```

## Fields

You can use many field types for different data:

```python
from decimal import Decimal
from datetime import datetime

from plain import postgres
from plain.postgres import types


class Product(postgres.Model):
    # Text fields
    name: str = types.TextField(max_length=200)
    description: str = types.TextField()

    # Numeric fields
    price: Decimal = types.DecimalField(max_digits=10, decimal_places=2)
    quantity: int = types.IntegerField(default=0)

    # Boolean fields
    is_active: bool = types.BooleanField(default=True)

    # Date and time fields
    created_at: datetime = types.DateTimeField(create_now=True)
    updated_at: datetime = types.DateTimeField(update_now=True)
```

**Text fields:**

- [`TextField`](./fields/__init__.py#TextField) - Text (with optional max length)
- [`EmailField`](./fields/__init__.py#EmailField) - Email address (validated)
- [`URLField`](./fields/__init__.py#URLField) - URL (validated)

**Numeric fields:**

- [`IntegerField`](./fields/__init__.py#IntegerField) - Integer
- [`BigIntegerField`](./fields/__init__.py#BigIntegerField) - Big (8 byte) integer
- [`SmallIntegerField`](./fields/__init__.py#SmallIntegerField) - Small integer
- [`FloatField`](./fields/__init__.py#FloatField) - Floating point number
- [`DecimalField`](./fields/__init__.py#DecimalField) - Fixed precision decimal

**Date and time fields:**

- [`DateField`](./fields/__init__.py#DateField) - Date (without time)
- [`DateTimeField`](./fields/__init__.py#DateTimeField) - Date with time
- [`TimeField`](./fields/__init__.py#TimeField) - Time (without date)
- [`DurationField`](./fields/__init__.py#DurationField) - Time duration (timedelta)
- [`TimeZoneField`](./fields/timezones.py#TimeZoneField) - Timezone (stored as string, accessed as ZoneInfo)

**Other fields:**

- [`BooleanField`](./fields/__init__.py#BooleanField) - True/False
- [`UUIDField`](./fields/__init__.py#UUIDField) - UUID (pass `generate=True` for a per-row `gen_random_uuid()` default)
- [`BinaryField`](./fields/__init__.py#BinaryField) - Raw binary data
- [`JSONField`](./fields/json.py#JSONField) - JSON data
- [`GenericIPAddressField`](./fields/__init__.py#GenericIPAddressField) - IPv4 or IPv6 address
- [`RandomStringField`](./fields/text.py#RandomStringField) - Per-row random hex string generated by Postgres (`length=`) — use for tokens, slugs, short IDs instead of a Python callable default. Slices `gen_random_uuid()` directly; values are uniform hex characters

**Encrypted fields:**

- [`EncryptedTextField`](./fields/encrypted.py#EncryptedTextField) - Text encrypted at rest
- [`EncryptedJSONField`](./fields/encrypted.py#EncryptedJSONField) - JSON encrypted at rest

See [Encrypted fields](#encrypted-fields) for details. For relationship fields, see [Relationships](#relationships).

For nullable fields, use `| None` in the annotation:

```python
published_at: datetime | None = types.DateTimeField(allow_null=True, required=False)
```

### Sharing fields across models

To share common fields across multiple models, use Python classes as mixins. The final, registered model must inherit directly from `postgres.Model` and the mixins should not.

```python
from datetime import datetime

from plain import postgres
from plain.passwords.models import PasswordField
from plain.postgres import types


# Regular Python class for shared fields
class TimestampedMixin:
    created_at: datetime = types.DateTimeField(create_now=True)
    updated_at: datetime = types.DateTimeField(update_now=True)


# Models inherit from the mixin AND postgres.Model
@postgres.register_model
class User(TimestampedMixin, postgres.Model):
    email: str = types.EmailField()
    password = PasswordField()
    is_admin: bool = types.BooleanField(default=False)


@postgres.register_model
class Note(TimestampedMixin, postgres.Model):
    content: str = types.TextField(max_length=1024)
    liked: bool = types.BooleanField(default=False)
```

### Encrypted fields

Encrypted fields transparently encrypt values before writing to the database and decrypt on read. Use them for third-party credentials, API keys, OAuth tokens, and other secrets your application needs back in plaintext.

This is **not** for passwords or tokens you issue; those should be hashed (one-way). This is for secrets you receive from others and need to use later.

```python
from plain import postgres
from plain.postgres import types


@postgres.register_model
class Integration(postgres.Model):
    name: str = types.TextField(max_length=100)
    api_key: str = types.EncryptedTextField(max_length=200)
    credentials: dict = types.EncryptedJSONField(required=False, allow_null=True)
```

Values are encrypted using Fernet (AES-128-CBC + HMAC-SHA256) with a key derived from `SECRET_KEY`. The `cryptography` package is required; install it with `pip install cryptography`.

**Available fields:**

- `EncryptedTextField`: encrypts text, stored as `text` in the database regardless of `max_length` (ciphertext is longer than plaintext). `max_length` is enforced on the plaintext value during validation.
- `EncryptedJSONField`: serializes to JSON, encrypts, and stores as `text`. Supports custom `encoder` and `decoder` parameters (same as `JSONField`).
**Limitations:**

- **No lookups**: encrypted values are non-deterministic (the same plaintext produces different ciphertext each time), so filtering on encrypted fields doesn't work. Only `isnull` lookups are supported.
- **No indexes or constraints**: encrypted fields cannot be used in indexes or unique constraints. Preflight checks will catch this.

**Key rotation:** Encryption uses `SECRET_KEY`. When rotating keys, add the old key to `SECRET_KEY_FALLBACKS`. The field will decrypt with any fallback key and re-encrypt with the current key on save.

**Gradual migration:** If you add encryption to an existing plaintext column, old unencrypted values are returned as-is on read (the field detects whether a value is encrypted by its `$fernet$` prefix). They'll be encrypted on the next save.

## Relationships

Use [`ForeignKeyField`](./fields/related.py#ForeignKeyField) for many-to-one and [`ManyToManyField`](./fields/related.py#ManyToManyField) for many-to-many:

```python
from plain import postgres
from plain.postgres import types


@postgres.register_model
class Book(postgres.Model):
    title: str = types.TextField(max_length=200)
    author: Author = types.ForeignKeyField("Author", on_delete=postgres.CASCADE)
    tags = types.ManyToManyField("Tag")
```

### Reverse relationships

When you define a `ForeignKeyField` or `ManyToManyField`, Plain automatically creates a reverse accessor on the related model (like `author.book_set`).
You can explicitly declare these reverse relationships using [`ReverseForeignKey`](./fields/reverse_descriptors.py#ReverseForeignKey) and [`ReverseManyToMany`](./fields/reverse_descriptors.py#ReverseManyToMany):

```python
from plain import postgres
from plain.postgres import types


@postgres.register_model
class Author(postgres.Model):
    name: str = types.TextField(max_length=200)
    # Explicit reverse accessor for all books by this author
    books = types.ReverseForeignKey(to="Book", field="author")


@postgres.register_model
class Book(postgres.Model):
    title: str = types.TextField(max_length=200)
    author: Author = types.ForeignKeyField(Author, on_delete=postgres.CASCADE)


# Usage
author = Author.query.get(name="Jane Doe")
for book in author.books.all():
    print(book.title)

# Add a new book
author.books.create(title="New Book")
```

For many-to-many relationships:

```python
@postgres.register_model
class Feature(postgres.Model):
    name: str = types.TextField(max_length=100)
    # Explicit reverse accessor for all cars with this feature
    cars = types.ReverseManyToMany(to="Car", field="features")


@postgres.register_model
class Car(postgres.Model):
    model: str = types.TextField(max_length=100)
    features = types.ManyToManyField(Feature)


# Usage
feature = Feature.query.get(name="Sunroof")
for car in feature.cars.all():
    print(car.model)
```

**Why use explicit reverse relations?**

- **Self-documenting**: The reverse accessor is visible in the model definition
- **Better IDE support**: Autocomplete works for reverse accessors
- **Type safety**: When combined with type annotations, type checkers understand the relationship
- **Control**: You choose the accessor name instead of relying on automatic `_set` naming

Reverse relations are optional. If you don't declare them, the automatic `{model}_set` accessor still works.
To get type checking for custom QuerySet methods on reverse relations, specify the QuerySet type as a second parameter:

```python
# Basic usage
books: types.ReverseForeignKey[Book] = types.ReverseForeignKey(to="Book", field="author")

# With custom QuerySet for proper method recognition
books: types.ReverseForeignKey[Book, BookQuerySet] = types.ReverseForeignKey(to="Book", field="author")

# Now type checkers recognize custom methods like .published()
author.books.query.published()
```

## Constraints

### Validation

`save()` runs `full_clean()` by default. Field validators, model `clean()`, and any constraints with a `validate()` method are all checked, raising `ValidationError` on violation. Pass `clean_and_validate=False` to skip it (e.g. for trusted bulk loads).

```python
from plain import postgres
from plain.exceptions import ValidationError
from plain.postgres import types


@postgres.register_model
class User(postgres.Model):
    email: str = types.EmailField()
    age: int = types.IntegerField()

    model_options = postgres.Options(
        constraints=[
            postgres.UniqueConstraint(fields=["email"], name="unique_email"),
        ],
    )

    def clean(self):
        # Model-level rule, checked by full_clean() before saving
        if self.age < 18:
            raise ValidationError("User must be 18 or older")
```

Field-level validation happens automatically based on field types and constraints.

### Indexes and constraints

You can optimize queries and ensure data integrity with indexes and constraints. These are managed automatically by [convergence](#convergence): declare them on the model and run `postgres sync`.
```python
class User(postgres.Model):
    email: str = types.EmailField()
    username: str = types.TextField(max_length=150)
    age: int = types.IntegerField()
    # Declared so the "-created_at" index below has a column to reference
    created_at: datetime = types.DateTimeField(create_now=True)

    model_options = postgres.Options(
        indexes=[
            postgres.Index(fields=["email"]),
            postgres.Index(fields=["-created_at"], name="user_created_idx"),
        ],
        constraints=[
            postgres.UniqueConstraint(fields=["email", "username"], name="unique_user"),
            postgres.CheckConstraint(check=postgres.Q(age__gte=0), name="age_positive"),
        ],
    )
```

Constraints are also checked during `full_clean()` (which `save()` runs by default; see [Validation](#validation)). Pass `violation_error` to customize the resulting `ValidationError`. It accepts anything `ValidationError(...)` accepts: a string, a `{field: message}` dict, or a fully-formed `ValidationError`:

```python
# Simple message: lands on NON_FIELD_ERRORS
postgres.CheckConstraint(
    check=postgres.Q(age__gte=0),
    name="age_positive",
    violation_error="Age must be zero or greater.",
)

# Dict form: routes to a specific field
postgres.CheckConstraint(
    check=postgres.Q(age__gte=0),
    name="age_positive",
    violation_error={"age": "Age must be zero or greater."},
)

# Full ValidationError: for code, params, multiple fields
postgres.CheckConstraint(
    check=postgres.Q(age__gte=0),
    name="age_positive",
    violation_error=ValidationError(
        {"age": "Age must be zero or greater."},
        code="age_negative",
    ),
)
```

A `code` becomes `ValidationError.code`, which is useful for test assertions, error tracking buckets, and code that branches on specific error types without string-matching.

`UniqueConstraint` accepts the same `violation_error`. With a single-field unique constraint, a string `violation_error="That email is taken."` auto-routes to that field; otherwise (multi-field, expressions, or a CheckConstraint) errors land on `NON_FIELD_ERRORS` unless you pass the dict form.

See [BaseConstraint](./constraints.py#BaseConstraint) for the full signature.
### Storage parameters

Per-table Postgres [storage parameters](https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-STORAGE-PARAMETERS) (`pg_class.reloptions`) are declared on `model_options` and managed by [convergence](#convergence): `postgres sync` issues the `ALTER TABLE … SET/RESET (...)` to make the live table match. They're not serialized into migrations.

```python
class CachedItem(postgres.Model):
    ...

    model_options = postgres.Options(
        storage_parameters={
            # Tighter autovacuum on a churn-heavy table
            "autovacuum_vacuum_scale_factor": 0.1,
            # TOAST has its own autovacuum schedule; prefix with `toast.`
            "toast.autovacuum_vacuum_scale_factor": 0.05,
        },
    )
```

Models are the source of truth: undeclared parameters set on the live table are reset on the next sync. Use this for autovacuum tuning, `fillfactor`, TOAST options, and anything else you'd otherwise apply by hand with `ALTER TABLE … SET (...)`.

### Schema design

#### Index fields used in filters and ordering

Add indexes for columns that appear in `.filter()`, `.order_by()`, or `.exclude()`.

```python
# Bad: full table scan on every filtered query
class Order(postgres.Model):
    status: str = types.TextField(max_length=20)
    created_at: datetime = types.DateTimeField()


# Good: indexed for common queries
class Order(postgres.Model):
    status: str = types.TextField(max_length=20)
    created_at: datetime = types.DateTimeField()

    model_options = postgres.Options(
        indexes=[postgres.Index(fields=["status", "-created_at"])],
    )
```

#### Use database constraints, not app-only validation

Enforce uniqueness and data integrity at the database level.
```python
# Bad: only validated in Python
def save(self):
    if MyModel.query.filter(email=self.email).exists():
        raise ValueError("duplicate")

# Good: database-enforced
model_options = postgres.Options(
    constraints=[postgres.UniqueConstraint(fields=["email"])],
)
```

#### Choose `on_delete` deliberately

CASCADE for owned children, RESTRICT for referenced data, SET_NULL for optional references.

```python
# Bad: blindly using CASCADE everywhere
company: Company = types.ForeignKeyField("Company", on_delete=postgres.CASCADE)  # deleting company deletes invoices!

# Good: block the delete while invoices reference the company
company: Company = types.ForeignKeyField("Company", on_delete=postgres.RESTRICT)
```

#### No `allow_null` on string fields

Use `default=""` instead of `allow_null=True` to avoid two representations of "empty."

```python
# Bad: NULL and "" both mean "empty"
nickname: str = types.TextField(max_length=50, allow_null=True)

# Good: single empty representation
nickname: str = types.TextField(max_length=50, default="")
```

## Forms

Models integrate with [plain.forms](../../../plain-forms/plain/forms/README.md):

```python
from plain import forms

from .models import User


class UserForm(forms.ModelForm):
    class Meta:
        model = User
        fields = ["email", "is_admin"]


# Usage
form = UserForm(request=request)
if form.is_valid():
    user = form.save()
```

## Architecture

```mermaid
graph TB
    subgraph "User API"
        Model["Model"]
        QS["QuerySet"]
        Expr["Expressions<br/>F() Q() Value()"]
    end

    subgraph "Query Layer"
        Query["Query"]
        Where["WhereNode"]
        Join["Join"]
    end

    subgraph "Compilation"
        Compiler["SQLCompiler"]
    end

    subgraph "Database"
        Connection["DatabaseConnection"]
        DB[(Database)]
    end

    Model -- ".query" --> QS
    QS -- "owns" --> Query
    Expr -- "used by" --> Query
    Query -- "contains" --> Where
    Query -- "contains" --> Join
    Query -- "get_compiler()" --> Compiler
    Compiler -- "execute_sql()" --> Connection
    Connection -- "executes" --> DB
```

**Query execution flow:**

1. **Model.query** returns a [`QuerySet`](./query.py#QuerySet) bound to the model
2. **QuerySet** methods like `.filter()` modify the internal [`Query`](./sql/query.py#Query) object
3. When results are needed, **Query.get_compiler()** creates the appropriate [`SQLCompiler`](./sql/compiler.py#SQLCompiler)
4. **SQLCompiler.as_sql()** renders the Query to SQL
5. **SQLCompiler.execute_sql()** runs the SQL via [`DatabaseConnection`](./postgres/connection.py#DatabaseConnection) and returns results

**Key components:**

- [`Model`](./base.py#Model) - Defines fields, relationships, and provides the `query` attribute
- [`QuerySet`](./query.py#QuerySet) - Chainable API (`.filter()`, `.exclude()`, `.order_by()`) that builds a Query
- [`Query`](./sql/query.py#Query) - Internal representation of a query's logical structure (tables, joins, filters)
- [`SQLCompiler`](./sql/compiler.py#SQLCompiler) - Transforms a Query into executable SQL
- [`DatabaseConnection`](./postgres/connection.py#DatabaseConnection) - PostgreSQL connection and query execution

## Diagnostics

Run health checks against your database. `diagnose` is designed to produce only actionable findings: every warning has a copy-paste fix or specific resource to investigate, and noisy one-off signals (hit ratios, XID age) are surfaced as informational context rather than as warnings.
```bash
uv run plain postgres diagnose
```

Output modes:

```bash
uv run plain postgres diagnose --json     # structured output for scripts/agents
uv run plain postgres diagnose --verbose  # expand to show every check, including passing
uv run plain postgres diagnose --all      # include findings on installed-package tables
```

### Guiding principle

`diagnose` emits a **warning** only if the remedy fits in the user's codebase or is an app-level action they own. If the remedy is "run SQL against your DB" or "configure your Postgres server," the check emits **operational context** or an **informational number**, not a warning. This keeps the warning surface high-trust (every warning has an edit-to-make) and prevents `diagnose` from bleeding into DB-host concerns.

### Warning-tier checks

Things the user can fix by editing code and running `plain postgres sync`, or app-level incidents they must act on.

**Structural: always real; a fix is possible immediately.**

| Check | What it finds | Severity |
| --- | --- | --- |
| **Invalid indexes** | Broken indexes from failed `CREATE INDEX CONCURRENTLY` (maintained on writes, never used for reads) | Warning |
| **Duplicate indexes** | One index is a column-prefix of another on the same table | Warning |
| **Missing FK indexes** | Foreign key columns without any index coverage | Warning |
| **Sequence exhaustion** | Identity sequences approaching their type max | Warning/Critical |

**Cumulative: depends on stats since the last reset.**

| Check | What it finds | Severity |
| --- | --- | --- |
| **Unused indexes** | Indexes with zero scans since stats reset (>1 MB). Excludes unique, constraint-backing, and sole-FK-coverage indexes | Warning |
| **Missing index candidates** | Tables with seq-scan activity suggesting a missing index. Includes top contributing queries from pg_stat_statements | Warning |

**Snapshot: point-in-time incidents.**

| Check | What it finds | Severity |
| --- | --- | --- |
| **Long-running connections** | Client backends idle-in-transaction or running a query past a threshold | Warning/Critical |
| **Blocking queries** | Queries currently blocking other queries via held locks | Warning/Critical |

### Operational-context findings

These are facts about the database whose remedies live outside Plain today (`ANALYZE`, `VACUUM`, `REINDEX`, autovacuum server tuning). They're surfaced so agents and humans can interpret findings correctly, but the CLI renders them as context rather than alarming warnings, because the user can't express the fix in their model code. (In JSON output each finding still carries `status: "warning"`; the `tier: "operational"` field is what distinguishes it.) Each finding still carries the exact SQL in its suggestion for anyone who wants to act.

| Finding | What it reports |
| --- | --- |
| **Stats freshness** | Tables whose planner statistics are missing (never analyzed) or stale |
| **Vacuum health** | Tables with >10% dead tuples |
| **Table bloat** | Tables with significant estimated wasted space (≥100 MB AND ≥25% bloat, ioguix estimator) |
| **Index bloat** | btree indexes with significant estimated wasted space (≥100 MB AND ≥30%, ioguix estimator) |

Now that per-table autovacuum / fillfactor knobs are expressible in [storage parameters](#storage-parameters) on `model_options`, these findings may graduate back to the warning tier in a future release, since the remedy is now in code.
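For scripts that consume `--json` output, the tier distinction above can be applied with a small filter. This is a sketch under assumptions: only the `status` and `tier` keys are described in this README, so the sample findings, the `check` key, and the overall list-of-dicts shape are hypothetical. Inspect real output before relying on it.

```python
from typing import Any

Finding = dict[str, Any]


def split_warnings_by_tier(findings: list[Finding]) -> tuple[list[Finding], list[Finding]]:
    """Split warning-status findings into (code-fixable, operational-context)."""
    code_fixable: list[Finding] = []
    operational: list[Finding] = []
    for finding in findings:
        if finding.get("status") != "warning":
            continue  # passing or informational entries are ignored here
        if finding.get("tier") == "operational":
            operational.append(finding)
        else:
            code_fixable.append(finding)
    return code_fixable, operational


if __name__ == "__main__":
    # Hypothetical sample data; real `diagnose --json` output may be shaped differently.
    sample = [
        {"check": "missing_fk_indexes", "status": "warning"},
        {"check": "vacuum_health", "status": "warning", "tier": "operational"},
        {"check": "invalid_indexes", "status": "ok"},
    ]
    fixable, context = split_warnings_by_tier(sample)
    print([f["check"] for f in fixable], [f["check"] for f in context])
```

Filtering on `status` alone would lump operational findings in with code-fixable warnings, which is why the sketch checks `tier` as well.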
### Informational context Alongside checks, `diagnose` surfaces context an agent or human may want to read but that isn't actionable on its own: - **Cache hit ratio**, **Index hit ratio** — buffer hit rates (volatile after restart; not a warning in themselves) - **XID wraparound** — transaction ID age as a percent of the 2B limit. Autovacuum usually keeps this low; long-running transactions can block the freeze process even on managed Postgres - **Connection saturation** — active/max connections at this moment - **Stats reset** — when cumulative stats were last reset (affects the confidence of operational checks) - **pg_stat_statements** — whether the extension is installed ### Cross-check caveats Findings whose confidence depends on another check are tagged with a caveat. For example: - `unused_indexes` on a table flagged by `stats_freshness` → caveat: "this table has never been analyzed — the planner may not yet use this index; re-check after running ANALYZE" - `missing_index_candidates` on a never-analyzed table → caveat: "planner statistics are absent — running ANALYZE may change query plans and make this finding moot" This prevents false confidence: dropping an "unused" index on a never-analyzed table is often the wrong move. ### Model-aware suggestions Findings on app-owned tables include the Plain model class and its source file. Suggestions reference the exact edit point: ``` app/processing/models.py :: ProcessingResult — Add an Index on ["is_processing"] to the model, then run plain postgres sync ``` This closes the loop from detection to fix — agents can draft the model edit without guessing. ### App vs package issues Each finding is tagged with its **source**: - **App** — your code, fully actionable - **Package** — owned by an installed package (e.g., `plain-jobs`). These appear in the footer summary by default; use `--all` to see details - **Unmanaged** — tables not managed by any Plain model. 
The suggestion includes exact SQL to run

### Production usage

Run diagnose against your **production database** to get meaningful stats. On Heroku:

```bash
heroku run -a your-app "plain postgres diagnose --json"
```

The `--json` flag must be quoted so Heroku passes it through to the command.

Cumulative-stat checks (`stats_freshness`, `vacuum_health`, `unused_indexes`, `missing_index_candidates`, `table_bloat`, `index_bloat`) need cumulative stat history after the last reset to be reliable. Check the `stats_reset` informational to see how much history you have. (Note: this list spans both the warning and operational tiers. The common thread is that all six depend on counters that `pg_stat_reset()` wipes.)

### Preflight checks

Two related checks run automatically during `uv run plain preflight` (and `uv run plain check`):

- **`postgres.missing_fk_indexes`**: warns about FK fields without index coverage in your model definitions
- **`postgres.duplicate_indexes`**: warns about prefix-redundant indexes in your model constraints

These are static, code-level checks that catch issues before you deploy. The `diagnose` command complements them with runtime stats from the actual database.

### What diagnose deliberately doesn't do

- **LLM-powered column recommendations for missing indexes**: `missing_index_candidates` shows the culprit queries and lets you decide. For precise column-level suggestions, use a platform tool (PlanetScale Insights, Dexter, pg_qualstats + hypopg).
- **Historical trending**: `diagnose` is stateless; it reports on the current state of cumulative stats. Continuous monitoring is out of scope.
- **Niche server checks** (WAL bloat, replication slot age, etc.): better covered by your Postgres provider's monitoring or a dedicated tool; users on self-hosted setups that need them typically have their own tooling.

## Settings

The connection is configured with a single URL (`POSTGRES_URL`). `DATABASE_URL` is read as a platform-compat fallback.
Set the URL to `none` to explicitly disable the database (e.g. during Docker image builds).

| Setting | Type | Default | Env var |
| --- | --- | --- | --- |
| `POSTGRES_URL` | `Secret[str]` | `$DATABASE_URL` or `""` | `PLAIN_POSTGRES_URL` |
| `POSTGRES_MANAGEMENT_URL` | `Secret[str]` | `""` | `PLAIN_POSTGRES_MANAGEMENT_URL` |
| `POSTGRES_POOL_MIN_SIZE` | `int` | `4` | `PLAIN_POSTGRES_POOL_MIN_SIZE` |
| `POSTGRES_POOL_MAX_SIZE` | `int` | `20` | `PLAIN_POSTGRES_POOL_MAX_SIZE` |
| `POSTGRES_POOL_MAX_LIFETIME` | `float` | `3600.0` | `PLAIN_POSTGRES_POOL_MAX_LIFETIME` |
| `POSTGRES_POOL_TIMEOUT` | `float` | `30.0` | `PLAIN_POSTGRES_POOL_TIMEOUT` |
| `POSTGRES_MIGRATION_LOCK_TIMEOUT` | `str` | `"3s"` | `PLAIN_POSTGRES_MIGRATION_LOCK_TIMEOUT` |
| `POSTGRES_MIGRATION_STATEMENT_TIMEOUT` | `str` | `"3s"` | `PLAIN_POSTGRES_MIGRATION_STATEMENT_TIMEOUT` |
| `POSTGRES_CONVERGENCE_LOCK_TIMEOUT` | `str` | `"3s"` | `PLAIN_POSTGRES_CONVERGENCE_LOCK_TIMEOUT` |
| `POSTGRES_CONVERGENCE_STATEMENT_TIMEOUT` | `str` | `"3s"` | `PLAIN_POSTGRES_CONVERGENCE_STATEMENT_TIMEOUT` |

See [`default_settings.py`](./default_settings.py) for more details.

## FAQs

#### How do I add a field to an existing model?

Add the field to your model class, then run `plain migrations create` to create a migration.

If the field is required (no `default=` and not `allow_null=True`), the autodetector refuses to generate the migration, since there's no value to seed existing rows with. You have two options:

1. Declare a `default=` on the field so the new column has a value for existing rows.
2. Add the field with `allow_null=True`, scaffold a data migration with `plain migrations create --empty --name backfill_` to populate existing rows, then remove `allow_null=True` from the field. Convergence applies `NOT NULL` on the next `postgres sync`.

#### How do I make an existing column `NOT NULL`?
Edit the field to remove `allow_null=True`. `plain migrations create` won't detect a schema change, because nullability is managed by convergence. Run `plain postgres sync`:

- If the column has no `NULL` rows, convergence applies the change with a non-blocking `CHECK NOT VALID` + `VALIDATE` + `SET NOT NULL` pattern.
- If `NULL` rows exist, convergence blocks and prints the table and column to backfill. Scaffold a data migration with `plain migrations create --empty --name backfill_`, write the backfill, and run `postgres sync` again.

#### How do I create a unique constraint on multiple fields?

Use `UniqueConstraint` in your model's `model_options`:

```python
model_options = postgres.Options(
    constraints=[
        postgres.UniqueConstraint(fields=["email", "organization"], name="unique_email_per_org"),
    ],
)
```

#### Can I use multiple databases?

Currently, Plain supports a single database connection per application. For applications requiring multiple databases, you can use raw SQL with separate connection management.

## Installation

Install the `plain.postgres` package from [PyPI](https://pypi.org/project/plain.postgres/). You must also pick a `psycopg` implementation: `plain.postgres` depends on `psycopg` but does not pick one for you, so an install of `plain.postgres` alone cannot connect to a database.

```bash
uv add plain.postgres psycopg[binary]  # Pre-built wheels, easiest for local development
# or
uv add plain.postgres psycopg[c]       # Compiled against your system's libpq, recommended for production
```

Then add to your `INSTALLED_PACKAGES` and register [`DatabaseConnectionMiddleware`](#middleware) so pooled connections are returned at the end of each request:

```python
# app/settings.py
INSTALLED_PACKAGES = [
    ...
    "plain.postgres",
]

MIDDLEWARE = [
    "plain.postgres.DatabaseConnectionMiddleware",
    ...
]
```