首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何删除kafka消费者群体(通过新的消费api创建)?

如何删除kafka消费者群体(通过新的消费api创建)?
EN

Stack Overflow用户
提问于 2016-06-10 07:04:14
回答 2查看 10.4K关注 0票数 7

我通过新的消费API创造了卡夫卡消费者。

我使用的是kafka 2.10-0.9.0.1

我们有一个使用者组,每个组中有一个使用者实例。

Kafka脚本“kafka- old groups.sh”提供了删除用户的方法,但这只适用于运行命令的旧使用者组:

bin/kafka-消费者-groups.sh.bin

给出

--删除:警告:组删除只适用于旧的基于ZK的使用者组,必须小心使用它只删除不活动的组。

因此,我想问一问,是否有任何方法删除通过新的消费者API创建的消费者组?

EN

回答 2

Stack Overflow用户

发布于 2016-06-14 08:44:32

没有必要删除与新的消费者。以下是脚本试图删除时输出的内容:

请注意,无需删除新使用者的组元数据,因为在最后一个成员离开时会自动删除组元数据。

这是简单的答案。更多细节:“元数据”指的是两件事。首先,只是作为组成员资格协调器的一部分存储的有关使用者和使用者组的信息。如果组中的所有使用者都消失了,则自动删除。

第二,消费者组将提交的偏移量存储在Kafka主题中(当使用新的消费者时)。以前这些都是和动物园管理员一起保存的)。当使用者组消失时,不会立即删除该主题。如果使用者组再次出现,它将自动在本主题中找到以前的偏移量。它可以选择使用它们,也可以忽略它们。如果使用者组永远不会出现,这些存储的偏移量最终会自动被垃圾收集。

因此,简单地说,在使用新的消费者时,没有必要删除任何内容。

票数 5
EN

Stack Overflow用户

发布于 2020-09-30 16:07:43

在Kafka2.5.0中,AdminClient中有一个名为deleteConsumerGroups的Java,可用于删除单个ConsumerGroups。

您可以使用它,如下所示:

代码语言:javascript
复制
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;

public class DeleteConsumerGroups {
  public static void main(String[] args) {
    System.out.println("*** Starting AdminClient to delete a Consumer Group ***");

    final Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
    properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000");

    AdminClient adminClient = AdminClient.create(properties);
    String consumerGroupToBeDeleted = "console-consumer-65092";
    DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(Arrays.asList(consumerGroupToBeDeleted));

    KafkaFuture<Void> resultFuture = deleteConsumerGroupsResult.all();
    try {
      resultFuture.get();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }

    adminClient.close();
  }
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37741936

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档