主题
安全最佳实践
本文档提供了实施模型上下文协议 (MCP) 时的安全最佳实践指南。
安全原则
1. 深度防御
实施多层安全措施,确保单点故障不会导致整个系统被攻击。
2. 最小权限原则
- 客户端只获得完成任务所需的最小权限
- 定期审查和调整权限
- 实施权限过期机制
3. 零信任架构
- 验证所有请求,无论来源
- 不信任网络边界
- 持续监控和验证
4. 数据保护
- 加密传输中的数据
- 加密静态数据
- 最小化数据收集
传输层安全
TLS/SSL 配置
强制 HTTPS/WSS
javascript
// 错误示例 - 不安全的 HTTP
const server = http.createServer();
// 正确示例 - 安全的 HTTPS
const server = https.createServer({
key: fs.readFileSync('private-key.pem'),
cert: fs.readFileSync('certificate.pem'),
// 使用强密码套件
ciphers: [
'ECDHE-RSA-AES128-GCM-SHA256',
'ECDHE-RSA-AES256-GCM-SHA384',
'ECDHE-RSA-AES128-SHA256',
'ECDHE-RSA-AES256-SHA384'
].join(':'),
honorCipherOrder: true
});
证书验证
python
import ssl
import websockets
# 客户端证书验证
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = True
ssl_context.verify_mode = ssl.CERT_REQUIRED
# 连接到安全的 WebSocket
async with websockets.connect(
"wss://secure-server.example.com",
ssl=ssl_context
) as websocket:
# 安全通信
pass
相互 TLS (mTLS)
python
import ssl
# 服务器端 mTLS 配置
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain('server-cert.pem', 'server-key.pem')
ssl_context.load_verify_locations('ca-cert.pem')
ssl_context.verify_mode = ssl.CERT_REQUIRED
# 客户端端 mTLS 配置
client_context = ssl.create_default_context()
client_context.load_cert_chain('client-cert.pem', 'client-key.pem')
client_context.load_verify_locations('ca-cert.pem')
身份验证和授权
强身份验证
JWT 最佳实践
python
import jwt
import time
from datetime import datetime, timedelta
class SecureJWTHandler:
def __init__(self, secret_key: str, algorithm: str = 'HS256'):
self.secret_key = secret_key
self.algorithm = algorithm
def create_token(self, user_id: str, permissions: dict) -> str:
"""创建安全的 JWT 令牌"""
now = datetime.utcnow()
payload = {
'sub': user_id,
'iat': now,
'exp': now + timedelta(hours=1), # 短期过期
'nbf': now, # 不早于当前时间
'jti': self._generate_jti(), # 唯一标识符
'permissions': permissions
}
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
def verify_token(self, token: str) -> dict:
"""验证 JWT 令牌"""
try:
payload = jwt.decode(
token,
self.secret_key,
algorithms=[self.algorithm],
options={
'verify_exp': True,
'verify_nbf': True,
'verify_iat': True
}
)
# 检查令牌是否在黑名单中
if self._is_token_blacklisted(payload.get('jti')):
raise jwt.InvalidTokenError("令牌已被撤销")
return payload
except jwt.ExpiredSignatureError:
raise jwt.InvalidTokenError("令牌已过期")
except jwt.InvalidTokenError as e:
raise jwt.InvalidTokenError(f"无效令牌: {str(e)}")
API 密钥管理
python
import secrets
import hashlib
import hmac
class APIKeyManager:
def __init__(self, secret_salt: bytes):
self.secret_salt = secret_salt
def generate_api_key(self) -> tuple[str, str]:
"""生成 API 密钥对"""
# 生成随机密钥
key = secrets.token_urlsafe(32)
# 生成哈希用于存储
key_hash = self._hash_key(key)
return key, key_hash
def verify_api_key(self, provided_key: str, stored_hash: str) -> bool:
"""验证 API 密钥"""
computed_hash = self._hash_key(provided_key)
return hmac.compare_digest(computed_hash, stored_hash)
def _hash_key(self, key: str) -> str:
"""安全哈希 API 密钥"""
return hashlib.pbkdf2_hmac(
'sha256',
key.encode('utf-8'),
self.secret_salt,
100000 # 迭代次数
).hex()
权限控制
基于角色的访问控制 (RBAC)
python
from enum import Enum
from typing import Set, Dict
class Permission(Enum):
READ_FILES = "read_files"
WRITE_FILES = "write_files"
EXECUTE_TOOLS = "execute_tools"
ADMIN_ACCESS = "admin_access"
class Role:
def __init__(self, name: str, permissions: Set[Permission]):
self.name = name
self.permissions = permissions
class RBACManager:
def __init__(self):
self.roles: Dict[str, Role] = {
'viewer': Role('viewer', {Permission.READ_FILES}),
'editor': Role('editor', {Permission.READ_FILES, Permission.WRITE_FILES}),
'developer': Role('developer', {
Permission.READ_FILES,
Permission.WRITE_FILES,
Permission.EXECUTE_TOOLS
}),
'admin': Role('admin', set(Permission))
}
self.user_roles: Dict[str, Set[str]] = {}
def assign_role(self, user_id: str, role_name: str):
"""为用户分配角色"""
if role_name not in self.roles:
raise ValueError(f"未知角色: {role_name}")
if user_id not in self.user_roles:
self.user_roles[user_id] = set()
self.user_roles[user_id].add(role_name)
def check_permission(self, user_id: str, permission: Permission) -> bool:
"""检查用户权限"""
user_roles = self.user_roles.get(user_id, set())
for role_name in user_roles:
role = self.roles.get(role_name)
if role and permission in role.permissions:
return True
return False
输入验证和清理
请求验证
python
from pydantic import BaseModel, validator
from typing import Optional, Dict, Any
import re
class MCPRequest(BaseModel):
jsonrpc: str
method: str
id: Optional[int] = None
params: Optional[Dict[str, Any]] = None
@validator('jsonrpc')
def validate_jsonrpc(cls, v):
if v != "2.0":
raise ValueError('jsonrpc 必须是 "2.0"')
return v
@validator('method')
def validate_method(cls, v):
# 只允许特定的方法名格式
if not re.match(r'^[a-zA-Z][a-zA-Z0-9_/]*$', v):
raise ValueError('无效的方法名格式')
return v
@validator('params')
def validate_params(cls, v):
if v is not None:
# 限制参数大小
if len(str(v)) > 10000: # 10KB 限制
raise ValueError('参数过大')
return v
# 使用示例
def validate_request(raw_request: dict) -> MCPRequest:
try:
return MCPRequest(**raw_request)
except ValueError as e:
raise ValueError(f"请求验证失败: {str(e)}")
路径遍历防护
python
import os
from pathlib import Path
class SecurePathHandler:
def __init__(self, allowed_base_paths: list[str]):
self.allowed_base_paths = [Path(p).resolve() for p in allowed_base_paths]
def validate_path(self, requested_path: str) -> Path:
"""验证路径是否安全"""
try:
# 解析路径
path = Path(requested_path).resolve()
# 检查是否在允许的基础路径内
for base_path in self.allowed_base_paths:
try:
path.relative_to(base_path)
return path
except ValueError:
continue
raise ValueError(f"路径不在允许范围内: {requested_path}")
except (OSError, ValueError) as e:
raise ValueError(f"无效路径: {str(e)}")
# 使用示例
path_handler = SecurePathHandler(["/workspace", "/tmp/mcp"])
safe_path = path_handler.validate_path("/workspace/project/file.txt")
错误处理和日志
安全错误处理
python
import logging
from typing import Dict, Any
class SecureErrorHandler:
def __init__(self):
self.logger = logging.getLogger('mcp_security')
def handle_error(self, error: Exception, context: Dict[str, Any]) -> Dict[str, Any]:
"""安全地处理错误"""
# 记录详细错误信息(仅用于内部)
self.logger.error(
f"MCP 错误: {str(error)}",
extra={
'context': context,
'error_type': type(error).__name__
}
)
# 返回安全的错误响应(不泄露敏感信息)
if isinstance(error, PermissionError):
return {
"code": -32001,
"message": "权限不足",
"data": {"type": "permission_denied"}
}
elif isinstance(error, ValueError):
return {
"code": -32602,
"message": "无效参数",
"data": {"type": "invalid_params"}
}
else:
return {
"code": -32603,
"message": "内部错误",
"data": {"type": "internal_error"}
}
安全审计日志
python
import json
from datetime import datetime
from typing import Optional
class SecurityAuditLogger:
def __init__(self, log_file: str):
self.log_file = log_file
def log_event(self,
event_type: str,
user_id: Optional[str],
action: str,
resource: Optional[str] = None,
result: str = "success",
details: Optional[Dict[str, Any]] = None):
"""记录安全事件"""
event = {
"timestamp": datetime.utcnow().isoformat(),
"event_type": event_type,
"user_id": user_id,
"action": action,
"resource": resource,
"result": result,
"details": details or {}
}
with open(self.log_file, 'a') as f:
f.write(json.dumps(event) + '\n')
# 使用示例
audit_logger = SecurityAuditLogger('/var/log/mcp_audit.log')
audit_logger.log_event(
event_type="authentication",
user_id="user_123",
action="login",
result="success"
)
audit_logger.log_event(
event_type="authorization",
user_id="user_123",
action="tools/call",
resource="file_read",
result="denied",
details={"reason": "insufficient_permissions"}
)
速率限制和 DDoS 防护
请求速率限制
python
import time
from collections import defaultdict, deque
from typing import Dict
class RateLimiter:
def __init__(self, max_requests: int, time_window: int):
self.max_requests = max_requests
self.time_window = time_window
self.requests: Dict[str, deque] = defaultdict(deque)
def is_allowed(self, client_id: str) -> bool:
"""检查客户端是否允许发送请求"""
now = time.time()
client_requests = self.requests[client_id]
# 清理过期的请求记录
while client_requests and client_requests[0] <= now - self.time_window:
client_requests.popleft()
# 检查是否超过限制
if len(client_requests) >= self.max_requests:
return False
# 记录新请求
client_requests.append(now)
return True
# 使用示例
rate_limiter = RateLimiter(max_requests=100, time_window=60) # 每分钟 100 请求
def handle_request(client_id: str, request: dict):
if not rate_limiter.is_allowed(client_id):
raise Exception("请求过于频繁,请稍后再试")
# 处理请求
pass
数据保护
敏感数据处理
python
import re
from typing import Any, Dict
class DataSanitizer:
def __init__(self):
# 敏感数据模式
self.sensitive_patterns = [
(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '[EMAIL]'),
(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', '[CARD]'),
(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]'),
(r'sk-[a-zA-Z0-9]{48}', '[API_KEY]'),
(r'Bearer\s+[a-zA-Z0-9\-._~+/]+=*', '[TOKEN]')
]
def sanitize(self, data: Any) -> Any:
"""清理敏感数据"""
if isinstance(data, str):
return self._sanitize_string(data)
elif isinstance(data, dict):
return {k: self.sanitize(v) for k, v in data.items()}
elif isinstance(data, list):
return [self.sanitize(item) for item in data]
else:
return data
def _sanitize_string(self, text: str) -> str:
"""清理字符串中的敏感信息"""
for pattern, replacement in self.sensitive_patterns:
text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
return text
# 使用示例
sanitizer = DataSanitizer()
# 清理日志数据
log_data = {
"user": "john@example.com",
"api_key": "sk-1234567890abcdef1234567890abcdef12345678",
"message": "用户登录成功"
}
clean_data = sanitizer.sanitize(log_data)
# 结果: {"user": "[EMAIL]", "api_key": "[API_KEY]", "message": "用户登录成功"}
监控和告警
安全事件监控
python
import asyncio
from typing import Callable, List
from dataclasses import dataclass
from datetime import datetime
@dataclass
class SecurityEvent:
timestamp: datetime
event_type: str
severity: str
source: str
description: str
metadata: Dict[str, Any]
class SecurityMonitor:
def __init__(self):
self.handlers: List[Callable[[SecurityEvent], None]] = []
self.event_queue = asyncio.Queue()
def add_handler(self, handler: Callable[[SecurityEvent], None]):
"""添加事件处理器"""
self.handlers.append(handler)
async def emit_event(self, event: SecurityEvent):
"""发出安全事件"""
await self.event_queue.put(event)
async def process_events(self):
"""处理安全事件"""
while True:
event = await self.event_queue.get()
for handler in self.handlers:
try:
handler(event)
except Exception as e:
print(f"事件处理器错误: {e}")
# 事件处理器示例
def log_security_event(event: SecurityEvent):
"""记录安全事件"""
print(f"[{event.severity}] {event.timestamp}: {event.description}")
def alert_on_critical_event(event: SecurityEvent):
"""关键事件告警"""
if event.severity == "critical":
# 发送告警通知
send_alert(f"关键安全事件: {event.description}")
# 使用示例
monitor = SecurityMonitor()
monitor.add_handler(log_security_event)
monitor.add_handler(alert_on_critical_event)
# 发出安全事件
await monitor.emit_event(SecurityEvent(
timestamp=datetime.utcnow(),
event_type="authentication_failure",
severity="warning",
source="mcp_server",
description="多次登录失败",
metadata={"user_id": "user_123", "attempts": 5}
))
部署安全
容器安全
dockerfile
# 使用非 root 用户
FROM python:3.11-slim
# 创建非特权用户
RUN groupadd -r mcpuser && useradd -r -g mcpuser mcpuser
# 设置工作目录
WORKDIR /app
# 复制应用文件
COPY --chown=mcpuser:mcpuser . .
# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt
# 切换到非特权用户
USER mcpuser
# 暴露端口
EXPOSE 8080
# 启动应用
CMD ["python", "server.py"]
环境变量安全
python
import os
from typing import Optional
class SecureConfig:
def __init__(self):
self.secret_key = self._get_required_env('MCP_SECRET_KEY')
self.database_url = self._get_required_env('MCP_DATABASE_URL')
self.debug = self._get_env_bool('MCP_DEBUG', False)
def _get_required_env(self, key: str) -> str:
"""获取必需的环境变量"""
value = os.getenv(key)
if not value:
raise ValueError(f"必需的环境变量未设置: {key}")
return value
def _get_env_bool(self, key: str, default: bool) -> bool:
"""获取布尔型环境变量"""
value = os.getenv(key, str(default)).lower()
return value in ('true', '1', 'yes', 'on')
# 使用示例
config = SecureConfig()
安全检查清单
开发阶段
- [ ] 实施输入验证
- [ ] 使用参数化查询
- [ ] 实施权限检查
- [ ] 添加安全测试
部署阶段
- [ ] 启用 HTTPS/WSS
- [ ] 配置防火墙
- [ ] 设置监控和日志
- [ ] 定期安全扫描
运维阶段
- [ ] 定期更新依赖
- [ ] 监控安全事件
- [ ] 备份和恢复测试
- [ ] 安全培训