1fromfunctoolsimportcached_property 2 3fromplain.auth.viewsimportAuthViewMixin 4fromplain.htmx.viewsimportHTMXViewMixin 5fromplain.httpimportJsonResponse,Response,ResponseRedirect 6fromplain.runtimeimportsettings 7fromplain.viewsimportTemplateView 8 9from.coreimportObserver 10from.modelsimportTrace 11 12 13classObserverTracesView(AuthViewMixin,HTMXViewMixin,TemplateView): 14template_name="observer/traces.html" 15admin_required=True 16 17@cached_property 18defobserver(self): 19"""Get the Observer instance for this request.""" 20returnObserver(self.request) 21 22defcheck_auth(self): 23# Allow the view if we're in DEBUG 24ifsettings.DEBUG: 25return 26 27super().check_auth() 28 29defget_response(self): 30response=super().get_response() 31# So we can load it in the toolbar 32response.headers["X-Frame-Options"]="SAMEORIGIN" 33returnresponse 34 35defget_template_context(self): 36context=super().get_template_context() 37context["observer"]=self.observer 38context["traces"]=Trace.objects.all() 39iftrace_id:=self.request.query_params.get("trace_id"): 40context["trace"]=Trace.objects.filter(id=trace_id).first() 41else: 42context["trace"]=context["traces"].first() 43returncontext 44 45defget(self): 46# Check if JSON format is requested 47ifself.request.query_params.get("format")=="json": 48iftrace_id:=self.request.query_params.get("trace_id"): 49iftrace:=Trace.objects.filter(id=trace_id).first(): 50returnJsonResponse(trace.as_dict()) 51returnJsonResponse({"error":"Trace not found"},status=404) 52 53returnsuper().get() 54 55defhtmx_post_enable_summary(self): 56"""Enable summary mode via HTMX.""" 57response=Response(status_code=204) 58response.headers["HX-Refresh"]="true" 59self.observer.enable_summary_mode(response) 60returnresponse 61 62defhtmx_post_enable_persist(self): 63"""Enable full persist mode via HTMX.""" 64response=Response(status_code=204) 65response.headers["HX-Refresh"]="true" 66self.observer.enable_persist_mode(response) 67returnresponse 68 69defhtmx_post_disable(self): 70"""Disable observer via HTMX.""" 71response=Response(status_code=204) 72response.headers["HX-Refresh"]="true" 73self.observer.disable(response) 74returnresponse 75 76defhtmx_delete_traces(self): 77"""Clear all traces via HTMX DELETE.""" 78Trace.objects.all().delete() 79response=Response(status_code=204) 80response.headers["HX-Refresh"]="true" 81returnresponse 82 83defhtmx_delete_trace(self): 84"""Delete a specific trace via HTMX DELETE.""" 85trace_id=self.request.query_params.get("trace_id") 86Trace.objects.get(id=trace_id).delete() 87response=Response(status_code=204) 88response.headers["HX-Refresh"]="true" 89returnresponse 90 91defpost(self): 92"""A standard, non-htmx post used by the button html (where htmx may not be available).""" 93 94observe_action=self.request.data["observe_action"] 95 96response=ResponseRedirect(self.request.data.get("redirect_url",".")) 97 98ifobserve_action=="summary": 99self.observer.enable_summary_mode(response)# Default to summary mode100elifobserve_action=="persist":101self.observer.enable_persist_mode(response)102elifobserve_action=="disable":103self.observer.disable(response)104105returnresponse