从 WhatsApp 到多人游戏,实时通信为一切提供了动力。虽然搭建一个 WebSocket 服务器可能感觉像一个周末就能完成的项目,但如何将其扩展到数千个用户而不至于在负载下崩溃呢?这正是架构真正重要的地方。
在本文中,我们将从简单的基本聊天设置开始,逐步构建一个容错、可水平扩展的实时消息传递系统。系统使用:
- Redis 用于实时 fanout
- Kafka 用于可靠缓冲
- PostgreSQL 用于长期存储
下面将逐步讲解代码、架构以及每个步骤背后的设计思想。
问题 1:单一服务器的局限性
大多数实时应用程序都以相同的方式开始,基于 Socket.IO 或 WS 的服务器向连接的客户端广播消息:
io.on('connection', (socket) => {
socket.on('message', (data) => {
io.emit('message', data); // Broadcast to all clients
});
});
✅ 在本地运行良好
❌但无法扩展。
当用户数量增加时,您需要在负载平衡器后面启动多态服务器。突然:
- u1、u2、u3 连接到服务器 1
- u4 连接到服务器 2
如果 u1 发送一条消息,u4 永远看不到。为什么会这样?因为 WebSocket 连接是粘性的,每个服务器只知道自己的客户端。
您刚刚撞上了可扩展性障碍。
解决方案 1:使用 Redis Pub/Sub 进行跨服务器通信
要解决这个问题,我们需要一个共享消息代理,帮助服务器之间相互通信。这就是 Redis Pub/Sub。
Redis 方案
- 服务器向 Redis 频道发布消息;
- 所有服务器都订阅相同的频道;
- 当一台服务器发布消息时,每台服务器都会收到它,并将其推送给自己的客户端。
const pub = redis.createClient();
const sub = redis.createClient();
sub.subscribe('chat');
sub.on('message', (_, message) => {
io.emit('message', JSON.parse(message));
});
io.on('connection', (socket) => {
socket.on('message', (data) => {
pub.publish('chat', JSON.stringify(data));
});
});
流程摘要
User → WebSocket Server → Redis → All Servers → Connected Clients
现在 u1 和 u4 可以聊天,即使他们在不同的服务器上。
问题 2:消息未存储
虽然 Redis 可以处理广播,但它不会保留任何内容。
您需要历史消息记录,以便:
- 新用户加入较晚
- 离线同步
- 分析
- 信息合规性
错误的解决方案: 直接写入 PostgreSQL
socket.on('message', async (data) => {
pub.publish('chat', JSON.stringify(data));
await db.insert(data); // 负载时速度慢!
});
这对小流量有效,但:
- 数据库写入会阻碍实时流;
- 高 QPS 会使数据库过载;
- 即使是暂时的减速也会影响用户体验。
解决方案 2:Kafka 用于缓冲、持久写入
我们使用 Apache Kafka 解耦实时和持久化路径。
双路径架构
- 实时路径:
User → Server → Redis → Clients
- 持久化路径:
Server → Kafka → Consumer → PostgreSQL
io.on('connection', (socket) => {
socket.on('message', (data) => {
pub.publish('chat', JSON.stringify(data)); // fast path
kafkaProducer.send({
topic: 'messages',
messages: [{ value: JSON.stringify(data) }]
}); // buffered path
});
});
Consumer Service (DB Writer)
consumer.subscribe({ topic: 'messages' });
consumer.run({
eachMessage: async ({ message }) => {
const { userId, content, timestamp } = JSON.parse(message.value.toString());
await db.query(`INSERT INTO messages VALUES (?, ?, ?)`, [userId, content, timestamp]);
}
});
Kafka 确保:
- 持久性:消息不会丢失
- 背压处理:如果 DB 滞后,Kafka 会缓冲
- 顾虑分离:消息传递保持快速,DB 保持安全
最终架构概述

此架构为何有效
横向可扩展
- 轻松添加更多 WebSocket 服务器
- Kafka 和消费者可独立扩展
有弹性
- 如果数据库宕机,Kafka 会保留消息
- Redis 故障不会影响持久性
高性能
- 实时消息传递从不受阻
- 数据库写入以异步方式进行
生产就绪
- 易于监控、调试和保护
- 每一层都是模块化和可测试的
监控和安全
需要跟踪的指标
const msgCount = new prom.Counter({
name: 'chat_messages_total',
help: 'Total messages processed'
});
- Redis 延迟
- Kafka 队列长度
- 消费者处理滞后
- 数据库写入错误
安全必备
- JWT 认证的 WebSocket 连接
- 每个 IP/用户的速率限制
- 输入清理
- 所有传输均采用 TLS 加密
小结
构建实时应用程序令人兴奋,但扩展它们又是另一回事。Redis、Kafka 和 PostgreSQL 的组合,将为您提供兼具性能、持久性和可扩展性的完美组合。
因此,下次构建聊天应用程序时,请记住:快速交付、可靠存储、规模设计。
作者:Neel.S
来源:medium.
本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/58957.html