AIGC 技术爆发促使大模型从实验阶段迈向企业级大规模应用,但它任务耗时久、算力成本高、流量波动大、智能体协作复杂这些核心特点,给底层通信和调度基础设施带来了严峻挑战,企业通常会遇到长会话连续性不佳、算力调度效率低、多智能体协作不可靠等问题。
作为火山引擎消息中间件产品矩阵之一的RocketMQ,目前结合社区已有方案完成了 AI 原生能力升级,推出了 RocketMQ For AI 解决方案。该方案以轻量化通信、智能化调度、企业级可靠性为核心,凭借 LiteTopic、优先级消息核心能力,精准解决大模型场景的核心难题,保障会话连续性、调度公平性以及系统稳定性,实现算力利用率最大化、降低成本,为企业大模型应用落地提供坚实的通信与调度基础。
一、新的挑战:AI 时代传统 MQ 的局限性
作为分布式系统的核心消息中间件,长期以来 RocketMQ 在各类业务场景中承担着异步解耦、流量削峰、数据分发关键角色。
- 异步解耦:解决微服务架构中系统间的强依赖问题,例如订单系统与库存、支付、通知系统的联动,通过消息异步传递减少接口直连带来的耦合度,提升系统容错性和响应速度
- 流量削峰:应对秒杀、大促等短时高并发场景,将瞬时激增的请求缓存至消息队列,由消费者按照系统处理能力匀速消费,避免下游服务因流量过载崩溃
- 事件驱动与数据分发:支持广播消费、多消费者订阅等模式,实现同一事件多系统协同处理,例如用户支付完成后,同步触发积分计算、物流调度、消息通知等一系列操作,同时也可用于日志收集、用户行为跟踪等数据流转场景,为大数据分析提供稳定的数据输入通道
在 LLM 架构中,传统 MQ 的能力仍然被广泛使用:
- 异步解耦降低大模型系统耦合度:如推理请求异步化,当用户提交长文本总结或者图片生成等耗时任务,通过 MQ 解耦能力使得用户无需同步等待。
- 削峰填谷保护GPU与推理集群:GPU 是极其昂贵且有限的资源,通过MQ对高并发削峰,如聊天瞬时流量、训练任务排队,可避免资源争抢以及保护后端资源
- 数据分发支撑 AI 多系统协同:AI 系统天然是事件驱动架构,一个事件通常需要多系统同时处理 、多模型联动 、多 Agent 协作,MQ 就成为 AI 事件总线

但是随着大模型技术的快速普及,传统 RocketMQ 的使用模式逐渐暴露出适配短板,尤其在多智能体协同、长会话交互以及优先级消息等场景下:
- 不能区分消息优先级:AI 系统通常存在不同优先级的消息场景,例如实时对话、Agent 任务、向量化处理、离线训练等。系统需要优先保障用户实时请求的低延迟和高可用性,避免长任务阻塞核心推理链路。而 RocketMQ 作为消息传输链路调度组件,当前并不支持优先级消息。
- 不支持海量Queue创建:传统 Queue 由于底层物理资源占用其数量会受到集群限制,在多租户场景下,业务通常需要妥协并共享队列,在大模型长任务场景下极易出现单消息头部阻塞、关键消息与普通消息混杂导致的处理延迟等问题。
这些问题不仅影响传统场景的处理效率,更无法满足大模型场景对精细化流量治理、差异化消息调度的高阶需求。在此背景下,火山引擎 RocketMQ 启动内核能力升级,并非脱离传统场景的全新重构,而是在保留原有核心优势的基础上,针对传统场景的痛点进行优化升级,同时延伸适配大模型等新型场景,让消息中间件的能力更好地贴合当前业务架构的演进趋势。
二、特性解析:新一代消息能力全景概述
1、轻量级主题(Lite Topic)
在 RocketMQ 传统架构中,Topic 每新建一个 Queue,Broker 就需要在磁盘上创建对应的 ConsumeQueue 文件。集群需要限制整体 Queue 的总数量,否则会出现下述问题:
- 文件句柄耗尽:操作系统对单个进程能打开的文件数量,海量 Queue 会瞬间耗尽句柄,导致无法写入新消息
- 磁盘 IOPS 压力:操作系统在管理海量小文件时,随机读写性能会急剧下降,导致磁盘 IOPS 成为瓶颈
在过去,租户为应对队列 Queue 数量的限制,不得不将原本单会话/单场景绑定独立队列的方式,降级为共享队列,并采用标签tag或自定义属性进行消息隔离。该方案不可避免的出现严重的相互干扰问题。
而 LiteTopic 则允许租户创建上百万个 Topic,每个 Topic 也可容纳百万 Queue。其底层实现的三个关键技术细节:

- 统一写入:和传统 RocketMQ 机制一致,所有消息统一追加写入同一个CommitLog文件,避免海量 Queue 造成随机写从而影响集群性能
- RocksDB 替代 CQ 文件:这是 LiteTopic 相对于传统 Topic Queue 最核心的改动,系统为每个 LiteTopic 维护独立的队列索引(LiteCQ),但这些索引不再以文件形式存在,而是作为 Key-Value 对 存储在 RocksDB 中。
- 高性能投递:通过长轮询 + 维护消息集合,兼顾海量 Queue 读取性能以及集群稳定性
当前 LiteTopic 为了支持会话级的强顺序性,采用会话即主题模型,默认只有一个 Queue,消费者可采用顺序消费模式严格保序。此外由于长任务场景意味着每条消息处理耗时很长,吞吐量瓶颈在于单条消息的处理时长,而非队列数量,通常也无需扩展 Queue 数量从而造成乱序。
LiteTopic相对于传统 Topic 还有下述优势:
- 无需预先配置即可动态创建
- 生命周期短,自动释放
- 每个主题的消息数量很少(几条到几千条不等)
- 精细化订阅(无需筛选)
- 绑定到预先创建的父主题,用于路由和命名空间管理
- 一个 LiteTopic 在同一时间只被同一个消费组下的单个消费者实例订阅和处理,杜绝了多个消费者并发处理,严格保序
在实践中与传统 Topic 存在差异:
- 传统模式下所有客户的订阅内容都相同,而Lite Topic 模式下所有客户的订阅内容则各不相同。
- 传统模式的订阅主题很少(几个到十几个),而Lite Topic 模式的订阅主题则有成千上万个。
- 在 Lite Topic 版模型中,主题是动态生成的。
- 传统模型的主题生命周期较长,而 Lite Topic 的主题生命周期则很短。
2、优先级消息(Priority Message)

当前,若依赖 RocketMQ 实现优先级队列,需用户自行通过拆分 Topic 或 Queue,并由业务自身分配消费资源来实现优先级,此方式过于繁琐且缺乏灵活性。本次推出了优先级队列,从原生语义上提供了易用、灵活的优先级消息功能。
实现机制:
- 参考标准: 借鉴 JMS 和 AMQP 标准,采用数值范围定义优先级 1-10。
- 核心存储逻辑 : 将不同的优先级数值直接映射到不同的ConsumeQueue 上。这意味着一个优先级数值 = 一个专属的 ConsumeQueue。
消费策略与限制:
- 消费模式: 仅支持 Pop 消费模式
- 拉取逻辑: 改变原有的随机选择队列策略,改为顺序扫描:优先从高优先级对应的 ConsumeQueue 中拉取消息
分布式视角 :
- 局域性: 只保证单个 Broker 内部的优先级处理
- 宏观有序: 不实现跨所有 Broker 的全局排序,但在宏观上保证高优先级消息被优先处
三、场景落地:大模型业务赋能实践
1、长会话链路
传统 IM 聊天应用普遍采用 WebSocket 协议。该协议通过在客户端和服务器之间建立一条持久化的双向通信通道,实现了真正的实时交互。同时通过精细化模块拆分,解决了长会话状态管理难题,并具备可弹性扩展的特性。

- 分布式路由与状态解耦:在分布式架构中,A 和 B 可能连接在不同的网关节点上。网关节点之间不需要互相认识,网关 A 将用户消息投递到 MQ 的特定 Topic 中即可。消息路由模块消费到该消息后,从Redis 中查询到 B 所在的网关信息,然后将消息经过网关推送给用户 B 即可
- 削峰填谷保护后端数据库:消息先快速写入 MQ,后端服务按照数据库能承受的速度慢慢从 MQ 拉取并入库
- 异步解耦:用户发送消息时,后端不仅仅要转发,还要做很多其他的事情,例如消息持久化、推送通知,使用 MQ 能并行异步处理,同时保证可扩展性
对于企业级大模型场景下的 IM 链路,下游 MQ 消费者由原先消息分发模块直接消费发送给对端,变成了大模型后端服务直接消费,然后调用后端 LLM 获取推理结果后返回给 MQ,再由消息分发模块推送给用户。

假设有 1000 万个用户,对于普通 Topic 模型:
- IM 可能预先创建 N 个 Topic,每个 Topic 创建 N 个 Queue。因 Queue 资源受限,MQ 系统不允许为每个会话都绑定一个 Queue,通常可能仅配置百级别数量。
- 消息发送服务从网关接入某条消息后,通常通过取模计算该消息所属的 Queue,后续该用户的消息都会写入该 Queue,实现顺序写入。
传统 IM(如微信聊天)虽然也有海量 Session,但它和 AI Long Session 的消息模型完全不同。在 IM 时代消息模型是离散型,具有短平快、不强调严格上下顺序、不持续占用资源的特点,单条消息消费往往是毫秒级别的。而在 AI 长会话场景下,消息具备长生命周期任务的特点,单个AI Session 会持续占用消费资源,用户单个消息不是瞬间完成,而是持续产生子任务、中间状态,单个消息可能会长时间占用队列头部,如果还使用多队列共享模型,会有如下问题:
- 无法做到会话级隔离,会话互相干扰严重,易出现单个用户消息长时间占用队列头部,从而阻塞其他用户任务
- 无法保证 Session 顺序性,长会话 AI 特别依赖上下文顺序,传统 MQ 如果多个 Session 共用 Queue,单个消费实例为提高消费能力往往会使用多个线程消费,从而导致乱序
- 无法支撑百万级AI Session,一个AI系统不是几千个用户,而是百万级 Session,一个 Session 一个 Queue,传统 MQ 会出现元数据会爆炸。
新的特性 LiteTopic 可以支撑海量 Queue,完美契合 AI 长会话模型,开发者可以为每个会话绑定独立的 LiteTopic 来达到 Session 天然隔离,会话之间互不干扰的目的,新的架构如下:

上述架构相较于传统架构具备以下优点:
- 多会话隔离:AI 长会话、Agent 会话、RAG 任务都可以映射成独立 LiteTopic,实现会话级隔离,避免不同用户任务互相干扰。
- 保证上下文顺序性:LiteTopic 为了支持强顺序性,默认只有一个 Queue,消费者可采用顺序消费模式严格保序。
- 提升会话顺序性与可追踪性:每个会话对应独立逻辑消息流,便于保证上下文顺序,也方便追踪 Agent 执行过程。
- 更适合 AI 事件驱动架构:未来 AI 系统会产生大量细粒度事件,例如 Token 流、工具调用、Memory 更新、Embedding 任务等,LiteTopic 能以更低成本承载这些高并发事件流。
- 降低存储与元数据成本:传统 MQ 创建大量 Queue 会带来文件、索引、路由元数据膨胀,而 LiteTopic 通过统一存储、多路分发降低资源消耗。
2、Multi-Agent协作
新的特性不仅在长会话中展现优势,在企业级大模型 Multi – Agent 协作方面同样具有独特用途。因为 Multi-Agent 天然就是一个分布式异步事件系统,而 MQ 恰好最擅长解决这种系统里的解耦、调度、缓冲与协同问题。
当前在单个用户场景下,3 – 5 个智能体协作就到上限了。但企业级应用需要支持几万甚至更多员工,这并非仅靠几十个智能体便能满足生产需求,得构建由不同角色 Agent 组成的集群或团队一起协同处理用户的复杂任务,这样能提高并发度和可靠性。上面说的这些场景让传统的 Queue 模式面临很大挑战,一方面,长任务场景需要更强的隔离性。另外,Agent 不像传统微服务那样常驻运行,而是具备函数计算用完即走、按需分配的特性,这要求队列资源也要有按需创建,用完就丢弃的能力。同时不同优先级的消息也需要 MQ 有原生的差异化调度能力。而轻量级的 Lite Topic 以及能区分任务优先级的 Priority Message 特性,能精准支持企业级多智能体协作场景。

这个方案的核心在于请求与响应的解耦:利用 LiteTopic 做 Sub-Agent 任务分发以及结果回收,同时其独特的生命周期管理优势正好匹配临时任务通信这一痛点的关键。
任务分发阶段(请求链路)
这一阶段主要解决任务如何高效下发的问题
- 任务拆解与发送:主 agent 将用户任务拆解为多个子模块,每个模块会再拆成多个 Task,发到对应的 LiteTopic 中
- 队列准备:MQ会动态为子模块创建LiteTopic,当任务完成 LiteTopic 可自动释放
- 优先级调度:可以为每个 Sub-Agent 绑定专属优队列,实现差异化调度
- 效果:Supervisor 发送完即返回,无需阻塞等待,实现了任务的异步解耦和并行调度,同时最大化利用算力资源
Muti-Agent 协作(任务处理)
这一阶段多 Agent 协作处理完成任务请求。
- 获取消息:每个 Sub-Agent 从绑定的 LiteTopic 中读取到待处理任务
- 顺序执行:按照顺序、不同优先级处理任务,多 Agent 之间并行处理
- 异步结果回传:任务完成后,Sub-Agents 将结果消息发送到 Supervisor 指定的LiteTopic 中。
- 片段回传:每个 Task 无需任务全部完成统一投递,可阶段性投递思考过程,另外即使 Sub-Agent 重启,仍可按照上一次进度继续处理,实现类断点续传能力
结果回收阶段(响应链路)
这一阶段由主 Agent 汇总所有的任务处理结果返回给用户。
- 队列准备:Sub-Agent 在推送结果时 MQ 会自动创建对应的 LiteTopic
- 实时流式推送:Supervisor 通过订阅实时获取 LiteTopic 中的子任务结果,并立即通过 HTTP SSE 协议将数据流式推送给用户
- 生命周期优势体现:依靠 LiteTopic 独特生命周期完成实现任务结束资源自动回收
新的协作架构,利用 MQ 将 Multi – Agent 阻塞式交互转变为异步事件处理,不仅加快处理速度、提升可靠性,还具备下述优势:
- 避免不同类型 Agent 不同用户任务互相干扰:Supervisor 不再需要挂起线程等待 Sub-Agent 处理完成,Sub-Agent 独占 Queue,也不会出现消息头阻塞的问题,系统并发能力得到显著提升
- 海量并发支持:利用 LiteTopic 自动创建、自动销毁的生命周期管理能力,系统可以轻松支撑百万级的并发任务,而无需担心资源泄漏或元数据爆炸,同时节省运维人力
- 支持任务优先级:可以利用 Priority Message 在计算高峰期保证高优先级任务先执行,保证算力合理利用
- 结果有序且隔离:每个任务拥有独立的 LiteTopic,保证了同一个任务内的子结果是严格有序的,且不同任务之间互不干扰
四、未来展望:AI 原生消息引擎的持续进化
随着大模型和 AI Agent 快速发展,MQ 未来在企业级 AI 架构里的定位,不再只是传统的异步通信组件,而是逐渐变成 AI 系统中的事件总线。同时,多模态 AI 会让 MQ 数据类型大量增加,高吞吐、不可预测的流量模型会迫使 MQ 架构更趋向 Serverless 化。
1、Agent 驱动 MQ 事件化
未来 AI Agent 会越来越复杂,用户规模、长 Session 和持续协作结合起来,会产生巨大数量的 Agent。比如一个企业 AI 平台,有 20 万企业用户,每个用户有 3 个 Agent,Agent 事件频率是每秒 5 次,这样就会同时有 60 万个 Agent Session 在线,每秒会有 300 万个 Events。传统 MQ 只负责存储转发,而 Agent 能实时感知事件流、分析上下文并自主决策。因为有百万量级的事件和长任务的特点,传统重资源的 Queue 模型会慢慢被淘汰,而 LiteTopic 会变得越来越重要。
2、MQ 作为 RAG 数据总线
高质量的 RAG 一般需要混合检索,像同时开展关键词检索、向量检索和知识图谱检索。主程序能把不同的检索请求分发到 MQ 不同的主题或分区,下游消费者实例则启动多 Worker 并行处理,最后通过一个聚合服务收集所有结果并重新排序,这种模式大幅降低了端到端的响应延迟。此外对于要求极高的实时 RAG,可以利用利用 MQ 的变更数据捕获能力,将业务数据库的每一次写操作实时同步为事件,驱动 RAG 索引做近实时更新,从而让检索结果始终与数据源保持一致,真正实现毫秒级延迟,秒级同步。
3、AI 推动 MQ Serverless化
为进一步简化客户对云上服务的使用,业界提出了 Serverless 概念。在此场景下,Serverless 主要是指客户无需考虑 MQ 服务的容量问题,系统会在短时间内自动满足客户的实际用量需求,并依据使用量进行计费。
这种 Serverless 模式能够解决客户对业务容量和增长预估不准确的问题,使客户无需关注和规划后端服务的容量,从而专注于业务增长。如此一来,业务的快速增长不会因底层服务能力的限制而受阻。特别是当前 AI 任务越来越呈现突发、弹性、短生命周期、按量计费的特点。
总结
随着大模型场景实现规模化落地,消息队列租户资源争抢、隔离能力薄弱、架构耦合致使弹性能力较弱等短板愈发显著。面对多 Agent 协同、长会话交互等复杂业务需求,依托内核持续进行演进迭代,借助 LiteTopic 轻量主题、优先级消息等核心能力,弥补了能力短板。在大模型时代,RocketMQ 持续为业务提供具备高可靠性、高弹性、可定制性的消息底座,帮助 AI 应用高效且稳定地落地,也致力于构建面向大模型时代的新一代分布式事件架构。
版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。