书籍名称:Agentic Design Patterns: A Hands-On Guide to Building Intelligent Systems
本书作者:Antonio Gulli
链接地址:https://docs.google.com/document/d/1rsaK53T3Lg5KoGwvf8ukOUvbELRtH-V0LnOIFDxBryE
内容摘要:本文是对《智能体设计模式》一书第三章节的翻译,此章节主要介绍了并行化(Parallelization)模式。
翻译:煤矿工厂
摘要:
并行化是什么
许多智能体工作流程由多个子任务组成,这些子任务需要依次完成才能达到最终目标。若采用完全顺序的执行方式——即每个任务都必须等待前一个任务结束才能开始——通常会导致低效率与高延迟。当这些任务依赖外部 I/O 操作(如调用不同的 API 或查询多个数据库)时,这种延迟尤为显著。 在缺乏并发执行机制的情况下,总处理时间会等于所有任务时长的累加,从而成为系统性能与响应速度的主要瓶颈。
为什么要进行并行化处理
并行化(Parallelization)的设计模式通过允许独立任务的同时执行,为这一问题提供了标准化解决方案。其核心原理是识别出工作流中彼此独立、不依赖即时输出的部分(如不同工具调用或 LLM 推理任务),并使它们并行运行。像 LangChain 和 Google ADK 这样的智能体框架,都内置了定义与管理此类并发操作的机制。例如,一个主流程可以同时调用多个子任务,让它们并行执行,并在全部完成后再进入下一阶段。 通过让这些相互独立的任务同时而非顺序地运行,该模式能显著减少总执行时间。
经验法则
当你的工作流中存在多个可同时进行的独立操作时,应采用此模式。典型的应用案例包括:
- 从多个 API 同时获取数据;
- 并行处理不同数据块;
- 生成多份内容以便后续综合分析。

并行化模式概述
在前面的章节中,我们探讨了用于顺序工作流的提示链以及用于动态决策与路径切换的路由模式。虽然这些模式虽然很重要,但许多复杂的智能体任务往往需要包含多个可同时执行的子任务,而非必须一个接一个地处理。此时, 并行化(Parallelization) 模式便显得尤为关键。
并行化指的是并发地执行多个组件,例如 LLM 调用、工具使用,甚至整个子智能体(见图2)。在这种模式下,不必等一步完成再开始下一步,相互间独立任务可同时运行,显著缩短了任务的总体执行时间。
假设一个智能体需要研究某个主题并总结结果:
顺序方式可能如下:
- 搜索来源 A
- 总结来源 A
- 搜索来源 B
- 总结来源 B
- 综合 A、B 的摘要得出最终答案
并行方式则可以:
- 同时搜索来源 A 与来源 B
- 待两项搜索均完成后,同时总结来源 A 与来源 B
- 综合两份摘要得出最终答案
核心思想是:找出工作流中不依赖其他部分输出的环节,并行执行。当涉及外部服务(如 API 或数据库)且存在延迟时,可以一次性发出多个并发请求,这一做法非常高效。
实现并行化通常需要支持异步执行或 多线程/多进程(multi-threading/multi-processing) 的框架。现代智能体框架本身就考虑到了异步操作,让你能轻松定义可并行运行的步骤。

LangChain、LangGraph 和 Google ADK 等框架均提供了并行执行机制。
- 在LangChain 表达式语言(LCEL) 中,可用
|运算符串行组合runnable对象,同时通过构造可并发执行的分支来实现并行。 - LangGraph 凭借图结构,可定义多个节点在同一状态转移下被执行,从而在工作流中形成并行分支。
- Google ADK 则提供强大的原生机制,方便地管理智能体的并行执行,显著提升复杂多智能体系统的效率与可扩展性。借助 ADK,开发者可设计多个智能体并发运行的解决方案。
并行化模式的使用场景
并行化模式对于提升智能体系统的效率与响应速度至关重要,尤其适用于包含多次独立查询、计算或与外部服务交互的任务,是优化复杂智能体工作流性能的关键技术。
并行化是一种强大的优化模式,能够在各种应用中提升智能体的性能:
- 信息收集与研究: 同时从多个来源收集信息是经典的应用场景。
- 应用场景:智能体研究公司。
- 并行任务:同时搜索新闻文章、获取股票数据、检查社交媒体的提及、查询公司数据库。
- 优点:比起顺序查找,更快速地收集到全面的信息。
- 数据处理与分析: 同时应用不同的分析技术或并行处理不同的数据片段。
- 应用场景:智能体分析客户反馈。
- 并行任务:同时进行情感分析、关键词提取、分类反馈,并识别紧急问题,处理一批反馈数据。
- 优点:能快速提供多维度的分析结果。
- 多API或工具交互: 调用多个独立的API或工具来收集不同类型的信息或执行不同的操作。
- 应用场景:旅游规划智能体。
- 并行任务:同时检查航班价格、搜索酒店空房、查找当地活动和餐厅推荐。
- 优点:能够更快速地呈现完整的旅行计划。
- 多组件内容生成: 并行生成复杂内容的不同部分。
- 应用场景:智能体生成营销邮件。
- 并行任务:同时生成邮件主题、草拟邮件正文、寻找相关图片并为邮件中的链接按钮设计一句吸引用户点击的文本。
- 优点:更高效地组合最终的邮件内容。
- 验证与核查: 同时执行多个独立的检查或验证任务。
- 应用场景:智能体验证用户输入。
- 并行任务:同时检查邮箱格式、验证电话号码、核对地址是否符合数据库,并检查是否有不当内容。
- 优点:能够更快速地反馈全部输入有效性。
- 多模态处理: 同时处理同一输入的不同模态(文本、图像、音频)。
- 应用场景:智能体分析带有文本和图片的社交媒体帖子。
- 并行任务:同时分析文本的情感与关键词,并分析图片中的物体和场景描述。
- 优点:能够更快速地整合来自不同模态的信息。
- A/B测试或多选项生成: 并行生成多个不同的响应或输出,以便选择最佳选项。
- 应用场景:智能体生成不同的创意文本选项。
- 并行任务:使用略有不同的提示或模型,同时生成三种不同的文章标题。
- 优点:能够多样比较并选择最佳选项。
并行化是智能体设计中的一个基本优化技术,通过利用并发执行独立任务,帮助开发者构建更高效、更响应迅速的应用程序。
并行化模式的实践代码
LangChain
在 LangChain 框架中,并行执行是通过 LangChain 表达式语言来实现的。主要方法是将多个可运行的组件组织在字典或列表结构中。当这些组件作为输入传递给链中的后续组件时,LCEL 运行时会并行执行这些组件。
在 LangGraph 的上下文中,这一原则被应用于图的拓扑结构。并行工作流是通过设计图的结构,使得多个节点之间没有直接的顺序依赖,可以从一个公共节点启动。这些并行路径会独立执行,直到它们的结果在图中的后续合并点被聚合。
以下实现演示了一个使用 LangChain 框架构建的并行处理工作流。这个工作流旨在响应单个用户查询并同时执行两个独立的操作。这些并行处理过程作为不同的链或函数实例化,它们的输出最终被聚合为一个统一的结果。
该实现需要安装必要的 Python 包,如 langchain、langchain-community,以及一个模型提供者库(例如 langchain-openai)。此外,必须在本地环境中配置有效的 API 密钥以进行身份验证。
import os
import asyncio
from typing import Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import Runnable, RunnableParallel, RunnablePassthrough
# --- 配置 ---
# 确保已设置 API 密钥环境变量(例如,OPENAI_API_KEY)
try:
llm: Optional[ChatOpenAI] = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
except Exception as e:
print(f"初始化语言模型时出错: {e}")
llm = None
# --- 定义独立的链 ---
# 这三条链表示可以并行执行的不同任务。
summarize_chain: Runnable = (
ChatPromptTemplate.from_messages([
("system", "简要总结以下话题:"),
("user", "{topic}")
])
| llm
| StrOutputParser()
)
questions_chain: Runnable = (
ChatPromptTemplate.from_messages([
("system", "生成三个有趣的问题,关于以下话题:"),
("user", "{topic}")
])
| llm
| StrOutputParser()
)
terms_chain: Runnable = (
ChatPromptTemplate.from_messages([
("system", "识别以下话题中的 5-10 个关键术语,用逗号分隔:"),
("user", "{topic}")
])
| llm
| StrOutputParser()
)
# --- 构建并行 + 综合链 ---
# 1. 定义并行执行的任务块。它们的结果与原始话题一起被传递到下一步。
map_chain = RunnableParallel(
{
"summary": summarize_chain, # 话题总结
"questions": questions_chain, # 相关问题
"key_terms": terms_chain, # 关键术语
"topic": RunnablePassthrough(), # 原始话题传递
}
)
# 2. 定义最终的综合提示模板,将并行结果合并。
synthesis_prompt = ChatPromptTemplate.from_messages([
("system", """根据以下信息:
总结: {summary}
相关问题: {questions}
关键术语: {key_terms}
综合给出一个答案。"""),
("user", "原始话题: {topic}")
])
# 3. 构建完整的链,将并行结果直接传入综合提示,再由语言模型和输出解析器处理。
full_parallel_chain = map_chain | synthesis_prompt | llm | StrOutputParser()
# --- 运行链 ---
asyncdef run_parallel_example(topic: str) -> None:
"""
异步调用并行处理链,处理指定话题并打印综合结果。
参数:
topic: 要处理的输入话题。
"""
ifnot llm:
print("语言模型未初始化,无法运行示例。")
return
print(f"\n--- 正在运行并行 LangChain 示例,话题: '{topic}' ---")
try:
# `ainvoke` 的输入是单个 'topic' 字符串,
# 然后将其传递给 `map_chain` 中的每个可执行组件。
response = await full_parallel_chain.ainvoke(topic)
print("\n--- 最终响应 ---")
print(response)
except Exception as e:
print(f"\n执行链时发生错误: {e}")
if __name__ == "__main__":
test_topic = "太空探索的历史"
# 在 Python 3.7+ 中,使用 asyncio.run 是运行异步函数的标准方法。
asyncio.run(run_parallel_example(test_topic))
这段 Python 代码实现了一个 LangChain 应用,其设计目标是通过并行执行机制高效地处理给定话题。需要注意的是,asyncio 提供的是并发(concurrency),而非真正的并行(parallelism)。它通过一个事件循环,在某个任务空闲(例如等待网络请求)时智能地切换执行,从而让多个任务“看起来”同时进行。但实际上,整个程序依然运行在单个线程中,并受到 Python 全局解释器锁(GIL) 的限制。
代码首先从 langchain_openai 和 langchain_core 导入必要的模块,包括用于语言模型、提示模板、输出解析器以及可运行结构的组件。
随后,代码尝试初始化一个 ChatOpenAI 实例,指定使用 "gpt-4o-mini" 模型,并设置 temperature 参数来控制模型的创造性输出。同时,使用 try-except 块以确保语言模型初始化过程的健壮性(防止网络或配置错误导致中断)。
构建任务链
代码定义了三个相互独立的 LangChain 链,每个链都针对输入话题执行特定任务:
- 第一条链:对话题进行简要总结,包含系统消息(system message)和带有话题占位符的用户消息(user message)。
- 第二条链:生成与话题相关的三个有趣问题。
- 第三条链:识别话题中 5 到 10 个关键术语,并要求以逗号分隔输出。
每条链都由以下部分组成:
- 一个专门为该任务设计的
ChatPromptTemplate; - 预先初始化好的语言模型实例;
- 一个
StrOutputParser,用于将模型输出格式化为字符串。
构建并行执行模块
接下来,代码构建了一个 RunnableParallel 模块,将这三条独立的链组合起来,使它们能够同时执行。
该并行结构中还包含一个 RunnablePassthrough,确保原始输入话题可以被保留并传递到后续步骤中使用。
综合生成阶段
随后,定义了一个独立的 ChatPromptTemplate,用于在最终阶段综合前面得到的结果,包括摘要、问题、关键术语以及原始话题。模型在这一阶段会根据这些输入生成一个完整且结构化的综合回答。
构建完整工作流与异步执行
最终,完整的处理链被构建为 full_parallel_chain, 其执行顺序为:
并行块(
map_chain) → 综合提示模板 → 语言模型 → 输出解析器。
代码定义了一个异步函数 run_parallel_example,用于演示如何调用整个链。该函数接收一个话题作为输入,并使用 .invoke() 异步运行完整的链。
在标准的 Python 主程序块
if __name__ == "__main__":
中,演示了如何通过 asyncio.run 调用 run_parallel_example,并以 "The history of space exploration"(太空探索史)为示例输入执行该异步工作流。
从本质上看,这段代码构建了一个工作流:
针对给定话题,同时发起多个 LLM 调用(用于摘要、提问和术语提取),然后通过最终一次 LLM 调用将这些结果进行整合与生成。这清晰展示了在 LangChain 框架中实现代理式并行化工作流的核心理念。
Google ADK
下面我们将通过一个具体示例来说明这些概念在 Google ADK 框架 中的实际应用。我们将展示如何使用 ADK 的核心原语(如 ParallelAgent 和 SequentialAgent)来构建一个能够并发执行的智能体流程,以提升整体效率。
from google.adk.agents import LlmAgent, ParallelAgent, SequentialAgent
from google.adk.tools import google_search
GEMINI_MODEL="gemini-2.0-flash"
# --- 1. 定义研究型子智能体(并行运行) ---
# 研究员 1:可再生能源
researcher_agent_1 = LlmAgent(
name="RenewableEnergyResearcher",
model=GEMINI_MODEL,
instruction="""You are an AI Research Assistant specializing in energy.
Research the latest advancements in 'renewable energy sources'.
Use the Google Search tool provided.
Summarize your key findings concisely (1-2 sentences).
Output *only* the summary.
""",
description="负责研究可再生能源领域。",
tools=[google_search],
# 将结果存储到状态中,以供后续的合并智能体使用
output_key="renewable_energy_result"
)
# 研究员 2:电动汽车
researcher_agent_2 = LlmAgent(
name="EVResearcher",
model=GEMINI_MODEL,
instruction="""You are an AI Research Assistant specializing in transportation.
Research the latest developments in 'electric vehicle technology'.
Use the Google Search tool provided.
Summarize your key findings concisely (1-2 sentences).
Output *only* the summary.
""",
description="负责研究电动汽车技术。",
tools=[google_search],
# 将结果存储到状态中,以供后续的合并智能体使用
output_key="ev_technology_result"
)
# 研究员 3:碳捕获
researcher_agent_3 = LlmAgent(
name="CarbonCaptureResearcher",
model=GEMINI_MODEL,
instruction="""You are an AI Research Assistant specializing in climate solutions.
Research the current state of 'carbon capture methods'.
Use the Google Search tool provided.
Summarize your key findings concisely (1-2 sentences).
Output *only* the summary.
""",
description="负责研究碳捕获方法。",
tools=[google_search],
# 将结果存储到状态中,以供后续的合并智能体使用
output_key="carbon_capture_result"
)
# --- 2. 创建 ParallelAgent(并行执行研究员) ---
# 该智能体负责并行调度所有研究员智能体。
# 当所有研究员完成任务并将结果存入状态后,该阶段结束。
parallel_research_agent = ParallelAgent(
name="ParallelWebResearchAgent",
sub_agents=[researcher_agent_1, researcher_agent_2, researcher_agent_3],
description="并行运行多个研究智能体以收集信息。"
)
# --- 3. 定义合并智能体(在并行阶段结束后运行) ---
# 此智能体会读取由各并行研究智能体存入状态的结果,
# 并将这些结果综合为一份结构化报告,附带来源归属。
merger_agent = LlmAgent(
name="SynthesisAgent",
model=GEMINI_MODEL, # 若需要更强的生成能力,也可使用更大的模型
instruction="""You are an AI Assistant responsible for combining research findings into a structured report.
Your primary task is to synthesize the following research summaries, clearly attributing findings to their source areas. Structure your response using headings for each topic. Ensure the report is coherent and integrates the key points smoothly.
**Crucially: Your entire response MUST be grounded *exclusively* on the information provided in the 'Input Summaries' below. Do NOT add any external knowledge, facts, or details not present in these specific summaries.**
**Input Summaries:**
* **Renewable Energy:**
{renewable_energy_result}
* **Electric Vehicles:**
{ev_technology_result}
* **Carbon Capture:**
{carbon_capture_result}
**Output Format:**
## Summary of Recent Sustainable Technology Advancements
### Renewable Energy Findings
(Based on RenewableEnergyResearcher's findings)
[Synthesize and elaborate *only* on the renewable energy input summary provided above.]
### Electric Vehicle Findings
(Based on EVResearcher's findings)
[Synthesize and elaborate *only* on the EV input summary provided above.]
### Carbon Capture Findings
(Based on CarbonCaptureResearcher's findings)
[Synthesize and elaborate *only* on the carbon capture input summary provided above.]
### Overall Conclusion
[Provide a brief (1-2 sentence) concluding statement that connects *only* the findings presented above.]
Output *only* the structured report following this format. Do not include introductory or concluding phrases outside this structure, and strictly adhere to using only the provided input summary content.
""",
description="将并行研究智能体的结果整合为一份结构化、带引用的报告,严格基于已提供的输入。",
# 该阶段不需要额外工具
# 不需要 output_key,因为输出将直接作为最终结果返回
)
# --- 4. 创建 SequentialAgent(负责总体流程调度) ---
# 这是主控制智能体。它首先执行 ParallelAgent 来进行并行研究,
# 待研究完成后,再执行合并智能体以生成最终报告。
sequential_pipeline_agent = SequentialAgent(
name="ResearchAndSynthesisPipeline",
# 先运行并行研究阶段,再执行结果合并阶段
sub_agents=[parallel_research_agent, merger_agent],
description="协调并行研究与结果综合的整体流程。"
)
root_agent = sequential_pipeline_agent
这段代码定义了一个用于研究并整合可持续技术进展的多智能体系统。
三个研究子智能体(LlmAgent) 分别负责不同领域的研究:
ResearcherAgent_1聚焦可再生能源;ResearcherAgent_2研究电动汽车技术;ResearcherAgent_3研究碳捕获方法。 每个研究智能体都使用GEMINI_MODEL模型和google_search工具,要求将研究结果总结为 1–2 句话,并存储在状态中供后续使用。
并行处理智能体 (ParallelWebResearchAgent) 负责并行调度上述三个研究智能体,使其能够同时执行以节省时间。当所有研究任务完成后,ParallelAgent 执行结束。
合并智能体 (MergerAgent) 读取各研究员在状态中存储的结果,将这些研究总结整合为一份结构化报告。其指令特别强调:
- 输出必须严格基于提供的输入内容;
- 不得引入任何外部知识或额外信息;
- 报告需清晰区分每个研究领域的内容,并以结构化格式输出。
顺序处理智能体 (ResearchAndSynthesisPipeline) 作为顶层控制器,顺序执行两个阶段:
- 阶段一:并行研究(
ParallelWebResearchAgent); - 阶段二:结果合并(
MergerAgent)。
该智能体被设置为系统的 root_agent,即整个流程的入口。
通过这种设计,系统可以高效地并行收集多源信息,并在后续阶段将结果整合成一致、结构化的报告。
本章要点
以下是本节的主要结论:
- 并行化是一种设计模式,用于同时执行多个相互独立的任务,以提升整体执行效率。
- 当任务需要等待外部资源(例如 API 调用或数据库响应)时,并行化尤为有用。
- 采用并发或并行架构虽然能提升性能,但也会带来显著的复杂性与成本,影响系统的设计、调试和日志监控等关键开发阶段。
- LangChain 与 Google ADK等框架提供了对并行执行的内建支持,使开发者能够更方便地定义与管理并行任务。
- 在 LangChain Expression Language(LCEL) 中,
RunnableParallel是实现多任务并行执行的核心构件。 - 在 Google ADK 中,并行化可通过 LLM 驱动的任务委派(LLM-Driven Delegation) 实现:协调者智能体(Coordinator Agent)的语言模型会自动识别可独立处理的子任务,并触发专门子智能体的并行执行。
- 并行化能够显著减少整体延迟,使智能体系统在处理复杂任务时更加高效与灵活。
总结
并行化模式是一种通过并发执行独立子任务来优化计算型工作流的方法。 这种方法能有效降低总体延迟,尤其适用于涉及多模型推理或多外部服务调用的复杂操作场景。
不同框架在实现该模式时采用了各自的机制:
- 在 LangChain 中,可使用诸如
RunnableParallel等构件显式定义并同时运行多个处理链。 - 而在 Google Agent Developer Kit (ADK) 中,并行化通常通过**多智能体委派实现:主协调模型将不同子任务分配给具备特定能力的子智能体,并让它们同时运行。
通过将并行处理与顺序控制及条件路由相结合,开发者可以构建出更为复杂、高性能的计算系统,从容应对多样且复杂的任务需求。
引用文献
- LangChain Expression Language (LCEL) Documentation (Parallelism): https://python.langchain.com/docs/concepts/lcel/
- Google Agent Developer Kit (ADK) Documentation (Multi-Agent Systems): https://google.github.io/adk-docs/agents/multi-agents/
- Python asyncio Documentation: https://docs.python.org/3/library/asyncio.html
往期回顾
《Agentic Design Patterns:构建智能系统的实战指南》- 前言
《Agentic Design Patterns:构建智能系统的实战指南》- 第一章 提示链
《Agentic Design Patterns:构建智能系统的实战指南》- 第二章 路由
版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。