
本示例在极简Agent基础上,新增三大核心拓展:真实联网搜索(替换模拟搜索)、人工介入断点(持久化+人机协同)、工作流可视化(可直观看到节点跳转),完全贴合LangGraph四大核心功能,可直接运行。
pip install langgraph langchain-openai python-dotenv langchain_community python-multipart说明:langchain_community 提供真实搜索工具,python-multipart 用于可视化渲染,dotenv 管理环境变量。
from typing import TypedDict, Optional
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver # 持久化(断点核心)
from langchain_openai import ChatOpenAI
from langchain_community.tools import SerpAPIWrapper # 真实联网搜索工具
import os
from dotenv import load_dotenv
# ====================== 1. 环境变量配置(必做)======================
# 加载.env文件(里面存放你的API Key,避免硬编码)
load_dotenv()
# 配置两个关键API Key(需自行申请)
# 1. OpenAI API Key(用于LLM调用)
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY") # 从.env读取
# 2. SerpAPI Key(用于真实联网搜索,申请地址:https://serpapi.com/)
os.environ["SERPAPI_API_KEY"] = os.getenv("SERPAPI_API_KEY")
# ====================== 2. 状态定义(增强版:适配断点+搜索+人工介入)======================
class AgentState(TypedDict):
"""Agent 共享状态:所有节点可读写,支持持久化"""
question: str # 用户原始问题
search_result: Optional[str] # 真实联网搜索结果(可选,初始为None)
answer: Optional[str] # 最终生成的答案(可选)
retry_count: int # 重试次数(控制循环,避免死循环)
human_approval: bool # 人工审批标记(断点核心:是否需要人工介入)
need_retry: bool # 重试标记(判断是否需要重新搜索)
# ====================== 3. 核心工具初始化======================
# 1. LLM模型(用于判断、生成答案)
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)
# 2. 真实联网搜索工具(替换上一个示例的模拟搜索)
search_tool = SerpAPIWrapper() # 调用SerpAPI实现谷歌搜索(国内可替换为百度API)
# 3. 持久化器(断点核心:保存工作流中间状态,支持暂停/恢复)
checkpointer = MemorySaver()
# ====================== 4. 定义节点函数(新增人工介入节点,完善逻辑)======================
def search_node(state: AgentState) -> AgentState:
"""搜索节点:调用真实联网工具,获取搜索结果"""
question = state["question"]
print(f"\n🔍 正在执行真实联网搜索:{question}")
# 调用SerpAPI进行真实搜索(返回字符串格式结果)
real_search_result = search_tool.run(question)
# 更新状态:保存搜索结果、重试次数+1、重置人工审批标记
return {
"search_result": real_search_result,
"retry_count": state["retry_count"] + 1,
"human_approval": False, # 初始无需人工审批
"need_retry": False # 初始默认不需要重试
}
def judge_node(state: AgentState) -> str:
"""判断节点:精细控制流程走向,决定「重试/人工介入/生成答案」"""
search_result = state["search_result"]
retry_count = state["retry_count"]
question = state["question"]
print(f"\n✅ 正在判断搜索结果是否满足需求(重试次数:{retry_count})")
print(f"📄 搜索结果预览:{search_result[:50]}...")
# 调用LLM判断:搜索结果是否足够回答问题、是否需要重试、是否需要人工介入
judge_prompt = f"""
你是Agent的判断器,根据以下信息做3个判断:
1. 用户问题:{question}
2. 搜索结果:{search_result}
3. 已重试次数:{retry_count}(最大重试2次)
判断规则:
- 若搜索结果能完整回答问题,且重试次数≤2 → 返回「answer」(进入生成答案节点)
- 若搜索结果不完整/不相关,但重试次数<2 → 返回「retry」(重新搜索)
- 若重试次数≥2,或搜索结果存在争议/不确定 → 返回「human_check」(人工介入)
"""
judge_result = llm.invoke(judge_prompt).content.strip()
# 输出判断结果,方便调试
print(f"🔀 判断结果:{judge_result}")
return judge_result
def human_check_node(state: AgentState) -> AgentState:
"""人工介入节点(断点核心):工作流暂停,等待人工审批/修改"""
print("\n🛑 触发人工介入(断点),请进行以下操作:")
print(f"用户问题:{state['question']}")
print(f"当前搜索结果:{state['search_result']}")
print(f"已重试次数:{state['retry_count']}")
# 人工输入审批意见(模拟人机协同,真实场景可对接前端界面)
human_input = input("请输入审批意见(1=继续生成答案,2=重新搜索,3=终止流程):")
# 根据人工输入更新状态
if human_input == "1":
return {"human_approval": True, "need_retry": False}
elif human_input == "2":
return {"human_approval": True, "need_retry": True, "retry_count": state["retry_count"]}
else:
return {"human_approval": False, "need_retry": False}
def answer_node(state: AgentState) -> AgentState:
"""答案生成节点:根据搜索结果,生成最终可直接使用的答案"""
question = state["question"]
search_result = state["search_result"]
print("\n📝 正在生成最终答案...")
answer_prompt = f"""
请结合以下搜索结果,简洁、准确地回答用户问题,语言通俗易懂,无需多余内容。
用户问题:{question}
搜索结果:{search_result}
"""
final_answer = llm.invoke(answer_prompt).content.strip()
return {"answer": final_answer}
# ====================== 5. 构建工作流图(含循环、条件边、断点)======================
# 初始化工作流,指定状态类型和持久化器(开启断点功能)
workflow = StateGraph(AgentState)
# 添加所有节点(搜索、判断、人工介入、生成答案)
workflow.add_node("search", search_node) # 搜索节点
workflow.add_node("judge", judge_node) # 判断节点
workflow.add_node("human_check", human_check_node) # 人工介入节点(断点)
workflow.add_node("answer", answer_node) # 答案生成节点
# 设置工作流入口:从搜索节点开始
workflow.set_entry_point("search")
# 构建流程边(核心逻辑)
# 1. 搜索 → 判断(搜索完成后,进入判断节点)
workflow.add_edge("search", "judge")
# 2. 判断节点 → 条件边(根据判断结果跳转)
workflow.add_conditional_edges(
"judge", # 条件边的起点节点
judge_node, # 条件判断函数(返回跳转目标)
{
"retry": "search", # 判断为「重试」→ 回到搜索节点(循环)
"human_check": "human_check", # 判断为「人工介入」→ 进入断点节点
"answer": "answer" # 判断为「可生成答案」→ 进入答案节点
}
)
# 3. 人工介入节点 → 条件边(根据人工审批结果跳转)
workflow.add_conditional_edges(
"human_check",
lambda state: "retry" if state["need_retry"] else ("answer" if state["human_approval"] else END),
{
"retry": "search", # 人工要求重新搜索 → 回到搜索节点
"answer": "answer", # 人工同意生成答案 → 进入答案节点
END: END # 人工要求终止 → 结束工作流
}
)
# 4. 答案生成 → 结束工作流
workflow.add_edge("answer", END)
# 编译工作流(开启持久化,支持断点续跑)
app = workflow.compile(checkpointer=checkpointer)
# ====================== 6. 工作流可视化(生成图片,直观查看流程)======================
def visualize_workflow():
"""生成工作流可视化图片(保存为agent_workflow.png)"""
try:
from langgraph.graph import visualize
# 生成可视化图,保存到当前目录
visualize(app, filename="agent_workflow.png", format="png")
print("\n✅ 工作流可视化图已生成:agent_workflow.png(可直接打开查看)")
except Exception as e:
print(f"\n⚠️ 可视化生成失败(可忽略,不影响运行):{str(e)}")
# ====================== 7. 运行Agent(支持断点续跑、人工介入)======================
if __name__ == "__main__":
# 1. 生成工作流可视化图
visualize_workflow()
# 2. 输入用户问题(可修改为任意问题)
user_question = input("\n请输入你想查询的问题:")
# 3. 初始化输入状态
initial_inputs = {
"question": user_question,
"search_result": None,
"answer": None,
"retry_count": 0,
"human_approval": False,
"need_retry": False
}
# 4. 执行工作流(支持断点续跑:若中途终止,重新运行可恢复上次状态)
# 注:checkpointer默认用内存存储,重启程序后断点会丢失;生产环境可替换为Redis/SQL
result = app.invoke(initial_inputs)
# 5. 输出最终结果
print("\n==================== 最终结果 ====================")
if result["answer"]:
print(f"用户问题:{result['question']}")
print(f"最终答案:{result['answer']}")
else:
print("工作流已被人工终止,未生成最终答案。")在代码同级目录下,创建 .env 文件,内容如下:
OPENAI_API_KEY=你的OpenAI API Key
SERPAPI_API_KEY=你的SerpAPI Key核心功能 | 实现方式 |
|---|---|
循环工作流 | judge节点判断为「retry」时,跳转回search节点,用retry_count限制最大重试2次 |
状态管理 | AgentState统一存储所有数据,所有节点读写状态,自动传递上下文 |
持久化与断点 | MemorySaver作为持久化器,human_check节点实现人工介入,支持暂停/恢复/终止 |
精细控制 | 条件边控制流程跳转,自定义所有节点逻辑,可视化图可直观查看执行路径 |
✅ 工作流可视化图已生成:agent_workflow.png(可直接打开查看)
请输入你想查询的问题:2026年马年春节是哪一天?
🔍 正在执行真实联网搜索:2026年马年春节是哪一天?
✅ 正在判断搜索结果是否满足需求(重试次数:1)
📄 搜索结果预览:2026年马年春节是公历2026年1月27日,农历丙午年正月初一...
🔀 判断结果:answer
📝 正在生成最终答案...
==================== 最终结果 ====================
用户问题:2026年马年春节是哪一天?
最终答案:2026年马年春节是公历2026年1月27日,对应农历丙午年正月初一。注:若运行中出现「联网失败」,大概率是SerpAPI Key失效或网络问题,可替换为其他搜索工具(如百度API)。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。