我有两个KTable对象:
KTable<Long, byte[]> firstTable = builder.table("firstTopic", Consumed.with(Serdes.Long(), Serdes.ByteArray()));
KTable<Long, byte[]> secondTable = builder.table("secondTopic",
Consumed.with(Serdes.Long(), Serdes.ByteArray()));在此之后,我想加入这两个表:
firstTable.leftJoin(secondTable,
(leftValue, rightValue) -> {
try {
return utils.serializeNetwork(utils.deserializeNetwork(leftValue));
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
)所以我有两个表,我把它们连接到一个表中,我希望得到的表按每个键存储在kafka状态存储中,但是我不知道如何做到这一点。
发布于 2018-12-04 19:51:32
您可以通过在Materialized上指定leftJoin参数并指定状态存储的名称,将物化强制进入本地存储。
firstTable.leftJoin(..., Materialized.as("my-store-name"));https://stackoverflow.com/questions/53616769
复制相似问题