首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kakfa侦听器和消费者不调用

Kakfa侦听器和消费者不调用
EN

Stack Overflow用户
提问于 2022-05-24 19:09:35
回答 1查看 76关注 0票数 0

我正在与生产者和消费者一起构建一个简单的Kafka应用程序。我正在通过邮递员发送一条字符串并推介这个话题。主题是接收消息,但消费者并没有使用它。

ConsumerConfig.Java

代码语言:javascript
复制
@EnableKafka
@Configuration
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
public class KafkaConsumerConfig {
    
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
        
    }

    @Bean
    public Map<String,Object> config(){
        Map<String,Object> config = new HashMap<>();
        
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_Id");

        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        return config;
    }
    @Bean
    public ConsumerFactory<String,String> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(config());
    }
    

}

CosumerService.Java

代码语言:javascript
复制
@Service
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
@Component
public class KafkaConsumerService {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
    private static final String TOPIC = "Kafka_Test";

    @KafkaListener(topics = TOPIC, groupId= "group_Id")
    public void consumeOTP(String otp) {
        log.debug("The OTP Sent to Kafka is:" + otp);
    }

}
EN

回答 1

Stack Overflow用户

发布于 2022-05-24 20:52:18

基于您的问题,我假设您使用Spring和Spring。举个简单的例子,使用这个设置,您可以避免所有Bean配置,并使用Spring中的DefaultBean,这样您基本上可以使用application.yml文件来完成设置,在这个职位中有更好的解释,但基本上:

制片人:

代码语言:javascript
复制
@Service
public class SimpleProducer {

  private KafkaTemplate<String, String> simpleProducer;

  public SimpleProducer(KafkaTemplate<String, String> simpleProducer) {
    this.simpleProducer = simpleProducer;
  }
  public void send(String message) {
    simpleProducer.send("simple-message", message);
  }
}

消费者:

代码语言:javascript
复制
@Slf4j
@Service
public class SimpleConsumer {
  @KafkaListener(id = "simple-consumer", topics = "simple-message")
  public void consumeMessage(String message) {
    log.info("Consumer got message: {}", message);
  }
}

Api,这样您就可以生成发送消息:

代码语言:javascript
复制
@RestController
@RequestMapping("/api")
public class MessageApi {

  private final SimpleProducer simpleProducer;

  public MessageApi(SimpleProducer simpleProducer) {
    this.simpleProducer = simpleProducer;
  }

  @PostMapping("/message")
  public ResponseEntity<String> message(@RequestBody String message) {
    simpleProducer.send(message);
    return ResponseEntity.ok("Message received: " + message);
  }
}

因为您使用的是以字符串作为键和字符串作为值的默认值,所以您甚至不必向spring引导道具或yaml文件添加任何特定的配置。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72368262

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档