主题
Python SDK
MCP Python SDK 提供了构建 MCP 服务器和客户端的完整工具集,支持现代 Python 异步编程模式。
安装
使用 pip
bash
pip install mcp
使用 uv(推荐)
bash
uv add mcp
开发版本
bash
pip install git+https://github.com/modelcontextprotocol/python-sdk.git
核心组件
1. 服务器 SDK
基础服务器
python
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Resource, Tool, Prompt
import asyncio
# 创建服务器实例
server = Server("my-server")
@server.list_resources()
async def list_resources() -> list[Resource]:
"""列出可用资源"""
return [
Resource(
uri="file://config.json",
name="配置文件",
description="应用程序配置",
mimeType="application/json"
)
]
@server.read_resource()
async def read_resource(uri: str) -> str:
"""读取资源内容"""
if uri == "file://config.json":
return '{"app": "my-app", "version": "1.0.0"}'
raise ValueError(f"未知资源: {uri}")
@server.list_tools()
async def list_tools() -> list[Tool]:
"""列出可用工具"""
return [
Tool(
name="calculate",
description="执行数学计算",
inputSchema={
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "要计算的数学表达式"
}
},
"required": ["expression"]
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[dict]:
"""调用工具"""
if name == "calculate":
expression = arguments.get("expression", "")
try:
result = eval(expression) # 注意:生产环境中应使用安全的计算方法
return [{"type": "text", "text": f"结果: {result}"}]
except Exception as e:
return [{"type": "text", "text": f"计算错误: {e}"}]
raise ValueError(f"未知工具: {name}")
async def main():
"""启动服务器"""
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
高级服务器功能
python
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
Resource, Tool, Prompt, TextContent, ImageContent,
EmbeddedResource, LoggingLevel
)
import asyncio
import json
import logging
from pathlib import Path
class AdvancedServer:
def __init__(self, name: str):
self.server = Server(name)
self.setup_logging()
self.setup_handlers()
self.resources_cache = {}
def setup_logging(self):
"""设置日志"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(self.server.name)
def setup_handlers(self):
"""设置处理器"""
@self.server.list_resources()
async def list_resources() -> list[Resource]:
"""动态资源列表"""
resources = []
# 文件系统资源
data_dir = Path("./data")
if data_dir.exists():
for file_path in data_dir.glob("**/*"):
if file_path.is_file():
resources.append(Resource(
uri=f"file://{file_path}",
name=file_path.name,
description=f"文件: {file_path}",
mimeType=self._get_mime_type(file_path)
))
# API 资源
resources.extend([
Resource(
uri="api://weather/current",
name="当前天气",
description="获取当前天气信息"
),
Resource(
uri="api://news/latest",
name="最新新闻",
description="获取最新新闻"
)
])
self.logger.info(f"列出 {len(resources)} 个资源")
return resources
@self.server.read_resource()
async def read_resource(uri: str) -> str | bytes:
"""读取资源内容"""
self.logger.info(f"读取资源: {uri}")
# 缓存检查
if uri in self.resources_cache:
self.logger.info(f"使用缓存: {uri}")
return self.resources_cache[uri]
try:
if uri.startswith("file://"):
file_path = Path(uri[7:]) # 移除 "file://" 前缀
if file_path.exists():
content = file_path.read_text(encoding='utf-8')
self.resources_cache[uri] = content
return content
else:
raise FileNotFoundError(f"文件不存在: {file_path}")
elif uri.startswith("api://"):
# 模拟 API 调用
if "weather" in uri:
content = json.dumps({
"temperature": 22,
"humidity": 65,
"condition": "晴朗"
}, ensure_ascii=False)
elif "news" in uri:
content = json.dumps({
"headlines": [
"科技新闻 1",
"科技新闻 2",
"科技新闻 3"
]
}, ensure_ascii=False)
else:
raise ValueError(f"未知 API: {uri}")
self.resources_cache[uri] = content
return content
else:
raise ValueError(f"不支持的 URI 方案: {uri}")
except Exception as e:
self.logger.error(f"读取资源失败: {e}")
raise
@self.server.list_tools()
async def list_tools() -> list[Tool]:
"""工具列表"""
return [
Tool(
name="file_search",
description="在文件中搜索文本",
inputSchema={
"type": "object",
"properties": {
"pattern": {
"type": "string",
"description": "搜索模式"
},
"directory": {
"type": "string",
"description": "搜索目录",
"default": "."
}
},
"required": ["pattern"]
}
),
Tool(
name="web_request",
description="发送 HTTP 请求",
inputSchema={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "请求 URL"
},
"method": {
"type": "string",
"enum": ["GET", "POST", "PUT", "DELETE"],
"default": "GET"
},
"headers": {
"type": "object",
"description": "请求头"
},
"data": {
"type": "object",
"description": "请求数据"
}
},
"required": ["url"]
}
),
Tool(
name="data_analysis",
description="分析数据并生成报告",
inputSchema={
"type": "object",
"properties": {
"data": {
"type": "array",
"description": "要分析的数据"
},
"analysis_type": {
"type": "string",
"enum": ["summary", "trend", "correlation"],
"description": "分析类型"
}
},
"required": ["data", "analysis_type"]
}
)
]
@self.server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[dict]:
"""调用工具"""
self.logger.info(f"调用工具: {name} with {arguments}")
try:
if name == "file_search":
return await self._file_search(arguments)
elif name == "web_request":
return await self._web_request(arguments)
elif name == "data_analysis":
return await self._data_analysis(arguments)
else:
raise ValueError(f"未知工具: {name}")
except Exception as e:
self.logger.error(f"工具调用失败: {e}")
return [{"type": "text", "text": f"错误: {e}"}]
@self.server.list_prompts()
async def list_prompts() -> list[Prompt]:
"""提示列表"""
return [
Prompt(
name="code_review",
description="代码审查提示",
arguments=[
{
"name": "code",
"description": "要审查的代码",
"required": True
},
{
"name": "language",
"description": "编程语言",
"required": False
}
]
),
Prompt(
name="data_summary",
description="数据摘要提示",
arguments=[
{
"name": "data_source",
"description": "数据源",
"required": True
}
]
)
]
@self.server.get_prompt()
async def get_prompt(name: str, arguments: dict) -> dict:
"""获取提示"""
self.logger.info(f"获取提示: {name} with {arguments}")
if name == "code_review":
code = arguments.get("code", "")
language = arguments.get("language", "python")
return {
"messages": [
{
"role": "user",
"content": {
"type": "text",
"text": f"请审查以下 {language} 代码,提供改进建议:\n\n```{language}\n{code}\n```"
}
}
]
}
elif name == "data_summary":
data_source = arguments.get("data_source", "")
return {
"messages": [
{
"role": "user",
"content": {
"type": "text",
"text": f"请分析并总结来自 {data_source} 的数据,包括关键指标、趋势和洞察。"
}
}
]
}
else:
raise ValueError(f"未知提示: {name}")
async def _file_search(self, arguments: dict) -> list[dict]:
"""文件搜索实现"""
pattern = arguments.get("pattern", "")
directory = arguments.get("directory", ".")
results = []
search_dir = Path(directory)
if search_dir.exists():
for file_path in search_dir.rglob("*.py"): # 只搜索 Python 文件
try:
content = file_path.read_text(encoding='utf-8')
if pattern.lower() in content.lower():
results.append(f"找到匹配: {file_path}")
except Exception as e:
self.logger.warning(f"读取文件失败 {file_path}: {e}")
return [{"type": "text", "text": f"搜索结果:\n" + "\n".join(results)}]
async def _web_request(self, arguments: dict) -> list[dict]:
"""Web 请求实现"""
url = arguments.get("url", "")
method = arguments.get("method", "GET")
# 这里应该使用真实的 HTTP 客户端,如 aiohttp
# 为了示例,我们返回模拟响应
response = {
"status": 200,
"url": url,
"method": method,
"data": "模拟响应数据"
}
return [{"type": "text", "text": json.dumps(response, ensure_ascii=False, indent=2)}]
async def _data_analysis(self, arguments: dict) -> list[dict]:
"""数据分析实现"""
data = arguments.get("data", [])
analysis_type = arguments.get("analysis_type", "summary")
if analysis_type == "summary":
summary = {
"count": len(data),
"type": type(data[0]).__name__ if data else "unknown",
"sample": data[:3] if data else []
}
result = f"数据摘要:\n{json.dumps(summary, ensure_ascii=False, indent=2)}"
elif analysis_type == "trend":
# 简单趋势分析
if all(isinstance(x, (int, float)) for x in data):
if len(data) > 1:
trend = "上升" if data[-1] > data[0] else "下降"
else:
trend = "无法确定"
result = f"趋势分析: {trend}"
else:
result = "趋势分析: 数据类型不支持"
else:
result = f"分析类型 '{analysis_type}' 暂未实现"
return [{"type": "text", "text": result}]
def _get_mime_type(self, file_path: Path) -> str:
"""获取文件 MIME 类型"""
suffix = file_path.suffix.lower()
mime_types = {
'.json': 'application/json',
'.txt': 'text/plain',
'.py': 'text/x-python',
'.js': 'text/javascript',
'.html': 'text/html',
'.css': 'text/css',
'.md': 'text/markdown'
}
return mime_types.get(suffix, 'application/octet-stream')
async def run(self):
"""运行服务器"""
self.logger.info(f"启动服务器: {self.server.name}")
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
self.server.create_initialization_options()
)
# 使用示例
async def main():
server = AdvancedServer("advanced-mcp-server")
await server.run()
if __name__ == "__main__":
asyncio.run(main())
2. 客户端 SDK
基础客户端
python
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters
import asyncio
async def basic_client():
"""基础客户端示例"""
# 配置服务器参数
server_params = StdioServerParameters(
command="python",
args=["server.py"]
)
# 创建客户端
client = Client()
async with client.connect(server_params) as session:
# 列出资源
resources = await session.list_resources()
print(f"可用资源: {len(resources)}")
# 读取资源
if resources:
content = await session.read_resource(resources[0].uri)
print(f"资源内容: {content}")
# 列出工具
tools = await session.list_tools()
print(f"可用工具: {len(tools)}")
# 调用工具
if tools:
result = await session.call_tool(
tools[0].name,
{"expression": "2 + 3"}
)
print(f"工具结果: {result}")
if __name__ == "__main__":
asyncio.run(basic_client())
高级客户端
python
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters
from mcp.client.sse import SseServerParameters
import asyncio
import json
from typing import Dict, List, Any
class AdvancedClient:
def __init__(self):
self.client = Client()
self.sessions = {}
self.capabilities_cache = {}
async def connect_stdio_server(self, name: str, command: str, args: List[str]):
"""连接 STDIO 服务器"""
server_params = StdioServerParameters(command=command, args=args)
session = await self.client.connect(server_params)
self.sessions[name] = session
await self._cache_capabilities(name, session)
return session
async def connect_sse_server(self, name: str, url: str):
"""连接 SSE 服务器"""
server_params = SseServerParameters(url=url)
session = await self.client.connect(server_params)
self.sessions[name] = session
await self._cache_capabilities(name, session)
return session
async def _cache_capabilities(self, name: str, session):
"""缓存服务器能力"""
try:
capabilities = {
'resources': await session.list_resources(),
'tools': await session.list_tools(),
'prompts': await session.list_prompts()
}
self.capabilities_cache[name] = capabilities
except Exception as e:
print(f"缓存能力失败 {name}: {e}")
async def get_server_info(self, server_name: str) -> Dict[str, Any]:
"""获取服务器信息"""
if server_name not in self.capabilities_cache:
return {}
caps = self.capabilities_cache[server_name]
return {
'resources_count': len(caps.get('resources', [])),
'tools_count': len(caps.get('tools', [])),
'prompts_count': len(caps.get('prompts', [])),
'resources': [r.name for r in caps.get('resources', [])],
'tools': [t.name for t in caps.get('tools', [])],
'prompts': [p.name for p in caps.get('prompts', [])]
}
async def execute_workflow(self, workflow: List[Dict[str, Any]]):
"""执行工作流"""
results = []
for step in workflow:
step_type = step.get('type')
server_name = step.get('server')
if server_name not in self.sessions:
results.append({
'step': step,
'error': f'服务器 {server_name} 未连接'
})
continue
session = self.sessions[server_name]
try:
if step_type == 'read_resource':
uri = step.get('uri')
result = await session.read_resource(uri)
results.append({
'step': step,
'result': result
})
elif step_type == 'call_tool':
tool_name = step.get('tool')
arguments = step.get('arguments', {})
result = await session.call_tool(tool_name, arguments)
results.append({
'step': step,
'result': result
})
elif step_type == 'get_prompt':
prompt_name = step.get('prompt')
arguments = step.get('arguments', {})
result = await session.get_prompt(prompt_name, arguments)
results.append({
'step': step,
'result': result
})
else:
results.append({
'step': step,
'error': f'未知步骤类型: {step_type}'
})
except Exception as e:
results.append({
'step': step,
'error': str(e)
})
return results
async def batch_operations(self, operations: List[Dict[str, Any]]):
"""批量操作"""
tasks = []
for op in operations:
if op['type'] == 'read_resource':
task = self._read_resource_task(op)
elif op['type'] == 'call_tool':
task = self._call_tool_task(op)
else:
continue
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def _read_resource_task(self, op: Dict[str, Any]):
"""资源读取任务"""
server_name = op['server']
uri = op['uri']
session = self.sessions[server_name]
return await session.read_resource(uri)
async def _call_tool_task(self, op: Dict[str, Any]):
"""工具调用任务"""
server_name = op['server']
tool_name = op['tool']
arguments = op.get('arguments', {})
session = self.sessions[server_name]
return await session.call_tool(tool_name, arguments)
async def monitor_resources(self, server_name: str, interval: float = 5.0):
"""监控资源变化"""
if server_name not in self.sessions:
print(f"服务器 {server_name} 未连接")
return
session = self.sessions[server_name]
last_resources = set()
while True:
try:
resources = await session.list_resources()
current_resources = {r.uri for r in resources}
# 检查变化
added = current_resources - last_resources
removed = last_resources - current_resources
if added:
print(f"新增资源: {added}")
if removed:
print(f"移除资源: {removed}")
last_resources = current_resources
await asyncio.sleep(interval)
except Exception as e:
print(f"监控资源失败: {e}")
await asyncio.sleep(interval)
async def close_all(self):
"""关闭所有连接"""
for name, session in self.sessions.items():
try:
await session.close()
print(f"已关闭连接: {name}")
except Exception as e:
print(f"关闭连接失败 {name}: {e}")
self.sessions.clear()
self.capabilities_cache.clear()
# 使用示例
async def advanced_client_example():
client = AdvancedClient()
try:
# 连接多个服务器
await client.connect_stdio_server(
"file-server",
"python",
["file_server.py"]
)
await client.connect_stdio_server(
"calc-server",
"python",
["calc_server.py"]
)
# 获取服务器信息
for server_name in client.sessions:
info = await client.get_server_info(server_name)
print(f"服务器 {server_name}: {info}")
# 执行工作流
workflow = [
{
'type': 'read_resource',
'server': 'file-server',
'uri': 'file://config.json'
},
{
'type': 'call_tool',
'server': 'calc-server',
'tool': 'calculate',
'arguments': {'expression': '10 * 5'}
}
]
results = await client.execute_workflow(workflow)
print(f"工作流结果: {results}")
# 批量操作
operations = [
{
'type': 'call_tool',
'server': 'calc-server',
'tool': 'calculate',
'arguments': {'expression': '1 + 1'}
},
{
'type': 'call_tool',
'server': 'calc-server',
'tool': 'calculate',
'arguments': {'expression': '2 * 3'}
}
]
batch_results = await client.batch_operations(operations)
print(f"批量操作结果: {batch_results}")
finally:
await client.close_all()
if __name__ == "__main__":
asyncio.run(advanced_client_example())
类型系统
核心类型
python
from mcp.types import (
# 基础类型
Resource, Tool, Prompt,
TextContent, ImageContent, EmbeddedResource,
# 消息类型
InitializeRequest, InitializeResponse,
ListResourcesRequest, ListResourcesResponse,
ReadResourceRequest, ReadResourceResponse,
ListToolsRequest, ListToolsResponse,
CallToolRequest, CallToolResponse,
ListPromptsRequest, ListPromptsResponse,
GetPromptRequest, GetPromptResponse,
# 错误类型
McpError, InvalidRequestError, MethodNotFoundError,
InvalidParamsError, InternalError,
# 传输类型
StdioServerParameters, SseServerParameters,
# 日志类型
LoggingLevel, LoggingMessage
)
# 资源定义
resource = Resource(
uri="file://example.txt",
name="示例文件",
description="这是一个示例文件",
mimeType="text/plain"
)
# 工具定义
tool = Tool(
name="my_tool",
description="我的工具",
inputSchema={
"type": "object",
"properties": {
"param1": {"type": "string"},
"param2": {"type": "number"}
},
"required": ["param1"]
}
)
# 提示定义
prompt = Prompt(
name="my_prompt",
description="我的提示",
arguments=[
{
"name": "context",
"description": "上下文信息",
"required": True
}
]
)
错误处理
自定义错误处理
python
from mcp.types import McpError, InvalidParamsError
from mcp.server import Server
import logging
server = Server("error-handling-server")
logger = logging.getLogger(__name__)
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[dict]:
"""带错误处理的工具调用"""
try:
if name == "divide":
a = arguments.get("a")
b = arguments.get("b")
# 参数验证
if a is None or b is None:
raise InvalidParamsError("缺少必需参数 'a' 或 'b'")
if not isinstance(a, (int, float)) or not isinstance(b, (int, float)):
raise InvalidParamsError("参数必须是数字")
if b == 0:
raise InvalidParamsError("除数不能为零")
result = a / b
return [{"type": "text", "text": f"结果: {result}"}]
else:
raise McpError(f"未知工具: {name}", code=-32601)
except InvalidParamsError:
# 重新抛出参数错误
raise
except Exception as e:
# 记录并转换为内部错误
logger.error(f"工具调用内部错误: {e}")
raise McpError("内部服务器错误", code=-32603)
测试
单元测试
python
import pytest
import asyncio
from mcp.server import Server
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters
@pytest.fixture
async def test_server():
"""测试服务器夹具"""
server = Server("test-server")
@server.list_tools()
async def list_tools():
return [
{
"name": "add",
"description": "加法运算",
"inputSchema": {
"type": "object",
"properties": {
"a": {"type": "number"},
"b": {"type": "number"}
},
"required": ["a", "b"]
}
}
]
@server.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "add":
return [{"type": "text", "text": str(arguments["a"] + arguments["b"])}]
raise ValueError(f"未知工具: {name}")
return server
@pytest.mark.asyncio
async def test_tool_call(test_server):
"""测试工具调用"""
# 这里需要实际的服务器进程来测试
# 在实际测试中,您需要启动服务器进程
pass
class MockSession:
"""模拟会话用于测试"""
async def list_tools(self):
return [
{
"name": "test_tool",
"description": "测试工具"
}
]
async def call_tool(self, name: str, arguments: dict):
if name == "test_tool":
return [{"type": "text", "text": "测试结果"}]
raise ValueError(f"未知工具: {name}")
@pytest.mark.asyncio
async def test_mock_session():
"""测试模拟会话"""
session = MockSession()
# 测试列出工具
tools = await session.list_tools()
assert len(tools) == 1
assert tools[0]["name"] == "test_tool"
# 测试调用工具
result = await session.call_tool("test_tool", {})
assert result[0]["text"] == "测试结果"
# 测试错误情况
with pytest.raises(ValueError):
await session.call_tool("unknown_tool", {})
部署
Docker 部署
dockerfile
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install -r requirements.txt
# 复制代码
COPY . .
# 暴露端口(如果使用 HTTP 传输)
EXPOSE 8000
# 启动命令
CMD ["python", "server.py"]
yaml
# docker-compose.yml
version: '3.8'
services:
mcp-server:
build: .
ports:
- "8000:8000"
environment:
- LOG_LEVEL=INFO
volumes:
- ./data:/app/data
restart: unless-stopped
生产环境配置
python
import os
import logging
from mcp.server import Server
from mcp.server.stdio import stdio_server
# 环境配置
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
DATA_DIR = os.getenv("DATA_DIR", "./data")
MAX_CONNECTIONS = int(os.getenv("MAX_CONNECTIONS", "100"))
# 配置日志
logging.basicConfig(
level=getattr(logging, LOG_LEVEL),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('mcp-server.log'),
logging.StreamHandler()
]
)
class ProductionServer:
def __init__(self):
self.server = Server("production-mcp-server")
self.logger = logging.getLogger(__name__)
self.setup_handlers()
def setup_handlers(self):
"""设置生产环境处理器"""
# 实现生产级别的处理器
pass
async def run(self):
"""运行生产服务器"""
self.logger.info("启动生产服务器")
try:
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
self.server.create_initialization_options()
)
except Exception as e:
self.logger.error(f"服务器错误: {e}")
raise
if __name__ == "__main__":
server = ProductionServer()
asyncio.run(server.run())
最佳实践
1. 性能优化
python
import asyncio
from functools import lru_cache
import time
class OptimizedServer:
def __init__(self):
self.server = Server("optimized-server")
self.cache = {}
self.cache_ttl = {}
@lru_cache(maxsize=128)
def expensive_computation(self, input_data: str) -> str:
"""使用 LRU 缓存的昂贵计算"""
# 模拟昂贵的计算
time.sleep(0.1)
return f"处理结果: {input_data}"
async def cached_resource_read(self, uri: str, ttl: int = 300):
"""带 TTL 的资源缓存"""
current_time = time.time()
# 检查缓存
if uri in self.cache:
cached_time = self.cache_ttl.get(uri, 0)
if current_time - cached_time < ttl:
return self.cache[uri]
# 读取并缓存
content = await self._read_resource_from_source(uri)
self.cache[uri] = content
self.cache_ttl[uri] = current_time
return content
async def _read_resource_from_source(self, uri: str) -> str:
"""从源读取资源"""
# 实际的资源读取逻辑
await asyncio.sleep(0.01) # 模拟 I/O
return f"资源内容: {uri}"
2. 安全性
python
import hashlib
import hmac
import os
from typing import Optional
class SecureServer:
def __init__(self, api_key: Optional[str] = None):
self.server = Server("secure-server")
self.api_key = api_key or os.getenv("MCP_API_KEY")
self.allowed_paths = {"/safe/path1", "/safe/path2"}
def verify_signature(self, data: str, signature: str) -> bool:
"""验证请求签名"""
if not self.api_key:
return True # 如果没有配置密钥,跳过验证
expected = hmac.new(
self.api_key.encode(),
data.encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected, signature)
def sanitize_path(self, path: str) -> str:
"""清理文件路径"""
# 移除危险字符
path = path.replace("..", "").replace("//", "/")
# 检查是否在允许的路径内
if not any(path.startswith(allowed) for allowed in self.allowed_paths):
raise ValueError(f"路径不被允许: {path}")
return path
async def secure_file_read(self, path: str) -> str:
"""安全的文件读取"""
safe_path = self.sanitize_path(path)
try:
with open(safe_path, 'r', encoding='utf-8') as f:
return f.read()
except FileNotFoundError:
raise ValueError(f"文件不存在: {safe_path}")
except PermissionError:
raise ValueError(f"没有权限访问: {safe_path}")
3. 监控和日志
python
import time
import json
from functools import wraps
class MonitoredServer:
def __init__(self):
self.server = Server("monitored-server")
self.metrics = {
'requests': 0,
'errors': 0,
'response_times': []
}
def monitor_performance(self, func):
"""性能监控装饰器"""
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
self.metrics['requests'] += 1
try:
result = await func(*args, **kwargs)
return result
except Exception as e:
self.metrics['errors'] += 1
self.logger.error(f"函数 {func.__name__} 出错: {e}")
raise
finally:
duration = time.time() - start_time
self.metrics['response_times'].append(duration)
# 保持最近 1000 次请求的响应时间
if len(self.metrics['response_times']) > 1000:
self.metrics['response_times'] = self.metrics['response_times'][-1000:]
return wrapper
def get_metrics(self) -> dict:
"""获取性能指标"""
response_times = self.metrics['response_times']
if response_times:
avg_response_time = sum(response_times) / len(response_times)
max_response_time = max(response_times)
min_response_time = min(response_times)
else:
avg_response_time = max_response_time = min_response_time = 0
return {
'total_requests': self.metrics['requests'],
'total_errors': self.metrics['errors'],
'error_rate': self.metrics['errors'] / max(self.metrics['requests'], 1),
'avg_response_time': avg_response_time,
'max_response_time': max_response_time,
'min_response_time': min_response_time
}
下一步
- 查看 TypeScript SDK - 了解 TypeScript 实现
- 构建服务器 - 深入服务器开发
- 构建客户端 - 深入客户端开发
- 查看示例 - 学习实际应用