基于SSE的信息推送实践

作者:Tono,信也科技后端研发专家
来源:拍码场
链接:https://mp.weixin.qq.com/s/jy8Q8plgJQLiueHF2k5OVA

什么是 SSE

SSE(Server-Sent Events)是一种基于 HTTP 协议的服务器推送技术,允许服务器主动向客户端(如浏览器)发送实时数据更新,而无需客户端重复发起请求。该技术通过建立持久化的单向通信通道,实现了高效的实时信息传输。

基于SSE的信息推送实践

SSE与其他协议对比

基于SSE的信息推送实践

SSE的业务实践

在坐席业务中,电话是联系客户的主要手段。系统支持点呼和批量外呼。批量外呼可同时拨打多个电话,接通后转接坐席。但当两种呼叫方式同时使用,且坐席忙于点呼时,批量外呼接通的电话需排队等待,可能导致客户放弃等待,从而增加呼损率。

解决方案

  • 对接语音平台话务事件,对进入队列等待转接到坐席的外呼及时通过SSE推送到前端
  • 前端接收到队列中的话务事件,如果当前点呼拨打中(未接通),立即挂断当前电话
  • 如果坐席在通话中,外呼看板中展示对应排队的号码,坐席可选择继续通话或挂断当前通话,等待排队中的电话转接进来

示意图

  • Genesys-语音平台:通话的每个状态产生话务事件,通过MQ广播出去
  • datalink-业务系统数据对接服务:对接MQ话务事件,通知到对应保持SSE连接的Server
  • home-直接对接前端的服务:维护SSE连接,推送消息到前端
基于SSE的信息推送实践

消息发送流程图

  • 一个SSE连接只能与一台Server保持连接,发送消息时需要找到对应的Server。在创建连接时,本地缓存连接对象,并将本地IP缓存到redis,key为坐席id
  • 一个坐席只需要存一个SSE连接,在新建连接后需关闭并清除之前连接
基于SSE的信息推送实践

SSE使用技巧

SSE 使用基础

  • NIO 支持
  • Tomcat 从版本 7 开始就支持 NIO(非阻塞 I/O),可以通过配置来启用 NIO
  • Tomcat 8 开始,默认采用 NIO(非阻塞 I/O)模式处理 HTTP 请求
  • Springboot 1.5.x 以上默认使用 tomcat 8,Springboot2.0 以上版本基本默认 tomcat 9
  • 系统容量评估SSE只有在创建、发送消息时占用tomcat工作线程,可根据情况,适当调整 tomcat 工作线程,满足 SSE 新增的消耗

SSE 连接可靠性

基于SSE的信息推送实践

代码示例

  • 后端SSE连接管理,需要注意无用连接的销毁,防止JVM内存占用过多
  • 本地缓存SSE连接需使用线程安全的对象,如ConcurrentHashMap
  • 前端连接断开,后端无法感知,需要设置SSE连接的超时时间
  • 无效连接,需要调用SseEmitter.complete()销毁对象
  • 后端需要主动断开SSE连接时,需要前端配合调用相关API关闭连接
  • 后端调用SseEmitter.complete()后连接断开,后端服务发布等造成的连接断开,前端收到的都是error事件,此时前端无法区分是否需要断开连接还是交给浏览器重连
  • 后端需定义事件,如CLOSE,前端收到后主动调用API关闭连接。后端再通过SseEmitter.complete()销毁连接对象

后端代码示例

@RequestMapping("/sse")
public class SseController {
    //本地缓存,用于存储SseEmitter
    private final static ConcurrentHashMap<Long, SseEmitter> EMITTERS = new ConcurrentHashMap<>();

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter createSSE(@RequestParam Long userId) {
        SseEmitter emitter = new SseEmitter(60_000L); // 60秒超时
        // 连接关闭处理,移除SseEmitter
        emitter.onCompletion(() -> EMITTERS.remove(userId));
        emitter.onTimeout(() -> EMITTERS.remove(userId));
        //本地缓存中存储SseEmitter
        EMITTERS.put(userId, emitter);
        // TODO redis中维护userId与当前实例IP映射
        return emitter;
    }

    @PostMapping(value = "/send")
    public ResponseEntity<String> sendMessage(@RequestBody Message message) {
        // 本地缓存获取SseEmitter
        Long userId = message.getUserId();
        SseEmitter sseEmitter = EMITTERS.get(userId);
        try {
            // 发送消息
            sseEmitter.send(SseEmitter.event().name("message").data(message));
        } catch (IOException e) {
            // 异常时销毁对象
            sseEmitter.completeWithError(e);
            // 移除本地缓存
            EMITTERS.remove(userId);
        }
        return ResponseEntity.ok("ok");
    }

}

前端代码示例

<script>
        // 自动创建SSE连接
        window.onload = function() {
            // 创建EventSource实例(替换为你的SSE端点)
            const sse = new EventSource('https://domain/sse/stream?token=***');

            // 连接成功回调
            sse.onopen = () => {
//do business
            };

            // 接收消息处理, event.type=message
            sse.onmessage = (event) => {
                // do business
            };

            // 自定义事件处理(可选),例如以Close事件告知前端主动关闭连接

            sse.addEventListener('Close', (e) => {
                // do business
                sse.close();

            });

            // 错误处理
            // 后端通过complete事件主动关闭连接时,如果未通过代码主动关闭连接浏览器会自动重连
            sse.onerror = () => {
readyState:
//连接已打开并准备进行通信,异常未断开连接。如后端主动推送error事件
if(sse.readyState === EventSource.OPEN){
					sse.close();
				}
                // 连接尚未打开或异常关闭,如后端的complete事件,后端服务重启,nginx reload,网络异常等...
                else if(sse.readyState === EventSource.CONNECTING) {
// do nothing,浏览器会自动重连,默认是3s重连一次,可修改
				}
            };
        };
    </script>

推荐 NGINX 配置

# 强制关闭Keep-Alive连接
proxy_set_header Connection "";
# 实时传输但增加后端压力
proxy_buffering off;
# 读取超时时间, 建立连接后1200s服务端未推送消息断开连接
proxy_read_timeout 1200;
# 发送超时时间
proxy_send_timeout 1200;
# 关闭缓存
proxy_cache off;
# 禁用分块传输
chunked_transfer_encoding off;
# 强制客户端不缓存
add_header Cache-Control no-cache;
# 正确使用HTTP/1.1协议
proxy_http_version 1.1;

总结

SSE是一种基于HTTP协议的服务器推送技术,SpringMVC为之提供了完善的实现方案。浏览器原生支持SSE连接中断后自动重连,从而确保连接的可靠性。这一技术尤为适合应用于业务监控大盘、站内消息传递以及AI问答等众多业务场景。

版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。

(0)

相关推荐

发表回复

登录后才能评论