主题
客户端概念
MCP 客户端是连接到 MCP 服务器并利用其功能的应用程序。本页面介绍了客户端的核心概念和实现方式。
客户端功能
MCP 客户端具有以下核心功能:
连接管理
- 建立与服务器的连接
- 维护会话状态
- 处理连接重连和故障恢复
协议处理
- 发送和接收 MCP 消息
- 处理协议版本协商
- 管理能力交换
功能集成
- 访问服务器提供的资源
- 调用服务器工具
- 使用服务器提示
核心客户端特性
采样 (Sampling)
采样是客户端与 LLM 交互的核心机制:
- 文本生成:请求 LLM 生成文本响应
- 结构化输出:获取格式化的数据结构
- 流式响应:支持实时文本生成
python
# 示例:使用采样功能
response = await client.sample(
messages=[{"role": "user", "content": "解释量子计算"}],
max_tokens=1000
)
根 (Roots)
根定义了客户端可以访问的文件系统路径:
- 安全边界:限制文件访问范围
- 权限控制:定义可读写的目录
- 路径解析:处理相对和绝对路径
python
# 配置根路径
client.set_roots([
"/home/user/projects",
"/tmp/workspace"
])
启发 (Elicitation)
启发是引导 LLM 生成特定类型响应的技术:
- 提示工程:设计有效的提示模板
- 上下文注入:添加相关背景信息
- 响应格式化:指定输出格式要求
客户端架构
传输层
负责底层通信:
- STDIO 传输
- HTTP 传输
- WebSocket 传输
协议层
处理 MCP 协议:
- 消息序列化/反序列化
- 错误处理
- 能力协商
应用层
实现业务逻辑:
- 功能调用
- 数据处理
- 用户界面
实现示例
基础客户端
python
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters
async def main():
# 创建客户端
client = Client()
# 连接到服务器
server_params = StdioServerParameters(
command="python",
args=["server.py"]
)
async with client.connect(server_params) as session:
# 列出可用资源
resources = await session.list_resources()
print(f"可用资源: {resources}")
# 读取资源
if resources:
content = await session.read_resource(resources[0].uri)
print(f"资源内容: {content}")
# 列出可用工具
tools = await session.list_tools()
print(f"可用工具: {tools}")
# 调用工具
if tools:
result = await session.call_tool(
tools[0].name,
arguments={"param": "value"}
)
print(f"工具结果: {result}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
高级客户端功能
python
class AdvancedClient:
def __init__(self):
self.client = Client()
self.sessions = {}
async def connect_multiple_servers(self, server_configs):
"""连接多个服务器"""
for name, config in server_configs.items():
session = await self.client.connect(config)
self.sessions[name] = session
async def aggregate_resources(self):
"""聚合所有服务器的资源"""
all_resources = []
for name, session in self.sessions.items():
resources = await session.list_resources()
for resource in resources:
resource.server = name
all_resources.append(resource)
return all_resources
async def smart_tool_selection(self, task_description):
"""智能选择合适的工具"""
all_tools = []
for session in self.sessions.values():
tools = await session.list_tools()
all_tools.extend(tools)
# 基于任务描述选择最合适的工具
# 这里可以使用 LLM 或其他 AI 技术
selected_tool = self.select_best_tool(task_description, all_tools)
return selected_tool
错误处理
连接错误
python
try:
async with client.connect(server_params) as session:
# 执行操作
pass
except ConnectionError as e:
print(f"连接失败: {e}")
# 实现重连逻辑
except TimeoutError as e:
print(f"连接超时: {e}")
# 处理超时情况
协议错误
python
try:
result = await session.call_tool("tool_name", arguments={})
except McpError as e:
if e.code == "METHOD_NOT_FOUND":
print("工具不存在")
elif e.code == "INVALID_PARAMS":
print("参数无效")
else:
print(f"协议错误: {e.message}")
安全考虑
服务器验证
- 验证服务器身份
- 检查证书有效性
- 实现信任机制
数据保护
- 加密敏感数据传输
- 避免日志中记录敏感信息
- 实现数据脱敏
权限控制
- 限制文件系统访问
- 验证工具调用权限
- 实现用户授权机制
性能优化
连接池
python
class ConnectionPool:
def __init__(self, max_connections=10):
self.max_connections = max_connections
self.connections = []
self.available = []
async def get_connection(self, server_params):
if self.available:
return self.available.pop()
if len(self.connections) < self.max_connections:
conn = await self.create_connection(server_params)
self.connections.append(conn)
return conn
# 等待可用连接
return await self.wait_for_connection()
缓存策略
python
class CachedClient:
def __init__(self):
self.resource_cache = {}
self.tool_cache = {}
async def read_resource_cached(self, uri, ttl=300):
if uri in self.resource_cache:
cached_time, content = self.resource_cache[uri]
if time.time() - cached_time < ttl:
return content
content = await self.session.read_resource(uri)
self.resource_cache[uri] = (time.time(), content)
return content
监控和调试
日志记录
python
import logging
logger = logging.getLogger(__name__)
class LoggingClient:
async def call_tool_with_logging(self, tool_name, arguments):
logger.info(f"调用工具: {tool_name}, 参数: {arguments}")
start_time = time.time()
try:
result = await self.session.call_tool(tool_name, arguments)
duration = time.time() - start_time
logger.info(f"工具调用成功,耗时: {duration:.2f}s")
return result
except Exception as e:
duration = time.time() - start_time
logger.error(f"工具调用失败,耗时: {duration:.2f}s, 错误: {e}")
raise
性能监控
python
class PerformanceMonitor:
def __init__(self):
self.metrics = {
'tool_calls': 0,
'resource_reads': 0,
'total_time': 0,
'errors': 0
}
def record_tool_call(self, duration, success=True):
self.metrics['tool_calls'] += 1
self.metrics['total_time'] += duration
if not success:
self.metrics['errors'] += 1
def get_stats(self):
if self.metrics['tool_calls'] > 0:
avg_time = self.metrics['total_time'] / self.metrics['tool_calls']
error_rate = self.metrics['errors'] / self.metrics['tool_calls']
return {
'average_call_time': avg_time,
'error_rate': error_rate,
'total_calls': self.metrics['tool_calls']
}
return {}
最佳实践
设计原则
- 异步优先:使用异步编程模式
- 错误恢复:实现健壮的错误处理
- 资源管理:及时释放连接和资源
- 用户体验:提供清晰的反馈和进度指示
实现建议
- 使用官方 SDK 进行开发
- 实现适当的重试机制
- 添加详细的日志记录
- 进行充分的测试
下一步
了解了客户端概念后,您可以: