1"""Lock implementations for job enqueueing.""" 2 3from__future__importannotations 4 5importhashlib 6fromcollections.abcimportIterator 7fromcontextlibimportcontextmanager 8fromtypingimportTYPE_CHECKING 910ifTYPE_CHECKING:11from.jobsimportJob121314@contextmanager15defpostgres_advisory_lock(job:Job,concurrency_key:str)->Iterator[None]:16"""17 PostgreSQL advisory lock context manager.1819 Generates lock key from job class + concurrency_key, acquires advisory lock.20 Uses pg_advisory_xact_lock which is automatically released when the21 transaction commits or rolls back. No explicit release needed.2223 Args:24 job: Job instance (used to get job class name)25 concurrency_key: Job grouping key26 """27fromplain.jobs.registryimportjobs_registry28fromplain.models.dbimportdb_connection2930# Generate lock key from job class + concurrency_key31job_class_name=jobs_registry.get_job_class_name(job.__class__)32lock_key=f"{job_class_name}::{concurrency_key}"3334# Convert lock key to int64 for PostgreSQL advisory lock35hash_bytes=hashlib.md5(lock_key.encode()).digest()36lock_id=int.from_bytes(hash_bytes[:8],"big",signed=True)3738# Acquire advisory lock (auto-released on transaction end)39withdb_connection.cursor()ascursor:40cursor.execute("SELECT pg_advisory_xact_lock(%s)",[lock_id])4142yield# Lock is held here