# v0.146.0
# Generated by Plain 0.140.0 on 2026-05-06 04:47
 2
 3import datetime
 4import uuid
 5from typing import Any
 6
 7from plain import postgres
 8from plain.postgres import migrations
 9
# Sentinel worker_id stamped on JobProcess rows that pre-date the heartbeat-
# based rescue mechanism. The accompanying WorkerHeartbeat row is given a
# 24-hour grace period (last_heartbeat_at = now + 24h) so that any pre-upgrade
# jobs that are legitimately still running have time to finish naturally
# before rescue converts orphans to LOST. After 24h the synthetic heartbeat
# goes stale and rescue cleans up anything that didn't finish — matching the
# previous JOBS_TIMEOUT default. Tune the grace period in this migration if
# your longest legitimate pre-upgrade runtime exceeds 24h.
#
# Rolling-deploy safety: after the backfill runs, this migration alters
# worker_id to NOT NULL. Any old worker still trying to convert a JobRequest
# without supplying a worker_id will hit the constraint and fail loudly,
# leaving the JobRequest in the queue for a new worker to pick up — strictly
# better than silently inserting unrescuable NULL rows.

# All-zero UUID used as the worker_id of the synthetic pre-upgrade worker.
_PRE_HEARTBEAT_WORKER_ID = uuid.UUID("00000000-0000-0000-0000-000000000000")
# How far into the future the synthetic heartbeat's last_heartbeat_at is set.
_UPGRADE_GRACE = datetime.timedelta(hours=24)
26
27
def _backfill_unstamped_job_processes(apps: Any, schema_editor: Any) -> None:
    """Stamp JobProcess rows that have no worker_id with the sentinel worker.

    Does nothing when no JobProcess row has a NULL worker_id. Otherwise it
    creates one synthetic WorkerHeartbeat row for _PRE_HEARTBEAT_WORKER_ID,
    with last_heartbeat_at set _UPGRADE_GRACE into the future, and then
    assigns that sentinel worker_id to every JobProcess row whose worker_id
    is still NULL.

    Args:
        apps: Registry passed in by the migration runner; used to look up the
            "plainjobs" models via ``get_model``.
        schema_editor: Supplied by the migration runner; not used here.
    """
    JobProcess = apps.get_model("plainjobs", "JobProcess")
    WorkerHeartbeat = apps.get_model("plainjobs", "WorkerHeartbeat")

    # Nothing to backfill: skip creating the synthetic heartbeat row entirely.
    if not JobProcess.query.filter(worker_id__isnull=True).exists():
        return

    now = datetime.datetime.now(datetime.UTC)
    # The heartbeat is deliberately dated in the future so the sentinel worker
    # is not considered stale until the grace period has elapsed.
    WorkerHeartbeat.query.create(
        worker_id=_PRE_HEARTBEAT_WORKER_ID,
        hostname="<pre-heartbeat-upgrade>",
        pid=0,
        queues=[],
        started_at=now,
        last_heartbeat_at=now + _UPGRADE_GRACE,
    )
    # Attach every unstamped row to the sentinel worker in a single UPDATE.
    JobProcess.query.filter(worker_id__isnull=True).update(
        worker_id=_PRE_HEARTBEAT_WORKER_ID
    )
47
48
class Migration(migrations.Migration):
    """Add the WorkerHeartbeat model and a required JobProcess.worker_id.

    The operations run in order: create the heartbeat table, add worker_id as
    a nullable column, backfill existing rows, then drop nullability.
    """

    dependencies = [
        ("plainjobs", "0011_jobprocess_requested_at_jobresult_requested_at"),
    ]

    operations = [
        # 1. New table holding one row per worker heartbeat.
        migrations.CreateModel(
            name="WorkerHeartbeat",
            fields=[
                ("id", postgres.PrimaryKeyField()),
                ("hostname", postgres.TextField(max_length=255)),
                ("last_heartbeat_at", postgres.DateTimeField()),
                ("pid", postgres.IntegerField()),
                ("queues", postgres.JSONField()),
                ("started_at", postgres.DateTimeField(create_now=True)),
                ("worker_id", postgres.UUIDField()),
            ],
            options={
                "ordering": ["-last_heartbeat_at"],
            },
        ),
        # 2. Add worker_id as nullable first so existing rows remain valid.
        migrations.AddField(
            model_name="jobprocess",
            name="worker_id",
            field=postgres.UUIDField(allow_null=True, required=False),
        ),
        # 3. Give every pre-existing row a worker_id (the sentinel worker).
        migrations.RunPython(_backfill_unstamped_job_processes),
        # 4. With no NULLs left, redefine worker_id without allow_null.
        migrations.AlterField(
            model_name="jobprocess",
            name="worker_id",
            field=postgres.UUIDField(),
        ),
    ]
81    ]