我正试图用kstreams来解决一个问题。目前,我在进行聚合时遇到了这个错误。
Exception in thread "main" java.lang.NoClassDefFoundError: org/rocksdb/RocksDBException
at org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier.get(RocksDbWindowBytesStoreSupplier.java:50)
at org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier.get(RocksDbWindowBytesStoreSupplier.java:24)
at org.apache.kafka.streams.state.internals.WindowStoreBuilder.build(WindowStoreBuilder.java:40)
at org.apache.kafka.streams.state.internals.WindowStoreBuilder.build(WindowStoreBuilder.java:26)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder$StateStoreFactory.build(InternalTopologyBuilder.java:141)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.buildProcessorNode(InternalTopologyBuilder.java:966)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.build(InternalTopologyBuilder.java:869)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.build(InternalTopologyBuilder.java:822)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.build(InternalTopologyBuilder.java:805)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:667)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:624)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:534)我的代码实际上如下:
KStream<String, InputData> input = builder.stream(topicname);
KTable<Windowed<String>, CustomAgg> grouped =
input.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(60000)))
.aggregate(
CustomAgg::new,
(k, v, agg) -> agg.add(v),
Materialized.<String, CustomAgg, WindowStore<Bytes, byte[]>>as("aggs").withValueSerde(new CustomAggSerde()));
grouped.toStream().print(Printed.toSysOut());kafka-streams版本2.1.0
我似乎在网上找不到任何关于如何为卡夫卡流设置rocksDB的资源--任何建议都将不胜感激。(我已经安装了brew,但我不确定我需要如何指向它,任何设置,它是否需要在我的pom.xml文件中等等)。目前正在开发MacOS。
谢谢!
发布于 2021-04-16 08:14:46
您不需要为Kafka流安装RocksDB。RocksDB是卡夫卡流的依赖。如果在构建自动化工具(例如maven或gradle)中将Kafka流作为依赖项,则在构建过程中应该自动下载RocksDB JAR并将其放到类路径中。
如果没有构建自动化工具,您可能需要手动将RocksDB JAR放到类路径上。卡夫卡流2.1.0的RocksDB版本应该是5.14.2。
您所得到的错误似乎是类路径问题,因此可能与上述问题有关。
发布于 2022-03-14 03:14:30
尝试在您的pom.xml中插入下面的依赖项:
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>4.9.0</version>
</dependency>https://stackoverflow.com/questions/67087968
复制相似问题