I have a Kafka streaming job (Spark version 2.4.5) running on Kubernetes, with one driver and 4 executors. The purpose of the job is to consume data from a Kafka topic, process it, and load the final data into ElasticSearch.
Problem:
Sometimes we run into issues with the Kafka cluster. During that window the PySpark streaming job gets no data to consume/process, which is acceptable. However, even after the Kafka cluster becomes healthy again, the PySpark streaming job does not consume/process any further records from the Kafka topic.
Errors: no errors found on the driver or on any executor.
Tail of the log from one of the executors:
> 21/11/24 16:46:32 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks including 1 local blocks and 0 remote blocks
> 21/11/24 16:46:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
> 21/11/24 16:46:32 INFO CodeGenerator: Code generated in 12.569815 ms
> 21/11/24 16:46:32 INFO CodeGenerator: Code generated in 4.696542 ms
> 21/11/24 16:46:32 INFO EsDataFrameWriter: Writing to [list_anom]
> 21/11/24 16:46:33 INFO Executor: Finished task 3.0 in stage 299400.0 (TID 438086). 9114 bytes result sent to driver
> 21/11/24 16:46:33 INFO BlockManager: Removing RDD 777788
> 21/11/24 16:46:33 INFO BlockManager: Removing RDD 777807
> 21/11/24 16:46:33 INFO BlockManager: Removing RDD 777951
> 21/11/24 16:46:33 INFO BlockManager: Removing RDD 778649
> 21/11/24 17:01:42 INFO BlockManager: Removing RDD 776873
> 21/11/24 17:01:42 INFO BlockManager: Removing RDD 777013

PySpark code snippet for the Kafka data stream:
inputDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka_broker_ip:port") \
    .option("subscribe", "kafka_topic") \
    .load() \
    .selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .trigger(processingTime='30 seconds') \
    .option("checkpointLocation", "/file-storage/Checkpoints") \
    .foreachBatch(processKafkaInput) \
    .outputMode("append") \
    .start()
inputDF.awaitTermination()

Some information from the UI:
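One way to confirm from code (rather than the UI) that the query is wedged: with a 30-second trigger, a healthy Structured Streaming query keeps emitting progress events even when there is no data, so a stale `query.lastProgress` timestamp indicates a stall. The sketch below is not from the original post; `lastProgress` is the real Structured Streaming API, but the helper name and the idle threshold are assumptions of mine.

```python
# Hypothetical stall detector: `last_progress` is the dict returned by
# query.lastProgress; `now` is a timezone-aware datetime. A healthy query
# with a 30 s trigger refreshes this roughly every trigger interval, so a
# timestamp much older than the trigger suggests the query is wedged.
from datetime import datetime, timezone

def is_stalled(last_progress, now, max_idle_seconds=600):
    """Return True if the query has reported no progress within the window."""
    if last_progress is None:
        return True  # query never produced a progress event
    ts = datetime.strptime(
        last_progress["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ"
    ).replace(tzinfo=timezone.utc)
    return (now - ts).total_seconds() > max_idle_seconds
```

A monitoring thread could poll `is_stalled(inputDF.lastProgress, datetime.now(timezone.utc))` and restart the query when it returns True.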

Thread dump from the driver:

Posted on 2021-11-25 21:05:21
I'm not 100% sure about this, but if garbage data is causing errors, you may need some error handling in the PySpark streaming job so that you can call streamingContext.stop().
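A sketch of the error handling this answer suggests, with my own names (note the question uses Structured Streaming, where the equivalent of `streamingContext.stop()` is `query.stop()` on the object returned by `start()`): wrap the real batch handler so an exception from a bad batch is recorded instead of silently wedging the stream, and let the main thread stop and restart the query.

```python
# Hypothetical wrapper around the post's processKafkaInput handler.
# A failure is recorded here; the thread running awaitTermination can
# then call inputDF.stop() and restart the query.
failed = {"batch_id": None}

def make_safe_handler(handler):
    def safe_handler(batch_df, batch_id):
        try:
            handler(batch_df, batch_id)
        except Exception as exc:  # e.g. garbage records, ES write failure
            failed["batch_id"] = batch_id
            print(f"batch {batch_id} failed: {exc}")
    return safe_handler
```

Usage would be `.foreachBatch(make_safe_handler(processKafkaInput))` in place of the original `.foreachBatch(processKafkaInput)`.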
https://stackoverflow.com/questions/70117164