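I post 3 messages to 3 topics. If an exception occurs while posting, all of the messages should be rolled back.
But in my case that does not happen when I simulate the exception below for the third topic: org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes
To trigger the exception, I programmatically increase the size of the message published to the third topic (the price topic).
The messages are sent successfully to topics 1 and 2, and the send to topic 3 fails. Since this is a single transaction, all messages must be rolled back, but topics 1 and 2 always receive the message.
Yet the log shows "Transaction rolled back".
How can I solve this?
Logs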
2022-03-23 21:16:59.690 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.a.m.r.producer.KafkaProducer - @@@ --- Sending Data to Item , price, Inventory -----
2022-03-23 21:16:59.733 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='String' and payload='{"sku":"String","lowestOriginalPrice":...' to topic PRICE-TOPIC:
**org.apache.kafka.common.errors.RecordTooLargeException**: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
2022-03-23 21:16:59.733 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-2, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1d0] Aborting incomplete transaction
2022-03-23 21:16:59.737 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-1, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1draw-item-processor-group-id.OSMI_C02_CATALOG_MKPDOMAIN.0] **Aborting incomplete transaction**
2022-03-23 21:16:59.738 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Transaction rolled back
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.albertsons.mkp.rawitemprocessor.consumer.KafkaConsumer.receive(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>) throws java.io.IOException' threw exception; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
2022-03-23 21:17:00.250 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.a.m.r.producer.KafkaProducer - @@@ --- Sending Data to Item , price, Inventory -----
2022-03-23 21:17:00.294 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='String' and payload='{"sku":"String","lowestOriginalPrice":"String","lowestPrice":"String","updatedAt":"String","createdA...' to topic PRICE-TOPIC:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
2022-03-23 21:17:00.295 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-2, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1d0] Aborting incomplete transaction
2022-03-23 21:17:00.298 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-1, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1draw-item-processor-group-id.OSMI_C02_CATALOG_MKPDOMAIN.0] **Aborting incomplete transaction**
2022-03-23 21:17:00.308 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.**KafkaMessageListenerContainer - Transaction rolled back**
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.albertsons.mkp.rawitemprocessor.consumer.KafkaConsumer.receive(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>) throws java.io.IOException' threw exception; nested exception is **org.springframework.kafka.KafkaException: Send failed**; nested exception is org.apache.kafka.common.errors.**RecordTooLargeException**: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.






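Posted on 2022-03-23 17:23:58
Rolled-back records remain in the log.
Kafka adds a marker to the log to indicate whether the transaction was committed or rolled back.
By default, consumers receive all records, even those that were rolled back.
Consumers must be configured with isolation.level=read_committed to avoid seeing rolled-back records.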
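To make the point above concrete, here is a toy model of a partition log, written only for illustration. It is not Kafka's implementation: the `Entry` type, the `tx-0` id, and the assumption that each transaction id appears only once are all made up for this sketch. It shows that aborted records stay in the log and that only the reader's mode decides whether they are returned.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a partition log; NOT Kafka's real implementation. */
public class IsolationLevelDemo {

    enum Kind { DATA, COMMIT, ABORT }

    /** One log entry; txId is null for non-transactional data. */
    static final class Entry {
        final Kind kind;
        final String txId;
        final String value;

        Entry(Kind kind, String txId, String value) {
            this.kind = kind;
            this.txId = txId;
            this.value = value;
        }
    }

    /** Hypothetical log: one plain record, then an aborted transaction. */
    static List<Entry> sampleLog() {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry(Kind.DATA, null, "non-transactional"));
        log.add(new Entry(Kind.DATA, "tx-0", "first"));
        log.add(new Entry(Kind.DATA, "tx-0", "second"));
        log.add(new Entry(Kind.ABORT, "tx-0", null)); // marker written on rollback
        return log;
    }

    /** read_uncommitted: every data record is returned, even aborted ones. */
    static List<String> readUncommitted(List<Entry> log) {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind == Kind.DATA) {
                out.add(e.value);
            }
        }
        return out;
    }

    /** read_committed: skip aborted records, stop at the first open transaction. */
    static List<String> readCommitted(List<Entry> log) {
        // First pass: record how each transaction ended (COMMIT or ABORT).
        Map<String, Kind> outcome = new HashMap<>();
        for (Entry e : log) {
            if (e.kind != Kind.DATA) {
                outcome.put(e.txId, e.kind);
            }
        }
        // Second pass: return only what a read_committed reader may see.
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind != Kind.DATA) {
                continue; // markers are never handed to the application
            }
            if (e.txId == null) {
                out.add(e.value); // non-transactional data is always visible
                continue;
            }
            Kind end = outcome.get(e.txId);
            if (end == null) {
                break; // transaction still open: nothing past this point is readable yet
            }
            if (end == Kind.COMMIT) {
                out.add(e.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Entry> log = sampleLog();
        System.out.println("uncommitted: " + readUncommitted(log));
        System.out.println("committed: " + readCommitted(log));
    }
}
```

With the sample log, the uncommitted reader returns all three values, while the committed reader returns only the non-transactional one. That is the same pattern the question describes: the records for topics 1 and 2 are physically written, but a read_committed consumer never sees them.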
https://kafka.apache.org/documentation/#consumerconfigs_isolation.level
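Controls how to read messages written transactionally. If set to
read_committed, consumer.poll() will only return transactional messages which have been committed. If set to read_uncommitted (the default), consumer.poll() will return all messages, even transactional messages which have been aborted. Non-transactional messages will be returned unconditionally in either mode. Messages will always be returned in offset order. Hence, in read_committed mode, consumer.poll() will only return messages up to the last stable offset (LSO), which is the one less than the offset of the first open transaction. In particular, any messages appearing after messages belonging to ongoing transactions will be withheld until the relevant transaction has been completed. As a result, read_committed consumers will not be able to read up to the high watermark when there are in-flight transactions.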
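For a consumer configured with the plain Kafka client rather than through Spring Boot, the setting is an ordinary config entry. The following is a minimal sketch using only `java.util.Properties`; the class name, the `localhost:9092` address, and the `other-app` group id are placeholders chosen for illustration, not values from the question.

```java
import java.util.Properties;

public class ReadCommittedConsumerProps {

    /** Builds consumer settings; the server address and group id are placeholders. */
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Hide records that belong to aborted (rolled-back) transactions.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "other-app");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```

With kafka-clients on the classpath, these properties could be passed to `new KafkaConsumer<>(props)`.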
When using Spring, the value is read-committed, not read_committed.
spring.kafka.consumer.isolation-level=read-committed
Your IDE should suggest the correct values.
Or
spring.kafka.consumer.properties=isolation.level=read_committed
EDIT
(Although I see that Boot also works with read_uncommitted.)
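This works as expected for me.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So71591355Application {

    public static void main(String[] args) {
        SpringApplication.run(So71591355Application.class, args);
    }

    // Uses the Boot default from the properties below (read-committed).
    @KafkaListener(id = "so71591355", topics = "so71591355")
    void listen1(String in) {
        System.out.println("committed: " + in);
    }

    // Overrides the isolation level for this listener only.
    @KafkaListener(id = "so71591355-2", topics = "so71591355",
            properties = "isolation.level:read_uncommitted")
    void listen2(String in) {
        System.out.println("uncommitted: " + in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so71591355").partitions(1).replicas(1).build();
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        template.setAllowNonTransactional(true);
        return args -> {
            template.send("so71591355", "non-transactional");
            try {
                template.executeInTransaction(t -> {
                    t.send("so71591355", "first");
                    t.send("so71591355", "second");
                    // Larger than max.request.size, so this send fails and the transaction aborts.
                    t.send("so71591355", new String(new byte[2000000]));
                    return null;
                });
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        };
    }

}

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.isolation-level=read-committed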
spring.kafka.producer.transaction-id-prefix=tx-

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.4)
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:660)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:403)
at com.example.demo.So71591355Application.lambda$1(So71591355Application.java:49)
at org.springframework.kafka.core.KafkaTemplate.executeInTransaction(KafkaTemplate.java:507)
at com.example.demo.So71591355Application.lambda$0(So71591355Application.java:44)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:768)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:758)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:310)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301)
at com.example.demo.So71591355Application.main(So71591355Application.java:19)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
uncommitted: non-transactional
committed: non-transactional
uncommitted: first
uncommitted: second
EDIT2
Your application is working as expected; when I add
@KafkaListener(id = "otherApp", topics = { "ITEM-TOPIC", "INVENTORY-TOPIC", "PRICE-TOPIC" })
void listen3(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    System.out.println("so71591355 from " + topic + ": " + in);
}
to another application, it receives no data.
2022-03-24 10:04:57.939 INFO 15038 -- hisApp-0-C-1 o.s.k.l.KafkaMessageListenerContainer : otherApp: partitions assigned: PRICE-TOPIC-0, ITEM-TOPIC-0, INVENTORY-TOPIC-0
Of course, with the console consumer we can see the messages, because the console consumer is not read_committed.
When I comment out the price send, I see
so71591355 from INVENTORY-TOPIC: Inventory data : My test Message
so71591355 from ITEM-TOPIC: Item data : My test Message
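...
EDIT3
To customize the after-rollback processor, simply add it as a @Bean and wire it into the container factory.
@Bean
AfterRollbackProcessor<Object, Object> arp() {
    return new DefaultAfterRollbackProcessor<>((rec, ex) -> {
        log.error("Failed to process {} from topic, partition {}-{}, @{}",
                rec.value(), rec.topic(), rec.partition(), rec.offset(), ex);
    }, new FixedBackOff(3000L, 2));
}
However, you should remove the executeInTransaction call and perform the sends directly on the template. That way, the template participates in the container's transaction instead of starting a new one.
This example only logs the error; you can add a DeadLetterPublishingRecoverer (or any custom recoverer).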
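To illustrate what the back-off in the bean above implies, here is a plain-Java toy model of the redelivery loop. It is a sketch and not Spring's code: the `process` method and its parameters are hypothetical. It assumes, as I understand `FixedBackOff(3000L, 2)`, that the second argument is the number of retries, so a record that always fails is delivered three times before the recoverer is called. The 3-second wait is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Toy model of redelivery after rollback; NOT Spring's implementation. */
public class RollbackRetryDemo {

    /**
     * Delivers the record until the listener succeeds or the retries run out.
     * Returns the number of delivery attempts made.
     */
    static int process(String rec, Predicate<String> listener,
            int maxRetries, Consumer<String> recoverer) {
        int deliveries = 0;
        while (true) {
            deliveries++;
            if (listener.test(rec)) {
                return deliveries; // success: the transaction would commit
            }
            // Failure: the transaction is rolled back.
            if (deliveries > maxRetries) {
                recoverer.accept(rec); // retries exhausted: hand over to the recoverer
                return deliveries;
            }
            // A real container would wait for the back-off interval here, then redeliver.
        }
    }

    public static void main(String[] args) {
        List<String> recovered = new ArrayList<>();
        // A listener that always fails, like the oversized send in the question.
        int deliveries = process("too-large", rec -> false, 2, recovered::add);
        System.out.println(deliveries + " deliveries, recovered=" + recovered);
    }
}
```

The repeated "Sending Data" and "Transaction rolled back" pairs in the question's log are consistent with this kind of redelivery.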
https://stackoverflow.com/questions/71591355