最初的 WebRTC 信令服务器原型简单得可笑,一个 Node.js 进程,一个全局的 Map
对象,roomId
作为键,一个 Set
包含所有客户端的 WebSocket 连接作为值。它能工作,但在第一次架构评审时就被否决了。单点故障、无法水平扩展、进程重启后所有状态丢失——这些都是生产环境的禁忌。当流量上来,单个 Node.js 进程的 CPU 很快会成为瓶颈,增加实例是唯一的出路。但一旦部署多个实例,那个简单的 Map
就成了数据孤岛。一个房间内的两个用户,如果被负载均衡器分配到不同的服务器实例,他们将永远无法完成信令交换。
问题很明确:我们需要一个独立于应用服务器实例的、共享的、持久化的消息通道来处理信令。
第一个进入脑海的方案是 Redis Pub/Sub。轻量、快速,似乎很匹配。但它有一个致命缺陷:fire-and-forget。如果某个信令服务器实例因为网络抖动或重启,在订阅的瞬间错过了一条关键的 SDP offer
消息,那么这个 WebRTC 连接的建立就会彻底失败。对于信令这种要求高可靠性的场景,消息丢失是不可接受的。
我们需要的是一个消息队列的特性,但又不想引入像 RabbitMQ 或 Kafka 这样重量级的中间件,那会显著增加架构的复杂度和运维成本。目光最终落在了 Redis Streams 上。它像一个只能追加的日志文件,提供了持久化保证。更重要的是,它支持消费者组(Consumer Groups),这个特性完美契合了我们的多实例场景。多个信令服务器实例可以组成一个消费者组,共同消费同一个房间的信令流,Redis 会确保每条消息只被组内的一个消费者(一个服务器实例)处理。即使某个实例短暂下线,它重新上线后也能从上次消费的位置继续,不会丢失任何信令。这就是我们需要的技术选型。
架构设计与数据流
在动手编码之前,我们先用图表清晰地定义架构。客户端通过负载均衡器连接到任意一个可用的 Express.js 服务器实例。HTTP 请求用于房间的创建和加入,随后连接会升级为 WebSocket 用于实时信令交换。所有服务器实例共享同一个 Redis 实例。
graph TD subgraph "用户设备" ClientA[客户端 A] ClientB[客户端 B] end subgraph "负载均衡器" LB(Nginx / ALB) end subgraph "可扩展信令服务层 (Node.js/Express)" Server1[实例 1] Server2[实例 2] Server3[实例 N] end subgraph "共享状态与消息总线" Redis[(Redis Streams)] end ClientA -- wss:// --> LB -- 分发 --> Server1 ClientB -- wss:// --> LB -- 分发 --> Server2 Server1 -- XADD (发布信令) --> Redis Server2 -- XADD (发布信令) --> Redis Redis -- XREADGROUP (消费信令) --> Server1 Redis -- XREADGROUP (消费信令) --> Server2 Redis -- XREADGROUP (消费信令) --> Server3 linkStyle 4 stroke:#f00,stroke-width:2px; linkStyle 5 stroke:#f00,stroke-width:2px; linkStyle 6 stroke:#00f,stroke-width:2px; linkStyle 7 stroke:#00f,stroke-width:2px; linkStyle 8 stroke:#00f,stroke-width:2px;
数据流如下:
- 客户端 A 连接到实例 1,客户端 B 连接到实例 2,它们希望加入同一个房间
room-123
。 - 客户端 A 通过 WebSocket 发送一个
offer
SDP 给实例 1。 - 实例 1 并不直接寻找客户端 B 的 WebSocket 连接(因为它在实例 2 上),而是将这个
offer
消息通过XADD
命令发布到 Redis Streamstream:room-123
中。 - 实例 1、2、3 都在监听
stream:room-123
。Redis 将这条offer
消息分发给消费者组中的一个成员,比如实例 2。 - 实例 2 收到消息后,发现是发给客户端 B 的,于是通过本地持有的 WebSocket 连接将
offer
转发给客户端 B。 - 后续的
answer
和ice-candidate
交换遵循同样的路程,确保了跨实例的通信。
环境搭建与配置
我们将使用 TypeScript 来编写项目,以获得更好的类型安全。
package.json
依赖:
{
"name": "scalable-webrtc-signaling",
"version": "1.0.0",
"main": "dist/server.js",
"scripts": {
"build": "tsc",
"start": "node dist/server.js",
"dev": "ts-node-dev --respawn --transpile-only src/server.ts"
},
"dependencies": {
"express": "^4.18.2",
"ioredis": "^5.3.2",
"uuid": "^9.0.1",
"ws": "^8.14.2"
},
"devDependencies": {
"@types/express": "^4.17.20",
"@types/node": "^20.8.10",
"@types/uuid": "^9.0.6",
"@types/ws": "^8.5.8",
"ts-node-dev": "^2.0.0",
"typescript": "^5.2.2"
}
}
Redis 客户端封装
在真实项目中,直接在业务代码里使用 ioredis
实例是不可取的。我们需要一个专用的模块来管理 Redis 连接、处理重连逻辑,并提供业务相关的接口。
src/services/redis.service.ts
import IORedis, { Redis } from 'ioredis';
import { v4 as uuidv4 } from 'uuid';
import { logger } from '../utils/logger';
// 从环境变量中读取配置,这是生产级应用的实践
const REDIS_HOST = process.env.REDIS_HOST || '127.0.0.1';
const REDIS_PORT = parseInt(process.env.REDIS_PORT || '6379', 10);
const CONSUMER_GROUP_NAME = 'signaling_servers';
const CONSUMER_NAME = `consumer-${uuidv4()}`; // 每个实例都有唯一的消费者名称
class RedisService {
private static instance: RedisService;
public readonly client: Redis;
private isConnected = false;
private constructor() {
this.client = new IORedis({
host: REDIS_HOST,
port: REDIS_PORT,
maxRetriesPerRequest: 3, // 增加健壮性
retryStrategy(times) {
const delay = Math.min(times * 50, 2000);
logger.warn(`Redis: Reconnecting in ${delay}ms (attempt ${times})`);
return delay;
},
});
this.client.on('connect', () => {
this.isConnected = true;
logger.info('Redis: Successfully connected.');
});
this.client.on('error', (err) => {
logger.error('Redis: Connection error.', { error: err.message });
this.isConnected = false;
});
this.client.on('close', () => {
this.isConnected = false;
logger.warn('Redis: Connection closed.');
});
}
public static getInstance(): RedisService {
if (!RedisService.instance) {
RedisService.instance = new RedisService();
}
return RedisService.instance;
}
// 确保消费者组存在,这是一个幂等操作
public async ensureConsumerGroupExists(streamKey: string): Promise<void> {
try {
await this.client.xgroup('CREATE', streamKey, CONSUMER_GROUP_NAME, '0', 'MKSTREAM');
logger.info(`Redis: Consumer group '${CONSUMER_GROUP_NAME}' created for stream '${streamKey}'.`);
} catch (error: any) {
// 'BUSYGROUP Consumer Group name already exists' 是正常情况,我们忽略它
if (error.message.includes('BUSYGROUP')) {
logger.debug(`Redis: Consumer group '${CONSUMER_GROUP_NAME}' already exists for stream '${streamKey}'.`);
} else {
logger.error(`Redis: Failed to create consumer group for stream '${streamKey}'.`, { error: error.message });
throw error;
}
}
}
public getConsumerId(): { group: string; consumer: string } {
return { group: CONSUMER_GROUP_NAME, consumer: CONSUMER_NAME };
}
public isReady(): boolean {
return this.isConnected && this.client.status === 'ready';
}
}
export const redisService = RedisService.getInstance();
这个服务类使用了单例模式,并处理了连接日志和重试策略。每个服务器实例启动时都会生成一个唯一的消费者名称,这对于在消费者组中识别不同实例非常重要。ensureConsumerGroupExists
方法中的 MKSTREAM
选项意味着如果流不存在,Redis 会自动创建它,简化了房间创建的逻辑。
信令服务核心实现
我们的信令服务将由一个 SignalingManager
类来管理,它负责处理 WebSocket 连接和与 Redis Streams 的交互。
src/services/signaling.manager.ts
import { WebSocket, RawData } from 'ws';
import { v4 as uuidv4 } from 'uuid';
import { redisService } from './redis.service';
import { logger } from '../utils/logger';
// 定义信令消息的结构
interface SignalingMessage {
type: 'offer' | 'answer' | 'candidate' | 'user-joined' | 'user-left';
from: string; // 发送方的 socketId
to?: string; // 接收方的 socketId, 'all' 表示广播
payload: any;
}
// 管理每个 WebSocket 连接及其关联的房间和 ID
class ClientConnection {
public readonly id: string;
constructor(public readonly ws: WebSocket, public roomId: string) {
this.id = uuidv4();
}
}
class SignalingManager {
// 使用 Map 在内存中快速查找本地连接
private clients: Map<string, ClientConnection> = new Map();
private roomListeners: Map<string, boolean> = new Map();
constructor() {
this.init();
}
// 服务器启动时,处理可能存在的未确认消息
private async init() {
logger.info('SignalingManager initialized.');
// 实践中,这里可以添加逻辑来处理实例重启前未ACK的消息
}
public handleNewConnection(ws: WebSocket, roomId: string) {
if (!redisService.isReady()) {
ws.close(1011, 'Signaling service is not ready.');
return;
}
const client = new ClientConnection(ws, roomId);
this.clients.set(client.id, client);
logger.info(`New client connected: ${client.id} to room ${roomId}`);
ws.on('message', (message: RawData) => {
this.handleClientMessage(client, message);
});
ws.on('close', () => {
this.handleClientDisconnect(client);
});
ws.on('error', (err) => {
logger.error(`WebSocket error for client ${client.id}`, { error: err.message });
});
// 为该房间启动 Redis Stream 监听器(如果尚未启动)
this.startListeningToRoom(roomId);
// 通知房间内其他用户有新人加入
this.publishMessage(roomId, {
type: 'user-joined',
from: client.id,
payload: { id: client.id }
});
}
private async handleClientMessage(client: ClientConnection, rawMessage: RawData) {
try {
const message: SignalingMessage = JSON.parse(rawMessage.toString());
// 消息必须包含发送方ID,我们从服务器侧强制设置
message.from = client.id;
logger.debug(`Received message from ${client.id} for room ${client.roomId}`, { message });
await this.publishMessage(client.roomId, message);
} catch (error) {
logger.warn(`Failed to parse message from client ${client.id}`, { error });
}
}
private handleClientDisconnect(client: ClientConnection) {
this.clients.delete(client.id);
logger.info(`Client disconnected: ${client.id} from room ${client.roomId}`);
// 通知房间内其他用户有人离开
this.publishMessage(client.roomId, {
type: 'user-left',
from: client.id,
payload: { id: client.id }
});
// 注意:这里我们不停止监听房间,因为可能还有其他本地客户端。
// 在生产环境中,需要一个更复杂的机制来决定何时停止监听。
}
// 将消息发布到 Redis Stream
private async publishMessage(roomId: string, message: SignalingMessage): Promise<void> {
const streamKey = `stream:room:${roomId}`;
const messagePayload = JSON.stringify(message);
try {
await redisService.client.xadd(streamKey, '*', 'message', messagePayload);
} catch (error) {
logger.error(`Failed to publish message to Redis stream ${streamKey}`, { error });
}
}
// 核心逻辑:为房间启动一个长轮询的监听器
private async startListeningToRoom(roomId: string) {
if (this.roomListeners.has(roomId)) {
return; // 已经为这个房间启动了监听器
}
this.roomListeners.set(roomId, true);
const streamKey = `stream:room:${roomId}`;
const { group, consumer } = redisService.getConsumerId();
await redisService.ensureConsumerGroupExists(streamKey);
logger.info(`Starting listener for room ${roomId} using consumer ${consumer}`);
const listen = async () => {
if (!this.roomListeners.has(roomId)) {
logger.info(`Stopping listener for room ${roomId}`);
return; // 如果房间监听器被停止,则退出循环
}
try {
// 使用 BLOCK 0 实现长轮询,等待新消息
const response = await redisService.client.xreadgroup(
'GROUP', group, consumer,
'BLOCK', 0,
'STREAMS', streamKey, '>'
);
if (response) {
// response格式: [ [streamKey, [ [messageId, [field, value, ...]] ]] ]
const messages = response[0][1];
for (const [messageId, fields] of messages) {
try {
const messageJson = fields[1];
const message: SignalingMessage = JSON.parse(messageJson);
this.dispatchMessageToLocalClients(roomId, message);
// 确认消息,防止重复消费
await redisService.client.xack(streamKey, group, messageId);
} catch (err) {
logger.error(`Error processing message ${messageId} from ${streamKey}`, { error: err });
// 在这里可以添加死信队列逻辑
}
}
}
} catch (error) {
logger.error(`Error reading from Redis stream ${streamKey}`, { error });
// 等待一段时间再重试,防止CPU空转
await new Promise(resolve => setTimeout(resolve, 5000));
}
// 递归调用,持续监听
process.nextTick(listen);
};
listen();
}
// 将从 Redis 收到的消息分发给此实例上的相关客户端
private dispatchMessageToLocalClients(roomId: string, message: SignalingMessage) {
logger.debug(`Dispatching message in room ${roomId}`, { message });
const messageStr = JSON.stringify(message);
this.clients.forEach(client => {
// 确保客户端在正确的房间,并且不是消息的发送方
if (client.roomId === roomId && client.id !== message.from) {
// 'to' 字段用于点对点消息
if (!message.to || message.to === 'all' || message.to === client.id) {
if (client.ws.readyState === WebSocket.OPEN) {
client.ws.send(messageStr);
}
}
}
});
}
}
export const signalingManager = new SignalingManager();
这段代码是整个系统的核心。startListeningToRoom
方法启动了一个异步的无限循环,使用 XREADGROUP
的 BLOCK
选项来高效地等待新消息,避免了空轮询。收到消息后,dispatchMessageToLocalClients
负责将消息转发给连接到当前服务器实例的 WebSocket 客户端。XACK
命令是保证消息不被重复处理的关键。
整合 Express 和 WebSocket 服务器
最后一步是将所有部分整合到 Express 服务器中。
src/server.ts
import express from 'express';
import http from 'http';
import { WebSocketServer } from 'ws';
import { signalingManager } from './services/signaling.manager';
import { logger } from './utils/logger'; // 一个简单的日志工具,可以用 winston 等实现
const app = express();
const server = http.createServer(app);
const wss = new WebSocketServer({ noServer: true });
const PORT = process.env.PORT || 3000;
app.use(express.json());
// 一个简单的健康检查端点
app.get('/health', (req, res) => {
res.status(200).send({ status: 'ok' });
});
// 处理 WebSocket 升级请求
server.on('upgrade', (request, socket, head) => {
// 这里可以进行身份验证
// 例如,解析 URL 查询参数来获取房间 ID 和 token
const url = new URL(request.url!, `http://${request.headers.host}`);
const roomId = url.searchParams.get('roomId');
if (!roomId) {
socket.write('HTTP/1.1 400 Bad Request\r\n\r\n');
socket.destroy();
return;
}
wss.handleUpgrade(request, socket, head, (ws) => {
wss.emit('connection', ws, request, roomId);
});
});
wss.on('connection', (ws, request, roomId: string) => {
signalingManager.handleNewConnection(ws, roomId);
});
server.listen(PORT, () => {
logger.info(`Server is running on http://localhost:${PORT}`);
logger.info(`Instance ID: ${process.pid}`); // 打印进程ID,方便调试多实例场景
});
// 优雅关闭
process.on('SIGTERM', () => {
logger.info('SIGTERM signal received: closing HTTP server');
server.close(() => {
logger.info('HTTP server closed');
// 这里可以添加关闭 Redis 连接等的逻辑
process.exit(0);
});
});
在 server.on('upgrade', ...)
部分,我们拦截了标准的 HTTP 升级请求。这是将 WebSocket 与 Express 集成的标准方式。我们从 URL 中解析出 roomId
,并在验证后才将其交给 ws
库处理。这种方式比直接将 WebSocket 服务器附加到 HTTP 服务器上更灵活,因为它允许我们在升级连接之前执行自定义逻辑,如认证。
方案的局限性与未来迭代
这个基于 Redis Streams 的架构解决了信令服务器的水平扩展和单点故障问题,但在生产环境中,还有几个问题需要考虑。
首先是 Redis Stream 的内存管理。Stream 会无限增长,如果不进行修剪,会耗尽 Redis 的内存。必须有一个后台任务或策略,使用 XTRIM
命令定期清理旧的、已经被所有消费者组确认的消息。清理策略可以是基于消息数量(MAXLEN)或消息ID(MINID,基于时间)。
其次是空房间的流清理。当一个房间的所有用户都离开后,对应的 Stream stream:room:{roomId}
就成了僵尸流。需要一个机制来检测并删除这些不再使用的流,比如通过 Redis 的 key 过期事件或一个独立的垃圾回收服务。
最后,这个架构只解决了信令的扩展性。当一个房间内的参与者数量增加时(例如超过 5-6 人),WebRTC 的全网状(Mesh)连接模式会导致客户端的上行带宽和 CPU 成为瓶颈。此时,架构的下一个演进方向是引入媒体服务器,如 SFU (Selective Forwarding Unit) 或 MCU (Multipoint Conferencing Unit)。信令服务器的角色将保持不变,但它需要与 SFU 协同工作,为客户端分配媒体流的收发任务。这标志着系统从纯粹的 P2P 辅助服务向真正的媒体处理中心演进。