首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Reactor Kafka实现多个接收器读取一个主题中的多个分区

使用Reactor Kafka实现多个接收器读取一个主题中的多个分区
EN

Stack Overflow用户
提问于 2021-08-04 06:25:43
回答 1查看 311关注 0票数 0

我在一个主题中有1000个分区。我想让一个线程从一个主题中的一个分区读取,转换消息并写入另一个主题。我正在添加多线程,以获得更好的吞吐量。我正在尝试使用reactor-Kafka - https://projectreactor.io/docs/kafka/1.3.5-SNAPSHOT/reference/index.html#_introduction来实现这一点。我的理解是在reactor中,每个接收器都有自己的单线程调度器,所以我必须创建1000个接收器来实现上面提到的场景。我一直在寻找这方面的例子,但我找不到任何例子,我也无法弄清楚如何做到这一点。

这是我拥有的代码,它读取一个主题中的所有分区,转换消息并写入另一个主题。

代码语言:javascript
复制
static class ReactiveTransposeAndSend extends SetKafkaProperties {

    SenderOptions<Integer, String> senderOptions =
        SenderOptions.<Integer, String>create(producerProps)
            .maxInFlight(1024);

    KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);

    ReceiverOptions<Integer, String> receiverOptions =
        ReceiverOptions.<Integer, String>create(consumerProps)
            .subscription(Collections.singleton(SOURCE_TOPIC));


    ReactiveTransposeAndSend(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
        super(consumerPropsOverride, producerPropsOverride, bootstrapServers, sourceTopic, destTopic);
    }

    public Disposable ReadProcessWriteRecords() {
        Scheduler writerScheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");
        Scheduler readerScheduler = Schedulers.newBoundedElastic(60, 60, "readerThreads");
        return KafkaReceiver.create(receiverOptions)
            .receive()
            .doOnNext( r -> System.out.printf("Record received: " + r.value() + " in thread: " + Thread.currentThread().getName() + System.lineSeparator()))
            .map(m -> SenderRecord.create(processRecord(m),m.receiverOffset()))
            .as(sender::send)
            .doOnNext(m->m.correlationMetadata().acknowledge())
            .doOnError(e -> e.printStackTrace())
            .subscribe();
    }

    private ProducerRecord<Integer, String> processRecord( ReceiverRecord<Integer, String> message) {
        System.out.printf( "Processing record " + message.value() + " in thread: "
            + Thread.currentThread().getName() + System.lineSeparator()) ;
        return new ProducerRecord<Integer,String>(DESTINATION_TOPIC, message.key(), message.value()+ " updated");
    }
}

如果有人能给我一些建议,或者给我举个例子,让多个接收者使用来自多个分区的消息,我会非常感激。

更新的代码:

代码语言:javascript
复制
static class ReactiveConsumeTransposeAndSend extends SetKafkaProperties {

    SenderOptions<Integer, String> senderOptions =
        SenderOptions.<Integer, String>create(producerProps)
            .maxInFlight(1024);

    KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);

    ReceiverOptions<Integer, String> receiverOptions =
        ReceiverOptions.<Integer, String>create(consumerProps)
            .subscription(Collections.singleton(SOURCE_TOPIC))
        .addAssignListener(partitions -> {
        System.out.printf("Partitions assigned" + partitions + System.lineSeparator());})
        .addRevokeListener(partitions -> {
            System.out.printf("Partitions assigned" + partitions + System.lineSeparator());})
        ;


    ReactiveConsumeTransposeAndSend(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
        super(consumerPropsOverride, producerPropsOverride, bootstrapServers, sourceTopic, destTopic);
    }

    public Disposable ReadProcessWriteRecords() {
        Scheduler writerScheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");
        Scheduler readerScheduler = Schedulers.newBoundedElastic(60, 60, "readerThreads");
        return KafkaReceiver.create(receiverOptions)
            .receive()
            .doOnNext( r -> System.out.printf("Record received: " + r.value() + " from partition: " + r.partition() + " in thread: " + Thread.currentThread().getName() + System.lineSeparator()))
            .map(m -> SenderRecord.create(processRecord(m),m.receiverOffset()))
            .as(sender::send)
            .doOnNext(m->m.correlationMetadata().acknowledge())
            .doOnError(e -> e.printStackTrace())
            .subscribe();
    }

    private ProducerRecord<Integer, String> processRecord( ReceiverRecord<Integer, String> message) {
        System.out.printf( "Processing record " + message.value() + " in thread: "
            + Thread.currentThread().getName() + System.lineSeparator()) ;
        return new ProducerRecord<Integer,String>(DESTINATION_TOPIC, message.key(), message.value()+ " updated");
    }
}

按照@nipuna.的建议,我按照SampleConsumer.java中的示例更新了代码。但是,以下是我在运行应用程序时收到的print语句:

代码语言:javascript
复制
Partitions assigned[metrics-2, metrics-1, metrics-0]
Record received:  A16 from partition: 2 in thread: reactive-kafka-reactive-group-1
Processing record  A16 in thread: reactive-kafka-reactive-group-1
Record received:  B14 from partition: 1 in thread: reactive-kafka-reactive-group-1
Processing record  B14 in thread: reactive-kafka-reactive-group-1

因此,使用相同的线程(“eactive-kafka-eactive group-1”)来使用来自分区的消息。我想让不同的线程使用来自不同分区的消息。

EN

回答 1

Stack Overflow用户

发布于 2021-08-04 06:37:51

here中子主题sample consumer下的文档示例所示,它正在消耗来自一个主题的多个分区。

您可以在SampleConsumer.java中查看示例消费者代码。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68646349

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档