Plain is headed towards 1.0! Subscribe for development updates →

 1"""Lock implementations for job enqueueing."""
 2
 3from __future__ import annotations
 4
 5import hashlib
 6from collections.abc import Iterator
 7from contextlib import contextmanager
 8from typing import TYPE_CHECKING
 9
10if TYPE_CHECKING:
11    from .jobs import Job
12
13
@contextmanager
def postgres_advisory_lock(job: Job, concurrency_key: str) -> Iterator[None]:
    """
    PostgreSQL advisory lock context manager.

    Builds a lock key from the job's registered class name plus
    ``concurrency_key``, hashes it down to a signed 64-bit integer, and
    acquires ``pg_advisory_xact_lock`` on that integer. The lock is
    transaction-level: PostgreSQL releases it automatically when the
    transaction commits or rolls back, so nothing is released explicitly
    when the ``with`` block exits.

    NOTE(review): because the lock is transaction-scoped, this presumably
    has to be entered inside an open transaction for the lock to still be
    held while the ``with`` body runs -- confirm against callers.

    Args:
        job: Job instance (used to get job class name)
        concurrency_key: Job grouping key

    Yields:
        None, after the advisory lock statement has been executed.
    """
    # Imported lazily inside the function, as in the original implementation.
    from plain.jobs.registry import jobs_registry
    from plain.models.db import db_connection

    # Generate lock key from job class + concurrency_key
    job_class_name = jobs_registry.get_job_class_name(job.__class__)
    lock_key = f"{job_class_name}::{concurrency_key}"

    # Convert lock key to int64 for PostgreSQL advisory lock.
    # MD5 is used purely to map the string onto the int64 key space, not for
    # security; usedforsecurity=False keeps it usable on FIPS-restricted
    # builds and does not change the digest, so lock ids stay the same.
    hash_bytes = hashlib.md5(lock_key.encode(), usedforsecurity=False).digest()
    # Only the first 8 bytes are kept, so distinct keys can (rarely) map to
    # the same lock id; they would then simply share one lock.
    lock_id = int.from_bytes(hash_bytes[:8], "big", signed=True)

    # Acquire advisory lock (auto-released on transaction end)
    with db_connection.cursor() as cursor:
        cursor.execute("SELECT pg_advisory_xact_lock(%s)", [lock_id])

    yield  # Lock is held here