首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >直到我停止工作,火花结构化流写才会写文件

直到我停止工作,火花结构化流写才会写文件
EN

Stack Overflow用户
提问于 2019-02-28 08:43:16
回答 1查看 7.3K关注 0票数 3

我在一个经典的用例上使用了Spark结构化流:我想读一下kafka主题,并将流写入HDFS中。

这是我的代码:

代码语言:javascript
复制
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{ArrayType, DataTypes, StructType}

object TestKafkaReader extends  App{
  val spark = SparkSession
    .builder
    .appName("Spark-Kafka-Integration")
    .master("local")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")
  import spark.implicits._

  val kafkaDf = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers","KAFKA_BROKER_IP:PORT")
    //.option("subscribe", "test")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .load()

  val moviesJsonDf = kafkaDf.selectExpr("CAST(value AS STRING)")

  // movie struct
  val struct = new StructType()
    .add("title", DataTypes.StringType)
    .add("year", DataTypes.IntegerType)
    .add("cast", ArrayType(DataTypes.StringType))
    .add("genres", ArrayType(DataTypes.StringType))

  val moviesNestedDf = moviesJsonDf.select(from_json($"value", struct).as("movie"))
  // json flatten
  val movieFlattenedDf = moviesNestedDf.selectExpr("movie.title", "movie.year", "movie.cast","movie.genres")


  // convert to parquet and save to hdfs
  val query = movieFlattenedDf
    .writeStream
    .outputMode("append")
    .format("parquet")
    .queryName("movies")
    .option("checkpointLocation", "src/main/resources/chkpoint_dir")
    .start("src/main/resources/output")
    .awaitTermination()
  }

背景:

  • 我直接从intellij运行这个程序(安装了一个本地火花)
  • 我设法阅读卡夫卡没有问题,并写在控制台(使用控制台模式)
  • 目前,我想在本地机器上编写文件(但我在HDFS集群上尝试过,问题是一样的)

我的问题是:

在工作期间,它不会在文件夹中写任何东西,我必须手动停止作业以最终查看文件。

我想可能与.awaitTermination()有关的信息,我试图删除这个选项,但没有得到一个错误,作业根本不运行。

也许我没有设定正确的选项,但是在阅读了很多次文档和在Google上搜索之后,我什么也没找到。

你能帮我一下吗?

谢谢

编辑:

  • 我用的是火花2.4.0
  • 我尝试了64/128 job格式的=>,在停止作业之前不更改任何文件
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-03-09 16:05:01

是的问题解决了

我的问题是,我有太少的数据和火花等待更多的数据,以编写拼花文件。

为了完成这项工作,我使用@AlexandrosBiratsis中的注释(更改块大小)。

再次感谢@AlexandrosBiratsis

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54921528

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档