首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >PySpark -Streaming作业已停滞,无法进一步处理

PySpark -Streaming作业已停滞,无法进一步处理
EN

Stack Overflow用户
提问于 2021-11-25 20:57:41
回答 1查看 35关注 0票数 1

我有一个Kafka流作业(Spark version2.4.5)在kubernetes上运行,有一个驱动程序和4个executors.The,该作业的目的是消费来自PySpark主题的数据,并对它们进行处理,然后将最终数据加载到ElasticSearch。

问题:

有时,我们会遇到Kafka集群的问题。因此,PySpark流作业将不会获得任何要处理/消费的数据,即acceptable.However,即使在Kafka群集变得正常之后,PySpark流作业也不会消耗/处理来自Kafka topic.The作业的任何进一步记录

错误:在驱动程序或任何执行器上找不到错误。

其中一个executors的结尾日志:

代码语言:javascript
复制
> 21/11/24 16:46:32 INFO ShuffleBlockFetcherIterator: Getting 1
> non-empty blocks including 1 local blocks and 0 remote blocks 21/11/24
> 16:46:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in
> 0 ms 21/11/24 16:46:32 INFO CodeGenerator: Code generated in 12.569815
> ms 21/11/24 16:46:32 INFO CodeGenerator: Code generated in 4.696542 ms
> 21/11/24 16:46:32 INFO EsDataFrameWriter: Writing to [list_anom]
> 21/11/24 16:46:33 INFO Executor: Finished task 3.0 in stage 299400.0
> (TID 438086). 9114 bytes result sent to driver 21/11/24 16:46:33 INFO
> BlockManager: Removing RDD 777788 21/11/24 16:46:33 INFO BlockManager:
> Removing RDD 777807 21/11/24 16:46:33 INFO BlockManager: Removing RDD
> 777951 21/11/24 16:46:33 INFO BlockManager: Removing RDD 778649
> 21/11/24 17:01:42 INFO BlockManager: Removing RDD 776873 21/11/24
> 17:01:42 INFO BlockManager: Removing RDD 777013

卡夫卡数据流的PySpark代码片段:

代码语言:javascript
复制
inputDF = spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", Kakfa_broker_ip:port) \
                .option("subscribe", "kafka_topic") \
                .load() \
                .selectExpr("CAST(value AS STRING)") \
                .writeStream \
                .trigger(processingTime='30 seconds') \
                .option("checkpointLocation", "/file-storage/Checkpoints")\
                .foreachBatch(processKafkaInput) \
                .outputMode("append") \
                .start()

inputDF.awaitTermination() 

来自UI的一些信息:

驱动程序的线程转储:

EN

回答 1

Stack Overflow用户

发布于 2021-11-25 21:05:21

我对此不是100%,但如果垃圾数据导致错误,您可能需要在PySpark流作业中进行一些错误处理才能调用streamingContext.stop()

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70117164

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档