1from __future__ import annotations
2
3import importlib.metadata
4from typing import Any
5
6from opentelemetry import metrics, trace
7from opentelemetry.semconv._incubating.metrics.messaging_metrics import (
8 create_messaging_client_consumed_messages,
9 create_messaging_client_operation_duration,
10 create_messaging_client_sent_messages,
11)
12from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
13
14from plain.utils.otel import format_exception_type
15
# Resolve the installed distribution version once at import time; when the
# "plain.jobs" distribution metadata cannot be found, report "dev" instead.
try:
    _package_version = importlib.metadata.version("plain.jobs")
except importlib.metadata.PackageNotFoundError:
    _package_version = "dev"

# Tracer and meter share the same instrumentation name and version.
tracer = trace.get_tracer("plain.jobs", _package_version)
meter = metrics.get_meter("plain.jobs", _package_version)

# Messaging instruments built by the semantic-convention factory helpers,
# all registered on the module-level meter above.
sent_messages_counter = create_messaging_client_sent_messages(meter)
consumed_messages_counter = create_messaging_client_consumed_messages(meter)
operation_duration_histogram = create_messaging_client_operation_duration(meter)
27
28
def record_span_error(
    span: trace.Span,
    exc: BaseException,
    metric_attributes: dict[str, Any],
) -> None:
    """Flag ``span`` as failed because of ``exc`` and tag the metric attributes.

    The exception is recorded on the span, the span status is set to
    ``StatusCode.ERROR``, and the formatted exception type is stored under
    the ``ERROR_TYPE`` key both as a span attribute and in
    ``metric_attributes``.

    Args:
        span: Span to annotate with the failure.
        exc: Exception that caused the failure.
        metric_attributes: Attribute mapping that is mutated in place; the
            ``ERROR_TYPE`` entry is added (or overwritten).
    """
    # Compute the label first so the span and the metrics get the same value.
    exc_type_label = format_exception_type(exc)
    failed_status = trace.StatusCode.ERROR

    span.record_exception(exc)
    span.set_status(failed_status)
    span.set_attribute(ERROR_TYPE, exc_type_label)

    # In-place update: the caller's dict carries the error type afterwards.
    metric_attributes[ERROR_TYPE] = exc_type_label