首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >KafkaListenerEndpointContainer无法用Spring创建Kafka事务

KafkaListenerEndpointContainer无法用Spring创建Kafka事务
EN

Stack Overflow用户
提问于 2019-07-22 18:00:00
回答 1查看 5K关注 0票数 1

我正在使用2.2.2.RELEASE(org.apache.kafka:kafka-clients:jar:2.0.1)和spring(2.1.1)。我无法执行事务,因为我的侦听器无法获得分配的分区。我为一次使用者创建了建议的配置。我正在尝试配置事务性侦听器容器,并且只需要一次处理

我使用事务管理器配置了生产者和使用者,用事务id配置了生产者,用isolation.level=read_committed配置了使用者。

代码语言:javascript
复制
@Bean(name = "producerFactory")
        public ProducerFactory<String, MyObject> producerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
            configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
            configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"txApp");
            DefaultKafkaProducerFactory<String, KafkaSerializer> producerFactory = new DefaultKafkaProducerFactory<>(configProps);
            producerFactory.setTransactionIdPrefix("tx.");

                    return producerFactory;
        }



@Bean
    public KafkaTransactionManager<?, ?> kafkaTransactionManager() {
        KafkaTransactionManager<?, ?> kafkaTransactionManager = new KafkaTransactionManager<>(producerFactory());
        // ...
        return kafkaTransactionManager;
    }

@Bean(name="appTemplate")
    public KafkaTemplate<String,MyObject> kafkaTemplate(){
        KafkaTemplate<String, MyObject> kafkaTemplate = new KafkaTemplate<>(
                producerFactory());
        return kafkaTemplate;
    }

//Consumer

@Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                                                                          ConsumerFactory kafkaConsumerFactory,
                                                                          KafkaTransactionManager kafkaTransactionManager) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager());
        return factory;
    }

//in the Consumer
   @KafkaListener(topics = "myTopic", groupId = "ingest", concurrency = "4")
    public void listener(@Payload MyObject message,
                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) throws ExecutionException, InterruptedException {

...

// In my producer

myTemplate.executeInTransaction(t-> t.send(kafkaConfig.getTopicName(), myMessage));

我希望看到消息到达我的侦听器,但当我执行生产者,我得到以下错误:

代码语言:javascript
复制
22-07-2019 10:21:55.283 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR  o.a.k.c.c.i.ConsumerCoordinator.onJoinComplete request.id= request.caller=  - [Consumer clientId=consumer-2, groupId=ingest] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on partition assignment 
org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.
    at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:150)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:137)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:1657)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:719)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:676)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.
EN

回答 1

Stack Overflow用户

发布于 2019-07-22 19:17:10

查看服务器日志;很可能您没有足够的副本来支持事务(默认情况下为3)。如果只进行测试,可以将其设置为1。

请参见代理属性transaction.state.log.replication.factormin.insync.replicas

事务主题的复制因子(设置为更高以确保可用性)。在集群大小满足此复制因子要求之前,内部主题创建将失败。

当生产者将ack设置为"all“(或"-1")时,此配置指定必须确认写入才能被认为成功的副本的最小数量。如果不能满足这个最小值,那么生产者将引发一个异常( NotEnoughReplicas或NotEnoughReplicasAfterAppend)。当同时使用时,min.insync.replicas和allow允许您执行更好的耐久性保证。一个典型的场景是创建一个复制因子为3的主题,将min.insync.replicas设置为2,并生成带有"all“的ack。这将确保在大多数副本没有接收到写的情况下,生产者会引发异常。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57151440

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档