主题
构建 MCP 客户端
本指南将教您如何构建连接到 MCP 服务器的客户端应用程序,包括基础连接、功能调用和高级特性实现。
前提条件
Python 环境
- Python 3.10 或更高版本
- MCP SDK 1.2.0 或更高版本
安装依赖
bash
# 使用 pip 安装
pip install mcp
# 或使用 uv(推荐)
uv add mcp
基础客户端实现
1. 简单客户端
python
import asyncio
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters
async def basic_client():
"""基础客户端示例"""
# 配置服务器参数
server_params = StdioServerParameters(
command="python",
args=["path/to/your/server.py"]
)
# 创建客户端
client = Client()
try:
# 连接到服务器
async with client.connect(server_params) as session:
print("成功连接到 MCP 服务器")
# 列出可用资源
resources = await session.list_resources()
print(f"可用资源: {len(resources)} 个")
for resource in resources:
print(f" - {resource.name}: {resource.uri}")
# 列出可用工具
tools = await session.list_tools()
print(f"可用工具: {len(tools)} 个")
for tool in tools:
print(f" - {tool.name}: {tool.description}")
# 列出可用提示
prompts = await session.list_prompts()
print(f"可用提示: {len(prompts)} 个")
for prompt in prompts:
print(f" - {prompt.name}: {prompt.description}")
except Exception as e:
print(f"连接失败: {e}")
if __name__ == "__main__":
asyncio.run(basic_client())
2. 交互式客户端
python
import asyncio
import json
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters
class InteractiveClient:
def __init__(self, server_params):
self.client = Client()
self.server_params = server_params
self.session = None
async def connect(self):
"""连接到服务器"""
try:
self.session = await self.client.connect(self.server_params)
print("✅ 成功连接到 MCP 服务器")
return True
except Exception as e:
print(f"❌ 连接失败: {e}")
return False
async def disconnect(self):
"""断开连接"""
if self.session:
await self.session.close()
print("🔌 已断开连接")
async def show_capabilities(self):
"""显示服务器能力"""
if not self.session:
print("❌ 未连接到服务器")
return
print("\n📋 服务器能力:")
# 显示资源
resources = await self.session.list_resources()
if resources:
print(f"\n📁 资源 ({len(resources)} 个):")
for i, resource in enumerate(resources, 1):
print(f" {i}. {resource.name}")
print(f" URI: {resource.uri}")
if resource.description:
print(f" 描述: {resource.description}")
# 显示工具
tools = await self.session.list_tools()
if tools:
print(f"\n🔧 工具 ({len(tools)} 个):")
for i, tool in enumerate(tools, 1):
print(f" {i}. {tool.name}")
if tool.description:
print(f" 描述: {tool.description}")
# 显示提示
prompts = await self.session.list_prompts()
if prompts:
print(f"\n💡 提示 ({len(prompts)} 个):")
for i, prompt in enumerate(prompts, 1):
print(f" {i}. {prompt.name}")
if prompt.description:
print(f" 描述: {prompt.description}")
async def read_resource(self, uri: str):
"""读取资源"""
try:
result = await self.session.read_resource(uri)
print(f"\n📖 资源内容 ({uri}):")
for content in result.content:
if hasattr(content, 'text'):
print(content.text)
else:
print(content)
except Exception as e:
print(f"❌ 读取资源失败: {e}")
async def call_tool(self, tool_name: str, arguments: dict):
"""调用工具"""
try:
result = await self.session.call_tool(tool_name, arguments)
print(f"\n🔧 工具调用结果 ({tool_name}):")
for content in result.content:
if hasattr(content, 'text'):
print(content.text)
else:
print(content)
except Exception as e:
print(f"❌ 工具调用失败: {e}")
async def get_prompt(self, prompt_name: str, arguments: dict = None):
"""获取提示"""
try:
result = await self.session.get_prompt(prompt_name, arguments or {})
print(f"\n💡 提示内容 ({prompt_name}):")
for message in result.messages:
print(f"角色: {message.role}")
for content in message.content:
if hasattr(content, 'text'):
print(f"内容: {content.text}")
except Exception as e:
print(f"❌ 获取提示失败: {e}")
async def interactive_loop(self):
"""交互式循环"""
if not await self.connect():
return
try:
await self.show_capabilities()
while True:
print("\n" + "="*50)
print("MCP 客户端交互界面")
print("1. 显示服务器能力")
print("2. 读取资源")
print("3. 调用工具")
print("4. 获取提示")
print("5. 退出")
choice = input("\n请选择操作 (1-5): ").strip()
if choice == "1":
await self.show_capabilities()
elif choice == "2":
uri = input("请输入资源 URI: ").strip()
if uri:
await self.read_resource(uri)
elif choice == "3":
tool_name = input("请输入工具名称: ").strip()
if tool_name:
args_str = input("请输入参数 (JSON 格式,留空表示无参数): ").strip()
try:
arguments = json.loads(args_str) if args_str else {}
await self.call_tool(tool_name, arguments)
except json.JSONDecodeError:
print("❌ 参数格式错误,请使用有效的 JSON 格式")
elif choice == "4":
prompt_name = input("请输入提示名称: ").strip()
if prompt_name:
args_str = input("请输入参数 (JSON 格式,留空表示无参数): ").strip()
try:
arguments = json.loads(args_str) if args_str else {}
await self.get_prompt(prompt_name, arguments)
except json.JSONDecodeError:
print("❌ 参数格式错误,请使用有效的 JSON 格式")
elif choice == "5":
break
else:
print("❌ 无效选择,请重试")
finally:
await self.disconnect()
# 使用示例
async def main():
server_params = StdioServerParameters(
command="python",
args=["server.py"]
)
client = InteractiveClient(server_params)
await client.interactive_loop()
if __name__ == "__main__":
asyncio.run(main())
高级客户端功能
1. 多服务器客户端
python
class MultiServerClient:
def __init__(self):
self.client = Client()
self.sessions = {}
self.server_configs = {}
async def add_server(self, name: str, server_params):
"""添加服务器配置"""
self.server_configs[name] = server_params
print(f"✅ 已添加服务器配置: {name}")
async def connect_to_server(self, name: str):
"""连接到指定服务器"""
if name not in self.server_configs:
print(f"❌ 未找到服务器配置: {name}")
return False
try:
session = await self.client.connect(self.server_configs[name])
self.sessions[name] = session
print(f"✅ 成功连接到服务器: {name}")
return True
except Exception as e:
print(f"❌ 连接服务器 {name} 失败: {e}")
return False
async def connect_all_servers(self):
"""连接到所有配置的服务器"""
for name in self.server_configs:
await self.connect_to_server(name)
async def list_all_resources(self):
"""列出所有服务器的资源"""
all_resources = {}
for name, session in self.sessions.items():
try:
resources = await session.list_resources()
all_resources[name] = resources
print(f"\n📁 服务器 {name} 的资源:")
for resource in resources:
print(f" - {resource.name}: {resource.uri}")
except Exception as e:
print(f"❌ 获取服务器 {name} 资源失败: {e}")
return all_resources
async def aggregate_tools(self):
"""聚合所有服务器的工具"""
all_tools = {}
for name, session in self.sessions.items():
try:
tools = await session.list_tools()
for tool in tools:
tool_key = f"{name}.{tool.name}"
all_tools[tool_key] = {
'server': name,
'session': session,
'tool': tool
}
except Exception as e:
print(f"❌ 获取服务器 {name} 工具失败: {e}")
return all_tools
async def call_tool_on_server(self, server_name: str, tool_name: str, arguments: dict):
"""在指定服务器上调用工具"""
if server_name not in self.sessions:
print(f"❌ 未连接到服务器: {server_name}")
return None
try:
session = self.sessions[server_name]
result = await session.call_tool(tool_name, arguments)
return result
except Exception as e:
print(f"❌ 在服务器 {server_name} 上调用工具 {tool_name} 失败: {e}")
return None
async def disconnect_all(self):
"""断开所有连接"""
for name, session in self.sessions.items():
try:
await session.close()
print(f"🔌 已断开服务器: {name}")
except Exception as e:
print(f"❌ 断开服务器 {name} 失败: {e}")
self.sessions.clear()
# 使用示例
async def multi_server_example():
client = MultiServerClient()
# 添加多个服务器
await client.add_server("file-server", StdioServerParameters(
command="python", args=["file_server.py"]
))
await client.add_server("web-server", StdioServerParameters(
command="python", args=["web_server.py"]
))
# 连接到所有服务器
await client.connect_all_servers()
# 列出所有资源
await client.list_all_resources()
# 聚合工具
tools = await client.aggregate_tools()
print(f"\n🔧 总共可用工具: {len(tools)} 个")
for tool_key, tool_info in tools.items():
print(f" - {tool_key}: {tool_info['tool'].description}")
# 断开所有连接
await client.disconnect_all()
2. 智能客户端
python
import re
from typing import List, Dict, Any
class SmartClient:
def __init__(self, server_params):
self.client = Client()
self.server_params = server_params
self.session = None
self.capabilities = {}
async def connect(self):
"""连接并缓存能力信息"""
self.session = await self.client.connect(self.server_params)
await self._cache_capabilities()
async def _cache_capabilities(self):
"""缓存服务器能力"""
self.capabilities = {
'resources': await self.session.list_resources(),
'tools': await self.session.list_tools(),
'prompts': await self.session.list_prompts()
}
def find_resources_by_pattern(self, pattern: str) -> List:
"""根据模式查找资源"""
matching_resources = []
regex = re.compile(pattern, re.IGNORECASE)
for resource in self.capabilities['resources']:
if regex.search(resource.name) or regex.search(resource.uri):
matching_resources.append(resource)
return matching_resources
def find_tools_by_description(self, keyword: str) -> List:
"""根据描述关键词查找工具"""
matching_tools = []
keyword_lower = keyword.lower()
for tool in self.capabilities['tools']:
if (keyword_lower in tool.name.lower() or
(tool.description and keyword_lower in tool.description.lower())):
matching_tools.append(tool)
return matching_tools
async def smart_resource_access(self, query: str):
"""智能资源访问"""
# 尝试直接匹配 URI
for resource in self.capabilities['resources']:
if query in resource.uri or query == resource.name:
return await self.session.read_resource(resource.uri)
# 模糊匹配
matches = self.find_resources_by_pattern(query)
if matches:
print(f"找到 {len(matches)} 个匹配的资源:")
for i, resource in enumerate(matches, 1):
print(f" {i}. {resource.name} ({resource.uri})")
if len(matches) == 1:
return await self.session.read_resource(matches[0].uri)
else:
choice = input("请选择资源编号: ").strip()
try:
index = int(choice) - 1
if 0 <= index < len(matches):
return await self.session.read_resource(matches[index].uri)
except ValueError:
pass
print(f"❌ 未找到匹配的资源: {query}")
return None
async def smart_tool_call(self, query: str, **kwargs):
"""智能工具调用"""
# 直接匹配工具名
for tool in self.capabilities['tools']:
if query == tool.name:
return await self.session.call_tool(tool.name, kwargs)
# 模糊匹配
matches = self.find_tools_by_description(query)
if matches:
print(f"找到 {len(matches)} 个匹配的工具:")
for i, tool in enumerate(matches, 1):
print(f" {i}. {tool.name}: {tool.description}")
if len(matches) == 1:
return await self.session.call_tool(matches[0].name, kwargs)
else:
choice = input("请选择工具编号: ").strip()
try:
index = int(choice) - 1
if 0 <= index < len(matches):
return await self.session.call_tool(matches[index].name, kwargs)
except ValueError:
pass
print(f"❌ 未找到匹配的工具: {query}")
return None
async def auto_complete_task(self, task_description: str):
"""自动完成任务"""
print(f"🤖 正在分析任务: {task_description}")
# 简单的任务分析和执行逻辑
task_lower = task_description.lower()
if "文件" in task_lower or "file" in task_lower:
# 文件相关任务
if "读取" in task_lower or "read" in task_lower:
file_resources = self.find_resources_by_pattern("file://")
if file_resources:
print("📁 找到文件资源,正在读取...")
for resource in file_resources[:3]: # 限制前3个
result = await self.session.read_resource(resource.uri)
print(f"📖 {resource.name}:")
for content in result.content:
if hasattr(content, 'text'):
print(content.text[:200] + "..." if len(content.text) > 200 else content.text)
elif "计算" in task_lower or "calculate" in task_lower:
# 计算相关任务
calc_tools = self.find_tools_by_description("计算")
if calc_tools:
print("🔢 找到计算工具...")
# 这里可以进一步解析数学表达式
expression = input("请输入要计算的表达式: ")
result = await self.session.call_tool(calc_tools[0].name, {"expression": expression})
for content in result.content:
if hasattr(content, 'text'):
print(f"计算结果: {content.text}")
elif "搜索" in task_lower or "search" in task_lower:
# 搜索相关任务
search_tools = self.find_tools_by_description("搜索")
if search_tools:
print("🔍 找到搜索工具...")
query = input("请输入搜索关键词: ")
result = await self.session.call_tool(search_tools[0].name, {"query": query})
for content in result.content:
if hasattr(content, 'text'):
print(f"搜索结果: {content.text}")
else:
print("❓ 无法自动识别任务类型,请手动操作")
# 使用示例
async def smart_client_example():
server_params = StdioServerParameters(
command="python",
args=["server.py"]
)
smart_client = SmartClient(server_params)
await smart_client.connect()
# 智能资源访问
result = await smart_client.smart_resource_access("config")
if result:
print("📖 资源内容:")
for content in result.content:
if hasattr(content, 'text'):
print(content.text)
# 智能工具调用
result = await smart_client.smart_tool_call("计算", expression="2 + 3 * 4")
if result:
print("🔧 工具结果:")
for content in result.content:
if hasattr(content, 'text'):
print(content.text)
# 自动任务完成
await smart_client.auto_complete_task("我想读取一些文件")
错误处理和重连
健壮的客户端
python
import asyncio
import time
from typing import Optional
class RobustClient:
def __init__(self, server_params, max_retries: int = 3, retry_delay: float = 1.0):
self.client = Client()
self.server_params = server_params
self.max_retries = max_retries
self.retry_delay = retry_delay
self.session: Optional = None
self.connected = False
async def connect_with_retry(self):
"""带重试的连接"""
for attempt in range(self.max_retries):
try:
self.session = await self.client.connect(self.server_params)
self.connected = True
print(f"✅ 连接成功 (尝试 {attempt + 1}/{self.max_retries})")
return True
except Exception as e:
print(f"❌ 连接失败 (尝试 {attempt + 1}/{self.max_retries}): {e}")
if attempt < self.max_retries - 1:
await asyncio.sleep(self.retry_delay * (2 ** attempt)) # 指数退避
self.connected = False
return False
async def ensure_connected(self):
"""确保连接可用"""
if not self.connected or not self.session:
return await self.connect_with_retry()
return True
async def safe_call_tool(self, tool_name: str, arguments: dict, retries: int = 2):
"""安全的工具调用"""
for attempt in range(retries + 1):
try:
if not await self.ensure_connected():
raise Exception("无法建立连接")
result = await self.session.call_tool(tool_name, arguments)
return result
except Exception as e:
print(f"❌ 工具调用失败 (尝试 {attempt + 1}/{retries + 1}): {e}")
if attempt < retries:
self.connected = False # 标记需要重连
await asyncio.sleep(self.retry_delay)
else:
raise
async def safe_read_resource(self, uri: str, retries: int = 2):
"""安全的资源读取"""
for attempt in range(retries + 1):
try:
if not await self.ensure_connected():
raise Exception("无法建立连接")
result = await self.session.read_resource(uri)
return result
except Exception as e:
print(f"❌ 资源读取失败 (尝试 {attempt + 1}/{retries + 1}): {e}")
if attempt < retries:
self.connected = False # 标记需要重连
await asyncio.sleep(self.retry_delay)
else:
raise
async def health_check(self):
"""健康检查"""
try:
if not self.session:
return False
# 尝试列出资源作为健康检查
await self.session.list_resources()
return True
except Exception:
self.connected = False
return False
async def start_health_monitor(self, interval: float = 30.0):
"""启动健康监控"""
while True:
await asyncio.sleep(interval)
if not await self.health_check():
print("⚠️ 检测到连接问题,尝试重连...")
await self.connect_with_retry()
async def disconnect(self):
"""断开连接"""
if self.session:
try:
await self.session.close()
except Exception as e:
print(f"断开连接时出错: {e}")
finally:
self.session = None
self.connected = False
性能优化
连接池和缓存
python
import asyncio
from collections import defaultdict
import time
class OptimizedClient:
def __init__(self):
self.client = Client()
self.connection_pool = {}
self.resource_cache = {}
self.cache_ttl = defaultdict(float)
self.cache_duration = 300 # 5分钟缓存
async def get_session(self, server_name: str, server_params):
"""获取或创建会话"""
if server_name not in self.connection_pool:
session = await self.client.connect(server_params)
self.connection_pool[server_name] = session
return self.connection_pool[server_name]
async def cached_read_resource(self, session, uri: str, force_refresh: bool = False):
"""带缓存的资源读取"""
cache_key = f"{id(session)}:{uri}"
current_time = time.time()
# 检查缓存
if not force_refresh and cache_key in self.resource_cache:
if current_time - self.cache_ttl[cache_key] < self.cache_duration:
print(f"📋 使用缓存: {uri}")
return self.resource_cache[cache_key]
# 读取并缓存
result = await session.read_resource(uri)
self.resource_cache[cache_key] = result
self.cache_ttl[cache_key] = current_time
print(f"🔄 已缓存: {uri}")
return result
async def batch_read_resources(self, session, uris: list):
"""批量读取资源"""
tasks = []
for uri in uris:
task = self.cached_read_resource(session, uri)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
successful_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"❌ 读取 {uris[i]} 失败: {result}")
else:
successful_results.append((uris[i], result))
return successful_results
def clear_cache(self, pattern: str = None):
"""清理缓存"""
if pattern:
keys_to_remove = [key for key in self.resource_cache.keys() if pattern in key]
for key in keys_to_remove:
del self.resource_cache[key]
del self.cache_ttl[key]
else:
self.resource_cache.clear()
self.cache_ttl.clear()
print(f"🗑️ 已清理缓存")
async def close_all_connections(self):
"""关闭所有连接"""
for name, session in self.connection_pool.items():
try:
await session.close()
print(f"🔌 已关闭连接: {name}")
except Exception as e:
print(f"❌ 关闭连接 {name} 失败: {e}")
self.connection_pool.clear()
self.clear_cache()
下一步
完成客户端构建后,您可以: