应用使用kafka版本为2.7.0
在pom.xml中尝试使用kafka stream maven version2.2.1 kafka stream,然后启动stream并获得预期的输出。
但是,当在pom.xml中更新到maven版本2.3.0(或更高版本)时,从日志和流中得到的以下错误不会启动
Exception in thread "Average-3ded0155-d697-492b-897b-4da5bfec5cf1-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: Error opening store KSTREAM-REDUCE-STATE-STORE-0000000005 at location /kafka/Average/statedir/Average/1_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000005
at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:87)
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:185)
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:253)
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:54)
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:74)
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$1(MeteredKeyValueStore.java:120)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:120)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:201)
at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:209)
at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:473)
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:728)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
Caused by: org.rocksdb.RocksDBException: Column family not found: keyValueWithTimestamp
at org.rocksdb.RocksDB.open(Native Method)
at org.rocksdb.RocksDB.open(RocksDB.java:306)
at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:75)发布于 2021-02-07 20:52:38
当我在pom.xml文件中包含“kafka-stream”和"rocksdbjni“作为依赖项时,我得到了相同的错误/根本原因。删除后一个错误后,错误就消失了。我的目标是创建一个带有依赖项的jar文件,并在docker容器中使用它。我包含了"rocksdbjni“,因为在开发过程中发生了一些依赖错误。最后,我发现它们来自使用过的docker镜像,而不是pom.xml。
Exception in thread "streams-window-sum6-b7f79924-c195-428f-bc02-c7d61d7a0f7d-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: Error opening store aggregate-state-store1.1606723200000 at location /tmp/kafka-streams/streams-window-sum6/0_0/aggregate-state-store1/aggregate-state-store1.1606723200000
at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:133)
at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$WindowStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:148)
at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:147)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store aggregate-state-store1.1606723200000 at location /tmp/kafka-streams/streams-window-sum6/0_0/aggregate-state-store1/aggregate-state-store1.1606723200000
at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:87)
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:188)
at org.apache.kafka.streams.state.internals.TimestampedSegment.openDB(TimestampedSegment.java:49)
at org.apache.kafka.streams.state.internals.TimestampedSegments.getOrCreateSegment(TimestampedSegments.java:50)
at org.apache.kafka.streams.state.internals.TimestampedSegments.getOrCreateSegment(TimestampedSegments.java:25)
at org.apache.kafka.streams.state.internals.AbstractSegments.getOrCreateSegmentIfLive(AbstractSegments.java:84)
at org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.put(AbstractRocksDBSegmentedBytesStore.java:142)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:62)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:27)
at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:102)
at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:32)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$put$2(MeteredWindowStore.java:127)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:126)
... 16 more
Caused by: org.rocksdb.RocksDBException: Column family not found: keyValueWithTimestamp
at org.rocksdb.RocksDB.open(Native Method)
at org.rocksdb.RocksDB.open(RocksDB.java:306)
at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:75)
... 29 more下面是我的pom.xml文件,它基于官方文档(https://kafka.apache.org/27/documentation/streams/tutorial),它既适用于mvn exec:java -Dexec.mainClass=myapps.MyClass,也适用于jar java -cp app.jar myapps.MyClass。
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>streams.examples</groupId>
<artifactId>streams.examples</artifactId>
<version>0.2</version>
<packaging>jar</packaging>
<name>Kafka Streams Quickstart :: Java</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>2.6.0</kafka.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<!--
Execute "mvn clean package -Pbuild-jar"
to build a jar file out of this project!
-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<!-- This filter is to workaround the problem caused by included signed jars.
java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
-->
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>myapps.MyClass</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<!-- Apache Kafka dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>https://stackoverflow.com/questions/66084779
复制相似问题