我从Apache Kafka读取json消息,然后使用Apache Spark将拼图文件写入Azure blob存储中。我使用方法partitionBy将这些拼图文件写入嵌套文件夹中。我的代码是这样的:
val sourceDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerList)
.option("subscribe", sourceTopic)
.option("startingOffsets", "latest")
.load()
sourceDF
.select(...)
.where("somecol"=="something")
.writeStream
.format("parquet")
.option("path", outputPath+"/somepath")
.option("checkpointLocation", checkpointLocation+"/somepath")
.partitionBy("date","country")
.queryName("test")
.trigger(Trigger.ProcessingTime(interval+" seconds"))
.start()我注意到spark应用程序生成的是空的拼图文件。这对我来说是一个瓶颈,因为我在配置单元导入过程中读取了这些拼图文件,并且抛出了一个异常,表明这不是一个拼图文件(长度太小: 0)
一般来说,我希望禁止Spark streaming写入空文件。
发布于 2021-03-23 22:47:30
dataframe.rdd.isEmpty() //如果为且不运行,则检查中的空数据帧。
https://stackoverflow.com/questions/64538928
复制相似问题