首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >KafkaUtils010 SparkStreaming中的MessageHandler

KafkaUtils010 SparkStreaming中的MessageHandler
EN

Stack Overflow用户
提问于 2017-03-15 23:19:16
回答 2查看 1.2K关注 0票数 0

我想按主题分组,或者在申请时知道消息来自哪个主题:

代码语言:javascript
复制
val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent, 
    Subscribe[String, String](
      Array(topicConfig.srcTopic),
      kafkaParameters(BOOTSTRAP_SERVERS,"kafka_test_group_id))
    )
  )

然而,在最新的API中,kafka010似乎不像以前的版本那样支持消息处理程序。有关于如何获得主题的想法吗?

我的目标是从N个主题中消费,处理它们(根据主题的不同以不同的方式),然后在主题的1:1映射中将其推回到另外N个主题:

代码语言:javascript
复制
SrcTopicA--> Process --> DstTopicA
SrcTopicB--> Process --> DstTopicB
SrcTopicC--> Process --> DstTopicC

但是有一些属性需要共享(这些属性变化很大,所以不可能使用广播变量)。因此,所有的主题都需要在同一个spark作业中使用。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-03-16 01:53:37

当你在0.10中使用createDirectStream时,你会得到一个ConsumerRecord。此记录具有topic值。您可以创建主题和值的元组:

代码语言:javascript
复制
val stream: InputDStream[ConsumerRecord[String, String]] = 
  KafkaUtils.createDirectStream[String, String](
    streamingContext,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

val res: DStream[(String, String)] = stream.map(record => (record.topic(), record.value()))
票数 0
EN

Stack Overflow用户

发布于 2017-03-15 23:48:43

您可以像这样使用topic对流进行过滤:

代码语言:javascript
复制
stream.filter(cr => cr.topic)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42813846

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档