I am trying to reproduce this example. My topology is:
@Bean("myTopo")
public KStream<Object, Object> getTopo(@Qualifier("myKConfig") StreamsBuilder builder) {
    var stream = builder.stream("my-events");
    stream.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
        .count()
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach((k, v) -> {
            System.out.println("k + v = " + k + " --- " + v);
        });
    return stream;
}
I have set the serdes and the windowed serde inner classes in the configuration:
...
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
...
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, JsonSerde.class);
var config = new KafkaStreamsConfiguration(props);
return new StreamsBuilderFactoryBean(config);
The error I get is:
org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor.
Do the Processor's input types match the deserialized types?
Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
Make sure the Processor can accept the deserialized input of type key:
org.apache.kafka.streams.kstream.Windowed,
and value:
org.apache.kafka.streams.kstream.internals.Change.
The underlying cause is:
java.lang.ClassCastException: class org.apache.kafka.streams.kstream.Windowed
cannot be cast to class java.lang.String (org.apache.kafka.streams.kstream.Windowed is in unnamed module of loader 'app';
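java.lang.String is in module java.base of loader 'bootstrap')
I see that count() returns KTable<Windowed<Object>, Long>, so the problem seems to be that it needs a Windowed<String> serde for the key. Apparently, setting DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS is not enough.
How do I create and set that serde?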
Posted on 2021-05-14 02:29:42
I think I ran into this bug:
https://issues.apache.org/jira/browse/KAFKA-9259
I passed a Materialized with an explicit key serde to the count() call:
var store = Stores.persistentTimestampedWindowStore(
"some-state-store",
Duration.ofMinutes(5),
Duration.ofMinutes(2),
false);
var materialized = Materialized
.<String, Long>as(store)
    .withKeySerde(Serdes.String());
Now the code runs without any exception.
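The snippet above does not show where the Materialized instance is handed to the topology. A minimal sketch of how it might be wired in follows, assuming the same topic, window size, and store settings as in the question and answer, and assuming String keys. The class name WindowedCountTopology and the method name build are hypothetical, and the code needs kafka-streams on the classpath. The idea, as far as the linked issue describes it, is that an explicit key serde on the aggregation gives suppress() something to wrap, instead of falling back to the default String serde.

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;

// Hypothetical helper class, not part of the original post.
public class WindowedCountTopology {

    public static void build(StreamsBuilder builder) {
        // Store settings copied from the answer: 5 min retention,
        // 2 min window size, duplicates not retained.
        WindowBytesStoreSupplier store = Stores.persistentTimestampedWindowStore(
                "some-state-store",
                Duration.ofMinutes(5),
                Duration.ofMinutes(2),
                false);

        // Explicit key serde, so the windowed key does not rely on defaults.
        Materialized<String, Long, WindowStore<Bytes, byte[]>> materialized = Materialized
                .<String, Long>as(store)
                .withKeySerde(Serdes.String());

        // Assumption: keys are Strings; values use the configured default serde.
        KStream<String, Object> stream = builder.stream("my-events");

        stream.groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
                .count(materialized) // the only change from the question's topology
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                .foreach((k, v) -> System.out.println("k + v = " + k + " --- " + v));
    }
}
```

The window size given to the store supplier should match the one passed to TimeWindows, which is why both use two minutes here.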
https://stackoverflow.com/questions/67522926