任何超过两个参与者的 WebRTC 应用,其复杂性都会从 P2P 连接的媒体协商,迅速转移到服务器端的状态管理上。当需求从简单的“双人通话”演变为“支持数百个动态创建房间、数千个并发用户的协作平台”时,一个天真的、频繁读写数据库的信令服务器会立刻崩溃。问题的核心在于,WebRTC 信令交互是瞬时且密集的,而用户和房间的生命周期状态则是持久化的。将这两者混为一谈,是架构设计上的第一个、也是最致命的错误。
我们最初的构想是为每个信令消息(如 join
, leave
, offer
)都去数据库验证和更新状态。这个方案在原型阶段表现尚可,但在内部压力测试中,当并发连接数超过200时,MariaDB 的连接池被打满,CPU 的 iowait
指标飙升,信令延迟从几十毫秒恶化到数秒,最终导致大量的客户端因超时而连接失败。这迫使我们重新审视整个状态管理模型。
痛点很明确:我们需要一个能处理高频读写的实时状态层,同时还需要一个能提供持久化和数据一致性的存储层。将 MariaDB 用于后者是正确的选择,它负责存储用户账户、房间配置、权限等不频繁变更的数据。而前者的实现,我们决定在 Tornado 服务进程的内存中构建一个专门的状态管理器。这规避了网络和磁盘 I/O,但引入了新的挑战:状态的线程安全、进程崩溃后的状态丢失,以及未来水平扩展的难题。这是一个典型的架构权衡,我们选择了优先解决眼下的性能瓶颈。
架构核心:内存状态与持久化存储的分离
我们的架构决策是将状态分为两类:
- 持久化状态 (Persistent State): 存储在 MariaDB 中。这包括用户认证信息、房间的静态元数据(如房间名、最大人数、是否需要密码等)。这部分数据的特点是读多写少。
- 临时状态 (Ephemeral State): 存在于 Tornado 应用的内存中。这包括当前在线的用户、每个房间内的参与者列表、每个参与者的 WebSocket 连接实例、以及他们的 WebRTC 会话状态(如是否已发送 Offer)。这部分数据读写极其频繁。
下面的 Mermaid 图清晰地展示了“用户加入房间”这一关键流程,体现了这种分离设计。
sequenceDiagram participant Client as JavaScript 客户端 participant Server as Tornado WebSocket participant StateMgr as 内存状态管理器 participant DB as MariaDB Client->>+Server: 发送 `join` 消息 (附带 token, roomId) Server->>+DB: 使用 token 查询用户信息 DB-->>-Server: 返回用户信息 (合法) Server->>+StateMgr: 调用 `add_peer_to_room(user, roomId, ws_connection)` StateMgr->>StateMgr: 1. 查找或创建 Room 对象 StateMgr->>StateMgr: 2. 创建 Peer 对象 StateMgr->>StateMgr: 3. 将 Peer 添加到 Room (加锁) StateMgr-->>-Server: 返回成功,及房间内其他 Peer 列表 Server->>-Client: 回复 `join_success` (附带 Peer 列表) loop 遍历房间内其他 Peer Server-->>OtherClients: 广播 `peer_joined` 消息 (新 Peer 信息) end
这种架构下,数据库只在会话建立的初始阶段(如用户加入房间时)被访问一次,用于鉴权和获取基础数据。之后所有的信令交互,例如转发 SDP Offer/Answer 或 ICE Candidate,都只在内存状态管理器中进行,速度极快。
数据库模型设计 (MariaDB)
数据库层保持尽可能的简单,只负责它最擅长的事情。
users
表:
存储用户信息,auth_token
用于 WebSocket 连接时的快速验证。在生产环境中,这通常是与主业务账户系统关联的外键。
rooms
表:
存储房间的持久化配置。
-- DDL for MariaDB
CREATE TABLE `users` (
`id` INT AUTO_INCREMENT PRIMARY KEY,
`username` VARCHAR(50) NOT NULL UNIQUE,
`auth_token` VARCHAR(255) NOT NULL UNIQUE,
`created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `rooms` (
`id` VARCHAR(64) PRIMARY KEY, -- 使用 UUID 或唯一字符串作为房间 ID
`name` VARCHAR(100) NOT NULL,
`max_peers` INT DEFAULT 10,
`owner_id` INT,
`created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (`owner_id`) REFERENCES `users`(`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 为性能关键的查询添加索引
CREATE INDEX `idx_auth_token` ON `users` (`auth_token`);
这里的关键是 users.auth_token
上的索引,它确保了每次 WebSocket 连接建立时的身份验证都非常高效。
内存状态管理器的 Python 实现
这是整个系统的核心。我们设计了三个主要的类:Peer
,Room
和 StateManager
。为了保证在异步环境中对共享数据结构(如房间内的 peer 列表)的访问是安全的,我们必须使用 asyncio.Lock
。
# state_manager.py
import asyncio
import logging
from typing import Dict, Optional, Set
from tornado.websocket import WebSocketHandler
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class Peer:
"""代表一个连接到房间的参与者"""
def __init__(self, user_id: int, username: str, connection: WebSocketHandler):
self.user_id = user_id
self.username = username
self.connection = connection
async def send(self, message: Dict):
"""向该 Peer 发送消息"""
if self.connection and self.connection.ws_connection:
try:
await self.connection.write_message(message)
except Exception as e:
logging.warning(f"Failed to send message to peer {self.user_id}: {e}")
class Room:
"""代表一个房间,管理其中的所有 Peer"""
def __init__(self, room_id: str):
self.room_id = room_id
self.peers: Dict[int, Peer] = {}
self._lock = asyncio.Lock()
async def add_peer(self, peer: Peer) -> Set[Peer]:
"""将一个 Peer 添加到房间,并返回房间内已有的其他 Peer"""
async with self._lock:
if peer.user_id in self.peers:
logging.warning(f"Peer {peer.user_id} already in room {self.room_id}")
return set()
# 获取当前房间内的其他 peers,用于通知新加入者
other_peers = set(self.peers.values())
self.peers[peer.user_id] = peer
logging.info(f"Peer {peer.user_id} ({peer.username}) joined room {self.room_id}")
return other_peers
async def remove_peer(self, user_id: int) -> Optional[Peer]:
"""从房间中移除一个 Peer"""
async with self._lock:
peer = self.peers.pop(user_id, None)
if peer:
logging.info(f"Peer {user_id} left room {self.room_id}")
if not self.peers:
# 房间为空,返回 True 以便 StateManager 可以清理它
return peer
return peer
async def broadcast(self, message: Dict, exclude_ids: Set[int] = None):
"""向房间内所有(或除指定外的所有)Peer 广播消息"""
async with self._lock:
peers_to_notify = [
p for uid, p in self.peers.items()
if not exclude_ids or uid not in exclude_ids
]
# 在锁外执行 IO 操作,避免长时间持有锁
tasks = [peer.send(message) for peer in peers_to_notify]
if tasks:
await asyncio.gather(*tasks)
async def get_peer(self, user_id: int) -> Optional[Peer]:
"""根据 user_id 获取 Peer 实例"""
async with self._lock:
return self.peers.get(user_id)
class StateManager:
"""
单例模式的状态管理器,管理所有房间
在真实项目中,这个单例的初始化和访问需要更严谨的控制
"""
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(StateManager, cls).__new__(cls, *args, **kwargs)
return cls._instance
def __init__(self):
# 防止重复初始化
if hasattr(self, '_initialized'):
return
self.rooms: Dict[str, Room] = {}
self._lock = asyncio.Lock()
self._initialized = True
logging.info("StateManager initialized.")
async def get_or_create_room(self, room_id: str) -> Room:
"""获取或创建一个房间实例"""
async with self._lock:
if room_id not in self.rooms:
logging.info(f"Creating new room: {room_id}")
self.rooms[room_id] = Room(room_id)
return self.rooms[room_id]
async def remove_room_if_empty(self, room_id: str):
"""如果房间为空,则从管理器中移除"""
async with self._lock:
room = self.rooms.get(room_id)
if room and not room.peers:
logging.info(f"Removing empty room: {room_id}")
del self.rooms[room_id]
# 创建一个全局实例
state_manager = StateManager()
这段代码的几个关键设计点:
- 异步锁 (
asyncio.Lock
): 每当对共享资源(如Room
类的peers
字典)进行修改时,都必须获取锁。这可以防止在并发环境下出现竞态条件。 - 锁的作用域: 锁的持有时间应尽可能短。在
broadcast
方法中,我们先在锁内复制需要通知的peer
列表,然后在锁外执行实际的网络发送操作 (asyncio.gather
)。这是一个常见的优化,避免了 I/O 操作阻塞关键区。 - 单例模式:
StateManager
被设计为单例,确保整个 Tornado 应用中只有一个状态管理实例。 - 空房间清理: 当一个
Peer
离开后,如果房间变空,StateManager
会将其从字典中移除,以回收内存。
Tornado WebSocket Handler 的实现
WebSocketHandler
是信令服务器的入口点,它负责解析客户端消息,并调用 StateManager
来更新和查询状态。
# server.py
import json
import tornado.ioloop
import tornado.web
import tornado.websocket
import aiomysql
from typing import Any, Awaitable
from state_manager import state_manager, Peer
# 模拟数据库连接池,在生产环境中应使用更健壮的库
async def get_db_pool():
# 这里的配置应该来自配置文件
return await aiomysql.create_pool(
host='127.0.0.1', port=3306,
user='your_user', password='your_password',
db='webrtc_db', autocommit=True
)
class SignalingHandler(tornado.websocket.WebSocketHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.peer: Optional[Peer] = None
self.room_id: Optional[str] = None
self.db_pool = self.application.settings['db_pool']
async def open(self):
logging.info("WebSocket connection opened.")
async def on_close(self):
logging.info("WebSocket connection closed.")
if self.peer and self.room_id:
room = await state_manager.get_or_create_room(self.room_id)
await room.remove_peer(self.peer.user_id)
# 广播 peer 离开的消息
await room.broadcast({
"type": "peer_left",
"payload": {"userId": self.peer.user_id}
})
# 检查房间是否需要被清理
await state_manager.remove_room_if_empty(self.room_id)
async def on_message(self, message: str):
try:
data = json.loads(message)
msg_type = data.get("type")
payload = data.get("payload", {})
except json.JSONDecodeError:
logging.error("Invalid JSON received.")
return
# 路由到不同的处理函数
# 一个常见的错误是把所有逻辑都堆在这里,导致 on_message 变得臃肿不堪
handler_method = getattr(self, f"handle_{msg_type}", None)
if handler_method and callable(handler_method):
await handler_method(payload)
else:
logging.warning(f"Unknown message type: {msg_type}")
async def handle_join(self, payload: Dict[str, Any]):
"""处理加入房间的请求"""
token = payload.get("token")
room_id = payload.get("roomId")
if not token or not room_id:
await self.write_message({"type": "error", "payload": {"message": "Missing token or roomId"}})
self.close()
return
# 1. 数据库认证
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT id, username FROM users WHERE auth_token = %s", (token,))
user_data = await cur.fetchone()
if not user_data:
await self.write_message({"type": "error", "payload": {"message": "Invalid token"}})
self.close()
return
user_id, username = user_data
# 2. 更新内存状态
self.room_id = room_id
self.peer = Peer(user_id, username, self)
room = await state_manager.get_or_create_room(room_id)
other_peers = await room.add_peer(self.peer)
# 3. 响应和广播
await self.write_message({
"type": "join_success",
"payload": {
"selfId": user_id,
"peers": [
{"userId": p.user_id, "username": p.username} for p in other_peers
]
}
})
await room.broadcast({
"type": "peer_joined",
"payload": {"userId": user_id, "username": username}
}, exclude_ids={user_id})
async def handle_relay(self, payload: Dict[str, Any]):
"""处理需要转发的消息,如 SDP 和 ICE Candidate"""
target_id = payload.get("targetId")
if not target_id or not self.peer or not self.room_id:
return
room = await state_manager.get_or_create_room(self.room_id)
target_peer = await room.get_peer(target_id)
if target_peer:
# 在消息中注入发送方 ID
payload["senderId"] = self.peer.user_id
await target_peer.send({
"type": payload.get("signalType"), # e.g., "offer", "answer", "candidate"
"payload": payload
})
else:
logging.warning(f"Relay failed: Target peer {target_id} not found in room {self.room_id}")
def make_app(db_pool):
return tornado.web.Application(
[(r"/ws", SignalingHandler)],
db_pool=db_pool
)
async def main():
db_pool = await get_db_pool()
app = make_app(db_pool)
app.listen(8888)
logging.info("Server started on port 8888")
await asyncio.Event().wait()
if __name__ == "__main__":
tornado.ioloop.IOLoop.current().run_sync(main)
在这个处理器中:
-
on_close
的健壮性: 这是生产级代码的关键。无论客户端是正常关闭还是异常断开(如关闭浏览器标签页),on_close
都会被触发。我们必须在这里执行清理逻辑,将Peer
从Room
中移除,并通知房间里的其他人。否则,就会产生“幽灵用户”。 - 消息路由: 使用
getattr
动态分发消息到不同的handle_
方法,这比一个巨大的if-elif-else
结构更清晰,也更易于扩展。 -
handle_relay
: 所有需要点对点转发的信令(offer
,answer
,candidate
)都通过一个统一的relay
类型消息来处理,服务器只负责根据targetId
查找目标Peer
并转发,不对信令内容做任何解析。这使得信令服务器的职责非常单一。
前端 JavaScript 客户端逻辑
前端需要一个对应的状态机来处理与信令服务器的交互和 RTCPeerConnection
的管理。
// client.js - A simplified client-side implementation
class WebRTCManager {
constructor(roomId, token, signalServerUrl) {
this.roomId = roomId;
this.token = token;
this.ws = new WebSocket(signalServerUrl);
this.peerConnections = new Map(); // Map<userId, RTCPeerConnection>
this.selfId = null;
// WebRTC STUN server configuration
this.rtcConfig = {
iceServers: [{ urls: 'stun:stun.l.google.com:19302' }]
};
this.ws.onopen = () => {
console.log("WebSocket connected. Joining room...");
this.sendMessage('join', { roomId: this.roomId, token: this.token });
};
this.ws.onmessage = (event) => this.handleMessage(event);
this.ws.onclose = () => console.error("WebSocket disconnected.");
this.ws.onerror = (err) => console.error("WebSocket error:", err);
}
sendMessage(type, payload) {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type, payload }));
}
}
async handleMessage(event) {
const { type, payload } = JSON.parse(event.data);
console.log(`Received message: ${type}`, payload);
switch (type) {
case 'join_success':
this.selfId = payload.selfId;
console.log(`Joined room successfully. My ID is ${this.selfId}`);
// For each existing peer, create an offer
for (const peer of payload.peers) {
this.createPeerConnection(peer.userId, true);
}
break;
case 'peer_joined':
console.log(`Peer ${payload.userId} (${payload.username}) joined.`);
// A new peer joined, I will initiate the connection
this.createPeerConnection(payload.userId, true);
break;
case 'peer_left':
console.log(`Peer ${payload.userId} left.`);
if (this.peerConnections.has(payload.userId)) {
this.peerConnections.get(payload.userId).close();
this.peerConnections.delete(payload.userId);
}
break;
case 'offer':
this.handleOffer(payload);
break;
case 'answer':
this.handleAnswer(payload);
break;
case 'candidate':
this.handleCandidate(payload);
break;
case 'error':
console.error(`Server error: ${payload.message}`);
break;
}
}
createPeerConnection(targetId, isInitiator) {
if (this.peerConnections.has(targetId)) return;
const pc = new RTCPeerConnection(this.rtcConfig);
this.peerConnections.set(targetId, pc);
pc.onicecandidate = (event) => {
if (event.candidate) {
this.sendMessage('relay', {
targetId: targetId,
signalType: 'candidate',
candidate: event.candidate
});
}
};
pc.ontrack = (event) => {
console.log(`Received track from ${targetId}`);
// Here you would attach event.streams[0] to a <video> element
};
// Add local media stream tracks to the connection (assuming you have a localStream)
// localStream.getTracks().forEach(track => pc.addTrack(track, localStream));
if (isInitiator) {
pc.createOffer()
.then(offer => pc.setLocalDescription(offer))
.then(() => {
this.sendMessage('relay', {
targetId: targetId,
signalType: 'offer',
sdp: pc.localDescription
});
})
.catch(e => console.error("Create offer error:", e));
}
}
async handleOffer(payload) {
const { senderId, sdp } = payload;
if (!this.peerConnections.has(senderId)) {
this.createPeerConnection(senderId, false);
}
const pc = this.peerConnections.get(senderId);
try {
await pc.setRemoteDescription(new RTCSessionDescription(sdp));
const answer = await pc.createAnswer();
await pc.setLocalDescription(answer);
this.sendMessage('relay', {
targetId: senderId,
signalType: 'answer',
sdp: pc.localDescription
});
} catch(e) {
console.error("Handle offer error:", e);
}
}
async handleAnswer(payload) {
const { senderId, sdp } = payload;
const pc = this.peerConnections.get(senderId);
if (pc) {
await pc.setRemoteDescription(new RTCSessionDescription(sdp));
}
}
async handleCandidate(payload) {
const { senderId, candidate } = payload;
const pc = this.peerConnections.get(senderId);
if (pc) {
await pc.addIceCandidate(new RTCIceCandidate(candidate));
}
}
}
客户端代码展示了如何响应服务器的各种状态通知,并相应地创建、管理和销毁 RTCPeerConnection
实例。这是服务器状态管理在客户端的直接体现。
方案的局限性与未来迭代
当前这套架构在单机部署的情况下,性能表现优异,足以支撑中等规模(数千并发连接)的应用。但它的局限性也非常明显:
- 单点故障 (SPOF): Tornado 服务器进程是单点的。如果进程崩溃,所有内存中的状态(房间、参与者)将全部丢失。虽然用户可以重新连接,但所有进行中的通话都会被中断。
- 垂直扩展限制: 所有的负载都由单个 Python 进程处理。受限于单个服务器的 CPU 和内存,它无法无限扩展。
要解决这些问题,未来的迭代方向是明确的:
- 状态的分布式化: 将内存状态从 Tornado 进程中剥离出来,存放到一个外部的、高速的、分布式的存储中,比如 Redis 或一个专门的内存数据网格(如 Hazelcast, Apache Ignite)。Tornado 实例将变成无状态的信令网关,它们都从这个共享的状态存储中读写数据。
- 服务间通信: 当一个 Tornado 实例需要向另一个实例上的 WebSocket 连接发送消息时,就需要一个发布/订阅系统(如 Redis Pub/Sub, NATS, or RabbitMQ)。例如,用户A连接在服务器1上,用户B连接在服务器2上,当A发送信令给B时,服务器1会发布一条消息到特定频道,服务器2订阅了该频道,接收消息后再通过 WebSocket 转发给B。
这种演进会将架构从一个单体信令服务器,转变为一个分布式的信令服务集群。这会带来更高的复杂性,比如需要处理网络分区、保证消息顺序和一致性等,但它也为系统的水平扩展和高可用性铺平了道路。选择何时进行这种演进,取决于业务的实际规模和对服务可用性的要求。