主题
构建 MCP 服务器
本指南将带您完成构建 MCP 服务器的完整过程,从基础设置到高级功能实现。
前提条件
Python 环境
- Python 3.10 或更高版本
- 必须使用 Python MCP SDK 1.2.0 或更高版本
安装依赖
bash
# 使用 pip 安装
pip install mcp
# 或使用 uv(推荐)
uv add mcp
基础服务器实现
1. 创建简单服务器
python
from mcp.server.fastmcp import FastMCP
# 创建服务器实例
mcp = FastMCP("my-first-server")
@mcp.resource("file://{path}")
async def read_file(path: str) -> str:
"""读取文件内容
Args:
path: 文件路径
"""
try:
with open(path, 'r', encoding='utf-8') as f:
return f.read()
except FileNotFoundError:
return f"文件 {path} 不存在"
except Exception as e:
return f"读取文件时出错: {e}"
@mcp.tool()
async def calculate(expression: str) -> str:
"""安全的数学计算器
Args:
expression: 数学表达式(如 "2 + 3 * 4")
"""
try:
# 简单的安全检查
allowed_chars = set('0123456789+-*/().')
if not all(c in allowed_chars or c.isspace() for c in expression):
return "表达式包含不允许的字符"
result = eval(expression)
return str(result)
except Exception as e:
return f"计算错误: {e}"
@mcp.prompt("code-review")
async def code_review_prompt(language: str = "python") -> str:
"""代码审查提示模板
Args:
language: 编程语言
"""
return f"""请审查以下 {language} 代码,关注以下方面:
1. 代码质量和可读性
2. 潜在的 bug 和安全问题
3. 性能优化建议
4. 最佳实践遵循情况
请提供具体的改进建议。
代码:
```{language}
[在此处粘贴代码]
```"""
if __name__ == "__main__":
mcp.run()
2. 运行服务器
bash
python server.py
高级功能实现
资源管理
python
import os
import json
from typing import List, Dict, Any
@mcp.resource("config://settings")
async def get_settings() -> str:
"""获取应用程序设置"""
settings = {
"version": "1.0.0",
"debug": True,
"max_connections": 100
}
return json.dumps(settings, indent=2)
@mcp.resource("directory://{path}")
async def list_directory(path: str) -> str:
"""列出目录内容"""
try:
if not os.path.exists(path):
return f"目录 {path} 不存在"
if not os.path.isdir(path):
return f"{path} 不是一个目录"
items = []
for item in os.listdir(path):
item_path = os.path.join(path, item)
item_type = "目录" if os.path.isdir(item_path) else "文件"
size = os.path.getsize(item_path) if os.path.isfile(item_path) else "-"
items.append({
"name": item,
"type": item_type,
"size": size
})
return json.dumps(items, indent=2, ensure_ascii=False)
except Exception as e:
return f"列出目录时出错: {e}"
工具实现
python
import requests
import subprocess
from datetime import datetime
@mcp.tool()
async def web_search(query: str, max_results: int = 5) -> str:
"""网络搜索工具
Args:
query: 搜索查询
max_results: 最大结果数量
"""
# 这里使用示例 API,实际使用时需要替换为真实的搜索 API
try:
# 模拟搜索结果
results = [
{
"title": f"搜索结果 {i+1}",
"url": f"https://example.com/result{i+1}",
"snippet": f"关于 '{query}' 的搜索结果 {i+1}"
}
for i in range(min(max_results, 3))
]
return json.dumps(results, indent=2, ensure_ascii=False)
except Exception as e:
return f"搜索时出错: {e}"
@mcp.tool()
async def run_command(command: str, working_dir: str = ".") -> str:
"""执行系统命令(需要谨慎使用)
Args:
command: 要执行的命令
working_dir: 工作目录
"""
try:
# 安全检查:只允许特定的安全命令
safe_commands = ["ls", "pwd", "date", "whoami", "echo"]
cmd_parts = command.split()
if not cmd_parts or cmd_parts[0] not in safe_commands:
return "不允许执行此命令"
result = subprocess.run(
command,
shell=True,
cwd=working_dir,
capture_output=True,
text=True,
timeout=10
)
output = {
"stdout": result.stdout,
"stderr": result.stderr,
"return_code": result.returncode
}
return json.dumps(output, indent=2, ensure_ascii=False)
except subprocess.TimeoutExpired:
return "命令执行超时"
except Exception as e:
return f"执行命令时出错: {e}"
@mcp.tool()
async def get_weather(city: str) -> str:
"""获取天气信息
Args:
city: 城市名称
"""
# 模拟天气 API 调用
weather_data = {
"city": city,
"temperature": "25°C",
"condition": "晴朗",
"humidity": "60%",
"wind": "5 km/h",
"timestamp": datetime.now().isoformat()
}
return json.dumps(weather_data, indent=2, ensure_ascii=False)
提示模板
python
@mcp.prompt("data-analysis")
async def data_analysis_prompt(data_type: str = "CSV", analysis_goal: str = "general") -> str:
"""数据分析提示模板
Args:
data_type: 数据类型(CSV, JSON, Excel 等)
analysis_goal: 分析目标(general, trends, anomalies 等)
"""
base_prompt = f"""请分析以下 {data_type} 数据,重点关注 {analysis_goal} 分析:
分析步骤:
1. 数据概览和基本统计
2. 数据质量检查
3. 关键模式和趋势识别
4. 异常值检测
5. 结论和建议
请提供详细的分析结果和可视化建议。
数据:
[在此处粘贴数据]
"""
if analysis_goal == "trends":
base_prompt += "\n\n特别关注:\n- 时间序列趋势\n- 季节性模式\n- 增长率分析"
elif analysis_goal == "anomalies":
base_prompt += "\n\n特别关注:\n- 异常值识别\n- 数据质量问题\n- 潜在错误检测"
return base_prompt
@mcp.prompt("api-documentation")
async def api_docs_prompt(api_name: str, version: str = "v1") -> str:
"""API 文档生成提示
Args:
api_name: API 名称
version: API 版本
"""
return f"""请为 {api_name} {version} API 生成完整的文档,包括:
## API 概述
- API 用途和功能
- 基础 URL 和认证方式
- 支持的数据格式
## 端点文档
对于每个端点,请包括:
- HTTP 方法和路径
- 请求参数(路径、查询、请求体)
- 响应格式和状态码
- 错误处理
- 使用示例
## 认证和安全
- 认证方式
- API 密钥管理
- 速率限制
## SDK 和示例
- 各语言的 SDK 使用示例
- 常见用例的代码示例
请确保文档清晰、完整且易于理解。
"""
错误处理和日志记录
错误处理
python
from mcp.types import McpError
import logging
# 配置日志记录(重要:STDIO 服务器不能使用 stdout)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()] # 输出到 stderr
)
logger = logging.getLogger(__name__)
@mcp.tool()
async def safe_file_operation(operation: str, file_path: str, content: str = "") -> str:
"""安全的文件操作
Args:
operation: 操作类型(read, write, delete)
file_path: 文件路径
content: 写入内容(仅用于 write 操作)
"""
try:
logger.info(f"执行文件操作: {operation} on {file_path}")
# 安全检查
if not file_path or ".." in file_path:
raise McpError(
code="INVALID_PARAMS",
message="无效的文件路径"
)
if operation == "read":
with open(file_path, 'r', encoding='utf-8') as f:
result = f.read()
logger.info(f"成功读取文件: {file_path}")
return result
elif operation == "write":
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
logger.info(f"成功写入文件: {file_path}")
return f"文件 {file_path} 写入成功"
elif operation == "delete":
os.remove(file_path)
logger.info(f"成功删除文件: {file_path}")
return f"文件 {file_path} 删除成功"
else:
raise McpError(
code="INVALID_PARAMS",
message=f"不支持的操作: {operation}"
)
except FileNotFoundError:
logger.error(f"文件不存在: {file_path}")
raise McpError(
code="RESOURCE_NOT_FOUND",
message=f"文件 {file_path} 不存在"
)
except PermissionError:
logger.error(f"权限不足: {file_path}")
raise McpError(
code="PERMISSION_DENIED",
message=f"没有权限访问文件 {file_path}"
)
except Exception as e:
logger.error(f"文件操作失败: {e}")
raise McpError(
code="INTERNAL_ERROR",
message=f"文件操作失败: {str(e)}"
)
配置和部署
配置文件
python
import os
from dataclasses import dataclass
from typing import Optional
@dataclass
class ServerConfig:
"""服务器配置"""
name: str = "mcp-server"
debug: bool = False
max_connections: int = 100
timeout: int = 30
log_level: str = "INFO"
data_dir: Optional[str] = None
@classmethod
def from_env(cls):
"""从环境变量加载配置"""
return cls(
name=os.getenv("MCP_SERVER_NAME", "mcp-server"),
debug=os.getenv("MCP_DEBUG", "false").lower() == "true",
max_connections=int(os.getenv("MCP_MAX_CONNECTIONS", "100")),
timeout=int(os.getenv("MCP_TIMEOUT", "30")),
log_level=os.getenv("MCP_LOG_LEVEL", "INFO"),
data_dir=os.getenv("MCP_DATA_DIR")
)
# 使用配置
config = ServerConfig.from_env()
mcp = FastMCP(config.name)
Docker 部署
dockerfile
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install -r requirements.txt
# 复制源代码
COPY . .
# 设置环境变量
ENV MCP_SERVER_NAME=docker-mcp-server
ENV MCP_LOG_LEVEL=INFO
# 运行服务器
CMD ["python", "server.py"]
yaml
# docker-compose.yml
version: '3.8'
services:
mcp-server:
build: .
environment:
- MCP_SERVER_NAME=production-server
- MCP_DEBUG=false
- MCP_MAX_CONNECTIONS=200
volumes:
- ./data:/app/data
restart: unless-stopped
测试
单元测试
python
import pytest
import asyncio
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters
@pytest.mark.asyncio
async def test_calculator_tool():
"""测试计算器工具"""
server_params = StdioServerParameters(
command="python",
args=["server.py"]
)
client = Client()
async with client.connect(server_params) as session:
# 测试正常计算
result = await session.call_tool("calculate", {"expression": "2 + 3"})
assert result.content[0].text == "5"
# 测试错误处理
result = await session.call_tool("calculate", {"expression": "invalid"})
assert "错误" in result.content[0].text
@pytest.mark.asyncio
async def test_file_resource():
"""测试文件资源"""
# 创建测试文件
test_file = "test.txt"
with open(test_file, 'w') as f:
f.write("测试内容")
try:
server_params = StdioServerParameters(
command="python",
args=["server.py"]
)
client = Client()
async with client.connect(server_params) as session:
content = await session.read_resource(f"file://{test_file}")
assert "测试内容" in content.content[0].text
finally:
os.remove(test_file)
性能优化
异步处理
python
import asyncio
import aiofiles
from concurrent.futures import ThreadPoolExecutor
# 创建线程池用于 CPU 密集型任务
executor = ThreadPoolExecutor(max_workers=4)
@mcp.tool()
async def process_large_file(file_path: str) -> str:
"""异步处理大文件"""
try:
# 异步读取文件
async with aiofiles.open(file_path, 'r') as f:
content = await f.read()
# CPU 密集型处理在线程池中执行
def cpu_intensive_task(data):
# 模拟 CPU 密集型处理
return len(data.split())
word_count = await asyncio.get_event_loop().run_in_executor(
executor, cpu_intensive_task, content
)
return f"文件 {file_path} 包含 {word_count} 个单词"
except Exception as e:
return f"处理文件时出错: {e}"
缓存机制
python
from functools import lru_cache
import time
class CacheManager:
def __init__(self):
self.cache = {}
self.cache_ttl = {}
def get(self, key: str, ttl: int = 300):
"""获取缓存值"""
if key in self.cache:
if time.time() - self.cache_ttl[key] < ttl:
return self.cache[key]
else:
del self.cache[key]
del self.cache_ttl[key]
return None
def set(self, key: str, value):
"""设置缓存值"""
self.cache[key] = value
self.cache_ttl[key] = time.time()
cache_manager = CacheManager()
@mcp.tool()
async def cached_api_call(endpoint: str) -> str:
"""带缓存的 API 调用"""
cached_result = cache_manager.get(endpoint)
if cached_result:
return f"缓存结果: {cached_result}"
# 模拟 API 调用
await asyncio.sleep(1) # 模拟网络延迟
result = f"API 响应来自 {endpoint}"
cache_manager.set(endpoint, result)
return result
下一步
完成服务器构建后,您可以: