基于 Express 与 Redis Streams 构建可水平扩展的 WebRTC 信令服务器


最初的 WebRTC 信令服务器原型简单得可笑,一个 Node.js 进程,一个全局的 Map 对象,roomId 作为键,一个 Set 包含所有客户端的 WebSocket 连接作为值。它能工作,但在第一次架构评审时就被否决了。单点故障、无法水平扩展、进程重启后所有状态丢失——这些都是生产环境的禁忌。当流量上来,单个 Node.js 进程的 CPU 很快会成为瓶颈,增加实例是唯一的出路。但一旦部署多个实例,那个简单的 Map 就成了数据孤岛。一个房间内的两个用户,如果被负载均衡器分配到不同的服务器实例,他们将永远无法完成信令交换。

问题很明确:我们需要一个独立于应用服务器实例的、共享的、持久化的消息通道来处理信令。

第一个进入脑海的方案是 Redis Pub/Sub。轻量、快速,似乎很匹配。但它有一个致命缺陷:fire-and-forget。如果某个信令服务器实例因为网络抖动或重启,在订阅的瞬间错过了一条关键的 SDP offer 消息,那么这个 WebRTC 连接的建立就会彻底失败。对于信令这种要求高可靠性的场景,消息丢失是不可接受的。

我们需要的是一个消息队列的特性,但又不想引入像 RabbitMQ 或 Kafka 这样重量级的中间件,那会显著增加架构的复杂度和运维成本。目光最终落在了 Redis Streams 上。它像一个只能追加的日志文件,提供了持久化保证。更重要的是,它支持消费者组(Consumer Groups),这个特性完美契合了我们的多实例场景。多个信令服务器实例可以组成一个消费者组,共同消费同一个房间的信令流,Redis 会确保每条消息只被组内的一个消费者(一个服务器实例)处理。即使某个实例短暂下线,它重新上线后也能从上次消费的位置继续,不会丢失任何信令。这就是我们需要的技术选型。

架构设计与数据流

在动手编码之前,我们先用图表清晰地定义架构。客户端通过负载均衡器连接到任意一个可用的 Express.js 服务器实例。HTTP 请求用于房间的创建和加入,随后连接会升级为 WebSocket 用于实时信令交换。所有服务器实例共享同一个 Redis 实例。

graph TD
    subgraph "用户设备"
        ClientA[客户端 A]
        ClientB[客户端 B]
    end

    subgraph "负载均衡器"
        LB(Nginx / ALB)
    end

    subgraph "可扩展信令服务层 (Node.js/Express)"
        Server1[实例 1]
        Server2[实例 2]
        Server3[实例 N]
    end

    subgraph "共享状态与消息总线"
        Redis[(Redis Streams)]
    end

    ClientA -- wss:// --> LB -- 分发 --> Server1
    ClientB -- wss:// --> LB -- 分发 --> Server2

    Server1 -- XADD (发布信令) --> Redis
    Server2 -- XADD (发布信令) --> Redis

    Redis -- XREADGROUP (消费信令) --> Server1
    Redis -- XREADGROUP (消费信令) --> Server2
    Redis -- XREADGROUP (消费信令) --> Server3

    linkStyle 4 stroke:#f00,stroke-width:2px;
    linkStyle 5 stroke:#f00,stroke-width:2px;
    linkStyle 6 stroke:#00f,stroke-width:2px;
    linkStyle 7 stroke:#00f,stroke-width:2px;
    linkStyle 8 stroke:#00f,stroke-width:2px;

数据流如下:

  1. 客户端 A 连接到实例 1,客户端 B 连接到实例 2,它们希望加入同一个房间 room-123
  2. 客户端 A 通过 WebSocket 发送一个 offer SDP 给实例 1。
  3. 实例 1 并不直接寻找客户端 B 的 WebSocket 连接(因为它在实例 2 上),而是将这个 offer 消息通过 XADD 命令发布到 Redis Stream stream:room-123 中。
  4. 实例 1、2、3 都在监听 stream:room-123。Redis 将这条 offer 消息分发给消费者组中的一个成员,比如实例 2。
  5. 实例 2 收到消息后,发现是发给客户端 B 的,于是通过本地持有的 WebSocket 连接将 offer 转发给客户端 B。
  6. 后续的 answerice-candidate 交换遵循同样的路程,确保了跨实例的通信。

环境搭建与配置

我们将使用 TypeScript 来编写项目,以获得更好的类型安全。

package.json 依赖:

{
  "name": "scalable-webrtc-signaling",
  "version": "1.0.0",
  "main": "dist/server.js",
  "scripts": {
    "build": "tsc",
    "start": "node dist/server.js",
    "dev": "ts-node-dev --respawn --transpile-only src/server.ts"
  },
  "dependencies": {
    "express": "^4.18.2",
    "ioredis": "^5.3.2",
    "uuid": "^9.0.1",
    "ws": "^8.14.2"
  },
  "devDependencies": {
    "@types/express": "^4.17.20",
    "@types/node": "^20.8.10",
    "@types/uuid": "^9.0.6",
    "@types/ws": "^8.5.8",
    "ts-node-dev": "^2.0.0",
    "typescript": "^5.2.2"
  }
}

Redis 客户端封装

在真实项目中,直接在业务代码里使用 ioredis 实例是不可取的。我们需要一个专用的模块来管理 Redis 连接、处理重连逻辑,并提供业务相关的接口。

src/services/redis.service.ts

import IORedis, { Redis } from 'ioredis';
import { v4 as uuidv4 } from 'uuid';
import { logger } from '../utils/logger';

// 从环境变量中读取配置,这是生产级应用的实践
const REDIS_HOST = process.env.REDIS_HOST || '127.0.0.1';
const REDIS_PORT = parseInt(process.env.REDIS_PORT || '6379', 10);
const CONSUMER_GROUP_NAME = 'signaling_servers';
const CONSUMER_NAME = `consumer-${uuidv4()}`; // 每个实例都有唯一的消费者名称

class RedisService {
  private static instance: RedisService;
  public readonly client: Redis;
  private isConnected = false;

  private constructor() {
    this.client = new IORedis({
      host: REDIS_HOST,
      port: REDIS_PORT,
      maxRetriesPerRequest: 3, // 增加健壮性
      retryStrategy(times) {
        const delay = Math.min(times * 50, 2000);
        logger.warn(`Redis: Reconnecting in ${delay}ms (attempt ${times})`);
        return delay;
      },
    });

    this.client.on('connect', () => {
      this.isConnected = true;
      logger.info('Redis: Successfully connected.');
    });

    this.client.on('error', (err) => {
      logger.error('Redis: Connection error.', { error: err.message });
      this.isConnected = false;
    });

    this.client.on('close', () => {
        this.isConnected = false;
        logger.warn('Redis: Connection closed.');
    });
  }

  public static getInstance(): RedisService {
    if (!RedisService.instance) {
      RedisService.instance = new RedisService();
    }
    return RedisService.instance;
  }
  
  // 确保消费者组存在,这是一个幂等操作
  public async ensureConsumerGroupExists(streamKey: string): Promise<void> {
    try {
      await this.client.xgroup('CREATE', streamKey, CONSUMER_GROUP_NAME, '0', 'MKSTREAM');
      logger.info(`Redis: Consumer group '${CONSUMER_GROUP_NAME}' created for stream '${streamKey}'.`);
    } catch (error: any) {
      // 'BUSYGROUP Consumer Group name already exists' 是正常情况,我们忽略它
      if (error.message.includes('BUSYGROUP')) {
        logger.debug(`Redis: Consumer group '${CONSUMER_GROUP_NAME}' already exists for stream '${streamKey}'.`);
      } else {
        logger.error(`Redis: Failed to create consumer group for stream '${streamKey}'.`, { error: error.message });
        throw error;
      }
    }
  }

  public getConsumerId(): { group: string; consumer: string } {
    return { group: CONSUMER_GROUP_NAME, consumer: CONSUMER_NAME };
  }

  public isReady(): boolean {
    return this.isConnected && this.client.status === 'ready';
  }
}

export const redisService = RedisService.getInstance();

这个服务类使用了单例模式,并处理了连接日志和重试策略。每个服务器实例启动时都会生成一个唯一的消费者名称,这对于在消费者组中识别不同实例非常重要。ensureConsumerGroupExists 方法中的 MKSTREAM 选项意味着如果流不存在,Redis 会自动创建它,简化了房间创建的逻辑。

信令服务核心实现

我们的信令服务将由一个 SignalingManager 类来管理,它负责处理 WebSocket 连接和与 Redis Streams 的交互。

src/services/signaling.manager.ts

import { WebSocket, RawData } from 'ws';
import { v4 as uuidv4 } from 'uuid';
import { redisService } from './redis.service';
import { logger } from '../utils/logger';

// 定义信令消息的结构
interface SignalingMessage {
  type: 'offer' | 'answer' | 'candidate' | 'user-joined' | 'user-left';
  from: string; // 发送方的 socketId
  to?: string; // 接收方的 socketId, 'all' 表示广播
  payload: any;
}

// 管理每个 WebSocket 连接及其关联的房间和 ID
class ClientConnection {
  public readonly id: string;
  constructor(public readonly ws: WebSocket, public roomId: string) {
    this.id = uuidv4();
  }
}

class SignalingManager {
  // 使用 Map 在内存中快速查找本地连接
  private clients: Map<string, ClientConnection> = new Map();
  private roomListeners: Map<string, boolean> = new Map();

  constructor() {
    this.init();
  }

  // 服务器启动时,处理可能存在的未确认消息
  private async init() {
    logger.info('SignalingManager initialized.');
    // 实践中,这里可以添加逻辑来处理实例重启前未ACK的消息
  }

  public handleNewConnection(ws: WebSocket, roomId: string) {
    if (!redisService.isReady()) {
        ws.close(1011, 'Signaling service is not ready.');
        return;
    }

    const client = new ClientConnection(ws, roomId);
    this.clients.set(client.id, client);

    logger.info(`New client connected: ${client.id} to room ${roomId}`);

    ws.on('message', (message: RawData) => {
      this.handleClientMessage(client, message);
    });

    ws.on('close', () => {
      this.handleClientDisconnect(client);
    });
    
    ws.on('error', (err) => {
      logger.error(`WebSocket error for client ${client.id}`, { error: err.message });
    });

    // 为该房间启动 Redis Stream 监听器(如果尚未启动)
    this.startListeningToRoom(roomId);
    
    // 通知房间内其他用户有新人加入
    this.publishMessage(roomId, {
      type: 'user-joined',
      from: client.id,
      payload: { id: client.id }
    });
  }

  private async handleClientMessage(client: ClientConnection, rawMessage: RawData) {
    try {
      const message: SignalingMessage = JSON.parse(rawMessage.toString());
      // 消息必须包含发送方ID,我们从服务器侧强制设置
      message.from = client.id;
      logger.debug(`Received message from ${client.id} for room ${client.roomId}`, { message });
      await this.publishMessage(client.roomId, message);
    } catch (error) {
      logger.warn(`Failed to parse message from client ${client.id}`, { error });
    }
  }

  private handleClientDisconnect(client: ClientConnection) {
    this.clients.delete(client.id);
    logger.info(`Client disconnected: ${client.id} from room ${client.roomId}`);
    
    // 通知房间内其他用户有人离开
    this.publishMessage(client.roomId, {
      type: 'user-left',
      from: client.id,
      payload: { id: client.id }
    });

    // 注意:这里我们不停止监听房间,因为可能还有其他本地客户端。
    // 在生产环境中,需要一个更复杂的机制来决定何时停止监听。
  }

  // 将消息发布到 Redis Stream
  private async publishMessage(roomId: string, message: SignalingMessage): Promise<void> {
    const streamKey = `stream:room:${roomId}`;
    const messagePayload = JSON.stringify(message);

    try {
      await redisService.client.xadd(streamKey, '*', 'message', messagePayload);
    } catch (error) {
      logger.error(`Failed to publish message to Redis stream ${streamKey}`, { error });
    }
  }

  // 核心逻辑:为房间启动一个长轮询的监听器
  private async startListeningToRoom(roomId: string) {
    if (this.roomListeners.has(roomId)) {
      return; // 已经为这个房间启动了监听器
    }
    this.roomListeners.set(roomId, true);

    const streamKey = `stream:room:${roomId}`;
    const { group, consumer } = redisService.getConsumerId();
    
    await redisService.ensureConsumerGroupExists(streamKey);
    logger.info(`Starting listener for room ${roomId} using consumer ${consumer}`);

    const listen = async () => {
      if (!this.roomListeners.has(roomId)) {
        logger.info(`Stopping listener for room ${roomId}`);
        return; // 如果房间监听器被停止,则退出循环
      }

      try {
        // 使用 BLOCK 0 实现长轮询,等待新消息
        const response = await redisService.client.xreadgroup(
          'GROUP', group, consumer,
          'BLOCK', 0,
          'STREAMS', streamKey, '>'
        );

        if (response) {
          // response格式: [ [streamKey, [ [messageId, [field, value, ...]] ]] ]
          const messages = response[0][1];
          for (const [messageId, fields] of messages) {
            try {
              const messageJson = fields[1];
              const message: SignalingMessage = JSON.parse(messageJson);
              this.dispatchMessageToLocalClients(roomId, message);
              // 确认消息,防止重复消费
              await redisService.client.xack(streamKey, group, messageId);
            } catch (err) {
              logger.error(`Error processing message ${messageId} from ${streamKey}`, { error: err });
              // 在这里可以添加死信队列逻辑
            }
          }
        }
      } catch (error) {
        logger.error(`Error reading from Redis stream ${streamKey}`, { error });
        // 等待一段时间再重试,防止CPU空转
        await new Promise(resolve => setTimeout(resolve, 5000));
      }
      
      // 递归调用,持续监听
      process.nextTick(listen);
    };

    listen();
  }

  // 将从 Redis 收到的消息分发给此实例上的相关客户端
  private dispatchMessageToLocalClients(roomId: string, message: SignalingMessage) {
    logger.debug(`Dispatching message in room ${roomId}`, { message });
    const messageStr = JSON.stringify(message);

    this.clients.forEach(client => {
      // 确保客户端在正确的房间,并且不是消息的发送方
      if (client.roomId === roomId && client.id !== message.from) {
        // 'to' 字段用于点对点消息
        if (!message.to || message.to === 'all' || message.to === client.id) {
          if (client.ws.readyState === WebSocket.OPEN) {
            client.ws.send(messageStr);
          }
        }
      }
    });
  }
}

export const signalingManager = new SignalingManager();

这段代码是整个系统的核心。startListeningToRoom 方法启动了一个异步的无限循环,使用 XREADGROUPBLOCK 选项来高效地等待新消息,避免了空轮询。收到消息后,dispatchMessageToLocalClients 负责将消息转发给连接到当前服务器实例的 WebSocket 客户端。XACK 命令是保证消息不被重复处理的关键。

整合 Express 和 WebSocket 服务器

最后一步是将所有部分整合到 Express 服务器中。

src/server.ts

import express from 'express';
import http from 'http';
import { WebSocketServer } from 'ws';
import { signalingManager } from './services/signaling.manager';
import { logger } from './utils/logger'; // 一个简单的日志工具,可以用 winston 等实现

const app = express();
const server = http.createServer(app);
const wss = new WebSocketServer({ noServer: true });

const PORT = process.env.PORT || 3000;

app.use(express.json());

// 一个简单的健康检查端点
app.get('/health', (req, res) => {
    res.status(200).send({ status: 'ok' });
});

// 处理 WebSocket 升级请求
server.on('upgrade', (request, socket, head) => {
  // 这里可以进行身份验证
  // 例如,解析 URL 查询参数来获取房间 ID 和 token
  const url = new URL(request.url!, `http://${request.headers.host}`);
  const roomId = url.searchParams.get('roomId');

  if (!roomId) {
    socket.write('HTTP/1.1 400 Bad Request\r\n\r\n');
    socket.destroy();
    return;
  }

  wss.handleUpgrade(request, socket, head, (ws) => {
    wss.emit('connection', ws, request, roomId);
  });
});

wss.on('connection', (ws, request, roomId: string) => {
  signalingManager.handleNewConnection(ws, roomId);
});

server.listen(PORT, () => {
  logger.info(`Server is running on http://localhost:${PORT}`);
  logger.info(`Instance ID: ${process.pid}`); // 打印进程ID,方便调试多实例场景
});

// 优雅关闭
process.on('SIGTERM', () => {
  logger.info('SIGTERM signal received: closing HTTP server');
  server.close(() => {
    logger.info('HTTP server closed');
    // 这里可以添加关闭 Redis 连接等的逻辑
    process.exit(0);
  });
});

server.on('upgrade', ...) 部分,我们拦截了标准的 HTTP 升级请求。这是将 WebSocket 与 Express 集成的标准方式。我们从 URL 中解析出 roomId,并在验证后才将其交给 ws 库处理。这种方式比直接将 WebSocket 服务器附加到 HTTP 服务器上更灵活,因为它允许我们在升级连接之前执行自定义逻辑,如认证。

方案的局限性与未来迭代

这个基于 Redis Streams 的架构解决了信令服务器的水平扩展和单点故障问题,但在生产环境中,还有几个问题需要考虑。

首先是 Redis Stream 的内存管理。Stream 会无限增长,如果不进行修剪,会耗尽 Redis 的内存。必须有一个后台任务或策略,使用 XTRIM 命令定期清理旧的、已经被所有消费者组确认的消息。清理策略可以是基于消息数量(MAXLEN)或消息ID(MINID,基于时间)。

其次是空房间的流清理。当一个房间的所有用户都离开后,对应的 Stream stream:room:{roomId} 就成了僵尸流。需要一个机制来检测并删除这些不再使用的流,比如通过 Redis 的 key 过期事件或一个独立的垃圾回收服务。

最后,这个架构只解决了信令的扩展性。当一个房间内的参与者数量增加时(例如超过 5-6 人),WebRTC 的全网状(Mesh)连接模式会导致客户端的上行带宽和 CPU 成为瓶颈。此时,架构的下一个演进方向是引入媒体服务器,如 SFU (Selective Forwarding Unit) 或 MCU (Multipoint Conferencing Unit)。信令服务器的角色将保持不变,但它需要与 SFU 协同工作,为客户端分配媒体流的收发任务。这标志着系统从纯粹的 P2P 辅助服务向真正的媒体处理中心演进。


  目录