Plain is headed towards 1.0! Subscribe for development updates →

  1import logging
  2import re
  3import threading
  4from collections import defaultdict
  5
  6import opentelemetry.context as context_api
  7from opentelemetry import baggage, trace
  8from opentelemetry.sdk.trace import SpanProcessor, sampling
  9from opentelemetry.semconv.attributes import url_attributes
 10from opentelemetry.trace import SpanKind, format_span_id, format_trace_id
 11
 12from plain.http.cookie import unsign_cookie_value
 13from plain.models.otel import suppress_db_tracing
 14from plain.runtime import settings
 15
 16from .core import Observer, ObserverMode
 17
 18logger = logging.getLogger(__name__)
 19
 20
 21def get_span_processor():
 22    """Get the span collector instance from the tracer provider."""
 23    if not (current_provider := trace.get_tracer_provider()):
 24        return None
 25
 26    # Look for ObserverSpanProcessor in the span processors
 27    # Check if the provider has a _active_span_processor attribute
 28    if hasattr(current_provider, "_active_span_processor"):
 29        # It's a composite processor, check its _span_processors
 30        if composite_processor := current_provider._active_span_processor:
 31            if hasattr(composite_processor, "_span_processors"):
 32                for processor in composite_processor._span_processors:
 33                    if isinstance(processor, ObserverSpanProcessor):
 34                        return processor
 35
 36    return None
 37
 38
 39def get_current_trace_summary() -> str | None:
 40    """Get performance summary for the currently active trace."""
 41    if not (current_span := trace.get_current_span()):
 42        return None
 43
 44    if not (processor := get_span_processor()):
 45        return None
 46
 47    trace_id = f"0x{format_trace_id(current_span.get_span_context().trace_id)}"
 48    return processor.get_trace_summary(trace_id)
 49
 50
 51class ObserverSampler(sampling.Sampler):
 52    """Samples traces based on request path and cookies."""
 53
 54    def __init__(self):
 55        # Custom parent-based sampler
 56        self._delegate = sampling.ParentBased(sampling.ALWAYS_OFF)
 57
 58        # TODO ignore url namespace instead? admin, observer, assets
 59        self._ignore_url_paths = [
 60            re.compile(p) for p in settings.OBSERVER_IGNORE_URL_PATTERNS
 61        ]
 62
 63    def should_sample(
 64        self,
 65        parent_context,
 66        trace_id,
 67        name,
 68        kind: SpanKind | None = None,
 69        attributes=None,
 70        links=None,
 71        trace_state=None,
 72    ):
 73        # First, drop if the URL should be ignored.
 74        if attributes:
 75            if url_path := attributes.get(url_attributes.URL_PATH, ""):
 76                for pattern in self._ignore_url_paths:
 77                    if pattern.match(url_path):
 78                        return sampling.SamplingResult(
 79                            sampling.Decision.DROP,
 80                            attributes=attributes,
 81                        )
 82
 83        # If no processor decision, check cookies directly for root spans
 84        decision = None
 85        if parent_context:
 86            # Check cookies for sampling decision
 87            if cookies := baggage.get_baggage("http.request.cookies", parent_context):
 88                if observer_cookie := cookies.get(Observer.COOKIE_NAME):
 89                    unsigned_value = unsign_cookie_value(
 90                        Observer.COOKIE_NAME, observer_cookie, default=False
 91                    )
 92
 93                    if unsigned_value in (
 94                        ObserverMode.PERSIST.value,
 95                        ObserverMode.SUMMARY.value,
 96                    ):
 97                        # Always use RECORD_AND_SAMPLE so ParentBased works correctly
 98                        # The processor will check the mode to decide whether to export
 99                        decision = sampling.Decision.RECORD_AND_SAMPLE
100                    else:
101                        decision = sampling.Decision.DROP
102
103        # If there are links, assume it is to another trace/span that we are keeping
104        if links:
105            decision = sampling.Decision.RECORD_AND_SAMPLE
106
107        # If no decision from cookies, use default
108        if decision is None:
109            result = self._delegate.should_sample(
110                parent_context,
111                trace_id,
112                name,
113                kind=kind,
114                attributes=attributes,
115                links=links,
116                trace_state=trace_state,
117            )
118            decision = result.decision
119
120        return sampling.SamplingResult(
121            decision,
122            attributes=attributes,
123        )
124
125    def get_description(self) -> str:
126        return "ObserverSampler"
127
128
class ObserverSpanProcessor(SpanProcessor):
    """Collects spans in real-time for current trace performance monitoring.

    This processor keeps spans in memory for traces that have the 'summary' or
    'persist' cookie set (or whose first span links to an already-persisted
    span). These spans can be accessed via get_current_trace_summary() for
    real-time debugging. When a trace's local root span ends, the trace is
    removed from memory and, in 'persist' mode, saved to the database.
    """

    # Upper bound on in-memory traces. When exceeded, the _EVICTION_BATCH
    # oldest-inserted traces are discarded. This protects against traces whose
    # root span never ends and would otherwise never be cleaned up.
    _MAX_TRACES = 1000
    _EVICTION_BATCH = 100

    def __init__(self):
        # trace_id ("0x"-prefixed hex) -> bookkeeping for that trace.
        # dict (and defaultdict) iteration follows insertion order, which
        # _evict_oldest_traces relies on.
        self._traces = defaultdict(
            lambda: {
                "active_otel_spans": {},  # span_id -> started, not yet ended span
                "completed_otel_spans": [],  # ended spans, in end order
                "root_span_id": None,  # span_id of the local root span
                "mode": None,  # ObserverMode.SUMMARY.value or ObserverMode.PERSIST.value
            }
        )
        # Guards self._traces across concurrent on_start/on_end/get_trace_summary.
        self._traces_lock = threading.Lock()

    def on_start(self, span, parent_context=None):
        """Called when a span starts; track it if its trace is being recorded."""
        span_context = span.get_span_context()
        trace_id = f"0x{format_trace_id(span_context.trace_id)}"
        span_id = f"0x{format_span_id(span_context.span_id)}"

        with self._traces_lock:
            trace_info = self._traces.get(trace_id)
            if trace_info is None:
                # First span seen for this trace - determine if we should record it
                mode = self._get_recording_mode(span, parent_context)
                if not mode:
                    # Don't create trace entry for traces we won't record
                    return

                trace_info = self._traces[trace_id]
                trace_info["mode"] = mode
                self._evict_oldest_traces()

            trace_info["active_otel_spans"][span_id] = span

            # The local root is the first span with no parent, or whose parent
            # is remote (context propagated from another process). Its end
            # marks the end of the trace in this process.
            if trace_info["root_span_id"] is None and (
                span.parent is None or span.parent.is_remote
            ):
                trace_info["root_span_id"] = span_id

    def _evict_oldest_traces(self):
        """Drop the oldest-inserted traces once the in-memory limit is exceeded.

        Must be called with ``self._traces_lock`` held, right after inserting
        a new trace. The new trace is the last key, so it is never evicted.
        """
        if len(self._traces) <= self._MAX_TRACES:
            return
        # Trace IDs are random, so sorting them would not find the oldest;
        # insertion order does.
        for old_id in list(self._traces)[: self._EVICTION_BATCH]:
            del self._traces[old_id]

    def on_end(self, span):
        """Called when a span ends; finalize the trace when its root ends."""
        span_context = span.get_span_context()
        trace_id = f"0x{format_trace_id(span_context.trace_id)}"
        span_id = f"0x{format_span_id(span_context.span_id)}"

        with self._traces_lock:
            trace_info = self._traces.get(trace_id)
            if trace_info is None:
                # Not tracked (no mode on start, or already finalized/evicted)
                return

            # Move span from active to completed
            if trace_info["active_otel_spans"].pop(span_id, None) is not None:
                trace_info["completed_otel_spans"].append(span)

            if span_id != trace_info["root_span_id"]:
                return

            # Root span ended: detach the trace while holding the lock, then do
            # model building and database I/O after releasing it so other
            # threads' spans aren't blocked.
            del self._traces[trace_id]
            mode = trace_info["mode"]
            completed_spans = trace_info["completed_otel_spans"]

        if mode == ObserverMode.PERSIST.value:
            self._persist_trace(trace_id, completed_spans)

    def _persist_trace(self, trace_id, otel_spans):
        """Convert a finished trace's spans to models and export them.

        Errors while building models are logged rather than raised into the
        code that ended the root span.
        """
        from .models import Span, Trace

        try:
            trace_model = Trace.from_opentelemetry_spans(otel_spans)
            if not trace_model:
                return
            span_models = [
                Span.from_opentelemetry_span(s, trace_model) for s in otel_spans
            ]
        except Exception as e:
            logger.warning(
                "Failed to build observer trace %s: %s", trace_id, e, exc_info=True
            )
            return

        logger.debug(
            "Exporting %d spans for trace %s",
            len(span_models),
            trace_id,
        )
        self._export_trace(trace_model, span_models)

    def get_trace_summary(self, trace_id: str) -> str | None:
        """Get performance summary for a specific, still in-progress trace.

        Returns None if the trace isn't tracked or has no spans. Models are
        rebuilt on every call from the spans collected so far, so repeated
        calls reflect newly added or ended spans.
        """
        from .models import Span, Trace

        with self._traces_lock:
            trace_info = self._traces.get(trace_id)
            if trace_info is None:
                return None
            # Snapshot active and completed spans; build models outside the lock.
            all_otel_spans = [
                *trace_info["active_otel_spans"].values(),
                *trace_info["completed_otel_spans"],
            ]

        if not all_otel_spans:
            return None

        trace_model = Trace.from_opentelemetry_spans(all_otel_spans)
        if not trace_model:
            return None

        span_models = [
            Span.from_opentelemetry_span(s, trace_model) for s in all_otel_spans
        ]
        return trace_model.get_trace_summary(span_models)

    def _export_trace(self, trace, span_models):
        """Export trace and spans to the database, then enforce the trace limit.

        Runs inside ``suppress_db_tracing()``. Failures are logged, not raised.
        When ``settings.OBSERVER_TRACE_LIMIT`` is positive, only the oldest
        traces beyond that limit (by ``start_time``) are deleted.
        """
        from .models import Span, Trace

        with suppress_db_tracing():
            try:
                trace.save()

                # Presumably refreshes each span's FK now that trace has a pk.
                for span_model in span_models:
                    span_model.trace = trace

                # Bulk create spans
                Span.objects.bulk_create(span_models)
            except Exception as e:
                logger.warning(
                    "Failed to export trace to database: %s",
                    e,
                    exc_info=True,
                )

            # Delete oldest traces if we exceed the limit
            limit = settings.OBSERVER_TRACE_LIMIT
            if limit > 0:
                try:
                    excess = Trace.objects.count() - limit
                    if excess > 0:
                        # Remove only the overflow so `limit` traces remain.
                        # Materialize the ids to avoid a LIMIT-in-subquery.
                        delete_ids = list(
                            Trace.objects.order_by("start_time")[
                                :excess
                            ].values_list("id", flat=True)
                        )
                        Trace.objects.filter(id__in=delete_ids).delete()
                except Exception as e:
                    logger.warning(
                        "Failed to clean up old observer traces: %s", e, exc_info=True
                    )

    def _get_recording_mode(self, span, parent_context) -> str | None:
        """Decide how (or whether) to record the trace ``span`` belongs to.

        Returns ``ObserverMode.PERSIST.value`` if any valid link of the span
        points to a span already stored in the database. Otherwise returns the
        mode from the signed observer cookie found in the
        ``http.request.cookies`` baggage of ``parent_context`` (or the current
        context) when it is summary or persist. Returns None in all other cases.
        """
        # If the span has links, then we are going to export if the linked
        # span is also exported
        linked_span_ids = [
            f"0x{format_span_id(link.context.span_id)}"
            for link in span.links
            if link.context.is_valid and link.context.span_id
        ]
        if linked_span_ids:
            from .models import Span

            try:
                # Don't trace our own lookup (and don't re-enter this
                # processor while holding its lock).
                with suppress_db_tracing():
                    if Span.objects.filter(span_id__in=linked_span_ids).exists():
                        return ObserverMode.PERSIST.value
            except Exception as e:
                logger.warning(
                    "Failed to look up linked observer spans: %s", e, exc_info=True
                )

        if not (context := parent_context or context_api.get_current()):
            return None

        if not (cookies := baggage.get_baggage("http.request.cookies", context)):
            return None

        if not (observer_cookie := cookies.get(Observer.COOKIE_NAME)):
            return None

        try:
            mode = unsign_cookie_value(
                Observer.COOKIE_NAME, observer_cookie, default=None
            )
        except Exception as e:
            logger.warning("Failed to unsign observer cookie: %s", e)
            return None

        if mode in (ObserverMode.SUMMARY.value, ObserverMode.PERSIST.value):
            return mode
        return None

    def shutdown(self):
        """Cleanup when shutting down."""
        with self._traces_lock:
            self._traces.clear()

    def force_flush(self, timeout_millis=None):
        """Required by SpanProcessor interface."""
        return True