1from __future__ import annotations
2
3import logging
4import threading
5from datetime import UTC, datetime
6from typing import TypedDict
7
8from opentelemetry import trace
9from opentelemetry.trace import format_span_id, format_trace_id
10
11from .core import PERSISTING_MODES
12from .otel import get_observer_span_processor
13
14
15class ObserverLogEntry(TypedDict):
16 message: str
17 level: str
18 span_id: str
19 timestamp: datetime
20
21
22class ObserverLogHandler(logging.Handler):
23 """Custom logging handler that captures logs during active traces when observer is enabled."""
24
25 def __init__(self, level: int = logging.NOTSET) -> None:
26 super().__init__(level)
27 self._logs_lock = threading.Lock()
28 self._trace_logs: dict[str, list[ObserverLogEntry]] = {}
29
30 def emit(self, record: logging.LogRecord) -> None:
31 """Emit a log record if we're in an active observer trace."""
32 try:
33 # Get the current span to determine if we're in an active trace
34 current_span = trace.get_current_span()
35 if not current_span or not current_span.get_span_context().is_valid:
36 return
37
38 # Get trace and span IDs
39 trace_id = f"0x{format_trace_id(current_span.get_span_context().trace_id)}"
40 span_id = f"0x{format_span_id(current_span.get_span_context().span_id)}"
41
42 # Check if observer is recording this trace
43 processor = get_observer_span_processor()
44 if not processor:
45 return
46
47 # Check if we should record logs for this trace
48 with processor._traces_lock:
49 if trace_id not in processor._traces:
50 return
51
52 trace_info = processor._traces[trace_id]
53 if trace_info["mode"] not in PERSISTING_MODES:
54 return
55
56 # Store the formatted message with span context
57 log_entry: ObserverLogEntry = {
58 "message": self.format(record),
59 "level": record.levelname,
60 "span_id": span_id,
61 "timestamp": datetime.fromtimestamp(record.created, tz=UTC),
62 }
63
64 with self._logs_lock:
65 if trace_id not in self._trace_logs:
66 self._trace_logs[trace_id] = []
67 self._trace_logs[trace_id].append(log_entry)
68
69 # Limit logs per trace to prevent memory issues
70 if len(self._trace_logs[trace_id]) > 1000:
71 self._trace_logs[trace_id] = self._trace_logs[trace_id][-500:]
72
73 except Exception:
74 # Don't let logging errors break the application
75 pass
76
77 def pop_logs_for_trace(self, trace_id: str) -> list[ObserverLogEntry]:
78 """Get and remove all logs for a specific trace in one operation."""
79 with self._logs_lock:
80 return self._trace_logs.pop(trace_id, []).copy()
81
82
83# Global instance of the log handler
84observer_log_handler = ObserverLogHandler()