我刚开始玩星火流。
我从卡夫卡那里得到了一个类似于下面的事件。我必须从dataframe中提取路径,从路径中读取数据并将其写入目的地。
{"path":["/tmp/file_path/file.parquet"],"format":"parquet","entries":null}知道如何提取路径并格式化火花流数据吗?
这就是我想要达到的目标,
val df: DataFrame = spark.readStream.format("kafka").
option("kafka.bootstrap.servers", "localhost:9092").
option("subscribe", "kafka-test-event").
option("startingOffsets", "earliest").load()
df.printSchema()
val valDf = df.selectExpr("CAST(value AS STRING)")
val path = valDf.collect()(0).getString(0)
println("path - "+ path)
val newDf = spark.read.parquet(path)
newDf.selectExpr("CAST(value AS STRING)").writeStream
.format("console")
.outputMode("append")
.start()
.awaitTermination()错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)当我尝试在dataframe上进行一个收集时,它会抛出一个Unsupported operation exception.。
发布于 2022-11-15 12:30:59
您会得到该错误,因为您正在尝试对流数据文件执行静态操作。
你可以试试下面这样的方法。在阅读了Kafka的流数据后,请尝试下面的内容
val incomingSchema =新的StructType() .add("path",StringType) .add("format",StringType) .add("entries",StringType)
val valDf =df.selectExpr(“强制转换(值为字符串)为jsonEntry").select(from_json($"jsonEntry",jsonEntry)
https://stackoverflow.com/questions/74436763
复制相似问题