
How to handle UnknownProducerIdException

Stack Overflow user
Asked 2020-04-07 15:48:27
2 answers · 1.3K views · 1 vote

We are having some problems with Spring and Kafka: from time to time our microservice throws an UnknownProducerIdException, which happens when the broker-side parameter transactional.id.expiration.ms expires.

My question is whether it is possible to catch that exception and retry the failed message. If so, what would be the best way to handle it?

For context:

We are using Spring Cloud Hoxton.RELEASE and Spring version 2.2.4.RELEASE.

We use an AWS-managed solution, so we cannot set a new value for the property I mentioned above.

Here is a trace of the exception:

2020-04-07 20:54:00.563 ERROR 5188 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2] The broker returned org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. for topic-partition test.produce.another-2 with producerId 35000, epoch 0, and sequence number 8
2020-04-07 20:54:00.563  INFO 5188 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2] ProducerId set to -1 with epoch -1
2020-04-07 20:54:00.565 ERROR 5188 --- [ad | producer-2] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{...}' to topic <some-topic>:

To reproduce this exception:

  • I used the Confluent docker images and set the environment variable KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS to 10 seconds, so that I would not have to wait too long for the exception to be thrown.
  • In another process, I sent messages one by one, at 10-second intervals, to the topic the Java application listens on.
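For reference, a minimal docker-compose fragment along the lines of the reproduction steps above might look like this; the Confluent images map `KAFKA_`-prefixed environment variables onto broker properties (dots become underscores), so `transactional.id.expiration.ms` becomes `KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS`. The image tag and listener settings here are illustrative assumptions, not taken from the question:

```yaml
# Sketch only: shorten the transactional id expiry to 10 s to trigger
# UnknownProducerIdException quickly (the default is 7 days).
services:
  kafka:
    image: confluentinc/cp-kafka:5.4.1            # assumed tag
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181     # assumed companion service
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: "10000"
```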

Here is a code sample:

File Bindings.java:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface Bindings {
  @Input("test-input")
  SubscribableChannel testListener();

  @Output("test-output")
  MessageChannel testProducer();
}

File application.yml (do not forget to set the environment variable KAFKA_HOST):

spring:
  cloud:
    stream:
      kafka:
        binder:
          auto-create-topics: true
          brokers: ${KAFKA_HOST}
          transaction:
            producer:
              error-channel-enabled: true
          producer-properties:
            acks: all
            retry.backoff.ms: 200
            linger.ms: 100
            max.in.flight.requests.per.connection: 1
            enable.idempotence: true
            retries: 3
            compression.type: snappy
            request.timeout.ms: 5000
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            session.timeout.ms: 20000
            max.poll.interval.ms: 350000
            enable.auto.commit: true
            allow.auto.create.topics: true
            auto.commit.interval.ms: 12000
            max.poll.records: 5
            isolation.level: read_committed
          configuration:
            auto.offset.reset: latest

      bindings:

        test-input:
          # contentType: text/plain
          destination: test.produce
          group: group-input
          consumer:
            maxAttempts: 3
            startOffset: latest
            autoCommitOnError: true
            queueBufferingMaxMessages: 100000
            autoCommitOffset: true


        test-output:
          # contentType: text/plain
          destination: test.produce.another
          group: group-output
          producer:
            acks: all

debug: true

The listener handler:

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding(Bindings.class)
public class PocApplication {

    private static final Logger log = LoggerFactory.getLogger(PocApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(PocApplication.class, args);
    }


    @Autowired
    private BinderAwareChannelResolver binderAwareChannelResolver;

    @StreamListener(Topics.TESTLISTENINPUT)
    public void listen(Message<?> in, String headerKey) {
        final MessageBuilder builder;
        MessageChannel messageChannel;

        messageChannel = this.binderAwareChannelResolver.resolveDestination("test-output");

        Object payload = in.getPayload();
        builder = MessageBuilder.withPayload(payload);

        try {
            log.info("Event received: {}", in);

            if (!messageChannel.send(builder.build())) {
                log.error("Something happened trying to send the message! {}", in.getPayload());
            }

            log.info("Commit success");
        } catch (UnknownProducerIdException e) {
            log.error("UnknownProducerIdException caught ", e);
        } catch (KafkaException e) {
            log.error("KafkaException caught ", e);
        } catch (Exception e) {
            System.out.println("Commit failed " + e.getMessage());
        }
    }
}

Regards

2 Answers

Stack Overflow user
Accepted answer · Posted 2020-04-08 13:42:56

        } catch (UnknownProducerIdException e) {
            log.error("UnknownProducerIdException caught ", e);

To catch the exception there, you need to set the sync kafka producer property (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.3.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#kafka-producer-properties). Otherwise, the error is returned asynchronously.
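As a rough sketch, that sync setting goes under the binding's Kafka producer properties, using the `test-output` channel name from the question's own configuration:

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          test-output:
            producer:
              sync: true   # block the send so failures surface in the listener thread
```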

You should not "eat" the exception there; it must be thrown back to the container so that the container rolls back the transaction.
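A minimal, self-contained sketch of that log-and-rethrow pattern, with a plain Runnable standing in for the channel send; the class and method names here are illustrative, not the binder's API:

```java
// Sketch: log the failure for diagnostics, then rethrow so the caller
// (in the real app, the listener container) can roll back and redeliver.
public class RethrowSketch {

    static void listen(Runnable send) {
        try {
            send.run();
        } catch (RuntimeException e) {
            System.err.println("Send failed, rethrowing: " + e.getMessage());
            throw e; // do NOT swallow: the container needs it to roll back
        }
    }

    public static void main(String[] args) {
        boolean propagated = false;
        try {
            listen(() -> { throw new IllegalStateException("simulated broker error"); });
        } catch (IllegalStateException e) {
            propagated = true;
        }
        System.out.println("propagated=" + propagated);
    }
}
```

Contrast this with the question's listener, which catches UnknownProducerIdException and only logs it, so the container never sees the failure.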

Also,

        } catch (Exception e) {
            System.out.println("Commit failed " + e.getMessage());
        }

The container performs the commit after the stream listener returns to it, so you will never see a commit error there; again, you must let the exception propagate back to the container.

The container will retry the delivery according to the consumer binding's retry configuration.
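Using the `test-input` binding name from the question, those retry knobs sit on the consumer binding; the backoff values below are illustrative, only `maxAttempts: 3` appears in the question's config:

```yaml
spring:
  cloud:
    stream:
      bindings:
        test-input:
          consumer:
            maxAttempts: 3              # total delivery attempts
            backOffInitialInterval: 1000
            backOffMaxInterval: 10000
            backOffMultiplier: 2.0
```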

1 vote

Stack Overflow user
Posted 2021-11-17 06:02:58

You can probably also use a callback function to handle the exception. I am not sure about the Spring library, but if you use the plain kafka-clients producer you can do something like this:

// Note: 'retry' is a mutable retry counter and send(topic, partition, msg)
// is a resend helper; both are assumed to be defined elsewhere in the answerer's code.
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            e.printStackTrace();

            if (e.getClass().equals(UnknownProducerIdException.class)) {
                logger.info("UnknownProducerIdException caught");
                while (--retry >= 0) {
                    send(topic, partition, msg);
                }
            }
        } else {
            logger.info("The offset of the record we just sent is: " + metadata.offset());
        }
    }
});
0 votes
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/61084031
