首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用Spring在有状态应用程序中配置DLQ?

如何使用Spring在有状态应用程序中配置DLQ?
EN

Stack Overflow用户
提问于 2022-05-04 17:32:12
回答 1查看 155关注 0票数 1

我需要使用有状态重试创建一个应用程序,该应用程序侦听Kafka主题并调用一些API,然后提交消息。如果在其中一个调用中发生错误,例如超时,应用程序必须重试4次尝试,间隔为4秒。在这四次尝试的最后,如果它仍然没有工作,应用程序应该将它发送到DLQ主题。

发送到DLQ主题中我无法做的部分。因为当我试图配置DLQ时,重试不会停止,也不会发送到DLQ。

代码语言:javascript
复制
@KafkaListener(topics = "${topic.name}", concurrency = "1")
public void listen(ConsumerRecord<String, AberturaContaLimiteCreditoCalculado> mensagem,
                    @Headers final MessageHeaders headers,
                    Acknowledgment ack) {
    AberturaContaLimiteCreditoCalculadoData dados;
    if (!validarMensagem(mensagem)) {
        dados = mensagem.value().getData();
        throw new RuntimeException();
        //ack.acknowledge();
        //This throw Runtime it's just to force it to retry.
    }
}

private boolean validarMensagem(ConsumerRecord<String, AberturaContaLimiteCreditoCalculado> mensagem) {
    return mensagem == null || mensagem.value() == null;
}

KafkaConfig:

代码语言:javascript
复制
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(final ConsumerFactory<String, Object> consumerFactory) {
    final ConcurrentKafkaListenerContainerFactory<String, Object> factory
            = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    factory.setCommonErrorHandler(new DefaultErrorHandler(
            publisherRetryDLQ(),
            new FixedBackOff(4000L, 4L)));
    return factory;
}

public DeadLetterPublishingRecoverer publisherRetryDLQ() {
    return new DeadLetterPublishingRecoverer(createKafkaTemplate(),
            (record, ex) -> new TopicPartition(topicoDLQ, 0));
}

public ProducerFactory<String, String> producerFactory() {
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
    return new DefaultKafkaProducerFactory<>(config);
}

public KafkaOperations<String, String> createKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

编辑2022-05-04:

从您关于RetryListener和logging.level与Debug的技巧中,我们设法找到了没有构建生产者的问题。

现在的问题是,我们收到了一个与DLQ不同的消费者。不同之处在于DLQ有一个额外的字段,必须存储错误的原因。

代码语言:javascript
复制
2022/05/04 16:53:43.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [ERROR] o.s.k.l.DeadLetterPublishingRecoverer - Dead-letter publication to limites-abertura-conta-limite-credito-calculado-convivenciaaberturaconta-dlq failed for: limites-abertura-conta-limite-credito-calculado-0@6
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema{...}]}}]}

有办法进行这种转换吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-05-04 21:20:49

如果我正确理解这个问题,您希望创建一个具有不同值类型的ProducerRecord

只需子类DLPR并重写createProducerRecord()

代码语言:javascript
复制
    /**
     * Subclasses can override this method to customize the producer record to send to the
     * DLQ. The default implementation simply copies the key and value from the consumer
     * record and adds the headers. The timestamp is not set (the original timestamp is in
     * one of the headers). IMPORTANT: if the partition in the {@link TopicPartition} is
     * less than 0, it must be set to null in the {@link ProducerRecord}.
     * @param record the failed record
     * @param topicPartition the {@link TopicPartition} returned by the destination
     * resolver.
     * @param headers the headers - original record headers plus DLT headers.
     * @param key the key to use instead of the consumer record key.
     * @param value the value to use instead of the consumer record value.
     * @return the producer record to send.
     * @see KafkaHeaders
     */
    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
            TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {

可以检查标头以确定导致故障的异常。如果您需要实际的异常,请重写accept()ThreadLocal中捕获它,然后调用super.accept();然后可以在createProducerRecord()中使用本地线程。

有几种解决方案可以在同一个生产者工厂发布不同类型的产品。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72117194

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档