我想按主题分组,或者在申请时知道消息来自哪个主题:
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](
Array(topicConfig.srcTopic),
kafkaParameters(BOOTSTRAP_SERVERS,"kafka_test_group_id))
)
)然而,在最新的API中,kafka010似乎不像以前的版本那样支持消息处理程序。有关于如何获得主题的想法吗?
我的目标是从N个主题中消费,处理它们(根据主题的不同以不同的方式),然后在主题的1:1映射中将其推回到另外N个主题:
SrcTopicA--> Process --> DstTopicA
SrcTopicB--> Process --> DstTopicB
SrcTopicC--> Process --> DstTopicC但是有一些属性需要共享(这些属性变化很大,所以不可能使用广播变量)。因此,所有的主题都需要在同一个spark作业中使用。
发布于 2017-03-16 01:53:37
当你在0.10中使用createDirectStream时,你会得到一个ConsumerRecord。此记录具有topic值。您可以创建主题和值的元组:
val stream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val res: DStream[(String, String)] = stream.map(record => (record.topic(), record.value()))发布于 2017-03-15 23:48:43
您可以像这样使用topic对流进行过滤:
stream.filter(cr => cr.topic)https://stackoverflow.com/questions/42813846
复制相似问题