我正在研究星火流上下文,它正在从avro序列化中的kafka主题中获得数据,如下所示。
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"schema.registry.url" -> "http://localhost:8081",
"key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
"value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
"group.id" -> "1"
)使用Kafka实用程序,我将创建如下所示的直接流
val topics = Set("mysql-foobar")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String,String](
topics,
kafkaParams)
)我还将数据写到控制台,作为
stream.foreachRDD ( rdd => {
rdd.foreachPartition(iterator => {
while (iterator.hasNext) {
val next = iterator.next()
println(next.value())
}
})
})现在我想从这些RDD中创建数据框架,是否有可能我已经审查并测试了许多来自堆栈溢出的解决方案,但却遇到了一些问题。Stackoverflow解决方案也是这和这。我的输出如下
{"c1": 4, "c2": "Jarry", "create_ts": 1536758512000, "update_ts": 1537204805000}发布于 2018-09-17 13:40:42
由于您使用的是汇流序列化程序,而且它们此时不提供与Spark的轻松集成,因此您可以通过AbsaOSS在Github上签出一个相对新的库,该库对此有所帮助。
但是,基本上,您使用星火结构化流来获取DataFrames,不要尝试使用Dstream来实现数据挖掘.
你可以找到你在这里寻找的例子
还请参阅将Spark结构化流与Kafka模式注册表集成上的其他示例
https://stackoverflow.com/questions/52367666
复制相似问题