我正在尝试开发一个Spring应用程序,使用库reactor-kafka对从Kafka主题读取的一些消息做出反应。
我有一个构建KafkaReceiver的配置类。
@Configuration
public class MyConfiguration {
@Bean
public KafkaReceiver<String, String> kafkaReceiver() {
Map<String, Object> props = new HashMap<>();
// Options initialisation...
final ReceiverOptions<String, String> receiverOptions =
ReceiverOptions.<String, string>create(props)
.subscription(Collections.singleton(consumer.getTopic()));
return KafkaReceiver.create(receiverOptions);
}
}Well...and现在?使用不那么被动的spring-kafka库,我可以用@KafkaListener注释一个方法,Spring将为我创建一个听卡夫卡主题的线程。
我应该把KafkaReceiver放在哪里呢?在所有示例中,我发现直接使用了main方法,但这不是引导方式。
我使用Spring 2.1.3和反应堆-Kafka 1.1.0
提前谢谢。
发布于 2019-03-19 15:06:28
既然有了KafkaReceiver bean,现在就可以这样做了:
@Bean
public ApplicationRunner runner(KafkaReceiver<String, String> kafkaReceiver) {
return args -> {
kafkaReceiver.receive()
...
.sunbscribe();
};
}这个ApplicationRunner bean将在ApplicationContext准备就绪时被踢掉。有关更多信息,请参见其JavaDocs。
https://stackoverflow.com/questions/55242567
复制相似问题