扩展实时通信:释放 Spring、WebSocket 和 RabbitMQ 集成的潜能

确保可扩展性已成为当今技术领域的一项标准要求。在微服务世界中开发缺乏可扩展性的应用程序/工具可能会面临各种挑战。在本文中,YıldızTech 团队将分享他们在使用 WebSocket 时发现的一种解决方案,并介绍如何使用 Spring 和 RabbitMQ 集成开发可扩展的 WebSocket 应用程序。在整篇文章中,我们将假设读者熟悉 Kubernetes 环境和概念。

WebSocket 是一种广泛用于满足实时通信需求的工具,如消息传递、位置共享、直播等。在涉及这些功能的应用程序中,确保将正确的信息无任何损失或延迟地传递给正确的人至关重要。为此,在分布式架构中运行的服务必须相互通信。为了有效地将问题形象化,我们不妨举例说明以下场景。

扩展实时通信:释放 Spring、WebSocket 和 RabbitMQ 集成的潜能
Websocket 用例

如果我们逐一考虑这些情况,第一种情况就是导致撰写本文的一个问题。当用户建立 WebSocket 连接时,他们基本上是在建立双向端到端连接。随着用户数量和连接数的增加,Kubernetes 环境(如果配置了水平 Pod 扩展 [HPA] 功能)会尝试扩展超负荷的服务。不过,在此过程中,如果用户从其连接的 WebSocket 服务以外的服务接收消息(由于扩展到另一个实例),他们将无法看到这些消息。同样,在有多个服务且需要向特定用户发送消息的情况下,如果在用户连接到第一个服务时向第二个服务发送了消息,用户将无法收到预期的消息。虽然可以扩展这些情况,但归根结底,我们的主要要求是确保信息的正确发送。

根据我们以前的研究结果,RabbitMQ 显然可以成功处理消息分发 [2]。正如下面属于我们演示项目 [3] 的代码片段所示,在 Spring 消息库的帮助下,RabbitMQ 可以被定义为消息代理。这样,即使建立了 WebSocket 连接,也可以通过 RabbitMQ 向客户端发送消息。通过这种配置,消息将通过 RabbitMQ 路由,从而实现 WebSocket 和客户端之间的无缝通信。

private static final String[]APP_PREFIXES=new String[]{"/app","/exchange"};

public static final String[]BROKER_PREFIXES=new String[]{"/queue","/topic","/exchange"};

@Override
public void configureMessageBroker(MessageBrokerRegistry registry){
    registry.setPreservePublishOrder(true)

    .setApplicationDestinationPrefixes(APP_PREFIXES)
    .enableStompBrokerRelay(BROKER_PREFIXES)

    .setRelayHost(environmentConfig.getRabbitUrl())
    .setRelayPort(environmentConfig.getRabbitPort())

    .setClientLogin(environmentConfig.getRabbitUser())
    .setClientPasscode(environmentConfig.getRabbitPassword())

    .setSystemLogin(environmentConfig.getRabbitUser())
    .setSystemPasscode(environmentConfig.getRabbitPassword())

    .setUserDestinationBroadcast("/topic/unresolved-user")
    .setUserRegistryBroadcast("/topic/user-registry");

    ...
    }

使用 RabbitMQ 作为消息代理有两方面的优势。首先,RabbitMQ 本身具有可扩展性,可以高效地处理大量消息流量。其次,使用 UserRegistryBroadcast 参数,无论用户连接到哪个 WebSocket pod,都可以通过 /topic/user-registry 队列发送消息。这样就能向所有连接的用户无缝发送信息,确保信息能送达预定的收件人,而不管他们具体连接的是哪个 WebSocket pod。

扩展实时通信:释放 Spring、WebSocket 和 RabbitMQ 集成的潜能
RabbitMQ 中间件

要根据示例项目的日志观察上述情况,可以使用环境变量 SERVER_PORT=8080SERVER_PORT=8081 运行两个不同的应用程序进行模拟。举例来说,由于我们添加了安全层,因此需要通过以下请求获取token,并将相应的token分配给 src/main/resources/client 下客户端代码中的token参数。用户信息可通过 UserDetailsConfig 类访问。请注意,应将 USERNAME 和 PASSWORD 替换为 UserDetailsConfig 中的实际用户名和密码值。

curl --location 'http://localhost:8080/api/v1/token/provide' \
--header 'Content-Type: application/json' \
--data-raw '{
    "username": "{USERNAME}",
    "password": "{PASSWORD}"
}'

要建立 WebSocket 连接,可在浏览器中打开位于 src/main/resources/client 下的 two_user_twoo_pod.html 页面。连接建立后,您将在应用程序控制台中看到以下日志。值得注意的是,每个用户都分配了一个 UUID。Spring 会使用这些 UUID 向相应用户发送消息。

// Pod 1 (Port: 8080)

2023-06-21T17:45:27.922+03:00 TRACE 2316 --- [nboundChannel-2] o.s.m.s.u.UserDestinationMessageHandler  : Translated /user/queue/live-feed-gps-stream -> [/queue/live-feed-gps-stream-user47c7efd6-7eaf-d40a-8d75-70464430f128]
// Pod 2 (Port: 8081)

2023-06-21T17:45:27.922+03:00 TRACE 2316 --- [nboundChannel-2] o.s.m.s.u.UserDestinationMessageHandler  : Translated /user/queue/live-feed-gps-stream -> [/queue/live-feed-gps-stream-userba0eba33-91ce-12d7-72ec-4ed9305f19ae]

连接建立后,当发送以下请求时,可以看到信息已成功发送给第一个 pod 中的相应用户。

curl --location 'http://localhost:8080/web-socket/api/v1/queue/gps' \
--header 'Authorization: Bearer {TOKEN} \
--header 'Content-Type: application/json' \
--data '{
    "plateNumber": "34AB1234",
    "latitude": 40.12314,
    "longitude":38.12314
}'
Pod 1 (Port: 8080)

2023-06-21T17:46:28.928+03:00 TRACE 2304 --- [brokerChannel-1] o.s.m.s.u.UserDestinationMessageHandler  : Translated /user/izzet.kilic@yilditech.co/queue/live-feed-gps-stream -> [/queue/live-feed-gps-stream-user47c7efd6-7eaf-d40a-8d75-70464430f128]
2023-06-21T17:46:28.928+03:00 TRACE 2304 --- [brokerChannel-1] o.s.m.s.u.UserDestinationMessageHandler  : Translated /user/emre.kiziltepe@yilditech.co/queue/live-feed-gps-stream -> [/queue/live-feed-gps-stream-userba0eba33-91ce-12d7-72ec-4ed9305f19ae]

这里需要注意的是,一条传入消息可以通过一个 pod 发送给两个不同的用户。RabbitMQ 使这成为可能,它允许通过提到的 /topic/user-registry 队列从每个 pod 访问用户。

当然,我们并不期望一切都如此简单。我们可以将遇到的一些挑战及其解决方案归纳如下:

未关闭队列

在将我们的项目转移到上述测试环境后,我们注意到 RabbitMQ 的内存开始迅速增加,最终达到了无法响应的程度。在调查该问题时,我们发现尽管应用程序中的活跃用户数量较少,但却存在大量打开的队列。

扩展实时通信:释放 Spring、WebSocket 和 RabbitMQ 集成的潜能
Persistent Queues
扩展实时通信:释放 Spring、WebSocket 和 RabbitMQ 集成的潜能
Auto-Deleted Queues

经过简单研究,我们发现这个问题与 RabbitMQ 有关,为了启用队列的自动关闭,需要在客户端和服务器端添加 auto-delete: true 参数作为头[1]。有了这个参数,当用户断开连接时,队列将自动开始关闭。

//////// client side

var header = {
  'auto-delete': true
};
client.subscribe("/user/queue/{QUEUE_NAME}", function (message) {
...
}, header);
//////// server side

var header=new HashMap<String, Object>();
header.put("auto-delete","true");
simpMessagingTemplate.convertAndSendToUser(...,...,...,header);

队列中消息的累积

当存在不同的 WebSocket 队列时,会出现另一个问题。当用户同时连接到不同队列时,RabbitMQ 会为每个用户创建单独的队列。在 WebSocket 服务中,当消息到达队列时,Spring 会根据用户的会话 ID (UUID) 将该消息路由到用户。让我们以 live-feed-gps-stream 队列和 courier-connection-status-stream 队列为例进行说明。当用户连接到这两个队列时,会生成不同的 UUID,如 817083cf-9ccc-0a06-924b-51dea2700eb0 和 11530489-5963-ad10-4b84-f111811d1b45。如果有消息发送到 live-feed-gps-stream 队列,则会为另一个队列创建一个单独的 RabbitMQ 队列,其 UUID 为 817083cf-9ccc-0a06-924b-51dea2700eb0,并在该队列中累积消息。

扩展实时通信:释放 Spring、WebSocket 和 RabbitMQ 集成的潜能
扩展实时通信:释放 Spring、WebSocket 和 RabbitMQ 集成的潜能
非空队列

即使 RabbitMQ 队列被定义为自动删除(AD),当队列中有消息且连接丢失时,它们也不会自动关闭。为了解决这个问题,我们通过在 RabbitMQ 中定义策略(管理 > 策略)找到了解决方法。通过以下策略,我们确保在连接丢失时保持打开状态的队列也被关闭:

扩展实时通信:释放 Spring、WebSocket 和 RabbitMQ 集成的潜能

通过应用该策略,我们可以确保在连接终止时关闭任何打开的队列,即使队列中还有报文。得益于上述解决方案,我们的系统长期以来一直运行平稳,没有出现任何中断。当与微服务架构结合使用时,扩展可以使系统更加灵活、快速和可用。每个微服务都可以独立扩展,即使在高流量或高需求的情况下也能优化性能。这可以加快业务流程,让用户获得无缝体验。我们希望这篇文章能对面临 WebSocket 类似问题的团队有所帮助,并协助他们找到解决方案。

参考资料

  1. user782220. (2014, January 21). RabbitMQ difference between exclusive and auto-delete? Stack Overflow. https://stackoverflow.com/questions/21248563/rabbitmq-difference-between-exclusive-and-auto-delete
  2. borist2/spring-rabbimq-test-queue. (2019, September 6). GitHub. https://github.com/borist2/spring-rabbimq-test-queue
  3. Yildiz-Tech/spring-boot-scalable-websocket-demo. (2023). GitHub. https://github.com/Yildiz-Tech/spring-boot-scalable-websocket-demo

作者:İzzet Kılıç
来源:Yıldız Tech

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/34078.html

(0)

相关推荐

发表回复

登录后才能评论