首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从火花流数据提取数据

从火花流数据提取数据
EN

Stack Overflow用户
提问于 2022-11-14 19:17:48
回答 1查看 52关注 0票数 0

我刚开始玩星火流。

我从卡夫卡那里得到了一个类似于下面的事件。我必须从dataframe中提取路径,从路径中读取数据并将其写入目的地。

代码语言:javascript
复制
{"path":["/tmp/file_path/file.parquet"],"format":"parquet","entries":null}

知道如何提取路径并格式化火花流数据吗?

这就是我想要达到的目标,

代码语言:javascript
复制
val df: DataFrame = spark.readStream.format("kafka").
    option("kafka.bootstrap.servers", "localhost:9092").
    option("subscribe", "kafka-test-event").
    option("startingOffsets", "earliest").load()

  df.printSchema()
  val valDf = df.selectExpr("CAST(value AS STRING)")
  val path = valDf.collect()(0).getString(0)
  println("path - "+ path)
  
  val newDf = spark.read.parquet(path)
  newDf.selectExpr("CAST(value AS STRING)").writeStream
    .format("console")
    .outputMode("append")
    .start()
    .awaitTermination()

错误:

代码语言:javascript
复制
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)

当我尝试在dataframe上进行一个收集时,它会抛出一个Unsupported operation exception.

EN

回答 1

Stack Overflow用户

发布于 2022-11-15 12:30:59

您会得到该错误,因为您正在尝试对流数据文件执行静态操作。

你可以试试下面这样的方法。在阅读了Kafka的流数据后,请尝试下面的内容

  1. 创建用于解析传入数据的架构类。

val incomingSchema =新的StructType() .add("path",StringType) .add("format",StringType) .add("entries",StringType)

  1. 将该模式关联到传入数据之上,您可以从数据中选择所需的字段并在其之上进行转换。

val valDf =df.selectExpr(“强制转换(值为字符串)为jsonEntry").select(from_json($"jsonEntry",jsonEntry)

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74436763

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档