首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka使用者(使用反应堆kafka)重新加入组,并在调用取消订阅后立即分配主题分区

Kafka使用者(使用反应堆kafka)重新加入组,并在调用取消订阅后立即分配主题分区
EN

Stack Overflow用户
提问于 2021-08-04 06:15:37
回答 1查看 885关注 0票数 0

我面临一个问题,而没有订阅从卡夫卡消费者,请注意,我们正在使用反应堆卡夫卡API。它在开始时确实成功地取消订阅,但随后它立即加入到组中,并被分配给该主题的分区,因此本质上它始终保持订阅状态,并且一直在使用来自主题的消息,即使它不应该这样做!

下面是我做这个生意的代码,

代码语言:javascript
复制
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

private final ReceiverOptions<String, byte[]> receiverOptions;
private Disposable disposable;

public void subscribe() {
    ReceiverOptions<String, byte[]> options = receiverOptions.subscription(Collections.singleton(topic))
            .addAssignListener(partitions -> {/*some listener code here*/})
            .addRevokeListener(partitions -> {/*some listener code here*/});

    Flux<ReceiverRecord<String, byte[]>> kafkaFlux = KafkaReceiver.create(options).receive();
    disposable = kafkaFlux
            .publishOn(Schedulers.fromExecutor(executorService))
            .subscribe(record -> {/*consume the record here*/});
}

public void unsubscribe() {
    if (disposable != null)
        disposable.dispose();
}

当我调用unsubscribe()方法时,如下面的日志所示;它会撤销分区,并向协调器发送一个休假组请求。

代码语言:javascript
复制
ConsumerCoordinator |[reactive-kafka-cgid-2]|| [Consumer clientId=consumer-cgid-2, groupId=cgid] Revoke previously assigned partitions topic-2
AbstractCoordinator |[reactive-kafka-cgid-2]|| [Consumer clientId=consumer-cgid-2, groupId=cgid] Member consumer-cgid-2-f61f347c-b07c-4037-92f5-9a418ac8d153 sending LeaveGroup request to coordinator kafaka_server:9084 (id: 2147483644 rack: null) due to the consumer is being closed

但是,在这之后,紧接着发生并记录了一组事件,其中为该使用者分配了主题分区,并从那里开始使用消息。请注意,我没有在这里调用订阅方法!

代码语言:javascript
复制
AbstractCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Attempt to heartbeat failed since group is rebalancing
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Revoke previously assigned partitions topic-0, topic-1
AbstractCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] (Re-)joining group
AbstractCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Successfully joined group with generation Generation{generationId=3775, memberId='consumer-cgid-1-1f5da192-2a14-4634-a0f9-79707518598b', protocol='range'}
AbstractCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Successfully synced group in generation Generation{generationId=3775, memberId='consumer-cgid-1-1f5da192-2a14-4634-a0f9-79707518598b', protocol='range'}
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Notifying assignor about the new Assignment(partitions=[topic-0, topic-1, topic-2])
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Adding newly assigned partitions: topic-0, topic-2, topic-1
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Found no committed offset for partition topic-0
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Setting offset for partition topic-2 to the committed offset FetchPosition{offset=2049, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafaka_server:9084 (id: 3 rack: null)], epoch=14}}
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Setting offset for partition topic-1 to the committed offset FetchPosition{offset=119509, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafaka_server:9083 (id: 2 rack: null)], epoch=19}}
SubscriptionState   |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Resetting offset for partition topic-0 to position FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafaka_server:9082 (id: 1 rack: null)], epoch=22}}.

使用的版本如下

org.apache.kafka:kafka-客户端:2.8.0

org.apache.kafka:kafka-streams:2.8.0

io.projectreactor.kafka:reactor-kafka:1.1.0.RELEASE

这里值得注意的是,还有另一个服务正在向该服务正在使用的主题生成消息。

这可能与卡夫卡参数有关,也可能不是,但如果你遇到这样的问题,并在此之前已经解决了,请告诉我解决方案。

谢谢,

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-08-06 02:26:12

我在回答我自己的问题。

我编写了一个独立的程序来订阅和取消订阅相关的主题,该程序正在按预期工作。它清楚地指出,从Kafka参数的角度来看,根本没有问题(所以应用程序本身有问题)。

在做了一些好的代码分析并逐行检查代码之后,我注意到subscribe方法被调用了2次。我commented其中的一个调用,然后进行测试,它的行为都很好,而且是预期的。

从来没有想过,通过订阅两次的主题,消费者将永远无法取消订阅!

注意:即使在执行了2次unsubscribe调用之后,这个使用者也不会取消订阅主题。因此,如果它已经订阅了两次(或者更有可能--但我还没有测试过),它将永远无法取消订阅它的生命!

从卡夫卡的角度来看,这是一种正常的行为吗?我不太确定,让其他人来回应.

谢谢..。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68646252

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档