首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >同一个应用程序中的KafkaStreams多个流

同一个应用程序中的KafkaStreams多个流
EN

Stack Overflow用户
提问于 2018-03-23 19:15:19
回答 2查看 7.3K关注 0票数 0

我试着用KafkaStreams做一个基于约定和合理性的实用设计决策。

假设我有两个不同的事件要放在KTable中,我有一个生产者将这些消息发送给正在收听该主题的KStream

据我所知,我不能使用条件转发来处理使用KafkaStreams的消息,因此,如果流订阅了许多主题(例如,上面每条消息都订阅一个主题),我只能在单个接收器主题上调用stream.to --否则,我将不得不在流上调用foreach,并向接收器主题发送带有KProducer的消息。

以上建议使用单一的流。我以为我可以在同一个应用程序中设置多个流,每个流可以监听一个主题,映射并转发到一个表接收器,但是每次我尝试创建两个KafkaStreams实例时,只有第一个初始化的对象订阅了它的主题--另一个得到了客户端的警告,即它的主题没有订阅。

我可以在同一个应用程序中设置多个流吗?若然,是否有特别的要求?

代码语言:javascript
复制
    class Stream(topic: String) {
      val props: Option[Map[String, String]] = Some(TopicProps.get(topic))
      val streamsBuilder = new StreamsBuilder
      val topics = new util.ArrayList[String]
      topics.add(props.get("topic"))

      val stream: KStream[String, String] = configureStream(streamsBuilder, topics, props.get("sink"))

      def configureStream(builder: StreamsBuilder, topics: java.util.List[String], sink: String): KStream[String, String] = {
        builder.stream[String, String](
          topics,
          Consumed.`with`(String(), String())
        )
      }

      def init(): KafkaStreams = {
        val streams = new KafkaStreams(streamsBuilder.build(), KafkaConfig.streamConfig)

        streams.start()

        streams
      }
    }

    class Streams() {

      val eventStream = new Stream("first_event") //looking good!
      val eventStream2 = new Stream("second_event") // no subscribers
      //if I switch the other of these, eventStream2 is subscribed to and eventStream is dead in the water
      val streams: KafkaStreams = eventStream.init()
      val streams2: KafkaStreams = eventStream2.init()

    }

流配置

代码语言:javascript
复制
    val streamConfig: Properties = {
        val properties = new Properties()
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BrokerHost)
        properties
    }

我也希望有其他的选择

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-03-23 22:12:42

据我所知,我不能对消息使用条件转发。

你知道KStream#split() (按顺序排列的KStream#branch()版本)吗?基本上和条件转发一样。

我想我可以在同一个应用程序中设置多个流,每个流听一个主题,映射并转发到一个表接收器,

这方面的工作如下:

代码语言:javascript
复制
StreamsBuilder builder = new SteamsBuilder();
KStream stream1 = builder.stream("topic1");
KStream stream2 = builder.stream("topic2");

stream1.to("table1-topic");
stream2.to("table2-topic");

但是,每次我尝试创建两个KafkaStreams实例时,只有初始化的第一个实例订阅它的主题--另一个则收到客户端的警告,即它的主题没有订阅。

不确定。这应该能行。也许你能分享你的代码?

票数 2
EN

Stack Overflow用户

发布于 2018-05-17 09:57:11

在创建KafkaStreams时,需要传递具有不同application.id的属性,例如:

代码语言:javascript
复制
    props.put(StreamsConfig.APPLICATION_ID_CONFIG,"APP1");
    StreamsBuilder builder = new SteamsBuilder();
    KStream stream1 = builder.stream("topic1");
    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

然后,您应该创建另一个流:

代码语言:javascript
复制
    props.put(StreamsConfig.APPLICATION_ID_CONFIG,"APP2");
    StreamsBuilder builder = new SteamsBuilder();
    KStream stream2 = builder.stream("topic2");
    KafkaStreams streams2 = new KafkaStreams(builder, props);
    streams2.start();
票数 8
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49456897

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档