首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >一个小组中的一个卡夫卡消费者一直拒绝协调者,但只有当斯派克和卡夫卡都在EC2的时候

一个小组中的一个卡夫卡消费者一直拒绝协调者,但只有当斯派克和卡夫卡都在EC2的时候
EN

Stack Overflow用户
提问于 2020-10-09 18:44:00
回答 1查看 1K关注 0票数 6

我有一个Java应用程序试图使用EC2中的Kafka2.5.1集群中的主题,使用spark-streaming-kafka-0-10_2.11。它只在星火库集群或AWS之外独立安装:当星火也托管在EC2中时,卡夫卡消费者组永远不会完全初始化。对于总共三个主题,只有两个使用者进行连接,而第三个使用者则反复拒绝组协调器,称其为“不可用或无效”。

它总是同一个第三个主题,其使用者失败,但是第二个和第三个主题配置相同,并且都是空的;它们之间唯一的区别是名称。删除和重新创建第三个主题不会改变任何事情。忽略应用程序代码中的主题#3 (前两个主题很难分离)将导致成功启动。

所有不同的Sparks都是2.4.5版本,Google从发布的14.0.1升级到19.0,但没有其他特殊配置。

Kafka是一个三节点的EC2集群,每个节点托管一个代理、一个动物园管理员实例和一个火花工作者。所有的东西都在说话,而且从其他的东西中都可以听到。server.propertieslisteners配置为内部DNS名称,而advertised.listeners是外部DNS。

代码语言:javascript
复制
listeners=PLAINTEXT://ip-abc-def-ghi-jkl.region.compute.internal:9092
advertised.listeners=PLAINTEXT://ec2-mno-pqr-stu-vwx.region.compute.amazonaws.com:9092

从EC2内部启动失败的Spark应用程序:

代码语言:javascript
复制
2020-10-08/21:09:32.694/UTC org.apache.kafka.clients.consumer.ConsumerConfig INFO ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [ip-(broker 1 private dns).region.compute.internal:9092]
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = mygroup
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2020-10-08/21:09:32.843/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka version : 2.0.0
2020-10-08/21:09:32.844/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka commitId : 3402a8361b734732
2020-10-08/21:09:33.098/UTC org.apache.kafka.clients.Metadata INFO Cluster ID: mk34tRyzT1m1VR1ZC9GYnQ
2020-10-08/21:09:33.100/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Discovered group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null)
2020-10-08/21:09:33.135/UTC org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Revoking previously assigned partitions []
2020-10-08/21:09:33.135/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] (Re-)joining group
2020-10-08/21:09:39.155/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Successfully joined group with generation 1
2020-10-08/21:09:39.159/UTC org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Setting newly assigned partitions [partitions here]
2020-10-08/21:09:39.188/UTC org.apache.kafka.clients.consumer.internals.Fetcher INFO [Consumer clientId=consumer-1, groupId=mygroup] Resetting offset for partition station-data-19 to offset 1976.

(more offset resets; consumer 2 has also joined the group successfully between 21:09:32 and 21:09:39. No activity from consumer 3 yet, unlike launches from an external Spark. Consumer 3 spin-up starts next)

2020-10-08/21:09:39.200/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka version : 2.0.0
2020-10-08/21:09:39.205/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka commitId : 3402a8361b734732
2020-10-08/21:09:39.213/UTC org.apache.kafka.clients.Metadata INFO Cluster ID: mk34tRyzT1m1VR1ZC9GYnQ
2020-10-08/21:09:39.214/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] Discovered group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null)
2020-10-08/21:09:39.219/UTC org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] Revoking previously assigned partitions []
2020-10-08/21:09:39.219/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] (Re-)joining group
2020-10-08/21:09:42.268/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
2020-10-08/21:09:42.268/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-2, groupId=mygroup] Attempt to heartbeat failed since group is rebalancing

(more heartbeat failures for consumers 1 and 2)

2020-10-08/21:10:09.254/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] Group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
2020-10-08/21:10:09.330/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-2, groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
2020-10-08/21:10:09.331/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
2020-10-08/21:10:09.377/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] Discovered group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null)

在EC2之外的Spark中,这种情况不会发生:所有三个消费者都会发现协调器,并且或多或少地同时加入到组中,计算出他们的偏移量,然后就可以开始比赛了。但是,当应用程序被提交到EC2中的Spark集群时,只有前两个使用者成功地加入了这个组。第三个使用者直到前两个连接并重置它们的偏移之后才开始初始化,然后它会发现组协调器,尝试与它交谈(导致一个再平衡,防止其他消费者心跳),失败并决定它无效,然后再次找到相同的协调器,重复恶心地重复。

这种失败的星火提交和来自外部EC2的成功星火提交之间唯一重要的配置区别是,后者必须指向代理的外部DNS名称。但是,无论是针对代理的外部名称还是内部名称、一个代理还是多个代理,内部应用程序的启动都会失败。

下面是broker 3中的Kafka server.log,上面标识为组协调员:

代码语言:javascript
复制
[2020-10-08 21:09:33,128] INFO [GroupCoordinator 3]: Dynamic Member with unknown member id joins group mygroup in Empty state. Created a new member id consumer-2-cd4f7a30-d897-4902-81e7-4211b6a1e233 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:33,128] INFO [GroupCoordinator 3]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 0 (__consumer_offsets-25) (reason: Adding new member consumer-2-cd4f7a30-d897-4902-81e7-4211b6a1e233 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:33,136] INFO [GroupCoordinator 3]: Dynamic Member with unknown member id joins group mygroup in PreparingRebalance state. Created a new member id consumer-1-63909784-c821-4903-a08a-98a250d49b19 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,128] INFO [GroupCoordinator 3]: Stabilized group mygroup generation 1 (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,146] INFO [GroupCoordinator 3]: Assignment received from leader for group mygroup for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,222] INFO [GroupCoordinator 3]: Dynamic Member with unknown member id joins group mygroup in Stable state. Created a new member id consumer-3-3a91c573-a90b-4b5c-9707-af285bf9bbac for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,222] INFO [GroupCoordinator 3]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 1 (__consumer_offsets-25) (reason: Adding new member consumer-3-3a91c573-a90b-4b5c-9707-af285bf9bbac with group instance id None) (kafka.coordinator.group.GroupCoordinator)

... (more unknown member ids joining the group in PreparingRebalance)

[2020-10-08 21:14:37,756] INFO [GroupCoordinator 3]: Member consumer-2-cd4f7a30-d897-4902-81e7-4211b6a1e233 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:37,757] INFO [GroupCoordinator 3]: Member consumer-1-63909784-c821-4903-a08a-98a250d49b19 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:37,757] INFO [GroupCoordinator 3]: Stabilized group mygroup generation 2 (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:39,625] INFO [GroupMetadataManager brokerId=3] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-10-08 21:14:47,758] INFO [GroupCoordinator 3]: Member consumer-3-057c4229-7183-4733-b973-9f758b9a69d0 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:47,758] INFO [GroupCoordinator 3]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 2 (__consumer_offsets-25) (reason: removing member consumer-3-057c4229-7183-4733-b973-9f758b9a69d0 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:47,758] INFO [GroupCoordinator 3]: Member consumer-3-58c8ca8c-2daa-46c9-964b-7be883193287 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

... (more members failing and being removed)

[2020-10-08 21:14:47,759] INFO [GroupCoordinator 3]: Group mygroup with generation 3 is now empty (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-10-19 22:20:05

事实证明,这是关键:

第三个使用者

直到前两个连接并重置它们的偏移之后才开始初始化

前两位消费者没有几秒钟的活动时间,而消费者-3则迅速增长,之后心跳开始出现故障。

星火驱动程序是一个较旧的实例(m3.size),它有两个vCPU核。我们成功的测试来自最近有更多核心的机器,当我们在测试机器上使用taskset限制CPU可用性时,我们能够准确地再现问题。允许spark-submit三核成功。

对于我们来说,“简单”的0.8+ API并不是一个问题,但是要开始使用用于0.10+的“新的”消费者API,似乎需要组中的每个消费者都有一个核心。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64285670

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档