我正在尝试创建一个带有10个分区的Kafka主题的应用程序。我没有在application.properties中定义消费者的分区数,因为应用程序将部署在OpenShift中,我希望OpenShift完成负载平衡。问题是,当我启动应用程序时,它没有消耗任何东西。代码非常简单,我真的不知道哪里出了问题:
这是使用者,日志没有显示任何内容:
@Incoming("plac-fate")
@Transactional
public void consume(NFeDistSVBAPayload payload) throws PlacException {
logger.info("Consumindo payload: " + payload);
service.criaNFeDistSVBAEntity(payload);
logger.info("Payload com nrProtocolo '" + payload.getNrProtocolo() + "' consumido com sucesso.");
}作为反序列化器,它的日志没有显示任何内容:
@Override
public NFeDistSVBAPayload deserialize(String topic, byte[] data) {
logger.info("Deserializando mensagem do topico: " + topic);
var strPayload = new String(data, StandardCharsets.UTF_8);
var module = new JaxbAnnotationModule();
var mapper = JsonMapper.builder()
.enable(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER)
.build();
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.registerModule(module);
try {
var payload = mapper.readValue(strPayload, NFeDistSVBAPayload.class);
return payload;
} catch (JsonProcessingException e) {
logger.error(e);
return null;
}
}这是àpplication.properties
quarkus.kafka.devservices.enabled=false
kafka.bootstrap.servers=${PLAC_KAFKA_URL}
mp.messaging.incoming.plac-fate.connector=smallrye-kafka
mp.messaging.incoming.plac-fate.value.deserializer=br.gov.pr.fazenda.plac.dominio.utils.DistNFePayloadSVBADeserializer
mp.messaging.incoming.plac-fate.group.id=plac-fate-consumer下面是应用程序日志:
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2022-10-11 18:53:42,919 INFO [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18229: Configured topics for channel 'plac-fate': [plac-fate]
2022-10-11 18:53:42,949 INFO [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18214: Key deserializer omitted, using String as default
2022-10-11 18:53:43,764 INFO [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18257: Kafka consumer kafka-consumer-plac-fate, connected to Kafka brokers 'kafka:9092', belongs to the 'plac-fate-consumer' consumer group and is configured to poll records from [plac-fate]
2022-10-11 18:53:43,873 INFO [io.quarkus] (Quarkus Main Thread) consumer 1.0-SNAPSHOT on JVM (powered by Quarkus 2.12.3.Final) started in 12.193s. Listening on: http://localhost:8080
2022-10-11 18:53:43,873 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-10-11 18:53:43,874 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [agroal, cdi, hibernate-orm, hibernate-orm-panache, jdbc-oracle, kafka-client, micrometer, narayana-jta, qute, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]如您所见,日志显示已创建并连接了使用者,但它什么也不做。
发布于 2022-10-11 20:29:09
消费者默认从主题的末尾开始(在该主题中没有什么可供消费的)
如果您想从主题的开头开始并阅读现有事件,则需要添加
mp.messaging.incoming.{channel-name}.auto.offset.reset=earliesthttps://stackoverflow.com/questions/74032901
复制相似问题