Skip to content

Python SDK

MCP Python SDK 提供了构建 MCP 服务器和客户端的完整工具集,支持现代 Python 异步编程模式。

安装

使用 pip

bash
pip install mcp

使用 uv(推荐)

bash
uv add mcp

开发版本

bash
pip install git+https://github.com/modelcontextprotocol/python-sdk.git

核心组件

1. 服务器 SDK

基础服务器

python
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Resource, Tool, Prompt
import asyncio

# 创建服务器实例
server = Server("my-server")

@server.list_resources()
async def list_resources() -> list[Resource]:
    """列出可用资源"""
    return [
        Resource(
            uri="file://config.json",
            name="配置文件",
            description="应用程序配置",
            mimeType="application/json"
        )
    ]

@server.read_resource()
async def read_resource(uri: str) -> str:
    """读取资源内容"""
    if uri == "file://config.json":
        return '{"app": "my-app", "version": "1.0.0"}'
    raise ValueError(f"未知资源: {uri}")

@server.list_tools()
async def list_tools() -> list[Tool]:
    """列出可用工具"""
    return [
        Tool(
            name="calculate",
            description="执行数学计算",
            inputSchema={
                "type": "object",
                "properties": {
                    "expression": {
                        "type": "string",
                        "description": "要计算的数学表达式"
                    }
                },
                "required": ["expression"]
            }
        )
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[dict]:
    """调用工具"""
    if name == "calculate":
        expression = arguments.get("expression", "")
        try:
            result = eval(expression)  # 注意:生产环境中应使用安全的计算方法
            return [{"type": "text", "text": f"结果: {result}"}]
        except Exception as e:
            return [{"type": "text", "text": f"计算错误: {e}"}]
    
    raise ValueError(f"未知工具: {name}")

async def main():
    """启动服务器"""
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

高级服务器功能

python
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
    Resource, Tool, Prompt, TextContent, ImageContent,
    EmbeddedResource, LoggingLevel
)
import asyncio
import json
import logging
from pathlib import Path

class AdvancedServer:
    def __init__(self, name: str):
        self.server = Server(name)
        self.setup_logging()
        self.setup_handlers()
        self.resources_cache = {}
    
    def setup_logging(self):
        """设置日志"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(self.server.name)
    
    def setup_handlers(self):
        """设置处理器"""
        
        @self.server.list_resources()
        async def list_resources() -> list[Resource]:
            """动态资源列表"""
            resources = []
            
            # 文件系统资源
            data_dir = Path("./data")
            if data_dir.exists():
                for file_path in data_dir.glob("**/*"):
                    if file_path.is_file():
                        resources.append(Resource(
                            uri=f"file://{file_path}",
                            name=file_path.name,
                            description=f"文件: {file_path}",
                            mimeType=self._get_mime_type(file_path)
                        ))
            
            # API 资源
            resources.extend([
                Resource(
                    uri="api://weather/current",
                    name="当前天气",
                    description="获取当前天气信息"
                ),
                Resource(
                    uri="api://news/latest",
                    name="最新新闻",
                    description="获取最新新闻"
                )
            ])
            
            self.logger.info(f"列出 {len(resources)} 个资源")
            return resources
        
        @self.server.read_resource()
        async def read_resource(uri: str) -> str | bytes:
            """读取资源内容"""
            self.logger.info(f"读取资源: {uri}")
            
            # 缓存检查
            if uri in self.resources_cache:
                self.logger.info(f"使用缓存: {uri}")
                return self.resources_cache[uri]
            
            try:
                if uri.startswith("file://"):
                    file_path = Path(uri[7:])  # 移除 "file://" 前缀
                    if file_path.exists():
                        content = file_path.read_text(encoding='utf-8')
                        self.resources_cache[uri] = content
                        return content
                    else:
                        raise FileNotFoundError(f"文件不存在: {file_path}")
                
                elif uri.startswith("api://"):
                    # 模拟 API 调用
                    if "weather" in uri:
                        content = json.dumps({
                            "temperature": 22,
                            "humidity": 65,
                            "condition": "晴朗"
                        }, ensure_ascii=False)
                    elif "news" in uri:
                        content = json.dumps({
                            "headlines": [
                                "科技新闻 1",
                                "科技新闻 2",
                                "科技新闻 3"
                            ]
                        }, ensure_ascii=False)
                    else:
                        raise ValueError(f"未知 API: {uri}")
                    
                    self.resources_cache[uri] = content
                    return content
                
                else:
                    raise ValueError(f"不支持的 URI 方案: {uri}")
            
            except Exception as e:
                self.logger.error(f"读取资源失败: {e}")
                raise
        
        @self.server.list_tools()
        async def list_tools() -> list[Tool]:
            """工具列表"""
            return [
                Tool(
                    name="file_search",
                    description="在文件中搜索文本",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "pattern": {
                                "type": "string",
                                "description": "搜索模式"
                            },
                            "directory": {
                                "type": "string",
                                "description": "搜索目录",
                                "default": "."
                            }
                        },
                        "required": ["pattern"]
                    }
                ),
                Tool(
                    name="web_request",
                    description="发送 HTTP 请求",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "url": {
                                "type": "string",
                                "description": "请求 URL"
                            },
                            "method": {
                                "type": "string",
                                "enum": ["GET", "POST", "PUT", "DELETE"],
                                "default": "GET"
                            },
                            "headers": {
                                "type": "object",
                                "description": "请求头"
                            },
                            "data": {
                                "type": "object",
                                "description": "请求数据"
                            }
                        },
                        "required": ["url"]
                    }
                ),
                Tool(
                    name="data_analysis",
                    description="分析数据并生成报告",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "data": {
                                "type": "array",
                                "description": "要分析的数据"
                            },
                            "analysis_type": {
                                "type": "string",
                                "enum": ["summary", "trend", "correlation"],
                                "description": "分析类型"
                            }
                        },
                        "required": ["data", "analysis_type"]
                    }
                )
            ]
        
        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict) -> list[dict]:
            """调用工具"""
            self.logger.info(f"调用工具: {name} with {arguments}")
            
            try:
                if name == "file_search":
                    return await self._file_search(arguments)
                elif name == "web_request":
                    return await self._web_request(arguments)
                elif name == "data_analysis":
                    return await self._data_analysis(arguments)
                else:
                    raise ValueError(f"未知工具: {name}")
            
            except Exception as e:
                self.logger.error(f"工具调用失败: {e}")
                return [{"type": "text", "text": f"错误: {e}"}]
        
        @self.server.list_prompts()
        async def list_prompts() -> list[Prompt]:
            """提示列表"""
            return [
                Prompt(
                    name="code_review",
                    description="代码审查提示",
                    arguments=[
                        {
                            "name": "code",
                            "description": "要审查的代码",
                            "required": True
                        },
                        {
                            "name": "language",
                            "description": "编程语言",
                            "required": False
                        }
                    ]
                ),
                Prompt(
                    name="data_summary",
                    description="数据摘要提示",
                    arguments=[
                        {
                            "name": "data_source",
                            "description": "数据源",
                            "required": True
                        }
                    ]
                )
            ]
        
        @self.server.get_prompt()
        async def get_prompt(name: str, arguments: dict) -> dict:
            """获取提示"""
            self.logger.info(f"获取提示: {name} with {arguments}")
            
            if name == "code_review":
                code = arguments.get("code", "")
                language = arguments.get("language", "python")
                
                return {
                    "messages": [
                        {
                            "role": "user",
                            "content": {
                                "type": "text",
                                "text": f"请审查以下 {language} 代码,提供改进建议:\n\n```{language}\n{code}\n```"
                            }
                        }
                    ]
                }
            
            elif name == "data_summary":
                data_source = arguments.get("data_source", "")
                
                return {
                    "messages": [
                        {
                            "role": "user",
                            "content": {
                                "type": "text",
                                "text": f"请分析并总结来自 {data_source} 的数据,包括关键指标、趋势和洞察。"
                            }
                        }
                    ]
                }
            
            else:
                raise ValueError(f"未知提示: {name}")
    
    async def _file_search(self, arguments: dict) -> list[dict]:
        """文件搜索实现"""
        pattern = arguments.get("pattern", "")
        directory = arguments.get("directory", ".")
        
        results = []
        search_dir = Path(directory)
        
        if search_dir.exists():
            for file_path in search_dir.rglob("*.py"):  # 只搜索 Python 文件
                try:
                    content = file_path.read_text(encoding='utf-8')
                    if pattern.lower() in content.lower():
                        results.append(f"找到匹配: {file_path}")
                except Exception as e:
                    self.logger.warning(f"读取文件失败 {file_path}: {e}")
        
        return [{"type": "text", "text": f"搜索结果:\n" + "\n".join(results)}]
    
    async def _web_request(self, arguments: dict) -> list[dict]:
        """Web 请求实现"""
        url = arguments.get("url", "")
        method = arguments.get("method", "GET")
        
        # 这里应该使用真实的 HTTP 客户端,如 aiohttp
        # 为了示例,我们返回模拟响应
        response = {
            "status": 200,
            "url": url,
            "method": method,
            "data": "模拟响应数据"
        }
        
        return [{"type": "text", "text": json.dumps(response, ensure_ascii=False, indent=2)}]
    
    async def _data_analysis(self, arguments: dict) -> list[dict]:
        """数据分析实现"""
        data = arguments.get("data", [])
        analysis_type = arguments.get("analysis_type", "summary")
        
        if analysis_type == "summary":
            summary = {
                "count": len(data),
                "type": type(data[0]).__name__ if data else "unknown",
                "sample": data[:3] if data else []
            }
            result = f"数据摘要:\n{json.dumps(summary, ensure_ascii=False, indent=2)}"
        
        elif analysis_type == "trend":
            # 简单趋势分析
            if all(isinstance(x, (int, float)) for x in data):
                if len(data) > 1:
                    trend = "上升" if data[-1] > data[0] else "下降"
                else:
                    trend = "无法确定"
                result = f"趋势分析: {trend}"
            else:
                result = "趋势分析: 数据类型不支持"
        
        else:
            result = f"分析类型 '{analysis_type}' 暂未实现"
        
        return [{"type": "text", "text": result}]
    
    def _get_mime_type(self, file_path: Path) -> str:
        """获取文件 MIME 类型"""
        suffix = file_path.suffix.lower()
        mime_types = {
            '.json': 'application/json',
            '.txt': 'text/plain',
            '.py': 'text/x-python',
            '.js': 'text/javascript',
            '.html': 'text/html',
            '.css': 'text/css',
            '.md': 'text/markdown'
        }
        return mime_types.get(suffix, 'application/octet-stream')
    
    async def run(self):
        """运行服务器"""
        self.logger.info(f"启动服务器: {self.server.name}")
        
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options()
            )

# 使用示例
async def main():
    server = AdvancedServer("advanced-mcp-server")
    await server.run()

if __name__ == "__main__":
    asyncio.run(main())

2. 客户端 SDK

基础客户端

python
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters
import asyncio

async def basic_client():
    """基础客户端示例"""
    # 配置服务器参数
    server_params = StdioServerParameters(
        command="python",
        args=["server.py"]
    )
    
    # 创建客户端
    client = Client()
    
    async with client.connect(server_params) as session:
        # 列出资源
        resources = await session.list_resources()
        print(f"可用资源: {len(resources)}")
        
        # 读取资源
        if resources:
            content = await session.read_resource(resources[0].uri)
            print(f"资源内容: {content}")
        
        # 列出工具
        tools = await session.list_tools()
        print(f"可用工具: {len(tools)}")
        
        # 调用工具
        if tools:
            result = await session.call_tool(
                tools[0].name,
                {"expression": "2 + 3"}
            )
            print(f"工具结果: {result}")

if __name__ == "__main__":
    asyncio.run(basic_client())

高级客户端

python
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters
from mcp.client.sse import SseServerParameters
import asyncio
import json
from typing import Dict, List, Any

class AdvancedClient:
    def __init__(self):
        self.client = Client()
        self.sessions = {}
        self.capabilities_cache = {}
    
    async def connect_stdio_server(self, name: str, command: str, args: List[str]):
        """连接 STDIO 服务器"""
        server_params = StdioServerParameters(command=command, args=args)
        session = await self.client.connect(server_params)
        self.sessions[name] = session
        await self._cache_capabilities(name, session)
        return session
    
    async def connect_sse_server(self, name: str, url: str):
        """连接 SSE 服务器"""
        server_params = SseServerParameters(url=url)
        session = await self.client.connect(server_params)
        self.sessions[name] = session
        await self._cache_capabilities(name, session)
        return session
    
    async def _cache_capabilities(self, name: str, session):
        """缓存服务器能力"""
        try:
            capabilities = {
                'resources': await session.list_resources(),
                'tools': await session.list_tools(),
                'prompts': await session.list_prompts()
            }
            self.capabilities_cache[name] = capabilities
        except Exception as e:
            print(f"缓存能力失败 {name}: {e}")
    
    async def get_server_info(self, server_name: str) -> Dict[str, Any]:
        """获取服务器信息"""
        if server_name not in self.capabilities_cache:
            return {}
        
        caps = self.capabilities_cache[server_name]
        return {
            'resources_count': len(caps.get('resources', [])),
            'tools_count': len(caps.get('tools', [])),
            'prompts_count': len(caps.get('prompts', [])),
            'resources': [r.name for r in caps.get('resources', [])],
            'tools': [t.name for t in caps.get('tools', [])],
            'prompts': [p.name for p in caps.get('prompts', [])]
        }
    
    async def execute_workflow(self, workflow: List[Dict[str, Any]]):
        """执行工作流"""
        results = []
        
        for step in workflow:
            step_type = step.get('type')
            server_name = step.get('server')
            
            if server_name not in self.sessions:
                results.append({
                    'step': step,
                    'error': f'服务器 {server_name} 未连接'
                })
                continue
            
            session = self.sessions[server_name]
            
            try:
                if step_type == 'read_resource':
                    uri = step.get('uri')
                    result = await session.read_resource(uri)
                    results.append({
                        'step': step,
                        'result': result
                    })
                
                elif step_type == 'call_tool':
                    tool_name = step.get('tool')
                    arguments = step.get('arguments', {})
                    result = await session.call_tool(tool_name, arguments)
                    results.append({
                        'step': step,
                        'result': result
                    })
                
                elif step_type == 'get_prompt':
                    prompt_name = step.get('prompt')
                    arguments = step.get('arguments', {})
                    result = await session.get_prompt(prompt_name, arguments)
                    results.append({
                        'step': step,
                        'result': result
                    })
                
                else:
                    results.append({
                        'step': step,
                        'error': f'未知步骤类型: {step_type}'
                    })
            
            except Exception as e:
                results.append({
                    'step': step,
                    'error': str(e)
                })
        
        return results
    
    async def batch_operations(self, operations: List[Dict[str, Any]]):
        """批量操作"""
        tasks = []
        
        for op in operations:
            if op['type'] == 'read_resource':
                task = self._read_resource_task(op)
            elif op['type'] == 'call_tool':
                task = self._call_tool_task(op)
            else:
                continue
            
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
    
    async def _read_resource_task(self, op: Dict[str, Any]):
        """资源读取任务"""
        server_name = op['server']
        uri = op['uri']
        session = self.sessions[server_name]
        return await session.read_resource(uri)
    
    async def _call_tool_task(self, op: Dict[str, Any]):
        """工具调用任务"""
        server_name = op['server']
        tool_name = op['tool']
        arguments = op.get('arguments', {})
        session = self.sessions[server_name]
        return await session.call_tool(tool_name, arguments)
    
    async def monitor_resources(self, server_name: str, interval: float = 5.0):
        """监控资源变化"""
        if server_name not in self.sessions:
            print(f"服务器 {server_name} 未连接")
            return
        
        session = self.sessions[server_name]
        last_resources = set()
        
        while True:
            try:
                resources = await session.list_resources()
                current_resources = {r.uri for r in resources}
                
                # 检查变化
                added = current_resources - last_resources
                removed = last_resources - current_resources
                
                if added:
                    print(f"新增资源: {added}")
                if removed:
                    print(f"移除资源: {removed}")
                
                last_resources = current_resources
                await asyncio.sleep(interval)
            
            except Exception as e:
                print(f"监控资源失败: {e}")
                await asyncio.sleep(interval)
    
    async def close_all(self):
        """关闭所有连接"""
        for name, session in self.sessions.items():
            try:
                await session.close()
                print(f"已关闭连接: {name}")
            except Exception as e:
                print(f"关闭连接失败 {name}: {e}")
        
        self.sessions.clear()
        self.capabilities_cache.clear()

# 使用示例
async def advanced_client_example():
    client = AdvancedClient()
    
    try:
        # 连接多个服务器
        await client.connect_stdio_server(
            "file-server",
            "python",
            ["file_server.py"]
        )
        
        await client.connect_stdio_server(
            "calc-server",
            "python",
            ["calc_server.py"]
        )
        
        # 获取服务器信息
        for server_name in client.sessions:
            info = await client.get_server_info(server_name)
            print(f"服务器 {server_name}: {info}")
        
        # 执行工作流
        workflow = [
            {
                'type': 'read_resource',
                'server': 'file-server',
                'uri': 'file://config.json'
            },
            {
                'type': 'call_tool',
                'server': 'calc-server',
                'tool': 'calculate',
                'arguments': {'expression': '10 * 5'}
            }
        ]
        
        results = await client.execute_workflow(workflow)
        print(f"工作流结果: {results}")
        
        # 批量操作
        operations = [
            {
                'type': 'call_tool',
                'server': 'calc-server',
                'tool': 'calculate',
                'arguments': {'expression': '1 + 1'}
            },
            {
                'type': 'call_tool',
                'server': 'calc-server',
                'tool': 'calculate',
                'arguments': {'expression': '2 * 3'}
            }
        ]
        
        batch_results = await client.batch_operations(operations)
        print(f"批量操作结果: {batch_results}")
    
    finally:
        await client.close_all()

if __name__ == "__main__":
    asyncio.run(advanced_client_example())

类型系统

核心类型

python
from mcp.types import (
    # 基础类型
    Resource, Tool, Prompt,
    TextContent, ImageContent, EmbeddedResource,
    
    # 消息类型
    InitializeRequest, InitializeResponse,
    ListResourcesRequest, ListResourcesResponse,
    ReadResourceRequest, ReadResourceResponse,
    ListToolsRequest, ListToolsResponse,
    CallToolRequest, CallToolResponse,
    ListPromptsRequest, ListPromptsResponse,
    GetPromptRequest, GetPromptResponse,
    
    # 错误类型
    McpError, InvalidRequestError, MethodNotFoundError,
    InvalidParamsError, InternalError,
    
    # 传输类型
    StdioServerParameters, SseServerParameters,
    
    # 日志类型
    LoggingLevel, LoggingMessage
)

# 资源定义
resource = Resource(
    uri="file://example.txt",
    name="示例文件",
    description="这是一个示例文件",
    mimeType="text/plain"
)

# 工具定义
tool = Tool(
    name="my_tool",
    description="我的工具",
    inputSchema={
        "type": "object",
        "properties": {
            "param1": {"type": "string"},
            "param2": {"type": "number"}
        },
        "required": ["param1"]
    }
)

# 提示定义
prompt = Prompt(
    name="my_prompt",
    description="我的提示",
    arguments=[
        {
            "name": "context",
            "description": "上下文信息",
            "required": True
        }
    ]
)

错误处理

自定义错误处理

python
from mcp.types import McpError, InvalidParamsError
from mcp.server import Server
import logging

server = Server("error-handling-server")
logger = logging.getLogger(__name__)

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[dict]:
    """带错误处理的工具调用"""
    try:
        if name == "divide":
            a = arguments.get("a")
            b = arguments.get("b")
            
            # 参数验证
            if a is None or b is None:
                raise InvalidParamsError("缺少必需参数 'a' 或 'b'")
            
            if not isinstance(a, (int, float)) or not isinstance(b, (int, float)):
                raise InvalidParamsError("参数必须是数字")
            
            if b == 0:
                raise InvalidParamsError("除数不能为零")
            
            result = a / b
            return [{"type": "text", "text": f"结果: {result}"}]
        
        else:
            raise McpError(f"未知工具: {name}", code=-32601)
    
    except InvalidParamsError:
        # 重新抛出参数错误
        raise
    except Exception as e:
        # 记录并转换为内部错误
        logger.error(f"工具调用内部错误: {e}")
        raise McpError("内部服务器错误", code=-32603)

测试

单元测试

python
import pytest
import asyncio
from mcp.server import Server
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters

@pytest.fixture
async def test_server():
    """测试服务器夹具"""
    server = Server("test-server")
    
    @server.list_tools()
    async def list_tools():
        return [
            {
                "name": "add",
                "description": "加法运算",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "a": {"type": "number"},
                        "b": {"type": "number"}
                    },
                    "required": ["a", "b"]
                }
            }
        ]
    
    @server.call_tool()
    async def call_tool(name: str, arguments: dict):
        if name == "add":
            return [{"type": "text", "text": str(arguments["a"] + arguments["b"])}]
        raise ValueError(f"未知工具: {name}")
    
    return server

@pytest.mark.asyncio
async def test_tool_call(test_server):
    """测试工具调用"""
    # 这里需要实际的服务器进程来测试
    # 在实际测试中,您需要启动服务器进程
    pass

class MockSession:
    """模拟会话用于测试"""
    
    async def list_tools(self):
        return [
            {
                "name": "test_tool",
                "description": "测试工具"
            }
        ]
    
    async def call_tool(self, name: str, arguments: dict):
        if name == "test_tool":
            return [{"type": "text", "text": "测试结果"}]
        raise ValueError(f"未知工具: {name}")

@pytest.mark.asyncio
async def test_mock_session():
    """测试模拟会话"""
    session = MockSession()
    
    # 测试列出工具
    tools = await session.list_tools()
    assert len(tools) == 1
    assert tools[0]["name"] == "test_tool"
    
    # 测试调用工具
    result = await session.call_tool("test_tool", {})
    assert result[0]["text"] == "测试结果"
    
    # 测试错误情况
    with pytest.raises(ValueError):
        await session.call_tool("unknown_tool", {})

部署

Docker 部署

dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install -r requirements.txt

# 复制代码
COPY . .

# 暴露端口(如果使用 HTTP 传输)
EXPOSE 8000

# 启动命令
CMD ["python", "server.py"]
yaml
# docker-compose.yml
version: '3.8'

services:
  mcp-server:
    build: .
    ports:
      - "8000:8000"
    environment:
      - LOG_LEVEL=INFO
    volumes:
      - ./data:/app/data
    restart: unless-stopped

生产环境配置

python
import os
import logging
from mcp.server import Server
from mcp.server.stdio import stdio_server

# 环境配置
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
DATA_DIR = os.getenv("DATA_DIR", "./data")
MAX_CONNECTIONS = int(os.getenv("MAX_CONNECTIONS", "100"))

# 配置日志
logging.basicConfig(
    level=getattr(logging, LOG_LEVEL),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('mcp-server.log'),
        logging.StreamHandler()
    ]
)

class ProductionServer:
    def __init__(self):
        self.server = Server("production-mcp-server")
        self.logger = logging.getLogger(__name__)
        self.setup_handlers()
    
    def setup_handlers(self):
        """设置生产环境处理器"""
        # 实现生产级别的处理器
        pass
    
    async def run(self):
        """运行生产服务器"""
        self.logger.info("启动生产服务器")
        
        try:
            async with stdio_server() as (read_stream, write_stream):
                await self.server.run(
                    read_stream,
                    write_stream,
                    self.server.create_initialization_options()
                )
        except Exception as e:
            self.logger.error(f"服务器错误: {e}")
            raise

if __name__ == "__main__":
    server = ProductionServer()
    asyncio.run(server.run())

最佳实践

1. 性能优化

python
import asyncio
from functools import lru_cache
import time

class OptimizedServer:
    def __init__(self):
        self.server = Server("optimized-server")
        self.cache = {}
        self.cache_ttl = {}
        
    @lru_cache(maxsize=128)
    def expensive_computation(self, input_data: str) -> str:
        """使用 LRU 缓存的昂贵计算"""
        # 模拟昂贵的计算
        time.sleep(0.1)
        return f"处理结果: {input_data}"
    
    async def cached_resource_read(self, uri: str, ttl: int = 300):
        """带 TTL 的资源缓存"""
        current_time = time.time()
        
        # 检查缓存
        if uri in self.cache:
            cached_time = self.cache_ttl.get(uri, 0)
            if current_time - cached_time < ttl:
                return self.cache[uri]
        
        # 读取并缓存
        content = await self._read_resource_from_source(uri)
        self.cache[uri] = content
        self.cache_ttl[uri] = current_time
        
        return content
    
    async def _read_resource_from_source(self, uri: str) -> str:
        """从源读取资源"""
        # 实际的资源读取逻辑
        await asyncio.sleep(0.01)  # 模拟 I/O
        return f"资源内容: {uri}"

2. 安全性

python
import hashlib
import hmac
import os
from typing import Optional

class SecureServer:
    def __init__(self, api_key: Optional[str] = None):
        self.server = Server("secure-server")
        self.api_key = api_key or os.getenv("MCP_API_KEY")
        self.allowed_paths = {"/safe/path1", "/safe/path2"}
    
    def verify_signature(self, data: str, signature: str) -> bool:
        """验证请求签名"""
        if not self.api_key:
            return True  # 如果没有配置密钥,跳过验证
        
        expected = hmac.new(
            self.api_key.encode(),
            data.encode(),
            hashlib.sha256
        ).hexdigest()
        
        return hmac.compare_digest(expected, signature)
    
    def sanitize_path(self, path: str) -> str:
        """清理文件路径"""
        # 移除危险字符
        path = path.replace("..", "").replace("//", "/")
        
        # 检查是否在允许的路径内
        if not any(path.startswith(allowed) for allowed in self.allowed_paths):
            raise ValueError(f"路径不被允许: {path}")
        
        return path
    
    async def secure_file_read(self, path: str) -> str:
        """安全的文件读取"""
        safe_path = self.sanitize_path(path)
        
        try:
            with open(safe_path, 'r', encoding='utf-8') as f:
                return f.read()
        except FileNotFoundError:
            raise ValueError(f"文件不存在: {safe_path}")
        except PermissionError:
            raise ValueError(f"没有权限访问: {safe_path}")

3. 监控和日志

python
import time
import json
from functools import wraps

class MonitoredServer:
    def __init__(self):
        self.server = Server("monitored-server")
        self.metrics = {
            'requests': 0,
            'errors': 0,
            'response_times': []
        }
    
    def monitor_performance(self, func):
        """性能监控装饰器"""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            self.metrics['requests'] += 1
            
            try:
                result = await func(*args, **kwargs)
                return result
            except Exception as e:
                self.metrics['errors'] += 1
                self.logger.error(f"函数 {func.__name__} 出错: {e}")
                raise
            finally:
                duration = time.time() - start_time
                self.metrics['response_times'].append(duration)
                
                # 保持最近 1000 次请求的响应时间
                if len(self.metrics['response_times']) > 1000:
                    self.metrics['response_times'] = self.metrics['response_times'][-1000:]
        
        return wrapper
    
    def get_metrics(self) -> dict:
        """获取性能指标"""
        response_times = self.metrics['response_times']
        
        if response_times:
            avg_response_time = sum(response_times) / len(response_times)
            max_response_time = max(response_times)
            min_response_time = min(response_times)
        else:
            avg_response_time = max_response_time = min_response_time = 0
        
        return {
            'total_requests': self.metrics['requests'],
            'total_errors': self.metrics['errors'],
            'error_rate': self.metrics['errors'] / max(self.metrics['requests'], 1),
            'avg_response_time': avg_response_time,
            'max_response_time': max_response_time,
            'min_response_time': min_response_time
        }

下一步

🚀 探索模型上下文协议的无限可能 | 连接 AI 与世界的桥梁 | 让智能更智能