首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >停止Stream @StreamListener监听直到收到一些Spring事件

停止Stream @StreamListener监听直到收到一些Spring事件
EN

Stack Overflow用户
提问于 2018-08-30 12:36:11
回答 2查看 3.6K关注 0票数 2

我正在工作一个Camunda弹簧启动应用程序。应用程序使用从rabbitmq队列中读取消息。一旦收到消息,应用程序将调用Camunda中的一个流程实例。

如果在应用程序启动期间已经有消息在rabbitmq队列中,云流侦听器甚至在Camunda初始化之前就开始读取消息。

在触发某个事件之前,是否可以停止云流侦听器侦听队列--在本例中为PostDeployEvent。

我已经创建了一个示例应用程序以供参考,https://github.com/kpkurian/spring-cloud-stream-camunda

谢谢!!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-09-17 12:13:40

如@OlegZhurakousky所建议

问题

RuntimeService是自动启动的,当应用程序启动时,假设所有服务、bean等都已完全初始化。如果它仍然处于初始化和启动的过程中,那么从Spring习语的角度来看,它没有得到正确的实现。

溶液

用定制的生命周期实现包装RuntimeService,直到执行它的start()方法才会返回,以确保RuntmeService已经准备就绪。

我已经在示例github应用程序中实现了这一点。

票数 2
EN

Stack Overflow用户

发布于 2018-10-23 10:02:50

弹簧云流(kafka粘合剂)增加了暂停和恢复消费者的方法

代码语言:javascript
复制
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        System.out.println(in);
        consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
    }

    @Bean
    public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
        return event -> {
            System.out.println(event);
            if (event.getConsumer().paused().size() > 0) {
                event.getConsumer().resume(event.getConsumer().paused());
            }
        };
    }
}

请检查文档examples

但我认为暂停方法存在一些问题:https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/479

PS/您可以在示例侦听器中获得partion id和主题名称:

代码语言:javascript
复制
  @StreamListener(Sink.INPUT)
  public void in(String in,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
    System.out.println(in);
    TopicPartition p = new TopicPartition(topic, partition);
    consumer.pause(Collections.singleton(p));
  }

或者在errorChannel全局侦听器中

代码语言:javascript
复制
   @StreamListener("errorChannel")
  public void errorGlobal(Message<?> message) {
    Message<?> failedMessage = ((ErrorMessage)message).getOriginalMessage();
    Consumer consumer = (Consumer)failedMessage.getHeaders().get(KafkaHeaders.CONSUMER);
    int partition = (int) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
    String topic = (String) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
    TopicPartition p = new TopicPartition(topic, partition);
    // ?
    consumer.pause(Collections.singleton(p));
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52097256

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档