首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka-Streams:(KTable,KTable)连接结果的聚合

Kafka-Streams:(KTable,KTable)连接结果的聚合
EN

Stack Overflow用户
提问于 2020-02-04 09:06:27
回答 1查看 829关注 0票数 1

我有三个话题。每个主题都有密钥和有效负载。我尝试加入前两个主题,汇总结果,最后将这个结果与第三个主题结合起来。但它并不像预期的那样起作用。

让我举一个简单的例子来说明这一情况:

代码语言:javascript
复制
Topic 1 "Company": 
- Key:1234, {"id":"1234", ...}
...

Topic 2 "Mapping":
- Key:5678, {"id":"5678", "company_id":"1234", "category_id":"9876}
- Key:5679, {"id":"5679", "company_id":"1234", "category_id":"9877}
...

Topic 3 "Categories":
- Key:9876, {"id":"9876", "name":"foo"}
- Key:9877, {"id":"9877", "name":"bar"}
...

我希望每一家公司都有一个所有相关类别的列表。我尝试将“映射”与“类别”连接起来,并对结果进行聚合"name“。此操作失败,引发以下错误:

org.apache.kafka.streams.errors.StreamsException:未能初始化处理器KTABLE连接-输出-0000000018

处理器KTABLE连接-输出-0000000018不能访问StateStore KTABLE连接输出状态存储-0000000019,因为存储没有连接到处理器.

我试过:

代码语言:javascript
复制
    var joined = mappedTable
                    .leftJoin(
                            categoriesTable,
                            mappedForeignKey -> String.valueOf(mappedForeignKey.getCategoryId()),
                            (mapping, categories) -> new CategoriesMapping(mapping.getCompanyId(), categories.getName()),
                            Materialized.with(Serdes.String(), mappedSerde)
                    )
                    .groupBy((key, mapping) -> new KeyValue<>(String.valueOf(mapping.getCompanyId()), mapping), Grouped.with(Serdes.String(), mappedSerde))
                    .aggregate(
                            // ...
                    );

(我跳过了这个部分,其中连接表最终与"Company“表连接)

聚合函数执行这样的操作:{mappedValue1}、{mappedValue2},并且它不需要表上的联接即可工作。

有什么方法可以使这个连接聚合发生吗?是否有可能有这样的输出:

代码语言:javascript
复制
key, value:{"id":..., ..., "name":[{foo},{bar}, ...]}

全堆栈跟踪:

代码语言:javascript
复制
Exception in thread "company_details-16eef466-408a-4271-94ec-adad071b4d24-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KTABLE-FK-JOIN-OUTPUT-0000000018
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:97)
    at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:608)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:336)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.transitionToRunning(AssignedTasks.java:118)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.updateRestored(AssignedStreamsTasks.java:349)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:390)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor KTABLE-FK-JOIN-OUTPUT-0000000018 has no access to StateStore KTABLE-FK-JOIN-OUTPUT-STATE-STORE-0000000019 as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:104)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.init(KTableSource.java:84)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:93)

and

java.lang.IllegalStateException: Expected postgres_company_categories-STATE-STORE-0000000000 to have been initialized
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:284) ~[kafka-streams-2.4.0.jar:na]
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177) ~[kafka-streams-2.4.0.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:680) ~[kafka-streams-2.4.0.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:788) ~[kafka-streams-2.4.0.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.closeTask(AssignedStreamsTasks.java:80) ~[kafka-streams-2.4.0.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.closeTask(AssignedStreamsTasks.java:36) ~[kafka-streams-2.4.0.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedTasks.shutdown(AssignedTasks.java:256) ~[kafka-streams-2.4.0.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.shutdown(AssignedStreamsTasks.java:534) ~[kafka-streams-2.4.0.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:292) ~[kafka-streams-2.4.0.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1115) ~[kafka-streams-2.4.0.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:683) ~[kafka-streams-2.4.0.jar:na]
EN

回答 1

Stack Overflow用户

发布于 2020-02-11 19:40:48

您所遇到的是一个bug:https://issues.apache.org/jira/browse/KAFKA-9517

在即将发布的2.4.1和2.5.0版本中,该bug已经修复。

作为解决办法,您可以通过将Materialize.as("some-name")传递到leftJoin(),显式地实现连接结果。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60053901

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档