Worker
Process background jobs with a database-driven worker.
from plain.worker import Job
from plain.mail import send_mail
# Create a new job class
class WelcomeUserJob(Job):
    """Job that sends a welcome email to a single user."""

    def __init__(self, user):
        """Remember the user the email will be sent to.

        Args:
            user: Object whose ``email`` attribute is the recipient address
                and whose string form is used in the greeting.
        """
        self.user = user

    def run(self):
        """Send the welcome email to ``self.user``."""
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            # Placeholder sender on a reserved example domain; replace it
            # with an address your mail provider lets you send from.
            from_email="welcome@example.com",
            recipient_list=[self.user.email],
        )
# Instantiate a job and send it to the worker
user = User.objects.get(pk=1)
welcome_job = WelcomeUserJob(user)
welcome_job.run_in_worker()
The worker process is run separately using `plain worker run`.
Staff
Job history
Scheduled jobs
Monitoring
from plain.logs import app_logger


class AppLoggerMiddleware:
    """Middleware that adds the running job's UUIDs to the app logger context.

    While the wrapped ``run_job`` callable executes, ``job_request_uuid`` and
    ``job_uuid`` are present in ``app_logger.kv.context``. Both keys are
    removed again once the job finishes, whether it returned or raised.
    """

    def __init__(self, run_job):
        """Store the callable that actually runs the job.

        Args:
            run_job: Callable invoked with the job; its return value is
                passed back to the caller of this middleware.
        """
        self.run_job = run_job

    def __call__(self, job):
        """Run ``job`` with its identifiers set in the logging context.

        Args:
            job: Object exposing ``job_request_uuid`` and ``uuid``.

        Returns:
            Whatever ``run_job(job)`` returns.

        Raises:
            Any exception raised by ``run_job`` propagates unchanged.
        """
        app_logger.kv.context["job_request_uuid"] = str(job.job_request_uuid)
        app_logger.kv.context["job_uuid"] = str(job.uuid)

        try:
            return self.run_job(job)
        finally:
            # Clean up even when the job raises, so a failed job's IDs do not
            # linger in the context and get attached to later log output.
            app_logger.kv.context.pop("job_request_uuid", None)
            app_logger.kv.context.pop("job_uuid", None)