我在一个经典的用例上使用了Spark结构化流:我想读一下kafka主题,并将流写入HDFS中。
这是我的代码:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{ArrayType, DataTypes, StructType}
object TestKafkaReader extends App{
val spark = SparkSession
.builder
.appName("Spark-Kafka-Integration")
.master("local")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val kafkaDf = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","KAFKA_BROKER_IP:PORT")
//.option("subscribe", "test")
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.load()
val moviesJsonDf = kafkaDf.selectExpr("CAST(value AS STRING)")
// movie struct
val struct = new StructType()
.add("title", DataTypes.StringType)
.add("year", DataTypes.IntegerType)
.add("cast", ArrayType(DataTypes.StringType))
.add("genres", ArrayType(DataTypes.StringType))
val moviesNestedDf = moviesJsonDf.select(from_json($"value", struct).as("movie"))
// json flatten
val movieFlattenedDf = moviesNestedDf.selectExpr("movie.title", "movie.year", "movie.cast","movie.genres")
// convert to parquet and save to hdfs
val query = movieFlattenedDf
.writeStream
.outputMode("append")
.format("parquet")
.queryName("movies")
.option("checkpointLocation", "src/main/resources/chkpoint_dir")
.start("src/main/resources/output")
.awaitTermination()
}背景:
我的问题是:
在工作期间,它不会在文件夹中写任何东西,我必须手动停止作业以最终查看文件。
我想可能与.awaitTermination()有关的信息,我试图删除这个选项,但没有得到一个错误,作业根本不运行。
也许我没有设定正确的选项,但是在阅读了很多次文档和在Google上搜索之后,我什么也没找到。
你能帮我一下吗?
谢谢
编辑:
发布于 2019-03-09 16:05:01
是的问题解决了
我的问题是,我有太少的数据和火花等待更多的数据,以编写拼花文件。
为了完成这项工作,我使用@AlexandrosBiratsis中的注释(更改块大小)。
再次感谢@AlexandrosBiratsis
https://stackoverflow.com/questions/54921528
复制相似问题