1from __future__ import annotations
2
3import base64
4import json
5from http import HTTPStatus
6from typing import Any
7
8from plain.http import (
9 HTTPException,
10 JsonResponse,
11 Response,
12 ResponseBase,
13)
14from plain.logs import log_exception
15from plain.runtime import settings
16from plain.views.base import View
17
18from .exceptions import MCPInvalidParams, MCPUnauthorized
19from .resources import MCPResource
20from .tools import MCPTool
21
22PROTOCOL_VERSION = "2025-03-26"
23
24PARSE_ERROR = -32700
25INVALID_REQUEST = -32600
26METHOD_NOT_FOUND = -32601
27INVALID_PARAMS = -32602
28INTERNAL_ERROR = -32603
29UNAUTHORIZED = -32001
30FORBIDDEN = -32003
31NOT_FOUND = -32004
32
33_STATUS_TO_JSON_RPC_CODE: dict[int, int] = {
34 400: INVALID_PARAMS,
35 401: UNAUTHORIZED,
36 403: FORBIDDEN,
37 404: NOT_FOUND,
38 500: INTERNAL_ERROR,
39}
40
41
42class MCPView(View):
43 """An MCP server endpoint. Subclass to build your own.
44
45 `MCPView` is a Plain View — mount it in your URLs directly:
46
47 class AppMCP(MCPView):
48 name = "myapp"
49 tools = [Greet]
50
51 # app/urls.py
52 path("mcp/", AppMCP, name="mcp")
53
54 MCPView itself does no authentication. Compose with `plain.auth.views.AuthView`
55 for session auth (put `MCPView` first in the base list), or override
56 `before_request()` to verify a token / custom credentials and raise
57 `MCPUnauthorized` on failure. The raised exception is translated to a
58 JSON-RPC 401 response by `handle_exception`.
59
60 Register tools declaratively on the class:
61
62 class AppMCP(MCPView):
63 name = "myapp"
64 tools = [Greet, Search]
65
66 Or imperatively, which is how third-party packages attach to a shared
67 MCPView they don't own (e.g. `plain.admin.mcp.AdminMCP`):
68
69 AdminMCP.register_tool(PageViewStats)
70
71 Handling JSON-RPC methods beyond the tools capability: define a method
72 named `rpc_<method>` where slashes in the JSON-RPC method become
73 underscores. Advertise the matching capability by overriding
74 `get_capabilities()`:
75
76 class AppMCP(MCPView):
77 def rpc_prompts_list(self, params):
78 return {"prompts": [...]}
79
80 def get_capabilities(self):
81 caps = super().get_capabilities()
82 caps["prompts"] = {"listChanged": False}
83 return caps
84 """
85
86 name: str = ""
87 version: str = ""
88 tools: list[type[MCPTool]] = []
89 resources: list[type[MCPResource]] = []
90
91 @classmethod
92 def register_tool(cls, tool_cls: type[MCPTool]) -> type[MCPTool]:
93 """Attach a tool to this MCPView subclass.
94
95 Used by third-party packages to extend a shared MCPView subclass
96 (e.g. `plain.admin.mcp.AdminMCP`) that they don't own:
97
98 AdminMCP.register_tool(PageViewStats)
99 """
100 cls._append_unique("tools", tool_cls)
101 return tool_cls
102
103 @classmethod
104 def register_resource(cls, resource_cls: type[MCPResource]) -> type[MCPResource]:
105 """Attach a resource to this MCPView subclass. Parallels `register_tool`."""
106 cls._append_unique("resources", resource_cls)
107 return resource_cls
108
109 @classmethod
110 def _append_unique(cls, attr: str, item: type) -> None:
111 # Give this class its own list on first mutation so registrations
112 # don't bleed into the base class or sibling subclasses.
113 if attr not in cls.__dict__:
114 setattr(cls, attr, list(getattr(cls, attr)))
115 existing = getattr(cls, attr)
116 if item not in existing:
117 existing.append(item)
118
119 def get_tools(self) -> list[type[MCPTool]]:
120 """Return the tools available for this request.
121
122 Default: the class-level `tools` list, filtered through each
123 tool's `allowed_for(self)` classmethod. Override to skip per-tool
124 gates (e.g. superuser bypass) or to add dynamic tools. Returned
125 list must not be mutated by callers.
126 """
127 return [t for t in self.tools if t.allowed_for(self)]
128
129 def get_resources(self) -> list[type[MCPResource]]:
130 """Return the resources available for this request.
131
132 Default: the class-level `resources` list, filtered through each
133 resource's `allowed_for(self)` classmethod. Override to skip
134 per-resource gates or to add dynamic resources.
135 """
136 return [r for r in self.resources if r.allowed_for(self)]
137
138 def handle_exception(self, exc: Exception) -> ResponseBase:
139 """Translate framework exceptions into JSON-RPC responses.
140
141 MCP clients expect JSON bodies and can't follow HTTP redirects, so
142 we catch the standard auth/routing exceptions here and emit
143 JSON-RPC error objects at appropriate status codes.
144 """
145 if isinstance(exc, MCPUnauthorized):
146 return JsonResponse(
147 _error_response(None, UNAUTHORIZED, str(exc)), status_code=401
148 )
149
150 status = exc.status_code if isinstance(exc, HTTPException) else 500
151 if status >= 500:
152 log_exception(self.request, exc)
153 message = "Internal error"
154 else:
155 message = str(exc) or HTTPStatus(status).phrase
156 return JsonResponse(
157 _error_response(
158 None, _STATUS_TO_JSON_RPC_CODE.get(status, INTERNAL_ERROR), message
159 ),
160 status_code=status,
161 )
162
163 def post(self) -> ResponseBase:
164 response = self.handle_message(self.request.body)
165 if response is None:
166 return Response(status_code=204)
167 return JsonResponse(response)
168
169 def handle_message(self, raw: bytes | str) -> dict[str, Any] | None:
170 """Process a single JSON-RPC message and return the reply dict.
171
172 Returns None for notifications (no `id` field).
173 """
174 try:
175 message = json.loads(raw)
176 except (json.JSONDecodeError, ValueError) as e:
177 return _error_response(None, PARSE_ERROR, f"Parse error: {e}")
178
179 if not isinstance(message, dict):
180 return _error_response(
181 None, INVALID_REQUEST, "Request must be a JSON object"
182 )
183
184 if message.get("jsonrpc") != "2.0":
185 return _error_response(
186 message.get("id"),
187 INVALID_REQUEST,
188 "Missing or invalid 'jsonrpc' version; must be '2.0'",
189 )
190
191 msg_id = message.get("id")
192 method = message.get("method")
193
194 if not method or not isinstance(method, str):
195 return _error_response(msg_id, INVALID_REQUEST, "Missing or invalid method")
196
197 if msg_id is None:
198 return None
199
200 # MCP uses by-name params (objects) only. By-position (arrays) is
201 # valid JSON-RPC 2.0 in general but not how MCP methods are spec'd.
202 # Explicit null is accepted as "no params".
203 params = message.get("params")
204 if params is None:
205 params = {}
206 elif not isinstance(params, dict):
207 return _error_response(msg_id, INVALID_PARAMS, "'params' must be an object")
208
209 # Reject `_` so the `/` → `_` rewrite below can't be spoofed by
210 # a client sending `tools_list` instead of `tools/list`.
211 if "_" in method:
212 return _error_response(
213 msg_id, METHOD_NOT_FOUND, f"Unknown method: {method}"
214 )
215
216 handler = getattr(self, f"rpc_{method.replace('/', '_')}", None)
217 if handler is None:
218 return _error_response(
219 msg_id, METHOD_NOT_FOUND, f"Unknown method: {method}"
220 )
221
222 try:
223 result = handler(params)
224 return _success_response(msg_id, result)
225 except MCPInvalidParams as e:
226 return _error_response(msg_id, INVALID_PARAMS, str(e))
227 except Exception as e:
228 log_exception(self.request, e)
229 return _error_response(msg_id, INTERNAL_ERROR, "Internal error")
230
231 def get_capabilities(self) -> dict[str, Any]:
232 """Return the capabilities dict advertised to clients at `initialize`.
233
234 Override to advertise additional capabilities beyond `tools` /
235 `resources`. Call `super().get_capabilities()` to keep the
236 defaults.
237 """
238 capabilities: dict[str, Any] = {}
239 if self.get_tools():
240 capabilities["tools"] = {"listChanged": False}
241 if self.get_resources():
242 capabilities["resources"] = {
243 "subscribe": False,
244 "listChanged": False,
245 }
246 return capabilities
247
248 def rpc_initialize(self, params: dict[str, Any]) -> dict[str, Any]:
249 return {
250 "protocolVersion": PROTOCOL_VERSION,
251 "capabilities": self.get_capabilities(),
252 "serverInfo": {
253 "name": self.name,
254 "version": self.version or settings.VERSION,
255 },
256 }
257
258 def rpc_ping(self, params: dict[str, Any]) -> dict[str, Any]:
259 return {}
260
261 def rpc_tools_list(self, params: dict[str, Any]) -> dict[str, Any]:
262 return {
263 "tools": [
264 {
265 "name": tool_cls.name,
266 "description": tool_cls.description,
267 "inputSchema": tool_cls.input_schema,
268 }
269 for tool_cls in self.get_tools()
270 ]
271 }
272
273 def rpc_tools_call(self, params: dict[str, Any]) -> dict[str, Any]:
274 tool_name = params.get("name")
275 if not tool_name:
276 raise MCPInvalidParams("Missing tool name")
277
278 # Unauthorized tools are filtered out by `get_tools()`, so they
279 # hit this same "unknown" path — existence isn't leaked.
280 tool_cls = next((t for t in self.get_tools() if t.name == tool_name), None)
281 if tool_cls is None:
282 return {
283 "content": [{"type": "text", "text": f"Unknown tool: {tool_name}"}],
284 "isError": True,
285 }
286
287 arguments = params.get("arguments", {})
288 try:
289 tool = tool_cls(**arguments)
290 except TypeError as e:
291 return {
292 "content": [{"type": "text", "text": f"Invalid arguments: {e}"}],
293 "isError": True,
294 }
295 tool.mcp = self
296
297 try:
298 result = tool.run()
299 except Exception as e:
300 log_exception(self.request, e)
301 return {
302 "content": [{"type": "text", "text": "Tool execution failed"}],
303 "isError": True,
304 }
305
306 return {"content": _to_content_blocks(result)}
307
308 def rpc_resources_list(self, params: dict[str, Any]) -> dict[str, Any]:
309 # Only static-URI resources go here; templated ones live under
310 # resources/templates/list per the MCP spec.
311 return {
312 "resources": [
313 {
314 "uri": resource_cls.uri,
315 "name": resource_cls.name,
316 "description": resource_cls.description,
317 "mimeType": resource_cls.mime_type,
318 }
319 for resource_cls in self.get_resources()
320 if resource_cls.uri
321 ]
322 }
323
324 def rpc_resources_templates_list(self, params: dict[str, Any]) -> dict[str, Any]:
325 return {
326 "resourceTemplates": [
327 {
328 "uriTemplate": resource_cls.uri_template,
329 "name": resource_cls.name,
330 "description": resource_cls.description,
331 "mimeType": resource_cls.mime_type,
332 }
333 for resource_cls in self.get_resources()
334 if resource_cls.uri_template
335 ]
336 }
337
338 def rpc_resources_read(self, params: dict[str, Any]) -> dict[str, Any]:
339 uri = params.get("uri")
340 if not uri:
341 raise MCPInvalidParams("Missing uri")
342
343 # Unauthorized resources are filtered out by `get_resources()`, so
344 # they hit this same "unknown" path — existence isn't leaked.
345 resource_cls: type[MCPResource] | None = None
346 matched_params: dict[str, Any] = {}
347 for candidate in self.get_resources():
348 try:
349 match = candidate.matches(uri)
350 except (TypeError, ValueError) as e:
351 # Regex matched but coercion failed — URI looks like this
352 # resource's template but the params don't parse.
353 raise MCPInvalidParams(f"Invalid URI params: {e}") from e
354 if match is not None:
355 resource_cls = candidate
356 matched_params = match
357 break
358
359 if resource_cls is None:
360 raise MCPInvalidParams(f"Unknown resource: {uri}")
361
362 try:
363 resource = resource_cls(**matched_params)
364 except TypeError as e:
365 raise MCPInvalidParams(f"Invalid URI params: {e}") from e
366 resource.mcp = self
367
368 # Resources have no in-band error channel like tools' `isError`, so
369 # read() exceptions propagate and surface as INTERNAL_ERROR.
370 content = resource.read()
371
372 entry: dict[str, Any] = {"uri": uri, "mimeType": resource.mime_type}
373 if isinstance(content, bytes):
374 entry["blob"] = _b64(content)
375 else:
376 entry["text"] = content
377 return {"contents": [entry]}
378
379
380_CONTENT_BLOCK_TYPES = {"text", "image", "audio", "resource", "resource_link"}
381
382
383def _b64(data: bytes) -> str:
384 return base64.b64encode(data).decode("ascii")
385
386
387def _to_content_blocks(value: Any) -> list[dict[str, Any]]:
388 """Convert a tool's `run()` return value into MCP content blocks.
389
390 Recognized shapes (in order):
391 - `str` → one text block
392 - a dict with `type` in the known content types → that single block
393 - a list where every item is such a dict → those blocks, in order
394 - any other `dict`/`list` → one text block with the value JSON-serialized
395 - anything else → one text block with `str(value)`
396
397 `bytes` values in `data` (image/audio) or `resource.blob` (embedded
398 resource) are base64-encoded automatically.
399 """
400 if isinstance(value, str):
401 return [{"type": "text", "text": value}]
402 if isinstance(value, dict) and _is_content_block(value):
403 return [_encode_binary(value)]
404 if isinstance(value, list) and value and all(_is_content_block(v) for v in value):
405 return [_encode_binary(v) for v in value]
406 if isinstance(value, dict | list):
407 return [{"type": "text", "text": json.dumps(value, default=str)}]
408 return [{"type": "text", "text": str(value)}]
409
410
411def _is_content_block(value: Any) -> bool:
412 return isinstance(value, dict) and value.get("type") in _CONTENT_BLOCK_TYPES
413
414
415def _encode_binary(block: dict[str, Any]) -> dict[str, Any]:
416 """Encode `bytes` fields to base64 in-place on a content block copy."""
417 block_type = block.get("type")
418 if block_type in ("image", "audio"):
419 data = block.get("data")
420 if isinstance(data, bytes):
421 return {**block, "data": _b64(data)}
422 elif block_type == "resource":
423 resource = block.get("resource")
424 if isinstance(resource, dict):
425 blob = resource.get("blob")
426 if isinstance(blob, bytes):
427 return {
428 **block,
429 "resource": {**resource, "blob": _b64(blob)},
430 }
431 return block
432
433
434def _success_response(msg_id: Any, result: Any) -> dict[str, Any]:
435 return {"jsonrpc": "2.0", "id": msg_id, "result": result}
436
437
438def _error_response(msg_id: Any, code: int, message: str) -> dict[str, Any]:
439 return {
440 "jsonrpc": "2.0",
441 "id": msg_id,
442 "error": {"code": code, "message": message},
443 }