I need to build an application with stateful retry that listens to a Kafka topic, calls some APIs, and then commits the message. If an error occurs during one of those calls (a timeout, for example), the application must retry 4 times with an interval of 4 seconds between attempts. If it still fails after those four attempts, the application should send the record to a DLQ topic.
The part I cannot get working is publishing to the DLQ: with my current configuration, the retries never stop and nothing is ever sent to the DLQ topic.
@KafkaListener(topics = "${topic.name}", concurrency = "1")
public void listen(ConsumerRecord<String, AberturaContaLimiteCreditoCalculado> mensagem,
        @Headers final MessageHeaders headers,
        Acknowledgment ack) {
    AberturaContaLimiteCreditoCalculadoData dados;
    if (!validarMensagem(mensagem)) {
        dados = mensagem.value().getData();
        // ack.acknowledge();
        // This RuntimeException is thrown deliberately, just to force a retry.
        throw new RuntimeException();
    }
}
private boolean validarMensagem(ConsumerRecord<String, AberturaContaLimiteCreditoCalculado> mensagem) {
    return mensagem == null || mensagem.value() == null;
}

KafkaConfig:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(final ConsumerFactory<String, Object> consumerFactory) {
    final ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    factory.setCommonErrorHandler(new DefaultErrorHandler(
            publisherRetryDLQ(),
            new FixedBackOff(4000L, 4L)));
    return factory;
}
public DeadLetterPublishingRecoverer publisherRetryDLQ() {
return new DeadLetterPublishingRecoverer(createKafkaTemplate(),
(record, ex) -> new TopicPartition(topicoDLQ, 0));
}
public ProducerFactory<String, String> producerFactory() {
final Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
return new DefaultKafkaProducerFactory<>(config);
}
public KafkaOperations<String, String> createKafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

Edit 2022-05-04:
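The RetryListener tip referred to in this edit can be attached to the `DefaultErrorHandler` roughly like this. A minimal sketch, assuming spring-kafka 2.7+ (where `DefaultErrorHandler.setRetryListeners()` was introduced); the logger and the surrounding bean are illustrative, and turning on `logging.level.org.springframework.kafka=DEBUG` in `application.properties` gives the same kind of visibility:

```java
// Sketch: a RetryListener logs every failed delivery attempt, which makes it
// visible whether the back-off is running and whether recovery (the DLQ
// publication) is ever reached. setRetryListeners() exists since spring-kafka 2.7.
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
        publisherRetryDLQ(),
        new FixedBackOff(4000L, 4L));
errorHandler.setRetryListeners((record, ex, deliveryAttempt) ->
        log.warn("Delivery attempt {} failed for {}-{}@{}",
                deliveryAttempt, record.topic(), record.partition(), record.offset(), ex));
factory.setCommonErrorHandler(errorHandler);
```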
Thanks to your tip about the RetryListener and setting logging.level to DEBUG, we found the problem: the producer was never being built.
The remaining issue is that the DLQ record differs from the consumed record: the DLQ schema has an extra field that must store the reason for the error.
2022/05/04 16:53:43.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [ERROR] o.s.k.l.DeadLetterPublishingRecoverer - Dead-letter publication to limites-abertura-conta-limite-credito-calculado-convivenciaaberturaconta-dlq failed for: limites-abertura-conta-limite-credito-calculado-0@6
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema{...}]}}]}

Is there a way to perform this conversion?
Posted on 2022-05-04 21:20:49
If I understand the question correctly, you want to create a ProducerRecord with a different value type.
Simply subclass the DeadLetterPublishingRecoverer (DLPR) and override createProducerRecord().
/**
* Subclasses can override this method to customize the producer record to send to the
* DLQ. The default implementation simply copies the key and value from the consumer
* record and adds the headers. The timestamp is not set (the original timestamp is in
* one of the headers). IMPORTANT: if the partition in the {@link TopicPartition} is
* less than 0, it must be set to null in the {@link ProducerRecord}.
* @param record the failed record
* @param topicPartition the {@link TopicPartition} returned by the destination
* resolver.
* @param headers the headers - original record headers plus DLT headers.
* @param key the key to use instead of the consumer record key.
* @param value the value to use instead of the consumer record value.
* @return the producer record to send.
* @see KafkaHeaders
*/
protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
        TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {

You can inspect the headers to determine which exception caused the failure. If you need the actual exception object, override accept() to capture it in a ThreadLocal and then call super.accept(); the ThreadLocal can then be read in createProducerRecord().
There are several solutions for publishing different value types with the same producer factory:

Use a DelegatingByTypeSerializer; see KafkaConsumer With Multiple Different Avro Producers And Transactions for an example.
Configure the KafkaTemplate used by the publisher with producer factory configuration overrides (it has a constructor that accepts them). https://stackoverflow.com/questions/72117194
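A sketch of the DelegatingByTypeSerializer approach, assuming Confluent's `KafkaAvroSerializer` for both Avro types; the `AberturaContaDlq` class and the `config` map contents are illustrative, and `DelegatingByTypeSerializer` requires spring-kafka 2.7.9+:

```java
// Sketch: one producer factory whose value serializer is chosen by the
// runtime type of the value, so the same KafkaTemplate can publish both the
// original Avro type and the enriched DLQ type.
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

Map<Class<?>, Serializer<?>> delegates = new LinkedHashMap<>();
delegates.put(AberturaContaLimiteCreditoCalculado.class, new KafkaAvroSerializer());
delegates.put(AberturaContaDlq.class, new KafkaAvroSerializer());

ProducerFactory<String, Object> pf = new DefaultKafkaProducerFactory<>(config,
        new StringSerializer(),
        new DelegatingByTypeSerializer(delegates));
KafkaOperations<String, Object> template = new KafkaTemplate<>(pf);
```

The recoverer built on this template can then hand `createProducerRecord()` either value type and the factory picks the right serializer.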