首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >SparkStreaming: DirectStream RDD到dataframe

SparkStreaming: DirectStream RDD到dataframe
EN

Stack Overflow用户
提问于 2018-09-17 12:22:38
回答 1查看 600关注 0票数 0

我正在研究星火流上下文,它正在从avro序列化中的kafka主题中获得数据,如下所示。

代码语言:javascript
复制
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "schema.registry.url" -> "http://localhost:8081",
  "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "group.id" -> "1"
)

使用Kafka实用程序,我将创建如下所示的直接流

代码语言:javascript
复制
val topics = Set("mysql-foobar")


val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String,String](
    topics,
    kafkaParams)
)

我还将数据写到控制台,作为

代码语言:javascript
复制
stream.foreachRDD ( rdd => {
  rdd.foreachPartition(iterator => {
    while (iterator.hasNext) {
      val next = iterator.next()
      println(next.value())
    }
  })
})

现在我想从这些RDD中创建数据框架,是否有可能我已经审查并测试了许多来自堆栈溢出的解决方案,但却遇到了一些问题。Stackoverflow解决方案也是。我的输出如下

代码语言:javascript
复制
{"c1": 4, "c2": "Jarry", "create_ts": 1536758512000, "update_ts": 1537204805000}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-09-17 13:40:42

由于您使用的是汇流序列化程序,而且它们此时不提供与Spark的轻松集成,因此您可以通过AbsaOSS在Github上签出一个相对新的库,该库对此有所帮助。

但是,基本上,您使用星火结构化流来获取DataFrames,不要尝试使用Dstream来实现数据挖掘.

你可以找到你在这里寻找的例子

还请参阅将Spark结构化流与Kafka模式注册表集成上的其他示例

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52367666

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档