1from __future__ import annotations
2
3import logging
4import threading
5
6from opentelemetry import _logs, metrics, trace
7from opentelemetry._logs._internal import ProxyLoggerProvider
8from opentelemetry.exporter.otlp.proto.http import Compression
9from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
10from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
11from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
12from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
13from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
14from opentelemetry.sdk.metrics import Counter, Histogram, MeterProvider, UpDownCounter
15from opentelemetry.sdk.metrics.export import (
16 AggregationTemporality,
17 PeriodicExportingMetricReader,
18)
19from opentelemetry.sdk.resources import Resource
20from opentelemetry.sdk.trace import TracerProvider, sampling
21from opentelemetry.sdk.trace.export import BatchSpanProcessor
22from opentelemetry.semconv.attributes import service_attributes
23
24from plain.packages import PackageConfig, register_config
25from plain.runtime import settings
26
27
class _ExporterLoopFilter(logging.Filter):
    """Drop log records that could feed back into OTLP export when it fails.

    Two kinds of records are suppressed:

    1. Anything from the `opentelemetry` logger namespace — the SDK's own
       `failed to export` warnings would otherwise be re-queued forever.
    2. Anything logged from within an OTel SDK exporter thread (for example
       urllib3 connection errors raised by the OTLP HTTP exporter). The SDK
       gives every exporter thread an `Otel` name prefix —
       `OtelBatch{Span,Log}RecordProcessor` for traces/logs and
       `OtelPeriodicExportingMetricReader` for metrics — so a prefix match
       on the thread name covers all three. Keying on the thread rather
       than the urllib3 namespace means urllib3 logs from user code still
       pass through.

    See `opentelemetry/sdk/_shared_internal/__init__.py` (BatchProcessor)
    and `opentelemetry/sdk/metrics/export/__init__.py`
    (PeriodicExportingMetricReader) for the thread names.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        logger_name = record.name
        # Record originates from the OTel SDK's own logger namespace.
        from_otel_namespace = logger_name == "opentelemetry" or logger_name.startswith(
            "opentelemetry."
        )
        # Record was emitted while running on an SDK exporter thread.
        from_exporter_thread = threading.current_thread().name.startswith("Otel")
        return not (from_otel_namespace or from_exporter_thread)
55
56
@register_config
class Config(PackageConfig):
    """Wires the app to Plain Connect's OTLP ingest endpoints.

    When export is enabled, `ready()` installs OpenTelemetry providers for
    traces, metrics, and (optionally) logs, each exporting over OTLP HTTP to
    the configured Connect endpoint with bearer-token auth.
    """

    package_label = "plainconnect"

    def ready(self) -> None:
        """Install OTLP exporters for traces, metrics, and optional logs.

        No-op unless both `CONNECT_EXPORT_ENABLED` and `CONNECT_EXPORT_TOKEN`
        are set.

        Raises:
            RuntimeError: a non-proxy tracer/logger provider already exists
                (plain.connect must be listed before plain.observer in
                INSTALLED_PACKAGES).
            ValueError: `CONNECT_LOG_LEVEL` is a string that is not a valid
                logging level name.
        """
        if not settings.CONNECT_EXPORT_ENABLED or not settings.CONNECT_EXPORT_TOKEN:
            return

        self._suppress_sentry_export_noise()

        resource = Resource.create(
            {
                service_attributes.SERVICE_NAME: settings.NAME,
                service_attributes.SERVICE_VERSION: settings.VERSION,
            }
        )

        export_url = str(settings.CONNECT_EXPORT_URL).rstrip("/")
        headers = {"Authorization": f"Bearer {settings.CONNECT_EXPORT_TOKEN}"}

        self._setup_traces(export_url, headers, resource)
        self._setup_metrics(export_url, headers, resource)

        if settings.CONNECT_EXPORT_LOGS:
            self._setup_logs(export_url, headers, resource)

    def _suppress_sentry_export_noise(self) -> None:
        """Keep Sentry from turning per-batch OTLP export failures into events.

        The OTel SDK's exporters log "Failed to export X batch" at ERROR after
        retries are exhausted, which Sentry's LoggingIntegration would
        otherwise turn into an issue per app per incident — noise the app
        owner can't act on (network/edge timeouts, ingest backend hiccups).
        The records still flow to console/file/etc. — only the Sentry capture
        is suppressed. Mirrors Sentry SDK's own self-protection for
        `sentry_sdk.errors` and `urllib3.connectionpool` in
        sentry_sdk/integrations/logging.py. No-op when sentry_sdk is not
        installed.
        """
        try:
            import sentry_sdk.integrations.logging as _sentry_log  # ty: ignore[unresolved-import]
        except ImportError:
            pass
        else:
            for name in (
                "opentelemetry.exporter.otlp.proto.http.trace_exporter",
                "opentelemetry.exporter.otlp.proto.http.metric_exporter",
                "opentelemetry.exporter.otlp.proto.http._log_exporter",
                "opentelemetry.sdk._shared_internal",
                "opentelemetry.sdk._logs._internal.export",
                "opentelemetry.sdk.metrics._internal.export",
            ):
                _sentry_log.ignore_logger(name)

    def _setup_traces(
        self, export_url: str, headers: dict[str, str], resource: Resource
    ) -> None:
        """Install a sampled TracerProvider exporting to `/v1/traces`.

        Raises RuntimeError if a real (non-proxy) tracer provider is already
        installed, which indicates a package-ordering conflict.
        """
        current_provider = trace.get_tracer_provider()
        if current_provider and not isinstance(
            current_provider, trace.ProxyTracerProvider
        ):
            raise RuntimeError(
                "A tracer provider already exists."
                " plain.connect must be listed before plain.observer in INSTALLED_PACKAGES."
            )

        span_exporter = OTLPSpanExporter(
            endpoint=f"{export_url}/v1/traces",
            headers=headers,
            timeout=30,
            compression=Compression.Gzip,
        )
        sampler = sampling.TraceIdRatioBased(settings.CONNECT_TRACE_SAMPLE_RATE)
        tracer_provider = TracerProvider(sampler=sampler, resource=resource)
        tracer_provider.add_span_processor(BatchSpanProcessor(span_exporter))
        trace.set_tracer_provider(tracer_provider)

    def _setup_metrics(
        self, export_url: str, headers: dict[str, str], resource: Resource
    ) -> None:
        """Install a MeterProvider exporting to `/v1/metrics`.

        Uses delta temporality so each export contains only the increment
        since the last export, not a running total. This makes server-side
        aggregation (sum/avg in ClickHouse) straightforward.
        """
        metric_exporter = OTLPMetricExporter(
            endpoint=f"{export_url}/v1/metrics",
            headers=headers,
            timeout=30,
            compression=Compression.Gzip,
            preferred_temporality={
                Counter: AggregationTemporality.DELTA,
                Histogram: AggregationTemporality.DELTA,
                UpDownCounter: AggregationTemporality.DELTA,
            },
        )
        reader = PeriodicExportingMetricReader(metric_exporter)
        meter_provider = MeterProvider(metric_readers=[reader], resource=resource)
        metrics.set_meter_provider(meter_provider)

    def _resolve_log_level(self) -> int:
        """Return CONNECT_LOG_LEVEL as an int level.

        Accepts either a level name ("INFO") or an int (20). Raises
        ValueError for an unrecognized level name.
        """
        raw_level = settings.CONNECT_LOG_LEVEL
        if isinstance(raw_level, str):
            # getLevelName maps a valid name to its int; for an unknown name
            # it returns a "Level X" string, which the isinstance check catches.
            log_level = logging.getLevelName(raw_level.upper())
            if not isinstance(log_level, int):
                raise ValueError(
                    f"CONNECT_LOG_LEVEL={raw_level!r} is not a valid logging level."
                )
            return log_level
        return int(raw_level)

    def _setup_logs(
        self, export_url: str, headers: dict[str, str], resource: Resource
    ) -> None:
        """Install a LoggerProvider and stdlib handler exporting to `/v1/logs`.

        Raises RuntimeError if a real (non-proxy) logger provider is already
        installed, which indicates a package-ordering conflict.
        """
        current_logger_provider = _logs.get_logger_provider()
        if current_logger_provider and not isinstance(
            current_logger_provider, ProxyLoggerProvider
        ):
            raise RuntimeError(
                "A logger provider already exists."
                " plain.connect must be listed before plain.observer in INSTALLED_PACKAGES."
            )

        log_level = self._resolve_log_level()

        log_exporter = OTLPLogExporter(
            endpoint=f"{export_url}/v1/logs",
            headers=headers,
            timeout=30,
            compression=Compression.Gzip,
        )
        logger_provider = LoggerProvider(resource=resource)
        logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
        _logs.set_logger_provider(logger_provider)

        handler = LoggingHandler(level=log_level, logger_provider=logger_provider)
        # Filter on the handler (not the loggers) so OTLP exporter and
        # HTTP-client diagnostics still reach the app's console/file
        # handlers — we only stop them from being re-exported, which
        # would loop under failure.
        handler.addFilter(_ExporterLoopFilter())

        # Plain's `configure_logging` sets `plain` and `app` to
        # propagate=False, so attaching only to root would miss them.
        # Attach to root for everything else (user `getLogger(__name__)`,
        # third-party libs).
        for name in ("", "plain", "app"):
            logging.getLogger(name).addHandler(handler)

        # Root defaults to WARNING. A library that uses
        # `logging.getLogger(__name__)` without setting its own level
        # inherits root's effective level — so INFO/DEBUG records get
        # dropped before the OTLP handler runs. Widen root just enough
        # to let CONNECT_LOG_LEVEL through; never narrow it.
        # NOTSET (0) on root already means "all messages processed",
        # so leave it alone in that case.
        root = logging.getLogger()
        if root.level != logging.NOTSET and root.level > log_level:
            root.setLevel(log_level)