Skip to content

构建 MCP 服务器

本指南将带您完成构建 MCP 服务器的完整过程,从基础设置到高级功能实现。

前提条件

Python 环境

  • Python 3.10 或更高版本
  • 必须使用 Python MCP SDK 1.2.0 或更高版本

安装依赖

bash
# 使用 pip 安装
pip install mcp

# 或使用 uv(推荐)
uv add mcp

基础服务器实现

1. 创建简单服务器

python
from mcp.server.fastmcp import FastMCP

# 创建服务器实例
mcp = FastMCP("my-first-server")

@mcp.resource("file://{path}")
async def read_file(path: str) -> str:
    """读取文件内容
    
    Args:
        path: 文件路径
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        return f"文件 {path} 不存在"
    except Exception as e:
        return f"读取文件时出错: {e}"

@mcp.tool()
async def calculate(expression: str) -> str:
    """安全的数学计算器
    
    Args:
        expression: 数学表达式(如 "2 + 3 * 4")
    """
    try:
        # 简单的安全检查
        allowed_chars = set('0123456789+-*/().')
        if not all(c in allowed_chars or c.isspace() for c in expression):
            return "表达式包含不允许的字符"
        
        result = eval(expression)
        return str(result)
    except Exception as e:
        return f"计算错误: {e}"

@mcp.prompt("code-review")
async def code_review_prompt(language: str = "python") -> str:
    """代码审查提示模板
    
    Args:
        language: 编程语言
    """
    return f"""请审查以下 {language} 代码,关注以下方面:

1. 代码质量和可读性
2. 潜在的 bug 和安全问题
3. 性能优化建议
4. 最佳实践遵循情况

请提供具体的改进建议。

代码:
```{language}
[在此处粘贴代码]
```"""

if __name__ == "__main__":
    mcp.run()

2. 运行服务器

bash
python server.py

高级功能实现

资源管理

python
import os
import json
from typing import List, Dict, Any

@mcp.resource("config://settings")
async def get_settings() -> str:
    """获取应用程序设置"""
    settings = {
        "version": "1.0.0",
        "debug": True,
        "max_connections": 100
    }
    return json.dumps(settings, indent=2)

@mcp.resource("directory://{path}")
async def list_directory(path: str) -> str:
    """列出目录内容"""
    try:
        if not os.path.exists(path):
            return f"目录 {path} 不存在"
        
        if not os.path.isdir(path):
            return f"{path} 不是一个目录"
        
        items = []
        for item in os.listdir(path):
            item_path = os.path.join(path, item)
            item_type = "目录" if os.path.isdir(item_path) else "文件"
            size = os.path.getsize(item_path) if os.path.isfile(item_path) else "-"
            items.append({
                "name": item,
                "type": item_type,
                "size": size
            })
        
        return json.dumps(items, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"列出目录时出错: {e}"

工具实现

python
import requests
import subprocess
from datetime import datetime

@mcp.tool()
async def web_search(query: str, max_results: int = 5) -> str:
    """网络搜索工具
    
    Args:
        query: 搜索查询
        max_results: 最大结果数量
    """
    # 这里使用示例 API,实际使用时需要替换为真实的搜索 API
    try:
        # 模拟搜索结果
        results = [
            {
                "title": f"搜索结果 {i+1}",
                "url": f"https://example.com/result{i+1}",
                "snippet": f"关于 '{query}' 的搜索结果 {i+1}"
            }
            for i in range(min(max_results, 3))
        ]
        
        return json.dumps(results, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"搜索时出错: {e}"

@mcp.tool()
async def run_command(command: str, working_dir: str = ".") -> str:
    """执行系统命令(需要谨慎使用)
    
    Args:
        command: 要执行的命令
        working_dir: 工作目录
    """
    try:
        # 安全检查:只允许特定的安全命令
        safe_commands = ["ls", "pwd", "date", "whoami", "echo"]
        cmd_parts = command.split()
        if not cmd_parts or cmd_parts[0] not in safe_commands:
            return "不允许执行此命令"
        
        result = subprocess.run(
            command,
            shell=True,
            cwd=working_dir,
            capture_output=True,
            text=True,
            timeout=10
        )
        
        output = {
            "stdout": result.stdout,
            "stderr": result.stderr,
            "return_code": result.returncode
        }
        
        return json.dumps(output, indent=2, ensure_ascii=False)
    except subprocess.TimeoutExpired:
        return "命令执行超时"
    except Exception as e:
        return f"执行命令时出错: {e}"

@mcp.tool()
async def get_weather(city: str) -> str:
    """获取天气信息
    
    Args:
        city: 城市名称
    """
    # 模拟天气 API 调用
    weather_data = {
        "city": city,
        "temperature": "25°C",
        "condition": "晴朗",
        "humidity": "60%",
        "wind": "5 km/h",
        "timestamp": datetime.now().isoformat()
    }
    
    return json.dumps(weather_data, indent=2, ensure_ascii=False)

提示模板

python
@mcp.prompt("data-analysis")
async def data_analysis_prompt(data_type: str = "CSV", analysis_goal: str = "general") -> str:
    """数据分析提示模板
    
    Args:
        data_type: 数据类型(CSV, JSON, Excel 等)
        analysis_goal: 分析目标(general, trends, anomalies 等)
    """
    base_prompt = f"""请分析以下 {data_type} 数据,重点关注 {analysis_goal} 分析:

分析步骤:
1. 数据概览和基本统计
2. 数据质量检查
3. 关键模式和趋势识别
4. 异常值检测
5. 结论和建议

请提供详细的分析结果和可视化建议。

数据:
[在此处粘贴数据]
"""
    
    if analysis_goal == "trends":
        base_prompt += "\n\n特别关注:\n- 时间序列趋势\n- 季节性模式\n- 增长率分析"
    elif analysis_goal == "anomalies":
        base_prompt += "\n\n特别关注:\n- 异常值识别\n- 数据质量问题\n- 潜在错误检测"
    
    return base_prompt

@mcp.prompt("api-documentation")
async def api_docs_prompt(api_name: str, version: str = "v1") -> str:
    """API 文档生成提示
    
    Args:
        api_name: API 名称
        version: API 版本
    """
    return f"""请为 {api_name} {version} API 生成完整的文档,包括:

## API 概述
- API 用途和功能
- 基础 URL 和认证方式
- 支持的数据格式

## 端点文档
对于每个端点,请包括:
- HTTP 方法和路径
- 请求参数(路径、查询、请求体)
- 响应格式和状态码
- 错误处理
- 使用示例

## 认证和安全
- 认证方式
- API 密钥管理
- 速率限制

## SDK 和示例
- 各语言的 SDK 使用示例
- 常见用例的代码示例

请确保文档清晰、完整且易于理解。
"""

错误处理和日志记录

错误处理

python
from mcp.types import McpError
import logging

# 配置日志记录(重要:STDIO 服务器不能使用 stdout)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]  # 输出到 stderr
)

logger = logging.getLogger(__name__)

@mcp.tool()
async def safe_file_operation(operation: str, file_path: str, content: str = "") -> str:
    """安全的文件操作
    
    Args:
        operation: 操作类型(read, write, delete)
        file_path: 文件路径
        content: 写入内容(仅用于 write 操作)
    """
    try:
        logger.info(f"执行文件操作: {operation} on {file_path}")
        
        # 安全检查
        if not file_path or ".." in file_path:
            raise McpError(
                code="INVALID_PARAMS",
                message="无效的文件路径"
            )
        
        if operation == "read":
            with open(file_path, 'r', encoding='utf-8') as f:
                result = f.read()
                logger.info(f"成功读取文件: {file_path}")
                return result
                
        elif operation == "write":
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(content)
                logger.info(f"成功写入文件: {file_path}")
                return f"文件 {file_path} 写入成功"
                
        elif operation == "delete":
            os.remove(file_path)
            logger.info(f"成功删除文件: {file_path}")
            return f"文件 {file_path} 删除成功"
            
        else:
            raise McpError(
                code="INVALID_PARAMS",
                message=f"不支持的操作: {operation}"
            )
            
    except FileNotFoundError:
        logger.error(f"文件不存在: {file_path}")
        raise McpError(
            code="RESOURCE_NOT_FOUND",
            message=f"文件 {file_path} 不存在"
        )
    except PermissionError:
        logger.error(f"权限不足: {file_path}")
        raise McpError(
            code="PERMISSION_DENIED",
            message=f"没有权限访问文件 {file_path}"
        )
    except Exception as e:
        logger.error(f"文件操作失败: {e}")
        raise McpError(
            code="INTERNAL_ERROR",
            message=f"文件操作失败: {str(e)}"
        )

配置和部署

配置文件

python
import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class ServerConfig:
    """服务器配置"""
    name: str = "mcp-server"
    debug: bool = False
    max_connections: int = 100
    timeout: int = 30
    log_level: str = "INFO"
    data_dir: Optional[str] = None
    
    @classmethod
    def from_env(cls):
        """从环境变量加载配置"""
        return cls(
            name=os.getenv("MCP_SERVER_NAME", "mcp-server"),
            debug=os.getenv("MCP_DEBUG", "false").lower() == "true",
            max_connections=int(os.getenv("MCP_MAX_CONNECTIONS", "100")),
            timeout=int(os.getenv("MCP_TIMEOUT", "30")),
            log_level=os.getenv("MCP_LOG_LEVEL", "INFO"),
            data_dir=os.getenv("MCP_DATA_DIR")
        )

# 使用配置
config = ServerConfig.from_env()
mcp = FastMCP(config.name)

Docker 部署

dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install -r requirements.txt

# 复制源代码
COPY . .

# 设置环境变量
ENV MCP_SERVER_NAME=docker-mcp-server
ENV MCP_LOG_LEVEL=INFO

# 运行服务器
CMD ["python", "server.py"]
yaml
# docker-compose.yml
version: '3.8'

services:
  mcp-server:
    build: .
    environment:
      - MCP_SERVER_NAME=production-server
      - MCP_DEBUG=false
      - MCP_MAX_CONNECTIONS=200
    volumes:
      - ./data:/app/data
    restart: unless-stopped

测试

单元测试

python
import pytest
import asyncio
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters

@pytest.mark.asyncio
async def test_calculator_tool():
    """测试计算器工具"""
    server_params = StdioServerParameters(
        command="python",
        args=["server.py"]
    )
    
    client = Client()
    async with client.connect(server_params) as session:
        # 测试正常计算
        result = await session.call_tool("calculate", {"expression": "2 + 3"})
        assert result.content[0].text == "5"
        
        # 测试错误处理
        result = await session.call_tool("calculate", {"expression": "invalid"})
        assert "错误" in result.content[0].text

@pytest.mark.asyncio
async def test_file_resource():
    """测试文件资源"""
    # 创建测试文件
    test_file = "test.txt"
    with open(test_file, 'w') as f:
        f.write("测试内容")
    
    try:
        server_params = StdioServerParameters(
            command="python",
            args=["server.py"]
        )
        
        client = Client()
        async with client.connect(server_params) as session:
            content = await session.read_resource(f"file://{test_file}")
            assert "测试内容" in content.content[0].text
    finally:
        os.remove(test_file)

性能优化

异步处理

python
import asyncio
import aiofiles
from concurrent.futures import ThreadPoolExecutor

# 创建线程池用于 CPU 密集型任务
executor = ThreadPoolExecutor(max_workers=4)

@mcp.tool()
async def process_large_file(file_path: str) -> str:
    """异步处理大文件"""
    try:
        # 异步读取文件
        async with aiofiles.open(file_path, 'r') as f:
            content = await f.read()
        
        # CPU 密集型处理在线程池中执行
        def cpu_intensive_task(data):
            # 模拟 CPU 密集型处理
            return len(data.split())
        
        word_count = await asyncio.get_event_loop().run_in_executor(
            executor, cpu_intensive_task, content
        )
        
        return f"文件 {file_path} 包含 {word_count} 个单词"
    except Exception as e:
        return f"处理文件时出错: {e}"

缓存机制

python
from functools import lru_cache
import time

class CacheManager:
    def __init__(self):
        self.cache = {}
        self.cache_ttl = {}
    
    def get(self, key: str, ttl: int = 300):
        """获取缓存值"""
        if key in self.cache:
            if time.time() - self.cache_ttl[key] < ttl:
                return self.cache[key]
            else:
                del self.cache[key]
                del self.cache_ttl[key]
        return None
    
    def set(self, key: str, value):
        """设置缓存值"""
        self.cache[key] = value
        self.cache_ttl[key] = time.time()

cache_manager = CacheManager()

@mcp.tool()
async def cached_api_call(endpoint: str) -> str:
    """带缓存的 API 调用"""
    cached_result = cache_manager.get(endpoint)
    if cached_result:
        return f"缓存结果: {cached_result}"
    
    # 模拟 API 调用
    await asyncio.sleep(1)  # 模拟网络延迟
    result = f"API 响应来自 {endpoint}"
    
    cache_manager.set(endpoint, result)
    return result

下一步

完成服务器构建后,您可以:

🚀 探索模型上下文协议的无限可能 | 连接 AI 与世界的桥梁 | 让智能更智能