1from __future__ import annotations
  2
  3import base64
  4import json
  5from http import HTTPStatus
  6from typing import Any
  7
  8from plain.http import (
  9    HTTPException,
 10    JsonResponse,
 11    Response,
 12    ResponseBase,
 13)
 14from plain.logs import log_exception
 15from plain.runtime import settings
 16from plain.views.base import View
 17
 18from .exceptions import MCPInvalidParams, MCPUnauthorized
 19from .resources import MCPResource
 20from .tools import MCPTool
 21
 22PROTOCOL_VERSION = "2025-03-26"
 23
 24PARSE_ERROR = -32700
 25INVALID_REQUEST = -32600
 26METHOD_NOT_FOUND = -32601
 27INVALID_PARAMS = -32602
 28INTERNAL_ERROR = -32603
 29UNAUTHORIZED = -32001
 30FORBIDDEN = -32003
 31NOT_FOUND = -32004
 32
 33_STATUS_TO_JSON_RPC_CODE: dict[int, int] = {
 34    400: INVALID_PARAMS,
 35    401: UNAUTHORIZED,
 36    403: FORBIDDEN,
 37    404: NOT_FOUND,
 38    500: INTERNAL_ERROR,
 39}
 40
 41
 42class MCPView(View):
 43    """An MCP server endpoint. Subclass to build your own.
 44
 45    `MCPView` is a Plain View — mount it in your URLs directly:
 46
 47        class AppMCP(MCPView):
 48            name = "myapp"
 49            tools = [Greet]
 50
 51        # app/urls.py
 52        path("mcp/", AppMCP, name="mcp")
 53
 54    MCPView itself does no authentication. Compose with `plain.auth.views.AuthView`
 55    for session auth (put `MCPView` first in the base list), or override
 56    `before_request()` to verify a token / custom credentials and raise
 57    `MCPUnauthorized` on failure. The raised exception is translated to a
 58    JSON-RPC 401 response by `handle_exception`.
 59
 60    Register tools declaratively on the class:
 61
 62        class AppMCP(MCPView):
 63            name = "myapp"
 64            tools = [Greet, Search]
 65
 66    Or imperatively, which is how third-party packages attach to a shared
 67    MCPView they don't own (e.g. `plain.admin.mcp.AdminMCP`):
 68
 69        AdminMCP.register_tool(PageViewStats)
 70
 71    Handling JSON-RPC methods beyond the tools capability: define a method
 72    named `rpc_<method>` where slashes in the JSON-RPC method become
 73    underscores. Advertise the matching capability by overriding
 74    `get_capabilities()`:
 75
 76        class AppMCP(MCPView):
 77            def rpc_prompts_list(self, params):
 78                return {"prompts": [...]}
 79
 80            def get_capabilities(self):
 81                caps = super().get_capabilities()
 82                caps["prompts"] = {"listChanged": False}
 83                return caps
 84    """
 85
 86    name: str = ""
 87    version: str = ""
 88    tools: list[type[MCPTool]] = []
 89    resources: list[type[MCPResource]] = []
 90
 91    @classmethod
 92    def register_tool(cls, tool_cls: type[MCPTool]) -> type[MCPTool]:
 93        """Attach a tool to this MCPView subclass.
 94
 95        Used by third-party packages to extend a shared MCPView subclass
 96        (e.g. `plain.admin.mcp.AdminMCP`) that they don't own:
 97
 98            AdminMCP.register_tool(PageViewStats)
 99        """
100        cls._append_unique("tools", tool_cls)
101        return tool_cls
102
103    @classmethod
104    def register_resource(cls, resource_cls: type[MCPResource]) -> type[MCPResource]:
105        """Attach a resource to this MCPView subclass. Parallels `register_tool`."""
106        cls._append_unique("resources", resource_cls)
107        return resource_cls
108
109    @classmethod
110    def _append_unique(cls, attr: str, item: type) -> None:
111        # Give this class its own list on first mutation so registrations
112        # don't bleed into the base class or sibling subclasses.
113        if attr not in cls.__dict__:
114            setattr(cls, attr, list(getattr(cls, attr)))
115        existing = getattr(cls, attr)
116        if item not in existing:
117            existing.append(item)
118
119    def get_tools(self) -> list[type[MCPTool]]:
120        """Return the tools available for this request.
121
122        Default: the class-level `tools` list, filtered through each
123        tool's `allowed_for(self)` classmethod. Override to skip per-tool
124        gates (e.g. superuser bypass) or to add dynamic tools. Returned
125        list must not be mutated by callers.
126        """
127        return [t for t in self.tools if t.allowed_for(self)]
128
129    def get_resources(self) -> list[type[MCPResource]]:
130        """Return the resources available for this request.
131
132        Default: the class-level `resources` list, filtered through each
133        resource's `allowed_for(self)` classmethod. Override to skip
134        per-resource gates or to add dynamic resources.
135        """
136        return [r for r in self.resources if r.allowed_for(self)]
137
138    def handle_exception(self, exc: Exception) -> ResponseBase:
139        """Translate framework exceptions into JSON-RPC responses.
140
141        MCP clients expect JSON bodies and can't follow HTTP redirects, so
142        we catch the standard auth/routing exceptions here and emit
143        JSON-RPC error objects at appropriate status codes.
144        """
145        if isinstance(exc, MCPUnauthorized):
146            return JsonResponse(
147                _error_response(None, UNAUTHORIZED, str(exc)), status_code=401
148            )
149
150        status = exc.status_code if isinstance(exc, HTTPException) else 500
151        if status >= 500:
152            log_exception(self.request, exc)
153            message = "Internal error"
154        else:
155            message = str(exc) or HTTPStatus(status).phrase
156        return JsonResponse(
157            _error_response(
158                None, _STATUS_TO_JSON_RPC_CODE.get(status, INTERNAL_ERROR), message
159            ),
160            status_code=status,
161        )
162
163    def post(self) -> ResponseBase:
164        response = self.handle_message(self.request.body)
165        if response is None:
166            return Response(status_code=204)
167        return JsonResponse(response)
168
169    def handle_message(self, raw: bytes | str) -> dict[str, Any] | None:
170        """Process a single JSON-RPC message and return the reply dict.
171
172        Returns None for notifications (no `id` field).
173        """
174        try:
175            message = json.loads(raw)
176        except (json.JSONDecodeError, ValueError) as e:
177            return _error_response(None, PARSE_ERROR, f"Parse error: {e}")
178
179        if not isinstance(message, dict):
180            return _error_response(
181                None, INVALID_REQUEST, "Request must be a JSON object"
182            )
183
184        if message.get("jsonrpc") != "2.0":
185            return _error_response(
186                message.get("id"),
187                INVALID_REQUEST,
188                "Missing or invalid 'jsonrpc' version; must be '2.0'",
189            )
190
191        msg_id = message.get("id")
192        method = message.get("method")
193
194        if not method or not isinstance(method, str):
195            return _error_response(msg_id, INVALID_REQUEST, "Missing or invalid method")
196
197        if msg_id is None:
198            return None
199
200        # MCP uses by-name params (objects) only. By-position (arrays) is
201        # valid JSON-RPC 2.0 in general but not how MCP methods are spec'd.
202        # Explicit null is accepted as "no params".
203        params = message.get("params")
204        if params is None:
205            params = {}
206        elif not isinstance(params, dict):
207            return _error_response(msg_id, INVALID_PARAMS, "'params' must be an object")
208
209        # Reject `_` so the `/` → `_` rewrite below can't be spoofed by
210        # a client sending `tools_list` instead of `tools/list`.
211        if "_" in method:
212            return _error_response(
213                msg_id, METHOD_NOT_FOUND, f"Unknown method: {method}"
214            )
215
216        handler = getattr(self, f"rpc_{method.replace('/', '_')}", None)
217        if handler is None:
218            return _error_response(
219                msg_id, METHOD_NOT_FOUND, f"Unknown method: {method}"
220            )
221
222        try:
223            result = handler(params)
224            return _success_response(msg_id, result)
225        except MCPInvalidParams as e:
226            return _error_response(msg_id, INVALID_PARAMS, str(e))
227        except Exception as e:
228            log_exception(self.request, e)
229            return _error_response(msg_id, INTERNAL_ERROR, "Internal error")
230
231    def get_capabilities(self) -> dict[str, Any]:
232        """Return the capabilities dict advertised to clients at `initialize`.
233
234        Override to advertise additional capabilities beyond `tools` /
235        `resources`. Call `super().get_capabilities()` to keep the
236        defaults.
237        """
238        capabilities: dict[str, Any] = {}
239        if self.get_tools():
240            capabilities["tools"] = {"listChanged": False}
241        if self.get_resources():
242            capabilities["resources"] = {
243                "subscribe": False,
244                "listChanged": False,
245            }
246        return capabilities
247
248    def rpc_initialize(self, params: dict[str, Any]) -> dict[str, Any]:
249        return {
250            "protocolVersion": PROTOCOL_VERSION,
251            "capabilities": self.get_capabilities(),
252            "serverInfo": {
253                "name": self.name,
254                "version": self.version or settings.VERSION,
255            },
256        }
257
258    def rpc_ping(self, params: dict[str, Any]) -> dict[str, Any]:
259        return {}
260
261    def rpc_tools_list(self, params: dict[str, Any]) -> dict[str, Any]:
262        return {
263            "tools": [
264                {
265                    "name": tool_cls.name,
266                    "description": tool_cls.description,
267                    "inputSchema": tool_cls.input_schema,
268                }
269                for tool_cls in self.get_tools()
270            ]
271        }
272
273    def rpc_tools_call(self, params: dict[str, Any]) -> dict[str, Any]:
274        tool_name = params.get("name")
275        if not tool_name:
276            raise MCPInvalidParams("Missing tool name")
277
278        # Unauthorized tools are filtered out by `get_tools()`, so they
279        # hit this same "unknown" path — existence isn't leaked.
280        tool_cls = next((t for t in self.get_tools() if t.name == tool_name), None)
281        if tool_cls is None:
282            return {
283                "content": [{"type": "text", "text": f"Unknown tool: {tool_name}"}],
284                "isError": True,
285            }
286
287        arguments = params.get("arguments", {})
288        try:
289            tool = tool_cls(**arguments)
290        except TypeError as e:
291            return {
292                "content": [{"type": "text", "text": f"Invalid arguments: {e}"}],
293                "isError": True,
294            }
295        tool.mcp = self
296
297        try:
298            result = tool.run()
299        except Exception as e:
300            log_exception(self.request, e)
301            return {
302                "content": [{"type": "text", "text": "Tool execution failed"}],
303                "isError": True,
304            }
305
306        return {"content": _to_content_blocks(result)}
307
308    def rpc_resources_list(self, params: dict[str, Any]) -> dict[str, Any]:
309        # Only static-URI resources go here; templated ones live under
310        # resources/templates/list per the MCP spec.
311        return {
312            "resources": [
313                {
314                    "uri": resource_cls.uri,
315                    "name": resource_cls.name,
316                    "description": resource_cls.description,
317                    "mimeType": resource_cls.mime_type,
318                }
319                for resource_cls in self.get_resources()
320                if resource_cls.uri
321            ]
322        }
323
324    def rpc_resources_templates_list(self, params: dict[str, Any]) -> dict[str, Any]:
325        return {
326            "resourceTemplates": [
327                {
328                    "uriTemplate": resource_cls.uri_template,
329                    "name": resource_cls.name,
330                    "description": resource_cls.description,
331                    "mimeType": resource_cls.mime_type,
332                }
333                for resource_cls in self.get_resources()
334                if resource_cls.uri_template
335            ]
336        }
337
338    def rpc_resources_read(self, params: dict[str, Any]) -> dict[str, Any]:
339        uri = params.get("uri")
340        if not uri:
341            raise MCPInvalidParams("Missing uri")
342
343        # Unauthorized resources are filtered out by `get_resources()`, so
344        # they hit this same "unknown" path — existence isn't leaked.
345        resource_cls: type[MCPResource] | None = None
346        matched_params: dict[str, Any] = {}
347        for candidate in self.get_resources():
348            try:
349                match = candidate.matches(uri)
350            except (TypeError, ValueError) as e:
351                # Regex matched but coercion failed — URI looks like this
352                # resource's template but the params don't parse.
353                raise MCPInvalidParams(f"Invalid URI params: {e}") from e
354            if match is not None:
355                resource_cls = candidate
356                matched_params = match
357                break
358
359        if resource_cls is None:
360            raise MCPInvalidParams(f"Unknown resource: {uri}")
361
362        try:
363            resource = resource_cls(**matched_params)
364        except TypeError as e:
365            raise MCPInvalidParams(f"Invalid URI params: {e}") from e
366        resource.mcp = self
367
368        # Resources have no in-band error channel like tools' `isError`, so
369        # read() exceptions propagate and surface as INTERNAL_ERROR.
370        content = resource.read()
371
372        entry: dict[str, Any] = {"uri": uri, "mimeType": resource.mime_type}
373        if isinstance(content, bytes):
374            entry["blob"] = _b64(content)
375        else:
376            entry["text"] = content
377        return {"contents": [entry]}
378
379
380_CONTENT_BLOCK_TYPES = {"text", "image", "audio", "resource", "resource_link"}
381
382
383def _b64(data: bytes) -> str:
384    return base64.b64encode(data).decode("ascii")
385
386
387def _to_content_blocks(value: Any) -> list[dict[str, Any]]:
388    """Convert a tool's `run()` return value into MCP content blocks.
389
390    Recognized shapes (in order):
391    - `str` → one text block
392    - a dict with `type` in the known content types → that single block
393    - a list where every item is such a dict → those blocks, in order
394    - any other `dict`/`list` → one text block with the value JSON-serialized
395    - anything else → one text block with `str(value)`
396
397    `bytes` values in `data` (image/audio) or `resource.blob` (embedded
398    resource) are base64-encoded automatically.
399    """
400    if isinstance(value, str):
401        return [{"type": "text", "text": value}]
402    if isinstance(value, dict) and _is_content_block(value):
403        return [_encode_binary(value)]
404    if isinstance(value, list) and value and all(_is_content_block(v) for v in value):
405        return [_encode_binary(v) for v in value]
406    if isinstance(value, dict | list):
407        return [{"type": "text", "text": json.dumps(value, default=str)}]
408    return [{"type": "text", "text": str(value)}]
409
410
411def _is_content_block(value: Any) -> bool:
412    return isinstance(value, dict) and value.get("type") in _CONTENT_BLOCK_TYPES
413
414
415def _encode_binary(block: dict[str, Any]) -> dict[str, Any]:
416    """Encode `bytes` fields to base64 in-place on a content block copy."""
417    block_type = block.get("type")
418    if block_type in ("image", "audio"):
419        data = block.get("data")
420        if isinstance(data, bytes):
421            return {**block, "data": _b64(data)}
422    elif block_type == "resource":
423        resource = block.get("resource")
424        if isinstance(resource, dict):
425            blob = resource.get("blob")
426            if isinstance(blob, bytes):
427                return {
428                    **block,
429                    "resource": {**resource, "blob": _b64(blob)},
430                }
431    return block
432
433
434def _success_response(msg_id: Any, result: Any) -> dict[str, Any]:
435    return {"jsonrpc": "2.0", "id": msg_id, "result": result}
436
437
438def _error_response(msg_id: Any, code: int, message: str) -> dict[str, Any]:
439    return {
440        "jsonrpc": "2.0",
441        "id": msg_id,
442        "error": {"code": code, "message": message},
443    }