Plain is headed towards 1.0! Subscribe for development updates →

 1import logging
 2import threading
 3from datetime import UTC, datetime
 4
 5from opentelemetry import trace
 6from opentelemetry.trace import format_span_id, format_trace_id
 7
 8from .core import ObserverMode
 9from .otel import get_observer_span_processor
10
11
12class ObserverLogHandler(logging.Handler):
13    """Custom logging handler that captures logs during active traces when observer is enabled."""
14
15    def __init__(self, level=logging.NOTSET):
16        super().__init__(level)
17        self._logs_lock = threading.Lock()
18        self._trace_logs = {}  # trace_id -> list of log records
19
20    def emit(self, record):
21        """Emit a log record if we're in an active observer trace."""
22        try:
23            # Get the current span to determine if we're in an active trace
24            current_span = trace.get_current_span()
25            if not current_span or not current_span.get_span_context().is_valid:
26                return
27
28            # Get trace and span IDs
29            trace_id = f"0x{format_trace_id(current_span.get_span_context().trace_id)}"
30            span_id = f"0x{format_span_id(current_span.get_span_context().span_id)}"
31
32            # Check if observer is recording this trace
33            processor = get_observer_span_processor()
34            if not processor:
35                return
36
37            # Check if we should record logs for this trace
38            with processor._traces_lock:
39                if trace_id not in processor._traces:
40                    return
41
42                trace_info = processor._traces[trace_id]
43                # Only capture logs in PERSIST mode
44                if trace_info["mode"] != ObserverMode.PERSIST.value:
45                    return
46
47            # Store the formatted message with span context
48            log_entry = {
49                "message": self.format(record),
50                "level": record.levelname,
51                "span_id": span_id,
52                "timestamp": datetime.fromtimestamp(record.created, tz=UTC),
53            }
54
55            with self._logs_lock:
56                if trace_id not in self._trace_logs:
57                    self._trace_logs[trace_id] = []
58                self._trace_logs[trace_id].append(log_entry)
59
60                # Limit logs per trace to prevent memory issues
61                if len(self._trace_logs[trace_id]) > 1000:
62                    self._trace_logs[trace_id] = self._trace_logs[trace_id][-500:]
63
64        except Exception:
65            # Don't let logging errors break the application
66            pass
67
68    def pop_logs_for_trace(self, trace_id):
69        """Get and remove all logs for a specific trace in one operation."""
70        with self._logs_lock:
71            return self._trace_logs.pop(trace_id, []).copy()
72
73
# Module-level ObserverLogHandler instance, created when this module is imported.
# Each handler keeps its own per-trace buffer, so logs stored through one
# instance can only be popped from that same instance (presumably callers
# share this one — confirm against the code that installs the handler).
observer_log_handler = ObserverLogHandler()