首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >KafkaContainer不支持Producer#initTransactions

KafkaContainer不支持Producer#initTransactions
EN

Stack Overflow用户
提问于 2019-03-20 20:30:16
回答 2查看 275关注 0票数 1

我尝试通过事务向Kafka发送消息。因此,我使用以下代码:

代码语言:javascript
复制
 try (Producer<Void, String> producer = createProducer(kafkaContainerBootstrapServers)) {
            producer.initTransactions();
            producer.beginTransaction();
            Arrays.stream(messages).forEach(
                message -> producer.send(new ProducerRecord<>(KAFKA_INPUT_TOPIC, message)));
            producer.commitTransaction();
        }

..。

代码语言:javascript
复制
private static Producer<Void, String> createProducer(String kafkaContainerBootstrapServers) {
        return new KafkaProducer<>(
            ImmutableMap.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainerBootstrapServers,
                ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
                ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
                ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString()
            ),
            new VoidSerializer(),
            new StringSerializer());
    }

如果我使用本地的Kafka,它工作得很好。

但是如果我使用Kafka TestContainers,它在producer.initTransactions()上就会冻结

代码语言:javascript
复制
private static final String KAFKA_VERSION = "4.1.1";

@Rule
public KafkaContainer kafka = new KafkaContainer(KAFKA_VERSION)
    .withEmbeddedZookeeper();

如何配置KafkaContainer以处理事务?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-03-25 17:24:54

尝试使用Kafka for JUnit而不是Kafka测试容器。我在处理事务时遇到了同样的问题,并以这种方式使它们生效。

我使用的Maven依赖项:

代码语言:javascript
复制
<dependency>
    <groupId>net.mguenther.kafka</groupId>
    <artifactId>kafka-junit</artifactId>
    <version>2.1.0</version>
    <scope>test</scope>
</dependency>
票数 2
EN

Stack Overflow用户

发布于 2019-03-27 00:44:39

正如@AntonLitvinenko建议的那样,我在使用Kafka for JUnit时遇到了一个异常。我的问题是here

我添加了这个依赖来修复它(参见issue):

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>2.12.0</version>
    <exclusions>
        <exclusion>
           <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
    <scope>test</scope>
</dependency>

另外,我对kafka-junit和kafka_2.11使用了2.0.1版本:

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${kafkaVersion}</version>
    <scope>test</scope>
</dependency>
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55260840

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档