接上节继续,前面的3种基本工作流演示,节点上携带的状态数据使用的是默认的AgentState

data的Value是Object类型(通俗点说,等同于没有类型,没有强类型约束),读写状态值时,需要做各种类型转换,十分繁琐,且容易出错。实际应用时,可以给AgentState中的各项元素,加1个类型说明(即:schema),同时顺手加1个元素值的更新策略(即:channel),这些策略可以是”追加(Append)“,”始终用新值替换(LastResult)“,”自定义(Customize)“
一句话速记:SCHEMA定义数据应该长啥样,Channel定义数据如何更新.
/**
* @author junmingyang
*/
public class OrderState extends AgentState {
static Date DEFAULT_DATE = Date.from(LocalDate.of(1999, 1, 1)
.atStartOfDay(ZoneId.systemDefault()).toInstant());
static SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd");
public static final String ORDER_ID = "orderId";
public static final String ORDER_AMOUNT = "amount";
public static final String ORDER_PRICE = "price";
public static final String ORDER_TOTAL = "total";
public static final String ORDER_DATE = "orderDate";
public static final String ORDER_REMARK = "remark";
//订单状态的Schema
public static final Map<String, Channel<?>> SCHEMA = Map.of(
ORDER_ID, Channels.base(() -> ""), //订单号,类型是String,默认值为空字符串
ORDER_AMOUNT, Channels.base(() -> 0), //订单数量,类型是Integer,默认值0
ORDER_PRICE, Channels.base(OrderState::min), //订单价格,类型是BigDecimal,使用自定义策略更新,始终取最低价
ORDER_TOTAL, Channels.base(() -> BigDecimal.valueOf(0)), //订单总价,类型是BigDecimal,默认值0
ORDER_DATE, Channels.base(() -> DEFAULT_DATE), //订单日期,类型是Date,默认值是 1999-01-01
ORDER_REMARK, Channels.appender(ArrayList::new) //订单备注,类型是List<?>,使用“追加”策略更新
);
/**
* 这里模拟始终取最低价格
*
* @param a
* @param b
* @return
*/
static BigDecimal min(BigDecimal a, BigDecimal b) {
if (a == null && b == null) {
return null;
}
if (a == null) {
return b;
}
if (b == null) {
return a;
}
return a.min(b);
}
public OrderState(Map<String, Object> initData) {
super(initData);
}
public String orderId() {
return this.<String>value(ORDER_ID).orElse("");
}
public Integer amount() {
return this.<Integer>value(ORDER_AMOUNT).orElse(0);
}
public BigDecimal price() {
return this.<BigDecimal>value(ORDER_PRICE).orElse(BigDecimal.valueOf(0));
}
public BigDecimal total() {
return this.<BigDecimal>value(ORDER_TOTAL).orElse(BigDecimal.valueOf(0));
}
public Date orderDate() {
return this.<Date>value(ORDER_DATE).orElse(DEFAULT_DATE);
}
public List<String> remark() {
return this.<List<String>>value(ORDER_REMARK).orElse(new ArrayList<>());
}
@Override
public String toString() {
return "OrderState{" +
"orderId='" + orderId() + '\'' +
", amount=" + amount() +
", price=" + price() +
", total=" + total() +
", orderDate=" + SDF.format(orderDate()) +
", remark=" + remark() +
'}';
}
}一个订单,经过2个节点(分别是:改价格、改数量),然后观察最终订单状态的变化

public class ChangePriceNode implements NodeAction<OrderState> {
@Override
public Map<String, Object> apply(OrderState state) throws Exception {
System.out.println("current node: changePrice , data:" + state);
return Map.of(OrderState.ORDER_REMARK, "上涨价格10%",
OrderState.ORDER_PRICE, state.price().multiply(BigDecimal.valueOf(1.1)));
}
}public class ChangeAmountNode implements NodeAction<OrderState> {
@Override
public Map<String, Object> apply(OrderState state) throws Exception {
System.out.println("current node: changeAmount , data:" + state);
int newAmount = state.amount() + 1;
BigDecimal newTotal = state.price().multiply(BigDecimal.valueOf(newAmount));
return Map.of(OrderState.ORDER_REMARK, "数量+1",
OrderState.ORDER_AMOUNT, newAmount,
OrderState.ORDER_TOTAL, newTotal);
}
}/**
* @author junmingyang
*/
public class SampleOrderGraphApplication {
public static void main(String[] args) throws GraphStateException {
StateGraph<OrderState> graph = getSequenceGraph();
System.out.println(graph.getGraph(GraphRepresentation.Type.MERMAID, "Order Graph", true).content());
Map<String, Object> initState = Map.of(
OrderState.ORDER_ID, "123456",
OrderState.ORDER_AMOUNT, 10,
OrderState.ORDER_PRICE, BigDecimal.valueOf(100),
OrderState.ORDER_DATE, new Date()
);
graph.compile().invoke(initState).ifPresent(c -> {
System.out.println("订单最终结果:" + c);
});
}
public static StateGraph<OrderState> getSequenceGraph() throws GraphStateException {
return new StateGraph<>(OrderState.SCHEMA, OrderState::new) //使用Schema构建StateGraph
.addNode("change-price", node_async(new ChangePriceNode()))
.addNode("change-amount", node_async(new ChangeAmountNode()))
.addEdge(GraphDefinition.START, "change-price")
.addEdge("change-price", "change-amount")
.addEdge("change-amount", GraphDefinition.END);
}
}current node: changePrice , data:OrderState{orderId='123456', amount=10, price=100, total=0, orderDate=2026-02-10, remark=[]}
current node: changeAmount , data:OrderState{orderId='123456', amount=10, price=100, total=0, orderDate=2026-02-10, remark=[上涨价格10%]}
订单最终结果:OrderState{orderId='123456', amount=11, price=100, total=1100, orderDate=2026-02-10, remark=[上涨价格10%, 数量+1]}注1:改价格节点,虽然将价格上涨10%,但是因为OrderState的price Channel中设置了自定义更新策略(始终取最低价),所以仍是100
注2:remark字段,设置了Append追加策略后,每次赋值后,remark List中会将新备注自动追加
假设要设计一个【两个整数的四则运算通用流程】。每个节点的计算输入在图运行时动态指定,然后依次计算,如下图:

/**
* 两步整数运算的图状态。
* <p>
* 支持两个运算槽位:op1(第一步)与 op2(第二步)。每步有 num1、num2、operator,
* 以及 result_1、result_2。第二步的 num1 可由第一步的结果通过 Action 的 forwardResultToKey 写入,
* 实现“第二步的 num1 = 第一步结果”的串联。
* </p>
*
* @author junmingyang
*/
public class TwoIntCalculateState extends AgentState {
/** 第一步运算的槽位前缀,对应 op1_num1、op1_num2、op1_operator */
public static final String OP1_PREFIX = "op1";
/** 第二步运算的槽位前缀,对应 op2_num1、op2_num2、op2_operator */
public static final String OP2_PREFIX = "op2";
/** 状态 Schema:两步的输入与两个结果 channel */
public static final Map<String, Channel<?>> SCHEMA = Map.of(
OP1_PREFIX + "_num1", Channels.base(() -> 0),
OP1_PREFIX + "_num2", Channels.base(() -> 0),
OP1_PREFIX + "_operator", Channels.base(() -> "+"),
OP2_PREFIX + "_num1", Channels.base(() -> 0),
OP2_PREFIX + "_num2", Channels.base(() -> 0),
OP2_PREFIX + "_operator", Channels.base(() -> "*"),
"result_1", Channels.base(() -> 0),
"result_2", Channels.base(() -> 0)
);
/**
* 使用初始数据构造状态(通常由图的 invoke 传入)。
*
* @param initData 初始键值对,如 op1_num1、op1_num2、op1_operator、op2_num2、op2_operator 等
*/
public TwoIntCalculateState(Map<String, Object> initData) {
super(initData);
}
/**
* 按槽位前缀取第一个操作数。
*
* @param prefix 槽位前缀,如 {@link #OP1_PREFIX} 或 {@link #OP2_PREFIX}
* @return 对应 prefix_num1 的值,缺省为 0
*/
public Integer num1(String prefix) {
return this.<Integer>value(prefix + "_num1").orElse(0);
}
/**
* 按槽位前缀取第二个操作数。
*
* @param prefix 槽位前缀
* @return 对应 prefix_num2 的值,缺省为 0
*/
public Integer num2(String prefix) {
return this.<Integer>value(prefix + "_num2").orElse(0);
}
/**
* 按槽位前缀取运算符。
*
* @param prefix 槽位前缀
* @return 对应 prefix_operator 的值,缺省为 "+"
*/
public String operator(String prefix) {
return this.<String>value(prefix + "_operator").orElse("+");
}
/**
* 第一步运算的结果。
*
* @return result_1 的值,缺省为 0
*/
public Integer result1() {
return this.<Integer>value("result_1").orElse(0);
}
/**
* 第二步运算的结果。
*
* @return result_2 的值,缺省为 0
*/
public Integer result2() {
return this.<Integer>value("result_2").orElse(0);
}
/**
* 最后一步的结果,便于统一取最终值(当前为 result_2)。
*
* @return 第二步结果
*/
public Integer result() {
return result2();
}
}/**
* 两数四则运算节点动作。
* <p>
* 根据状态中指定槽位(由 inputPrefix 决定)的 num1、num2、operator 执行加减乘除,
* 将结果写入 resultKey;可选地将同一结果再写入 forwardResultToKey,用于串联下一节点(如第二步的 num1 取第一步结果)。
* </p>
*
* @author junmingyang
*/
public class TwoIntCalculateAction implements NodeAction<TwoIntCalculateState> {
private final String inputPrefix;
private final String resultKey;
/**
* 若非空,将本节点计算结果同时写入该 key,供下一节点使用(如 op2_num1 = result_1)
*/
private final String forwardResultToKey;
/**
* 仅写入结果,不转发到下一节点。
*
* @param inputPrefix 读取的槽位前缀,如 "op1" 表示使用 op1_num1, op1_num2, op1_operator
* @param resultKey 写入结果的 channel key,如 "result_1"
*/
public TwoIntCalculateAction(String inputPrefix, String resultKey) {
this(inputPrefix, resultKey, null);
}
/**
* 写入结果,并可选择将结果转发到指定 key,供下一节点读取(如将 result_1 写入 op2_num1)。
*
* @param inputPrefix 读取的槽位前缀
* @param resultKey 写入结果的 channel key
* @param forwardResultToKey 若非空,将计算结果同时写入该 key,用于串联下一节点的 num1
*/
public TwoIntCalculateAction(String inputPrefix, String resultKey, String forwardResultToKey) {
this.inputPrefix = inputPrefix;
this.resultKey = resultKey;
this.forwardResultToKey = forwardResultToKey;
}
/**
* 从状态中按槽位读取 num1、num2、operator,执行四则运算并写回结果;若配置了 forwardResultToKey 则同时写回该 key。
*
* @param state 当前图状态
* @return 包含 resultKey(及可选的 forwardResultToKey)的更新 Map
*/
@Override
public Map<String, Object> apply(TwoIntCalculateState state) throws Exception {
String operator = state.operator(inputPrefix);
int num1 = state.num1(inputPrefix);
int num2 = state.num2(inputPrefix);
List<String> allowOperators = List.of("+", "-", "*", "/");
if (allowOperators.contains(operator)) {
int result = switch (operator) {
case "+" -> num1 + num2;
case "-" -> num1 - num2;
case "*" -> num1 * num2;
case "/" -> num2 == 0 ? 0 : num1 / num2;
default -> throw new IllegalArgumentException("Invalid operator: " + operator);
};
if (forwardResultToKey != null && !forwardResultToKey.isEmpty()) {
return Map.of(resultKey, result,
forwardResultToKey, result);
}
return Map.of(resultKey, result);
}
return Map.of(resultKey, 0);
}
}/**
* 演示:SCHEMA与Channel的使用
* 两步整数运算图应用:固定流程为「先算第一步(op1),再算第二步(op2)」,
* 第二步的 num1 由第一步的计算结果自动填入,实现步骤间串联。
* <p>
* 所有参与计算的值在 {@link #main(String[])} 中通过初始状态一次性传入;
* 第一步需提供 op1_num1、op1_num2、op1_operator,第二步只需 op2_num2、op2_operator(op2_num1 由第一步结果写入)。
* </p>
*
* @author junmingyang
*/
public class TwoIntCalculateGraphApplication {
/**
* 入口:构建图、传入初始状态并执行,打印 Mermaid 图与两步运算结果。
*
* @param args 未使用
* @throws GraphStateException 图构建或状态异常
*/
public static void main(String[] args) throws GraphStateException {
StateGraph<TwoIntCalculateState> graph = getSequenceGraph();
System.out.println(graph.getGraph(GraphRepresentation.Type.MERMAID, "Two Int Calculate Graph", true).content());
Map<String, Object> initState = Map.of(
TwoIntCalculateState.OP1_PREFIX + "_num1", 3, TwoIntCalculateState.OP1_PREFIX + "_num2", 6, TwoIntCalculateState.OP1_PREFIX + "_operator", "+",
TwoIntCalculateState.OP2_PREFIX + "_num2", 2, TwoIntCalculateState.OP2_PREFIX + "_operator", "*"
);
graph.compile().invoke(initState).ifPresent(c -> {
int n1 = c.num1(TwoIntCalculateState.OP1_PREFIX);
int n2 = c.num2(TwoIntCalculateState.OP1_PREFIX);
String op1 = c.operator(TwoIntCalculateState.OP1_PREFIX);
int r1 = c.result1();
int r2 = c.result2();
int m1 = c.num1(TwoIntCalculateState.OP2_PREFIX);
int m2 = c.num2(TwoIntCalculateState.OP2_PREFIX);
String op2 = c.operator(TwoIntCalculateState.OP2_PREFIX);
System.out.println("步骤1 : " + n1 + op1 + n2 + " = " + r1);
System.out.println("步骤2 : " + m1 + op2 + m2 + " = " + r2 + " (num1 来自步骤1结果)");
System.out.println("最终 result = " + r2);
});
}
/**
* 构建两步运算的序列图:node-1 执行 op1 并结果写入 result_1 与 op2_num1,node-2 执行 op2 并结果写入 result_2。
*
* @return 已配置 START -> node-1 -> node-2 -> END 的 StateGraph
* @throws GraphStateException 图构建异常
*/
public static StateGraph<TwoIntCalculateState> getSequenceGraph() throws GraphStateException {
return new StateGraph<>(TwoIntCalculateState.SCHEMA, TwoIntCalculateState::new)
.addNode("node-1", node_async(new TwoIntCalculateAction(TwoIntCalculateState.OP1_PREFIX, "result_1", TwoIntCalculateState.OP2_PREFIX + "_num1")))
.addNode("node-2", node_async(new TwoIntCalculateAction(TwoIntCalculateState.OP2_PREFIX, "result_2")))
.addEdge(GraphDefinition.START, "node-1")
.addEdge("node-1", "node-2")
.addEdge("node-2", GraphDefinition.END);
}
}注意54行,构建图时需指定Schema,运行时强类型的约束检查才会生效。
步骤1 : 3+6 = 9
步骤2 : 9*2 = 18 (num1 来自步骤1结果)
最终 result = 18文中源码: