Skip to content

示例和用例

本页面提供了 Model Context Protocol (MCP) 的实际应用示例,帮助您快速上手并了解最佳实践。

基础示例

1. 简单文件服务器

一个基础的文件系统服务器,提供文件读取和目录浏览功能。

Python 实现

python
#!/usr/bin/env python3
import asyncio
import json
import os
from pathlib import Path
from typing import Any, Dict, List

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
    Resource,
    Tool,
    TextContent,
    ListResourcesResult,
    ReadResourceResult,
    CallToolResult,
)

# Create the MCP server instance, registered under the name "file-server".
server = Server("file-server")

# Root directory for this example: the "Documents" folder inside the current
# user's home directory. Paths in this script are built relative to BASE_DIR.
BASE_DIR = Path.home() / "Documents"

@server.list_resources()
async def list_resources() -> ListResourcesResult:
    """List every ``*.txt`` file below ``BASE_DIR`` as a text resource.

    The directory tree is walked recursively. Each regular file becomes a
    ``Resource`` whose URI is ``file://`` followed by the path relative to
    ``BASE_DIR``.

    Returns:
        A ``ListResourcesResult`` with the resources collected so far. The
        listing is best-effort: if the walk fails part-way, the error is
        reported on stderr and the partial list is returned.
    """
    # Imported here so this function does not rely on a module-level import.
    import sys

    resources = []

    try:
        for file_path in BASE_DIR.rglob("*.txt"):
            if not file_path.is_file():
                continue
            relative_path = file_path.relative_to(BASE_DIR)
            resources.append(
                Resource(
                    # as_posix() keeps forward slashes in the URI even on
                    # platforms whose native separator is a backslash.
                    uri=f"file://{relative_path.as_posix()}",
                    name=file_path.name,
                    description=f"文本文件: {relative_path}",
                    mimeType="text/plain"
                )
            )
    except Exception as e:
        # This server talks to its client over stdio, so stdout carries
        # protocol messages; diagnostics must go to stderr instead.
        print(f"列出资源时出错: {e}", file=sys.stderr)

    return ListResourcesResult(resources=resources)

@server.read_resource()
async def read_resource(uri: str) -> ReadResourceResult:
    """Return the UTF-8 text of the file addressed by a ``file://`` URI.

    Args:
        uri: ``file://`` followed by a path relative to ``BASE_DIR``.

    Returns:
        A ``ReadResourceResult`` holding the file content as one text item.

    Raises:
        ValueError: The URI does not start with ``file://``, or the resolved
            path lies outside ``BASE_DIR``.
        FileNotFoundError: The path does not exist.
        RuntimeError: Reading the file or building the result failed; the
            original exception is attached as ``__cause__``.
    """
    scheme = "file://"
    if not uri.startswith(scheme):
        raise ValueError(f"不支持的 URI 方案: {uri}")

    # Everything after the scheme is the path relative to BASE_DIR.
    relative_path = uri[len(scheme):]
    file_path = BASE_DIR / relative_path

    # Security check: after resolving "..", symlinks and absolute components,
    # the target must still be inside BASE_DIR.
    try:
        file_path.resolve().relative_to(BASE_DIR.resolve())
    except ValueError:
        # The internal relative_to() error adds nothing for the caller.
        raise ValueError("访问被拒绝:文件在允许目录之外") from None

    if not file_path.exists():
        raise FileNotFoundError(f"文件不存在: {relative_path}")

    try:
        content = file_path.read_text(encoding='utf-8')
        # NOTE(review): the TypeScript version of this example returns
        # {uri, mimeType, text} items here; confirm TextContent is the type
        # ReadResourceResult expects in the SDK version in use.
        return ReadResourceResult(
            contents=[
                TextContent(
                    type="text",
                    text=content
                )
            ]
        )
    except Exception as e:
        # Chain the cause so the underlying I/O or decoding error stays visible.
        raise RuntimeError(f"读取文件失败: {e}") from e

@server.list_tools()
async def list_tools() -> List[Tool]:
    """Describe the two tools this server offers: search_files and create_file."""
    # JSON Schema for search_files: "pattern" is mandatory, the extension
    # filter is optional.
    search_schema = {
        "type": "object",
        "properties": {
            "pattern": {
                "type": "string",
                "description": "要搜索的文本模式"
            },
            "file_extension": {
                "type": "string",
                "description": "文件扩展名过滤器",
                "default": ".txt"
            }
        },
        "required": ["pattern"]
    }

    # JSON Schema for create_file: both fields are mandatory.
    create_schema = {
        "type": "object",
        "properties": {
            "filename": {
                "type": "string",
                "description": "文件名"
            },
            "content": {
                "type": "string",
                "description": "文件内容"
            }
        },
        "required": ["filename", "content"]
    }

    search_tool = Tool(
        name="search_files",
        description="在文件中搜索文本",
        inputSchema=search_schema
    )
    create_tool = Tool(
        name="create_file",
        description="创建新文件",
        inputSchema=create_schema
    )
    return [search_tool, create_tool]

@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> CallToolResult:
    """Execute one of the file tools.

    Supported tools:
        search_files: case-insensitive substring search for
            ``arguments["pattern"]`` in files below ``BASE_DIR`` whose name
            ends with ``arguments["file_extension"]`` (default ``".txt"``).
        create_file: write ``arguments["content"]`` as UTF-8 to
            ``BASE_DIR / arguments["filename"]``.

    Args:
        name: Tool name.
        arguments: Tool arguments as described above.

    Returns:
        A ``CallToolResult`` with one text item. Invalid arguments, I/O
        failures and unknown tool names are reported with ``isError=True``.

    Raises:
        KeyError: A required argument is missing from ``arguments``.
    """
    if name == "search_files":
        pattern = arguments["pattern"]
        file_extension = arguments.get("file_extension", ".txt")

        # file_extension is spliced into a glob pattern. Path separators or
        # ".." would add extra path segments to that pattern, so reject them
        # to keep the search confined to BASE_DIR.
        if (
            not isinstance(file_extension, str)
            or "/" in file_extension
            or "\\" in file_extension
            or ".." in file_extension
        ):
            return CallToolResult(
                content=[TextContent(type="text", text="无效的文件扩展名")],
                isError=True
            )

        results = []
        try:
            # Lower-case the needle once instead of once per file.
            needle = pattern.lower()
            for file_path in BASE_DIR.rglob(f"*{file_extension}"):
                if not file_path.is_file():
                    continue
                try:
                    content = file_path.read_text(encoding='utf-8')
                except (OSError, UnicodeError):
                    # Unreadable or non-UTF-8 file: skip it, keep searching.
                    continue
                if needle in content.lower():
                    relative_path = file_path.relative_to(BASE_DIR)
                    results.append(f"找到匹配: {relative_path}")
        except Exception as e:
            return CallToolResult(
                content=[TextContent(type="text", text=f"搜索失败: {e}")],
                isError=True
            )

        if results:
            result_text = "\n".join(results)
        else:
            result_text = f"未找到包含 '{pattern}' 的文件"

        return CallToolResult(
            content=[TextContent(type="text", text=result_text)]
        )

    elif name == "create_file":
        filename = arguments["filename"]
        content = arguments["content"]

        # Only a bare file name is accepted: no separators, no parent
        # references, and not empty (an empty name would address BASE_DIR
        # itself).
        if not filename or "/" in filename or "\\" in filename or ".." in filename:
            return CallToolResult(
                content=[TextContent(type="text", text="无效的文件名")],
                isError=True
            )

        file_path = BASE_DIR / filename

        try:
            # write_text replaces the content of an existing file.
            file_path.write_text(content, encoding='utf-8')
            return CallToolResult(
                content=[TextContent(type="text", text=f"文件已创建: {filename}")]
            )
        except Exception as e:
            return CallToolResult(
                content=[TextContent(type="text", text=f"创建文件失败: {e}")],
                isError=True
            )

    else:
        return CallToolResult(
            content=[TextContent(type="text", text=f"未知工具: {name}")],
            isError=True
        )

async def main():
    """Open the stdio transport and run the server on its two streams."""
    async with stdio_server() as streams:
        reader, writer = streams
        init_options = server.create_initialization_options()
        await server.run(reader, writer, init_options)

if __name__ == "__main__":
    asyncio.run(main())

TypeScript 实现

typescript
#!/usr/bin/env node

import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  CallToolRequestSchema,
  ListResourcesRequestSchema,
  ListToolsRequestSchema,
  ReadResourceRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import * as fs from 'fs/promises';
import * as path from 'path';
import { fileURLToPath } from 'url';

// Directory that contains this module.
const __dirname = path.dirname(fileURLToPath(import.meta.url));
// Root directory served by this example: "<home>/Documents". HOME is the
// usual variable on POSIX systems; USERPROFILE is its Windows counterpart.
// If neither is set the path degrades to the relative "Documents".
const BASE_DIR = path.join(
  process.env.HOME || process.env.USERPROFILE || '',
  'Documents'
);

// Create the server and declare the resources and tools capabilities.
const server = new Server(
  {
    name: 'file-server',
    version: '1.0.0',
  },
  {
    capabilities: {
      resources: {},
      tools: {},
    },
  }
);

// List resources: expose every regular .txt file below BASE_DIR.
// The listing is best-effort; a failure while reading the directory is
// logged to stderr and whatever was collected so far is returned.
server.setRequestHandler(ListResourcesRequestSchema, async () => {
  const resources: Array<{
    uri: string;
    name: string;
    description: string;
    mimeType: string;
  }> = [];

  try {
    const files = await fs.readdir(BASE_DIR, { recursive: true });

    for (const file of files) {
      // Check the extension first so only candidate files are stat'ed.
      if (path.extname(file) !== '.txt') {
        continue;
      }

      try {
        const stats = await fs.stat(path.join(BASE_DIR, file));
        if (!stats.isFile()) {
          continue;
        }
      } catch {
        // One entry that cannot be stat'ed (for example a dangling symlink)
        // must not cut the rest of the listing short.
        continue;
      }

      // URIs use forward slashes regardless of the platform separator.
      const uriPath = file.split(path.sep).join('/');

      resources.push({
        uri: `file://${uriPath}`,
        name: path.basename(file),
        description: `文本文件: ${file}`,
        mimeType: 'text/plain',
      });
    }
  } catch (error) {
    console.error('列出资源时出错:', error);
  }

  return { resources };
});

// Read resource: return the UTF-8 text of the file addressed by a file:// URI.
// Throws when the scheme is not file://, when the path leaves BASE_DIR, or
// when the file cannot be read.
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const { uri } = request.params;

  if (!uri.startsWith('file://')) {
    throw new Error(`不支持的 URI 方案: ${uri}`);
  }

  const relativePath = uri.slice('file://'.length);
  const filePath = path.join(BASE_DIR, relativePath);

  // Security check. A plain string-prefix test is not enough: a sibling such
  // as "<BASE_DIR>-secret" shares the prefix. Compute the path relative to
  // the base instead and reject anything that climbs out of it.
  // NOTE: path.resolve is purely lexical; symlinks are not followed here.
  const resolvedPath = path.resolve(filePath);
  const resolvedBase = path.resolve(BASE_DIR);
  const relativeToBase = path.relative(resolvedBase, resolvedPath);
  const escapesBase =
    relativeToBase === '..' ||
    relativeToBase.startsWith(`..${path.sep}`) ||
    path.isAbsolute(relativeToBase); // e.g. a different drive on Windows

  if (escapesBase) {
    throw new Error('访问被拒绝:文件在允许目录之外');
  }

  try {
    const content = await fs.readFile(filePath, 'utf-8');

    return {
      contents: [
        {
          uri,
          mimeType: 'text/plain',
          text: content,
        },
      ],
    };
  } catch (error) {
    // Catch variables are `unknown` under strict TypeScript; narrow first.
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`读取文件失败: ${reason}`);
  }
});

// List tools: advertise search_files and create_file with their input schemas.
server.setRequestHandler(ListToolsRequestSchema, async () => {
  // search_files: "pattern" is required, the extension filter is optional.
  const searchFilesTool = {
    name: 'search_files',
    description: '在文件中搜索文本',
    inputSchema: {
      type: 'object' as const,
      properties: {
        pattern: {
          type: 'string',
          description: '要搜索的文本模式',
        },
        fileExtension: {
          type: 'string',
          description: '文件扩展名过滤器',
          default: '.txt',
        },
      },
      required: ['pattern'],
    },
  };

  // create_file: both the file name and its content are required.
  const createFileTool = {
    name: 'create_file',
    description: '创建新文件',
    inputSchema: {
      type: 'object' as const,
      properties: {
        filename: {
          type: 'string',
          description: '文件名',
        },
        content: {
          type: 'string',
          description: '文件内容',
        },
      },
      required: ['filename', 'content'],
    },
  };

  return { tools: [searchFilesTool, createFileTool] };
});

// Call tool: dispatch on the requested tool name.
//   search_files - case-insensitive substring search in files below BASE_DIR
//                  whose name ends with fileExtension (default ".txt").
//   create_file  - write `content` as UTF-8 to BASE_DIR/<filename>.
// Argument and I/O problems are returned as results with isError: true;
// an unknown tool name throws.
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  // Build a result with a single text item; isError is only present on failure.
  const textResult = (text: string, isError = false) => ({
    content: [{ type: 'text' as const, text }],
    ...(isError ? { isError: true } : {}),
  });

  // Catch variables are `unknown` under strict TypeScript; narrow before use.
  const describeError = (error: unknown): string =>
    error instanceof Error ? error.message : String(error);

  switch (name) {
    case 'search_files': {
      // `arguments` may be absent from the request, so default to {}.
      const { pattern, fileExtension = '.txt' } = (args ?? {}) as {
        pattern?: string;
        fileExtension?: string;
      };

      // Without this check a missing pattern would throw inside the per-file
      // try block below and be silently reported as "no match".
      if (typeof pattern !== 'string') {
        return textResult('缺少必需参数: pattern', true);
      }

      const needle = pattern.toLowerCase();
      const results: string[] = [];

      try {
        const files = await fs.readdir(BASE_DIR, { recursive: true });

        for (const file of files) {
          if (typeof file !== 'string' || !file.endsWith(fileExtension)) {
            continue;
          }

          try {
            const content = await fs.readFile(path.join(BASE_DIR, file), 'utf-8');

            if (content.toLowerCase().includes(needle)) {
              results.push(`找到匹配: ${file}`);
            }
          } catch {
            // Ignore entries that cannot be read (directories, permissions).
          }
        }
      } catch (error) {
        return textResult(`搜索失败: ${describeError(error)}`, true);
      }

      return textResult(
        results.length > 0
          ? results.join('\n')
          : `未找到包含 '${pattern}' 的文件`
      );
    }

    case 'create_file': {
      const { filename, content } = (args ?? {}) as {
        filename?: string;
        content?: string;
      };

      // Only a bare, non-empty file name is accepted: no separators and no
      // parent references.
      if (
        typeof filename !== 'string' ||
        filename.length === 0 ||
        filename.includes('/') ||
        filename.includes('\\') ||
        filename.includes('..')
      ) {
        return textResult('无效的文件名', true);
      }

      if (typeof content !== 'string') {
        return textResult('缺少必需参数: content', true);
      }

      try {
        // writeFile replaces the content of an existing file.
        await fs.writeFile(path.join(BASE_DIR, filename), content, 'utf-8');
        return textResult(`文件已创建: ${filename}`);
      } catch (error) {
        return textResult(`创建文件失败: ${describeError(error)}`, true);
      }
    }

    default:
      throw new Error(`未知工具: ${name}`);
  }
});

// Start the server: connect it to a stdio transport. Status and failure
// messages go to stderr via console.error.
async function main() {
  await server.connect(new StdioServerTransport());
  console.error('文件服务器已启动');
}

main().catch((startupError) => {
  console.error('服务器启动失败:', startupError);
  process.exit(1);
});

2. 天气服务器

一个连接外部 API 的天气信息服务器。

Python 实现

python
#!/usr/bin/env python3
import asyncio
import json
import os
from typing import Any, Dict, List
import aiohttp

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
    Resource,
    Tool,
    TextContent,
    ListResourcesResult,
    ReadResourceResult,
    CallToolResult,
)

# Create the MCP server instance, registered under the name "weather-server".
server = Server("weather-server")

# API configuration (OpenWeatherMap). The key is read from the
# OPENWEATHER_API_KEY environment variable; the placeholder default lets
# main() detect a missing key.
API_KEY = os.getenv("OPENWEATHER_API_KEY", "your_api_key_here")
# Use HTTPS: the API key travels in the query string of every request.
BASE_URL = "https://api.openweathermap.org/data/2.5"

@server.list_resources()
async def list_resources() -> ListResourcesResult:
    """Advertise the fixed set of weather resources (all JSON)."""
    # (uri, name, description) for each built-in resource.
    catalog = [
        ("weather://current/beijing", "北京当前天气", "北京市当前天气信息"),
        ("weather://current/shanghai", "上海当前天气", "上海市当前天气信息"),
        ("weather://forecast/beijing", "北京天气预报", "北京市5天天气预报"),
    ]

    entries = [
        Resource(
            uri=resource_uri,
            name=title,
            description=summary,
            mimeType="application/json"
        )
        for resource_uri, title, summary in catalog
    ]
    return ListResourcesResult(resources=entries)

@server.read_resource()
async def read_resource(uri: str) -> ReadResourceResult:
    """Fetch the weather data addressed by a ``weather://<type>/<city>`` URI.

    Args:
        uri: ``weather://current/<city>`` or ``weather://forecast/<city>``.
            When the city segment is absent, ``"beijing"`` is used.

    Returns:
        A ``ReadResourceResult`` whose single text item is the API response
        re-serialised as indented JSON.

    Raises:
        ValueError: The URI does not start with ``weather://`` or has too
            few segments.
        RuntimeError: Anything that fails after that (unsupported weather
            type, non-200 response, network or decoding error). The original
            exception is attached as ``__cause__``.
    """
    if not uri.startswith("weather://"):
        raise ValueError(f"不支持的 URI 方案: {uri}")

    # "weather://current/beijing" -> ["weather:", "", "current", "beijing"]
    parts = uri.split("/")
    if len(parts) < 3:
        raise ValueError(f"无效的天气 URI: {uri}")

    weather_type = parts[2]  # "current" or "forecast"
    city = parts[3] if len(parts) > 3 else "beijing"

    # Resource type -> API endpoint; both take the same query parameters.
    endpoints = {"current": "weather", "forecast": "forecast"}

    try:
        async with aiohttp.ClientSession() as session:
            if weather_type not in endpoints:
                raise ValueError(f"不支持的天气类型: {weather_type}")

            url = f"{BASE_URL}/{endpoints[weather_type]}"
            params = {
                "q": city,
                "appid": API_KEY,
                "units": "metric",
                "lang": "zh_cn"
            }

            async with session.get(url, params=params) as response:
                if response.status != 200:
                    error_data = await response.text()
                    raise RuntimeError(f"API 请求失败: {response.status} - {error_data}")

                data = await response.json()
                return ReadResourceResult(
                    contents=[
                        TextContent(
                            type="text",
                            text=json.dumps(data, ensure_ascii=False, indent=2)
                        )
                    ]
                )

    except Exception as e:
        # Every failure in the block above is reported as RuntimeError;
        # chaining keeps the underlying error available to the caller.
        raise RuntimeError(f"获取天气数据失败: {e}") from e

@server.list_tools()
async def list_tools() -> List[Tool]:
    """Describe the three weather tools and the JSON Schema of their inputs."""

    def city_property() -> Dict[str, Any]:
        # A fresh dict per call so the schemas never share a mutable object.
        return {
            "type": "string",
            "description": "城市名称(中文或英文)"
        }

    current_weather = Tool(
        name="get_weather",
        description="获取指定城市的当前天气",
        inputSchema={
            "type": "object",
            "properties": {
                "city": city_property(),
                "units": {
                    "type": "string",
                    "enum": ["metric", "imperial", "kelvin"],
                    "description": "温度单位",
                    "default": "metric"
                }
            },
            "required": ["city"]
        }
    )

    forecast = Tool(
        name="get_forecast",
        description="获取指定城市的天气预报",
        inputSchema={
            "type": "object",
            "properties": {
                "city": city_property(),
                "days": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 5,
                    "description": "预报天数",
                    "default": 3
                }
            },
            "required": ["city"]
        }
    )

    comparison = Tool(
        name="compare_weather",
        description="比较多个城市的天气",
        inputSchema={
            "type": "object",
            "properties": {
                "cities": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "要比较的城市列表",
                    "minItems": 2,
                    "maxItems": 5
                }
            },
            "required": ["cities"]
        }
    )

    return [current_weather, forecast, comparison]

@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> CallToolResult:
    """Run one of the weather tools against the OpenWeatherMap API.

    Supported tools:
        get_weather: current conditions for ``arguments["city"]``. The
            optional ``units`` argument (``metric``, ``imperial`` or
            ``kelvin``; default ``metric``) selects the unit system.
        get_forecast: one sample per day for ``arguments["city"]`` over
            ``days`` days (default 3, clamped to the advertised 1-5 range).
        compare_weather: current conditions for each city in
            ``arguments["cities"]``, requested one after another.

    Args:
        name: Tool name.
        arguments: Tool arguments as described above.

    Returns:
        A ``CallToolResult`` with one text item. Unknown tools, a non-200
        response in get_weather/get_forecast, and any raised exception are
        reported with ``isError=True``. In compare_weather a failing city is
        listed inline and does not mark the whole result as an error.
    """
    try:
        async with aiohttp.ClientSession() as session:
            if name == "get_weather":
                city = arguments["city"]
                units = arguments.get("units", "metric")

                # The tool schema offers "kelvin", but OpenWeatherMap names
                # its Kelvin unit system "standard".
                api_units = "standard" if units == "kelvin" else units
                # Label values with the unit that was requested instead of
                # always printing °C and m/s. Anything other than metric or
                # imperial is presumably answered in the API's default
                # (Kelvin, m/s) system.
                temp_unit = {"metric": "°C", "imperial": "°F"}.get(units, "K")
                speed_unit = "mph" if units == "imperial" else "m/s"

                url = f"{BASE_URL}/weather"
                params = {
                    "q": city,
                    "appid": API_KEY,
                    "units": api_units,
                    "lang": "zh_cn"
                }

                async with session.get(url, params=params) as response:
                    if response.status == 200:
                        data = await response.json()

                        # Format the weather report, one line per field.
                        report_lines = [
                            f"🌍 {data['name']} 当前天气",
                            "",
                            f"🌡️ 温度: {data['main']['temp']}{temp_unit}",
                            f"🌡️ 体感温度: {data['main']['feels_like']}{temp_unit}",
                            f"📊 湿度: {data['main']['humidity']}%",
                            f"🌪️ 风速: {data['wind']['speed']} {speed_unit}",
                            f"☁️ 天气: {data['weather'][0]['description']}",
                            f"👁️ 能见度: {data.get('visibility', 'N/A')} m",
                        ]
                        weather_info = "\n".join(report_lines).strip()

                        return CallToolResult(
                            content=[TextContent(type="text", text=weather_info)]
                        )
                    else:
                        error_text = await response.text()
                        return CallToolResult(
                            content=[TextContent(type="text", text=f"获取天气失败: {error_text}")],
                            isError=True
                        )

            elif name == "get_forecast":
                city = arguments["city"]
                # Keep the value inside the 1-5 range the tool schema
                # advertises, so the headline matches the rows produced.
                days = max(1, min(int(arguments.get("days", 3)), 5))

                url = f"{BASE_URL}/forecast"
                params = {
                    "q": city,
                    "appid": API_KEY,
                    "units": "metric",
                    "lang": "zh_cn"
                }

                async with session.get(url, params=params) as response:
                    if response.status == 200:
                        data = await response.json()

                        forecast_info = f"🌍 {data['city']['name']} {days}天天气预报\n\n"

                        # The list holds one entry per 3 hours, so every 8th
                        # entry is about 24 hours apart. Sampling starts at
                        # the first entry, whatever time of day that is.
                        daily_forecasts = []
                        for i in range(0, min(days * 8, len(data['list'])), 8):
                            forecast = data['list'][i]
                            date = forecast['dt_txt'].split(' ')[0]
                            temp = forecast['main']['temp']
                            desc = forecast['weather'][0]['description']

                            daily_forecasts.append(f"📅 {date}: {temp}°C, {desc}")

                        forecast_info += "\n".join(daily_forecasts)

                        return CallToolResult(
                            content=[TextContent(type="text", text=forecast_info)]
                        )
                    else:
                        error_text = await response.text()
                        return CallToolResult(
                            content=[TextContent(type="text", text=f"获取预报失败: {error_text}")],
                            isError=True
                        )

            elif name == "compare_weather":
                cities = arguments["cities"]

                weather_data = []
                for city in cities:
                    url = f"{BASE_URL}/weather"
                    params = {
                        "q": city,
                        "appid": API_KEY,
                        "units": "metric",
                        "lang": "zh_cn"
                    }

                    async with session.get(url, params=params) as response:
                        if response.status == 200:
                            data = await response.json()
                            weather_data.append({
                                "city": data['name'],
                                "temp": data['main']['temp'],
                                "humidity": data['main']['humidity'],
                                "description": data['weather'][0]['description']
                            })
                        else:
                            # A failing city is recorded, not fatal: the
                            # other cities are still compared.
                            weather_data.append({
                                "city": city,
                                "error": f"获取失败: {response.status}"
                            })

                # Format the comparison, one line per city.
                comparison = "🌍 城市天气比较\n\n"
                for entry in weather_data:
                    if "error" in entry:
                        comparison += f"❌ {entry['city']}: {entry['error']}\n"
                    else:
                        comparison += f"🏙️ {entry['city']}: {entry['temp']}°C, {entry['humidity']}% 湿度, {entry['description']}\n"

                return CallToolResult(
                    content=[TextContent(type="text", text=comparison)]
                )

            else:
                return CallToolResult(
                    content=[TextContent(type="text", text=f"未知工具: {name}")],
                    isError=True
                )

    except Exception as e:
        # Top-level boundary: missing arguments, network errors and
        # unexpected response shapes all become an error result.
        return CallToolResult(
            content=[TextContent(type="text", text=f"工具执行失败: {e}")],
            isError=True
        )

async def main():
    """Warn about a missing API key, then run the server over stdio."""
    # Imported here so this function does not rely on a module-level import.
    import sys

    if API_KEY == "your_api_key_here":
        # stdout carries the protocol messages of the stdio transport, so the
        # warning has to go to stderr.
        print("警告: 请设置 OPENWEATHER_API_KEY 环境变量", file=sys.stderr)

    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

3. 数据库服务器

一个简单的 SQLite 数据库服务器示例。

Python 实现

python
#!/usr/bin/env python3
import asyncio
import sqlite3
import json
from typing import Any, Dict, List
from pathlib import Path

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
    Resource,
    Tool,
    TextContent,
    ListResourcesResult,
    ReadResourceResult,
    CallToolResult,
)

# Create the MCP server instance, registered under the name "database-server".
server = Server("database-server")

# Database configuration: a relative path, so the SQLite file is looked up
# in (or created in) the process's current working directory.
DB_PATH = Path("example.db")

def init_database():
    """Create the example tables and seed them with sample rows.

    Opens the SQLite database at ``DB_PATH``, creates the ``users`` and
    ``products`` tables when they do not exist, and inserts three sample
    rows into each table that is still empty. Calling it again on a
    populated database changes nothing.

    The connection is always closed. If a statement fails, pending inserts
    are rolled back and the exception propagates.
    """
    # The SQL text is kept at this indentation level so that the statements
    # stay the same regardless of how deeply the code below is nested.
    create_users_sql = """
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL,
            age INTEGER,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """
    create_products_sql = """
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            price REAL NOT NULL,
            category TEXT,
            stock INTEGER DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """
    sample_users = [
        ("张三", "zhangsan@example.com", 25),
        ("李四", "lisi@example.com", 30),
        ("王五", "wangwu@example.com", 28),
    ]
    sample_products = [
        ("笔记本电脑", 5999.99, "电子产品", 10),
        ("无线鼠标", 199.99, "电子产品", 50),
        ("办公椅", 899.99, "家具", 20),
    ]

    conn = sqlite3.connect(DB_PATH)
    try:
        # "with conn" commits on success and rolls back on an exception; it
        # does not close the connection, hence the surrounding try/finally.
        with conn:
            cursor = conn.cursor()

            # Create the example tables.
            cursor.execute(create_users_sql)
            cursor.execute(create_products_sql)

            # Seed each table only while it is empty.
            cursor.execute("SELECT COUNT(*) FROM users")
            if cursor.fetchone()[0] == 0:
                cursor.executemany(
                    "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
                    sample_users
                )

            cursor.execute("SELECT COUNT(*) FROM products")
            if cursor.fetchone()[0] == 0:
                cursor.executemany(
                    "INSERT INTO products (name, price, category, stock) VALUES (?, ?, ?, ?)",
                    sample_products
                )
    finally:
        conn.close()

# Initialize the database. This is a module-level call, so it runs as soon as
# the module is imported or executed.
init_database()

@server.list_resources()
async def list_resources() -> ListResourcesResult:
    """Advertise the four fixed ``db://`` resources (all JSON)."""
    # (uri, name, description) for each resource.
    catalog = (
        ("db://tables", "数据库表列表", "显示所有数据库表"),
        ("db://users", "用户表", "用户数据表内容"),
        ("db://products", "产品表", "产品数据表内容"),
        ("db://schema", "数据库架构", "数据库表结构信息"),
    )

    entries = []
    for resource_uri, title, summary in catalog:
        entries.append(
            Resource(
                uri=resource_uri,
                name=title,
                description=summary,
                mimeType="application/json"
            )
        )

    return ListResourcesResult(resources=entries)

@server.read_resource()
async def read_resource(uri: str) -> ReadResourceResult:
    """Return database content or metadata for a ``db://`` URI as JSON text.

    Supported URIs:
        db://tables: names of all tables listed in ``sqlite_master``.
        db://users, db://products: every row of that table, ordered by id.
        db://schema: column definitions of every table.

    Args:
        uri: One of the URIs above.

    Returns:
        A ``ReadResourceResult`` whose single text item is indented JSON.

    Raises:
        ValueError: The URI does not start with ``db://``.
        RuntimeError: The resource type is unknown or a database operation
            failed; the original exception is attached as ``__cause__``.
    """
    if not uri.startswith("db://"):
        raise ValueError(f"不支持的 URI 方案: {uri}")

    resource_type = uri[len("db://"):]

    # Fixed statements for the two table dumps, keyed by resource type. The
    # SQL is never assembled from the URI.
    table_queries = {
        "users": "SELECT * FROM users ORDER BY id",
        "products": "SELECT * FROM products ORDER BY id",
    }

    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row  # rows become addressable by column name
    cursor = conn.cursor()

    try:
        if resource_type == "tables":
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = [row[0] for row in cursor.fetchall()]
            data = {"tables": tables}

        elif resource_type in table_queries:
            cursor.execute(table_queries[resource_type])
            rows = cursor.fetchall()
            data = {
                "table": resource_type,
                "count": len(rows),
                "data": [dict(row) for row in rows]
            }

        elif resource_type == "schema":
            # Collect the column definitions of every table.
            schema_info = {}

            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = [row[0] for row in cursor.fetchall()]

            for table in tables:
                # PRAGMA arguments cannot be bound as parameters, so quote
                # the name as an SQL identifier (embedded quotes doubled).
                quoted = '"' + table.replace('"', '""') + '"'
                cursor.execute(f"PRAGMA table_info({quoted})")
                schema_info[table] = [
                    {
                        "name": col["name"],
                        "type": col["type"],
                        "not_null": bool(col["notnull"]),
                        "default": col["dflt_value"],
                        "primary_key": bool(col["pk"])
                    }
                    for col in cursor.fetchall()
                ]

            data = {"schema": schema_info}

        else:
            raise ValueError(f"未知资源类型: {resource_type}")

        return ReadResourceResult(
            contents=[
                TextContent(
                    type="text",
                    text=json.dumps(data, ensure_ascii=False, indent=2)
                )
            ]
        )

    except Exception as e:
        # Every failure above is reported as RuntimeError; chaining keeps the
        # underlying error available to the caller.
        raise RuntimeError(f"读取数据库资源失败: {e}") from e

    finally:
        conn.close()

@server.list_tools()
async def list_tools() -> List[Tool]:
    """Describe the four database tools and the JSON Schema of their inputs."""
    # execute_query: SQL text plus optional positional parameters.
    execute_query = Tool(
        name="execute_query",
        description="执行 SQL 查询",
        inputSchema={
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "要执行的 SQL 查询语句"
                },
                "params": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "查询参数(可选)",
                    "default": []
                }
            },
            "required": ["query"]
        }
    )

    # insert_user: name and email are mandatory, age is optional.
    insert_user = Tool(
        name="insert_user",
        description="插入新用户",
        inputSchema={
            "type": "object",
            "properties": {
                "name": {
                    "type": "string",
                    "description": "用户姓名"
                },
                "email": {
                    "type": "string",
                    "description": "用户邮箱"
                },
                "age": {
                    "type": "integer",
                    "description": "用户年龄",
                    "minimum": 0,
                    "maximum": 150
                }
            },
            "required": ["name", "email"]
        }
    )

    # update_product_stock: both fields are mandatory.
    update_product_stock = Tool(
        name="update_product_stock",
        description="更新产品库存",
        inputSchema={
            "type": "object",
            "properties": {
                "product_id": {
                    "type": "integer",
                    "description": "产品 ID"
                },
                "new_stock": {
                    "type": "integer",
                    "description": "新的库存数量",
                    "minimum": 0
                }
            },
            "required": ["product_id", "new_stock"]
        }
    )

    # search_products: a keyword with optional category and price filters.
    search_products = Tool(
        name="search_products",
        description="搜索产品",
        inputSchema={
            "type": "object",
            "properties": {
                "keyword": {
                    "type": "string",
                    "description": "搜索关键词"
                },
                "category": {
                    "type": "string",
                    "description": "产品类别过滤器(可选)"
                },
                "min_price": {
                    "type": "number",
                    "description": "最低价格(可选)"
                },
                "max_price": {
                    "type": "number",
                    "description": "最高价格(可选)"
                }
            },
            "required": ["keyword"]
        }
    )

    return [execute_query, insert_user, update_product_stock, search_products]

@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> CallToolResult:
    """Dispatch a database tool call by tool name.

    A fresh SQLite connection to ``DB_PATH`` is opened for every call and is
    always closed before returning.

    Args:
        name: Tool to run: ``execute_query``, ``insert_user``,
            ``update_product_stock`` or ``search_products``.
        arguments: Tool arguments; the keys read are shown in each branch.

    Returns:
        A ``CallToolResult`` with a text payload. Failures (unknown tool,
        rejected query, integrity violation, any other exception) are
        reported with ``isError=True`` instead of being raised.
    """
    conn = sqlite3.connect(DB_PATH)
    # Row objects can be converted to dicts for JSON output.
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()

    try:
        if name == "execute_query":
            query = arguments["query"]
            params = arguments.get("params", [])

            # Safety check: only statements that start with SELECT are allowed.
            if not query.strip().upper().startswith("SELECT"):
                return CallToolResult(
                    content=[TextContent(type="text", text="出于安全考虑,只允许 SELECT 查询")],
                    isError=True
                )

            cursor.execute(query, params)
            rows = cursor.fetchall()

            result = {
                "query": query,
                "row_count": len(rows),
                "data": [dict(row) for row in rows]
            }

            return CallToolResult(
                content=[TextContent(
                    type="text",
                    text=json.dumps(result, ensure_ascii=False, indent=2)
                )]
            )

        elif name == "insert_user":
            # Use a distinct local: rebinding `name` here would overwrite the
            # tool name this function dispatches on.
            user_name = arguments["name"]
            email = arguments["email"]
            age = arguments.get("age")

            try:
                if age is not None:
                    cursor.execute(
                        "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
                        (user_name, email, age)
                    )
                else:
                    cursor.execute(
                        "INSERT INTO users (name, email) VALUES (?, ?)",
                        (user_name, email)
                    )

                conn.commit()
                user_id = cursor.lastrowid

                return CallToolResult(
                    content=[TextContent(
                        type="text",
                        text=f"用户创建成功,ID: {user_id}"
                    )]
                )

            except sqlite3.IntegrityError as e:
                return CallToolResult(
                    content=[TextContent(type="text", text=f"创建用户失败: {e}")],
                    isError=True
                )

        elif name == "update_product_stock":
            product_id = arguments["product_id"]
            new_stock = arguments["new_stock"]

            cursor.execute(
                "UPDATE products SET stock = ? WHERE id = ?",
                (new_stock, product_id)
            )

            # rowcount == 0 means no product matched the given id.
            if cursor.rowcount > 0:
                conn.commit()
                return CallToolResult(
                    content=[TextContent(
                        type="text",
                        text=f"产品 {product_id} 库存已更新为 {new_stock}"
                    )]
                )
            else:
                return CallToolResult(
                    content=[TextContent(type="text", text=f"未找到产品 ID: {product_id}")],
                    isError=True
                )

        elif name == "search_products":
            keyword = arguments["keyword"]
            category = arguments.get("category")
            min_price = arguments.get("min_price")
            max_price = arguments.get("max_price")

            # Build the query incrementally; every value is bound as a parameter.
            query = "SELECT * FROM products WHERE name LIKE ?"
            params = [f"%{keyword}%"]

            if category:
                query += " AND category = ?"
                params.append(category)

            # `is not None` so that a price bound of 0 is still applied.
            if min_price is not None:
                query += " AND price >= ?"
                params.append(min_price)

            if max_price is not None:
                query += " AND price <= ?"
                params.append(max_price)

            query += " ORDER BY name"

            cursor.execute(query, params)
            rows = cursor.fetchall()

            result = {
                "search_keyword": keyword,
                "filters": {
                    "category": category,
                    "min_price": min_price,
                    "max_price": max_price
                },
                "result_count": len(rows),
                "products": [dict(row) for row in rows]
            }

            return CallToolResult(
                content=[TextContent(
                    type="text",
                    text=json.dumps(result, ensure_ascii=False, indent=2)
                )]
            )

        else:
            return CallToolResult(
                content=[TextContent(type="text", text=f"未知工具: {name}")],
                isError=True
            )

    except Exception as e:
        # Boundary handler: report any failure (including a missing argument
        # key) to the caller as an error result.
        return CallToolResult(
            content=[TextContent(type="text", text=f"数据库操作失败: {e}")],
            isError=True
        )

    finally:
        conn.close()

async def main():
    """Start the server on the stdio transport."""
    async with stdio_server() as streams:
        reader, writer = streams
        init_options = server.create_initialization_options()
        await server.run(reader, writer, init_options)

if __name__ == "__main__":
    asyncio.run(main())

高级示例

1. 多服务器客户端

一个连接和管理多个 MCP 服务器的客户端示例。

python
#!/usr/bin/env python3
import asyncio
import json
from typing import Dict, List, Any
from dataclasses import dataclass

from mcp.client import Client
from mcp.client.stdio import stdio_client

@dataclass
class ServerConfig:
    """Launch settings for one MCP server."""
    # Key under which the connected client is registered
    name: str
    # Executable handed to stdio_client
    command: str
    # Arguments handed to stdio_client together with the command
    args: List[str]
    # Human-readable description of the server
    description: str

class MultiServerClient:
    """Connect to several MCP servers over stdio and drive them together.

    Connections opened by ``connect_all_servers`` stay open until ``close``
    is called; ``run_demo`` does both.
    """

    def __init__(self, server_configs=None):
        """Set up an empty client registry.

        Args:
            server_configs: Optional iterable of ``ServerConfig`` entries to
                connect to. When omitted, the three example servers (file,
                weather, database) are used.
        """
        self.clients: Dict[str, Client] = {}
        # Owns every open transport so it outlives connect_all_servers().
        self._exit_stack = self._new_exit_stack()

        if server_configs is None:
            server_configs = [
                ServerConfig(
                    name="file-server",
                    command="python",
                    args=["file_server.py"],
                    description="文件系统服务器"
                ),
                ServerConfig(
                    name="weather-server",
                    command="python",
                    args=["weather_server.py"],
                    description="天气信息服务器"
                ),
                ServerConfig(
                    name="database-server",
                    command="python",
                    args=["database_server.py"],
                    description="数据库服务器"
                )
            ]
        self.server_configs = list(server_configs)

    @staticmethod
    def _new_exit_stack():
        """Return an empty AsyncExitStack (imported locally to keep the example self-contained)."""
        from contextlib import AsyncExitStack

        return AsyncExitStack()

    async def connect_all_servers(self):
        """Connect to every configured server, keeping successful connections open.

        A server that fails to connect is reported and skipped; its transport
        is torn down immediately.
        """
        for config in self.server_configs:
            print(f"正在连接 {config.name}...")

            # One stack per server: leaving an `async with` here would close
            # the transport before the stored client is ever used.
            stack = self._new_exit_stack()
            try:
                read, write = await stack.enter_async_context(
                    stdio_client(config.command, config.args)
                )
                client = Client("multi-client")
                await client.initialize(read, write)
            except Exception as e:
                print(f"❌ {config.name} 连接失败: {e}")
                try:
                    await stack.aclose()
                except Exception as close_error:
                    print(f"⚠️ {config.name} 清理失败: {close_error}")
                continue

            # Hand ownership to the instance-wide stack; close() releases it.
            self._exit_stack.push_async_callback(stack.aclose)
            self.clients[config.name] = client
            print(f"✅ {config.name} 连接成功")

    async def close(self):
        """Close every open server connection and empty the client registry."""
        self.clients.clear()
        await self._exit_stack.aclose()

    async def list_all_capabilities(self):
        """Print up to three resources and three tools for each connected server."""
        print("\n🔍 服务器能力概览:")
        print("=" * 50)

        for server_name, client in self.clients.items():
            try:
                print(f"\n📡 {server_name}:")

                # Resources
                try:
                    resources = await client.list_resources()
                    print(f"  📁 资源 ({len(resources.resources)}):")
                    for resource in resources.resources[:3]:  # show the first 3 only
                        print(f"    - {resource.name}: {resource.description}")
                    if len(resources.resources) > 3:
                        print(f"    ... 还有 {len(resources.resources) - 3} 个资源")
                except Exception as e:
                    print(f"    ❌ 获取资源失败: {e}")

                # Tools
                try:
                    tools = await client.list_tools()
                    print(f"  🔧 工具 ({len(tools.tools)}):")
                    for tool in tools.tools[:3]:  # show the first 3 only
                        print(f"    - {tool.name}: {tool.description}")
                    if len(tools.tools) > 3:
                        print(f"    ... 还有 {len(tools.tools) - 3} 个工具")
                except Exception as e:
                    print(f"    ❌ 获取工具失败: {e}")

            except Exception as e:
                print(f"  ❌ 服务器 {server_name} 不可用: {e}")

    async def execute_workflow(self, workflow: List[Dict[str, Any]]):
        """Run workflow steps in order and collect one outcome per step.

        Args:
            workflow: Steps with keys ``server``, ``type`` (``"resource"`` or
                ``"tool"``), ``name`` and optional ``params``.

        Returns:
            A list of ``{"step": i, "result": ...}`` or
            ``{"step": i, "error": message}`` dicts, numbered from 1. A failed
            step does not stop the remaining steps.
        """
        print("\n🚀 执行工作流:")
        print("=" * 30)

        results = []

        for i, step in enumerate(workflow, 1):
            server_name = step["server"]
            action_type = step["type"]
            action_name = step["name"]
            params = step.get("params", {})

            print(f"\n步骤 {i}: {server_name} - {action_type} - {action_name}")

            if server_name not in self.clients:
                error_msg = f"服务器 {server_name} 未连接"
                print(f"  ❌ {error_msg}")
                results.append({"step": i, "error": error_msg})
                continue

            client = self.clients[server_name]

            try:
                if action_type == "resource":
                    result = await client.read_resource(action_name)
                    print("  ✅ 资源读取成功")
                    results.append({"step": i, "result": result})

                elif action_type == "tool":
                    result = await client.call_tool(action_name, params)
                    print("  ✅ 工具调用成功")
                    results.append({"step": i, "result": result})

                else:
                    error_msg = f"未知操作类型: {action_type}"
                    print(f"  ❌ {error_msg}")
                    results.append({"step": i, "error": error_msg})

            except Exception as e:
                error_msg = f"执行失败: {e}"
                print(f"  ❌ {error_msg}")
                results.append({"step": i, "error": error_msg})

        return results

    async def interactive_mode(self):
        """Read commands from stdin until 'quit', Ctrl-C or end of input."""
        print("\n🎮 进入交互模式 (输入 'help' 查看帮助, 'quit' 退出)")
        print("=" * 50)

        while True:
            try:
                command = input("\n> ").strip()

                if command == "quit":
                    break

                elif command == "help":
                    print("""
可用命令:
  help                    - 显示此帮助
  list                    - 列出所有服务器
  capabilities            - 显示所有服务器能力
  use <server>            - 切换到指定服务器
  resources <server>      - 列出服务器资源
  tools <server>          - 列出服务器工具
  call <server> <tool> <args> - 调用工具
  quit                    - 退出
                    """)

                elif command == "list":
                    print("\n连接的服务器:")
                    for name in self.clients:
                        print(f"  - {name}")

                elif command == "capabilities":
                    await self.list_all_capabilities()

                elif command.startswith("resources "):
                    server_name = command.split(" ", 1)[1]
                    if server_name in self.clients:
                        client = self.clients[server_name]
                        resources = await client.list_resources()
                        print(f"\n{server_name} 的资源:")
                        for resource in resources.resources:
                            print(f"  - {resource.name}: {resource.description}")
                    else:
                        print(f"服务器 {server_name} 未连接")

                elif command.startswith("tools "):
                    server_name = command.split(" ", 1)[1]
                    if server_name in self.clients:
                        client = self.clients[server_name]
                        tools = await client.list_tools()
                        print(f"\n{server_name} 的工具:")
                        for tool in tools.tools:
                            print(f"  - {tool.name}: {tool.description}")
                    else:
                        print(f"服务器 {server_name} 未连接")

                elif command.startswith("call "):
                    # At most 4 parts so the JSON argument may contain spaces.
                    parts = command.split(" ", 3)
                    if len(parts) >= 3:
                        server_name = parts[1]
                        tool_name = parts[2]
                        args_str = parts[3] if len(parts) > 3 else "{}"

                        if server_name in self.clients:
                            try:
                                args = json.loads(args_str)
                                client = self.clients[server_name]
                                result = await client.call_tool(tool_name, args)
                                print("\n工具调用结果:")
                                for content in result.content:
                                    print(content.text)
                            except json.JSONDecodeError:
                                print("参数格式错误,请使用 JSON 格式")
                            except Exception as e:
                                print(f"工具调用失败: {e}")
                        else:
                            print(f"服务器 {server_name} 未连接")
                    else:
                        print("用法: call <server> <tool> <args>")

                else:
                    print("未知命令,输入 'help' 查看帮助")

            except (KeyboardInterrupt, EOFError):
                # EOFError must end the loop: once stdin is closed, input()
                # raises it on every call and the loop would spin forever.
                break
            except Exception as e:
                print(f"命令执行错误: {e}")

    async def run_demo(self):
        """Connect, show capabilities, run a sample workflow, then go interactive.

        All server connections are closed when the demo ends, whether it
        finishes normally or with an exception.
        """
        try:
            await self.connect_all_servers()

            if not self.clients:
                print("❌ 没有可用的服务器连接")
                return

            await self.list_all_capabilities()

            # Sample workflow touching each example server once.
            demo_workflow = [
                {
                    "server": "weather-server",
                    "type": "tool",
                    "name": "get_weather",
                    "params": {"city": "北京"}
                },
                {
                    "server": "file-server",
                    "type": "tool",
                    "name": "create_file",
                    "params": {
                        "filename": "weather_report.txt",
                        "content": "今日天气报告已生成"
                    }
                },
                {
                    "server": "database-server",
                    "type": "tool",
                    "name": "search_products",
                    "params": {"keyword": "电脑"}
                }
            ]

            workflow_results = await self.execute_workflow(demo_workflow)

            print(f"\n📊 工作流执行完成,共 {len(workflow_results)} 个步骤")

            await self.interactive_mode()
        finally:
            await self.close()

async def main():
    """Build a MultiServerClient and run its demo."""
    await MultiServerClient().run_demo()

if __name__ == "__main__":
    asyncio.run(main())

2. 智能代理示例

一个使用多个 MCP 服务器的智能代理。

python
#!/usr/bin/env python3
import asyncio
import json
import re
from typing import Dict, List, Any, Optional
from dataclasses import dataclass

from mcp.client import Client
from mcp.client.stdio import stdio_client

@dataclass
class Task:
    """One unit of work executed by the agent."""
    # Identifier such as "task_1"
    id: str
    # Human-readable summary shown to the user
    description: str
    status: str = "pending"  # pending, running, completed, failed
    # Value returned by the server call (or the help payload)
    result: Optional[Any] = None
    # Error message when the task failed
    error: Optional[str] = None

class IntelligentAgent:
    """Keyword-driven agent that maps text requests onto MCP tool calls.

    Connections opened by ``initialize`` stay open until ``close`` is called.
    """

    def __init__(self):
        self.clients: Dict[str, Client] = {}
        self.tasks: List[Task] = []
        self.capabilities: Dict[str, Dict] = {}
        # Owns every open transport so it outlives initialize().
        self._exit_stack = self._new_exit_stack()

    @staticmethod
    def _new_exit_stack():
        """Return an empty AsyncExitStack (imported locally to keep the example self-contained)."""
        from contextlib import AsyncExitStack

        return AsyncExitStack()

    @staticmethod
    async def _probe_capabilities(client) -> Dict[str, List[str]]:
        """Return the resource and tool names a client reports.

        Listing is best-effort: a failing call yields an empty list.
        """
        capabilities = {}
        try:
            resources = await client.list_resources()
            capabilities["resources"] = [r.name for r in resources.resources]
        except Exception:
            capabilities["resources"] = []

        try:
            tools = await client.list_tools()
            capabilities["tools"] = [t.name for t in tools.tools]
        except Exception:
            capabilities["tools"] = []

        return capabilities

    async def initialize(self):
        """Connect to the example servers and cache each one's capabilities.

        A server that fails to connect is reported and skipped.
        """
        server_configs = [
            ("file-server", "python", ["file_server.py"]),
            ("weather-server", "python", ["weather_server.py"]),
            ("database-server", "python", ["database_server.py"]),
        ]

        for name, command, args in server_configs:
            # One stack per server: leaving an `async with` here would close
            # the transport before the stored client is ever used.
            stack = self._new_exit_stack()
            try:
                read, write = await stack.enter_async_context(
                    stdio_client(command, args)
                )
                client = Client("intelligent-agent")
                await client.initialize(read, write)
                capabilities = await self._probe_capabilities(client)
            except Exception as e:
                print(f"❌ {name} 连接失败: {e}")
                try:
                    await stack.aclose()
                except Exception as close_error:
                    print(f"⚠️ {name} 清理失败: {close_error}")
                continue

            # Hand ownership to the instance-wide stack; close() releases it.
            self._exit_stack.push_async_callback(stack.aclose)
            self.clients[name] = client
            self.capabilities[name] = capabilities
            print(f"✅ {name} 已连接并缓存能力")

    async def close(self):
        """Close every open server connection and drop cached state."""
        self.clients.clear()
        self.capabilities.clear()
        await self._exit_stack.aclose()

    def parse_natural_language_request(self, request: str) -> List[Dict[str, Any]]:
        """Turn a free-text request into a list of task definitions.

        Keywords are matched case-insensitively; extracted values (file names,
        content, user names, ...) keep the casing the user typed.

        Args:
            request: The user's request text.

        Returns:
            Task definition dicts. When nothing is recognized, a single
            ``{"type": "help", ...}`` entry with example requests.
        """
        # Lowercase only for keyword detection so extracted values keep their case.
        lowered = request.lower()
        tasks = []

        # Weather
        if "天气" in lowered or "weather" in lowered:
            city_match = re.search(r'(北京|上海|广州|深圳|杭州|成都|西安|武汉)', request)
            city = city_match.group(1) if city_match else "北京"

            tasks.append({
                "server": "weather-server",
                "type": "tool",
                "name": "get_weather",
                "params": {"city": city},
                "description": f"获取{city}天气信息"
            })

        # File operations
        if "创建文件" in lowered or "写文件" in lowered:
            # Stop at "内容" too, so "文件名为a.txt内容为..." yields just "a.txt".
            filename_match = re.search(r'文件名?[是为]?(.+?)(?:\s|内容|$)', request)
            filename = filename_match.group(1) if filename_match else "output.txt"

            content_match = re.search(r'内容[是为]?(.+?)(?:\s|$)', request)
            content = content_match.group(1) if content_match else "默认内容"

            tasks.append({
                "server": "file-server",
                "type": "tool",
                "name": "create_file",
                "params": {"filename": filename, "content": content},
                "description": f"创建文件 {filename}"
            })

        if "搜索文件" in lowered:
            # (?:文件)? skips the whole word; a [文件] character class would
            # consume a single character and leave "件" in the pattern.
            pattern_match = re.search(r'搜索(?:文件)?(?:包含)?(.+?)(?:\s|$)', request)
            pattern = pattern_match.group(1) if pattern_match else "test"

            tasks.append({
                "server": "file-server",
                "type": "tool",
                "name": "search_files",
                "params": {"pattern": pattern},
                "description": f"搜索包含 '{pattern}' 的文件"
            })

        # Database operations
        if "搜索产品" in lowered or "查找产品" in lowered:
            keyword_match = re.search(r'产品(?:名称)?[是为]?(.+?)(?:\s|$)', request)
            keyword = keyword_match.group(1) if keyword_match else "电脑"

            tasks.append({
                "server": "database-server",
                "type": "tool",
                "name": "search_products",
                "params": {"keyword": keyword},
                "description": f"搜索产品: {keyword}"
            })

        if "添加用户" in lowered or "创建用户" in lowered:
            name_match = re.search(r'姓名[是为]?(.+?)(?:\s|邮箱|年龄|$)', request)
            email_match = re.search(r'邮箱[是为]?(.+?)(?:\s|年龄|$)', request)
            age_match = re.search(r'年龄[是为]?(\d+)', request)

            name = name_match.group(1) if name_match else "测试用户"
            email = email_match.group(1) if email_match else "test@example.com"
            age = int(age_match.group(1)) if age_match else None

            params = {"name": name, "email": email}
            # `is not None` so an explicit age of 0 is kept.
            if age is not None:
                params["age"] = age

            tasks.append({
                "server": "database-server",
                "type": "tool",
                "name": "insert_user",
                "params": params,
                "description": f"添加用户: {name}"
            })

        # Nothing recognized: return a help entry with example requests.
        if not tasks:
            tasks.append({
                "type": "help",
                "description": "未识别到具体任务,请尝试以下格式的请求",
                "examples": [
                    "获取北京天气",
                    "创建文件名为report.txt内容为测试报告",
                    "搜索文件包含python",
                    "搜索产品电脑",
                    "添加用户姓名为张三邮箱为zhangsan@example.com年龄为25"
                ]
            })

        return tasks

    async def execute_task_sequence(self, tasks: List[Dict[str, Any]]) -> List[Task]:
        """Run task definitions in order and return one ``Task`` per definition.

        A failing task is marked ``failed`` with its error message; the
        remaining tasks still run.
        """
        executed_tasks = []

        for i, task_def in enumerate(tasks):
            task = Task(
                id=f"task_{i+1}",
                description=task_def["description"]
            )

            # Help entries carry no server call; complete them immediately.
            if task_def.get("type") == "help":
                task.status = "completed"
                task.result = {
                    "message": task_def["description"],
                    "examples": task_def["examples"]
                }
                executed_tasks.append(task)
                continue

            task.status = "running"
            executed_tasks.append(task)

            server_name = task_def["server"]
            action_type = task_def["type"]
            action_name = task_def["name"]
            params = task_def.get("params", {})

            print(f"🔄 执行任务: {task.description}")

            if server_name not in self.clients:
                task.status = "failed"
                task.error = f"服务器 {server_name} 不可用"
                print(f"  ❌ {task.error}")
                continue

            client = self.clients[server_name]

            try:
                if action_type == "tool":
                    result = await client.call_tool(action_name, params)
                    task.status = "completed"
                    task.result = result
                    print("  ✅ 任务完成")

                    # Show at most the first 100 characters of the result.
                    if result.content:
                        content_preview = result.content[0].text[:100]
                        if len(result.content[0].text) > 100:
                            content_preview += "..."
                        print(f"  📄 结果预览: {content_preview}")

                elif action_type == "resource":
                    result = await client.read_resource(action_name)
                    task.status = "completed"
                    task.result = result
                    print("  ✅ 资源读取完成")

                else:
                    task.status = "failed"
                    task.error = f"未知操作类型: {action_type}"
                    print(f"  ❌ {task.error}")

            except Exception as e:
                task.status = "failed"
                task.error = str(e)
                print(f"  ❌ 任务失败: {e}")

        return executed_tasks

    def generate_summary_report(self, tasks: List[Task]) -> str:
        """Build a text report with counts, success rate and per-task details.

        An empty task list yields a report with a 0.0% success rate.
        """
        total_tasks = len(tasks)
        completed_tasks = sum(1 for t in tasks if t.status == "completed")
        failed_tasks = sum(1 for t in tasks if t.status == "failed")
        # Guard against division by zero when there are no tasks.
        success_rate = completed_tasks / total_tasks * 100 if total_tasks else 0.0

        report = f"""
📊 任务执行摘要报告
{'='*40}

📈 统计信息:
  - 总任务数: {total_tasks}
  - 成功完成: {completed_tasks}
  - 执行失败: {failed_tasks}
  - 成功率: {success_rate:.1f}%

📋 任务详情:
"""

        status_emojis = {
            "completed": "✅",
            "failed": "❌",
            "running": "🔄",
            "pending": "⏳"
        }

        for task in tasks:
            status_emoji = status_emojis.get(task.status, "❓")

            report += f"  {status_emoji} {task.description}\n"

            if task.status == "failed" and task.error:
                report += f"    错误: {task.error}\n"
            elif task.status == "completed" and task.result:
                if hasattr(task.result, 'content') and task.result.content:
                    preview = task.result.content[0].text[:50]
                    if len(task.result.content[0].text) > 50:
                        preview += "..."
                    report += f"    结果: {preview}\n"
                elif isinstance(task.result, dict) and task.result.get("examples"):
                    # Help payload: list the example requests for the user.
                    for example in task.result["examples"]:
                        report += f"    - {example}\n"

        return report

    async def chat_mode(self):
        """Read requests from stdin until 'quit', Ctrl-C or end of input."""
        print("\n🤖 智能代理聊天模式")
        print("=" * 40)
        print("您可以用自然语言描述您想要执行的任务")
        print("例如: '获取北京天气并创建天气报告文件'")
        print("输入 'quit' 退出\n")

        while True:
            try:
                user_input = input("👤 您: ").strip()

                if user_input.lower() == "quit":
                    print("👋 再见!")
                    break

                if not user_input:
                    continue

                print("\n🤖 代理: 正在分析您的请求...")

                tasks = self.parse_natural_language_request(user_input)

                if not tasks:
                    print("抱歉,我无法理解您的请求。请尝试更具体的描述。")
                    continue

                print(f"我理解您想要执行 {len(tasks)} 个任务:")
                for i, task in enumerate(tasks, 1):
                    print(f"  {i}. {task['description']}")

                print("\n开始执行任务...")
                executed_tasks = await self.execute_task_sequence(tasks)

                report = self.generate_summary_report(executed_tasks)
                print(report)

            except (KeyboardInterrupt, EOFError):
                # EOFError must end the loop: once stdin is closed, input()
                # raises it on every call and the loop would spin forever.
                print("\n👋 再见!")
                break
            except Exception as e:
                print(f"❌ 处理请求时出错: {e}")

async def main():
    """Initialize the agent, list its capabilities and start chat mode."""
    agent = IntelligentAgent()

    print("🚀 正在初始化智能代理...")
    try:
        await agent.initialize()

        if not agent.clients:
            print("❌ 没有可用的服务器连接,无法启动代理")
            return

        print(f"✅ 代理初始化完成,连接了 {len(agent.clients)} 个服务器")

        print("\n🔧 可用服务器和能力:")
        for server_name, capabilities in agent.capabilities.items():
            print(f"  📡 {server_name}:")
            if capabilities["tools"]:
                print(f"    🔧 工具: {', '.join(capabilities['tools'])}")
            if capabilities["resources"]:
                print(f"    📁 资源: {', '.join(capabilities['resources'])}")

        await agent.chat_mode()
    finally:
        # Release the server connections opened by initialize().
        await agent.close()

if __name__ == "__main__":
    asyncio.run(main())

部署示例

1. Docker 部署

Dockerfile

dockerfile
# Stage 1: install the Python dependencies and collect the Python servers
FROM python:3.11-slim AS python-base

WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the Python server files
COPY servers/python/ ./servers/python/

# Stage 2: install the Node.js dependencies and collect the TypeScript servers
FROM node:18-slim AS node-base

WORKDIR /app

# Install production Node.js dependencies only
COPY package.json package-lock.json ./
RUN npm ci --omit=dev

# Copy the TypeScript server files
COPY servers/typescript/ ./servers/typescript/

# Final runtime image
FROM python:3.11-slim

# Install Node.js
RUN apt-get update && apt-get install -y \
    curl \
    && curl -fsSL https://deb.nodesource.com/setup_18.x | bash - \
    && apt-get install -y nodejs \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# pip installed the dependencies into the python-base stage's site-packages,
# not into /app, so they must be copied explicitly or the servers start
# without their libraries. Both stages use the same python:3.11-slim base.
COPY --from=python-base /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=python-base /usr/local/bin /usr/local/bin

# Copy application files from the build stages
COPY --from=python-base /app ./
COPY --from=node-base /app/node_modules ./node_modules
COPY --from=node-base /app/servers/typescript ./servers/typescript

# Add the startup script
COPY docker/start.sh ./
RUN chmod +x start.sh

# Ports documented for the servers
EXPOSE 8000 8001 8002

# Startup command
CMD ["./start.sh"]

docker-compose.yml

yaml
# Compose stack: the MCP servers container plus an nginx container in front.
version: '3.8'

services:
  # Built from the Dockerfile in this directory.
  mcp-servers:
    build: .
    ports:
      - "8000:8000"  # file server
      - "8001:8001"  # weather server
      - "8002:8002"  # database server
    environment:
      # Taken from the environment of the shell running compose.
      - OPENWEATHER_API_KEY=${OPENWEATHER_API_KEY}
      - DATABASE_URL=sqlite:///data/app.db
    volumes:
      # Bind mounts from the project directory.
      - ./data:/app/data
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
      # Probes http://localhost:8000/health from inside the container.
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf
      - ./nginx/ssl:/etc/nginx/ssl
    depends_on:
      - mcp-servers
    restart: unless-stopped

# NOTE(review): these named volumes are declared but no service references
# them; the services above use ./data and ./logs bind mounts instead.
volumes:
  data:
  logs:

2. Kubernetes 部署

deployment.yaml

yaml
# Deployment: three replicas of the mcp-servers container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-servers
  labels:
    app: mcp-servers
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-servers
  template:
    metadata:
      labels:
        app: mcp-servers
    spec:
      containers:
      - name: mcp-servers
        image: mcp-servers:latest
        ports:
        - containerPort: 8000
        - containerPort: 8001
        - containerPort: 8002
        env:
        # API key is read from the "mcp-secrets" Secret, not stored here.
        - name: OPENWEATHER_API_KEY
          valueFrom:
            secretKeyRef:
              name: mcp-secrets
              key: openweather-api-key
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        # Liveness: GET /health on port 8000.
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        # Readiness: GET /ready on port 8000.
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

---
# Service: exposes the three container ports of pods labelled app=mcp-servers.
apiVersion: v1
kind: Service
metadata:
  name: mcp-service
spec:
  selector:
    app: mcp-servers
  ports:
  - name: file-server
    port: 8000
    targetPort: 8000
  - name: weather-server
    port: 8001
    targetPort: 8001
  - name: database-server
    port: 8002
    targetPort: 8002
  type: LoadBalancer

测试示例

1. 单元测试

python
#!/usr/bin/env python3
import pytest
import asyncio
import tempfile
import os
from pathlib import Path

from mcp.server import Server
from mcp.types import TextContent

# 测试文件服务器
class TestFileServer:
    """Tests for the file server example."""

    @pytest.fixture
    def server(self):
        """Yield a test server with a temporary directory holding ``test.txt``.

        The fixture is synchronous (it awaits nothing), so plain
        ``@pytest.fixture`` handles it in every pytest-asyncio mode. The
        temporary directory is removed once the test finishes.
        """
        server = Server("test-file-server")

        with tempfile.TemporaryDirectory() as tmp:
            self.temp_dir = Path(tmp)

            # Explicit encoding: the content is non-ASCII and the platform
            # default encoding may not be UTF-8.
            test_file = self.temp_dir / "test.txt"
            test_file.write_text("测试内容", encoding="utf-8")

            yield server

    @pytest.mark.asyncio
    async def test_list_resources(self, server):
        """Test resource listing."""
        # The concrete test logic still needs to be implemented here
        pass

    @pytest.mark.asyncio
    async def test_read_resource(self, server):
        """Test resource reading."""
        # The concrete test logic still needs to be implemented here
        pass

    @pytest.mark.asyncio
    async def test_create_file_tool(self, server):
        """Test the file creation tool."""
        # The concrete test logic still needs to be implemented here
        pass

# Run the tests
if __name__ == "__main__":
    pytest.main([__file__, "-v"])

2. 集成测试

python
#!/usr/bin/env python3
import asyncio
import pytest
from mcp.client import Client
from mcp.client.stdio import stdio_client

class TestIntegration:
    """End-to-end checks against a file server spawned over stdio."""

    @pytest.mark.asyncio
    async def test_full_workflow(self):
        """Create a file through the tool API, then confirm resources are listed."""
        tool_args = {
            "filename": "test_integration.txt",
            "content": "集成测试内容"
        }

        # Spawn the server and attach a client to its streams
        async with stdio_client("python", ["file_server.py"]) as streams:
            reader, writer = streams
            client = Client("test-client")
            await client.initialize(reader, writer)

            # Tool call
            outcome = await client.call_tool("create_file", tool_args)
            assert not outcome.isError
            assert "创建成功" in outcome.content[0].text

            # Resource listing
            listing = await client.list_resources()
            assert len(listing.resources) > 0

if __name__ == "__main__":
    pytest.main([__file__, "-v"])

最佳实践

1. 错误处理

python
# 统一错误处理装饰器
def handle_errors(func):
    """Decorate an async tool handler so exceptions become error results.

    The wrapped coroutine is awaited with the caller's arguments. If it
    raises, the exception is converted into a ``CallToolResult`` with
    ``isError=True`` and a single text item whose prefix depends on the
    exception type: ``ValueError``, ``FileNotFoundError``, or any other
    ``Exception``.

    Args:
        func: The async callable to protect.

    Returns:
        An async wrapper that returns ``func``'s result on success and an
        error ``CallToolResult`` on failure. The wrapper keeps ``func``'s
        name and docstring.
    """
    # Imported locally so the decorator does not depend on a module-level
    # import being present.
    import functools

    def _error_result(prefix, exc):
        # Build the error payload shared by every except branch.
        return CallToolResult(
            content=[TextContent(type="text", text=f"{prefix}: {exc}")],
            isError=True
        )

    # functools.wraps preserves __name__/__doc__, so anything that inspects
    # the handler still sees the original function's metadata.
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except ValueError as e:
            return _error_result("参数错误", e)
        except FileNotFoundError as e:
            return _error_result("文件未找到", e)
        except Exception as e:
            return _error_result("内部错误", e)

    return wrapper

@server.call_tool()
@handle_errors
async def call_tool(name: str, arguments: Dict[str, Any]) -> CallToolResult:
    """Placeholder tool handler wrapped by ``handle_errors``.

    The tool logic is intentionally left unimplemented in this example, so
    the coroutine currently completes with ``None``.
    """
    return None

2. 配置管理

python
import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class ServerConfig:
    """Server settings that can be overridden through environment variables.

    Any field left as ``None`` is filled in by ``__post_init__``. The
    environment is therefore read when an instance is created, not once when
    the class is defined, and explicitly passed values always win.
    """

    # Base directory; falls back to $MCP_BASE_DIR, then ~/Documents.
    base_dir: Optional[str] = None
    # Maximum file size; falls back to $MCP_MAX_FILE_SIZE, then 10485760 (10MB).
    max_file_size: Optional[int] = None
    # Allowed file extensions; falls back to a fresh default list per instance.
    allowed_extensions: Optional[list] = None
    # Log level name; falls back to $MCP_LOG_LEVEL, then "INFO".
    log_level: Optional[str] = None

    def __post_init__(self):
        """Resolve every unset field from the environment or its default.

        Raises:
            ValueError: If ``MCP_MAX_FILE_SIZE`` is set to a non-integer.
        """
        if self.base_dir is None:
            # os.path is used because this snippet imports only ``os``.
            default_dir = os.path.join(os.path.expanduser("~"), "Documents")
            self.base_dir = os.getenv("MCP_BASE_DIR", default_dir)

        if self.max_file_size is None:
            raw_size = os.getenv("MCP_MAX_FILE_SIZE", "10485760")  # 10MB
            try:
                self.max_file_size = int(raw_size)
            except ValueError as err:
                # Name the offending variable instead of a bare int() error.
                raise ValueError(
                    f"MCP_MAX_FILE_SIZE must be an integer, got {raw_size!r}"
                ) from err

        if self.allowed_extensions is None:
            self.allowed_extensions = [".txt", ".md", ".json", ".csv"]

        if self.log_level is None:
            self.log_level = os.getenv("MCP_LOG_LEVEL", "INFO")

config = ServerConfig()

3. 日志记录

python
import logging
import sys

def setup_logging(level: str = "INFO"):
    """Configure logging to stderr and ``mcp_server.log``.

    Args:
        level: Case-insensitive level name such as ``"INFO"`` or ``"debug"``.
            A value that does not name a logging level falls back to
            ``INFO`` and a warning is logged, instead of raising.

    Returns:
        The logger for this module, ``logging.getLogger(__name__)``.
    """
    # getattr() on the logging module can miss, or hit a non-level attribute,
    # so accept the lookup only when it yields an integer level.
    resolved = getattr(logging, str(level).upper(), None)
    level_is_valid = isinstance(resolved, int) and not isinstance(resolved, bool)
    if not level_is_valid:
        resolved = logging.INFO

    logging.basicConfig(
        level=resolved,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(sys.stderr),
            # Explicit UTF-8: log messages contain non-ASCII text and the
            # platform default encoding may not be able to represent it.
            logging.FileHandler('mcp_server.log', encoding='utf-8'),
        ]
    )

    module_logger = logging.getLogger(__name__)
    if not level_is_valid:
        module_logger.warning("Unknown log level %r; falling back to INFO", level)
    return module_logger

logger = setup_logging(config.log_level)

@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> CallToolResult:
    """Run a tool through ``execute_tool`` and log the call and its outcome.

    Args:
        name: Name of the tool being called.
        arguments: Arguments passed through to ``execute_tool``.

    Returns:
        Whatever ``execute_tool(name, arguments)`` returns.

    Raises:
        Exception: Any exception from ``execute_tool`` is logged with its
            traceback and then re-raised unchanged.
    """
    # Lazy %-style arguments: the message is only formatted if the record is
    # actually emitted.
    logger.info("调用工具: %s, 参数: %s", name, arguments)

    try:
        result = await execute_tool(name, arguments)
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which a plain logger.error call would drop.
        logger.exception("工具执行失败: %s, 错误: %s", name, e)
        raise
    else:
        logger.info("工具执行成功: %s", name)
        return result

下一步

获取帮助

🚀 探索模型上下文协议的无限可能 | 连接 AI 与世界的桥梁 | 让智能更智能