首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡消费者没有消费

卡夫卡消费者没有消费
EN

Stack Overflow用户
提问于 2022-10-11 19:00:12
回答 1查看 33关注 0票数 1

我正在尝试创建一个带有10个分区的Kafka主题的应用程序。我没有在application.properties中定义消费者的分区数,因为应用程序将部署在OpenShift中,我希望OpenShift完成负载平衡。问题是,当我启动应用程序时,它没有消耗任何东西。代码非常简单,我真的不知道哪里出了问题:

这是使用者,日志没有显示任何内容:

代码语言:javascript
复制
    @Incoming("plac-fate")
    @Transactional
    public void consume(NFeDistSVBAPayload payload) throws PlacException {
        logger.info("Consumindo payload: " + payload);
        service.criaNFeDistSVBAEntity(payload);
        logger.info("Payload com nrProtocolo '" + payload.getNrProtocolo() + "' consumido com sucesso.");
    }

作为反序列化器,它的日志没有显示任何内容:

代码语言:javascript
复制
@Override
    public NFeDistSVBAPayload deserialize(String topic, byte[] data) {
        logger.info("Deserializando mensagem do topico: " + topic);
        var strPayload = new String(data, StandardCharsets.UTF_8);
        var module = new JaxbAnnotationModule();
        var mapper = JsonMapper.builder()
        .enable(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER)
        .build();
        mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        mapper.registerModule(module);
        try {
            var payload =  mapper.readValue(strPayload, NFeDistSVBAPayload.class);
            return payload;
        } catch (JsonProcessingException e) {
            logger.error(e);
            return null;
        }
    }

这是àpplication.properties

代码语言:javascript
复制
quarkus.kafka.devservices.enabled=false
kafka.bootstrap.servers=${PLAC_KAFKA_URL}
mp.messaging.incoming.plac-fate.connector=smallrye-kafka
mp.messaging.incoming.plac-fate.value.deserializer=br.gov.pr.fazenda.plac.dominio.utils.DistNFePayloadSVBADeserializer
mp.messaging.incoming.plac-fate.group.id=plac-fate-consumer

下面是应用程序日志:

代码语言:javascript
复制
__  ____  __  _____   ___  __ ____  ______ 
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2022-10-11 18:53:42,919 INFO  [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18229: Configured topics for channel 'plac-fate': [plac-fate]
2022-10-11 18:53:42,949 INFO  [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18214: Key deserializer omitted, using String as default
2022-10-11 18:53:43,764 INFO  [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18257: Kafka consumer kafka-consumer-plac-fate, connected to Kafka brokers 'kafka:9092', belongs to the 'plac-fate-consumer' consumer group and is configured to poll records from [plac-fate]
2022-10-11 18:53:43,873 INFO  [io.quarkus] (Quarkus Main Thread) consumer 1.0-SNAPSHOT on JVM (powered by Quarkus 2.12.3.Final) started in 12.193s. Listening on: http://localhost:8080
2022-10-11 18:53:43,873 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-10-11 18:53:43,874 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [agroal, cdi, hibernate-orm, hibernate-orm-panache, jdbc-oracle, kafka-client, micrometer, narayana-jta, qute, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]

如您所见,日志显示已创建并连接了使用者,但它什么也不做。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-10-11 20:29:09

消费者默认从主题的末尾开始(在该主题中没有什么可供消费的)

如果您想从主题的开头开始并阅读现有事件,则需要添加

代码语言:javascript
复制
mp.messaging.incoming.{channel-name}.auto.offset.reset=earliest
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74032901

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档