作者:Tono,信也科技后端研发专家
来源:拍码场
链接:https://mp.weixin.qq.com/s/jy8Q8plgJQLiueHF2k5OVA
什么是 SSE
SSE(Server-Sent Events)是一种基于 HTTP 协议的服务器推送技术,允许服务器主动向客户端(如浏览器)发送实时数据更新,而无需客户端重复发起请求。该技术通过建立持久化的单向通信通道,实现了高效的实时信息传输。

SSE与其他协议对比

SSE的业务实践
在坐席业务中,电话是联系客户的主要手段。系统支持点呼和批量外呼。批量外呼可同时拨打多个电话,接通后转接坐席。但当两种呼叫方式同时使用,且坐席忙于点呼时,批量外呼接通的电话需排队等待,可能导致客户放弃等待,从而增加呼损率。
解决方案
- 对接语音平台话务事件,对进入队列等待转接到坐席的外呼及时通过SSE推送到前端
- 前端接收到队列中的话务事件,如果当前点呼拨打中(未接通),立即挂断当前电话
- 如果坐席在通话中,外呼看板中展示对应排队的号码,坐席可选择继续通话或挂断当前通话,等待排队中的电话转接进来
示意图
- Genesys-语音平台:通话的每个状态产生话务事件,通过MQ广播出去
- datalink-业务系统数据对接服务:对接MQ话务事件,通知到对应保持SSE连接的Server
- home-直接对接前端的服务:维护SSE连接,推送消息到前端

消息发送流程图
- 一个SSE连接只能与一台Server保持连接,发送消息时需要找到对应的Server。在创建连接时,本地缓存连接对象,并将本地IP缓存到redis,key为坐席id
- 一个坐席只需要存一个SSE连接,在新建连接后需关闭并清除之前连接

SSE使用技巧
SSE 使用基础
- NIO 支持
- Tomcat 从版本 7 开始就支持 NIO(非阻塞 I/O),可以通过配置来启用 NIO
- Tomcat 8 开始,默认采用 NIO(非阻塞 I/O)模式处理 HTTP 请求
- Springboot 1.5.x 以上默认使用 tomcat 8,Springboot2.0 以上版本基本默认 tomcat 9
- 系统容量评估SSE只有在创建、发送消息时占用tomcat工作线程,可根据情况,适当调整 tomcat 工作线程,满足 SSE 新增的消耗
SSE 连接可靠性

代码示例
- 后端SSE连接管理,需要注意无用连接的销毁,防止JVM内存占用过多
- 本地缓存SSE连接需使用线程安全的对象,如ConcurrentHashMap
- 前端连接断开,后端无法感知,需要设置SSE连接的超时时间
- 无效连接,需要调用SseEmitter.complete()销毁对象
- 后端需要主动断开SSE连接时,需要前端配合调用相关API关闭连接
- 后端调用SseEmitter.complete()后连接断开,后端服务发布等造成的连接断开,前端收到的都是error事件,此时前端无法区分是否需要断开连接还是交给浏览器重连
- 后端需定义事件,如CLOSE,前端收到后主动调用API关闭连接。后端再通过SseEmitter.complete()销毁连接对象
后端代码示例
@RequestMapping("/sse")
public class SseController {
//本地缓存,用于存储SseEmitter
private final static ConcurrentHashMap<Long, SseEmitter> EMITTERS = new ConcurrentHashMap<>();
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter createSSE(@RequestParam Long userId) {
SseEmitter emitter = new SseEmitter(60_000L); // 60秒超时
// 连接关闭处理,移除SseEmitter
emitter.onCompletion(() -> EMITTERS.remove(userId));
emitter.onTimeout(() -> EMITTERS.remove(userId));
//本地缓存中存储SseEmitter
EMITTERS.put(userId, emitter);
// TODO redis中维护userId与当前实例IP映射
return emitter;
}
@PostMapping(value = "/send")
public ResponseEntity<String> sendMessage(@RequestBody Message message) {
// 本地缓存获取SseEmitter
Long userId = message.getUserId();
SseEmitter sseEmitter = EMITTERS.get(userId);
try {
// 发送消息
sseEmitter.send(SseEmitter.event().name("message").data(message));
} catch (IOException e) {
// 异常时销毁对象
sseEmitter.completeWithError(e);
// 移除本地缓存
EMITTERS.remove(userId);
}
return ResponseEntity.ok("ok");
}
}
前端代码示例
<script>
// 自动创建SSE连接
window.onload = function() {
// 创建EventSource实例(替换为你的SSE端点)
const sse = new EventSource('https://domain/sse/stream?token=***');
// 连接成功回调
sse.onopen = () => {
//do business
};
// 接收消息处理, event.type=message
sse.onmessage = (event) => {
// do business
};
// 自定义事件处理(可选),例如以Close事件告知前端主动关闭连接
sse.addEventListener('Close', (e) => {
// do business
sse.close();
});
// 错误处理
// 后端通过complete事件主动关闭连接时,如果未通过代码主动关闭连接浏览器会自动重连
sse.onerror = () => {
readyState:
//连接已打开并准备进行通信,异常未断开连接。如后端主动推送error事件
if(sse.readyState === EventSource.OPEN){
sse.close();
}
// 连接尚未打开或异常关闭,如后端的complete事件,后端服务重启,nginx reload,网络异常等...
else if(sse.readyState === EventSource.CONNECTING) {
// do nothing,浏览器会自动重连,默认是3s重连一次,可修改
}
};
};
</script>
推荐 NGINX 配置
# 强制关闭Keep-Alive连接
proxy_set_header Connection "";
# 实时传输但增加后端压力
proxy_buffering off;
# 读取超时时间, 建立连接后1200s服务端未推送消息断开连接
proxy_read_timeout 1200;
# 发送超时时间
proxy_send_timeout 1200;
# 关闭缓存
proxy_cache off;
# 禁用分块传输
chunked_transfer_encoding off;
# 强制客户端不缓存
add_header Cache-Control no-cache;
# 正确使用HTTP/1.1协议
proxy_http_version 1.1;
总结
SSE是一种基于HTTP协议的服务器推送技术,SpringMVC为之提供了完善的实现方案。浏览器原生支持SSE连接中断后自动重连,从而确保连接的可靠性。这一技术尤为适合应用于业务监控大盘、站内消息传递以及AI问答等众多业务场景。
版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。