Skip to content

安全最佳实践

本文档提供了实施模型上下文协议 (MCP) 时的安全最佳实践指南。

安全原则

1. 深度防御

实施多层安全措施,确保单点故障不会导致整个系统被攻击。

2. 最小权限原则

  • 客户端只获得完成任务所需的最小权限
  • 定期审查和调整权限
  • 实施权限过期机制

3. 零信任架构

  • 验证所有请求,无论来源
  • 不信任网络边界
  • 持续监控和验证

4. 数据保护

  • 加密传输中的数据
  • 加密静态数据
  • 最小化数据收集

传输层安全

TLS/SSL 配置

强制 HTTPS/WSS

javascript
// 错误示例 - 不安全的 HTTP
const server = http.createServer();

// 正确示例 - 安全的 HTTPS
const server = https.createServer({
    key: fs.readFileSync('private-key.pem'),
    cert: fs.readFileSync('certificate.pem'),
    // 使用强密码套件
    ciphers: [
        'ECDHE-RSA-AES128-GCM-SHA256',
        'ECDHE-RSA-AES256-GCM-SHA384',
        'ECDHE-RSA-AES128-SHA256',
        'ECDHE-RSA-AES256-SHA384'
    ].join(':'),
    honorCipherOrder: true
});

证书验证

python
import ssl
import websockets

# 客户端证书验证
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = True
ssl_context.verify_mode = ssl.CERT_REQUIRED

# 连接到安全的 WebSocket
async with websockets.connect(
    "wss://secure-server.example.com",
    ssl=ssl_context
) as websocket:
    # 安全通信
    pass

相互 TLS (mTLS)

python
import ssl

# 服务器端 mTLS 配置
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain('server-cert.pem', 'server-key.pem')
ssl_context.load_verify_locations('ca-cert.pem')
ssl_context.verify_mode = ssl.CERT_REQUIRED

# 客户端端 mTLS 配置
client_context = ssl.create_default_context()
client_context.load_cert_chain('client-cert.pem', 'client-key.pem')
client_context.load_verify_locations('ca-cert.pem')

身份验证和授权

强身份验证

JWT 最佳实践

python
import jwt
import time
from datetime import datetime, timedelta

class SecureJWTHandler:
    def __init__(self, secret_key: str, algorithm: str = 'HS256'):
        self.secret_key = secret_key
        self.algorithm = algorithm
        
    def create_token(self, user_id: str, permissions: dict) -> str:
        """创建安全的 JWT 令牌"""
        now = datetime.utcnow()
        payload = {
            'sub': user_id,
            'iat': now,
            'exp': now + timedelta(hours=1),  # 短期过期
            'nbf': now,  # 不早于当前时间
            'jti': self._generate_jti(),  # 唯一标识符
            'permissions': permissions
        }
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
    
    def verify_token(self, token: str) -> dict:
        """验证 JWT 令牌"""
        try:
            payload = jwt.decode(
                token, 
                self.secret_key, 
                algorithms=[self.algorithm],
                options={
                    'verify_exp': True,
                    'verify_nbf': True,
                    'verify_iat': True
                }
            )
            
            # 检查令牌是否在黑名单中
            if self._is_token_blacklisted(payload.get('jti')):
                raise jwt.InvalidTokenError("令牌已被撤销")
                
            return payload
        except jwt.ExpiredSignatureError:
            raise jwt.InvalidTokenError("令牌已过期")
        except jwt.InvalidTokenError as e:
            raise jwt.InvalidTokenError(f"无效令牌: {str(e)}")

API 密钥管理

python
import secrets
import hashlib
import hmac

class APIKeyManager:
    def __init__(self, secret_salt: bytes):
        self.secret_salt = secret_salt
    
    def generate_api_key(self) -> tuple[str, str]:
        """生成 API 密钥对"""
        # 生成随机密钥
        key = secrets.token_urlsafe(32)
        
        # 生成哈希用于存储
        key_hash = self._hash_key(key)
        
        return key, key_hash
    
    def verify_api_key(self, provided_key: str, stored_hash: str) -> bool:
        """验证 API 密钥"""
        computed_hash = self._hash_key(provided_key)
        return hmac.compare_digest(computed_hash, stored_hash)
    
    def _hash_key(self, key: str) -> str:
        """安全哈希 API 密钥"""
        return hashlib.pbkdf2_hmac(
            'sha256',
            key.encode('utf-8'),
            self.secret_salt,
            100000  # 迭代次数
        ).hex()

权限控制

基于角色的访问控制 (RBAC)

python
from enum import Enum
from typing import Set, Dict

class Permission(Enum):
    READ_FILES = "read_files"
    WRITE_FILES = "write_files"
    EXECUTE_TOOLS = "execute_tools"
    ADMIN_ACCESS = "admin_access"

class Role:
    def __init__(self, name: str, permissions: Set[Permission]):
        self.name = name
        self.permissions = permissions

class RBACManager:
    def __init__(self):
        self.roles: Dict[str, Role] = {
            'viewer': Role('viewer', {Permission.READ_FILES}),
            'editor': Role('editor', {Permission.READ_FILES, Permission.WRITE_FILES}),
            'developer': Role('developer', {
                Permission.READ_FILES, 
                Permission.WRITE_FILES, 
                Permission.EXECUTE_TOOLS
            }),
            'admin': Role('admin', set(Permission))
        }
        self.user_roles: Dict[str, Set[str]] = {}
    
    def assign_role(self, user_id: str, role_name: str):
        """为用户分配角色"""
        if role_name not in self.roles:
            raise ValueError(f"未知角色: {role_name}")
        
        if user_id not in self.user_roles:
            self.user_roles[user_id] = set()
        
        self.user_roles[user_id].add(role_name)
    
    def check_permission(self, user_id: str, permission: Permission) -> bool:
        """检查用户权限"""
        user_roles = self.user_roles.get(user_id, set())
        
        for role_name in user_roles:
            role = self.roles.get(role_name)
            if role and permission in role.permissions:
                return True
        
        return False

输入验证和清理

请求验证

python
from pydantic import BaseModel, validator
from typing import Optional, Dict, Any
import re

class MCPRequest(BaseModel):
    jsonrpc: str
    method: str
    id: Optional[int] = None
    params: Optional[Dict[str, Any]] = None
    
    @validator('jsonrpc')
    def validate_jsonrpc(cls, v):
        if v != "2.0":
            raise ValueError('jsonrpc 必须是 "2.0"')
        return v
    
    @validator('method')
    def validate_method(cls, v):
        # 只允许特定的方法名格式
        if not re.match(r'^[a-zA-Z][a-zA-Z0-9_/]*$', v):
            raise ValueError('无效的方法名格式')
        return v
    
    @validator('params')
    def validate_params(cls, v):
        if v is not None:
            # 限制参数大小
            if len(str(v)) > 10000:  # 10KB 限制
                raise ValueError('参数过大')
        return v

# 使用示例
def validate_request(raw_request: dict) -> MCPRequest:
    try:
        return MCPRequest(**raw_request)
    except ValueError as e:
        raise ValueError(f"请求验证失败: {str(e)}")

路径遍历防护

python
import os
from pathlib import Path

class SecurePathHandler:
    def __init__(self, allowed_base_paths: list[str]):
        self.allowed_base_paths = [Path(p).resolve() for p in allowed_base_paths]
    
    def validate_path(self, requested_path: str) -> Path:
        """验证路径是否安全"""
        try:
            # 解析路径
            path = Path(requested_path).resolve()
            
            # 检查是否在允许的基础路径内
            for base_path in self.allowed_base_paths:
                try:
                    path.relative_to(base_path)
                    return path
                except ValueError:
                    continue
            
            raise ValueError(f"路径不在允许范围内: {requested_path}")
            
        except (OSError, ValueError) as e:
            raise ValueError(f"无效路径: {str(e)}")

# 使用示例
path_handler = SecurePathHandler(["/workspace", "/tmp/mcp"])
safe_path = path_handler.validate_path("/workspace/project/file.txt")

错误处理和日志

安全错误处理

python
import logging
from typing import Dict, Any

class SecureErrorHandler:
    def __init__(self):
        self.logger = logging.getLogger('mcp_security')
        
    def handle_error(self, error: Exception, context: Dict[str, Any]) -> Dict[str, Any]:
        """安全地处理错误"""
        # 记录详细错误信息(仅用于内部)
        self.logger.error(
            f"MCP 错误: {str(error)}",
            extra={
                'context': context,
                'error_type': type(error).__name__
            }
        )
        
        # 返回安全的错误响应(不泄露敏感信息)
        if isinstance(error, PermissionError):
            return {
                "code": -32001,
                "message": "权限不足",
                "data": {"type": "permission_denied"}
            }
        elif isinstance(error, ValueError):
            return {
                "code": -32602,
                "message": "无效参数",
                "data": {"type": "invalid_params"}
            }
        else:
            return {
                "code": -32603,
                "message": "内部错误",
                "data": {"type": "internal_error"}
            }

安全审计日志

python
import json
from datetime import datetime
from typing import Optional

class SecurityAuditLogger:
    def __init__(self, log_file: str):
        self.log_file = log_file
        
    def log_event(self, 
                  event_type: str,
                  user_id: Optional[str],
                  action: str,
                  resource: Optional[str] = None,
                  result: str = "success",
                  details: Optional[Dict[str, Any]] = None):
        """记录安全事件"""
        event = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "user_id": user_id,
            "action": action,
            "resource": resource,
            "result": result,
            "details": details or {}
        }
        
        with open(self.log_file, 'a') as f:
            f.write(json.dumps(event) + '\n')

# 使用示例
audit_logger = SecurityAuditLogger('/var/log/mcp_audit.log')

audit_logger.log_event(
    event_type="authentication",
    user_id="user_123",
    action="login",
    result="success"
)

audit_logger.log_event(
    event_type="authorization",
    user_id="user_123",
    action="tools/call",
    resource="file_read",
    result="denied",
    details={"reason": "insufficient_permissions"}
)

速率限制和 DDoS 防护

请求速率限制

python
import time
from collections import defaultdict, deque
from typing import Dict

class RateLimiter:
    def __init__(self, max_requests: int, time_window: int):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests: Dict[str, deque] = defaultdict(deque)
    
    def is_allowed(self, client_id: str) -> bool:
        """检查客户端是否允许发送请求"""
        now = time.time()
        client_requests = self.requests[client_id]
        
        # 清理过期的请求记录
        while client_requests and client_requests[0] <= now - self.time_window:
            client_requests.popleft()
        
        # 检查是否超过限制
        if len(client_requests) >= self.max_requests:
            return False
        
        # 记录新请求
        client_requests.append(now)
        return True

# 使用示例
rate_limiter = RateLimiter(max_requests=100, time_window=60)  # 每分钟 100 请求

def handle_request(client_id: str, request: dict):
    if not rate_limiter.is_allowed(client_id):
        raise Exception("请求过于频繁,请稍后再试")
    
    # 处理请求
    pass

数据保护

敏感数据处理

python
import re
from typing import Any, Dict

class DataSanitizer:
    def __init__(self):
        # 敏感数据模式
        self.sensitive_patterns = [
            (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '[EMAIL]'),
            (r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', '[CARD]'),
            (r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]'),
            (r'sk-[a-zA-Z0-9]{48}', '[API_KEY]'),
            (r'Bearer\s+[a-zA-Z0-9\-._~+/]+=*', '[TOKEN]')
        ]
    
    def sanitize(self, data: Any) -> Any:
        """清理敏感数据"""
        if isinstance(data, str):
            return self._sanitize_string(data)
        elif isinstance(data, dict):
            return {k: self.sanitize(v) for k, v in data.items()}
        elif isinstance(data, list):
            return [self.sanitize(item) for item in data]
        else:
            return data
    
    def _sanitize_string(self, text: str) -> str:
        """清理字符串中的敏感信息"""
        for pattern, replacement in self.sensitive_patterns:
            text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
        return text

# 使用示例
sanitizer = DataSanitizer()

# 清理日志数据
log_data = {
    "user": "john@example.com",
    "api_key": "sk-1234567890abcdef1234567890abcdef12345678",
    "message": "用户登录成功"
}

clean_data = sanitizer.sanitize(log_data)
# 结果: {"user": "[EMAIL]", "api_key": "[API_KEY]", "message": "用户登录成功"}

监控和告警

安全事件监控

python
import asyncio
from typing import Callable, List
from dataclasses import dataclass
from datetime import datetime

@dataclass
class SecurityEvent:
    timestamp: datetime
    event_type: str
    severity: str
    source: str
    description: str
    metadata: Dict[str, Any]

class SecurityMonitor:
    def __init__(self):
        self.handlers: List[Callable[[SecurityEvent], None]] = []
        self.event_queue = asyncio.Queue()
    
    def add_handler(self, handler: Callable[[SecurityEvent], None]):
        """添加事件处理器"""
        self.handlers.append(handler)
    
    async def emit_event(self, event: SecurityEvent):
        """发出安全事件"""
        await self.event_queue.put(event)
    
    async def process_events(self):
        """处理安全事件"""
        while True:
            event = await self.event_queue.get()
            for handler in self.handlers:
                try:
                    handler(event)
                except Exception as e:
                    print(f"事件处理器错误: {e}")

# 事件处理器示例
def log_security_event(event: SecurityEvent):
    """记录安全事件"""
    print(f"[{event.severity}] {event.timestamp}: {event.description}")

def alert_on_critical_event(event: SecurityEvent):
    """关键事件告警"""
    if event.severity == "critical":
        # 发送告警通知
        send_alert(f"关键安全事件: {event.description}")

# 使用示例
monitor = SecurityMonitor()
monitor.add_handler(log_security_event)
monitor.add_handler(alert_on_critical_event)

# 发出安全事件
await monitor.emit_event(SecurityEvent(
    timestamp=datetime.utcnow(),
    event_type="authentication_failure",
    severity="warning",
    source="mcp_server",
    description="多次登录失败",
    metadata={"user_id": "user_123", "attempts": 5}
))

部署安全

容器安全

dockerfile
# 使用非 root 用户
FROM python:3.11-slim

# 创建非特权用户
RUN groupadd -r mcpuser && useradd -r -g mcpuser mcpuser

# 设置工作目录
WORKDIR /app

# 复制应用文件
COPY --chown=mcpuser:mcpuser . .

# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt

# 切换到非特权用户
USER mcpuser

# 暴露端口
EXPOSE 8080

# 启动应用
CMD ["python", "server.py"]

环境变量安全

python
import os
from typing import Optional

class SecureConfig:
    def __init__(self):
        self.secret_key = self._get_required_env('MCP_SECRET_KEY')
        self.database_url = self._get_required_env('MCP_DATABASE_URL')
        self.debug = self._get_env_bool('MCP_DEBUG', False)
    
    def _get_required_env(self, key: str) -> str:
        """获取必需的环境变量"""
        value = os.getenv(key)
        if not value:
            raise ValueError(f"必需的环境变量未设置: {key}")
        return value
    
    def _get_env_bool(self, key: str, default: bool) -> bool:
        """获取布尔型环境变量"""
        value = os.getenv(key, str(default)).lower()
        return value in ('true', '1', 'yes', 'on')

# 使用示例
config = SecureConfig()

安全检查清单

开发阶段

  • [ ] 实施输入验证
  • [ ] 使用参数化查询
  • [ ] 实施权限检查
  • [ ] 添加安全测试

部署阶段

  • [ ] 启用 HTTPS/WSS
  • [ ] 配置防火墙
  • [ ] 设置监控和日志
  • [ ] 定期安全扫描

运维阶段

  • [ ] 定期更新依赖
  • [ ] 监控安全事件
  • [ ] 备份和恢复测试
  • [ ] 安全培训

相关资源

🚀 探索模型上下文协议的无限可能 | 连接 AI 与世界的桥梁 | 让智能更智能