# v0.140.1
  1from __future__ import annotations
  2
  3import logging
  4import threading
  5
  6from opentelemetry import _logs, metrics, trace
  7from opentelemetry._logs._internal import ProxyLoggerProvider
  8from opentelemetry.exporter.otlp.proto.http import Compression
  9from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
 10from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
 11from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
 12from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
 13from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
 14from opentelemetry.sdk.metrics import Counter, Histogram, MeterProvider, UpDownCounter
 15from opentelemetry.sdk.metrics.export import (
 16    AggregationTemporality,
 17    PeriodicExportingMetricReader,
 18)
 19from opentelemetry.sdk.resources import Resource
 20from opentelemetry.sdk.trace import TracerProvider, sampling
 21from opentelemetry.sdk.trace.export import BatchSpanProcessor
 22from opentelemetry.semconv.attributes import service_attributes
 23
 24from plain.packages import PackageConfig, register_config
 25from plain.runtime import settings
 26
 27
 28class _ExporterLoopFilter(logging.Filter):
 29    """Block records that would feed back into OTLP export under failure.
 30
 31    Two sources to suppress:
 32
 33    1. The OpenTelemetry SDK's own namespace — its `failed to export`
 34       warnings would re-queue indefinitely.
 35    2. Anything emitted from inside an OTel SDK exporter thread (e.g.
 36       urllib3 connection errors raised by the OTLP HTTP exporter). The
 37       SDK names every exporter thread with an `Otel` prefix —
 38       `OtelBatch{Span,Log}RecordProcessor` for traces/logs, and
 39       `OtelPeriodicExportingMetricReader` for metrics — so matching on
 40       that prefix catches all three. Scoping to the thread, not the
 41       urllib3 namespace, lets user-code urllib3 logs flow through.
 42
 43    See `opentelemetry/sdk/_shared_internal/__init__.py` (BatchProcessor)
 44    and `opentelemetry/sdk/metrics/export/__init__.py`
 45    (PeriodicExportingMetricReader) for the thread names.
 46    """
 47
 48    def filter(self, record: logging.LogRecord) -> bool:
 49        name = record.name
 50        if name == "opentelemetry" or name.startswith("opentelemetry."):
 51            return False
 52        if threading.current_thread().name.startswith("Otel"):
 53            return False
 54        return True
 55
 56
 57@register_config
 58class Config(PackageConfig):
 59    package_label = "plainconnect"
 60
 61    def ready(self) -> None:
 62        if not settings.CONNECT_EXPORT_ENABLED or not settings.CONNECT_EXPORT_TOKEN:
 63            return
 64
 65        # Don't capture per-batch OTLP export failures as Sentry events. The OTel
 66        # SDK's exporters log "Failed to export X batch" at ERROR after retries
 67        # are exhausted, which Sentry's LoggingIntegration would otherwise turn
 68        # into an issue per app per incident — noise the app owner can't act on
 69        # (network/edge timeouts, ingest backend hiccups). The records still
 70        # flow to console/file/etc. — only the Sentry capture is suppressed.
 71        # Mirrors Sentry SDK's own self-protection for `sentry_sdk.errors` and
 72        # `urllib3.connectionpool` in sentry_sdk/integrations/logging.py.
 73        try:
 74            import sentry_sdk.integrations.logging as _sentry_log  # ty: ignore[unresolved-import]
 75        except ImportError:
 76            pass
 77        else:
 78            for name in (
 79                "opentelemetry.exporter.otlp.proto.http.trace_exporter",
 80                "opentelemetry.exporter.otlp.proto.http.metric_exporter",
 81                "opentelemetry.exporter.otlp.proto.http._log_exporter",
 82                "opentelemetry.sdk._shared_internal",
 83                "opentelemetry.sdk._logs._internal.export",
 84                "opentelemetry.sdk.metrics._internal.export",
 85            ):
 86                _sentry_log.ignore_logger(name)
 87
 88        resource = Resource.create(
 89            {
 90                service_attributes.SERVICE_NAME: settings.NAME,
 91                service_attributes.SERVICE_VERSION: settings.VERSION,
 92            }
 93        )
 94
 95        export_url = str(settings.CONNECT_EXPORT_URL).rstrip("/")
 96        headers = {"Authorization": f"Bearer {settings.CONNECT_EXPORT_TOKEN}"}
 97
 98        # Traces
 99        current_provider = trace.get_tracer_provider()
100        if current_provider and not isinstance(
101            current_provider, trace.ProxyTracerProvider
102        ):
103            raise RuntimeError(
104                "A tracer provider already exists."
105                " plain.connect must be listed before plain.observer in INSTALLED_PACKAGES."
106            )
107
108        span_exporter = OTLPSpanExporter(
109            endpoint=f"{export_url}/v1/traces",
110            headers=headers,
111            timeout=30,
112            compression=Compression.Gzip,
113        )
114        sampler = sampling.TraceIdRatioBased(settings.CONNECT_TRACE_SAMPLE_RATE)
115        tracer_provider = TracerProvider(sampler=sampler, resource=resource)
116        tracer_provider.add_span_processor(BatchSpanProcessor(span_exporter))
117        trace.set_tracer_provider(tracer_provider)
118
119        # Metrics — use delta temporality so each export contains only the
120        # increment since the last export, not a running total.  This makes
121        # server-side aggregation (sum/avg in ClickHouse) straightforward.
122        metric_exporter = OTLPMetricExporter(
123            endpoint=f"{export_url}/v1/metrics",
124            headers=headers,
125            timeout=30,
126            compression=Compression.Gzip,
127            preferred_temporality={
128                Counter: AggregationTemporality.DELTA,
129                Histogram: AggregationTemporality.DELTA,
130                UpDownCounter: AggregationTemporality.DELTA,
131            },
132        )
133        reader = PeriodicExportingMetricReader(metric_exporter)
134        meter_provider = MeterProvider(metric_readers=[reader], resource=resource)
135        metrics.set_meter_provider(meter_provider)
136
137        # Logs
138        if settings.CONNECT_EXPORT_LOGS:
139            current_logger_provider = _logs.get_logger_provider()
140            if current_logger_provider and not isinstance(
141                current_logger_provider, ProxyLoggerProvider
142            ):
143                raise RuntimeError(
144                    "A logger provider already exists."
145                    " plain.connect must be listed before plain.observer in INSTALLED_PACKAGES."
146                )
147
148            # Accept either a level name ("INFO") or an int (20).
149            raw_level = settings.CONNECT_LOG_LEVEL
150            if isinstance(raw_level, str):
151                log_level = logging.getLevelName(raw_level.upper())
152                if not isinstance(log_level, int):
153                    raise ValueError(
154                        f"CONNECT_LOG_LEVEL={raw_level!r} is not a valid logging level."
155                    )
156            else:
157                log_level = int(raw_level)
158
159            log_exporter = OTLPLogExporter(
160                endpoint=f"{export_url}/v1/logs",
161                headers=headers,
162                timeout=30,
163                compression=Compression.Gzip,
164            )
165            logger_provider = LoggerProvider(resource=resource)
166            logger_provider.add_log_record_processor(
167                BatchLogRecordProcessor(log_exporter)
168            )
169            _logs.set_logger_provider(logger_provider)
170
171            handler = LoggingHandler(level=log_level, logger_provider=logger_provider)
172            # Filter on the handler (not the loggers) so OTLP exporter and
173            # HTTP-client diagnostics still reach the app's console/file
174            # handlers — we only stop them from being re-exported, which
175            # would loop under failure.
176            handler.addFilter(_ExporterLoopFilter())
177
178            # Plain's `configure_logging` sets `plain` and `app` to
179            # propagate=False, so attaching only to root would miss them.
180            # Attach to root for everything else (user `getLogger(__name__)`,
181            # third-party libs).
182            for name in ("", "plain", "app"):
183                logging.getLogger(name).addHandler(handler)
184
185            # Root defaults to WARNING. A library that uses
186            # `logging.getLogger(__name__)` without setting its own level
187            # inherits root's effective level — so INFO/DEBUG records get
188            # dropped before the OTLP handler runs. Widen root just enough
189            # to let CONNECT_LOG_LEVEL through; never narrow it.
190            # NOTSET (0) on root already means "all messages processed",
191            # so leave it alone in that case.
192            root = logging.getLogger()
193            if root.level != logging.NOTSET and root.level > log_level:
194                root.setLevel(log_level)