1# Generated by Plain 0.140.0 on 2026-05-06 04:47
2
3import datetime
4import uuid
5from typing import Any
6
7from plain import postgres
8from plain.postgres import migrations
9
10# Sentinel worker_id stamped on JobProcess rows that pre-date the heartbeat-
11# based rescue mechanism. The accompanying WorkerHeartbeat row is given a
12# 24-hour grace period (last_heartbeat_at = now + 24h) so any legitimately-
13# still-running pre-upgrade jobs have time to finish naturally before rescue
14# converts orphans to LOST. After 24h the synthetic heartbeat goes stale and
15# rescue cleans up anything that didn't finish — matching the previous
16# JOBS_TIMEOUT default. Tune the grace period in this migration if your
17# longest legitimate pre-upgrade runtime exceeds 24h.
18#
19# Rolling-deploy safety: after the backfill runs, this migration alters
20# worker_id to NOT NULL. Any old worker still trying to convert a JobRequest
21# without supplying a worker_id will hit the constraint and fail loudly,
22# leaving the JobRequest in queue for a new worker to pick up — strictly
23# better than silently inserting unrescuable NULL rows.
# All-zero UUID used as the sentinel worker_id for pre-heartbeat JobProcess rows.
_PRE_HEARTBEAT_WORKER_ID = uuid.UUID(hex="00000000-0000-0000-0000-000000000000")
# How far into the future the synthetic heartbeat's last_heartbeat_at is set.
_UPGRADE_GRACE = datetime.timedelta(hours=24)
26
27
def _backfill_unstamped_job_processes(apps: Any, schema_editor: Any) -> None:
    """Point JobProcess rows that have no worker_id at the sentinel worker.

    Returns immediately when no JobProcess row has a NULL worker_id.
    Otherwise one WorkerHeartbeat row is created for the sentinel worker,
    with last_heartbeat_at set _UPGRADE_GRACE ahead of the current UTC time,
    and every unstamped JobProcess row is updated to carry the sentinel
    worker_id.

    Args:
        apps: Registry object passed in by the RunPython operation; used
            only to look up the "plainjobs" models.
        schema_editor: Not used by this function.
    """
    job_process_model = apps.get_model("plainjobs", "JobProcess")
    heartbeat_model = apps.get_model("plainjobs", "WorkerHeartbeat")

    # One lazy queryset serves both the existence probe and the bulk update.
    unstamped = job_process_model.query.filter(worker_id__isnull=True)
    if not unstamped.exists():
        # Nothing to backfill, so no sentinel heartbeat row is created either.
        return

    current_time = datetime.datetime.now(datetime.UTC)
    sentinel_heartbeat = {
        "worker_id": _PRE_HEARTBEAT_WORKER_ID,
        "hostname": "<pre-heartbeat-upgrade>",
        "pid": 0,
        "queues": [],
        "started_at": current_time,
        # Future-dated so the sentinel only goes stale once the grace period ends.
        "last_heartbeat_at": current_time + _UPGRADE_GRACE,
    }
    heartbeat_model.query.create(**sentinel_heartbeat)

    unstamped.update(worker_id=_PRE_HEARTBEAT_WORKER_ID)
47
48
class Migration(migrations.Migration):
    """Add the WorkerHeartbeat model and a required JobProcess.worker_id.

    The operations run in order: create WorkerHeartbeat, add worker_id to
    JobProcess as a nullable field, backfill existing rows through
    _backfill_unstamped_job_processes, then alter worker_id to its final
    definition without allow_null.
    """

    dependencies = [
        ("plainjobs", "0011_jobprocess_requested_at_jobresult_requested_at"),
    ]

    operations = [
        # Must come first: the backfill below inserts a WorkerHeartbeat row.
        migrations.CreateModel(
            name="WorkerHeartbeat",
            fields=[
                ("id", postgres.PrimaryKeyField()),
                ("hostname", postgres.TextField(max_length=255)),
                ("last_heartbeat_at", postgres.DateTimeField()),
                ("pid", postgres.IntegerField()),
                ("queues", postgres.JSONField()),
                ("started_at", postgres.DateTimeField(create_now=True)),
                ("worker_id", postgres.UUIDField()),
            ],
            options={
                # Default ordering: most recent heartbeat first.
                "ordering": ["-last_heartbeat_at"],
            },
        ),
        # Added as nullable so existing JobProcess rows remain valid until
        # the backfill has stamped them.
        migrations.AddField(
            model_name="jobprocess",
            name="worker_id",
            field=postgres.UUIDField(allow_null=True, required=False),
        ),
        # Stamps every row whose worker_id is still NULL with the sentinel id.
        migrations.RunPython(_backfill_unstamped_job_processes),
        # Final definition: allow_null is dropped now that no NULLs remain.
        migrations.AlterField(
            model_name="jobprocess",
            name="worker_id",
            field=postgres.UUIDField(),
        ),
    ]