1import logging
2import re
3import threading
4from collections import defaultdict
5
6import opentelemetry.context as context_api
7from opentelemetry import baggage, trace
8from opentelemetry.sdk.trace import SpanProcessor, sampling
9from opentelemetry.semconv.attributes import url_attributes
10from opentelemetry.trace import SpanKind, format_span_id, format_trace_id
11
12from plain.http.cookie import unsign_cookie_value
13from plain.models.otel import suppress_db_tracing
14from plain.runtime import settings
15
16from .core import Observer, ObserverMode
17
18logger = logging.getLogger(__name__)
19
20
def get_span_processor():
    """Locate the ObserverSpanProcessor registered on the active tracer provider.

    Returns the first ObserverSpanProcessor found among the provider's span
    processors, or None when there is no provider, the provider does not
    expose the expected private attributes, or no such processor is present.
    """
    provider = trace.get_tracer_provider()
    if not provider:
        return None

    # The processors are reached through private attributes; a provider that
    # lacks them (or has an empty composite) is treated as having none.
    composite = getattr(provider, "_active_span_processor", None)
    if not composite:
        return None

    candidates = getattr(composite, "_span_processors", ())
    return next(
        (
            candidate
            for candidate in candidates
            if isinstance(candidate, ObserverSpanProcessor)
        ),
        None,
    )
37
38
def get_current_trace_summary() -> str | None:
    """Return the performance summary for the trace of the current span.

    Returns None when there is no current span, when no ObserverSpanProcessor
    is registered, or when the processor has no summary for this trace.
    """
    active_span = trace.get_current_span()
    if not active_span:
        return None

    span_processor = get_span_processor()
    if not span_processor:
        return None

    # Trace IDs are keyed in the processor as "0x"-prefixed hex strings.
    hex_trace_id = format_trace_id(active_span.get_span_context().trace_id)
    return span_processor.get_trace_summary(f"0x{hex_trace_id}")
49
50
class ObserverSampler(sampling.Sampler):
    """Samples traces based on request path, observer cookie, and span links.

    Decision order in should_sample():

    1. Spans whose URL path matches an ignored pattern are dropped.
    2. If the parent context carries request cookies with an observer cookie,
       a valid summary/persist mode records the span; any other value drops it.
    3. Spans with links are always recorded.
    4. Otherwise the decision is delegated to a parent-based sampler whose
       root sampler is ALWAYS_OFF.
    """

    def __init__(self):
        # Fallback used when neither the cookie nor links produce a decision.
        self._delegate = sampling.ParentBased(sampling.ALWAYS_OFF)

        # TODO ignore url namespace instead? admin, observer, assets
        self._ignore_url_paths = [
            re.compile(p) for p in settings.OBSERVER_IGNORE_URL_PATTERNS
        ]

    @staticmethod
    def _parent_trace_state(parent_context):
        """Return the parent span's trace state, or None without a valid parent."""
        parent_span_context = trace.get_current_span(
            parent_context
        ).get_span_context()
        if parent_span_context is None or not parent_span_context.is_valid:
            return None
        return parent_span_context.trace_state

    def should_sample(
        self,
        parent_context,
        trace_id,
        name,
        kind: SpanKind | None = None,
        attributes=None,
        links=None,
        trace_state=None,
    ):
        """Decide whether the span about to be created is recorded.

        Returns a SamplingResult carrying the decision, the attributes passed
        in, and the trace state to attach to the new span context.
        """
        # A SamplingResult without a trace state clears the tracestate on the
        # new span context, so carry forward the explicit one if given and the
        # parent's otherwise.
        if trace_state is not None:
            result_trace_state = trace_state
        else:
            result_trace_state = self._parent_trace_state(parent_context)

        # First, drop if the URL should be ignored.
        if attributes:
            url_path = attributes.get(url_attributes.URL_PATH, "")
            if url_path and any(
                pattern.match(url_path) for pattern in self._ignore_url_paths
            ):
                return sampling.SamplingResult(
                    sampling.Decision.DROP,
                    attributes=attributes,
                    trace_state=result_trace_state,
                )

        # Next, look for the observer cookie in the baggage of the parent context.
        decision = None
        if parent_context:
            if cookies := baggage.get_baggage("http.request.cookies", parent_context):
                if observer_cookie := cookies.get(Observer.COOKIE_NAME):
                    unsigned_value = unsign_cookie_value(
                        Observer.COOKIE_NAME, observer_cookie, default=False
                    )

                    if unsigned_value in (
                        ObserverMode.PERSIST.value,
                        ObserverMode.SUMMARY.value,
                    ):
                        # Always use RECORD_AND_SAMPLE so ParentBased works correctly
                        # The processor will check the mode to decide whether to export
                        decision = sampling.Decision.RECORD_AND_SAMPLE
                    else:
                        # Cookie present but unsigned/unknown value: explicitly drop.
                        decision = sampling.Decision.DROP

        # If there are links, assume it is to another trace/span that we are keeping
        if links:
            decision = sampling.Decision.RECORD_AND_SAMPLE

        # If no decision from cookies or links, use the parent-based default
        if decision is None:
            result = self._delegate.should_sample(
                parent_context,
                trace_id,
                name,
                kind=kind,
                attributes=attributes,
                links=links,
                trace_state=trace_state,
            )
            decision = result.decision

        return sampling.SamplingResult(
            decision,
            attributes=attributes,
            trace_state=result_trace_state,
        )

    def get_description(self) -> str:
        """Return the sampler name reported to the SDK."""
        return "ObserverSampler"
127
128
class ObserverSpanProcessor(SpanProcessor):
    """Collects spans in real-time for current trace performance monitoring.

    This processor keeps spans in memory for traces that have the 'summary' or 'persist'
    cookie set. These spans can be accessed via get_current_trace_summary() for
    real-time debugging. Spans with 'persist' cookie will also be persisted to the
    database.

    All access to the in-memory trace table is serialized by a single
    non-reentrant lock, so nothing executed while the lock is held may start
    a span that this processor would record.
    """

    # Upper bound on traces held in memory, and how many of the oldest
    # entries are evicted once that bound is exceeded.
    _MAX_TRACES = 1000
    _EVICT_BATCH = 100

    def __init__(self):
        # Span storage, keyed by "0x"-prefixed hex trace ID.
        self._traces = defaultdict(
            lambda: {
                "trace": None,  # Trace model instance
                "active_otel_spans": {},  # span_id -> opentelemetry span
                "completed_otel_spans": [],  # list of opentelemetry spans
                "span_models": [],  # list of Span model instances
                "root_span_id": None,
                "mode": None,  # None, ObserverMode.SUMMARY.value, or ObserverMode.PERSIST.value
            }
        )
        self._traces_lock = threading.Lock()

    def on_start(self, span, parent_context=None):
        """Called when a span starts.

        Registers the span under its trace. A trace entry is only created when
        the first span seen for it resolves to a recording mode; spans of
        other traces are ignored.
        """
        trace_id = f"0x{format_trace_id(span.get_span_context().trace_id)}"

        with self._traces_lock:
            # Check if we already have this trace
            if trace_id in self._traces:
                trace_info = self._traces[trace_id]
            else:
                # First span in trace - determine if we should record it
                mode = self._get_recording_mode(span, parent_context)
                if not mode:
                    # Don't create trace entry for traces we won't record
                    return

                # Create trace entry only for traces we'll record
                trace_info = self._traces[trace_id]
                trace_info["mode"] = mode

                # Clean up old traces if too many
                if len(self._traces) > self._MAX_TRACES:
                    # Dicts iterate in insertion order, so the leading keys are
                    # the oldest traces. (Sorting by trace ID would pick
                    # arbitrary traces, possibly the one just created.)
                    for old_id in list(self._traces)[: self._EVICT_BATCH]:
                        del self._traces[old_id]

            span_id = f"0x{format_span_id(span.get_span_context().span_id)}"

            # Store span (we know mode is truthy if we get here)
            trace_info["active_otel_spans"][span_id] = span

            # Track root span
            if not span.parent:
                trace_info["root_span_id"] = span_id

    def on_end(self, span):
        """Called when a span ends.

        Moves the span from active to completed. When the ended span is the
        trace's root span, the trace is converted to model instances, exported
        if it is in persist mode, and removed from memory.
        """
        trace_id = f"0x{format_trace_id(span.get_span_context().trace_id)}"
        span_id = f"0x{format_span_id(span.get_span_context().span_id)}"

        with self._traces_lock:
            # Skip if we don't have this trace (mode was None on start)
            if trace_id not in self._traces:
                return

            trace_info = self._traces[trace_id]

            # Move span from active to completed
            if trace_info["active_otel_spans"].pop(span_id, None):
                trace_info["completed_otel_spans"].append(span)

            # Only the root span ending completes the trace
            if span_id != trace_info["root_span_id"]:
                return

            from .models import Span, Trace

            try:
                all_spans = trace_info["completed_otel_spans"]

                trace_info["trace"] = Trace.from_opentelemetry_spans(all_spans)
                trace_info["span_models"] = [
                    Span.from_opentelemetry_span(s, trace_info["trace"])
                    for s in all_spans
                ]

                # Export if in persist mode
                if trace_info["mode"] == ObserverMode.PERSIST.value:
                    logger.debug(
                        "Exporting %d spans for trace %s",
                        len(trace_info["span_models"]),
                        trace_id,
                    )
                    self._export_trace(trace_info["trace"], trace_info["span_models"])
            finally:
                # Clean up trace even if building the models failed, so a
                # finished trace never lingers in memory.
                self._traces.pop(trace_id, None)

    def get_trace_summary(self, trace_id: str) -> str | None:
        """Get performance summary for a specific trace.

        `trace_id` is the "0x"-prefixed hex trace ID. Returns None when the
        trace is not being recorded or has no spans yet.
        """
        from .models import Span, Trace

        with self._traces_lock:
            # Return None if trace doesn't exist (mode was None)
            if trace_id not in self._traces:
                return None

            trace_info = self._traces[trace_id]

            # Combine active and completed spans
            all_otel_spans = (
                list(trace_info["active_otel_spans"].values())
                + trace_info["completed_otel_spans"]
            )

            if not all_otel_spans:
                return None

            # Create or update trace model instance
            if not trace_info["trace"]:
                trace_info["trace"] = Trace.from_opentelemetry_spans(all_otel_spans)

            if not trace_info["trace"]:
                return None

            # Create span model instances if needed
            span_models = trace_info.get("span_models", [])
            if not span_models:
                span_models = [
                    Span.from_opentelemetry_span(s, trace_info["trace"])
                    for s in all_otel_spans
                ]

            return trace_info["trace"].get_trace_summary(span_models)

    def _export_trace(self, trace, span_models):
        """Export trace and spans to the database.

        Failures are logged and swallowed: exporting is best-effort. After
        saving, the oldest stored traces beyond settings.OBSERVER_TRACE_LIMIT
        are deleted (a limit of 0 or less disables the cleanup).
        """
        from .models import Span, Trace

        with suppress_db_tracing():
            try:
                trace.save()

                for span_model in span_models:
                    span_model.trace = trace

                # Bulk create spans
                Span.objects.bulk_create(span_models)
            except Exception as e:
                logger.warning(
                    "Failed to export trace to database: %s",
                    e,
                    exc_info=True,
                )

            # Delete oldest traces if we exceed the limit
            if settings.OBSERVER_TRACE_LIMIT > 0:
                try:
                    # Remove only the surplus so the newest
                    # OBSERVER_TRACE_LIMIT traces are kept.
                    excess = Trace.objects.count() - settings.OBSERVER_TRACE_LIMIT
                    if excess > 0:
                        delete_ids = Trace.objects.order_by("start_time")[
                            :excess
                        ].values_list("id", flat=True)
                        Trace.objects.filter(id__in=delete_ids).delete()
                except Exception as e:
                    logger.warning(
                        "Failed to clean up old observer traces: %s", e, exc_info=True
                    )

    def _get_recording_mode(self, span, parent_context) -> str | None:
        """Return the observer mode for a new trace, or None to skip recording.

        A span linked to an already-stored span is recorded in persist mode.
        Otherwise the mode comes from the signed observer cookie found in the
        context's baggage, if it holds a valid summary/persist value.
        """
        # If the span has links, then we are going to export if the linked span is also exported
        for link in span.links:
            if link.context.is_valid and link.context.span_id:
                from .models import Span

                # on_start() calls this while holding the non-reentrant traces
                # lock; a traced query would re-enter on_start() and deadlock,
                # so keep the lookup untraced.
                with suppress_db_tracing():
                    linked_span_stored = Span.objects.filter(
                        span_id=f"0x{format_span_id(link.context.span_id)}"
                    ).exists()

                if linked_span_stored:
                    return ObserverMode.PERSIST.value

        if not (context := parent_context or context_api.get_current()):
            return None

        if not (cookies := baggage.get_baggage("http.request.cookies", context)):
            return None

        if not (observer_cookie := cookies.get(Observer.COOKIE_NAME)):
            return None

        try:
            mode = unsign_cookie_value(
                Observer.COOKIE_NAME, observer_cookie, default=None
            )
            if mode in (ObserverMode.SUMMARY.value, ObserverMode.PERSIST.value):
                return mode
        except Exception as e:
            logger.warning("Failed to unsign observer cookie: %s", e)

        return None

    def shutdown(self):
        """Cleanup when shutting down."""
        with self._traces_lock:
            self._traces.clear()

    def force_flush(self, timeout_millis=None):
        """Required by SpanProcessor interface."""
        return True