
Kafka Streams | RocksDB exception

Stack Overflow user
Asked 2021-02-07 13:11:44

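The application uses Kafka version 2.7.0.

With the kafka-streams Maven dependency set to version 2.2.1 in pom.xml, the stream starts and produces the expected output.

However, after updating the dependency in pom.xml to version 2.3.0 (or later), the stream does not start and the log shows the following error: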

Exception in thread "Average-3ded0155-d697-492b-897b-4da5bfec5cf1-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: Error opening store KSTREAM-REDUCE-STATE-STORE-0000000005 at location /kafka/Average/statedir/Average/1_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000005
        at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:87)
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:185)
        at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:253)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
        at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:54)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:74)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$1(MeteredKeyValueStore.java:120)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:120)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:201)
        at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103)
        at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:209)
        at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:473)
        at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:728)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
Caused by: org.rocksdb.RocksDBException: Column family not found: keyValueWithTimestamp
        at org.rocksdb.RocksDB.open(Native Method)
        at org.rocksdb.RocksDB.open(RocksDB.java:306)
        at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:75)

1 Answer

Stack Overflow user

Answered 2021-02-07 20:52:38

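I got the same error and root cause when I included both "kafka-streams" and "rocksdbjni" as dependencies in my pom.xml. After I removed the latter, the error went away. My goal was to build a jar with dependencies and run it in a Docker container. I had added "rocksdbjni" because of some dependency errors that occurred during development. In the end, I found that those errors came from the Docker image I was using, not from pom.xml.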

Exception in thread "streams-window-sum6-b7f79924-c195-428f-bc02-c7d61d7a0f7d-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: Error opening store aggregate-state-store1.1606723200000 at location /tmp/kafka-streams/streams-window-sum6/0_0/aggregate-state-store1/aggregate-state-store1.1606723200000
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:133)
    at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$WindowStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:148)
    at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:147)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store aggregate-state-store1.1606723200000 at location /tmp/kafka-streams/streams-window-sum6/0_0/aggregate-state-store1/aggregate-state-store1.1606723200000
    at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:87)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:188)
    at org.apache.kafka.streams.state.internals.TimestampedSegment.openDB(TimestampedSegment.java:49)
    at org.apache.kafka.streams.state.internals.TimestampedSegments.getOrCreateSegment(TimestampedSegments.java:50)
    at org.apache.kafka.streams.state.internals.TimestampedSegments.getOrCreateSegment(TimestampedSegments.java:25)
    at org.apache.kafka.streams.state.internals.AbstractSegments.getOrCreateSegmentIfLive(AbstractSegments.java:84)
    at org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.put(AbstractRocksDBSegmentedBytesStore.java:142)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:62)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:27)
    at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:102)
    at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:32)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$put$2(MeteredWindowStore.java:127)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:126)
    ... 16 more
Caused by: org.rocksdb.RocksDBException: Column family not found: keyValueWithTimestamp
    at org.rocksdb.RocksDB.open(Native Method)
    at org.rocksdb.RocksDB.open(RocksDB.java:306)
    at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:75)
    ... 29 more
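
One way to check for this kind of conflict is to print which jar the RocksDB classes are actually loaded from at runtime. The sketch below is only an illustration: the class name WhichJar and the helper locate are hypothetical and do not come from the original post. It also rests on an assumption this thread does not confirm, namely that the explicitly declared "rocksdbjni" differs in version from the one kafka-streams pulls in transitively.

```java
import java.security.CodeSource;

// Hypothetical helper (not part of the original post): reports where a class
// would be loaded from, so a duplicated or mismatched rocksdbjni jar is easy to spot.
final class WhichJar {

    /** Returns the jar or directory a class comes from, or a short note if unknown. */
    static String locate(String className) {
        try {
            // initialize=false: look the class up without running its static
            // initializers (for RocksDB this avoids loading the native library).
            Class<?> cls = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            return src == null ? "(bootstrap/unknown)" : src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // org.rocksdb.RocksDB is the class that appears in the stack trace above.
        System.out.println("org.rocksdb.RocksDB -> " + locate("org.rocksdb.RocksDB"));
        System.out.println("KafkaStreams        -> "
                + locate("org.apache.kafka.streams.KafkaStreams"));
    }
}
```

Run with the same classpath as the application (for a shaded jar, both classes will report the shaded jar itself, so the check is more useful before shading or under mvn exec:java). If the reported rocksdbjni jar is not the version that your kafka-streams release declares as its dependency, removing the explicit "rocksdbjni" entry, as described above, lets Maven resolve the matching one.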

Below is my pom.xml, which is based on the official documentation (https://kafka.apache.org/27/documentation/streams/tutorial). It works both with mvn exec:java -Dexec.mainClass=myapps.MyClass and with the jar via java -cp app.jar myapps.MyClass

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>streams.examples</groupId>
    <artifactId>streams.examples</artifactId>
    <version>0.2</version>
    <packaging>jar</packaging>
    <name>Kafka Streams Quickstart :: Java</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <kafka.version>2.6.0</kafka.version>
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>
    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>
    <!--
        Execute "mvn clean package"
        to build a shaded jar file out of this project
        (no "build-jar" profile is defined in this pom).
    -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <!-- This filter is to workaround the problem caused by included signed jars.
                                     java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
                                -->
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>myapps.MyClass</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!-- Apache Kafka dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/66084779
