首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >连接写入KTable:如何将连接与ktable写入同步?

连接写入KTable:如何将连接与ktable写入同步?
EN

Stack Overflow用户
提问于 2017-09-14 13:37:21
回答 1查看 2K关注 0票数 5

对于以下拓扑的行为,我有一些问题:

代码语言:javascript
复制
String topic = config.topic();

KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

// Receive a stream of various events
topology.eventsStream()
    // Only process events that are implementing MyEvent
    .filter((k, v) -> v instanceof MyEvent)
    // Cast to ease the code
    .mapValues(v -> (MyEvent) v)
    // rekey by data id
    .selectKey((k, v) -> v.data.id)
    .peek((k, v) -> L.info("Event:"+v.action))
    // join the event with the according entry in the KTable and apply the state mutation
    .leftJoin(myTable, eventHandler::handleEvent, UUIDSerdes.get(), EventSerdes.get())
    .peek((k, v) -> L.info("Updated:" + v.id + "-" + v.id2))
    // write the updated state to the KTable.
    .to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

我的问题发生在我同时收到不同的事件时。因为我的状态突变是由leftJoin完成的,然后由to方法编写。如果使用相同的密钥同时接收到事件1和2,则可以发生以下情况:

代码语言:javascript
复制
event1 joins with state A => state A mutated to state X
event2 joins with state A => state A mutated to state Y
state X written to the KTable topic
state Y written to the KTable topic

正因为如此,状态Y没有来自event1的更改,所以我丢失了数据。

下面是我所看到的日志( Processing:...部件是从值合并器中记录的):

代码语言:javascript
复制
Event:Event1
Event:Event2
Processing:Event1, State:none
Updated:1-null
Processing:Event2, State:none
java.lang.IllegalStateException: Event2 event received but we don't have data for id 1

可以将Event1视为创建事件:它将在KTable中创建条目,因此状态是否为空并不重要。尽管Event2需要将它的更改应用到现有状态,但是它没有找到任何更改,因为第一个状态突变仍然没有写入到KTable (它仍然没有被to方法处理)

无论如何,是为了确保我的leftJoin和写入ktable的操作都是原子化的?

谢谢

更新与当前解决方案

由于@Matthias的响应,我找到了一个使用Transformer的解决方案。

代码如下所示:

那是变压器

代码语言:javascript
复制
public class KStreamStateLeftJoin<K, V1, V2> implements Transformer<K, V1, KeyValue<K, V2>> {

    private final String                    stateName;
    private final ValueJoiner<V1, V2, V2>   joiner;
    private final boolean                   updateState;

    private KeyValueStore<K, V2>            state;

    public KStreamStateLeftJoin(String stateName, ValueJoiner<V1, V2, V2> joiner, boolean updateState) {
        this.stateName = stateName;
        this.joiner = joiner;
        this.updateState = updateState;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.state = (KeyValueStore<K, V2>) context.getStateStore(stateName);
    }

    @Override
    public KeyValue<K, V2> transform(K key, V1 value) {
        V2 stateValue = this.state.get(key); // Get current state
        V2 updatedValue = joiner.apply(value, stateValue); // Apply join
        if (updateState) {
            this.state.put(key, updatedValue); // write new state
        }
        return new KeyValue<>(key, updatedValue);
    }

    @Override
    public KeyValue<K, V2> punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() {}
}

以下是经过调整的拓扑结构:

代码语言:javascript
复制
String topic = config.topic();
String store = topic + "-store";

KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic, store);

// Receive a stream of various events
topology.eventsStream()
    // Only process events that are implementing MyEvent
    .filter((k, v) -> v instanceof MyEvent)
    // Cast to ease the code
    .mapValues(v -> (MyEvent) v)
    // rekey by data id
    .selectKey((k, v) -> v.data.id)
    // join the event with the according entry in the KTable and apply the state mutation
    .transform(() -> new KStreamStateLeftJoin<UUID, MyEvent, MyData>(store, eventHandler::handleEvent, true), store)
    // write the updated state to the KTable.
    .to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

当我们使用KTable的KV StateStore并通过put方法直接在其中应用更改时,事件应该总是获取更新的状态。有一件事我仍然在想:如果我有一个持续的高吞吐量的事件。

,我们在KTable的KV存储上所做的操作与KTable的主题中所完成的写操作之间是否还存在竞争条件?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-09-14 15:29:53

一个KTable被分成多个物理存储,每个存储仅由一个线程更新。因此,您描述的场景不可能发生。如果有两个具有相同时间戳的记录,它们都更新相同的碎片,它们将被一个接一个地处理(按偏移顺序)。因此,第二次更新将看到第一次更新之后的状态。

所以也许你只是描述了你的场景不对?

更新

您不能在执行联接时改变状态。因此,期望

代码语言:javascript
复制
event1 joins with state A => state A mutated to state X

是错的。与任何处理顺序无关,当event1state A连接时,它将以只读模式访问state A,并且不会修改state A

因此,当event2加入时,它将看到与event1相同的状态。对于流表联接,只有在从表输入主题读取新数据时才更新表状态。

如果希望从两个输入中更新共享状态,则需要使用transform()构建自定义解决方案。

代码语言:javascript
复制
builder.addStore(..., "store-name");
builder.stream("table-topic").transform(..., "store-name"); // will not emit anything downstream
KStream result = builder.stream("stream-topic").transform(..., "store-name");

这将创建一个由两个处理器共享的存储,并且两者都可以根据自己的意愿进行读写。因此,对于表输入,您可以只更新状态而不向下游发送任何内容,而对于流输入,您可以执行连接、更新状态并向下游发送结果。

更新2

关于解决方案,在Transformer应用于状态并在状态更新后记录Transformer进程的更新之间将不存在争用条件。此部分将在单个线程中执行,记录将按输入主题的偏移顺序处理。因此,可以确保状态更新可供以后的记录使用。

票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46220663

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档