我一定会用到
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>使用过时的功能
val kafkaStream = KafkaUtils.createStream(streamingContext, zkArgs, consumerGroupId, topicMap)
kafkaStream.foreachRDD(rdd => {
val sqlContext = new SQLContext(sc)我读到手动使用水印是这样做的:
// enabling watermarking upon success
val sparkConf = new SparkConf()
....
.set("zookeeper.hosts", zkArgs)
.set("enable.auto.commit", "false")
....
df.withWatermark("eventTime", "10 minutes")
.write .....沿着类的轨迹我找到了像EventTimeWatermark这样的类。
在另一个地方,我读到我应该自己编写偏移量,如下所示:
def saveOffsets(zkClient: ZkClient, zkPath: String, rdd: RDD[_]): Unit = {
val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}")
.mkString(",")
ZkUtils.updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
}是不是
df.withWatermark("eventTime", "10 minutes")
.write.最终更新Zookeeper中的水印?还是在运行spark的集群上的另一种机制?
发布于 2019-03-18 23:53:06
由于水印仅在Spark streaming中进行,因此从Kafka中挑选的延迟消息在Spark中会被忽略。
当消息被读取时,Kafka偏移量被更新。
https://stackoverflow.com/questions/55222238
复制相似问题