
Auto-commit of offsets failed & retry also not working as expected

Stack Overflow user
Asked on 2019-10-30 17:23:24
1 answer · 625 views · 0 followers · 0 votes

I am using Spring Boot 2.1.9 and Spring Kafka 2.2.9.

I am getting warnings in the log file saying that the commit failed. I am also using a SeekToCurrentErrorHandler to capture the error once retries are exhausted, but sometimes when the commit fails the consumer keeps iterating over the same records.

Here is my configuration class:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
@EnableKafka
public class KafkaReceiverConfig {

    // Kafka Server Configuration
    @Value("${kafka.servers}")
    private String kafkaServers;

    // Group Identifier
    @Value("${kafka.groupId}")
    private String groupId;

    // Kafka Max Retry Attempts
    @Value("${kafka.retry.maxAttempts:5}")
    private Integer retryMaxAttempts;

    // Kafka Max Retry Interval
    @Value("${kafka.retry.interval:180000}")
    private Long retryInterval;

    // Kafka Concurrency
    @Value("${kafka.concurrency:10}")
    private Integer concurrency;

    // Kafka Poll Timeout
    @Value("${kafka.poll.timeout:100}")
    private Integer pollTimeout;

    // Kafka Consumer Offset
    @Value("${kafka.consumer.auto-offset-reset:earliest}")
    private String offset;

    // Logger
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiverConfig.class);

    /**
     * Defines the Max Number of Retry Attempts
     * 
     * @return Return the Retry Policy @see {@link RetryPolicy}
     */
    @Bean
    public RetryPolicy retryPolicy() {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
        return simpleRetryPolicy;
    }

    /**
     * Time before the next Retry can happen, the Time used is in Milliseconds
     * 
     * @return Return the BackOff Policy @see {@link BackOffPolicy}
     */
    @Bean
    public BackOffPolicy backOffPolicy() {
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(retryInterval);
        return backOffPolicy;
    }

    /**
     * Get Retry Template
     * 
     * @return Return the Retry Template @see {@link RetryTemplate}
     */
    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy());
        retryTemplate.setBackOffPolicy(backOffPolicy());
        return retryTemplate;
    }

    /**
     * String Kafka Listener Container Factory
     * 
     * @return @see {@link KafkaListenerContainerFactory}
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM, MessageProducer messageProducer) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setSyncCommits(true);
        factory.setRetryTemplate(retryTemplate());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);
        factory.setStatefulRetry(true);
        // NOTE: retryMaxAttempts should always +1 due to spring kafka bug
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            log.warn("failed to process kafka message (retries are exausted). topic name:"+record.topic()+" value:"+record.value());
            messageProducer.saveFailedMessage(record, exception);
        }, retryMaxAttempts + 1);

        factory.setErrorHandler(errorHandler);
        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }

    /**
     * String Consumer Factory
     * 
     * @return @see {@link ConsumerFactory}
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.debug("Kafka Receiver Config consumerFactory created");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Consumer Configurations
     * 
     * @return @see {@link Map}
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Disable the Auto Commit if required for testing
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        log.debug("Kafka Receiver Config consumerConfigs created");
        return props;
    }

}
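
For reference, a minimal sketch of a listener that this factory would serve. The original post does not include the listener, so the class name, topic (inferred from the "fulfillment_create" group in the log below), and payload handling are all assumptions:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class FulfillmentListenerSketch {

    // Hypothetical listener bound to the kafkaListenerContainerFactory above;
    // the topic name is inferred from the log and may not match the real app.
    @KafkaListener(topics = "fulfillment_create", containerFactory = "kafkaListenerContainerFactory")
    public void listen(String message) {
        // Any exception thrown here triggers the stateful retry template; once
        // retries are exhausted, the SeekToCurrentErrorHandler's recoverer
        // (messageProducer.saveFailedMessage) takes over.
        process(message);
    }

    private void process(String message) {
        // Business logic goes here.
    }
}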

Here are the logs:

2019-10-30 15:48:05.907  WARN [xxxxx-component-workflow-starter,,,] 11 --- [nt_create-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=fulfillment_create] Synchronous auto-commit of offsets {fulfillment_create-4=OffsetAndMetadata{offset=32, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

  1. Is there a problem with my configuration?
  2. How do I set max poll, session timeout, and so on? (Please give an example.)
  3. How do I set up SeekToCurrentErrorHandler in Spring Kafka 2.2.9 so that it works correctly? (I can't upgrade Spring, because I can't use other…

1 Answer

Stack Overflow user

Accepted answer

Answered on 2019-10-30 18:05:57

You are taking too long to process the records returned by poll().

You need to reduce max.poll.records (ConsumerConfig.MAX_POLL_RECORDS_CONFIG) and/or increase max.poll.interval.ms.
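
For question 2, a minimal sketch of how these limits could be added to the consumerConfigs() map in the posted configuration. The numeric values are illustrative assumptions, not recommendations:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerTuningSketch {

    public static Map<String, Object> tuningProps() {
        Map<String, Object> props = new HashMap<>();
        // Fewer records per poll() means less work between polls (default is 500).
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        // More time allowed between polls before the group rebalances
        // (default is 300000, i.e. 5 minutes).
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
        // session.timeout.ms governs heartbeat-based failure detection, not slow
        // processing; raise it only if heartbeats are actually being missed.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        return props;
    }
}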

You can't perform seeks after this error; you have already lost the partitions.
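
One further observation about the posted configuration (not part of the accepted answer): it enables the client's auto-commit while also wiring syncCommits and a transaction manager into the container, so two commit paths compete. Assuming the container and transaction manager are meant to own offset commits, the usual adjustment in consumerConfigs() would be:

// Replaces the existing line in consumerConfigs(); with a transaction manager
// on the container, offsets are committed by the framework, so the client's
// background auto-commit thread should be switched off.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);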

Votes: 1
The original content of this page was provided by Stack Overflow; translation supported by Tencent Cloud's IT-domain translation engine.
Original link:

https://stackoverflow.com/questions/58630729