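I am building a simple Kafka application with a producer and a consumer. I send a string through Postman and publish it to the topic. The topic receives the message, but the consumer does not consume it.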
ConsumerConfig.java
@EnableKafka
@Configuration
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
public class KafkaConsumerConfig {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public Map<String, Object> config() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_Id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        return config;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(config());
    }
}

ConsumerService.java
@Service
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
@Component
public class KafkaConsumerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
    private static final String TOPIC = "Kafka_Test";

    @KafkaListener(topics = TOPIC, groupId = "group_Id")
    public void consumeOTP(String otp) {
        log.debug("The OTP Sent to Kafka is:" + otp);
    }
}

Posted on 2022-05-24 20:52:18
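Based on your question, I assume you are using Spring Boot and Spring Kafka. For a simple case like this, you can skip all of the bean configuration and rely on the default beans that Spring Boot auto-configures, so the setup is done essentially through the application.yml file. It is explained better in this post, but basically: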
Producer:
@Service
public class SimpleProducer {

    private KafkaTemplate<String, String> simpleProducer;

    public SimpleProducer(KafkaTemplate<String, String> simpleProducer) {
        this.simpleProducer = simpleProducer;
    }

    public void send(String message) {
        simpleProducer.send("simple-message", message);
    }
}

Consumer:
@Slf4j
@Service
public class SimpleConsumer {

    @KafkaListener(id = "simple-consumer", topics = "simple-message")
    public void consumeMessage(String message) {
        log.info("Consumer got message: {}", message);
    }
}

API, so that you can send a message:
@RestController
@RequestMapping("/api")
public class MessageApi {

    private final SimpleProducer simpleProducer;

    public MessageApi(SimpleProducer simpleProducer) {
        this.simpleProducer = simpleProducer;
    }

    @PostMapping("/message")
    public ResponseEntity<String> message(@RequestBody String message) {
        simpleProducer.send(message);
        return ResponseEntity.ok("Message received: " + message);
    }
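}

Because you are using the defaults, with a String key and a String value, you do not even have to add any specific configuration to the Spring Boot properties or YAML file.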
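If you would still rather spell the settings out, a minimal application.yml sketch could look like the following. It is only an illustration, not required configuration: the broker address and offset setting are copied from the question, the `kafka.enabled` entry matters only if you keep the question's `@ConditionalOnProperty` guards, and `com.example` is a hypothetical package name standing in for wherever the listener class lives. The logging entry is there because the question's listener logs at DEBUG, while Spring Boot logs at INFO by default, so that line would not show up in the output unless DEBUG is enabled for that logger.

```yaml
# application.yml: optional overrides; the Spring Boot defaults already use String keys and values
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092   # broker address taken from the question
    consumer:
      auto-offset-reset: earliest       # same value as in the question's config map
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

# Only needed for classes guarded by @ConditionalOnProperty(name = "kafka.enabled", ...)
kafka:
  enabled: true

# Assumption: com.example is a placeholder for the listener's real package
logging:
  level:
    com.example: DEBUG
```

No group id is set here because the `id` given to `@KafkaListener` in the consumer above is used as the consumer group id by default.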
https://stackoverflow.com/questions/72368262