我正在使用KStream.to(“outputt异位”)编写输出主题;在apache文档中提到,它将自动创建传递给to()的主题。如何使用来自该主题的信息?
我可以使用consumer.subscribe()来处理输出消息和轮询消息吗?
KStreamBuilder builder = new KStreamBuilder();
builder.stream(topic).filterNot((k, v) -> {
v.toString().contains(tid);
}).to("outputtopic");
streams = new KafkaStreams(builder, config);
streams.start();
consumer.subscribe(Arrays.asList("outputtopic"));发布于 2019-06-02 13:35:25
builder.stream(topic).filterNot((k, v) -> {
v.toString().contains(tid);
}) // i.e., without the last `to()` method这个方法链的结果是一个KStream。如果您的问题是如何在同一应用程序中继续对此结果的KStream进行操作,那么请执行如下操作:
KStream<..., ...> myStream = builder.stream(topic).filterNot((k, v) -> {
v.toString().contains(tid);
});
myStream.to("outputtopic");
// Then continue to use the `myStream` instance for further work.
myStream.map(....).aggregate(...);如果您的问题是如何从不同的应用程序读取输出主题,那么您可以通过从另一个Kafka流应用程序、从KSQL、从普通的Kafka消费者(通过订阅)等读取这个主题来做到这一点。
https://stackoverflow.com/questions/56398097
复制相似问题