我正在使用Spark Streaming,当它试图流式传输一个主题时,我突然收到了这条消息。如何跳过此错误?
Caused by: java.lang.AssertionError: assertion failed: Got wrong record for GROUP TOPIC 109 even after seeking to offset 754809
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:90)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:222)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:988)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:979)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:919)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:979)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:697)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)发布于 2018-12-22 01:40:13
这不是一个实际的答案,但不适合在评论中。此外,这不是一个修复,只是工作。
Spark也保留偏移量,并在消费消息时检查完整性。Spark Streaming API中的偏移量状态与Kafka的偏移量状态不匹配的情况很少发生。您可以检查偏移的完整性:
kafka-simple-consumer-shell --broker-list BROKER:9092 --clientId GROUP_ID --offset 752000 --print-offsets --max-messages 1000 --topic TOPIC | grep offset在这里,752000是一个偏移量关闭,但在失败之前,您可以在异常中看到。
您可以遍历输出并查看偏移量在Kafka中是否按顺序排列。
然而,在我们的例子中,Kafka中的偏移量是很好的。我们在Kafka发生了一次中断,我们必须通过重建日志来恢复。因此,我们采取的方法只是跳过偏移量,直到Spark Streaming的状态与Kafka匹配。
为了实现这一点,我们使用kt工具作为
kt group -brokers BROKER:9092 -topic TOPIC -group GROUP_ID -partitions 113 -reset 753000在这里,第113分区是有偏移量问题的分区(你可以在异常中找到它),753000是一个可能的偏移量,你猜它以后会好起来的。有时,您需要重复该过程并重新启动作业,以达到一切正常的程度。
此过程完全是实验性的,因为消息不会告诉您缺少哪个偏移量。因此,根据您对丢失多少数据的要求,您可以在日志中提到的偏移量之前或之后选择一个数字。例如,如果在日志消息中打印了偏移量752900,则可以通过将其设置为752800 (故障偏移量在此之前)来跳过错误,或者必须将其设置为更早的值,如752950。在后者中,它跳过50条消息。
https://stackoverflow.com/questions/51025133
复制相似问题