首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >java.lang.IllegalArgumentException:未指定路径// Spark Consumer问题

java.lang.IllegalArgumentException:未指定路径// Spark Consumer问题
EN

Stack Overflow用户
提问于 2019-01-08 17:55:01
回答 1查看 1.9K关注 0票数 0

我正在尝试创建SparkConsumer,以便我可以发送消息,在这种情况下,csv文件通过火花流到卡夫卡。但是我有一个错误,没有指定'path‘。请参阅下面的代码

我的代码如下:

代码语言:javascript
复制
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.OutputMode

object sparkConsumer extends App {

  val conf = new SparkConf().setMaster("local").setAppName("Name")
  val sc = new SparkContext(conf)

  val rootLogger = Logger.getRootLogger()
  rootLogger.setLevel(Level.ERROR)

  val spark = SparkSession
    .builder()
    .appName("Spark-Kafka-Integration")
    .master("local")
    .getOrCreate()

  val schema = StructType(Array(
    StructField("InvoiceNo", StringType, nullable = true),
    StructField("StockCode", StringType, nullable = true),
    StructField("Description", StringType, nullable = true),
    StructField("Quantity", StringType, nullable = true)
  ))

  val streamingDataFrame = spark.readStream.schema(schema).csv("C:/Users/me/Desktop/Tasks/Tasks1/test.csv")

  streamingDataFrame.selectExpr("CAST(InvoiceNo AS STRING) AS key", "to_json(struct(*)) AS value").
    writeStream
    .format("csv")
    .option("topic", "topic_test")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("checkpointLocation", "C:/Users/me/IdeaProjects/SparkStreaming/checkpointLocation/")
    .start()

  import spark.implicits._
  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic_test")
    .load()

  val df1 = df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
    .select(from_json($"value", schema).as("data"), $"timestamp")
    .select("data.*", "timestamp")

  df1.writeStream
    .format("console")
    .option("truncate","false")
    .outputMode(OutputMode.Append)
    .start()
    .awaitTermination()

}

我变成了下面的错误:

代码语言:javascript
复制
Exception in thread "main" java.lang.IllegalArgumentException: 'path' is not specified

有人知道我错过了什么吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-01-08 18:24:55

这似乎是你代码的这一部分的一个问题:

代码语言:javascript
复制
  streamingDataFrame.selectExpr("CAST(InvoiceNo AS STRING) AS key", "to_json(struct(*)) AS value").
    writeStream
    .format("csv")
    .option("topic", "topic_test")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("checkpointLocation", "C:/Users/me/IdeaProjects/SparkStreaming/checkpointLocation/")
    .start()

因为您使用的是"csv“格式,但您没有设置它所需的文件位置。相反,您可以将Kafka属性配置为使用kafka主题作为接收器。因此,如果您将格式更改为"kafka“,它应该可以工作。

您可以尝试使用csv作为源的另一个问题是,您的路径应该是一个目录,而不是文件。在您的情况下,如果您创建一个目录并移动您的csv文件,它将工作。

只是为了测试,创建一个名为C:/Users/me/Desktop/Tasks/Tasks1/test.csv的目录,并创建一个文件,文件名为part-0000.csv。然后将您的csv内容包含在这个新文件中,并再次开始该过程。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54089226

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档