我有两个不同的应用程序,名为A1和A2。每个应用程序都有自己的Kakfa服务器。来自这两个KAKFA服务器(代理)的消息发送到NiFi。
每个Kakfa都有不同的主题名称,基于这个,我可以区分来自Kakfa的消息。但是,除了卡夫卡的主题名称之外,在NiFi中还有其他方法来区分来自两个不同Kakfa的消息吗?他们是否有NiFi处理器来检查主题名,然后决定下一条路线?
非常感谢。你好啊,叶什维德
发布于 2018-12-17 08:57:42
如果您使用NiFi的Kafka处理器(ConsumeKafka/ConsumeKafkaRecord)接收来自Kafka的消息,它们将以FlowFiles的形式输出消息。它们带有一个名为kafka.topic的属性,它将具有消息来自的主题的名称。
要根据主题名称路由消息,可以使用RouteOnAttribute处理器。例如,您有两个主题:topicA & topicB。然后,您必须像这样配置RouteOnAttribute处理器:


然后根据您的需求将关系topic-a和topic-b连接到分离流。如果你要添加更多的卡夫卡资源,你所要做的就是用一个动态的关系来更新RouteOnAttribute。例:topic-c : ${kafka.topic:equals('topicC')}
发布于 2018-12-17 11:47:03
另一项决议。为此,您可以使用CryptographicHashContent处理器。您将得到内容的唯一哈希码,并在此基础上确定您的消息。
注意:由于非标准的遗留实现,HashContent处理器在ApacheApache1.8.0中被标记为不推荐使用,并且可能在以后的版本中被删除。
https://stackoverflow.com/questions/53810684
复制相似问题