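We are having some problems with Spring and Kafka: sometimes our microservice throws an UnknownProducerIdException. This happens when the broker-side setting transactional.id.expiration.ms has expired.
My question is whether it is possible to catch this exception and retry the failed message. If so, what is the best way to do it?
I have taken a look at:
We are using Spring Cloud Hoxton.RELEASE and Spring version 2.2.4.RELEASE.
We use the AWS solution for Kafka, so we cannot set a new value for the property I mentioned above.
Here is part of the exception trace: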
2020-04-07 20:54:00.563 ERROR 5188 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2] The broker returned org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. for topic-partition test.produce.another-2 with producerId 35000, epoch 0, and sequence number 8
2020-04-07 20:54:00.563 INFO 5188 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2] ProducerId set to -1 with epoch -1
2020-04-07 20:54:00.565 ERROR 5188 --- [ad | producer-2] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='{...}' to topic <some-topic>:

To reproduce this exception:
KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS is set to 10 seconds, so that I do not have to wait long for the exception to be thrown. Here is a code example:

File Bindings.java:
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface Bindings {

    @Input("test-input")
    SubscribableChannel testListener();

    @Output("test-output")
    MessageChannel testProducer();
}

File application.yml (do not forget to set the environment variable KAFKA_HOST):
spring:
  cloud:
    stream:
      kafka:
        binder:
          auto-create-topics: true
          brokers: ${KAFKA_HOST}
          transaction:
            producer:
              error-channel-enabled: true
          producer-properties:
            acks: all
            retry.backoff.ms: 200
            linger.ms: 100
            max.in.flight.requests.per.connection: 1
            enable.idempotence: true
            retries: 3
            compression.type: snappy
            request.timeout.ms: 5000
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            session.timeout.ms: 20000
            max.poll.interval.ms: 350000
            enable.auto.commit: true
            allow.auto.create.topics: true
            auto.commit.interval.ms: 12000
            max.poll.records: 5
            isolation.level: read_committed
          configuration:
            auto.offset.reset: latest
      bindings:
        test-input:
          # contentType: text/plain
          destination: test.produce
          group: group-input
          consumer:
            maxAttempts: 3
            startOffset: latest
            autoCommitOnError: true
            queueBufferingMaxMessages: 100000
            autoCommitOffset: true
        test-output:
          # contentType: text/plain
          destination: test.produce.another
          group: group-output
          producer:
            acks: all
debug: true

Listener handler:
@SpringBootApplication
@EnableBinding(Bindings.class)
public class PocApplication {

    private static final Logger log = LoggerFactory.getLogger(PocApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(PocApplication.class, args);
    }

    @Autowired
    private BinderAwareChannelResolver binderAwareChannelResolver;

    @StreamListener(Topics.TESTLISTENINPUT)
    public void listen(Message<?> in, String headerKey) {
        final MessageBuilder builder;
        MessageChannel messageChannel;
        messageChannel = this.binderAwareChannelResolver.resolveDestination("test-output");
        Object payload = in.getPayload();
        builder = MessageBuilder.withPayload(payload);
        try {
            log.info("Event received: {}", in);
            if (!messageChannel.send(builder.build())) {
                log.error("Something happend trying send the message! {}", in.getPayload());
            }
            log.info("Commit success");
        } catch (UnknownProducerIdException e) {
            log.error("UnkownProducerIdException catched ", e);
        } catch (KafkaException e) {
            log.error("KafkaException catched ", e);
        } catch (Exception e) {
            System.out.println("Commit failed " + e.getMessage());
        }
    }
}

Regards
Answered 2020-04-08 13:42:56
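} catch (UnknownProducerIdException e) {
    log.error("UnkownProducerIdException catched ", e);

To catch the exception there, you need to set the sync Kafka producer property to true (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.3.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#kafka-producer-properties). Otherwise, the error is returned asynchronously and the send call has already returned by the time it arrives.
You should not "eat" the exception there; it must be thrown back to the container so that the container rolls back the transaction.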
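To illustrate why the catch block only fires when the send is synchronous, here is a minimal, self-contained Java sketch. It uses no Kafka or Spring classes: the class name AsyncSendDemo, the send method and the IllegalStateException standing in for UnknownProducerIdException are all hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class AsyncSendDemo {

    /** Hypothetical stand-in for a producer send: the failure is reported through the future, not thrown. */
    static CompletableFuture<Long> send(String payload) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        result.completeExceptionally(
                new IllegalStateException("simulated UnknownProducerIdException"));
        return result;
    }

    /** Fire-and-forget style: returns true only if the catch block saw the failure. */
    static boolean caughtWhenAsync(String payload) {
        try {
            send(payload); // returns normally; the error stays inside the ignored future
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    /** Synchronous style: waiting for the result rethrows the failure in the calling thread. */
    static boolean caughtWhenSync(String payload) {
        try {
            send(payload).get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof IllegalStateException;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("async caught: " + caughtWhenAsync("hello"));
        System.out.println("sync caught: " + caughtWhenSync("hello"));
    }
}
```

In this simulation the asynchronous variant never reaches its catch block, while the blocking variant does. This mirrors the effect of enabling sync on the producer binding, although the real binder's internals are not modeled here.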
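Also,

} catch (Exception e) {
    System.out.println("Commit failed " + e.getMessage());
}

The commit is performed by the container after the stream listener returns to it, so you will never see a commit error here; again, you must let the exception propagate back to the container.
The container will retry the delivery according to the consumer binding's retry configuration.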
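The retry behavior described above can be sketched with plain Java. This is only a simulation under stated assumptions: deliver is a hypothetical stand-in for the container's redelivery loop, maxAttempts plays the role of the consumer binding's maxAttempts setting, and none of the names below come from Spring or Kafka.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class RetrySketch {

    /**
     * Hypothetical stand-in for the container: redelivers the message while the
     * listener throws, up to maxAttempts. Returns the number of deliveries made.
     */
    static int deliver(String message, Consumer<String> listener, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(message);
                return attempt; // listener returned normally: treated as success
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    throw e; // retries exhausted: the failure surfaces to the caller
                }
            }
        }
        throw new IllegalArgumentException("maxAttempts must be at least 1");
    }

    /** Listener body that fails on its first call and succeeds afterwards. */
    static Consumer<String> failingOnce(AtomicInteger calls) {
        return msg -> {
            if (calls.incrementAndGet() == 1) {
                throw new IllegalStateException("simulated send failure");
            }
        };
    }

    /** Wraps a listener so that it swallows exceptions, like the catch blocks in the question. */
    static Consumer<String> swallowing(Consumer<String> delegate) {
        return msg -> {
            try {
                delegate.accept(msg);
            } catch (RuntimeException e) {
                // logged and dropped: the caller never learns about the failure
            }
        };
    }

    public static void main(String[] args) {
        int propagating = deliver("m", failingOnce(new AtomicInteger()), 3);
        int swallowed = deliver("m", swallowing(failingOnce(new AtomicInteger())), 3);
        System.out.println("propagating listener, deliveries: " + propagating);
        System.out.println("swallowing listener, deliveries: " + swallowed);
    }
}
```

In the sketch, the propagating listener is delivered twice (one failure, one successful retry), while the swallowing listener is delivered once and its failed message is never retried.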
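Answered 2021-11-17 06:02:58
You can probably also use a callback to handle the exception. I am not sure about the Spring library for Kafka, but if you use the Kafka client directly, you can do something like this: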
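producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e == null) {
            logger.info("The offset of the record we just sent is: " + metadata.offset());
            return;
        }
        e.printStackTrace();
        // Assumptions: retry is a counter field of the enclosing class, and
        // send(...) is a helper that calls producer.send with this same callback,
        // so each failure triggers at most one resend until the budget runs out.
        // Looping here would queue every remaining retry at once, even if the
        // first resend succeeds.
        if (e instanceof UnknownProducerIdException && --retry >= 0) {
            logger.info("UnknownProducerIdException caught");
            send(topic, partition, msg);
        }
    }
});

https://stackoverflow.com/questions/61084031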