首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >@KafkaListener正常关机,批量Kakfa监听器不工作

@KafkaListener正常关机,批量Kakfa监听器不工作
EN

Stack Overflow用户
提问于 2020-05-14 22:21:43
回答 1查看 501关注 0票数 0

19880 -05-14 19:32:11.238信息2020- on(4)-127.0.0.1 o.s.c.support.DefaultLifecycleProcessor :在30000超时时间内关闭阶段值为2147483547的1个bean失败:

EN

回答 1

Stack Overflow用户

发布于 2020-05-15 00:02:12

默认情况下,Spring要求每个SmartLifecycle阶段的bean在30秒内停止;您可以通过添加以下bean来更改此行为:

代码语言:javascript
复制
@Bean
public DefaultLifecycleProcessor lifecycleProcessor() {
    DefaultLifecycleProcessor lp = new DefaultLifecycleProcessor();
    lp.setTimeoutPerShutdownPhase(120_000);
    return lp;
}

编辑

在侦听器正在处理批处理时停止容器不会影响批处理的处理;侦听器线程不会被容器终止;但是,默认情况下,容器将在10秒后发布容器停止事件(容器属性shutDownTimeout),即使侦听器尚未实际完成也是如此。

如果您担心生命周期处理器在批处理过程中杀死线程,并且不想增加它的超时,那么您可以结合暂停使用者和侦听事件来执行正常关闭。

下面是一个示例:

代码语言:javascript
复制
@SpringBootApplication
public class So61799727Application {

    public static void main(String[] args) {
        SpringApplication.run(So61799727Application.class, args);
    }


    @KafkaListener(id = "so61799727", topics = "so61799727", concurrency = "3")
    public void listen(List<String> in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so61799727").partitions(3).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template,
            KafkaListenerEndpointRegistry registry) {

        return args -> {
            sendTen(template);
            System.out.println("Hit enter to pause container");
            System.in.read();
            registry.getListenerContainer("so61799727").pause();
        };
    }

    public static void sendTen(KafkaTemplate<String, String> template) {
        IntStream.range(0, 10).forEach(i -> template.send("so61799727", "foo" + i));
    }

}
代码语言:javascript
复制
@Component
class Eventer {

    privaate final KafkaTemplate<String, String> template;

    private final AtomicInteger paused = new AtomicInteger();

    Eventer(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    @EventListener
    public void paused(ConsumerPausedEvent event) {
        System.out.println(event);
        if (this.paused.incrementAndGet() ==
                event.getContainer(ConcurrentMessageListenerContainer.class).getConcurrency()) {
            System.out.println("All containers paused");
            So61799727Application.sendTen(this.template);
        }
    }

    @EventListener
    public void idle(ListenerContainerIdleEvent event) {
        System.out.println(event);
    }

}
代码语言:javascript
复制
spring.kafka.listener.idle-event-interval=5000
spring.kafka.listener.type=batch
spring.kafka.producer.properties.linger.ms=50
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61799727

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档