首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >kafka streams示例抛出一个类转换异常,插入窗口->字符串。如何设置合适的Serde?

kafka streams示例抛出一个类转换异常,插入窗口->字符串。如何设置合适的Serde?
EN

Stack Overflow用户
提问于 2021-05-14 00:37:56
回答 1查看 202关注 0票数 1

我正在尝试重现this example。我的拓扑是:

代码语言:javascript
复制
@Bean("myTopo")
    public KStream<Object, Object> getTopo(@Qualifier("myKConfig") StreamsBuilder builder) {
        var stream = builder.stream("my-events");
        stream.groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
                .count()
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                .foreach((k, v) -> {
                    System.out.println("k + v = " + k + " --- " + v);
                });

我已经在配置中设置了serde和有窗口的serde内部类:

代码语言:javascript
复制
        ...
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
        ...
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
        props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, JsonSerde.class);
        var config = new KafkaStreamsConfiguration(props);
        return new StreamsBuilderFactoryBean(config);

我得到的错误是

代码语言:javascript
复制
org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. 
Do the Processor's input types match the deserialized types? 
Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. 
Make sure the Processor can accept the deserialized input of type key: 
   org.apache.kafka.streams.kstream.Windowed, 
and value: 
   org.apache.kafka.streams.kstream.internals.Change.

有潜在的原因

代码语言:javascript
复制
java.lang.ClassCastException: class org.apache.kafka.streams.kstream.Windowed 
cannot be cast to class java.lang.String (org.apache.kafka.streams.kstream.Windowed is in unnamed module of loader 'app'; 
java.lang.String is in module java.base of loader 'bootstrap')

我看到count()返回KTable<Windowed<Object>, Long>。所以看起来问题在于它需要密钥的Windowed<String> serde。显然,DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS是不够的。

我如何创建和设置它?

EN

回答 1

Stack Overflow用户

发布于 2021-05-14 02:29:42

我想我遇到了这个bug:

https://issues.apache.org/jira/browse/KAFKA-9259

我在count()方法中添加了一个物化的

代码语言:javascript
复制
        var store = Stores.persistentTimestampedWindowStore(
                "some-state-store",
                Duration.ofMinutes(5),
                Duration.ofMinutes(2),
                false);
        var materialized = Materialized
                .<String, Long>as(store)
                .withKeySerde(Serdes.String());

现在代码毫无异常地运行了。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67522926

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档