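I have a Spring Boot service that consumes from a Kafka topic. For each message I consume, I perform certain tasks. Before those tasks can run, the service needs to finish loading some data into a cache I have set up. My problem is that if the Kafka consumer is set to start automatically, it begins consuming before the cache is loaded, and then it fails.
I tried to start the consumer explicitly after the cache is loaded, but I get a NullPointerException.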
    @Configuration
    public class KafkaConfig {

        @Value("${kafka.server}")
        String server;

        @Value("${kafka.port}")
        String port;

        @Value("${kafka.group.id}")
        String groupid;

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> config = new HashMap<>();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server + ":" + port);
            config.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // config.put("security.protocol","SASL_PLAINTEXT");
            // config.put("sasl.kerberos.service.name","kafka");
            return new DefaultKafkaConsumerFactory<>(config);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setAutoStartup(false);
            return factory;
        }
    }

KafkaListener
    @Service
    public class KafkaConsumer {

        @Autowired
        AggregationService aggregationService;

        @Autowired
        private KafkaListenerEndpointRegistry registry;

        private final CounterService counterService;

        public KafkaConsumer(CounterService counterService) {
            this.counterService = counterService;
        }

        @KafkaListener(topics = "gliTransactionTopic", group = "gliDecoupling", id = "gliKafkaListener")
        public boolean consume(String message,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                @Header(KafkaHeaders.OFFSET) Long offset) throws ParseException {
            System.out.println("Inside kafka listener :" + message + " partition :" + partition.toString() + " offset :" + offset.toString());
            aggregationService.run();
            return true;
        }
    }

The service that starts and stops the listener
    @Service
    public class DecouplingController {

        @Autowired
        private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

        public void stop() {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry
                    .getListenerContainer("gliKafkaListener");
            listenerContainer.stop();
        }

        public void start() {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry
                    .getListenerContainer("gliKafkaListener");
            listenerContainer.start();
        }
    }

The main method
    @SpringBootApplication
    public class DecouplingApplication {

        Ignite ignite;
        static IgniteCache<Long, MappingsEntity> mappingsCache;

        public static void main(String[] args) {
            SpringApplication.run(DecouplingApplication.class, args);
            Ignition.setClientMode(true);
            Ignite ignite = Ignition.ignite("ignite");
            loadCaches(ignite);
        }

        public static boolean loadCaches(Ignite ignite) {
            mappingsCache = ignite.getOrCreateCache("MappingsCache");
            mappingsCache.loadCache(null);
            System.out.println("Data Loaded");
            DecouplingController dc = new DecouplingController();
            dc.start();
            return true;
        }
    }

Here is the exception
    Data Loaded
    Exception in thread "main" java.lang.NullPointerException
        at com.ignite.spring.decoupling.controller.DecouplingController.start(DecouplingController.java:126)
        at com.ignite.spring.decoupling.DecouplingApplication.loadCaches(DecouplingApplication.java:64)
        at com.ignite.spring.decoupling.DecouplingApplication.main(DecouplingApplication.java:37)

Posted on 2019-11-07 17:33:59
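In DecouplingApplication, autowire the dependency instead of creating the DecouplingController object manually.

    @Autowired
    DecouplingController deDecouplingController;

The ApplicationContext, which handles autowired dependencies, is not aware of objects you create manually with "new". The autowired kafkaListenerEndpointRegistry is therefore never injected into the new DecouplingController you created, so that field is still null when start() is called.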
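
To make this concrete, here is a minimal plain-Java sketch with no Spring involved. Everything in it is a hypothetical stand-in: `Inject` plays the role of `@Autowired`, `createManaged` plays the role of the ApplicationContext, and `Registry` and `Controller` stand in for `KafkaListenerEndpointRegistry` and `DecouplingController`. Real Spring injection is far more involved, but the effect on an object created with `new` is the same.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

public class InjectionDemo {

    // Hypothetical stand-in for @Autowired.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    public @interface Inject {}

    // Hypothetical stand-in for KafkaListenerEndpointRegistry.
    public static class Registry {
        public String lookup(String id) {
            return "container:" + id;
        }
    }

    // Hypothetical stand-in for DecouplingController.
    public static class Controller {
        @Inject
        public Registry registry;

        public String start() {
            // Throws NullPointerException if nobody populated the field.
            return registry.lookup("gliKafkaListener");
        }
    }

    // Toy "container": instantiates the type and fills every @Inject field.
    public static <T> T createManaged(Class<T> type) {
        try {
            T bean = type.getDeclaredConstructor().newInstance();
            for (Field field : type.getDeclaredFields()) {
                if (field.isAnnotationPresent(Inject.class)) {
                    field.setAccessible(true);
                    field.set(bean, field.getType().getDeclaredConstructor().newInstance());
                }
            }
            return bean;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not create " + type, e);
        }
    }

    public static void main(String[] args) {
        // Created by the container: the field is populated.
        Controller managed = createManaged(Controller.class);
        System.out.println(managed.start());

        // Created with "new": the container never sees it, so the field stays null.
        Controller manual = new Controller();
        try {
            manual.start();
        } catch (NullPointerException e) {
            System.out.println("manual instance: registry was never injected");
        }
    }
}
```

In the real application, the equivalent of `createManaged` is letting Spring hand you the bean. Because `main` and `loadCaches` are static, one option (an assumption about how you might wire it, not the only way) is to keep the context returned by `SpringApplication.run(...)` and call `getBean(DecouplingController.class)` on it rather than calling `new DecouplingController()`.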

Posted on 2020-06-27 23:51:57
It seems that some part of the ConsumerConfig/ListenerConfig did not register gliKafkaListener1
https://stackoverflow.com/questions/58742546