我正在为一个相对简单的窗口字数统计示例而努力。我试图只得到窗口结果,但什么也得不到。
KStream<String, Long> sl = s
...
.groupBy((key, value) -> value)
.windowedBy(of(ofSeconds(5))
.advanceBy(ofSeconds(3))
.grace(ofSeconds(2)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
"counts-store").withRetention(ofSeconds(7)))
.suppress(untilWindowCloses(unbounded()))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), value))
.to(outputTopicName, produced);我输入了一些信息:
inputWords.pipeInput(new TestRecord<>("word", "a b c", now));
inputWords.pipeInput(new TestRecord<>("word", "a c d c", now.plus(ofSeconds(6))));
inputWords.pipeInput(new TestRecord<>("word", "", now.plus(Duration.ofDays(1))));但是什么都不会发射。有人知道可能的解决方案吗?
正如你所看到的,我已经在使用宽限和保留,正如其他人所写的那样,这可能会有所帮助,但它实际上并没有帮助。在注释取消行上,一切都正常。
发布于 2020-03-20 10:57:10
你必须为你的count Materialized视图提供有效的Serdes,这样Kafka流才能正确地为内部抑制处理器提供有效的窗口Serdes,如果不是这样,那么这个处理器将选择默认的密钥serdes,这可能导致序列化不能正常工作,我在KTableSuppressProcessor.buffer()中得到以下异常:
//please check if you get this exception
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86)正确地为实体化视图counts-store提供有效的Serde,您应该会得到预期的输出:
Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store")
.withRetention(ofSeconds(7))
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())https://stackoverflow.com/questions/60758784
复制相似问题