在管理许多有状态连接时,使用分布式系统会带来新的复杂性,就像在 Web 应用程序中实现实时消息传递功能时所必须的那样。本文旨在提供一个简洁的示例,说明如何使用 WebSockets 向客户端发送实时数据,并利用 Redis 作为 pub/sub 代理将传入数据导向正确的套接字。该示例是 MERN 网络应用(MongoDB、Express.js、React、Node.js)的一部分。
创建 WebSocketServer 以处理来自客户端的 WebSocket 连接
import ws from "ws";
interface WSS {
server: ws.WebSocketServer;
clients: Map<string, ws.WebSocket>;
}
const wss: WSS = {
server: new ws.WebSocketServer({ noServer: true, clientTracking: false }),
clients: new Map(),
}
第一步是在 HTTP 服务器旁创建一个 WebSocketServer。我们还需要跟踪打开的客户端连接,因此需要初始化一个新对象,该对象包含服务器和一个包含客户端套接字的映射。我们以 noServer 模式初始化,并将 clientTracking 设为 false,以确保它与 HTTP 服务器完全分离。
接下来,我们需要允许 WebSocketServer 处理对 HTTP 服务器的 “升级 “请求。如果您不熟悉,WebSocket 连接需要经过三个步骤:
- 客户端通过向服务器发送 HTTP GET 请求来启动连接,请求中包含表示协议升级的特殊标头。
- 服务器会处理请求,可能会执行身份验证,然后返回同意升级的回复。
- 连接升级后,所有后续通信都将通过 WebSocket 协议进行。

在下面的示例中,我使用了一个身份验证函数,它返回一个 IncomingRequestWithUser,这只是一个普通的请求对象,并附加了当前会话用户的 mongo 文档。
withSocket(server: http.Server): void {
server.on("upgrade", async (req, socket, head) => {
socket.on("error", console.error);
try {
const reqWithUser = await authenticateSocket(req); // Custom authentication function
wss.server.handleUpgrade(reqWithUser, socket, head, (ws) => {
wss.server.emit("connection", ws, reqWithUser);
});
} catch (err) {
console.log("error", err);
socket.write(err.message); // HTTP response string
socket.destroy();
}
});
wss.server.on("connection", (socket, req: IncomingMessageWithUser) => {
const user = req.user;
if (!user) return socket.close(1008, "Unauthorized");
console.log(`Websocket opened (${user.email})`);
const userId = user._id.toString();
wss.clients.set(userId, socket);
socket.on("close", () => {
console.log(`Websocket closed (${user.email})`);
wss.clients.delete(userId);
});
});
}
在这里,我们通过验证请求,然后使用 WebSocketServer 的 handleUpgrade 方法来处理 “升级 “事件。我们还为 “连接 “事件添加了一个监听器。在本例中,我使用 mongo 用户文档的 ObjectId 作为键,将套接字存储在客户端映射中。这样,每当该用户收到消息时,我们就可以轻松地返回到该套接字。我们还确保在套接字关闭时清理地图条目。
向客户端发送信息
现在我们已经建立了 WebSocketServer,可以通过打开的套接字向客户端发送消息。例如,你有一个 Webhook,可以监听来自外部的消息,并将其实时发送给你的用户。只要能从请求中识别出clients映射中的套接字,就能非常简单地检查该键是否存在开放的套接字,并在找到后发送数据。
webhook.post("/message", async (req, res) => {
const { message, userId } = req.body;
const socket = wss.clients.get(userId);
socket?.send(JSON.stringify(message));
return res.sendStatus(200);
})
现在,我们可以在前端轻松打开 WebSocket 连接并处理传入的信息。
const ws = new WebSocket("ws://example.com");
ws.onmessage = (e) => {
const { message } = JSON.parse(e.data);
console.log(message);
};
如果我们是在单台机器上的单个服务器实例上运行,那么我们就完成了!服务器存储打开的套接字连接,Webhook 逻辑能够访问用户连接的套接字(如果存在),并通过套接字发送消息数据。但是,如果我们在其中加入一点并行化,情况就会变得复杂一些。
假设你使用节点集群模块在同一台机器上运行多个后端实例。我们目前编写的代码将无法工作!因为现在有多个 WebSocketServers,每个 WebSocketServers 都会跟踪自己的客户端套接字,而 webhook 可能会在任何一个 HTTP 服务器实例上被触发,因此 webhook 被触发的实例很可能与存储我们所需套接字的实例不同。在节点集群模块的范围内,这个问题可以通过向主进程发送消息并转发给所有 Worker 来解决。
但是,如果我们进一步扩展,接收 webhook 请求的实例与保存套接字的实例之间的距离就会拉大。现在,假设您使用 AWS ECS 等云容器编排服务在多台机器上运行节点集群。我们该如何在不同机器上运行的实例之间高效地通信消息数据呢?这就是 Redis 的用武之地。
Redis 作为 pub/sub 代理
Redis 是一种流行的开源内存数据存储,许多知名公司都在使用它,包括 GitHub、Twitter 和 Snapchat。它最常用于缓存和会话存储,但也提供了另一种功能:pub/sub 消息传递范例。
Pub/Sub 是发布/订阅的简称,是一种消息传递范例,发布者通过订阅者收听的频道发送数据。它效率高、健壮、扩展轻松,是通过分布式架构转发消息数据的最佳选择。
让我们在后端实现 pub/sub 代理。
import Redis from "ioredis";
interface ChannelRouter {
[channel: string]: (message: string) => Promise<void>;
}
export default class Broker {
private readonly pub: Redis;
private readonly sub: Redis;
private router: ChannelRouter;
constructor(pub: Redis, sub: Redis) {
this.pub = pub;
this.sub = sub;
this.router = {};
}
route(channel: string, handler: ChannelRouter[string]) {
this.router[channel] = handler;
}
async subscribe() {
this.sub.on(
"message",
async (channel, message) => await this.router[channel](message),
);
await this.sub.subscribe(...Object.keys(this.router));
}
async publish(channel: string, message: any) {
await this.pub.publish(channel, JSON.stringify(message));
}
}
在这里,我们创建了一个 Broker 类来管理 Redis 连接,并将通道消息路由到处理函数。我们提供了一个route方法来处理建立新的 “路由”,或与处理通过这些通道传递的数据的逻辑相匹配的通道。我们还提供了一个 subscribe 方法来处理事件监听器的设置和将我们的 “子 “客户端订阅到我们建立的所有通道,以及一个通过指定通道发送消息对象的publish方法。如你所见,Redis 为发布消息和订阅通道提供了一个非常简单的 API。
需要注意的是,由于每个服务器实例都能接收 webhook 请求并持有客户端套接字,因此每个实例都应同时是发布者和订阅者。Redis 客户端在订阅通道时无法发布数据,因此我们需要两个独立的 Redis 连接。
将一切整合在一起
现在有了Broker类,让我们在应用程序中使用它!首先,我们将初始化一个 Broker,并为传入的 Webhook 消息定义一个通道。
import Redis from "ioredis";
import Broker from "./broker";
// ...
const pub = new Redis(6379, "localhost"); // Replace with your real redis port and host
const sub = new Redis(6379, "localhost");
const broker = new Broker(pub, sub);
broker.route("webhook:message", async (message) => {
const { userId, data } = JSON.parse(message);
const socket = wss.clients.get(userId);
socket?.send(JSON.stringify(data));
});
await broker.subscribe();
console.log("Pub/Sub broker initialized.");
在这里,我们创建了两个新的 Redis 连接,将它们传递给 Broker 构造函数,并使用 route 方法定义了一个名为 “webhook:message “的通道。现在,只要有消息通过 “webhook:message “通道发送,我们的处理函数就会在数据上被调用!我们会在clients映射中检查是否有以传递的 userId 为关键字的套接字。如果找到了,我们就通过它发送消息数据。
在本例中,我们只为 Broker 定义了一个路由,但由于我们定义了 Broker 类,添加和处理更多通道就像再次调用路由方法一样简单。
现在,让我们回到 webhook 路由,更新它以发布收到的消息。
webhook.post("/message", async (req, res) => {
const { message, userId } = req.body;
await broker.publish("webhook:message", {
userId,
data: message,
});
return res.sendStatus(200);
});
现在,我们不再直接检查 userId 的套接字,而是通过 “webhook:message“通道发布数据。这将把数据转发给后端的所有实例,包括持有我们正在寻找的套接字的实例。
我们就大功告成了!现在,我们不必再担心哪个实例持有哪个套接字连接,我们可以享受 WebSockets 的强大功能和分布式架构的高效率。这是一个强大的解决方案,可以让你走得更远,当然也足以处理绝大多数情况。
(如果速度不够快,或者负载太大,代理成为瓶颈,那么更先进的解决方案是使用 ZMQ,它是一个非常强大的无代理消息库)。
作者:Ben Barber
译自medium.
本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/38943.html