主题
示例和用例
本页面提供了 Model Context Protocol (MCP) 的实际应用示例,帮助您快速上手并了解最佳实践。
基础示例
1. 简单文件服务器
一个基础的文件系统服务器,提供文件读取和目录浏览功能。
Python 实现
python
#!/usr/bin/env python3
import asyncio
import json
import os
from pathlib import Path
from typing import Any, Dict, List
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
Resource,
Tool,
TextContent,
ListResourcesResult,
ReadResourceResult,
CallToolResult,
)
# 创建服务器实例
server = Server("file-server")
# 配置基础目录
BASE_DIR = Path.home() / "Documents"
@server.list_resources()
async def list_resources() -> ListResourcesResult:
    """List every ``*.txt`` file below ``BASE_DIR`` as an MCP resource.

    The directory tree is walked recursively.  Each file is exposed under a
    ``file://`` URI built from its path relative to ``BASE_DIR``.

    Returns:
        A ``ListResourcesResult`` with the resources found.  If the walk
        fails part-way, the resources collected so far are returned and the
        error is reported on stderr.
    """
    import sys  # local import: only needed for the diagnostic below

    resources = []
    try:
        for file_path in BASE_DIR.rglob("*.txt"):
            if not file_path.is_file():
                continue
            relative_path = file_path.relative_to(BASE_DIR)
            resources.append(
                Resource(
                    # as_posix() keeps the URI forward-slashed on every OS.
                    uri=f"file://{relative_path.as_posix()}",
                    name=file_path.name,
                    description=f"文本文件: {relative_path}",
                    mimeType="text/plain",
                )
            )
    except Exception as e:
        # stdout carries the JSON-RPC stream of the stdio transport, so
        # diagnostics must go to stderr or they corrupt the protocol.
        print(f"列出资源时出错: {e}", file=sys.stderr)
    return ListResourcesResult(resources=resources)
@server.read_resource()
async def read_resource(uri: str) -> ReadResourceResult:
    """Return the UTF-8 text of one file below ``BASE_DIR``.

    Args:
        uri: A ``file://`` URI whose remainder is a path relative to
            ``BASE_DIR``.

    Returns:
        A ``ReadResourceResult`` holding the file text as a single item.

    Raises:
        ValueError: The URI scheme is not ``file://`` or the resolved path
            lies outside ``BASE_DIR``.
        FileNotFoundError: The path does not exist.
        RuntimeError: Reading the file or building the result failed.
    """
    scheme = "file://"
    if not uri.startswith(scheme):
        raise ValueError(f"不支持的 URI 方案: {uri}")
    relative_path = uri[len(scheme):]
    file_path = BASE_DIR / relative_path
    # Security check: resolve() collapses ".." and follows symlinks, so the
    # comparison is made on the real location of the file.
    try:
        file_path.resolve().relative_to(BASE_DIR.resolve())
    except ValueError:
        # The lexical relative_to() failure adds nothing for the caller.
        raise ValueError("访问被拒绝:文件在允许目录之外") from None
    if not file_path.exists():
        raise FileNotFoundError(f"文件不存在: {relative_path}")
    try:
        content = file_path.read_text(encoding="utf-8")
        # NOTE(review): the result is built from TextContent exactly as in
        # the original sample; confirm this matches the installed mcp SDK.
        return ReadResourceResult(
            contents=[TextContent(type="text", text=content)]
        )
    except Exception as e:
        # Chain the cause so the underlying OS/decoding error stays visible.
        raise RuntimeError(f"读取文件失败: {e}") from e
@server.list_tools()
async def list_tools() -> List[Tool]:
    """Describe the tools of the file server: ``search_files`` and ``create_file``."""
    search_schema = {
        "type": "object",
        "properties": {
            "pattern": {
                "type": "string",
                "description": "要搜索的文本模式",
            },
            "file_extension": {
                "type": "string",
                "description": "文件扩展名过滤器",
                "default": ".txt",
            },
        },
        "required": ["pattern"],
    }
    create_schema = {
        "type": "object",
        "properties": {
            "filename": {
                "type": "string",
                "description": "文件名",
            },
            "content": {
                "type": "string",
                "description": "文件内容",
            },
        },
        "required": ["filename", "content"],
    }
    search_tool = Tool(
        name="search_files",
        description="在文件中搜索文本",
        inputSchema=search_schema,
    )
    create_tool = Tool(
        name="create_file",
        description="创建新文件",
        inputSchema=create_schema,
    )
    return [search_tool, create_tool]
def _file_tool_result(text: str, *, is_error: bool = False) -> CallToolResult:
    """Wrap ``text`` in a one-item ``CallToolResult``; flag it when ``is_error``."""
    content = [TextContent(type="text", text=text)]
    if is_error:
        return CallToolResult(content=content, isError=True)
    return CallToolResult(content=content)


@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> CallToolResult:
    """Run one of the file-server tools.

    Args:
        name: ``"search_files"`` or ``"create_file"``.
        arguments: Tool arguments as described by the schemas in
            ``list_tools``.

    Returns:
        A ``CallToolResult`` with one text item.  An unknown tool, rejected
        input and I/O failures are reported with ``isError=True`` rather
        than raised.

    Raises:
        KeyError: A required argument is missing from ``arguments``.
    """
    if name == "search_files":
        pattern = arguments["pattern"]
        file_extension = arguments.get("file_extension", ".txt")
        # The extension is spliced into a glob pattern, so it must not carry
        # path components: a value containing ".." could steer the search
        # outside BASE_DIR.
        if (
            not isinstance(file_extension, str)
            or "/" in file_extension
            or "\\" in file_extension
            or ".." in file_extension
        ):
            return _file_tool_result("无效的文件扩展名", is_error=True)
        needle = pattern.lower()  # hoisted: invariant across files
        results = []
        try:
            for file_path in BASE_DIR.rglob(f"*{file_extension}"):
                if not file_path.is_file():
                    continue
                try:
                    content = file_path.read_text(encoding="utf-8")
                except Exception:
                    # Best effort: skip unreadable or non-UTF-8 files.
                    continue
                if needle in content.lower():
                    relative_path = file_path.relative_to(BASE_DIR)
                    results.append(f"找到匹配: {relative_path}")
        except Exception as e:
            return _file_tool_result(f"搜索失败: {e}", is_error=True)
        if results:
            return _file_tool_result("\n".join(results))
        return _file_tool_result(f"未找到包含 '{pattern}' 的文件")

    if name == "create_file":
        filename = arguments["filename"]
        content = arguments["content"]
        # Only a bare file name is accepted, so the file always lands
        # directly inside BASE_DIR.
        if "/" in filename or "\\" in filename or ".." in filename:
            return _file_tool_result("无效的文件名", is_error=True)
        file_path = BASE_DIR / filename
        try:
            file_path.write_text(content, encoding="utf-8")
        except Exception as e:
            return _file_tool_result(f"创建文件失败: {e}", is_error=True)
        return _file_tool_result(f"文件已创建: {filename}")

    return _file_tool_result(f"未知工具: {name}", is_error=True)
async def main():
    """Serve MCP requests over stdin/stdout until the streams close."""
    async with stdio_server() as streams:
        reader, writer = streams
        await server.run(
            reader,
            writer,
            server.create_initialization_options(),
        )
if __name__ == "__main__":
asyncio.run(main())
TypeScript 实现
typescript
#!/usr/bin/env node
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
CallToolRequestSchema,
ListResourcesRequestSchema,
ListToolsRequestSchema,
ReadResourceRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import * as fs from 'fs/promises';
import * as path from 'path';
import { fileURLToPath } from 'url';
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const BASE_DIR = path.join(process.env.HOME || '', 'Documents');
// 创建服务器
const server = new Server(
{
name: 'file-server',
version: '1.0.0',
},
{
capabilities: {
resources: {},
tools: {},
},
}
);
// List resources: expose every .txt file below BASE_DIR as a text resource.
server.setRequestHandler(ListResourcesRequestSchema, async () => {
  const resources = [];
  try {
    const entries = await fs.readdir(BASE_DIR, { recursive: true });
    for (const entry of entries) {
      const info = await fs.stat(path.join(BASE_DIR, entry));
      // Skip directories and anything that is not a .txt file.
      if (!info.isFile() || path.extname(entry) !== '.txt') {
        continue;
      }
      resources.push({
        uri: `file://${entry}`,
        name: path.basename(entry),
        description: `文本文件: ${entry}`,
        mimeType: 'text/plain',
      });
    }
  } catch (error) {
    // A failure ends the walk; whatever was collected so far is returned.
    console.error('列出资源时出错:', error);
  }
  return { resources };
});
// Read a resource: return the UTF-8 text of one file below BASE_DIR.
// Throws for a non-file:// URI, for a path outside BASE_DIR, and when the
// file cannot be read.
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const { uri } = request.params;
  const scheme = 'file://';
  if (!uri.startsWith(scheme)) {
    throw new Error(`不支持的 URI 方案: ${uri}`);
  }
  const relativePath = uri.slice(scheme.length);
  const filePath = path.join(BASE_DIR, relativePath);
  // Security check. Comparing the two strings with startsWith() would also
  // accept a sibling directory such as "<BASE_DIR>-old/secret.txt", so the
  // test is made on the path *relative to* the base instead: the target is
  // outside when that path climbs with ".." or is absolute (other drive).
  const resolvedPath = path.resolve(filePath);
  const resolvedBase = path.resolve(BASE_DIR);
  const fromBase = path.relative(resolvedBase, resolvedPath);
  const escapesBase =
    fromBase === '..' ||
    fromBase.startsWith(`..${path.sep}`) ||
    path.isAbsolute(fromBase);
  if (escapesBase) {
    throw new Error('访问被拒绝:文件在允许目录之外');
  }
  try {
    const content = await fs.readFile(filePath, 'utf-8');
    return {
      contents: [
        {
          uri,
          mimeType: 'text/plain',
          text: content,
        },
      ],
    };
  } catch (error) {
    // Catch variables are `unknown` under strict TypeScript; narrow first.
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`读取文件失败: ${reason}`);
  }
});
// List tools: describe "search_files" and "create_file" with their JSON schemas.
server.setRequestHandler(ListToolsRequestSchema, async () => {
  const searchFilesTool = {
    name: 'search_files',
    description: '在文件中搜索文本',
    inputSchema: {
      type: 'object',
      properties: {
        pattern: {
          type: 'string',
          description: '要搜索的文本模式',
        },
        fileExtension: {
          type: 'string',
          description: '文件扩展名过滤器',
          default: '.txt',
        },
      },
      required: ['pattern'],
    },
  };
  const createFileTool = {
    name: 'create_file',
    description: '创建新文件',
    inputSchema: {
      type: 'object',
      properties: {
        filename: {
          type: 'string',
          description: '文件名',
        },
        content: {
          type: 'string',
          description: '文件内容',
        },
      },
      required: ['filename', 'content'],
    },
  };
  return { tools: [searchFilesTool, createFileTool] };
});
// Call a tool: dispatch "search_files" / "create_file".
// Rejected input and I/O failures come back as a text result with
// isError: true; an unknown tool name throws.
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  // `arguments` is optional in the request, so default it before destructuring.
  const { name, arguments: args = {} } = request.params;

  // Builds the single-text-item result shape used by every branch.
  const textResult = (text: string, isError = false) =>
    isError
      ? { content: [{ type: 'text' as const, text }], isError: true }
      : { content: [{ type: 'text' as const, text }] };

  // Catch variables are `unknown` under strict TypeScript; narrow first.
  const describe = (error: unknown): string =>
    error instanceof Error ? error.message : String(error);

  switch (name) {
    case 'search_files': {
      const { pattern, fileExtension = '.txt' } = args as {
        pattern?: unknown;
        fileExtension?: unknown;
      };
      if (typeof pattern !== 'string' || typeof fileExtension !== 'string') {
        return textResult('缺少必需参数或参数类型错误: pattern', true);
      }
      const needle = pattern.toLowerCase(); // hoisted: invariant across files
      const results: string[] = [];
      try {
        const files = await fs.readdir(BASE_DIR, { recursive: true });
        for (const file of files) {
          if (typeof file !== 'string' || !file.endsWith(fileExtension)) {
            continue;
          }
          try {
            const content = await fs.readFile(path.join(BASE_DIR, file), 'utf-8');
            if (content.toLowerCase().includes(needle)) {
              results.push(`找到匹配: ${file}`);
            }
          } catch {
            // Best effort: ignore entries that cannot be read (e.g. directories).
          }
        }
      } catch (error) {
        return textResult(`搜索失败: ${describe(error)}`, true);
      }
      return textResult(
        results.length > 0 ? results.join('\n') : `未找到包含 '${pattern}' 的文件`
      );
    }
    case 'create_file': {
      const { filename, content } = args as {
        filename?: unknown;
        content?: unknown;
      };
      if (typeof filename !== 'string' || typeof content !== 'string') {
        return textResult('缺少必需参数或参数类型错误: filename, content', true);
      }
      // Only a bare file name is accepted, so the file always lands
      // directly inside BASE_DIR.
      if (filename.includes('/') || filename.includes('\\') || filename.includes('..')) {
        return textResult('无效的文件名', true);
      }
      try {
        await fs.writeFile(path.join(BASE_DIR, filename), content, 'utf-8');
      } catch (error) {
        return textResult(`创建文件失败: ${describe(error)}`, true);
      }
      return textResult(`文件已创建: ${filename}`);
    }
    default:
      throw new Error(`未知工具: ${name}`);
  }
});
// Start the server on the stdio transport. Status goes to stderr because
// stdout is the transport channel.
async function main() {
  await server.connect(new StdioServerTransport());
  console.error('文件服务器已启动');
}

main().catch((error) => {
  console.error('服务器启动失败:', error);
  process.exit(1);
});
2. 天气服务器
一个连接外部 API 的天气信息服务器。
Python 实现
python
#!/usr/bin/env python3
import asyncio
import json
import os
from typing import Any, Dict, List
import aiohttp
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
Resource,
Tool,
TextContent,
ListResourcesResult,
ReadResourceResult,
CallToolResult,
)
# 创建服务器实例
server = Server("weather-server")
# API configuration (uses the free OpenWeatherMap API).
API_KEY = os.getenv("OPENWEATHER_API_KEY", "your_api_key_here")
# HTTPS is required here: the API key travels in the query string of every
# request and would be readable on the wire over plain HTTP.
BASE_URL = "https://api.openweathermap.org/data/2.5"
@server.list_resources()
async def list_resources() -> ListResourcesResult:
    """Advertise the fixed set of weather resources (current weather and forecast)."""
    # (uri, display name, description) for each resource, in listing order.
    catalog = (
        ("weather://current/beijing", "北京当前天气", "北京市当前天气信息"),
        ("weather://current/shanghai", "上海当前天气", "上海市当前天气信息"),
        ("weather://forecast/beijing", "北京天气预报", "北京市5天天气预报"),
    )
    entries = []
    for resource_uri, title, summary in catalog:
        entries.append(
            Resource(
                uri=resource_uri,
                name=title,
                description=summary,
                mimeType="application/json",
            )
        )
    return ListResourcesResult(resources=entries)
@server.read_resource()
async def read_resource(uri: str) -> ReadResourceResult:
    """Fetch a weather resource and return the API response as JSON text.

    Args:
        uri: ``weather://<type>/<city>`` where ``<type>`` is ``current`` or
            ``forecast``.  A missing or empty city falls back to
            ``"beijing"``.

    Returns:
        A ``ReadResourceResult`` whose single item is the pretty-printed
        JSON body returned by the API.

    Raises:
        ValueError: The URI scheme is not ``weather://`` or the type segment
            is missing.
        RuntimeError: The type is unsupported, the request failed, or the
            API answered with a non-200 status.
    """
    if not uri.startswith("weather://"):
        raise ValueError(f"不支持的 URI 方案: {uri}")
    # "weather://current/beijing".split("/") -> ["weather:", "", "current", "beijing"]
    parts = uri.split("/")
    if len(parts) < 3 or not parts[2]:
        raise ValueError(f"无效的天气 URI: {uri}")
    weather_type = parts[2]  # "current" or "forecast"
    city = parts[3] if len(parts) > 3 and parts[3] else "beijing"
    # Resource type -> API endpoint; both take the same query parameters.
    endpoints = {"current": "weather", "forecast": "forecast"}
    params = {
        "q": city,
        "appid": API_KEY,
        "units": "metric",
        "lang": "zh_cn",
    }
    try:
        if weather_type not in endpoints:
            raise ValueError(f"不支持的天气类型: {weather_type}")
        url = f"{BASE_URL}/{endpoints[weather_type]}"
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as response:
                if response.status != 200:
                    error_data = await response.text()
                    raise RuntimeError(f"API 请求失败: {response.status} - {error_data}")
                data = await response.json()
                return ReadResourceResult(
                    contents=[
                        TextContent(
                            type="text",
                            text=json.dumps(data, ensure_ascii=False, indent=2),
                        )
                    ]
                )
    except Exception as e:
        # Every failure past URI parsing surfaces as RuntimeError; chaining
        # keeps the original cause attached.
        raise RuntimeError(f"获取天气数据失败: {e}") from e
@server.list_tools()
async def list_tools() -> List[Tool]:
    """Describe the weather tools: ``get_weather``, ``get_forecast``, ``compare_weather``."""

    def city_property():
        # A fresh dict per schema so no two schemas share a mutable object.
        return {
            "type": "string",
            "description": "城市名称(中文或英文)",
        }

    current_schema = {
        "type": "object",
        "properties": {
            "city": city_property(),
            "units": {
                "type": "string",
                "enum": ["metric", "imperial", "kelvin"],
                "description": "温度单位",
                "default": "metric",
            },
        },
        "required": ["city"],
    }
    forecast_schema = {
        "type": "object",
        "properties": {
            "city": city_property(),
            "days": {
                "type": "integer",
                "minimum": 1,
                "maximum": 5,
                "description": "预报天数",
                "default": 3,
            },
        },
        "required": ["city"],
    }
    compare_schema = {
        "type": "object",
        "properties": {
            "cities": {
                "type": "array",
                "items": {"type": "string"},
                "description": "要比较的城市列表",
                "minItems": 2,
                "maxItems": 5,
            },
        },
        "required": ["cities"],
    }
    definitions = [
        ("get_weather", "获取指定城市的当前天气", current_schema),
        ("get_forecast", "获取指定城市的天气预报", forecast_schema),
        ("compare_weather", "比较多个城市的天气", compare_schema),
    ]
    return [
        Tool(name=tool_name, description=summary, inputSchema=schema)
        for tool_name, summary, schema in definitions
    ]
# ``units`` tool argument -> (OpenWeatherMap ``units`` value, temperature
# symbol, wind-speed unit).  The tool schema advertises "kelvin", but the API
# names that system "standard", so it is translated here.
_UNIT_SYSTEMS = {
    "metric": ("metric", "°C", "m/s"),
    "imperial": ("imperial", "°F", "mph"),
    "kelvin": ("standard", "K", "m/s"),
    "standard": ("standard", "K", "m/s"),
}


@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> CallToolResult:
    """Run one of the weather tools against the OpenWeatherMap API.

    Args:
        name: ``"get_weather"``, ``"get_forecast"`` or ``"compare_weather"``.
        arguments: Tool arguments as described by the schemas in
            ``list_tools``.

    Returns:
        A ``CallToolResult`` with one text item.  Unknown tools, API errors
        and any exception raised while handling the call are reported with
        ``isError=True`` rather than raised.
    """

    def text_result(text: str, is_error: bool = False) -> CallToolResult:
        content = [TextContent(type="text", text=text)]
        if is_error:
            return CallToolResult(content=content, isError=True)
        return CallToolResult(content=content)

    try:
        async with aiohttp.ClientSession() as session:
            if name == "get_weather":
                city = arguments["city"]
                units = arguments.get("units", "metric")
                if units not in _UNIT_SYSTEMS:
                    return text_result(f"不支持的温度单位: {units}", True)
                api_units, temp_symbol, wind_unit = _UNIT_SYSTEMS[units]
                params = {
                    "q": city,
                    "appid": API_KEY,
                    "units": api_units,
                    "lang": "zh_cn",
                }
                async with session.get(f"{BASE_URL}/weather", params=params) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        return text_result(f"获取天气失败: {error_text}", True)
                    data = await response.json()
                # The unit labels follow the requested unit system instead of
                # always claiming °C and m/s.
                weather_info = "\n".join(
                    [
                        f"🌍 {data['name']} 当前天气",
                        f"🌡️ 温度: {data['main']['temp']}{temp_symbol}",
                        f"🌡️ 体感温度: {data['main']['feels_like']}{temp_symbol}",
                        f"📊 湿度: {data['main']['humidity']}%",
                        f"🌪️ 风速: {data['wind']['speed']} {wind_unit}",
                        f"☁️ 天气: {data['weather'][0]['description']}",
                        f"👁️ 能见度: {data.get('visibility', 'N/A')} m",
                    ]
                )
                return text_result(weather_info)

            if name == "get_forecast":
                city = arguments["city"]
                # Clamp to the 1-5 range the schema advertises so the heading
                # never promises more days than are listed.
                days = max(1, min(int(arguments.get("days", 3)), 5))
                params = {
                    "q": city,
                    "appid": API_KEY,
                    "units": "metric",
                    "lang": "zh_cn",
                }
                async with session.get(f"{BASE_URL}/forecast", params=params) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        return text_result(f"获取预报失败: {error_text}", True)
                    data = await response.json()
                points = data["list"]
                daily_forecasts = []
                # One data point per 3 hours, so every 8th point is about one
                # day apart.  Sampling starts at the first point returned, so
                # the time of day is whatever that point has.
                for i in range(0, min(days * 8, len(points)), 8):
                    forecast = points[i]
                    date = forecast["dt_txt"].split(" ")[0]
                    temp = forecast["main"]["temp"]
                    desc = forecast["weather"][0]["description"]
                    daily_forecasts.append(f"📅 {date}: {temp}°C, {desc}")
                forecast_info = f"🌍 {data['city']['name']} {days}天天气预报\n\n"
                forecast_info += "\n".join(daily_forecasts)
                return text_result(forecast_info)

            if name == "compare_weather":
                cities = arguments["cities"]
                weather_data = []
                for city in cities:
                    params = {
                        "q": city,
                        "appid": API_KEY,
                        "units": "metric",
                        "lang": "zh_cn",
                    }
                    async with session.get(f"{BASE_URL}/weather", params=params) as response:
                        if response.status == 200:
                            data = await response.json()
                            weather_data.append(
                                {
                                    "city": data["name"],
                                    "temp": data["main"]["temp"],
                                    "humidity": data["main"]["humidity"],
                                    "description": data["weather"][0]["description"],
                                }
                            )
                        else:
                            # One failed city must not abort the comparison.
                            weather_data.append(
                                {
                                    "city": city,
                                    "error": f"获取失败: {response.status}",
                                }
                            )
                lines = []
                for entry in weather_data:
                    if "error" in entry:
                        lines.append(f"❌ {entry['city']}: {entry['error']}\n")
                    else:
                        lines.append(
                            f"🏙️ {entry['city']}: {entry['temp']}°C, "
                            f"{entry['humidity']}% 湿度, {entry['description']}\n"
                        )
                return text_result("🌍 城市天气比较\n\n" + "".join(lines))

            return text_result(f"未知工具: {name}", True)
    except Exception as e:
        # Tool-call boundary: report the failure to the client instead of
        # letting it escape into the server loop.
        return text_result(f"工具执行失败: {e}", True)
async def main():
    """Warn about a missing API key, then serve MCP requests over stdio."""
    import sys  # local import: only needed for the warning below

    if API_KEY == "your_api_key_here":
        # stdout is the JSON-RPC channel of the stdio transport; a plain
        # print() would corrupt the protocol stream, so warn on stderr.
        print("警告: 请设置 OPENWEATHER_API_KEY 环境变量", file=sys.stderr)
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options(),
        )
if __name__ == "__main__":
asyncio.run(main())
3. 数据库服务器
一个简单的 SQLite 数据库服务器示例。
Python 实现
python
#!/usr/bin/env python3
import asyncio
import sqlite3
import json
from typing import Any, Dict, List
from pathlib import Path
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
Resource,
Tool,
TextContent,
ListResourcesResult,
ReadResourceResult,
CallToolResult,
)
# 创建服务器实例
server = Server("database-server")
# 数据库配置
DB_PATH = Path("example.db")
def init_database():
    """Create the ``users`` and ``products`` tables and seed them when empty.

    Safe to call repeatedly: tables are created with ``IF NOT EXISTS`` and
    sample rows are inserted only into a table that has no rows.  All changes
    are committed together at the end.
    """
    from contextlib import closing  # local import keeps the block self-contained

    # closing() guarantees the connection is released even when one of the
    # statements below raises.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.cursor()
        # Create the sample tables.
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL,
                age INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                price REAL NOT NULL,
                category TEXT,
                stock INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        # Insert sample data, but only into empty tables.
        cursor.execute("SELECT COUNT(*) FROM users")
        if cursor.fetchone()[0] == 0:
            sample_users = [
                ("张三", "zhangsan@example.com", 25),
                ("李四", "lisi@example.com", 30),
                ("王五", "wangwu@example.com", 28),
            ]
            cursor.executemany(
                "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
                sample_users,
            )
        cursor.execute("SELECT COUNT(*) FROM products")
        if cursor.fetchone()[0] == 0:
            sample_products = [
                ("笔记本电脑", 5999.99, "电子产品", 10),
                ("无线鼠标", 199.99, "电子产品", 50),
                ("办公椅", 899.99, "家具", 20),
            ]
            cursor.executemany(
                "INSERT INTO products (name, price, category, stock) VALUES (?, ?, ?, ?)",
                sample_products,
            )
        conn.commit()
# 初始化数据库
init_database()
@server.list_resources()
async def list_resources() -> ListResourcesResult:
    """Advertise the four database resources: table list, both tables, and the schema."""
    # (uri, display name, description) for each resource, in listing order.
    catalog = (
        ("db://tables", "数据库表列表", "显示所有数据库表"),
        ("db://users", "用户表", "用户数据表内容"),
        ("db://products", "产品表", "产品数据表内容"),
        ("db://schema", "数据库架构", "数据库表结构信息"),
    )
    entries = []
    for resource_uri, title, summary in catalog:
        entries.append(
            Resource(
                uri=resource_uri,
                name=title,
                description=summary,
                mimeType="application/json",
            )
        )
    return ListResourcesResult(resources=entries)
@server.read_resource()
async def read_resource(uri: str) -> ReadResourceResult:
    """Read one database resource and return it as pretty-printed JSON text.

    Args:
        uri: ``db://tables``, ``db://users``, ``db://products`` or
            ``db://schema``.

    Returns:
        A ``ReadResourceResult`` with a single text item holding the JSON.

    Raises:
        ValueError: The URI scheme is not ``db://``.
        RuntimeError: The resource type is unknown or the query failed.
    """
    if not uri.startswith("db://"):
        raise ValueError(f"不支持的 URI 方案: {uri}")
    resource_type = uri[len("db://"):]
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row  # lets rows be converted with dict(row)
    cursor = conn.cursor()
    try:
        if resource_type == "tables":
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            data = {"tables": [row[0] for row in cursor.fetchall()]}
        elif resource_type in ("users", "products"):
            # The table name comes from the fixed tuple above, never from
            # free-form input, so formatting it into the SQL is safe.
            cursor.execute(f"SELECT * FROM {resource_type} ORDER BY id")
            rows = cursor.fetchall()
            data = {
                "table": resource_type,
                "count": len(rows),
                "data": [dict(row) for row in rows],
            }
        elif resource_type == "schema":
            # Collect the column layout of every table.
            schema_info = {}
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = [row[0] for row in cursor.fetchall()]
            for table in tables:
                # PRAGMA arguments cannot be bound as parameters; quote the
                # identifier so names with spaces or keywords still work.
                quoted = '"' + table.replace('"', '""') + '"'
                cursor.execute(f"PRAGMA table_info({quoted})")
                # table_info row layout as consumed here:
                # index 1 name, 2 type, 3 notnull, 4 default value, 5 pk.
                schema_info[table] = [
                    {
                        "name": col[1],
                        "type": col[2],
                        "not_null": bool(col[3]),
                        "default": col[4],
                        "primary_key": bool(col[5]),
                    }
                    for col in cursor.fetchall()
                ]
            data = {"schema": schema_info}
        else:
            raise ValueError(f"未知资源类型: {resource_type}")
        return ReadResourceResult(
            contents=[
                TextContent(
                    type="text",
                    text=json.dumps(data, ensure_ascii=False, indent=2),
                )
            ]
        )
    except Exception as e:
        # Chain the cause so the underlying database error stays visible.
        raise RuntimeError(f"读取数据库资源失败: {e}") from e
    finally:
        conn.close()
@server.list_tools()
async def list_tools() -> List[Tool]:
    """Describe the database tools and the JSON schema of their arguments."""
    execute_query_schema = {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "要执行的 SQL 查询语句",
            },
            "params": {
                "type": "array",
                "items": {"type": "string"},
                "description": "查询参数(可选)",
                "default": [],
            },
        },
        "required": ["query"],
    }
    insert_user_schema = {
        "type": "object",
        "properties": {
            "name": {
                "type": "string",
                "description": "用户姓名",
            },
            "email": {
                "type": "string",
                "description": "用户邮箱",
            },
            "age": {
                "type": "integer",
                "description": "用户年龄",
                "minimum": 0,
                "maximum": 150,
            },
        },
        "required": ["name", "email"],
    }
    update_stock_schema = {
        "type": "object",
        "properties": {
            "product_id": {
                "type": "integer",
                "description": "产品 ID",
            },
            "new_stock": {
                "type": "integer",
                "description": "新的库存数量",
                "minimum": 0,
            },
        },
        "required": ["product_id", "new_stock"],
    }
    search_products_schema = {
        "type": "object",
        "properties": {
            "keyword": {
                "type": "string",
                "description": "搜索关键词",
            },
            "category": {
                "type": "string",
                "description": "产品类别过滤器(可选)",
            },
            "min_price": {
                "type": "number",
                "description": "最低价格(可选)",
            },
            "max_price": {
                "type": "number",
                "description": "最高价格(可选)",
            },
        },
        "required": ["keyword"],
    }
    definitions = [
        ("execute_query", "执行 SQL 查询", execute_query_schema),
        ("insert_user", "插入新用户", insert_user_schema),
        ("update_product_stock", "更新产品库存", update_stock_schema),
        ("search_products", "搜索产品", search_products_schema),
    ]
    return [
        Tool(name=tool_name, description=summary, inputSchema=schema)
        for tool_name, summary, schema in definitions
    ]
@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> CallToolResult:
    """Run one of the database tools.

    Args:
        name: ``"execute_query"``, ``"insert_user"``,
            ``"update_product_stock"`` or ``"search_products"``.
        arguments: Tool arguments as described by the schemas in
            ``list_tools``.

    Returns:
        A ``CallToolResult`` with one text item.  Unknown tools, rejected
        queries and database errors are reported with ``isError=True``
        rather than raised.
    """

    def text_result(text: str, is_error: bool = False) -> CallToolResult:
        content = [TextContent(type="text", text=text)]
        if is_error:
            return CallToolResult(content=content, isError=True)
        return CallToolResult(content=content)

    def json_result(payload: Dict[str, Any]) -> CallToolResult:
        return text_result(json.dumps(payload, ensure_ascii=False, indent=2))

    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row  # lets rows be converted with dict(row)
    cursor = conn.cursor()
    try:
        if name == "execute_query":
            query = arguments["query"]
            params = arguments.get("params", [])
            # Security check: only SELECT statements are accepted.
            if not query.strip().upper().startswith("SELECT"):
                return text_result("出于安全考虑,只允许 SELECT 查询", True)
            # Defence in depth: make this connection read-only so the policy
            # does not rest on the prefix check alone.
            cursor.execute("PRAGMA query_only = ON")
            cursor.execute(query, params)
            rows = cursor.fetchall()
            return json_result(
                {
                    "query": query,
                    "row_count": len(rows),
                    "data": [dict(row) for row in rows],
                }
            )

        if name == "insert_user":
            # A distinct local name: reusing ``name`` would overwrite the
            # tool-name parameter for the rest of the function.
            user_name = arguments["name"]
            email = arguments["email"]
            age = arguments.get("age")
            try:
                if age is not None:
                    cursor.execute(
                        "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
                        (user_name, email, age),
                    )
                else:
                    cursor.execute(
                        "INSERT INTO users (name, email) VALUES (?, ?)",
                        (user_name, email),
                    )
                conn.commit()
            except sqlite3.IntegrityError as e:
                # Raised e.g. when the UNIQUE constraint on email is violated.
                return text_result(f"创建用户失败: {e}", True)
            return text_result(f"用户创建成功,ID: {cursor.lastrowid}")

        if name == "update_product_stock":
            product_id = arguments["product_id"]
            new_stock = arguments["new_stock"]
            cursor.execute(
                "UPDATE products SET stock = ? WHERE id = ?",
                (new_stock, product_id),
            )
            if cursor.rowcount > 0:
                conn.commit()
                return text_result(f"产品 {product_id} 库存已更新为 {new_stock}")
            return text_result(f"未找到产品 ID: {product_id}", True)

        if name == "search_products":
            keyword = arguments["keyword"]
            category = arguments.get("category")
            min_price = arguments.get("min_price")
            max_price = arguments.get("max_price")
            # Build the query; every value is bound as a parameter.
            query = "SELECT * FROM products WHERE name LIKE ?"
            params = [f"%{keyword}%"]
            if category:
                query += " AND category = ?"
                params.append(category)
            if min_price is not None:
                query += " AND price >= ?"
                params.append(min_price)
            if max_price is not None:
                query += " AND price <= ?"
                params.append(max_price)
            query += " ORDER BY name"
            cursor.execute(query, params)
            rows = cursor.fetchall()
            return json_result(
                {
                    "search_keyword": keyword,
                    "filters": {
                        "category": category,
                        "min_price": min_price,
                        "max_price": max_price,
                    },
                    "result_count": len(rows),
                    "products": [dict(row) for row in rows],
                }
            )

        return text_result(f"未知工具: {name}", True)
    except Exception as e:
        # Tool-call boundary: report the failure to the client instead of
        # letting it escape into the server loop.
        return text_result(f"数据库操作失败: {e}", True)
    finally:
        conn.close()
async def main():
    """Serve MCP requests over stdin/stdout until the streams close."""
    async with stdio_server() as streams:
        reader, writer = streams
        await server.run(
            reader,
            writer,
            server.create_initialization_options(),
        )
if __name__ == "__main__":
asyncio.run(main())
高级示例
1. 多服务器客户端
一个连接和管理多个 MCP 服务器的客户端示例。
python
#!/usr/bin/env python3
import asyncio
import json
from typing import Dict, List, Any
from dataclasses import dataclass
from mcp.client import Client
from mcp.client.stdio import stdio_client
@dataclass
class ServerConfig:
    """Launch settings for one MCP server reached over stdio."""

    # Key under which the connected client is stored and addressed.
    name: str
    # Executable passed to stdio_client together with ``args``.
    command: str
    # Command-line arguments for ``command``.
    args: List[str]
    # Human-readable summary of the server.
    description: str
class MultiServerClient:
    """Connect to several MCP servers over stdio and drive them together.

    Transports are opened by ``connect_all_servers`` and stay open until
    ``close`` is awaited; ``run_demo`` does both.

    NOTE(review): ``Client`` and ``stdio_client(command, args)`` are used as
    in the original sample; confirm the names and signatures against the
    installed mcp SDK.
    """

    # Printed by the "help" command.  Only commands that interactive_mode()
    # actually handles are listed.
    _HELP_LINES = (
        "可用命令:",
        "help - 显示此帮助",
        "list - 列出所有服务器",
        "capabilities - 显示所有服务器能力",
        "resources <server> - 列出服务器资源",
        "tools <server> - 列出服务器工具",
        "call <server> <tool> <args> - 调用工具",
        "quit - 退出",
    )

    def __init__(self):
        from contextlib import AsyncExitStack  # local import keeps the block self-contained

        self.clients: Dict[str, Client] = {}
        # Owns every open stdio transport so connections outlive the method
        # that created them.
        self._exit_stack = AsyncExitStack()
        self.server_configs = [
            ServerConfig(
                name="file-server",
                command="python",
                args=["file_server.py"],
                description="文件系统服务器",
            ),
            ServerConfig(
                name="weather-server",
                command="python",
                args=["weather_server.py"],
                description="天气信息服务器",
            ),
            ServerConfig(
                name="database-server",
                command="python",
                args=["database_server.py"],
                description="数据库服务器",
            ),
        ]

    async def connect_all_servers(self):
        """Connect to every configured server; failures are printed and skipped."""
        for config in self.server_configs:
            try:
                print(f"正在连接 {config.name}...")
                # Entering the context through the exit stack keeps the
                # transport open after this method returns.  A plain
                # ``async with`` would close it immediately and leave a dead
                # client in self.clients.
                read, write = await self._exit_stack.enter_async_context(
                    stdio_client(config.command, config.args)
                )
                client = Client("multi-client")
                await client.initialize(read, write)
                self.clients[config.name] = client
                print(f"✅ {config.name} 连接成功")
            except Exception as e:
                print(f"❌ {config.name} 连接失败: {e}")

    async def close(self):
        """Forget all clients and close every transport that was opened."""
        self.clients.clear()
        await self._exit_stack.aclose()

    async def list_all_capabilities(self):
        """Print up to three resources and three tools for each connected server."""
        print("\n🔍 服务器能力概览:")
        print("=" * 50)
        for server_name, client in self.clients.items():
            try:
                print(f"\n📡 {server_name}:")
                # Resources
                try:
                    resources = await client.list_resources()
                    print(f" 📁 资源 ({len(resources.resources)}):")
                    for resource in resources.resources[:3]:  # preview only
                        print(f" - {resource.name}: {resource.description}")
                    if len(resources.resources) > 3:
                        print(f" ... 还有 {len(resources.resources) - 3} 个资源")
                except Exception as e:
                    print(f" ❌ 获取资源失败: {e}")
                # Tools
                try:
                    tools = await client.list_tools()
                    print(f" 🔧 工具 ({len(tools.tools)}):")
                    for tool in tools.tools[:3]:  # preview only
                        print(f" - {tool.name}: {tool.description}")
                    if len(tools.tools) > 3:
                        print(f" ... 还有 {len(tools.tools) - 3} 个工具")
                except Exception as e:
                    print(f" ❌ 获取工具失败: {e}")
            except Exception as e:
                print(f" ❌ 服务器 {server_name} 不可用: {e}")

    async def execute_workflow(self, workflow: List[Dict[str, Any]]):
        """Run workflow steps in order and collect one outcome per step.

        Args:
            workflow: Steps with keys ``server``, ``type`` (``"resource"`` or
                ``"tool"``), ``name`` and optional ``params``.

        Returns:
            A list of dicts, each holding ``step`` (1-based) and either
            ``result`` or ``error``.  A failing step does not stop the rest.
        """
        print("\n🚀 执行工作流:")
        print("=" * 30)
        results = []
        for i, step in enumerate(workflow, 1):
            server_name = step["server"]
            action_type = step["type"]
            action_name = step["name"]
            params = step.get("params", {})
            print(f"\n步骤 {i}: {server_name} - {action_type} - {action_name}")
            if server_name not in self.clients:
                error_msg = f"服务器 {server_name} 未连接"
                print(f" ❌ {error_msg}")
                results.append({"step": i, "error": error_msg})
                continue
            client = self.clients[server_name]
            try:
                if action_type == "resource":
                    result = await client.read_resource(action_name)
                    print(f" ✅ 资源读取成功")
                    results.append({"step": i, "result": result})
                elif action_type == "tool":
                    result = await client.call_tool(action_name, params)
                    print(f" ✅ 工具调用成功")
                    results.append({"step": i, "result": result})
                else:
                    error_msg = f"未知操作类型: {action_type}"
                    print(f" ❌ {error_msg}")
                    results.append({"step": i, "error": error_msg})
            except Exception as e:
                error_msg = f"执行失败: {e}"
                print(f" ❌ {error_msg}")
                results.append({"step": i, "error": error_msg})
        return results

    async def _print_resources(self, server_name: str):
        """Print every resource of one connected server."""
        if server_name not in self.clients:
            print(f"服务器 {server_name} 未连接")
            return
        resources = await self.clients[server_name].list_resources()
        print(f"\n{server_name} 的资源:")
        for resource in resources.resources:
            print(f" - {resource.name}: {resource.description}")

    async def _print_tools(self, server_name: str):
        """Print every tool of one connected server."""
        if server_name not in self.clients:
            print(f"服务器 {server_name} 未连接")
            return
        tools = await self.clients[server_name].list_tools()
        print(f"\n{server_name} 的工具:")
        for tool in tools.tools:
            print(f" - {tool.name}: {tool.description}")

    async def _call_from_command(self, command: str):
        """Parse ``call <server> <tool> [json-args]`` and run the tool."""
        # maxsplit=3 keeps the JSON argument intact even if it contains spaces.
        parts = command.split(" ", 3)
        if len(parts) < 3:
            print("用法: call <server> <tool> <args>")
            return
        server_name = parts[1]
        tool_name = parts[2]
        args_str = parts[3] if len(parts) > 3 else "{}"
        if server_name not in self.clients:
            print(f"服务器 {server_name} 未连接")
            return
        try:
            args = json.loads(args_str)
            result = await self.clients[server_name].call_tool(tool_name, args)
            print(f"\n工具调用结果:")
            for content in result.content:
                print(content.text)
        except json.JSONDecodeError:
            print("参数格式错误,请使用 JSON 格式")
        except Exception as e:
            print(f"工具调用失败: {e}")

    async def interactive_mode(self):
        """Read commands from stdin until ``quit``, Ctrl-C or end of input."""
        print("\n🎮 进入交互模式 (输入 'help' 查看帮助, 'quit' 退出)")
        print("=" * 50)
        while True:
            try:
                # input() blocks the event loop while waiting; acceptable
                # here because nothing else is scheduled during the prompt.
                command = input("\n> ").strip()
                if command == "quit":
                    break
                elif command == "help":
                    print("\n" + "\n".join(self._HELP_LINES) + "\n")
                elif command == "list":
                    print("\n连接的服务器:")
                    for name in self.clients:
                        print(f" - {name}")
                elif command == "capabilities":
                    await self.list_all_capabilities()
                elif command.startswith("resources "):
                    await self._print_resources(command.split(" ", 1)[1])
                elif command.startswith("tools "):
                    await self._print_tools(command.split(" ", 1)[1])
                elif command.startswith("call "):
                    await self._call_from_command(command)
                else:
                    print("未知命令,输入 'help' 查看帮助")
            except (KeyboardInterrupt, EOFError):
                # EOFError means stdin was closed.  Falling through to the
                # generic handler would re-prompt and fail again forever.
                break
            except Exception as e:
                print(f"命令执行错误: {e}")

    async def run_demo(self):
        """Connect, show capabilities, run a sample workflow, then go interactive."""
        try:
            await self.connect_all_servers()
            if not self.clients:
                print("❌ 没有可用的服务器连接")
                return
            await self.list_all_capabilities()
            # Sample workflow touching all three servers.
            demo_workflow = [
                {
                    "server": "weather-server",
                    "type": "tool",
                    "name": "get_weather",
                    "params": {"city": "北京"},
                },
                {
                    "server": "file-server",
                    "type": "tool",
                    "name": "create_file",
                    "params": {
                        "filename": "weather_report.txt",
                        "content": "今日天气报告已生成",
                    },
                },
                {
                    "server": "database-server",
                    "type": "tool",
                    "name": "search_products",
                    "params": {"keyword": "电脑"},
                },
            ]
            workflow_results = await self.execute_workflow(demo_workflow)
            print(f"\n📊 工作流执行完成,共 {len(workflow_results)} 个步骤")
            await self.interactive_mode()
        finally:
            # Always release the transports, including on early return.
            await self.close()
async def main():
    """Create the multi-server client and run its demo."""
    await MultiServerClient().run_demo()
if __name__ == "__main__":
asyncio.run(main())
2. 智能代理示例
一个使用多个 MCP 服务器的智能代理。
python
#!/usr/bin/env python3
import asyncio
import json
import re
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from mcp.client import Client
from mcp.client.stdio import stdio_client
@dataclass
class Task:
    """Execution record for one step requested from the agent."""

    # Identifier of the task.
    id: str
    # Human-readable summary of what the task does.
    description: str
    status: str = "pending"  # pending, running, completed, failed
    # Value produced by the task; None by default.
    result: Optional[Any] = None
    # Error message; None by default.
    error: Optional[str] = None
class IntelligentAgent:
def __init__(self):
self.clients: Dict[str, Client] = {}
self.tasks: List[Task] = []
self.capabilities: Dict[str, Dict] = {}
async def initialize(self):
    """Connect to every configured server and cache its capabilities.

    For each server that connects, the client is stored in ``self.clients``
    and the names of its resources and tools in ``self.capabilities``.
    Connection failures are printed and skipped.

    The opened transports are owned by ``self.exit_stack``; await
    ``self.exit_stack.aclose()`` to shut the connections down.

    NOTE(review): ``Client`` and ``stdio_client(command, args)`` are used as
    in the original sample; confirm against the installed mcp SDK.
    """
    from contextlib import AsyncExitStack  # local import keeps the block self-contained

    # Transports must stay open for as long as the clients are used.  A plain
    # ``async with`` would close each one on leaving the block and leave dead
    # clients in self.clients.
    if getattr(self, "exit_stack", None) is None:
        self.exit_stack = AsyncExitStack()
    server_configs = [
        ("file-server", "python", ["file_server.py"]),
        ("weather-server", "python", ["weather_server.py"]),
        ("database-server", "python", ["database_server.py"]),
    ]
    for name, command, args in server_configs:
        try:
            read, write = await self.exit_stack.enter_async_context(
                stdio_client(command, args)
            )
            client = Client("intelligent-agent")
            await client.initialize(read, write)
        except Exception as e:
            print(f"❌ {name} 连接失败: {e}")
            continue
        self.clients[name] = client
        # Cache the server's capabilities.  A server may lack one of the two
        # features, so each lookup degrades to an empty list.  ``except
        # Exception`` (not a bare except) lets task cancellation propagate.
        capabilities = {}
        try:
            resources = await client.list_resources()
            capabilities["resources"] = [r.name for r in resources.resources]
        except Exception:
            capabilities["resources"] = []
        try:
            tools = await client.list_tools()
            capabilities["tools"] = [t.name for t in tools.tools]
        except Exception:
            capabilities["tools"] = []
        self.capabilities[name] = capabilities
        print(f"✅ {name} 已连接并缓存能力")
def parse_natural_language_request(self, request: str) -> List[Dict[str, Any]]:
    """Turn a free-text request into a list of task definitions.

    Keywords in the request select which tasks are produced; values (city,
    file name, content, search pattern, product keyword, user fields) are
    extracted with regular expressions and fall back to defaults.

    Args:
        request: The user's request, e.g. ``"创建文件名为report.txt内容为测试报告"``.

    Returns:
        Task definitions with keys ``server``, ``type``, ``name``, ``params``
        and ``description``.  When nothing is recognised, a single
        ``{"type": "help", ...}`` entry with example requests.
    """
    # Keywords are matched case-insensitively, but values are taken from the
    # original text so file names, content and e-mail addresses keep their case.
    lowered = request.lower()
    tasks = []

    def extract(pattern: str, default: str) -> str:
        """Return group 1 of the first match of ``pattern``, else ``default``."""
        match = re.search(pattern, request)
        return match.group(1) if match else default

    # Weather
    if "天气" in lowered or "weather" in lowered:
        city = extract(r'(北京|上海|广州|深圳|杭州|成都|西安|武汉)', "北京")
        tasks.append({
            "server": "weather-server",
            "type": "tool",
            "name": "get_weather",
            "params": {"city": city},
            "description": f"获取{city}天气信息"
        })
    # File operations
    if "创建文件" in lowered or "写文件" in lowered:
        # The name ends at whitespace or at the "内容" keyword; without that
        # terminator "文件名为a.txt内容为x" would yield "a.txt内容为x".  The
        # lookahead stops "内容…" itself from being taken as the name.
        filename = extract(r'文件名?[是为]?(?!内容)(.+?)(?:\s|内容|$)', "output.txt")
        content = extract(r'内容[是为]?(.+?)(?:\s|$)', "默认内容")
        tasks.append({
            "server": "file-server",
            "type": "tool",
            "name": "create_file",
            "params": {"filename": filename, "content": content},
            "description": f"创建文件 {filename}"
        })
    if "搜索文件" in lowered:
        # "(?:包含)?" is an optional word.  A character class such as "[文件]?"
        # matches a single character and leaks the rest into the capture.
        pattern = extract(r'搜索文件(?:包含)?(.+?)(?:\s|$)', "test")
        tasks.append({
            "server": "file-server",
            "type": "tool",
            "name": "search_files",
            "params": {"pattern": pattern},
            "description": f"搜索包含 '{pattern}' 的文件"
        })
    # Database operations
    if "搜索产品" in lowered or "查找产品" in lowered:
        keyword = extract(r'产品(?:名称)?[是为]?(.+?)(?:\s|$)', "电脑")
        tasks.append({
            "server": "database-server",
            "type": "tool",
            "name": "search_products",
            "params": {"keyword": keyword},
            "description": f"搜索产品: {keyword}"
        })
    if "添加用户" in lowered or "创建用户" in lowered:
        name = extract(r'姓名[是为]?(.+?)(?:\s|邮箱|年龄|$)', "测试用户")
        email = extract(r'邮箱[是为]?(.+?)(?:\s|年龄|$)', "test@example.com")
        age_match = re.search(r'年龄[是为]?(\d+)', request)
        age = int(age_match.group(1)) if age_match else None
        params = {"name": name, "email": email}
        # ``is not None`` rather than truthiness: age 0 is a valid value.
        if age is not None:
            params["age"] = age
        tasks.append({
            "server": "database-server",
            "type": "tool",
            "name": "insert_user",
            "params": params,
            "description": f"添加用户: {name}"
        })
    # Nothing recognised: return help with example requests.
    if not tasks:
        tasks.append({
            "type": "help",
            "description": "未识别到具体任务,请尝试以下格式的请求",
            "examples": [
                "获取北京天气",
                "创建文件名为report.txt内容为测试报告",
                "搜索文件包含python",
                "搜索产品电脑",
                "添加用户姓名为张三邮箱为zhangsan@example.com年龄为25"
            ]
        })
    return tasks
async def execute_task_sequence(self, tasks: List[Dict[str, Any]]) -> List[Task]:
    """Run the parsed task definitions in order and collect their outcomes.

    Args:
        tasks: Task definitions. Every definition needs a "description".
            A definition whose "type" is "help" also carries "examples";
            any other definition carries "server", "type" ("tool" or
            "resource"), "name" and, optionally, "params".

    Returns:
        One Task per definition, in input order. Each ends up "completed"
        (with ``result`` set) or "failed" (with ``error`` set).
    """
    executed_tasks = []
    for i, task_def in enumerate(tasks):
        task = Task(
            id=f"task_{i+1}",
            description=task_def["description"]
        )
        executed_tasks.append(task)

        # Help entries are informational only: nothing is sent to a server.
        if task_def.get("type") == "help":
            task.status = "completed"
            task.result = {
                "message": task_def["description"],
                "examples": task_def["examples"]
            }
            continue

        task.status = "running"
        server_name = task_def["server"]
        action_type = task_def["type"]
        action_name = task_def["name"]
        params = task_def.get("params", {})
        print(f"🔄 执行任务: {task.description}")

        if server_name not in self.clients:
            task.status = "failed"
            task.error = f"服务器 {server_name} 不可用"
            print(f" ❌ {task.error}")
            continue

        client = self.clients[server_name]
        try:
            if action_type == "tool":
                result = await client.call_tool(action_name, params)
                task.status = "completed"
                task.result = result
                print(" ✅ 任务完成")
                # Show a short preview. Read the text defensively: if the
                # first content item has no ``text`` attribute, a plain
                # attribute access would raise here and the broad handler
                # below would mark an already successful call as failed.
                content = getattr(result, "content", None)
                text = getattr(content[0], "text", None) if content else None
                if isinstance(text, str):
                    content_preview = text[:100]
                    if len(text) > 100:
                        content_preview += "..."
                    print(f" 📄 结果预览: {content_preview}")
            elif action_type == "resource":
                result = await client.read_resource(action_name)
                task.status = "completed"
                task.result = result
                print(" ✅ 资源读取完成")
            else:
                task.status = "failed"
                task.error = f"未知操作类型: {action_type}"
                print(f" ❌ {task.error}")
        except Exception as e:
            # Boundary handler: one failing task must not abort the sequence.
            task.status = "failed"
            task.error = str(e)
            print(f" ❌ 任务失败: {e}")
    return executed_tasks
def generate_summary_report(self, tasks: List[Task]) -> str:
    """Build a human-readable summary of executed tasks.

    Args:
        tasks: Executed tasks. Each is read for ``status``, ``description``,
            ``error`` and ``result``.

    Returns:
        The report text: a statistics header followed by one line per task,
        plus an error line for failed tasks or a 50-character result preview
        for completed tasks whose result has text content.
    """
    total_tasks = len(tasks)
    completed_tasks = sum(1 for t in tasks if t.status == "completed")
    failed_tasks = sum(1 for t in tasks if t.status == "failed")
    # An empty task list must not raise ZeroDivisionError.
    success_rate = completed_tasks / total_tasks * 100 if total_tasks else 0.0

    status_emojis = {
        "completed": "✅",
        "failed": "❌",
        "running": "🔄",
        "pending": "⏳"
    }

    # Collect fragments and join once instead of repeated string +=.
    parts = [
        "\n"
        "📊 任务执行摘要报告\n"
        f"{'=' * 40}\n"
        "📈 统计信息:\n"
        f"- 总任务数: {total_tasks}\n"
        f"- 成功完成: {completed_tasks}\n"
        f"- 执行失败: {failed_tasks}\n"
        f"- 成功率: {success_rate:.1f}%\n"
        "📋 任务详情:\n"
    ]
    for task in tasks:
        status_emoji = status_emojis.get(task.status, "❓")
        parts.append(f" {status_emoji} {task.description}\n")
        if task.status == "failed" and task.error:
            parts.append(f" 错误: {task.error}\n")
        elif task.status == "completed" and task.result:
            # Help results are plain dicts and the first content item may
            # have no ``text``; only preview when text is actually there.
            content = getattr(task.result, "content", None)
            text = getattr(content[0], "text", None) if content else None
            if isinstance(text, str):
                preview = text[:50]
                if len(text) > 50:
                    preview += "..."
                parts.append(f" 结果: {preview}\n")
    return "".join(parts)
async def chat_mode(self):
    """Run the interactive loop: read a request, execute it, print a report.

    The loop ends when the user types ``quit``, presses Ctrl+C, or stdin
    reaches end-of-file. Any other error while handling a request is printed
    and the loop continues with the next prompt.
    """
    print("\n🤖 智能代理聊天模式")
    print("=" * 40)
    print("您可以用自然语言描述您想要执行的任务")
    print("例如: '获取北京天气并创建天气报告文件'")
    print("输入 'quit' 退出\n")
    while True:
        try:
            user_input = input("👤 您: ").strip()
            if user_input.lower() == "quit":
                print("👋 再见!")
                break
            if not user_input:
                continue
            print("\n🤖 代理: 正在分析您的请求...")
            # Parse the request into task definitions
            tasks = self.parse_natural_language_request(user_input)
            if not tasks:
                print("抱歉,我无法理解您的请求。请尝试更具体的描述。")
                continue
            print(f"我理解您想要执行 {len(tasks)} 个任务:")
            for i, task in enumerate(tasks, 1):
                print(f" {i}. {task['description']}")
            # Execute the tasks
            print("\n开始执行任务...")
            executed_tasks = await self.execute_task_sequence(tasks)
            # Print the summary report
            report = self.generate_summary_report(executed_tasks)
            print(report)
        except (KeyboardInterrupt, EOFError):
            # EOFError: stdin was closed (piped input, Ctrl+D). Without this
            # the generic handler below would catch it on every iteration
            # and the loop would never terminate.
            print("\n👋 再见!")
            break
        except Exception as e:
            print(f"❌ 处理请求时出错: {e}")
async def main():
    """Initialise the agent, list what each server offers, then start chat.

    Returns early, after printing a message, when no server connection is
    available once initialisation has finished.
    """
    agent = IntelligentAgent()
    print("🚀 正在初始化智能代理...")
    await agent.initialize()

    # Nothing to talk to: report and stop.
    if not agent.clients:
        print("❌ 没有可用的服务器连接,无法启动代理")
        return

    server_count = len(agent.clients)
    print(f"✅ 代理初始化完成,连接了 {server_count} 个服务器")

    # Show the tools and resources advertised by every server
    print("\n🔧 可用服务器和能力:")
    for server_name, capabilities in agent.capabilities.items():
        print(f" 📡 {server_name}:")
        tool_names = capabilities["tools"]
        if tool_names:
            print(f" 🔧 工具: {', '.join(tool_names)}")
        resource_names = capabilities["resources"]
        if resource_names:
            print(f" 📁 资源: {', '.join(resource_names)}")

    # Hand control to the interactive loop
    await agent.chat_mode()
# Script entry point: run the async main() on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
部署示例
1. Docker 部署
Dockerfile
dockerfile
# Stage 1: Python servers and their dependencies
FROM python:3.11-slim AS python-base
WORKDIR /app
# Install Python dependencies into a dedicated prefix. A plain `pip install`
# writes to /usr/local, which is outside /app, so the packages would be left
# behind when the final stage copies files out of this stage.
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
# Copy the Python server sources
COPY servers/python/ ./servers/python/

# Stage 2: Node.js servers and their dependencies
FROM node:18-slim AS node-base
WORKDIR /app
# Install production Node.js dependencies only
# (`--omit=dev` replaces the deprecated `--only=production`)
COPY package.json package-lock.json ./
RUN npm ci --omit=dev
# Copy the TypeScript server sources
COPY servers/typescript/ ./servers/typescript/

# Final image
FROM python:3.11-slim
# Install Node.js (curl stays installed; the compose health check uses it)
RUN apt-get update && apt-get install -y \
    curl \
    && curl -fsSL https://deb.nodesource.com/setup_18.x | bash - \
    && apt-get install -y nodejs \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Copy artefacts from the build stages
COPY --from=python-base /install /usr/local
COPY --from=python-base /app ./
COPY --from=node-base /app/node_modules ./node_modules
COPY --from=node-base /app/servers/typescript ./servers/typescript
# Add the startup script
COPY docker/start.sh ./
RUN chmod +x start.sh
# Ports published by the servers
EXPOSE 8000 8001 8002
# Startup command
CMD ["./start.sh"]
docker-compose.yml
yaml
version: '3.8'

services:
  mcp-servers:
    build: .
    ports:
      - "8000:8000"  # file server
      - "8001:8001"  # weather server
      - "8002:8002"  # database server
    environment:
      - OPENWEATHER_API_KEY=${OPENWEATHER_API_KEY}
      - DATABASE_URL=sqlite:///data/app.db
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      # Mounted read-only: nginx only reads its configuration and certificates.
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro
      - ./nginx/ssl:/etc/nginx/ssl:ro
    depends_on:
      - mcp-servers
    restart: unless-stopped

# NOTE: these named volumes are declared but not referenced by any service
# above; mcp-servers bind-mounts the host directories ./data and ./logs.
volumes:
  data:
  logs:
2. Kubernetes 部署
deployment.yaml
yaml
# Deployment: three replicas of the combined MCP server image.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-servers
  labels:
    app: mcp-servers
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-servers
  template:
    metadata:
      labels:
        app: mcp-servers
    spec:
      containers:
        - name: mcp-servers
          image: mcp-servers:latest
          ports:
            - containerPort: 8000
            - containerPort: 8001
            - containerPort: 8002
          env:
            # The API key is read from a Secret, not stored in the manifest.
            - name: OPENWEATHER_API_KEY
              valueFrom:
                secretKeyRef:
                  name: mcp-secrets
                  key: openweather-api-key
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
# Service: exposes the three server ports of the pods labelled app=mcp-servers.
apiVersion: v1
kind: Service
metadata:
  name: mcp-service
spec:
  selector:
    app: mcp-servers
  ports:
    - name: file-server
      port: 8000
      targetPort: 8000
    - name: weather-server
      port: 8001
      targetPort: 8001
    - name: database-server
      port: 8002
      targetPort: 8002
  type: LoadBalancer
测试示例
1. 单元测试
python
#!/usr/bin/env python3
import pytest
import asyncio
import tempfile
import os
from pathlib import Path
from mcp.server import Server
from mcp.types import TextContent
# 测试文件服务器
class TestFileServer:
    """Test scaffold for the file server example."""

    @pytest.fixture
    def server(self, tmp_path):
        """Create a test server and a scratch directory with one sample file.

        The fixture is synchronous because it awaits nothing, and pytest does
        not await ``async def`` fixtures declared with plain
        ``@pytest.fixture``. The scratch directory is pytest's ``tmp_path``,
        which pytest manages, instead of a ``mkdtemp()`` directory that
        nothing ever removes.
        """
        server = Server("test-file-server")
        # Scratch directory for this test
        self.temp_dir = tmp_path
        # Create the sample file; explicit encoding keeps the non-ASCII
        # content independent of the platform default.
        test_file = self.temp_dir / "test.txt"
        test_file.write_text("测试内容", encoding="utf-8")
        return server

    @pytest.mark.asyncio
    async def test_list_resources(self, server):
        """Resource listing."""
        # TODO: implement the actual test logic
        pass

    @pytest.mark.asyncio
    async def test_read_resource(self, server):
        """Resource reading."""
        # TODO: implement the actual test logic
        pass

    @pytest.mark.asyncio
    async def test_create_file_tool(self, server):
        """File creation tool."""
        # TODO: implement the actual test logic
        pass
# Run the tests when executed as a script. pytest.main() returns the exit
# code; pass it on so a failing run does not exit with status 0.
if __name__ == "__main__":
    raise SystemExit(pytest.main([__file__, "-v"]))
2. 集成测试
python
#!/usr/bin/env python3
import asyncio
import pytest
from mcp.client import Client
from mcp.client.stdio import stdio_client
class TestIntegration:
    """End-to-end check against a file server started over stdio."""

    @pytest.mark.asyncio
    async def test_full_workflow(self):
        """Create a file through the tool, then confirm resources are listed."""
        server_command = "python"
        server_args = ["file_server.py"]

        # Start the server process and connect a client to its streams
        async with stdio_client(server_command, server_args) as (reader, writer):
            client = Client("test-client")
            await client.initialize(reader, writer)

            # Tool call: create a file
            create_args = {
                "filename": "test_integration.txt",
                "content": "集成测试内容",
            }
            create_result = await client.call_tool("create_file", create_args)
            assert not create_result.isError
            first_text = create_result.content[0].text
            assert "创建成功" in first_text

            # Resource listing must not be empty
            listing = await client.list_resources()
            assert len(listing.resources) > 0
# pytest.main() returns the exit code; pass it on so a failing run does not
# exit with status 0.
if __name__ == "__main__":
    raise SystemExit(pytest.main([__file__, "-v"]))
最佳实践
1. 错误处理
python
# Unified error-handling decorator
def handle_errors(func):
    """Wrap an async tool handler so exceptions become error results.

    Args:
        func: The coroutine function to wrap.

    Returns:
        A coroutine function with the same name and docstring as ``func``.
        It returns ``func``'s result unchanged on success. On failure it
        returns a ``CallToolResult`` with ``isError=True`` whose text is
        prefixed according to the exception type (``ValueError``,
        ``FileNotFoundError``, or any other ``Exception``).
    """
    import functools

    # functools.wraps keeps the handler's name and docstring on the wrapper.
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except ValueError as e:
            message = f"参数错误: {e}"
        except FileNotFoundError as e:
            message = f"文件未找到: {e}"
        except Exception as e:
            message = f"内部错误: {e}"
        # Only reached when one of the handlers above ran.
        return CallToolResult(
            content=[TextContent(type="text", text=message)],
            isError=True
        )
    return wrapper
# Decorators apply bottom-up: handle_errors wraps the handler first, and the
# wrapped coroutine is what gets passed to server.call_tool().
@server.call_tool()
@handle_errors
async def call_tool(name: str, arguments: Dict[str, Any]) -> CallToolResult:
    """Placeholder tool handler showing how the decorators are stacked."""
    # Tool implementation goes here
    pass
2. 配置管理
python
import os
from dataclasses import dataclass
from typing import Optional
@dataclass
class ServerConfig:
    """Server settings, taken from arguments or from the environment.

    Any field left as ``None`` is filled in ``__post_init__``. The lookup
    happens at instantiation, not at class definition, so environment
    changes made before ``ServerConfig()`` is called are honoured.

    Environment variables and fallbacks:
        MCP_BASE_DIR: base directory; falls back to ``~/Documents``.
        MCP_MAX_FILE_SIZE: size limit in bytes; falls back to 10485760.
        MCP_LOG_LEVEL: log level name; falls back to "INFO".

    Raises:
        ValueError: if MCP_MAX_FILE_SIZE is set to a non-integer value.
    """

    base_dir: Optional[str] = None
    max_file_size: Optional[int] = None
    allowed_extensions: Optional[list] = None
    log_level: Optional[str] = None

    def __post_init__(self):
        if self.base_dir is None:
            self.base_dir = os.getenv("MCP_BASE_DIR", str(Path.home() / "Documents"))
        if self.max_file_size is None:
            self.max_file_size = int(os.getenv("MCP_MAX_FILE_SIZE", "10485760"))  # 10MB
        if self.allowed_extensions is None:
            # A fresh list per instance, so instances never share state.
            self.allowed_extensions = [".txt", ".md", ".json", ".csv"]
        if self.log_level is None:
            self.log_level = os.getenv("MCP_LOG_LEVEL", "INFO")


config = ServerConfig()
3. 日志记录
python
import logging
import sys
def setup_logging(level: str = "INFO"):
    """Configure root logging to stderr and a log file.

    Args:
        level: Name of a ``logging`` level, case-insensitive (e.g. "info").

    Returns:
        The logger named after this module.

    Raises:
        AttributeError: if ``level`` does not name an attribute of the
            ``logging`` module.
    """
    logging.basicConfig(
        level=getattr(logging, level.upper()),
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(sys.stderr),
            # Explicit UTF-8: the log messages contain non-ASCII text, and
            # the platform default encoding may not be able to encode it.
            logging.FileHandler('mcp_server.log', encoding='utf-8')
        ]
    )
    return logging.getLogger(__name__)
logger = setup_logging(config.log_level)


@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> CallToolResult:
    """Run a tool via ``execute_tool``, logging the call and its outcome.

    Args:
        name: Name of the tool to run.
        arguments: Arguments passed through to ``execute_tool``.

    Returns:
        Whatever ``execute_tool`` returns.

    Raises:
        Exception: any exception from ``execute_tool`` is logged with its
            traceback and re-raised unchanged.
    """
    # Lazy %-style arguments: the message is only formatted if it is emitted.
    logger.info("调用工具: %s, 参数: %s", name, arguments)
    try:
        # Tool implementation
        result = await execute_tool(name, arguments)
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which logger.error would drop.
        logger.exception("工具执行失败: %s, 错误: %s", name, e)
        raise
    else:
        logger.info("工具执行成功: %s", name)
        return result