主题
连接远程 MCP 服务器
本指南介绍如何连接和使用部署在远程环境中的 MCP 服务器,包括 HTTP/SSE 和 WebSocket 传输方式。
概述
远程 MCP 服务器通常部署在云环境中,通过网络协议提供服务。主要的传输方式包括:
- Server-Sent Events (SSE): 基于 HTTP 的单向流式通信
- WebSocket: 全双工的实时通信协议
- HTTP: 传统的请求-响应模式
传输方式对比
特性 | SSE | WebSocket | HTTP |
---|---|---|---|
双向通信 | ❌ | ✅ | ❌ |
实时性 | ✅ | ✅ | ❌ |
复杂度 | 低 | 中 | 低 |
防火墙友好 | ✅ | ❌ | ✅ |
推荐场景 | 通知密集 | 交互密集 | 简单查询 |
SSE 连接方式
1. 服务器实现
Python FastAPI 示例
python
# server.py
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
import asyncio
import json
import uuid
app = FastAPI()
class MCPServer:
def __init__(self):
self.clients = {}
self.tools = {
"get_weather": self.get_weather,
"calculate": self.calculate
}
async def get_weather(self, location: str):
# 模拟天气 API 调用
return f"Weather in {location}: Sunny, 72°F"
async def calculate(self, expression: str):
# 安全的计算器实现
try:
result = eval(expression) # 生产环境需要更安全的实现
return f"Result: {result}"
except Exception as e:
return f"Error: {str(e)}"
mcp_server = MCPServer()
@app.post("/mcp/initialize")
async def initialize(request: Request):
data = await request.json()
client_id = str(uuid.uuid4())
# 存储客户端信息
mcp_server.clients[client_id] = {
"capabilities": data.get("capabilities", {}),
"clientInfo": data.get("clientInfo", {})
}
return {
"protocolVersion": "2025-06-18",
"capabilities": {
"tools": {"listChanged": True},
"resources": {"subscribe": True}
},
"serverInfo": {
"name": "RemoteWeatherServer",
"version": "1.0.0"
},
"clientId": client_id
}
@app.get("/mcp/events/{client_id}")
async def stream_events(client_id: str):
async def event_generator():
try:
while True:
# 发送保活消息
yield {
"event": "ping",
"data": json.dumps({"timestamp": time.time()})
}
await asyncio.sleep(30)
except asyncio.CancelledError:
# 清理客户端连接
if client_id in mcp_server.clients:
del mcp_server.clients[client_id]
return EventSourceResponse(event_generator())
@app.post("/mcp/tools/call")
async def call_tool(request: Request):
data = await request.json()
tool_name = data.get("name")
arguments = data.get("arguments", {})
if tool_name in mcp_server.tools:
try:
result = await mcp_server.tools[tool_name](**arguments)
return {"content": [{"type": "text", "text": result}]}
except Exception as e:
return {"error": {"code": -32603, "message": str(e)}}
else:
return {"error": {"code": -32601, "message": "Method not found"}}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
部署配置
yaml
# docker-compose.yml
version: '3.8'
services:
mcp-server:
build: .
ports:
- "8000:8000"
environment:
- API_KEY=${WEATHER_API_KEY}
- LOG_LEVEL=INFO
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- mcp-server
2. 客户端连接
JavaScript 客户端
javascript
class SSEMCPClient {
constructor(baseUrl) {
this.baseUrl = baseUrl;
this.clientId = null;
this.eventSource = null;
}
async initialize() {
const response = await fetch(`${this.baseUrl}/mcp/initialize`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
protocolVersion: "2025-06-18",
capabilities: {
roots: { listChanged: true }
},
clientInfo: {
name: "WebClient",
version: "1.0.0"
}
})
});
const data = await response.json();
this.clientId = data.clientId;
// 建立 SSE 连接
this.eventSource = new EventSource(
`${this.baseUrl}/mcp/events/${this.clientId}`
);
this.eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
this.handleServerEvent(data);
};
this.eventSource.onerror = (error) => {
console.error('SSE connection error:', error);
this.reconnect();
};
return data;
}
async callTool(name, arguments) {
const response = await fetch(`${this.baseUrl}/mcp/tools/call`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Client-ID': this.clientId
},
body: JSON.stringify({ name, arguments })
});
return await response.json();
}
handleServerEvent(data) {
switch (data.event) {
case 'ping':
// 处理保活消息
break;
case 'resource_updated':
// 处理资源更新通知
this.onResourceUpdated(data.resource);
break;
default:
console.log('Unknown event:', data);
}
}
async reconnect() {
if (this.eventSource) {
this.eventSource.close();
}
// 指数退避重连
let delay = 1000;
const maxDelay = 30000;
while (true) {
try {
await new Promise(resolve => setTimeout(resolve, delay));
await this.initialize();
console.log('Reconnected successfully');
break;
} catch (error) {
console.error('Reconnection failed:', error);
delay = Math.min(delay * 2, maxDelay);
}
}
}
disconnect() {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
}
}
// 使用示例
const client = new SSEMCPClient('https://api.example.com');
await client.initialize();
const result = await client.callTool('get_weather', {
location: 'San Francisco'
});
console.log(result);
WebSocket 连接方式
1. 服务器实现
Node.js WebSocket 服务器
javascript
// websocket-server.js
const WebSocket = require('ws');
const express = require('express');
const http = require('http');
class MCPWebSocketServer {
constructor() {
this.app = express();
this.server = http.createServer(this.app);
this.wss = new WebSocket.Server({ server: this.server });
this.clients = new Map();
this.setupWebSocketHandlers();
}
setupWebSocketHandlers() {
this.wss.on('connection', (ws, req) => {
const clientId = this.generateClientId();
this.clients.set(clientId, {
ws,
initialized: false,
capabilities: {}
});
ws.on('message', async (data) => {
try {
const message = JSON.parse(data);
const response = await this.handleMessage(clientId, message);
if (response) {
ws.send(JSON.stringify(response));
}
} catch (error) {
ws.send(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32700,
message: "Parse error"
},
id: null
}));
}
});
ws.on('close', () => {
this.clients.delete(clientId);
});
ws.on('error', (error) => {
console.error('WebSocket error:', error);
this.clients.delete(clientId);
});
});
}
async handleMessage(clientId, message) {
const client = this.clients.get(clientId);
if (!client) return null;
switch (message.method) {
case 'initialize':
return this.handleInitialize(clientId, message);
case 'tools/list':
return this.handleToolsList(message);
case 'tools/call':
return this.handleToolCall(message);
case 'ping':
return { jsonrpc: "2.0", result: {}, id: message.id };
default:
return {
jsonrpc: "2.0",
error: {
code: -32601,
message: "Method not found"
},
id: message.id
};
}
}
handleInitialize(clientId, message) {
const client = this.clients.get(clientId);
client.initialized = true;
client.capabilities = message.params.capabilities || {};
return {
jsonrpc: "2.0",
result: {
protocolVersion: "2025-06-18",
capabilities: {
tools: { listChanged: true },
resources: { subscribe: true }
},
serverInfo: {
name: "WebSocketMCPServer",
version: "1.0.0"
}
},
id: message.id
};
}
handleToolsList(message) {
return {
jsonrpc: "2.0",
result: {
tools: [
{
name: "get_weather",
description: "Get weather information",
inputSchema: {
type: "object",
properties: {
location: { type: "string" }
},
required: ["location"]
}
}
]
},
id: message.id
};
}
async handleToolCall(message) {
const { name, arguments: args } = message.params;
try {
let result;
switch (name) {
case 'get_weather':
result = await this.getWeather(args.location);
break;
default:
throw new Error(`Unknown tool: ${name}`);
}
return {
jsonrpc: "2.0",
result: {
content: [
{ type: "text", text: result }
]
},
id: message.id
};
} catch (error) {
return {
jsonrpc: "2.0",
error: {
code: -32603,
message: error.message
},
id: message.id
};
}
}
async getWeather(location) {
// 模拟异步天气 API 调用
await new Promise(resolve => setTimeout(resolve, 100));
return `Weather in ${location}: Cloudy, 68°F`;
}
generateClientId() {
return Math.random().toString(36).substring(2, 15);
}
broadcast(message) {
this.clients.forEach((client) => {
if (client.initialized && client.ws.readyState === WebSocket.OPEN) {
client.ws.send(JSON.stringify(message));
}
});
}
start(port = 8080) {
this.server.listen(port, () => {
console.log(`MCP WebSocket server running on port ${port}`);
});
}
}
const server = new MCPWebSocketServer();
server.start();
2. 客户端连接
WebSocket 客户端
javascript
class WebSocketMCPClient {
constructor(url) {
this.url = url;
this.ws = null;
this.requestId = 0;
this.pendingRequests = new Map();
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
}
async connect() {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(this.url);
this.ws.onopen = async () => {
console.log('WebSocket connected');
this.reconnectAttempts = 0;
try {
await this.initialize();
resolve();
} catch (error) {
reject(error);
}
};
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
this.handleMessage(message);
};
this.ws.onclose = (event) => {
console.log('WebSocket closed:', event.code, event.reason);
this.handleDisconnection();
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
reject(error);
};
});
}
async initialize() {
const response = await this.sendRequest('initialize', {
protocolVersion: "2025-06-18",
capabilities: {
roots: { listChanged: true }
},
clientInfo: {
name: "WebSocketClient",
version: "1.0.0"
}
});
console.log('Initialized:', response);
return response;
}
async sendRequest(method, params = {}) {
return new Promise((resolve, reject) => {
const id = ++this.requestId;
const message = {
jsonrpc: "2.0",
method,
params,
id
};
this.pendingRequests.set(id, { resolve, reject });
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
// 设置超时
setTimeout(() => {
if (this.pendingRequests.has(id)) {
this.pendingRequests.delete(id);
reject(new Error('Request timeout'));
}
}, 30000);
} else {
this.pendingRequests.delete(id);
reject(new Error('WebSocket not connected'));
}
});
}
handleMessage(message) {
if (message.id && this.pendingRequests.has(message.id)) {
const { resolve, reject } = this.pendingRequests.get(message.id);
this.pendingRequests.delete(message.id);
if (message.error) {
reject(new Error(message.error.message));
} else {
resolve(message.result);
}
} else if (message.method) {
// 处理服务器发送的通知
this.handleNotification(message);
}
}
handleNotification(message) {
switch (message.method) {
case 'notifications/resources/updated':
console.log('Resource updated:', message.params);
break;
default:
console.log('Unknown notification:', message);
}
}
async handleDisconnection() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = Math.pow(2, this.reconnectAttempts) * 1000;
console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);
setTimeout(() => {
this.connect().catch(console.error);
}, delay);
} else {
console.error('Max reconnection attempts reached');
}
}
async callTool(name, arguments) {
return await this.sendRequest('tools/call', { name, arguments });
}
async listTools() {
return await this.sendRequest('tools/list');
}
disconnect() {
if (this.ws) {
this.ws.close();
this.ws = null;
}
}
}
// 使用示例
const client = new WebSocketMCPClient('wss://api.example.com/mcp');
await client.connect();
const tools = await client.listTools();
console.log('Available tools:', tools);
const result = await client.callTool('get_weather', {
location: 'New York'
});
console.log('Weather result:', result);
安全考虑
1. 认证和授权
API 密钥认证
javascript
// 服务器端中间件
app.use('/mcp', (req, res, next) => {
const apiKey = req.headers['x-api-key'];
if (!apiKey || !isValidApiKey(apiKey)) {
return res.status(401).json({
error: 'Unauthorized: Invalid API key'
});
}
req.apiKey = apiKey;
next();
});
JWT 认证
javascript
const jwt = require('jsonwebtoken');
app.use('/mcp', (req, res, next) => {
const token = req.headers.authorization?.replace('Bearer ', '');
if (!token) {
return res.status(401).json({
error: 'Unauthorized: Missing token'
});
}
try {
const decoded = jwt.verify(token, process.env.JWT_SECRET);
req.user = decoded;
next();
} catch (error) {
return res.status(401).json({
error: 'Unauthorized: Invalid token'
});
}
});
2. HTTPS/WSS 配置
Nginx 配置
nginx
server {
listen 443 ssl http2;
server_name api.example.com;
ssl_certificate /etc/ssl/certs/api.example.com.crt;
ssl_certificate_key /etc/ssl/private/api.example.com.key;
# WebSocket 支持
location /mcp {
proxy_pass http://localhost:8080;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
# SSE 支持
location /mcp/events {
proxy_pass http://localhost:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# SSE 特定配置
proxy_set_header Cache-Control no-cache;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
proxy_buffering off;
}
}
监控和运维
1. 健康检查
python
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"timestamp": time.time(),
"version": "1.0.0",
"active_connections": len(mcp_server.clients)
}
2. 指标收集
python
from prometheus_client import Counter, Histogram, generate_latest
REQUEST_COUNT = Counter('mcp_requests_total', 'Total MCP requests', ['method', 'status'])
REQUEST_DURATION = Histogram('mcp_request_duration_seconds', 'Request duration')
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
REQUEST_DURATION.observe(duration)
REQUEST_COUNT.labels(
method=request.url.path,
status=response.status_code
).inc()
return response
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type="text/plain")
最佳实践
1. 连接管理
- 实现自动重连机制
- 使用连接池管理多个连接
- 设置合理的超时时间
2. 错误处理
- 提供详细的错误信息
- 实现优雅的降级策略
- 记录详细的错误日志
3. 性能优化
- 使用异步操作避免阻塞
- 实现请求缓存机制
- 优化网络传输效率
相关资源
- 构建 MCP 服务器 - 服务器开发指南
- 安全最佳实践 - 安全实现指南
- 传输层规范 - 传输协议详解