首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >LangGraph4j 学习系列(6)-并行工作流

LangGraph4j 学习系列(6)-并行工作流

作者头像
菩提树下的杨过
发布2026-03-02 09:33:53
发布2026-03-02 09:33:53
1250
举报

上节继续,本篇将学习如何实现并行工作流。

image
image

上面这张图,用代码很容易绘制,参考以下代码。

核心代码

代码语言:javascript
复制
public static StateGraph<AgentState> getParallelGraph() throws GraphStateException {
    return new StateGraph<>(AgentState::new)
            .addNode("node-1", node_async(new Node1Action()))
            .addNode("node-2", node_async(new Node2Action()))
            .addNode("node-3", node_async(new Node3Action()))
            .addEdge(START, "node-1")
            .addEdge("node-1", "node-2")
            .addEdge("node-1", "node-3")
            .addEdge("node-2", GraphDefinition.END)
            .addEdge("node-3", GraphDefinition.END);
}

性能问题

虽然图上看着貌似node-2,node-3并行在跑,但真的如此吗?我们把node-2和node-3的apply()里加点sleep

代码语言:javascript
复制
public class Node2Action implements NodeAction<AgentState> {
    @Override
    public Map<String, Object> apply(AgentState state) throws Exception {
        System.out.println("current Node: node-2");
        Thread.sleep(1000);
        return Map.of("myData", "node2-my-value",
                "node2Key", "node2-value");
    }
}
代码语言:javascript
复制
public class Node3Action implements NodeAction<AgentState> {
    @Override
    public Map<String, Object> apply(AgentState state) throws Exception {
        System.out.println("current Node: node-3");
        Thread.sleep(1000);
        return Map.of("myData", "node3-my-value",
                "node3Key", "node3-value");
    }
}

然后在node-1里,记录下start时间戳

代码语言:javascript
复制
public class Node1Action implements NodeAction<AgentState> {
    
    @Override
    public Map<String, Object> apply(AgentState state) throws Exception {
        System.out.println("current Node: node-1");
        Thread.sleep(1000);
        return Map.of(
                "myData", "node1-my-value",
                "node1Key", "node1-value",
                //记录开始时间
                "start", System.currentTimeMillis());
    }
    
}
代码语言:javascript
复制
getParallelGraph().compile()
        .invoke(Map.of("test", "test-init-value"))
        .ifPresent(c -> {
            long start = (long) c.data().getOrDefault("start", 0L);
            System.out.println(c.data());
            long end = System.currentTimeMillis();
            System.out.println((end - start) + "ms");
        });

运行结果

代码语言:javascript
复制
current Node: node-1
current Node: node-2
current Node: node-3
{node1Key=node1-value, start=1770719927373, test=test-init-value, node2Key=node2-value, node3Key=node3-value, myData=node3-my-value}
2017ms

从第5行来看,耗时2秒+,显然是顺序执行的。

多线程提速

LangGraph4可以手动指定线程池实现真正的并发处理。

代码语言:javascript
复制
StateGraph<AgentState> graphNoThreadPool = getParallelGraph();

ExecutorService executorService = Executors.newFixedThreadPool(2);
RunnableConfig rc = RunnableConfig.builder()
        //从node-1开始并行执行node-2和node-3(使用线程池)
        .addParallelNodeExecutor("node-1", executorService)
        .build();
graphNoThreadPool.compile()
        .invoke(Map.of("test", "test-init-value"), rc) //调用时,使用特定的RunnableConfig
        .ifPresent(c -> {
            long start = (long) c.data().getOrDefault("start", 0L);
            System.out.println(c.data());
            long end = System.currentTimeMillis();
            System.out.println((end - start) + "ms");
            //记得关闭线程池
            executorService.shutdown();
        });

运行结果

代码语言:javascript
复制
current Node: node-1
current Node: node-2
current Node: node-3
{node1Key=node1-value, start=1770722528938, test=test-init-value, node2Key=node2-value, node3Key=node3-value, myData=node3-my-value}
1015ms

明显快多了。如果是jdk 25版本,也可以使用虚拟线程:

代码语言:javascript
复制
ExecutorService virtualThreadPerTaskExecutor = Executors.newVirtualThreadPerTaskExecutor();
RunnableConfig rc2 = RunnableConfig.builder()
        //从node-1开始并行执行node-2和node-3(使用线程池)
        .addParallelNodeExecutor("node-1", virtualThreadPerTaskExecutor)
        .build();
graphNoThreadPool2.compile()
        .invoke(Map.of("test", "test-init-value"), rc2)
        .ifPresent(c -> {
            long start = (long) c.data().getOrDefault("start", 0L);
            System.out.println(c.data());
            long end = System.currentTimeMillis();
            System.out.println((end - start) + "ms");
        });

文中源码:langgraph4j-study/src/main/java/org/bsc/langgraph4j/agent/_07_parallel at main · yjmyzz/langgraph4j-study · GitHub

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2026-03-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 核心代码
  • 性能问题
  • 运行结果
  • 多线程提速
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档