I have three topics. Each topic has a key and a payload. I try to join the first two topics, aggregate the result, and finally join that result with the third topic. But it does not work as expected.
Here is a simple example to illustrate the situation:
Topic 1 "Company":
- Key:1234, {"id":"1234", ...}
...
Topic 2 "Mapping":
- Key:5678, {"id":"5678", "company_id":"1234", "category_id":"9876"}
- Key:5679, {"id":"5679", "company_id":"1234", "category_id":"9877"}
...
Topic 3 "Categories":
- Key:9876, {"id":"9876", "name":"foo"}
- Key:9877, {"id":"9877", "name":"bar"}
...and I want every company to end up with a list of all related categories. I tried joining "Mapping" with "Categories" and aggregating the "name" values of the result. This fails with the following errors:
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KTABLE-FK-JOIN-OUTPUT-0000000018
and
Processor KTABLE-FK-JOIN-OUTPUT-0000000018 has no access to StateStore KTABLE-FK-JOIN-OUTPUT-STATE-STORE-0000000019 as the store is not connected to the processor.
This is what I tried:
var joined = mappedTable
    .leftJoin(
        categoriesTable,
        mappedForeignKey -> String.valueOf(mappedForeignKey.getCategoryId()),
        (mapping, categories) -> new CategoriesMapping(mapping.getCompanyId(), categories.getName()),
        Materialized.with(Serdes.String(), mappedSerde)
    )
    .groupBy((key, mapping) -> new KeyValue<>(String.valueOf(mapping.getCompanyId()), mapping), Grouped.with(Serdes.String(), mappedSerde))
    .aggregate(
        // ...
    );

(I skipped the part where the joined table is finally joined with the "Company" table.)
The aggregation function does something like {mappedValue1}, {mappedValue2}, ..., and it works fine without the join on the tables.
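The aggregation itself can be sketched in plain Java, independent of Kafka Streams. The adder and subtractor below are hypothetical stand-ins for the actual aggregate() callbacks, collecting category names into a list per company key:

```java
import java.util.ArrayList;
import java.util.List;

public class NameAggregation {
    // Hypothetical adder: called when a new mapping arrives for a company key.
    static List<String> add(List<String> names, String categoryName) {
        List<String> updated = new ArrayList<>(names);
        updated.add(categoryName);
        return updated;
    }

    // Hypothetical subtractor: called when an old mapping is retracted.
    static List<String> subtract(List<String> names, String categoryName) {
        List<String> updated = new ArrayList<>(names);
        updated.remove(categoryName);
        return updated;
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        names = add(names, "foo");
        names = add(names, "bar");
        System.out.println(names); // prints [foo, bar]
    }
}
```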
Is there any way to make this join-then-aggregate work? Is it possible to get an output like this:
key, value: {"id":..., ..., "name":[{foo},{bar}, ...]}

Full stack trace:
Exception in thread "company_details-16eef466-408a-4271-94ec-adad071b4d24-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KTABLE-FK-JOIN-OUTPUT-0000000018
at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:97)
at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:608)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:336)
at org.apache.kafka.streams.processor.internals.AssignedTasks.transitionToRunning(AssignedTasks.java:118)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.updateRestored(AssignedStreamsTasks.java:349)
at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:390)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor KTABLE-FK-JOIN-OUTPUT-0000000018 has no access to StateStore KTABLE-FK-JOIN-OUTPUT-STATE-STORE-0000000019 as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:104)
at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.init(KTableSource.java:84)
at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:93)
and
java.lang.IllegalStateException: Expected postgres_company_categories-STATE-STORE-0000000000 to have been initialized
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:284) ~[kafka-streams-2.4.0.jar:na]
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177) ~[kafka-streams-2.4.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:680) ~[kafka-streams-2.4.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:788) ~[kafka-streams-2.4.0.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.closeTask(AssignedStreamsTasks.java:80) ~[kafka-streams-2.4.0.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.closeTask(AssignedStreamsTasks.java:36) ~[kafka-streams-2.4.0.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedTasks.shutdown(AssignedTasks.java:256) ~[kafka-streams-2.4.0.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.shutdown(AssignedStreamsTasks.java:534) ~[kafka-streams-2.4.0.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:292) ~[kafka-streams-2.4.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1115) ~[kafka-streams-2.4.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:683) ~[kafka-streams-2.4.0.jar:na]

Posted on 2020-02-11 19:40:48
You are hitting a bug: https://issues.apache.org/jira/browse/KAFKA-9517
It is fixed in the upcoming 2.4.1 and 2.5.0 releases.
As a workaround, you can materialize the join result explicitly by passing Materialized.as("some-name") into leftJoin().
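Applied to the code in the question, the workaround would look roughly like the fragment below. This is a sketch, not runnable on its own (it needs a Kafka Streams application around it); the store name "fk-join-result" is arbitrary, and the explicit type witness on Materialized.as() is needed so the serde setters type-check. The only material change versus the question's code is replacing Materialized.with(...) by a named Materialized.as(...):

```java
// Workaround sketch for KAFKA-9517: name the FK-join result store explicitly
// so it gets connected to the join's output processor.
var joined = mappedTable
    .leftJoin(
        categoriesTable,
        mappedForeignKey -> String.valueOf(mappedForeignKey.getCategoryId()),
        (mapping, categories) -> new CategoriesMapping(mapping.getCompanyId(), categories.getName()),
        // "fk-join-result" is a hypothetical store name; any unique name works.
        Materialized.<String, CategoriesMapping, KeyValueStore<Bytes, byte[]>>as("fk-join-result")
            .withKeySerde(Serdes.String())
            .withValueSerde(mappedSerde)
    );
```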
https://stackoverflow.com/questions/60053901