首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡事务回滚不处理RecordTooLargeException的3个主题

卡夫卡事务回滚不处理RecordTooLargeException的3个主题
EN

Stack Overflow用户
提问于 2022-03-23 17:15:10
回答 1查看 1.5K关注 0票数 0

我张贴3条消息到3个主题-同时张贴如果获得异常-所有的消息将被回滚。

但在我的例子中,当我为第三个主题模拟下面的异常时,它并没有发生。org.apache.kafka.common.errors.RecordTooLargeException:消息是117440606字节

当向第三主题(价格主题)发布大型消息时,-i以编程方式增加消息的大小以获得异常。

消息被成功地发送到第1、2主题--第3主题失败。--作为每一个事务,所有消息都必须回滚--但是主题1和主题2总是得到消息。

但日志显示-事务回滚

如何解决这个问题

日志

代码语言:javascript
复制
2022-03-23 21:16:59.690 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  c.a.m.r.producer.KafkaProducer - @@@ --- Sending Data to  Item , price, Inventory  ----- 
2022-03-23 21:16:59.733 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='String' and payload='{"sku":"String","lowestOriginalPrice":...' to topic PRICE-TOPIC: 
**org.apache.kafka.common.errors.RecordTooLargeException**: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
2022-03-23 21:16:59.733 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-2, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1d0] Aborting incomplete transaction 
2022-03-23 21:16:59.737 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-1, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1draw-item-processor-group-id.OSMI_C02_CATALOG_MKPDOMAIN.0] **Aborting incomplete transaction** 
2022-03-23 21:16:59.738 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Transaction rolled back 
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.albertsons.mkp.rawitemprocessor.consumer.KafkaConsumer.receive(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>) throws java.io.IOException' threw exception; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

2022-03-23 21:17:00.250 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  c.a.m.r.producer.KafkaProducer - @@@ --- Sending Data to  Item , price, Inventory  ----- 
2022-03-23 21:17:00.294 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='String' and payload='{"sku":"String","lowestOriginalPrice":"String","lowestPrice":"String","updatedAt":"String","createdA...' to topic PRICE-TOPIC: 
org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
2022-03-23 21:17:00.295 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-2, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1d0] Aborting incomplete transaction 
2022-03-23 21:17:00.298 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-1, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1draw-item-processor-group-id.OSMI_C02_CATALOG_MKPDOMAIN.0] **Aborting incomplete transaction** 
2022-03-23 21:17:00.308 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.**KafkaMessageListenerContainer - Transaction rolled back** 
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.albertsons.mkp.rawitemprocessor.consumer.KafkaConsumer.receive(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>) throws java.io.IOException' threw exception; nested exception is **org.springframework.kafka.KafkaException: Send failed**; nested exception is org.apache.kafka.common.errors.**RecordTooLargeException**: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

EN

回答 1

Stack Overflow用户

发布于 2022-03-23 17:23:58

回滚记录仍保留在日志中。

Kafka在日志中添加了一个标记,以指示事务是已提交还是回滚。

默认情况下,消费者将接收所有记录,即使它们被回滚。

使用者必须配置isolation.level=read_committed以避免看到回滚记录。

https://kafka.apache.org/documentation/#consumerconfigs_isolation.level

控制如何读取以事务方式写入的消息。如果设置为read_committedconsumer.poll()将只返回已提交的事务性消息。如果设置为read_uncommitted (默认),consumer.poll()将返回所有消息,甚至已中止的事务性消息。非事务性消息将在任何模式下无条件返回。消息将始终按偏移顺序返回。因此,在read_committed模式下,consumer.poll()只返回最后一个稳定偏移量(LSO),即小于第一个开放事务的偏移量的消息。特别是,在属于正在进行的事务的消息之后出现的任何消息都将被保留,直到相关事务完成。因此,当在飞行事务中存在时,read_committed使用者将无法读取到高水印。

当使用Spring时,它是read-committed,而不是read_committed

代码语言:javascript
复制
spring.kafka.consumer.isolation-level=read-committed

您的IDE应该建议正确的值。

代码语言:javascript
复制
spring.kafka.consumer.properties=isolation.level=read_committed

编辑

(虽然我看到Boot也适用于read_uncommitted )。

这对我来说就像预期的一样。

代码语言:javascript
复制
@SpringBootApplication
public class So71591355Application {

    public static void main(String[] args) {
        SpringApplication.run(So71591355Application.class, args);
    }

    @KafkaListener(id = "so71591355", topics = "so71591355")
    void listen1(String in) {
        System.out.println("committed: " + in);
    }

    @KafkaListener(id = "so71591355-2", topics = "so71591355",
            properties = "isolation.level:read_uncommitted")
    void listen2(String in) {
        System.out.println("uncommitted: " + in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so71591355").partitions(1).replicas(1).build();
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        template.setAllowNonTransactional(true);
        return args -> {
            template.send("so71591355", "non-transactional");
            try {
                template.executeInTransaction(t -> {
                    t.send("so71591355", "first");
                    t.send("so71591355", "second");
                    t.send("so71591355", new String(new byte[2000000]));
                    return null;
                });
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        };
    }
}
代码语言:javascript
复制
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.isolation-level=read-committed

spring.kafka.producer.transaction-id-prefix=tx-
代码语言:javascript
复制
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.4)

org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:660)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:403)
    at com.example.demo.So71591355Application.lambda$1(So71591355Application.java:49)
    at org.springframework.kafka.core.KafkaTemplate.executeInTransaction(KafkaTemplate.java:507)
    at com.example.demo.So71591355Application.lambda$0(So71591355Application.java:44)
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:768)
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:758)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:310)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301)
    at com.example.demo.So71591355Application.main(So71591355Application.java:19)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
uncommitted: non-transactional
committed: non-transactional
uncommitted: first
uncommitted: second

EDIT2

您的应用程序正在按预期工作;当我添加

代码语言:javascript
复制
@KafkaListener(id = "otherApp", topics =  { "ITEM-TOPIC", "INVENTORY-TOPIC", "PRICE-TOPIC" })
void listen3(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    System.out.println("so71591355 from " + topic + ": " + in);
}

对于另一个应用程序,它不接收任何数据。

2022-03-24 10:04:57.939 INFO 15038 -- hisApp-0-C-1 o.s.k.l.KafkaMessageListenerContainer : otherApp:分配的分区:价格-主题-0,商品-主题-0,库存-主题-0

当然,对于控制台使用者,我们可以看到消息,因为控制台使用者不是read_committed

当我评论价格时,我看到了

代码语言:javascript
复制
so71591355 from INVENTORY-TOPIC: Inventory data : My test Message
so71591355 from ITEM-TOPIC: Item data : My test Message
...

EDIT3

要定制后回滚处理器,只需将其添加为@Bean,并将其连接到容器工厂。

代码语言:javascript
复制
@Bean
AfterRollbackProcessor<Object, Object> arp() {
    return new DefaultAfterRollbackProcessor<>((rec, ex) -> {
        log.error("Failed to process {} from topic, partition {}-{}, @{}",
                rec.value(), rec.topic(), rec.partition(), rec.offset(), ex);
    }, new FixedBackOff(3000L, 2));
}

但是,您应该删除excuteInTransaction调用,直接对模板执行发送操作。这样,模板将参与容器的事务,而不是启动新的事务。

本例只记录错误;您可以添加DeadLetterPublishingRecoverer (或任何自定义恢复程序)。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71591355

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档