首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Spark和Kafka流式传输时出现的空值问题

使用Spark和Kafka流式传输时出现的空值问题
EN

Stack Overflow用户
提问于 2019-01-08 23:06:13
回答 2查看 1.8K关注 0票数 1

我创建了SparkConsumer,这样我就可以通过Spark Structured Streaming将csv文件发送到Kafka。我启动sparkConsumer,然后他等待制作人。我启动了生成器,文件就被发送出去了。问题是我变成了一个“null”--数据帧中的值,而不是内容。我的输出如下所示:

代码语言:javascript
复制
-------------------------------------------
Batch: 1
-------------------------------------------
+---------+---------+-----------+--------+-----------------------+
|InvoiceNo|StockCode|Description|Quantity|timestamp              |
+---------+---------+-----------+--------+-----------------------+
|null     |null     |null       |null    |2019-01-08 15:46:29.156|
|null     |null     |null       |null    |2019-01-08 15:46:29.224|
|null     |null     |null       |null    |2019-01-08 15:46:29.224|
|null     |null     |null       |null    |2019-01-08 15:46:29.225|
|null     |null     |null       |null    |2019-01-08 15:46:29.225|
|null     |null     |null       |null    |2019-01-08 15:46:29.225|
|null     |null     |null       |null    |2019-01-08 15:46:29.225|
|null     |null     |null       |null    |2019-01-08 15:46:29.225|
|null     |null     |null       |null    |2019-01-08 15:46:29.225|
|null     |null     |null       |null    |2019-01-08 15:46:29.225|
|null     |null     |null       |null    |2019-01-08 15:46:29.225|
|null     |null     |null       |null    |2019-01-08 15:46:29.241|
|null     |null     |null       |null    |2019-01-08 15:46:29.241|
|null     |null     |null       |null    |2019-01-08 15:46:29.241|
|null     |null     |null       |null    |2019-01-08 15:46:29.241|
|null     |null     |null       |null    |2019-01-08 15:46:29.241|
|null     |null     |null       |null    |2019-01-08 15:46:29.241|
|null     |null     |null       |null    |2019-01-08 15:46:29.241|
|null     |null     |null       |null    |2019-01-08 15:46:29.241|
|null     |null     |null       |null    |2019-01-08 15:46:29.241|
+---------+---------+-----------+--------+-----------------------+

sparkConsumer的代码为:

代码语言:javascript
复制
object sparkConsumer extends App {

  val rootLogger = Logger.getRootLogger()
  rootLogger.setLevel(Level.ERROR)

  val spark = SparkSession
    .builder()
    .appName("Spark-Kafka-Integration")
    .master("local")
    .getOrCreate()

  val schema = StructType(Array(
    StructField("InvoiceNo", StringType, nullable = true),
    StructField("StockCode", StringType, nullable = true),
    StructField("Description", StringType, nullable = true),
    StructField("Quantity", StringType, nullable = true)
  ))

  import spark.implicits._
  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("delimiter", ";")
    .option("header","true")
    .option("inferSchema","true")
    .load()

  val df1 = df.selectExpr("CAST(value as STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
    .select(from_json($"value", schema).as("data"), $"timestamp")
    .select("data.*", "timestamp")


  df1.writeStream
    .format("console")
    .option("truncate","false")
    .start()
    .awaitTermination()

}

Producer.scala:

代码语言:javascript
复制
object Producer extends App {
  import java.util.Properties
  import org.apache.kafka.clients.producer._

  val  props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")                                             
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")        
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")      

  val producer = new KafkaProducer[String, String](props)                                             
  val TOPIC="test"
  val fileName = "path/to/test.csv"
  val lines = Source.fromFile(fileName).getLines()

  for(i <- lines){
    val record = new ProducerRecord(TOPIC, "key", s"$i")                    
    producer.send(record)
  }
  val record = new ProducerRecord(TOPIC, "key", "the end "+new java.util.Date)
  producer.send(record)
  producer.close()

}

有没有人可以帮助我成为我的文件的内容?

EN

回答 2

Stack Overflow用户

发布于 2019-01-16 18:11:41

我认为这个问题与序列化和反序列化有关。您写入主题的value格式为csv,例如:

111,someCode,someDescription,11

您的Spark使用者认为该消息是json格式(具有某种模式的from_json)。如果消息将如下所示,则解析将会起作用。

代码语言:javascript
复制
{
    "InvoiceNo": "111",
    "StockCode": "someCode",
    "Description": "someDescription",
    "Quantity": "11"
}

您必须更改序列化或反序列化以使它们相互匹配。

以下选项之一应起作用

  1. 生产者必须以json格式将消息写入主题
  2. Spark消费者应使用comma解析行以拆分字段
票数 2
EN

Stack Overflow用户

发布于 2021-07-27 04:19:37

在我的例子中,删除.option("delimiter", ";")是可行的

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54094582

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档