使用Kafka Spark-Streaming。能够读取和处理生产者发来的数据。我在这里有一个场景,让我们假设生产者正在产生消息,而消费者被关闭了一段时间,然后打开了。现在,Conumser只读取实时数据。相反,它还应该保留它停止读取的数据。这是我一直在使用的pom.xml。
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>2.0.1</spark.version>
<kafka.version>0.8.2.2</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.json/json -->
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20160810</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.json4s/json4s-ast_2.11 -->
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-ast_2.11</artifactId>
<version>3.2.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.2.0</version>
</dependency>我尝试过使用Kafka-v0.10.1.0 Producer和Conumser。行为与预期一致(消费者从数据离开的地方读取数据)。因此,在这个版本中,偏移量被正确地拾取。
我也尝试过在上面的pom.xml中使用相同的版本,但使用java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker失败了。
我理解版本的兼容性,但我也在寻找连续流。
发布于 2017-02-07 01:09:32
不同的行为可能是因为Kafka在0.8和0.10版本之间经历了一些相当大的变化。
除非您一定要使用旧版本,否则我建议您切换到新版本。
看看这个链接:
https://spark.apache.org/docs/latest/streaming-kafka-integration.html
Kafka项目在0.8和0.10版本之间引入了一个新的消费者应用程序接口,因此有两个单独的对应Spark Streaming包可用。
如果你想使用Kafka v0.10.1.0,你必须在https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11中指定一些kafka spark流集成依赖项。
例如,类似于以下内容:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>补充说明:您使用的是2013年10月发布的hadoop 2.2.0,因此在Hadoop术语中是古老的,您应该考虑将其更改为较新的版本。
如果这有帮助,请告诉我。
https://stackoverflow.com/questions/42054328
复制相似问题