Skip to content

连接远程 MCP 服务器

本指南介绍如何连接和使用部署在远程环境中的 MCP 服务器,包括 HTTP/SSE 和 WebSocket 传输方式。

概述

远程 MCP 服务器通常部署在云环境中,通过网络协议提供服务。主要的传输方式包括:

  • Server-Sent Events (SSE): 基于 HTTP 的单向流式通信
  • WebSocket: 全双工的实时通信协议
  • HTTP: 传统的请求-响应模式

传输方式对比

特性SSEWebSocketHTTP
双向通信
实时性
复杂度
防火墙友好
推荐场景通知密集交互密集简单查询

SSE 连接方式

1. 服务器实现

Python FastAPI 示例

python
# server.py
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
import asyncio
import json
import uuid

app = FastAPI()

class MCPServer:
    def __init__(self):
        self.clients = {}
        self.tools = {
            "get_weather": self.get_weather,
            "calculate": self.calculate
        }
    
    async def get_weather(self, location: str):
        # 模拟天气 API 调用
        return f"Weather in {location}: Sunny, 72°F"
    
    async def calculate(self, expression: str):
        # 安全的计算器实现
        try:
            result = eval(expression)  # 生产环境需要更安全的实现
            return f"Result: {result}"
        except Exception as e:
            return f"Error: {str(e)}"

mcp_server = MCPServer()

@app.post("/mcp/initialize")
async def initialize(request: Request):
    data = await request.json()
    client_id = str(uuid.uuid4())
    
    # 存储客户端信息
    mcp_server.clients[client_id] = {
        "capabilities": data.get("capabilities", {}),
        "clientInfo": data.get("clientInfo", {})
    }
    
    return {
        "protocolVersion": "2025-06-18",
        "capabilities": {
            "tools": {"listChanged": True},
            "resources": {"subscribe": True}
        },
        "serverInfo": {
            "name": "RemoteWeatherServer",
            "version": "1.0.0"
        },
        "clientId": client_id
    }

@app.get("/mcp/events/{client_id}")
async def stream_events(client_id: str):
    async def event_generator():
        try:
            while True:
                # 发送保活消息
                yield {
                    "event": "ping",
                    "data": json.dumps({"timestamp": time.time()})
                }
                await asyncio.sleep(30)
        except asyncio.CancelledError:
            # 清理客户端连接
            if client_id in mcp_server.clients:
                del mcp_server.clients[client_id]
    
    return EventSourceResponse(event_generator())

@app.post("/mcp/tools/call")
async def call_tool(request: Request):
    data = await request.json()
    tool_name = data.get("name")
    arguments = data.get("arguments", {})
    
    if tool_name in mcp_server.tools:
        try:
            result = await mcp_server.tools[tool_name](**arguments)
            return {"content": [{"type": "text", "text": result}]}
        except Exception as e:
            return {"error": {"code": -32603, "message": str(e)}}
    else:
        return {"error": {"code": -32601, "message": "Method not found"}}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

部署配置

yaml
# docker-compose.yml
version: '3.8'
services:
  mcp-server:
    build: .
    ports:
      - "8000:8000"
    environment:
      - API_KEY=${WEATHER_API_KEY}
      - LOG_LEVEL=INFO
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - mcp-server

2. 客户端连接

JavaScript 客户端

javascript
class SSEMCPClient {
    constructor(baseUrl) {
        this.baseUrl = baseUrl;
        this.clientId = null;
        this.eventSource = null;
    }
    
    async initialize() {
        const response = await fetch(`${this.baseUrl}/mcp/initialize`, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
            },
            body: JSON.stringify({
                protocolVersion: "2025-06-18",
                capabilities: {
                    roots: { listChanged: true }
                },
                clientInfo: {
                    name: "WebClient",
                    version: "1.0.0"
                }
            })
        });
        
        const data = await response.json();
        this.clientId = data.clientId;
        
        // 建立 SSE 连接
        this.eventSource = new EventSource(
            `${this.baseUrl}/mcp/events/${this.clientId}`
        );
        
        this.eventSource.onmessage = (event) => {
            const data = JSON.parse(event.data);
            this.handleServerEvent(data);
        };
        
        this.eventSource.onerror = (error) => {
            console.error('SSE connection error:', error);
            this.reconnect();
        };
        
        return data;
    }
    
    async callTool(name, arguments) {
        const response = await fetch(`${this.baseUrl}/mcp/tools/call`, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
                'X-Client-ID': this.clientId
            },
            body: JSON.stringify({ name, arguments })
        });
        
        return await response.json();
    }
    
    handleServerEvent(data) {
        switch (data.event) {
            case 'ping':
                // 处理保活消息
                break;
            case 'resource_updated':
                // 处理资源更新通知
                this.onResourceUpdated(data.resource);
                break;
            default:
                console.log('Unknown event:', data);
        }
    }
    
    async reconnect() {
        if (this.eventSource) {
            this.eventSource.close();
        }
        
        // 指数退避重连
        let delay = 1000;
        const maxDelay = 30000;
        
        while (true) {
            try {
                await new Promise(resolve => setTimeout(resolve, delay));
                await this.initialize();
                console.log('Reconnected successfully');
                break;
            } catch (error) {
                console.error('Reconnection failed:', error);
                delay = Math.min(delay * 2, maxDelay);
            }
        }
    }
    
    disconnect() {
        if (this.eventSource) {
            this.eventSource.close();
            this.eventSource = null;
        }
    }
}

// 使用示例
const client = new SSEMCPClient('https://api.example.com');
await client.initialize();

const result = await client.callTool('get_weather', {
    location: 'San Francisco'
});
console.log(result);

WebSocket 连接方式

1. 服务器实现

Node.js WebSocket 服务器

javascript
// websocket-server.js
const WebSocket = require('ws');
const express = require('express');
const http = require('http');

class MCPWebSocketServer {
    constructor() {
        this.app = express();
        this.server = http.createServer(this.app);
        this.wss = new WebSocket.Server({ server: this.server });
        this.clients = new Map();
        
        this.setupWebSocketHandlers();
    }
    
    setupWebSocketHandlers() {
        this.wss.on('connection', (ws, req) => {
            const clientId = this.generateClientId();
            this.clients.set(clientId, {
                ws,
                initialized: false,
                capabilities: {}
            });
            
            ws.on('message', async (data) => {
                try {
                    const message = JSON.parse(data);
                    const response = await this.handleMessage(clientId, message);
                    if (response) {
                        ws.send(JSON.stringify(response));
                    }
                } catch (error) {
                    ws.send(JSON.stringify({
                        jsonrpc: "2.0",
                        error: {
                            code: -32700,
                            message: "Parse error"
                        },
                        id: null
                    }));
                }
            });
            
            ws.on('close', () => {
                this.clients.delete(clientId);
            });
            
            ws.on('error', (error) => {
                console.error('WebSocket error:', error);
                this.clients.delete(clientId);
            });
        });
    }
    
    async handleMessage(clientId, message) {
        const client = this.clients.get(clientId);
        if (!client) return null;
        
        switch (message.method) {
            case 'initialize':
                return this.handleInitialize(clientId, message);
            case 'tools/list':
                return this.handleToolsList(message);
            case 'tools/call':
                return this.handleToolCall(message);
            case 'ping':
                return { jsonrpc: "2.0", result: {}, id: message.id };
            default:
                return {
                    jsonrpc: "2.0",
                    error: {
                        code: -32601,
                        message: "Method not found"
                    },
                    id: message.id
                };
        }
    }
    
    handleInitialize(clientId, message) {
        const client = this.clients.get(clientId);
        client.initialized = true;
        client.capabilities = message.params.capabilities || {};
        
        return {
            jsonrpc: "2.0",
            result: {
                protocolVersion: "2025-06-18",
                capabilities: {
                    tools: { listChanged: true },
                    resources: { subscribe: true }
                },
                serverInfo: {
                    name: "WebSocketMCPServer",
                    version: "1.0.0"
                }
            },
            id: message.id
        };
    }
    
    handleToolsList(message) {
        return {
            jsonrpc: "2.0",
            result: {
                tools: [
                    {
                        name: "get_weather",
                        description: "Get weather information",
                        inputSchema: {
                            type: "object",
                            properties: {
                                location: { type: "string" }
                            },
                            required: ["location"]
                        }
                    }
                ]
            },
            id: message.id
        };
    }
    
    async handleToolCall(message) {
        const { name, arguments: args } = message.params;
        
        try {
            let result;
            switch (name) {
                case 'get_weather':
                    result = await this.getWeather(args.location);
                    break;
                default:
                    throw new Error(`Unknown tool: ${name}`);
            }
            
            return {
                jsonrpc: "2.0",
                result: {
                    content: [
                        { type: "text", text: result }
                    ]
                },
                id: message.id
            };
        } catch (error) {
            return {
                jsonrpc: "2.0",
                error: {
                    code: -32603,
                    message: error.message
                },
                id: message.id
            };
        }
    }
    
    async getWeather(location) {
        // 模拟异步天气 API 调用
        await new Promise(resolve => setTimeout(resolve, 100));
        return `Weather in ${location}: Cloudy, 68°F`;
    }
    
    generateClientId() {
        return Math.random().toString(36).substring(2, 15);
    }
    
    broadcast(message) {
        this.clients.forEach((client) => {
            if (client.initialized && client.ws.readyState === WebSocket.OPEN) {
                client.ws.send(JSON.stringify(message));
            }
        });
    }
    
    start(port = 8080) {
        this.server.listen(port, () => {
            console.log(`MCP WebSocket server running on port ${port}`);
        });
    }
}

const server = new MCPWebSocketServer();
server.start();

2. 客户端连接

WebSocket 客户端

javascript
class WebSocketMCPClient {
    constructor(url) {
        this.url = url;
        this.ws = null;
        this.requestId = 0;
        this.pendingRequests = new Map();
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
    }
    
    async connect() {
        return new Promise((resolve, reject) => {
            this.ws = new WebSocket(this.url);
            
            this.ws.onopen = async () => {
                console.log('WebSocket connected');
                this.reconnectAttempts = 0;
                
                try {
                    await this.initialize();
                    resolve();
                } catch (error) {
                    reject(error);
                }
            };
            
            this.ws.onmessage = (event) => {
                const message = JSON.parse(event.data);
                this.handleMessage(message);
            };
            
            this.ws.onclose = (event) => {
                console.log('WebSocket closed:', event.code, event.reason);
                this.handleDisconnection();
            };
            
            this.ws.onerror = (error) => {
                console.error('WebSocket error:', error);
                reject(error);
            };
        });
    }
    
    async initialize() {
        const response = await this.sendRequest('initialize', {
            protocolVersion: "2025-06-18",
            capabilities: {
                roots: { listChanged: true }
            },
            clientInfo: {
                name: "WebSocketClient",
                version: "1.0.0"
            }
        });
        
        console.log('Initialized:', response);
        return response;
    }
    
    async sendRequest(method, params = {}) {
        return new Promise((resolve, reject) => {
            const id = ++this.requestId;
            const message = {
                jsonrpc: "2.0",
                method,
                params,
                id
            };
            
            this.pendingRequests.set(id, { resolve, reject });
            
            if (this.ws.readyState === WebSocket.OPEN) {
                this.ws.send(JSON.stringify(message));
                
                // 设置超时
                setTimeout(() => {
                    if (this.pendingRequests.has(id)) {
                        this.pendingRequests.delete(id);
                        reject(new Error('Request timeout'));
                    }
                }, 30000);
            } else {
                this.pendingRequests.delete(id);
                reject(new Error('WebSocket not connected'));
            }
        });
    }
    
    handleMessage(message) {
        if (message.id && this.pendingRequests.has(message.id)) {
            const { resolve, reject } = this.pendingRequests.get(message.id);
            this.pendingRequests.delete(message.id);
            
            if (message.error) {
                reject(new Error(message.error.message));
            } else {
                resolve(message.result);
            }
        } else if (message.method) {
            // 处理服务器发送的通知
            this.handleNotification(message);
        }
    }
    
    handleNotification(message) {
        switch (message.method) {
            case 'notifications/resources/updated':
                console.log('Resource updated:', message.params);
                break;
            default:
                console.log('Unknown notification:', message);
        }
    }
    
    async handleDisconnection() {
        if (this.reconnectAttempts < this.maxReconnectAttempts) {
            this.reconnectAttempts++;
            const delay = Math.pow(2, this.reconnectAttempts) * 1000;
            
            console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);
            
            setTimeout(() => {
                this.connect().catch(console.error);
            }, delay);
        } else {
            console.error('Max reconnection attempts reached');
        }
    }
    
    async callTool(name, arguments) {
        return await this.sendRequest('tools/call', { name, arguments });
    }
    
    async listTools() {
        return await this.sendRequest('tools/list');
    }
    
    disconnect() {
        if (this.ws) {
            this.ws.close();
            this.ws = null;
        }
    }
}

// 使用示例
const client = new WebSocketMCPClient('wss://api.example.com/mcp');
await client.connect();

const tools = await client.listTools();
console.log('Available tools:', tools);

const result = await client.callTool('get_weather', {
    location: 'New York'
});
console.log('Weather result:', result);

安全考虑

1. 认证和授权

API 密钥认证

javascript
// 服务器端中间件
app.use('/mcp', (req, res, next) => {
    const apiKey = req.headers['x-api-key'];
    if (!apiKey || !isValidApiKey(apiKey)) {
        return res.status(401).json({
            error: 'Unauthorized: Invalid API key'
        });
    }
    req.apiKey = apiKey;
    next();
});

JWT 认证

javascript
const jwt = require('jsonwebtoken');

app.use('/mcp', (req, res, next) => {
    const token = req.headers.authorization?.replace('Bearer ', '');
    if (!token) {
        return res.status(401).json({
            error: 'Unauthorized: Missing token'
        });
    }
    
    try {
        const decoded = jwt.verify(token, process.env.JWT_SECRET);
        req.user = decoded;
        next();
    } catch (error) {
        return res.status(401).json({
            error: 'Unauthorized: Invalid token'
        });
    }
});

2. HTTPS/WSS 配置

Nginx 配置

nginx
server {
    listen 443 ssl http2;
    server_name api.example.com;
    
    ssl_certificate /etc/ssl/certs/api.example.com.crt;
    ssl_certificate_key /etc/ssl/private/api.example.com.key;
    
    # WebSocket 支持
    location /mcp {
        proxy_pass http://localhost:8080;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
    
    # SSE 支持
    location /mcp/events {
        proxy_pass http://localhost:8000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # SSE 特定配置
        proxy_set_header Cache-Control no-cache;
        proxy_set_header Connection '';
        proxy_http_version 1.1;
        chunked_transfer_encoding off;
        proxy_buffering off;
    }
}

监控和运维

1. 健康检查

python
@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "timestamp": time.time(),
        "version": "1.0.0",
        "active_connections": len(mcp_server.clients)
    }

2. 指标收集

python
from prometheus_client import Counter, Histogram, generate_latest

REQUEST_COUNT = Counter('mcp_requests_total', 'Total MCP requests', ['method', 'status'])
REQUEST_DURATION = Histogram('mcp_request_duration_seconds', 'Request duration')

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time
    
    REQUEST_DURATION.observe(duration)
    REQUEST_COUNT.labels(
        method=request.url.path,
        status=response.status_code
    ).inc()
    
    return response

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type="text/plain")

最佳实践

1. 连接管理

  • 实现自动重连机制
  • 使用连接池管理多个连接
  • 设置合理的超时时间

2. 错误处理

  • 提供详细的错误信息
  • 实现优雅的降级策略
  • 记录详细的错误日志

3. 性能优化

  • 使用异步操作避免阻塞
  • 实现请求缓存机制
  • 优化网络传输效率

相关资源

🚀 探索模型上下文协议的无限可能 | 连接 AI 与世界的桥梁 | 让智能更智能