首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡-火花结构化流中如何选择Case类对象作为DataFrame

卡夫卡-火花结构化流中如何选择Case类对象作为DataFrame
EN

Stack Overflow用户
提问于 2020-07-18 15:43:26
回答 1查看 432关注 0票数 0

我有个案例课:

代码语言:javascript
复制
case class clickStream(userid:String, adId :String, timestamp:String)

实例,其中我希望将其与KafkaProducer一起发送为:

代码语言:javascript
复制
val record = new ProducerRecord[String,clickStream](
  "clicktream",
  "data",
  clickStream(Random.shuffle(userIdList).head, Random.shuffle(adList).head, new Date().toString).toString
)
producer.send(record)

它按照主题队列中期望的那样将记录作为字符串完美地发送:

代码语言:javascript
复制
clickStream(user5,ad2,Sat Jul 18 20:48:53 IST 2020)

然而,问题在于消费者:

代码语言:javascript
复制
val clickStreamDF = spark.readStream
.format("kafka")
.options(kafkaMap)
.option("subscribe","clicktream")
.load()
 

clickStreamDF 
.select($"value".as("string"))
.as[clickStream]       //trying to leverage DataSet APIs conversion
.writeStream
.outputMode(OutputMode.Append())
.format("console")
.option("truncate","false")
.start()
.awaitTermination()

显然,使用.asclickStream API不作为例外情况:

代码语言:javascript
复制
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`userid`' given input columns: [value];

这是列包含的内容:

代码语言:javascript
复制
    Batch: 2
-------------------------------------------
+----------------------------------------------------+
|value                                               |
+----------------------------------------------------+
|clickStream(user3,ad11,Sat Jul 18 20:59:35 IST 2020)|
+----------------------------------------------------+

我尝试使用value.serializervalue.deserializer使用自定义序列化程序

但在我的目录结构中却面临着不同的ClassNotFoundException问题。

我有三个问题:

卡夫卡如何在这里使用自定义反序列化类来解析对象?

我不完全理解编码器、的概念,在本例中如何使用?

使用Kafka发送/接收自定义Case类对象的最佳方法是什么?

EN

回答 1

Stack Overflow用户

发布于 2020-07-18 16:11:42

当您将clickStream对象数据作为string传递给卡夫卡& spark将读取相同的字符串时,您必须在clickStream(user3,ad11,Sat Jul 18 20:59:35 IST 2020)中解析和提取所需的字段。

检查下面的代码。

代码语言:javascript
复制
clickStreamDF 
.select(split(regexp_extract($"value","\\(([^)]+)\\)",1),"\\,").as("value"))
.select($"value"(0).as("userid"),$"value"(1).as("adId"),$"value"(2).as("timestamp"))
.as[clickStream] # Extract all fields from the value string & then use .as[clickStream] option. I think this line is not required as data already parsed to required format. 
.writeStream
.outputMode(OutputMode.Append())
.format("console")
.option("truncate","false")
.start()
.awaitTermination()

示例如何解析clickStream字符串数据。

代码语言:javascript
复制
scala> df.show(false)
+---------------------------------------------------+
|value                                              |
+---------------------------------------------------+
|clickStream(user5,ad2,Sat Jul 18 20:48:53 IST 2020)|
+---------------------------------------------------+
代码语言:javascript
复制
scala> df
.select(split(regexp_extract($"value","\\(([^)]+)\\)",1),"\\,").as("value"))
.select($"value"(0).as("userid"),$"value"(1).as("adId"),$"value"(2).as("timestamp"))
.as[clickStream]
.show(false)

+------+----+----------------------------+
|userid|adId|timestamp                   |
+------+----+----------------------------+
|user5 |ad2 |Sat Jul 18 20:48:53 IST 2020|
+------+----+----------------------------+

用Kafka发送/接收自定义Case类对象的最佳方法是什么?

尝试将您的case类转换为jsonavrocsv,然后将消息发送给kafka &使用火花读取相同的消息。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62970512

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档