我有个案例课:
case class clickStream(userid:String, adId :String, timestamp:String)实例,其中我希望将其与KafkaProducer一起发送为:
val record = new ProducerRecord[String,clickStream](
"clicktream",
"data",
clickStream(Random.shuffle(userIdList).head, Random.shuffle(adList).head, new Date().toString).toString
)
producer.send(record)它按照主题队列中期望的那样将记录作为字符串完美地发送:
clickStream(user5,ad2,Sat Jul 18 20:48:53 IST 2020)然而,问题在于消费者:
val clickStreamDF = spark.readStream
.format("kafka")
.options(kafkaMap)
.option("subscribe","clicktream")
.load()
clickStreamDF
.select($"value".as("string"))
.as[clickStream] //trying to leverage DataSet APIs conversion
.writeStream
.outputMode(OutputMode.Append())
.format("console")
.option("truncate","false")
.start()
.awaitTermination()显然,使用.asclickStream API不作为例外情况:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`userid`' given input columns: [value];这是值列包含的内容:
Batch: 2
-------------------------------------------
+----------------------------------------------------+
|value |
+----------------------------------------------------+
|clickStream(user3,ad11,Sat Jul 18 20:59:35 IST 2020)|
+----------------------------------------------------+我尝试使用value.serializer和value.deserializer使用自定义序列化程序
但在我的目录结构中却面临着不同的ClassNotFoundException问题。
我有三个问题:
卡夫卡如何在这里使用自定义反序列化类来解析对象?
我不完全理解编码器、和的概念,在本例中如何使用?。
使用Kafka发送/接收自定义Case类对象的最佳方法是什么?
发布于 2020-07-18 16:11:42
当您将clickStream对象数据作为string传递给卡夫卡& spark将读取相同的字符串时,您必须在clickStream(user3,ad11,Sat Jul 18 20:59:35 IST 2020)中解析和提取所需的字段。
检查下面的代码。
clickStreamDF
.select(split(regexp_extract($"value","\\(([^)]+)\\)",1),"\\,").as("value"))
.select($"value"(0).as("userid"),$"value"(1).as("adId"),$"value"(2).as("timestamp"))
.as[clickStream] # Extract all fields from the value string & then use .as[clickStream] option. I think this line is not required as data already parsed to required format.
.writeStream
.outputMode(OutputMode.Append())
.format("console")
.option("truncate","false")
.start()
.awaitTermination()示例如何解析clickStream字符串数据。
scala> df.show(false)
+---------------------------------------------------+
|value |
+---------------------------------------------------+
|clickStream(user5,ad2,Sat Jul 18 20:48:53 IST 2020)|
+---------------------------------------------------+scala> df
.select(split(regexp_extract($"value","\\(([^)]+)\\)",1),"\\,").as("value"))
.select($"value"(0).as("userid"),$"value"(1).as("adId"),$"value"(2).as("timestamp"))
.as[clickStream]
.show(false)
+------+----+----------------------------+
|userid|adId|timestamp |
+------+----+----------------------------+
|user5 |ad2 |Sat Jul 18 20:48:53 IST 2020|
+------+----+----------------------------+用Kafka发送/接收自定义Case类对象的最佳方法是什么?
尝试将您的case类转换为json、avro或csv,然后将消息发送给kafka &使用火花读取相同的消息。
https://stackoverflow.com/questions/62970512
复制相似问题