使用 WebSockets、Redis、Kafka 和 PostgreSQL 构建可扩展的聊天应用程序

从 WhatsApp 到多人游戏,实时通信为一切提供了动力。虽然搭建一个 WebSocket 服务器可能感觉像一个周末就能完成的项目,但如何将其扩展到数千个用户而不至于在负载下崩溃呢?这正是架构真正重要的地方。

在本文中,我们将从简单的基本聊天设置开始,逐步构建一个容错、可水平扩展的实时消息传递系统。系统使用:

  • Redis 用于实时 fanout
  • Kafka 用于可靠缓冲
  • PostgreSQL 用于长期存储

下面将逐步讲解代码、架构以及每个步骤背后的设计思想。

问题 1:单一服务器的局限性

大多数实时应用程序都以相同的方式开始,基于 Socket.IO 或 WS 的服务器向连接的客户端广播消息:

io.on('connection', (socket) => {
  socket.on('message', (data) => {
    io.emit('message', data); // Broadcast to all clients
  });
});

✅ 在本地运行良好
❌但无法扩展。

当用户数量增加时,您需要在负载平衡器后面启动多态服务器。突然:

  • u1、u2、u3 连接到服务器 1
  • u4 连接到服务器 2

如果 u1 发送一条消息,u4 永远看不到。为什么会这样?因为 WebSocket 连接是粘性的,每个服务器只知道自己的客户端。

您刚刚撞上了可扩展性障碍。

解决方案 1:使用 Redis Pub/Sub 进行跨服务器通信

要解决这个问题,我们需要一个共享消息代理,帮助服务器之间相互通信。这就是 Redis Pub/Sub。

Redis 方案

  • 服务器向 Redis 频道发布消息;
  • 所有服务器都订阅相同的频道;
  • 当一台服务器发布消息时,每台服务器都会收到它,并将其推送给自己的客户端。
const pub = redis.createClient();
const sub = redis.createClient();
sub.subscribe('chat');
sub.on('message', (_, message) => {
  io.emit('message', JSON.parse(message));
});
io.on('connection', (socket) => {
  socket.on('message', (data) => {
    pub.publish('chat', JSON.stringify(data));
  });
});

流程摘要

User → WebSocket Server → Redis → All Servers → Connected Clients

现在 u1 和 u4 可以聊天,即使他们在不同的服务器上。

问题 2:消息未存储

虽然 Redis 可以处理广播,但它不会保留任何内容。

您需要历史消息记录,以便:

  • 新用户加入较晚
  • 离线同步
  • 分析
  • 信息合规性

错误的解决方案: 直接写入 PostgreSQL

socket.on('message', async (data) => {
  pub.publish('chat', JSON.stringify(data));
  await db.insert(data); // 负载时速度慢!
});

这对小流量有效,但:

  • 数据库写入会阻碍实时流;
  • 高 QPS 会使数据库过载;
  • 即使是暂时的减速也会影响用户体验。

解决方案 2:Kafka 用于缓冲、持久写入

我们使用 Apache Kafka 解耦实时和持久化路径。

双路径架构

  • 实时路径:
    User → Server → Redis → Clients
  • 持久化路径:
    Server → Kafka → Consumer → PostgreSQL
io.on('connection', (socket) => {
  socket.on('message', (data) => {
    pub.publish('chat', JSON.stringify(data)); // fast path
    kafkaProducer.send({
      topic: 'messages',
      messages: [{ value: JSON.stringify(data) }]
    }); // buffered path
  });
});

Consumer Service (DB Writer)

consumer.subscribe({ topic: 'messages' });
consumer.run({
  eachMessage: async ({ message }) => {
    const { userId, content, timestamp } = JSON.parse(message.value.toString());
    await db.query(`INSERT INTO messages VALUES (?, ?, ?)`, [userId, content, timestamp]);
  }
});

Kafka 确保:

  • 持久性:消息不会丢失
  • 背压处理:如果 DB 滞后,Kafka 会缓冲
  • 顾虑分离:消息传递保持快速,DB 保持安全

最终架构概述

使用 WebSockets、Redis、Kafka 和 PostgreSQL 构建可扩展的聊天应用程序

此架构为何有效

横向可扩展

  • 轻松添加更多 WebSocket 服务器
  • Kafka 和消费者可独立扩展

有弹性

  • 如果数据库宕机,Kafka 会保留消息
  • Redis 故障不会影响持久性

高性能

  • 实时消息传递从不受阻
  • 数据库写入以异步方式进行

生产就绪

  • 易于监控、调试和保护
  • 每一层都是模块化和可测试的

监控和安全

需要跟踪的指标

const msgCount = new prom.Counter({
  name: 'chat_messages_total',
  help: 'Total messages processed'
});
  • Redis 延迟
  • Kafka 队列长度
  • 消费者处理滞后
  • 数据库写入错误

安全必备

  • JWT 认证的 WebSocket 连接
  • 每个 IP/用户的速率限制
  • 输入清理
  • 所有传输均采用 TLS 加密

小结

构建实时应用程序令人兴奋,但扩展它们又是另一回事。Redis、Kafka 和 PostgreSQL 的组合,将为您提供兼具性能、持久性和可扩展性的完美组合。

因此,下次构建聊天应用程序时,请记住:快速交付、可靠存储、规模设计。

作者:Neel.S
来源:medium.

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/58957.html

(0)

相关推荐

发表回复

登录后才能评论