Skip to content

构建 MCP 客户端

本指南将教您如何构建连接到 MCP 服务器的客户端应用程序,包括基础连接、功能调用和高级特性实现。

前提条件

Python 环境

  • Python 3.10 或更高版本
  • MCP SDK 1.2.0 或更高版本

安装依赖

bash
# Install with pip
pip install mcp

# Or with uv (recommended)
uv add mcp

基础客户端实现

1. 简单客户端

python
import asyncio
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters

async def basic_client():
    """Connect to an MCP server and print what it offers.

    Builds stdio server parameters for ``path/to/your/server.py``, opens a
    session through ``Client.connect`` and prints the server's resources,
    tools and prompts, in that order. Any exception raised while connecting
    or listing is caught and reported on stdout instead of propagating.
    """
    # Command line describing how the server is started.
    launch_params = StdioServerParameters(
        command="python",
        args=["path/to/your/server.py"],
    )
    mcp_client = Client()

    try:
        # The session is only used inside this context manager.
        async with mcp_client.connect(launch_params) as session:
            print("成功连接到 MCP 服务器")

            # Resources: one line per entry with its name and URI.
            found_resources = await session.list_resources()
            print(f"可用资源: {len(found_resources)} 个")
            for entry in found_resources:
                print(f"  - {entry.name}: {entry.uri}")

            # Tools: one line per entry with its name and description.
            found_tools = await session.list_tools()
            print(f"可用工具: {len(found_tools)} 个")
            for entry in found_tools:
                print(f"  - {entry.name}: {entry.description}")

            # Prompts: one line per entry with its name and description.
            found_prompts = await session.list_prompts()
            print(f"可用提示: {len(found_prompts)} 个")
            for entry in found_prompts:
                print(f"  - {entry.name}: {entry.description}")

    except Exception as error:
        print(f"连接失败: {error}")


if __name__ == "__main__":
    asyncio.run(basic_client())

2. 交互式客户端

python
import asyncio
import json
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters

class InteractiveClient:
    """Menu-driven console client for a single MCP server.

    The client owns one session obtained from ``Client.connect``. All
    operations report their outcome on stdout; errors raised by the session
    are caught and printed rather than propagated, so the menu loop keeps
    running after a failed request.
    """

    def __init__(self, server_params):
        """Store the connection parameters; no connection is made yet."""
        self.client = Client()
        self.server_params = server_params
        self.session = None  # set by connect(), cleared by disconnect()

    async def connect(self):
        """Open the session.

        Returns:
            True when the session was opened, False when connecting raised.
        """
        try:
            self.session = await self.client.connect(self.server_params)
        except Exception as e:
            print(f"❌ 连接失败: {e}")
            return False
        print("✅ 成功连接到 MCP 服务器")
        return True

    async def disconnect(self):
        """Close the session if one is open; safe to call repeatedly."""
        if not self.session:
            return
        # Forget the handle before closing so that a failing close() cannot
        # leave a half-closed session behind for a later call to reuse.
        session, self.session = self.session, None
        await session.close()
        print("🔌 已断开连接")

    def _require_session(self):
        """Return True when connected; otherwise print the standard notice."""
        if self.session:
            return True
        print("❌ 未连接到服务器")
        return False

    @staticmethod
    def _print_listing(title, items, with_uri=False):
        """Print a numbered listing of capability entries; skip empty lists."""
        if not items:
            return
        print(f"\n{title} ({len(items)} 个):")
        for i, item in enumerate(items, 1):
            print(f"  {i}. {item.name}")
            if with_uri:
                print(f"     URI: {item.uri}")
            if item.description:
                print(f"     描述: {item.description}")

    @staticmethod
    def _print_contents(contents):
        """Print each content item: its ``text`` when present, else the item."""
        for content in contents:
            if hasattr(content, 'text'):
                print(content.text)
            else:
                print(content)

    @staticmethod
    def _parse_arguments(raw):
        """Turn the user's JSON input into an arguments dict.

        Returns:
            ``{}`` for empty input, the parsed dict for a JSON object, or
            None (after printing an error) when the input is not valid JSON
            or is valid JSON but not an object.
        """
        if not raw:
            return {}
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            print("❌ 参数格式错误,请使用有效的 JSON 格式")
            return None
        if not isinstance(parsed, dict):
            # e.g. "[1, 2]" or "42" parse fine but are not named arguments.
            print("❌ 参数必须是 JSON 对象,例如 {\"key\": \"value\"}")
            return None
        return parsed

    async def show_capabilities(self):
        """Print the server's resources, tools and prompts."""
        if not self._require_session():
            return

        print("\n📋 服务器能力:")

        resources = await self.session.list_resources()
        self._print_listing("📁 资源", resources, with_uri=True)

        tools = await self.session.list_tools()
        self._print_listing("🔧 工具", tools)

        prompts = await self.session.list_prompts()
        self._print_listing("💡 提示", prompts)

    async def read_resource(self, uri: str):
        """Read the resource at ``uri`` and print its content items."""
        if not self._require_session():
            return
        try:
            result = await self.session.read_resource(uri)
            print(f"\n📖 资源内容 ({uri}):")
            self._print_contents(result.content)
        except Exception as e:
            print(f"❌ 读取资源失败: {e}")

    async def call_tool(self, tool_name: str, arguments: dict):
        """Call ``tool_name`` with ``arguments`` and print the result items."""
        if not self._require_session():
            return
        try:
            result = await self.session.call_tool(tool_name, arguments)
            print(f"\n🔧 工具调用结果 ({tool_name}):")
            self._print_contents(result.content)
        except Exception as e:
            print(f"❌ 工具调用失败: {e}")

    async def get_prompt(self, prompt_name: str, arguments: dict = None):
        """Fetch ``prompt_name`` and print the role and text of each message.

        ``arguments`` defaults to an empty dict when omitted.
        """
        if not self._require_session():
            return
        try:
            result = await self.session.get_prompt(prompt_name, arguments or {})
            print(f"\n💡 提示内容 ({prompt_name}):")
            for message in result.messages:
                print(f"角色: {message.role}")
                # A message may carry one content object or a sequence of
                # them; normalise so both shapes are printed.
                contents = message.content
                if not isinstance(contents, (list, tuple)):
                    contents = [contents]
                for content in contents:
                    if hasattr(content, 'text'):
                        print(f"内容: {content.text}")
        except Exception as e:
            print(f"❌ 获取提示失败: {e}")

    async def _menu_loop(self):
        """Show the menu and dispatch choices until the user picks exit."""
        while True:
            print("\n" + "="*50)
            print("MCP 客户端交互界面")
            print("1. 显示服务器能力")
            print("2. 读取资源")
            print("3. 调用工具")
            print("4. 获取提示")
            print("5. 退出")

            choice = input("\n请选择操作 (1-5): ").strip()

            if choice == "1":
                await self.show_capabilities()

            elif choice == "2":
                uri = input("请输入资源 URI: ").strip()
                if uri:
                    await self.read_resource(uri)

            elif choice == "3":
                tool_name = input("请输入工具名称: ").strip()
                if tool_name:
                    args_str = input("请输入参数 (JSON 格式,留空表示无参数): ").strip()
                    arguments = self._parse_arguments(args_str)
                    if arguments is not None:
                        await self.call_tool(tool_name, arguments)

            elif choice == "4":
                prompt_name = input("请输入提示名称: ").strip()
                if prompt_name:
                    args_str = input("请输入参数 (JSON 格式,留空表示无参数): ").strip()
                    arguments = self._parse_arguments(args_str)
                    if arguments is not None:
                        await self.get_prompt(prompt_name, arguments)

            elif choice == "5":
                break

            else:
                print("❌ 无效选择,请重试")

    async def interactive_loop(self):
        """Connect, run the menu until exit, and always disconnect.

        Returns immediately when the connection cannot be established.
        A closed stdin (EOF) is treated the same as choosing "exit".
        """
        if not await self.connect():
            return

        try:
            await self.show_capabilities()
            try:
                await self._menu_loop()
            except EOFError:
                # stdin was closed (piped input ran out, Ctrl-D): leave the
                # menu quietly instead of crashing with a traceback.
                pass
        finally:
            await self.disconnect()

# 使用示例
async def main():
    """Run the interactive console client against ``server.py``."""
    launch_params = StdioServerParameters(command="python", args=["server.py"])
    console = InteractiveClient(launch_params)
    await console.interactive_loop()


if __name__ == "__main__":
    asyncio.run(main())

高级客户端功能

1. 多服务器客户端

python
class MultiServerClient:
    """Manage named sessions to several MCP servers through one client.

    Server configurations are registered by name with ``add_server`` and
    connected on demand. Failures are reported on stdout and signalled
    through return values rather than raised.
    """

    def __init__(self):
        self.client = Client()
        self.sessions = {}        # server name -> open session
        self.server_configs = {}  # server name -> connection parameters

    async def add_server(self, name: str, server_params):
        """Register (or replace) the connection parameters for ``name``."""
        self.server_configs[name] = server_params
        print(f"✅ 已添加服务器配置: {name}")

    async def connect_to_server(self, name: str):
        """Connect to the server registered as ``name``.

        If a session for ``name`` already exists it is replaced; the old
        session is closed once the new one has been established, so a failed
        reconnect leaves the existing session in place.

        Returns:
            True on success, False when ``name`` is unknown or connecting
            raised.
        """
        if name not in self.server_configs:
            print(f"❌ 未找到服务器配置: {name}")
            return False

        try:
            session = await self.client.connect(self.server_configs[name])
        except Exception as e:
            print(f"❌ 连接服务器 {name} 失败: {e}")
            return False

        replaced = self.sessions.get(name)
        self.sessions[name] = session
        if replaced is not None and replaced is not session:
            # Release the session being replaced so it is not leaked.
            try:
                await replaced.close()
            except Exception as e:
                print(f"❌ 断开服务器 {name} 失败: {e}")

        print(f"✅ 成功连接到服务器: {name}")
        return True

    async def connect_all_servers(self):
        """Connect to every registered server.

        Returns:
            A dict mapping each server name to the boolean result of
            ``connect_to_server``.
        """
        outcome = {}
        # Iterate over a snapshot: the registry may change while awaiting.
        for name in list(self.server_configs):
            outcome[name] = await self.connect_to_server(name)
        return outcome

    async def list_all_resources(self):
        """Print and return the resources of every connected server.

        Returns:
            A dict mapping server name to its resource list. Servers whose
            listing raised are reported on stdout and omitted.
        """
        all_resources = {}
        for name, session in list(self.sessions.items()):
            try:
                resources = await session.list_resources()
                all_resources[name] = resources
                print(f"\n📁 服务器 {name} 的资源:")
                for resource in resources:
                    print(f"  - {resource.name}: {resource.uri}")
            except Exception as e:
                print(f"❌ 获取服务器 {name} 资源失败: {e}")

        return all_resources

    async def aggregate_tools(self):
        """Collect the tools of every connected server into one mapping.

        Returns:
            A dict keyed by ``"<server>.<tool name>"`` whose values hold the
            server name (``'server'``), its session (``'session'``) and the
            tool object (``'tool'``). Servers whose listing raised are
            reported on stdout and omitted.
        """
        all_tools = {}
        for name, session in list(self.sessions.items()):
            try:
                tools = await session.list_tools()
                for tool in tools:
                    # Prefix with the server name so equal tool names on
                    # different servers do not collide.
                    tool_key = f"{name}.{tool.name}"
                    all_tools[tool_key] = {
                        'server': name,
                        'session': session,
                        'tool': tool
                    }
            except Exception as e:
                print(f"❌ 获取服务器 {name} 工具失败: {e}")

        return all_tools

    async def call_tool_on_server(self, server_name: str, tool_name: str, arguments: dict):
        """Call ``tool_name`` on the connected server ``server_name``.

        Returns:
            The session's result, or None when the server is not connected
            or the call raised (the error is printed).
        """
        if server_name not in self.sessions:
            print(f"❌ 未连接到服务器: {server_name}")
            return None

        try:
            session = self.sessions[server_name]
            result = await session.call_tool(tool_name, arguments)
            return result
        except Exception as e:
            print(f"❌ 在服务器 {server_name} 上调用工具 {tool_name} 失败: {e}")
            return None

    async def disconnect_all(self):
        """Close every session, reporting individual failures, then forget them."""
        # Snapshot first: awaiting close() must not race with dict mutation.
        for name, session in list(self.sessions.items()):
            try:
                await session.close()
                print(f"🔌 已断开服务器: {name}")
            except Exception as e:
                print(f"❌ 断开服务器 {name} 失败: {e}")

        self.sessions.clear()

# 使用示例
async def multi_server_example():
    """Demonstrate MultiServerClient with two stdio servers.

    Registers a file server and a web server, connects to both, prints
    their resources and the aggregated tool list, and disconnects. The
    disconnect runs in ``finally`` so sessions are released even when one
    of the listing steps raises.
    """
    client = MultiServerClient()

    # Register the servers to talk to.
    await client.add_server("file-server", StdioServerParameters(
        command="python", args=["file_server.py"]
    ))
    await client.add_server("web-server", StdioServerParameters(
        command="python", args=["web_server.py"]
    ))

    try:
        await client.connect_all_servers()

        # Resources of every connected server.
        await client.list_all_resources()

        # Tools of every connected server, keyed by "<server>.<tool>".
        tools = await client.aggregate_tools()
        print(f"\n🔧 总共可用工具: {len(tools)} 个")
        for tool_key, tool_info in tools.items():
            print(f"  - {tool_key}: {tool_info['tool'].description}")
    finally:
        await client.disconnect_all()

2. 智能客户端

python
import re
from typing import List, Dict, Any

class SmartClient:
    """MCP client that caches server capabilities and resolves loose queries.

    After ``connect`` the server's resources, tools and prompts are cached
    in ``capabilities``. The lookup helpers search that cache; before
    ``connect`` they simply find nothing.
    """

    def __init__(self, server_params):
        """Store the connection parameters; no connection is made yet."""
        self.client = Client()
        self.server_params = server_params
        self.session = None
        self.capabilities = {}  # filled by _cache_capabilities()

    async def connect(self):
        """Open the session and cache the server's capabilities."""
        self.session = await self.client.connect(self.server_params)
        await self._cache_capabilities()

    async def disconnect(self):
        """Close the session, if any, and drop the cached capabilities."""
        session, self.session = self.session, None
        self.capabilities = {}
        if session is not None:
            await session.close()

    async def _cache_capabilities(self):
        """Fetch resources, tools and prompts once and keep them in memory."""
        self.capabilities = {
            'resources': await self.session.list_resources(),
            'tools': await self.session.list_tools(),
            'prompts': await self.session.list_prompts()
        }

    def _cached(self, kind: str) -> List:
        """Return the cached list for ``kind`` or an empty list if absent."""
        return self.capabilities.get(kind) or []

    def find_resources_by_pattern(self, pattern: str) -> List:
        """Return cached resources whose name or URI matches ``pattern``.

        ``pattern`` is a case-insensitive regular expression. If it is not
        a valid expression (for example free text containing ``(``) it is
        matched literally instead of raising ``re.error``.
        """
        try:
            regex = re.compile(pattern, re.IGNORECASE)
        except re.error:
            regex = re.compile(re.escape(pattern), re.IGNORECASE)

        return [
            resource
            for resource in self._cached('resources')
            # str(): the URI may be a URL object rather than a plain string.
            if regex.search(resource.name) or regex.search(str(resource.uri))
        ]

    def find_tools_by_description(self, keyword: str) -> List:
        """Return cached tools whose name or description contains ``keyword``.

        The comparison is case-insensitive; tools without a description are
        matched on their name only.
        """
        keyword_lower = keyword.lower()
        return [
            tool
            for tool in self._cached('tools')
            if keyword_lower in tool.name.lower()
            or (tool.description and keyword_lower in tool.description.lower())
        ]

    def _find_tools_by_any(self, *keywords: str) -> List:
        """Return tools matching any keyword, in first-seen order, no repeats."""
        found = []
        for keyword in keywords:
            for tool in self.find_tools_by_description(keyword):
                if not any(tool is seen for seen in found):
                    found.append(tool)
        return found

    @staticmethod
    def _pick(matches: List, prompt: str):
        """Return the only match, or ask the user to choose one by number.

        Returns None when the answer is not a number or is out of range.
        """
        if len(matches) == 1:
            return matches[0]
        choice = input(prompt).strip()
        try:
            index = int(choice) - 1
        except ValueError:
            return None
        if 0 <= index < len(matches):
            return matches[index]
        return None

    async def smart_resource_access(self, query: str):
        """Read the resource that best matches ``query``.

        A resource whose URI contains ``query`` or whose name equals it is
        read immediately. Otherwise ``query`` is used as a pattern; a single
        match is read directly and several matches are offered to the user.

        Returns:
            The read result, or None when nothing matched or no valid
            choice was made.
        """
        # Direct hit on the URI or exact name.
        for resource in self._cached('resources'):
            if query in str(resource.uri) or query == resource.name:
                return await self.session.read_resource(resource.uri)

        # Fuzzy match.
        matches = self.find_resources_by_pattern(query)
        if matches:
            print(f"找到 {len(matches)} 个匹配的资源:")
            for i, resource in enumerate(matches, 1):
                print(f"  {i}. {resource.name} ({resource.uri})")

            chosen = self._pick(matches, "请选择资源编号: ")
            if chosen is not None:
                return await self.session.read_resource(chosen.uri)

        print(f"❌ 未找到匹配的资源: {query}")
        return None

    async def smart_tool_call(self, query: str, **kwargs):
        """Call the tool that best matches ``query`` with ``kwargs``.

        A tool whose name equals ``query`` is called immediately. Otherwise
        tools are searched by keyword; a single match is called directly and
        several matches are offered to the user.

        Returns:
            The call result, or None when nothing matched or no valid
            choice was made.
        """
        # Exact tool name.
        for tool in self._cached('tools'):
            if query == tool.name:
                return await self.session.call_tool(tool.name, kwargs)

        # Fuzzy match.
        matches = self.find_tools_by_description(query)
        if matches:
            print(f"找到 {len(matches)} 个匹配的工具:")
            for i, tool in enumerate(matches, 1):
                print(f"  {i}. {tool.name}: {tool.description}")

            chosen = self._pick(matches, "请选择工具编号: ")
            if chosen is not None:
                return await self.session.call_tool(chosen.name, kwargs)

        print(f"❌ 未找到匹配的工具: {query}")
        return None

    async def auto_complete_task(self, task_description: str):
        """Guess the task type from keywords and run a matching action.

        Recognises file-reading, calculation and search tasks from Chinese
        or English keywords. Tools are looked up with both the Chinese and
        the English keyword so servers described in either language are
        found. When nothing suitable exists a message is printed instead of
        returning silently.
        """
        print(f"🤖 正在分析任务: {task_description}")

        task_lower = task_description.lower()

        if "文件" in task_lower or "file" in task_lower:
            # File tasks: only reading is automated.
            if "读取" in task_lower or "read" in task_lower:
                file_resources = self.find_resources_by_pattern("file://")
                if not file_resources:
                    print("❌ 未找到可读取的文件资源")
                    return
                print("📁 找到文件资源,正在读取...")
                for resource in file_resources[:3]:  # read at most three
                    result = await self.session.read_resource(resource.uri)
                    print(f"📖 {resource.name}:")
                    for content in result.content:
                        if hasattr(content, 'text'):
                            text = content.text
                            # Preview only: cut long text at 200 characters.
                            print(text[:200] + "..." if len(text) > 200 else text)
            else:
                print("❓ 无法自动识别任务类型,请手动操作")

        elif "计算" in task_lower or "calculate" in task_lower:
            calc_tools = self._find_tools_by_any("计算", "calculate")
            if not calc_tools:
                print("❌ 未找到计算工具")
                return
            print("🔢 找到计算工具...")
            expression = input("请输入要计算的表达式: ")
            result = await self.session.call_tool(calc_tools[0].name, {"expression": expression})
            for content in result.content:
                if hasattr(content, 'text'):
                    print(f"计算结果: {content.text}")

        elif "搜索" in task_lower or "search" in task_lower:
            search_tools = self._find_tools_by_any("搜索", "search")
            if not search_tools:
                print("❌ 未找到搜索工具")
                return
            print("🔍 找到搜索工具...")
            query = input("请输入搜索关键词: ")
            result = await self.session.call_tool(search_tools[0].name, {"query": query})
            for content in result.content:
                if hasattr(content, 'text'):
                    print(f"搜索结果: {content.text}")

        else:
            print("❓ 无法自动识别任务类型,请手动操作")

# 使用示例
async def smart_client_example():
    """Demonstrate SmartClient: fuzzy resource access, tool call, auto task.

    The session opened by ``connect`` is closed in ``finally`` so it is
    released even when one of the demonstration steps raises.
    """
    server_params = StdioServerParameters(
        command="python",
        args=["server.py"]
    )

    smart_client = SmartClient(server_params)
    await smart_client.connect()

    try:
        # Resolve a resource from a loose query.
        result = await smart_client.smart_resource_access("config")
        if result:
            print("📖 资源内容:")
            for content in result.content:
                if hasattr(content, 'text'):
                    print(content.text)

        # Resolve a tool from a keyword and call it.
        result = await smart_client.smart_tool_call("计算", expression="2 + 3 * 4")
        if result:
            print("🔧 工具结果:")
            for content in result.content:
                if hasattr(content, 'text'):
                    print(content.text)

        # Let the client pick an action from a free-text description.
        await smart_client.auto_complete_task("我想读取一些文件")
    finally:
        if smart_client.session:
            await smart_client.session.close()

错误处理和重连

健壮的客户端

python
import asyncio
import time
from typing import Optional

class RobustClient:
    """MCP client wrapper that retries connections and requests.

    Connection attempts back off exponentially. Requests made through the
    ``safe_*`` methods reconnect and retry after a failure, and re-raise
    the last error once the retries are used up.
    """

    def __init__(self, server_params, max_retries: int = 3, retry_delay: float = 1.0):
        """Store settings; no connection is made yet.

        Args:
            server_params: Parameters passed to ``Client.connect``.
            max_retries: Number of connection attempts per connect call.
            retry_delay: Base delay in seconds; doubled after each failed
                connection attempt and used as-is between request retries.
        """
        self.client = Client()
        self.server_params = server_params
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.session: Optional[object] = None
        self.connected = False

    async def _discard_session(self):
        """Close and forget the current session; close errors are printed."""
        session, self.session = self.session, None
        self.connected = False
        if session is None:
            return
        try:
            await session.close()
        except Exception as e:
            print(f"断开连接时出错: {e}")

    async def connect_with_retry(self):
        """Open a fresh session, retrying with exponential backoff.

        Any existing session is closed first so a reconnect does not leak
        the session it replaces.

        Returns:
            True once connected, False when every attempt failed.
        """
        await self._discard_session()

        for attempt in range(self.max_retries):
            try:
                self.session = await self.client.connect(self.server_params)
                self.connected = True
                print(f"✅ 连接成功 (尝试 {attempt + 1}/{self.max_retries})")
                return True
            except Exception as e:
                print(f"❌ 连接失败 (尝试 {attempt + 1}/{self.max_retries}): {e}")
                if attempt < self.max_retries - 1:
                    # Exponential backoff: delay, 2*delay, 4*delay, ...
                    await asyncio.sleep(self.retry_delay * (2 ** attempt))

        self.connected = False
        return False

    async def ensure_connected(self):
        """Return True when a usable session exists, connecting if needed."""
        if not self.connected or not self.session:
            return await self.connect_with_retry()
        return True

    async def _request_with_retry(self, label: str, request, retries: int):
        """Run ``request(session)`` with reconnect-and-retry.

        Args:
            label: Operation name used in the printed failure message.
            request: Callable taking the session and returning an awaitable.
            retries: Number of retries after the first attempt.

        Raises:
            ConnectionError: When no connection could be established on the
                final attempt.
            Exception: Whatever the final attempt raised.
        """
        for attempt in range(retries + 1):
            try:
                if not await self.ensure_connected():
                    raise ConnectionError("无法建立连接")

                return await request(self.session)

            except Exception as e:
                print(f"❌ {label}失败 (尝试 {attempt + 1}/{retries + 1}): {e}")
                if attempt >= retries:
                    raise
                # Force a reconnect before the next attempt.
                self.connected = False
                await asyncio.sleep(self.retry_delay)

    async def safe_call_tool(self, tool_name: str, arguments: dict, retries: int = 2):
        """Call a tool, reconnecting and retrying up to ``retries`` times.

        Raises the last error when every attempt failed.
        """
        return await self._request_with_retry(
            "工具调用",
            lambda session: session.call_tool(tool_name, arguments),
            retries,
        )

    async def safe_read_resource(self, uri: str, retries: int = 2):
        """Read a resource, reconnecting and retrying up to ``retries`` times.

        Raises the last error when every attempt failed.
        """
        return await self._request_with_retry(
            "资源读取",
            lambda session: session.read_resource(uri),
            retries,
        )

    async def health_check(self):
        """Return True when listing resources succeeds on the session.

        Returns False when there is no session; when the listing raises,
        the client is also marked as disconnected.
        """
        try:
            if not self.session:
                return False

            # Listing resources doubles as a liveness probe.
            await self.session.list_resources()
            return True
        except Exception:
            self.connected = False
            return False

    async def start_health_monitor(self, interval: float = 30.0):
        """Probe the connection every ``interval`` seconds and reconnect.

        Runs until the surrounding task is cancelled.
        """
        while True:
            await asyncio.sleep(interval)
            if not await self.health_check():
                print("⚠️ 检测到连接问题,尝试重连...")
                await self.connect_with_retry()

    async def disconnect(self):
        """Close the session if one exists; close errors are printed."""
        if self.session:
            await self._discard_session()

性能优化

连接池和缓存

python
import asyncio
from collections import defaultdict
import time

class OptimizedClient:
    """MCP client with a per-server session pool and a TTL resource cache.

    Sessions are created once per server name and reused. Resource reads
    are cached per (session, URI) for ``cache_duration`` seconds.
    """

    def __init__(self):
        self.client = Client()
        self.connection_pool = {}            # server name -> session
        self.resource_cache = {}             # cache key -> read result
        self.cache_ttl = defaultdict(float)  # cache key -> time it was stored
        self.cache_duration = 300            # seconds an entry stays fresh

    async def get_session(self, server_name: str, server_params):
        """Return the pooled session for ``server_name``, connecting on first use."""
        if server_name not in self.connection_pool:
            session = await self.client.connect(server_params)
            self.connection_pool[server_name] = session

        return self.connection_pool[server_name]

    async def cached_read_resource(self, session, uri: str, force_refresh: bool = False):
        """Read ``uri`` through the cache.

        Args:
            session: Session used for the read; part of the cache key.
            uri: Resource URI.
            force_refresh: Skip the cache lookup and read again.

        Returns:
            The cached result when it is younger than ``cache_duration``,
            otherwise a fresh result, which is then cached.
        """
        cache_key = f"{id(session)}:{uri}"
        # Monotonic clock: entry age must not change when the wall clock is
        # adjusted.
        now = time.monotonic()

        if not force_refresh and cache_key in self.resource_cache:
            stored_at = self.cache_ttl.get(cache_key)
            if stored_at is not None and now - stored_at < self.cache_duration:
                print(f"📋 使用缓存: {uri}")
                return self.resource_cache[cache_key]

        result = await session.read_resource(uri)
        self.resource_cache[cache_key] = result
        self.cache_ttl[cache_key] = now
        print(f"🔄 已缓存: {uri}")

        return result

    async def batch_read_resources(self, session, uris: list):
        """Read several resources concurrently through the cache.

        Returns:
            A list of ``(uri, result)`` pairs for the reads that succeeded,
            in the order of ``uris``. Failed reads are reported on stdout
            and left out.
        """
        results = await asyncio.gather(
            *(self.cached_read_resource(session, uri) for uri in uris),
            return_exceptions=True,
        )

        successful_results = []
        for uri, result in zip(uris, results):
            # BaseException, not Exception: a cancelled read is returned as
            # CancelledError, which does not derive from Exception.
            if isinstance(result, BaseException):
                print(f"❌ 读取 {uri} 失败: {result}")
            else:
                successful_results.append((uri, result))

        return successful_results

    def clear_cache(self, pattern: str = None):
        """Drop cached entries.

        Args:
            pattern: When given, only entries whose cache key contains this
                substring are removed; otherwise the whole cache is cleared.
        """
        if pattern:
            for key in [key for key in self.resource_cache if pattern in key]:
                del self.resource_cache[key]
                # pop(): tolerate an entry that has no recorded timestamp.
                self.cache_ttl.pop(key, None)
        else:
            self.resource_cache.clear()
            self.cache_ttl.clear()

        print("🗑️ 已清理缓存")

    async def close_all_connections(self):
        """Close every pooled session, then empty the pool and the cache."""
        for name, session in list(self.connection_pool.items()):
            try:
                await session.close()
                print(f"🔌 已关闭连接: {name}")
            except Exception as e:
                print(f"❌ 关闭连接 {name} 失败: {e}")

        self.connection_pool.clear()
        self.clear_cache()

下一步

完成客户端构建后,您可以:

🚀 探索模型上下文协议的无限可能 | 连接 AI 与世界的桥梁 | 让智能更智能