首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >LangGraph4j 学习系列(9)-人机协同(human_in_the_loop)

LangGraph4j 学习系列(9)-人机协同(human_in_the_loop)

作者头像
菩提树下的杨过
发布2026-03-02 09:25:46
发布2026-03-02 09:25:46
980
举报

上节继续,在某些循环迭代流程中,希望人工干预来影响流程走向,也就是所谓的human_in_the_loop

image
image

代码示例

代码语言:javascript
复制
public class HumanInLoopGraphApplication {

    private static final String LOOP_COUNT_KEY = "loopCount";

    /**
     * 共享 Scanner,不关闭以免关闭 System.in导致后续无法读取
     */
    private static final Scanner CONSOLE_SCANNER = new Scanner(System.in);

    public static void main(String[] args) throws GraphStateException {
        StateGraph<AgentState> graph = getLoopGraph();
        System.out.println(graph.getGraph(GraphRepresentation.Type.MERMAID, "human-in-loop Graph", true).content());
        AsyncGenerator<NodeOutput<AgentState>> stream = graph.compile().stream(Map.of());
        for (NodeOutput<AgentState> output : stream) {
            System.out.println(output.node() + "->" + output.state().value(LOOP_COUNT_KEY).orElse(0));
        }
    }

    public static StateGraph<AgentState> getLoopGraph() throws GraphStateException {
        return new StateGraph<>(AgentState::new)
                .addNode("node-1", node_async(state -> Map.of(LOOP_COUNT_KEY, (int) state.value(LOOP_COUNT_KEY).orElse(0) + 1)))
                .addNode("node-2", node_async(state -> Map.of()))
                .addEdge(GraphDefinition.START, "node-1")
                .addEdge("node-2", "node-1")
                .addConditionalEdges("node-1", state -> CompletableFuture.supplyAsync(HumanInLoopGraphApplication::waitForHumanDecision),
                        Map.of(
                                "exit", GraphDefinition.END,
                                "next", "node-2",
                                "back", "node-1"));
    }

    /**
     * 控制台等待用户输入:C(Continue) 进入 node-2,Q(Quit) 结束到 END。
     * 使用共享 CONSOLE_SCANNER,不能关闭否则会关闭 System.in 导致下次 No line found。
     */
    private static String waitForHumanDecision() {
        while (true) {
            System.out.print("请输入 N(next) 继续到 node-2,B(back) 退回到 node-1,或 Q(Quit) 结束 [N/B/Q]: ");
            if (!CONSOLE_SCANNER.hasNextLine()) {
                return "exit";
            }
            String input = CONSOLE_SCANNER.nextLine();
            if (input == null) {
                continue;
            }
            String trimmed = input.trim().toUpperCase();
            if ("N".equals(trimmed)) {
                return "next";
            }
            if ("B".equals(trimmed)) {
                return "back";
            }
            if ("Q".equals(trimmed)) {
                return "exit";
            }
            System.out.println("无效输入,请只输入 N 或 B 或 Q");
        }
    }
}

运行效果

代码语言:javascript
复制
请输入 N(next) 继续到 node-2,B(back) 退回到 node-1,或 Q(Quit) 结束 [N/B/Q]: N
__START__->0
node-1->1
请输入 N(next) 继续到 node-2,B(back) 退回到 node-1,或 Q(Quit) 结束 [N/B/Q]: N
node-2->1
node-1->2
请输入 N(next) 继续到 node-2,B(back) 退回到 node-1,或 Q(Quit) 结束 [N/B/Q]: B
node-2->2
请输入 N(next) 继续到 node-2,B(back) 退回到 node-1,或 Q(Quit) 结束 [N/B/Q]: B
node-1->3
请输入 N(next) 继续到 node-2,B(back) 退回到 node-1,或 Q(Quit) 结束 [N/B/Q]: Q
node-1->4
node-1->5
__END__->5

复杂示例

上面的示例比较简单,可以引入CheckPoint,在进入node-2、node-reset 这两个节点前interrupt,同时也触发checkpoint保存。

  • 按N后,从保存的checkpoint中恢复,继续执行
  • 按R后,将loopCount恢复初始值,同时清空checkpoint历史记录
image
image
代码语言:javascript
复制
/**
 * 人机协同(Human-in-the-Loop)示例:在图执行到指定节点前中断,等待控制台输入后再恢复。
 * <p>
 * 图结构:START → node-1 → [条件边] → node-2 / node-reset / node-1 / END
 * <ul>
 *   <li>node-1:递增 loopCount,然后根据用户输入决定下一跳</li>
 *   <li>node-2、node-reset:空节点,执行后回到 node-1</li>
 *   <li>条件边由 {@link #waitForHumanDecision()} 驱动:N(next)→node-2,B(back)→node-1,R(Reset)→node-reset,Q(Quit)→END</li>
 * </ul>
 * 通过 {@code interruptBefore("node-2")} 与 {@code interruptBefore("node-reset")} 在进入下一节点前暂停,
 * 使用 checkpoint 保存状态,再通过 {@link GraphInput#resume()} 恢复执行,实现“按 N 一直循环”等人机交互。
 *
 * @see org.bsc.langgraph4j.CompiledGraph#stream
 * @see org.bsc.langgraph4j.CompileConfig.Builder#interruptBefore
 */
public class HumanInLoopGraph2Application {

    /**
     * 状态中的循环计数字段名
     */
    private static final String LOOP_COUNT_KEY = "loopCount";
    /**
     * 控制台输入扫描器,供 waitForHumanDecision 使用
     */
    private static final Scanner CONSOLE_SCANNER = new Scanner(System.in);

    public static void main(String[] args) throws Exception {
        // ---------- 1. 构建图并打印 Mermaid ----------
        StateGraph<AgentState> graph = getLoopGraph();
        System.out.println(graph.getGraph(GraphRepresentation.Type.MERMAID, "human-in-loop Graph", true).content());

        // ---------- 2. 配置:在 node-2、node-reset 前中断,并使用 MemorySaver 做 checkpoint ----------
        BaseCheckpointSaver saver = new MemorySaver();
        CompileConfig compileConfig = CompileConfig.builder()
                .interruptBefore("node-2")
                .interruptBefore("node-reset")
                .checkpointSaver(saver)
                .interruptBeforeEdge(false)
                .build();

        RunnableConfig runnableConfig = RunnableConfig.builder()
                .threadId("thread-1")
                .build();

        // ---------- 3. 首次执行:跑到第一次中断(interruptBefore)后 stream 结束 ----------
        CompiledGraph<AgentState> workflow = graph.compile(compileConfig);
        AsyncGenerator.Cancellable<NodeOutput<AgentState>> stream = workflow.stream(Map.of(), runnableConfig);
        for (NodeOutput<AgentState> output : stream) {
            System.out.println(output.node() + "->" + output.state().value(LOOP_COUNT_KEY).orElse(0));
        }

        // ---------- 4. 循环:取 checkpoint → 按需更新状态(含 reset 时清零 loopCount)→ resume 直至用户选 Q 或无快照 ----------
        boolean isQuit = false;
        while (true) {
            StateSnapshot<AgentState> snapshot = workflow.getState(runnableConfig);
            if (snapshot == null || isQuit) {
                break;
            }
            System.out.println("snapshot=>" + snapshot.state().data());

            AsyncGenerator.Cancellable<NodeOutput<AgentState>> streamResume;
            if (snapshot.next().contains("reset")) {
                System.out.println("reset");
                streamResume = workflow.stream(GraphInput.resume(), workflow.updateState(runnableConfig, Map.of(LOOP_COUNT_KEY, 0)));
                //reset后,清空历史checkpoint
                saver.release(runnableConfig);
            } else {
                streamResume = workflow.stream(GraphInput.resume(), workflow.updateState(runnableConfig, snapshot.state().data()));
            }
            for (NodeOutput<AgentState> output : streamResume) {
                System.out.println(output.node() + "->" + output.state().value(LOOP_COUNT_KEY).orElse(0));
                if (output.node().contains("END")) {
                    isQuit = true;
                }
            }
            Thread.sleep(20);
        }
        System.out.println("done");
    }

    /**
     * 构建人机协同的循环图:node-1 自增 loopCount,根据控制台输入路由到 node-2 / node-reset / node-1 / END。
     *
     * @return 已配置节点与条件边的 StateGraph
     * @throws GraphStateException 图状态异常
     */
    public static StateGraph<AgentState> getLoopGraph() throws GraphStateException {
        return new StateGraph<>(AgentState::new)
                .addNode("node-1", node_async(state -> Map.of(LOOP_COUNT_KEY, (int) state.value(LOOP_COUNT_KEY).orElse(0) + 1)))
                .addNode("node-2", node_async(state -> Map.of()))
                .addNode("node-reset", node_async(state -> Map.of()))
                .addEdge(GraphDefinition.START, "node-1")
                .addEdge("node-2", "node-1")
                .addEdge("node-reset", "node-1")
                .addConditionalEdges("node-1", state -> CompletableFuture.supplyAsync(HumanInLoopGraph2Application::waitForHumanDecision),
                        Map.of(
                                "exit", GraphDefinition.END,
                                "reset", "node-reset",
                                "next", "node-2",
                                "back", "node-1"));
    }

    /**
     * 在控制台阻塞等待用户输入,映射为图的条件边取值。
     * <ul>
     *   <li>N → "next"(到 node-2)</li>
     *   <li>B → "back"(回到 node-1)</li>
     *   <li>R → "reset"(到 node-reset,外部可配合将 loopCount 置 0)</li>
     *   <li>Q → "exit"(到 END)</li>
     * </ul>
     *
     * @return 条件边键:"next" | "back" | "reset" | "exit"
     */
    private static String waitForHumanDecision() {
        while (true) {
            System.out.print("请输入 N(next) 继续到 node-2,B(back) 退回到 node-1,R(Reset) 重置,或 Q(Quit) 结束 [N/B/R/Q]: ");
            if (!CONSOLE_SCANNER.hasNextLine()) {
                return "exit";
            }
            String input = CONSOLE_SCANNER.nextLine();
            if (input == null) {
                continue;
            }
            String trimmed = input.trim().toUpperCase();
            if ("N".equals(trimmed)) {
                return "next";
            }
            if ("R".equals(trimmed)) {
                return "reset";
            }
            if ("B".equals(trimmed)) {
                return "back";
            }
            if ("Q".equals(trimmed)) {
                return "exit";
            }
            System.out.println("无效输入,请只输入[N/B/R/Q]");
        }
    }
}

文中源码:langgraph4j-study/src/main/java/org/bsc/langgraph4j/agent/_09_human_in_loop at main · yjmyzz/langgraph4j-study · GitHub

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2026-03-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 代码示例
  • 运行效果
  • 复杂示例
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档