首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何创建一个带有spring start的Kafka消费者侦听器,在消息被拒绝的情况下,在可变时间后重试消费它们

如何创建一个带有spring start的Kafka消费者侦听器,在消息被拒绝的情况下,在可变时间后重试消费它们
EN

Stack Overflow用户
提问于 2019-07-11 15:59:47
回答 2查看 1.3K关注 0票数 1

我在一个springboot应用程序中有一个简单的kafka消费者侦听器,如下所示:

代码语言:javascript
复制
@KafkaListener(topics="mytopic")
public void receive(String message) {
   LOGGER.info("received message='{}'", messge);
}

在某些特定情况下,我想拒绝该消息,但我希望系统在特定时间后再次向我提出该消息;

我该怎么做呢?

注意:我也希望kafka的配置是定制的(而不是默认的springboot结构)

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-07-11 23:33:42

我的实现恰好做了您需要的事情:

1) kafka配置类,从自定义属性中获取字段,5000毫秒后(在kafkaListenerContainerFactory方法内部)重试被拒绝的消息:

代码语言:javascript
复制
@Configuration
public class KafkaConfig {

    //...

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        if(enableSsl) {
            //configure the following three settings for SSL Encryption
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  sslPassword);

            // configure the following three settings for SSL Authentication
            props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation);
            props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslPassword);
            props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslPassword);
        }
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {              
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));

        ContainerProperties containerProperties = factory.getContainerProperties();
        containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);

        RetryTemplate retryTemplate = new RetryTemplate();
        factory.setStatefulRetry(false);
        factory.setRetryTemplate(retryTemplate);

        //infinite number of retry attempts
        retryTemplate.setRetryPolicy(new AlwaysRetryPolicy());

        //wait a "waitingTime" time before retrying
        int waitingTime = 5000;
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(waitingTime);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        //or use exponential waiting
        //ExponentialBackOffPolicy expBackoff = new ExponentialBackOffPolicy();
        //expBackoff.setInitialInterval(...);
        //expBackoff.setMaxInterval(...);
        //retryTemplate.setBackOffPolicy(expBackoff);

        return factory;
    }
}

2)消费消息的类:

代码语言:javascript
复制
@Service
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    //...

    @KafkaListener(topics="${kafka.topics.test}")
    public void consume(String message, Acknowledgment ack) throws IOException {
        if(processMessage) {
            logger.info(String.format("##KAFKA## -> Consumed message -> %s", message)); 
            ack.acknowledge();
        } 
        else { 
            logger.error(String.format("##KAFKA## -> Failed message -> %s", message));  
            throw new IOException("reject message");
        }
    }
}
票数 1
EN

Stack Overflow用户

发布于 2019-07-11 21:09:36

参见retrying deliveriesStateful Retry

使用所需的重试特征配置监听程序工厂,并(可选)添加SeekToCurrentErrorHandler

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56984321

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档