One-to-One Chat with Spring WebSocket and StompJs

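Ever wondered how WhatsApp, Slack, or Discord work? How do people send and receive messages in real time? These apps even tell you whether the recipient has received or read your message.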

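My first guess was that the client must keep polling the server for new updates. But that approach does not scale when you have millions of concurrent users. We need a way for the server to push messages to the client without the client asking for them.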

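This is where WebSockets come in! They allow two-way communication between client and server: a WebSocket is a full-duplex, persistent connection between a web browser and a server.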

This article builds a simple 1-to-1 chat application using WebSockets.

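It covers two basic cases when user A sends a message to user B:

  • If both are online, messages are exchanged in real time.
  • If B is offline, A's message should be persisted in the backend, marked as "not delivered", and shown in B's inbox when B comes online.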

Looks simple, right? Let's get started!

I only cover the WebSocket-related parts here. The full source code is at https://github.com/SatvikNema/satchat.

Backend

Add the dependency:

  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
  </dependency>

Configure WebSocketMessageBrokerConfigurer:

package com.satvik.satchat.config.websocket;

import com.satvik.satchat.filter.WebSocketTokenFilter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.*;
import org.springframework.web.socket.server.RequestUpgradeStrategy;
import org.springframework.web.socket.server.standard.TomcatRequestUpgradeStrategy;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

@Configuration
@EnableWebSocketMessageBroker
@Order(Ordered.HIGHEST_PRECEDENCE + 1)
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

  @Value("${frontend.caller.host:http://localhost:3000}")
  private String frontendCallerHost;

  private final WebSocketTokenFilter webSocketTokenFilter;

  public WebSocketConfig(WebSocketTokenFilter webSocketTokenFilter) {
    this.webSocketTokenFilter = webSocketTokenFilter;
  }

  @Override
  public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.interceptors(webSocketTokenFilter);
  }

  @Override
  public void registerStompEndpoints(StompEndpointRegistry registry) {
    RequestUpgradeStrategy upgradeStrategy = new TomcatRequestUpgradeStrategy();
    registry
        .addEndpoint("/ws")
        .setHandshakeHandler(new DefaultHandshakeHandler(upgradeStrategy))
        .setAllowedOrigins(frontendCallerHost);
  }

  @Override
  public void configureMessageBroker(MessageBrokerRegistry registry) {
    registry
        .setApplicationDestinationPrefixes("/app")
        .enableSimpleBroker("/topic")
        .setTaskScheduler(heartBeatScheduler())
        .setHeartbeatValue(new long[] {10000L, 10000L});
  }

  @Bean
  public TaskScheduler heartBeatScheduler() {
    return new ThreadPoolTaskScheduler();
  }
}

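WebSocketTokenFilter intercepts inbound WebSocket messages from the client and populates the Spring Security context by attaching the authenticated user to the session. This is the most important piece, because it associates the WebSocket session with the logged-in user and helps us track which users are online.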

WebSocketTokenFilter:

package com.satvik.satchat.filter;

import com.satvik.satchat.config.UserDetailsServiceImpl;
import com.satvik.satchat.utils.JWTUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Component;

@Component
public class WebSocketTokenFilter implements ChannelInterceptor {
  private final JWTUtils jwtUtils;
  private final UserDetailsServiceImpl userDetailsService;

  public WebSocketTokenFilter(JWTUtils jwtUtils, UserDetailsServiceImpl userDetailsService) {
    this.jwtUtils = jwtUtils;
    this.userDetailsService = userDetailsService;
  }

  @Override
  public Message<?> preSend(Message<?> message, MessageChannel channel) {
    final StompHeaderAccessor accessor =
        MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
    // getAccessor may return null when the message carries no STOMP accessor
    if (accessor != null && StompCommand.CONNECT == accessor.getCommand()) {

      String jwt = jwtUtils.parseJwt(accessor);
      if (jwt != null && jwtUtils.validateJwtToken(jwt)) {
        String username = jwtUtils.getUserNameFromJwtToken(jwt);

        UserDetails userDetails = userDetailsService.loadUserByUsername(username);
        UsernamePasswordAuthenticationToken authentication =
            new UsernamePasswordAuthenticationToken(
                userDetails, null, userDetails.getAuthorities());
        accessor.setUser(authentication);
      }
    }
    return message;
  }
}
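
The `jwtUtils.parseJwt(accessor)` helper used above is not shown in this article. As a rough sketch of what such a helper might do, assuming the client sends the token in an `Authorization: Bearer ...` header (as the stompjs client later in this article does), the extraction step could look like this. `BearerTokenExtractor` and `extract` are hypothetical names, not the repository's code; in the real filter the header values would presumably come from `accessor.getNativeHeader("Authorization")`.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical helper: pulls the raw JWT out of "Authorization" header values. */
final class BearerTokenExtractor {
  private static final String PREFIX = "Bearer ";

  private BearerTokenExtractor() {}

  /**
   * @param headerValues values of the Authorization header; null when the header is absent
   * @return the token without the "Bearer " prefix, or empty if no usable token is present
   */
  static Optional<String> extract(List<String> headerValues) {
    if (headerValues == null) {
      return Optional.empty();
    }
    return headerValues.stream()
        .filter(value -> value != null && value.startsWith(PREFIX))
        .map(value -> value.substring(PREFIX.length()).trim())
        .filter(token -> !token.isEmpty())
        .findFirst();
  }
}
```

Returning an `Optional` keeps the "no token" case explicit, so the filter can simply skip authentication when nothing usable was sent.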

I won't cover how to set up Spring Security for REST in this article; see the GitHub repository linked above.

So how do two users actually send messages to each other? Both users and the server need to listen on the same channel.

To do that, I concatenate the two users' IDs, so any two users chatting always have a dedicated, unique channel.
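
For the concatenated ID to work as a shared channel, both users have to arrive at the same string no matter who starts the chat. The article only says the IDs are concatenated; the sorting step and the `_` separator in the sketch below are my assumptions, and `ConversationIds` is a hypothetical name.

```java
import java.util.UUID;

/** Hypothetical helper: derives one stable conversation id for a pair of users. */
final class ConversationIds {
  private ConversationIds() {}

  /** Orders the two ids so that (a, b) and (b, a) produce the same conversation id. */
  static String forUsers(UUID first, UUID second) {
    String a = first.toString();
    String b = second.toString();
    return a.compareTo(b) <= 0 ? a + "_" + b : b + "_" + a;
  }
}
```

With this, user A and user B would both publish to `/app/chat/sendMessage/{convId}` using the same `convId`, whichever of them computes it.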

package com.satvik.satchat.controller;

import com.satvik.satchat.model.ChatMessage;
import com.satvik.satchat.service.ChatService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.stereotype.Controller;

@Controller
@Slf4j
public class ChatController {

  private final ChatService chatService;

  @Autowired
  public ChatController(ChatService chatService) {
    this.chatService = chatService;
  }

  @MessageMapping("/chat/sendMessage/{convId}")
  public ChatMessage sendMessageToConvId(
      @Payload ChatMessage chatMessage,
      SimpMessageHeaderAccessor headerAccessor,
      @DestinationVariable("convId") String conversationId) {
    chatService.sendMessageToConvId(chatMessage, conversationId, headerAccessor);
    return chatMessage;
  }
}

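The @MessageMapping annotation creates an endpoint to which WebSocket clients send messages. Here, convId identifies the channel shared by the two users. But before sending a message, we need to know whether the recipient is online or offline, because that determines the message's delivery status.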

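"Offline" can mean three things:

  • The user is not connected to the network.
  • The user is on the network, but the application is not open.
  • The user is on the network and the application is open, but the specific chat view is not.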

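To handle this, we use WebSocketEventListener to listen for WebSocket events. It lets us act when a user connects to or disconnects from the application, or subscribes to or unsubscribes from a channel.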

WebSocketEventListener:

package com.satvik.satchat.config.websocket;

import com.satvik.satchat.service.OnlineOfflineService;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionConnectedEvent;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;
import org.springframework.web.socket.messaging.SessionUnsubscribeEvent;

@Component
@Slf4j
public class WebSocketEventListener {

  private final OnlineOfflineService onlineOfflineService;

  private final Map<String, String> simpSessionIdToSubscriptionId;

  public WebSocketEventListener(OnlineOfflineService onlineOfflineService) {
    this.onlineOfflineService = onlineOfflineService;
    this.simpSessionIdToSubscriptionId = new ConcurrentHashMap<>();
  }

  @EventListener
  public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
    onlineOfflineService.removeOnlineUser(event.getUser());
  }

  @EventListener
  @SendToUser
  public void handleSubscribeEvent(SessionSubscribeEvent sessionSubscribeEvent) {
    String subscribedChannel =
        (String) sessionSubscribeEvent.getMessage().getHeaders().get("simpDestination");
    String simpSessionId =
        (String) sessionSubscribeEvent.getMessage().getHeaders().get("simpSessionId");
    if (subscribedChannel == null) {
      log.error("SUBSCRIBED TO NULL?? WAT?!");
      return;
    }
    simpSessionIdToSubscriptionId.put(simpSessionId, subscribedChannel);
    onlineOfflineService.addUserSubscribed(sessionSubscribeEvent.getUser(), subscribedChannel);
  }

  @EventListener
  public void handleUnSubscribeEvent(SessionUnsubscribeEvent unsubscribeEvent) {
    String simpSessionId = (String) unsubscribeEvent.getMessage().getHeaders().get("simpSessionId");
    String unSubscribedChannel = simpSessionIdToSubscriptionId.get(simpSessionId);
    onlineOfflineService.removeUserSubscribed(unsubscribeEvent.getUser(), unSubscribedChannel);
  }

  @EventListener
  public void handleConnectedEvent(SessionConnectedEvent sessionConnectedEvent) {
    onlineOfflineService.addOnlineUser(sessionConnectedEvent.getUser());
  }
}

Note that we extract the current user from the message headers. This works because WebSocketTokenFilter populated the Spring Security context.
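
One detail worth knowing: a STOMP UNSUBSCRIBE frame identifies the subscription by its `id` header, not by destination, which is why the listener has to remember what each session subscribed to. The listener above keeps a single destination per session, so a second subscription on the same session would overwrite the first. A possible refinement, sketched here with a hypothetical `SubscriptionRegistry`, is to key the map by session ID and subscription ID; in Spring, the subscription ID would presumably be read from the `simpSubscriptionId` header.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry: remembers the destination behind each (session, subscription) pair. */
final class SubscriptionRegistry {
  // sessionId -> (subscriptionId -> destination)
  private final Map<String, Map<String, String>> bySession = new ConcurrentHashMap<>();

  void register(String sessionId, String subscriptionId, String destination) {
    bySession
        .computeIfAbsent(sessionId, id -> new ConcurrentHashMap<>())
        .put(subscriptionId, destination);
  }

  /** Returns the destination that was unsubscribed, or null if the pair is unknown. */
  String unregister(String sessionId, String subscriptionId) {
    Map<String, String> subscriptions = bySession.get(sessionId);
    return subscriptions == null ? null : subscriptions.remove(subscriptionId);
  }

  /** Drops everything for a session, e.g. on disconnect, so the map does not grow forever. */
  void removeSession(String sessionId) {
    bySession.remove(sessionId);
  }
}
```

Calling `removeSession` from the disconnect handler would also address the fact that the original map is never cleaned up.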

OnlineOfflineService keeps track of active users. It also broadcasts which users come online and which go offline:

package com.satvik.satchat.service;

import com.satvik.satchat.config.UserDetailsImpl;
import com.satvik.satchat.entity.ConversationEntity;
import com.satvik.satchat.entity.UserEntity;
import com.satvik.satchat.model.ChatMessage;
import com.satvik.satchat.model.MessageDeliveryStatusEnum;
import com.satvik.satchat.model.MessageDeliveryStatusUpdate;
import com.satvik.satchat.model.MessageType;
import com.satvik.satchat.model.UserConnection;
import com.satvik.satchat.model.UserResponse;
import com.satvik.satchat.repository.UserRepository;
import java.security.Principal;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class OnlineOfflineService {
  private final Set<UUID> onlineUsers;
  private final Map<UUID, Set<String>> userSubscribed;
  private final UserRepository userRepository;
  private final SimpMessageSendingOperations simpMessageSendingOperations;

  public OnlineOfflineService(
      UserRepository userRepository, SimpMessageSendingOperations simpMessageSendingOperations) {
    this.onlineUsers = new ConcurrentSkipListSet<>();
    this.userSubscribed = new ConcurrentHashMap<>();
    this.userRepository = userRepository;
    this.simpMessageSendingOperations = simpMessageSendingOperations;
  }

  public void addOnlineUser(Principal user) {
    if (user == null) return;
    UserDetailsImpl userDetails = getUserDetails(user);
    log.info("{} is online", userDetails.getUsername());
    for (UUID id : onlineUsers) {
      simpMessageSendingOperations.convertAndSend(
          "/topic/" + id,
          ChatMessage.builder()
              .messageType(MessageType.FRIEND_ONLINE)
              .userConnection(UserConnection.builder().connectionId(userDetails.getId()).build())
              .build());
    }
    onlineUsers.add(userDetails.getId());
  }

  public void removeOnlineUser(Principal user) {
    if (user != null) {
      UserDetailsImpl userDetails = getUserDetails(user);
      log.info("{} went offline", userDetails.getUsername());
      onlineUsers.remove(userDetails.getId());
      userSubscribed.remove(userDetails.getId());
      for (UUID id : onlineUsers) {
        simpMessageSendingOperations.convertAndSend(
            "/topic/" + id,
            ChatMessage.builder()
                .messageType(MessageType.FRIEND_OFFLINE)
                .userConnection(UserConnection.builder().connectionId(userDetails.getId()).build())
                .build());
      }
    }
  }

  public boolean isUserOnline(UUID userId) {
    return onlineUsers.contains(userId);
  }

  private UserDetailsImpl getUserDetails(Principal principal) {
    UsernamePasswordAuthenticationToken user = (UsernamePasswordAuthenticationToken) principal;
    Object object = user.getPrincipal();
    return (UserDetailsImpl) object;
  }

  public List<UserResponse> getOnlineUsers() {
    return userRepository.findAllById(onlineUsers).stream()
        .map(
            userEntity ->
                new UserResponse(
                    userEntity.getId(), userEntity.getUsername(), userEntity.getEmail()))
        .toList();
  }

  public void addUserSubscribed(Principal user, String subscribedChannel) {
    UserDetailsImpl userDetails = getUserDetails(user);
    log.info("{} subscribed to {}", userDetails.getUsername(), subscribedChannel);
    Set<String> subscriptions = userSubscribed.getOrDefault(userDetails.getId(), new HashSet<>());
    subscriptions.add(subscribedChannel);
    userSubscribed.put(userDetails.getId(), subscriptions);
  }

  public void removeUserSubscribed(Principal user, String subscribedChannel) {
    UserDetailsImpl userDetails = getUserDetails(user);
    log.info("unsubscription! {} unsubscribed {}", userDetails.getUsername(), subscribedChannel);
    Set<String> subscriptions = userSubscribed.getOrDefault(userDetails.getId(), new HashSet<>());
    subscriptions.remove(subscribedChannel);
    userSubscribed.put(userDetails.getId(), subscriptions);
  }

  public boolean isUserSubscribed(UUID username, String subscription) {
    Set<String> subscriptions = userSubscribed.getOrDefault(username, new HashSet<>());
    return subscriptions.contains(subscription);
  }

  public Map<String, Set<String>> getUserSubscribed() {
    Map<String, Set<String>> result = new HashMap<>();
    List<UserEntity> users = userRepository.findAllById(userSubscribed.keySet());
    users.forEach(user -> result.put(user.getUsername(), userSubscribed.get(user.getId())));
    return result;
  }

  public void notifySender(
      UUID senderId,
      List<ConversationEntity> entities,
      MessageDeliveryStatusEnum messageDeliveryStatusEnum) {
    if (!isUserOnline(senderId)) {
      log.info(
          "{} is not online. cannot inform the socket. will persist in database",
          senderId.toString());
      return;
    }
    List<MessageDeliveryStatusUpdate> messageDeliveryStatusUpdates =
        entities.stream()
            .map(
                e ->
                    MessageDeliveryStatusUpdate.builder()
                        .id(e.getId())
                        .messageDeliveryStatusEnum(messageDeliveryStatusEnum)
                        .content(e.getContent())
                        .build())
            .toList();
    for (ConversationEntity entity : entities) {
      simpMessageSendingOperations.convertAndSend(
          "/topic/" + senderId,
          ChatMessage.builder()
              .id(entity.getId())
              .messageDeliveryStatusUpdates(messageDeliveryStatusUpdates)
              .messageType(MessageType.MESSAGE_DELIVERY_UPDATE)
              .content(entity.getContent())
              .build());
    }
  }
}

SimpMessageSendingOperations is provided by the Spring Framework and helps send messages over the established channels.
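
A caveat about the service above: it stores plain `HashSet` values inside a `ConcurrentHashMap` and updates them with a get-then-put sequence, which is not atomic if WebSocket events arrive on different threads. The following is a minimal sketch of the same bookkeeping using only concurrent collections and atomic per-key updates. `PresenceTracker` and its method names are hypothetical, and the broadcasting and repository lookups are left out.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified presence tracker built on concurrent collections. */
final class PresenceTracker {
  private final Set<UUID> onlineUsers = ConcurrentHashMap.newKeySet();
  private final Map<UUID, Set<String>> subscriptions = new ConcurrentHashMap<>();

  void markOnline(UUID userId) {
    onlineUsers.add(userId);
  }

  void markOffline(UUID userId) {
    onlineUsers.remove(userId);
    subscriptions.remove(userId);
  }

  void subscribe(UUID userId, String channel) {
    // compute runs atomically for the key, so concurrent updates cannot lose a channel
    subscriptions.compute(userId, (id, channels) -> {
      Set<String> updated = channels;
      if (updated == null) {
        updated = ConcurrentHashMap.newKeySet();
      }
      updated.add(channel);
      return updated;
    });
  }

  void unsubscribe(UUID userId, String channel) {
    // returning null from the remapping function removes the now-empty entry
    subscriptions.computeIfPresent(userId, (id, channels) -> {
      channels.remove(channel);
      return channels.isEmpty() ? null : channels;
    });
  }

  boolean isOnline(UUID userId) {
    return onlineUsers.contains(userId);
  }

  boolean isSubscribed(UUID userId, String channel) {
    Set<String> channels = subscriptions.get(userId);
    return channels != null && channels.contains(channel);
  }
}
```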

OnlineOfflineService makes it easy to determine:

  • Whether the user is using the application: isUserOnline(userId)
  • Whether the user is subscribed to a channel (the current chat view): isUserSubscribed(username, subscription)

If both return false, the user is offline, and the message delivery status should be "not delivered".

To tell "offline" apart from "online but not in the chat view": in the latter case isUserSubscribed is false while isUserOnline is true, and the delivery status should be "delivered".

If both return true, both users are on the same chat view, so the delivery status is "seen".
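
The three cases above boil down to a small decision function. Here is a sketch of it; the status names are my guesses, and the repository's `MessageDeliveryStatusEnum` may name or structure them differently.

```java
/** Hypothetical status names; the repository's MessageDeliveryStatusEnum may differ. */
enum DeliveryStatus { NOT_DELIVERED, DELIVERED, SEEN }

final class DeliveryStatusResolver {
  private DeliveryStatusResolver() {}

  /**
   * @param recipientOnline result of isUserOnline for the recipient
   * @param recipientSubscribed result of isUserSubscribed for the recipient and this conversation
   */
  static DeliveryStatus resolve(boolean recipientOnline, boolean recipientSubscribed) {
    if (!recipientOnline) {
      // an offline user has no live subscriptions, so the second flag is ignored here
      return DeliveryStatus.NOT_DELIVERED;
    }
    return recipientSubscribed ? DeliveryStatus.SEEN : DeliveryStatus.DELIVERED;
  }
}
```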

See https://github.com/SatvikNema/satchat/blob/main/src/main/java/com/satvik/satchat/service/ChatService.java for how these methods are used.

For the UI, we use React with stompjs 7.0.0. These are the dependencies:

"dependencies": {
    "@stomp/stompjs": "^7.0.0",
    "@testing-library/jest-dom": "^5.17.0",
    "@testing-library/react": "^13.4.0",
    "@testing-library/user-event": "^13.5.0",
    "react": "^18.2.0",
    "react-dom": "^18.2.0",
    "react-scripts": "5.0.1",
    "web-vitals": "^2.1.4",
    "websocket": "^1.0.34",
    "ws": "^8.16.0"
  }

Configure the stompjs client:

import { Client } from "@stomp/stompjs";
class SocketClient {
  constructor(url, jwt) {
    this.url = url;
    this.jwt = jwt;
    this.client = new Client();

    this.client.configure({
      brokerURL: url,
      connectHeaders: {
        Authorization: `Bearer ${jwt}`,
      },
      onConnect: () => {
        console.log("connected!");
      },
    });

    this.client.activate();
  }

  publish = ({ destination, body }) => {
    this.client.publish({
      destination: destination,
      body: JSON.stringify(body),
    });
  };

  deactivate = () => {
    this.client.deactivate();
  };

  subscribe = (topic, callback, ...forMessageTypes) => {
    return this.client.subscribe(topic, (message) => {
      if (
        // a rest parameter is always an array, so test for "no filter" by length
        forMessageTypes.length === 0 ||
        forMessageTypes.includes(JSON.parse(message.body).messageType)
      ) {
        callback(message);
      }
    });
  };

  awaitConnect = async (awaitConnectConfig) => {
    const {
      retries = 3,
      curr = 0,
      timeinterval = 100,
    } = awaitConnectConfig || {};
    return new Promise((resolve, reject) => {
      setTimeout(() => {
        if (this.connected) {
          resolve();
        } else if (curr >= retries) {
          console.log("failed to connect within the specified time interval");
          reject(new Error("failed to connect"));
        } else {
          console.log("failed to connect! retrying");
          // chain the retry so the outer promise settles with its result
          this.awaitConnect({ ...awaitConnectConfig, curr: curr + 1 }).then(
            resolve,
            reject
          );
        }
      }, timeinterval);
    });
  };

  get connected() {
    return this.client.connected;
  }

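  // back the accessor pair with a separate field; returning this.jwt here would recurse forever
  get jwt() {
    return this._jwt;
  }

  set jwt(value) {
    this._jwt = value;
  }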
}

export default SocketClient;

connectHeaders is where we pass the JWT. WebSocketTokenFilter reads it when the client connects, that is, on the STOMP CONNECT frame.
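
To see where the JWT actually travels, it helps to look at the CONNECT frame itself. A STOMP frame is a command line, one `name:value` line per header, a blank line, an optional body, and a NUL terminator. The sketch below builds a simplified CONNECT frame by hand, purely as an illustration: `StompConnectFrame` is a hypothetical class, and the real stompjs client sends additional headers (such as heart-beat settings) that are omitted here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of a simplified STOMP CONNECT frame carrying a JWT. */
final class StompConnectFrame {
  private StompConnectFrame() {}

  /** Builds: command line, one "name:value" line per header, a blank line, then NUL. */
  static String build(String host, String jwt) {
    Map<String, String> headers = new LinkedHashMap<>();
    headers.put("accept-version", "1.2");
    headers.put("host", host);
    headers.put("Authorization", "Bearer " + jwt); // what connectHeaders contributes

    StringBuilder frame = new StringBuilder("CONNECT\n");
    headers.forEach((name, value) -> frame.append(name).append(':').append(value).append('\n'));
    // CONNECT has no body: the blank line is followed directly by the NUL terminator
    return frame.append('\n').append('\0').toString();
  }
}
```

On the server side, the `Authorization` line is what ends up as a native header on the message that the channel interceptor inspects.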

To obtain the JWT, there is a separate /login REST endpoint. See AuthController and BackendClient in the source code.

That covers all the main pieces of building a real-time 1-to-1 chat application.

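Things to consider

  • Message security

There is no built-in message encryption. Anyone with access to the database can read every message sent. To prevent this, we should use public-key cryptography with a unique key for each conversation. The key itself can be stored in the same conversation table, after being encrypted with a master key.

  • Group chat

The application does not support group chat yet, but it can easily be extended. We need a common convId for a given group, and the backend should then work as is. One approach is to use a random UUID for the group and save it in the database.

  • File attachments

Likewise, this is easy to implement: upload the attachment to an S3 bucket first, then pass its URI to the recipient as the message.

  • Horizontal scaling

As the number of users grows, a single server will not be enough. Our OnlineOfflineService currently uses in-memory maps to track active users. With more than one server, we can no longer keep this data in each server's memory. We can use Redis pub/sub, as described in Hussein Nasser's video on scaling WebSockets, and maintain a global (server -> socket) map in a distributed cache.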
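
To make the message security point above a bit more concrete, here is a minimal sketch of the "unique key per conversation, stored encrypted under a master key" idea. Note that it swaps in symmetric AES-GCM (envelope encryption) for simplicity rather than public-key cryptography, and it says nothing about where the master key lives. `ConversationCrypto` and all of its methods are hypothetical and not part of the repository.

```java
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical sketch of envelope encryption with AES-GCM. */
final class ConversationCrypto {
  private static final int IV_BYTES = 12; // commonly recommended IV size for GCM
  private static final int TAG_BITS = 128; // authentication tag length
  private static final SecureRandom RANDOM = new SecureRandom();

  private ConversationCrypto() {}

  /** Generates a random 256-bit AES key (usable as a master or conversation key). */
  static SecretKey newKey() {
    try {
      KeyGenerator generator = KeyGenerator.getInstance("AES");
      generator.init(256);
      return generator.generateKey();
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException("AES is unavailable", e);
    }
  }

  /** Encrypts with a fresh random IV and returns IV followed by ciphertext and tag. */
  static byte[] encrypt(SecretKey key, byte[] plaintext) {
    try {
      byte[] iv = new byte[IV_BYTES];
      RANDOM.nextBytes(iv);
      Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
      cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
      byte[] ciphertext = cipher.doFinal(plaintext);
      byte[] out = Arrays.copyOf(iv, IV_BYTES + ciphertext.length);
      System.arraycopy(ciphertext, 0, out, IV_BYTES, ciphertext.length);
      return out;
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException("encryption failed", e);
    }
  }

  /** Reverses encrypt: reads the IV from the first bytes, then decrypts and verifies the tag. */
  static byte[] decrypt(SecretKey key, byte[] ivAndCiphertext) {
    try {
      Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
      cipher.init(
          Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, ivAndCiphertext, 0, IV_BYTES));
      return cipher.doFinal(ivAndCiphertext, IV_BYTES, ivAndCiphertext.length - IV_BYTES);
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException("decryption failed", e);
    }
  }

  /** Encrypts the conversation key under the master key so it can be stored with the conversation. */
  static byte[] wrapKey(SecretKey masterKey, SecretKey conversationKey) {
    return encrypt(masterKey, conversationKey.getEncoded());
  }

  static SecretKey unwrapKey(SecretKey masterKey, byte[] wrappedKey) {
    return new SecretKeySpec(decrypt(masterKey, wrappedKey), "AES");
  }
}
```

In this scheme, the output of `wrapKey` is what would go into the conversation table, while message contents would be stored as the output of `encrypt` under the conversation key.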

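Author: Satvik Nema

This article was contributed by the author, and copyright belongs to the original author. If you repost it, please credit the source: https://www.nxrte.com/jishu/im/47561.html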
