首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用reactor-kafka使用不同的线程从Kafka中的消费者组读取数据

使用reactor-kafka使用不同的线程从Kafka中的消费者组读取数据
EN

Stack Overflow用户
提问于 2021-07-28 06:16:53
回答 1查看 209关注 0票数 0

我需要从一个卡夫卡主题消费,将有数百万的数据。一旦我从主题中读取,我需要将其转换并写入另一个主题。我能够使用来自主题的消息,通过多个线程处理数据,并写入另一个主题。我遵循了这里的示例https://projectreactor.io/docs/kafka/1.3.5-SNAPSHOT/reference/index.html#concurrent-ordered

下面是我的代码:

代码语言:javascript
复制
public Flux<?> flux() {
            KafkaSender<Integer, Person> sender = sender(senderOptions());
            return KafkaReceiver.create(receiverOptions(Collections.singleton(sourceTopic)))
                                .receive()
                                .map(m -> SenderRecord.create(transform(m.value()), m.receiverOffset()))
                                .as(sender::send)
                                .doOnNext(m -> m.correlationMetadata().acknowledge())
                                .doOnCancel(() -> close());
        }

我有多个消费者可以阅读,并且由于数据量的原因,我正在考虑添加不同的读者线程来从主题中读取。然而,reactor-kafka documentation提到KafkaReceiver不是线程安全的,因为底层KafkaConsumer不能被多个线程并发访问。

我正在寻找从一个主题阅读的建议。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-07-28 06:52:06

所以基本上,你正在寻找的所谓的消费者组,你可以运行的最大并行消耗是由你的主题的分区数量限制的。

Kafka消费者组机制允许您将消费某个topic的工作分开,以区分属于同一组的不同“读者”,工作将按照组中每个消费者单独负责一个分区来划分(1或更多,基于组中的消费者数量和该topic的分区数量)

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68552336

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档