1"""Lock implementations for job enqueueing."""
2
3from __future__ import annotations
4
5import hashlib
6from collections.abc import Iterator
7from contextlib import contextmanager
8from typing import TYPE_CHECKING
9
10if TYPE_CHECKING:
11 from .jobs import Job
12
13
@contextmanager
def postgres_advisory_lock(job: Job, concurrency_key: str) -> Iterator[None]:
    """
    Hold a transaction-scoped PostgreSQL advisory lock for a job group.

    The lock key is built from the job's registered class name plus
    ``concurrency_key``, hashed down to a signed 64-bit integer, and passed to
    ``pg_advisory_xact_lock``. PostgreSQL releases that lock automatically when
    the transaction commits or rolls back, so nothing is released explicitly
    when the ``with`` block exits.

    NOTE(review): this presumably expects the caller to already be inside a
    transaction on the connection returned by ``get_connection()`` -- confirm
    against callers.

    Args:
        job: Job instance (used to get job class name)
        concurrency_key: Job grouping key

    Yields:
        None, once the advisory lock statement has been executed.
    """
    from plain.jobs.registry import jobs_registry
    from plain.postgres.db import get_connection

    # Generate lock key from job class + concurrency_key
    job_class_name = jobs_registry.get_job_class_name(job.__class__)
    lock_key = f"{job_class_name}::{concurrency_key}"

    # Convert lock key to int64 for PostgreSQL advisory lock.
    # MD5 is only used to derive a stable identifier, not for security, so
    # flag it as such; otherwise FIPS-restricted builds refuse to construct
    # the hash. The resulting digest (and therefore the lock id) is unchanged.
    digest = hashlib.md5(lock_key.encode(), usedforsecurity=False).digest()
    lock_id = int.from_bytes(digest[:8], "big", signed=True)

    # Acquire advisory lock (auto-released on transaction end)
    with get_connection().cursor() as cursor:
        cursor.execute("SELECT pg_advisory_xact_lock(%s)", [lock_id])

    yield  # Lock is held here