首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >方法运行后在Main中显式启动Kafka Consumer

方法运行后在Main中显式启动Kafka Consumer
EN

Stack Overflow用户
提问于 2019-11-07 13:36:13
回答 2查看 1.1K关注 0票数 0

我有一个spring引导服务,它使用kafka主题。当我消费时,我会对kafka消息执行某些任务。在执行这些操作之前,我需要等待服务将一些数据加载到我设置的缓存中。我的问题是,如果我将kafka consumer设置为自动启动,它会在缓存加载之前开始消耗,然后它就会出错。

我试图在加载缓存后显式地启动使用者,但是我得到了空指针异常。

代码语言:javascript
复制
@Configuration
public class KafkaConfig {

    @Value("${kafka.server}")
    String server;

    @Value("${kafka.port}")
    String port;

    @Value("${kafka.group.id}")
    String groupid;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server+":"+port);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // config.put("security.protocol","SASL_PLAINTEXT");
        // config.put("sasl.kerberos.service.name","kafka");

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setAutoStartup(false);
        return factory;
    }

}

KafkaListener

代码语言:javascript
复制
    @Service
    public class KafkaConsumer {

    @Autowired
    AggregationService aggregationService;

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    private final CounterService counterService;

    public KafkaConsumer(CounterService counterService) {
        this.counterService = counterService;
    }

    @KafkaListener(topics = "gliTransactionTopic", group = "gliDecoupling", id = "gliKafkaListener")
    public boolean consume(String message,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
            @Header(KafkaHeaders.OFFSET) Long offset) throws ParseException {
        System.out.println("Inside kafka listener :" + message+" partition :"+partition.toString()+" offset :"+offset.toString());
    aggregationService.run();
            return true;
        }



}

要启动和停止的服务

代码语言:javascript
复制
    @Service
    public class DecouplingController {

        @Autowired
        private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;



        public void stop() {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry
                    .getListenerContainer("gliKafkaListener");
            listenerContainer.stop();
        }

        public void start() {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry
                    .getListenerContainer("gliKafkaListener");
            listenerContainer.start();
        }


    }

main方法

代码语言:javascript
复制
    @SpringBootApplication
    public class DecouplingApplication {

        Ignite ignite;


        static IgniteCache<Long, MappingsEntity> mappingsCache;

        public static void main(String[] args) {
            SpringApplication.run(DecouplingApplication.class, args);

            Ignition.setClientMode(true);
            Ignite ignite = Ignition.ignite("ignite");
            loadCaches(ignite);




        }

        public static boolean loadCaches(Ignite ignite) {

            mappingsCache = ignite.getOrCreateCache("MappingsCache");


            mappingsCache.loadCache(null);

            System.out.println("Data Loaded");

            DecouplingController dc=new DecouplingController();
            dc.start();

            return true;
        }

    }

以下是例外情况

代码语言:javascript
复制
    Data Loaded
    Exception in thread "main" java.lang.NullPointerException
        at com.ignite.spring.decoupling.controller.DecouplingController.start(DecouplingController.java:126)
        at com.ignite.spring.decoupling.DecouplingApplication.loadCaches(DecouplingApplication.java:64)
        at com.ignite.spring.decoupling.DecouplingApplication.main(DecouplingApplication.java:37)
EN

回答 2

Stack Overflow用户

发布于 2019-11-07 17:33:59

在DecouplingApplication中自动创建依赖项,而不是手动创建DecouplingController对象。

代码语言:javascript
复制
@Autowired

DecouplingController deDecouplingController;

处理自动连接依赖关系的ApplicationContext不知道您使用"new“手动创建的对象。自动连接的kafkaListenerEndpointRegistry对于您创建的新DecouplingController对象是未知的。

票数 2
EN

Stack Overflow用户

发布于 2020-06-27 23:51:57

似乎ConsumerConfig/ListenerConfig的某个部分没有注册gliKafkaListener1

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58742546

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档