首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用来自KStream输出主题的消息

如何使用来自KStream输出主题的消息
EN

Stack Overflow用户
提问于 2019-05-31 15:42:37
回答 1查看 1.1K关注 0票数 2

我正在使用KStream.to(“outputt异位”)编写输出主题;在apache文档中提到,它将自动创建传递给to()的主题。如何使用来自该主题的信息?

我可以使用consumer.subscribe()来处理输出消息和轮询消息吗?

代码语言:javascript
复制
        KStreamBuilder builder = new KStreamBuilder();

        builder.stream(topic).filterNot((k, v) -> {
            v.toString().contains(tid);
        }).to("outputtopic");

        streams = new KafkaStreams(builder, config);
        streams.start();

        consumer.subscribe(Arrays.asList("outputtopic"));
EN

回答 1

Stack Overflow用户

发布于 2019-06-02 13:35:25

代码语言:javascript
复制
builder.stream(topic).filterNot((k, v) -> {
            v.toString().contains(tid);
        }) // i.e., without the last `to()` method

这个方法链的结果是一个KStream。如果您的问题是如何在同一应用程序中继续对此结果的KStream进行操作,那么请执行如下操作:

代码语言:javascript
复制
KStream<..., ...> myStream = builder.stream(topic).filterNot((k, v) -> {
            v.toString().contains(tid);
        });
myStream.to("outputtopic");

// Then continue to use the `myStream` instance for further work.
myStream.map(....).aggregate(...);

如果您的问题是如何从不同的应用程序读取输出主题,那么您可以通过从另一个Kafka流应用程序、从KSQL、从普通的Kafka消费者(通过订阅)等读取这个主题来做到这一点。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56398097

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档