我正在工作一个Camunda弹簧启动应用程序。应用程序使用从rabbitmq队列中读取消息。一旦收到消息,应用程序将调用Camunda中的一个流程实例。
如果在应用程序启动期间已经有消息在rabbitmq队列中,云流侦听器甚至在Camunda初始化之前就开始读取消息。
在触发某个事件之前,是否可以停止云流侦听器侦听队列--在本例中为PostDeployEvent。
我已经创建了一个示例应用程序以供参考,https://github.com/kpkurian/spring-cloud-stream-camunda
谢谢!!
发布于 2018-09-17 12:13:40
如@OlegZhurakousky所建议
问题
RuntimeService是自动启动的,当应用程序启动时,假设所有服务、bean等都已完全初始化。如果它仍然处于初始化和启动的过程中,那么从Spring习语的角度来看,它没有得到正确的实现。
溶液
用定制的生命周期实现包装RuntimeService,直到执行它的start()方法才会返回,以确保RuntmeService已经准备就绪。
我已经在示例github应用程序中实现了这一点。
发布于 2018-10-23 10:02:50
弹簧云流(kafka粘合剂)增加了暂停和恢复消费者的方法
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}
@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
return event -> {
System.out.println(event);
if (event.getConsumer().paused().size() > 0) {
event.getConsumer().resume(event.getConsumer().paused());
}
};
}
}请检查文档examples
但我认为暂停方法存在一些问题:https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/479
PS/您可以在示例侦听器中获得partion id和主题名称:
@StreamListener(Sink.INPUT)
public void in(String in,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
TopicPartition p = new TopicPartition(topic, partition);
consumer.pause(Collections.singleton(p));
}或者在errorChannel全局侦听器中
@StreamListener("errorChannel")
public void errorGlobal(Message<?> message) {
Message<?> failedMessage = ((ErrorMessage)message).getOriginalMessage();
Consumer consumer = (Consumer)failedMessage.getHeaders().get(KafkaHeaders.CONSUMER);
int partition = (int) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
String topic = (String) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
TopicPartition p = new TopicPartition(topic, partition);
// ?
consumer.pause(Collections.singleton(p));
}https://stackoverflow.com/questions/52097256
复制相似问题