v0.146.0
 1from __future__ import annotations
 2
 3import logging
 4import threading
 5from datetime import UTC, datetime
 6from typing import TypedDict
 7
 8from opentelemetry import trace
 9from opentelemetry.trace import format_span_id, format_trace_id
10
11from .core import PERSISTING_MODES
12from .otel import get_observer_span_processor
13
14
class ObserverLogEntry(TypedDict):
    """Shape of one log record buffered by ``ObserverLogHandler`` for a trace."""

    # Formatted log text, i.e. the result of the handler's ``format(record)``.
    message: str
    # Level name of the record, taken from ``record.levelname``.
    level: str
    # "0x"-prefixed ID of the span that was current when the record was emitted.
    span_id: str
    # Timezone-aware (UTC) datetime built from ``record.created``.
    timestamp: datetime
20
21
22class ObserverLogHandler(logging.Handler):
23    """Custom logging handler that captures logs during active traces when observer is enabled."""
24
25    def __init__(self, level: int = logging.NOTSET) -> None:
26        super().__init__(level)
27        self._logs_lock = threading.Lock()
28        self._trace_logs: dict[str, list[ObserverLogEntry]] = {}
29
30    def emit(self, record: logging.LogRecord) -> None:
31        """Emit a log record if we're in an active observer trace."""
32        try:
33            # Get the current span to determine if we're in an active trace
34            current_span = trace.get_current_span()
35            if not current_span or not current_span.get_span_context().is_valid:
36                return
37
38            # Get trace and span IDs
39            trace_id = f"0x{format_trace_id(current_span.get_span_context().trace_id)}"
40            span_id = f"0x{format_span_id(current_span.get_span_context().span_id)}"
41
42            # Check if observer is recording this trace
43            processor = get_observer_span_processor()
44            if not processor:
45                return
46
47            # Check if we should record logs for this trace
48            with processor._traces_lock:
49                if trace_id not in processor._traces:
50                    return
51
52                trace_info = processor._traces[trace_id]
53                if trace_info["mode"] not in PERSISTING_MODES:
54                    return
55
56            # Store the formatted message with span context
57            log_entry: ObserverLogEntry = {
58                "message": self.format(record),
59                "level": record.levelname,
60                "span_id": span_id,
61                "timestamp": datetime.fromtimestamp(record.created, tz=UTC),
62            }
63
64            with self._logs_lock:
65                if trace_id not in self._trace_logs:
66                    self._trace_logs[trace_id] = []
67                self._trace_logs[trace_id].append(log_entry)
68
69                # Limit logs per trace to prevent memory issues
70                if len(self._trace_logs[trace_id]) > 1000:
71                    self._trace_logs[trace_id] = self._trace_logs[trace_id][-500:]
72
73        except Exception:
74            # Don't let logging errors break the application
75            pass
76
77    def pop_logs_for_trace(self, trace_id: str) -> list[ObserverLogEntry]:
78        """Get and remove all logs for a specific trace in one operation."""
79        with self._logs_lock:
80            return self._trace_logs.pop(trace_id, []).copy()
81
82
# Shared module-level ObserverLogHandler instance, created when this module is imported.
# NOTE(review): where it gets attached to a logger is not visible in this file.
observer_log_handler = ObserverLogHandler()