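I am facing a problem: my Kafka consumer group rebalances almost every 5-10 minutes for no apparent reason. I use Kafka with Apache Storm. Typically, the logs before a rebalance show only Storm's repeating backpressure checks, but after a while I see "Node 2147482646 disconnected", followed by other log lines containing "coordinator unavailable.isDisconnected: true". After that, everything is rebalanced. To me this looks unhealthy, even though Kafka itself works fine.
Almost all of my topics have 3 replicas and 10 partitions, and I have 10 spouts (10 consumers).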
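As far as I know, the following properties affect this:
heartbeat.interval.ms
session.timeout.ms
rebalance.timeout.ms
My values are:
heartbeat.interval.ms: 5000
session.timeout.ms: 30000
rebalance.timeout.ms: 60000
I thought that, because of some connection problems, 4-5 heartbeat messages might not be enough, so I tried heartbeat.interval.ms: 2000, but it did not help.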
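To make the heartbeat arithmetic concrete, here is a minimal sketch that holds the values quoted above in a plain java.util.Properties object and counts how many heartbeat intervals fit into one session timeout. The class and method names are hypothetical, the property keys are copied as written in the question, and nothing here touches the Kafka client itself.

```java
import java.util.Properties;

public class ConsumerTimingSketch {
    // Hypothetical helper: the timing values from the question as string properties.
    static Properties questionSettings(int heartbeatIntervalMs) {
        Properties props = new Properties();
        props.setProperty("heartbeat.interval.ms", Integer.toString(heartbeatIntervalMs));
        props.setProperty("session.timeout.ms", "30000");
        props.setProperty("rebalance.timeout.ms", "60000");
        return props;
    }

    // Number of whole heartbeat intervals that fit into one session timeout.
    static int heartbeatsPerSession(Properties props) {
        int heartbeatMs = Integer.parseInt(props.getProperty("heartbeat.interval.ms"));
        int sessionMs = Integer.parseInt(props.getProperty("session.timeout.ms"));
        return sessionMs / heartbeatMs;
    }

    public static void main(String[] args) {
        // Original setting: 30000 / 5000
        System.out.println(heartbeatsPerSession(questionSettings(5000))); // prints 6
        // Setting that was tried afterwards: 30000 / 2000
        System.out.println(heartbeatsPerSession(questionSettings(2000))); // prints 15
    }
}
```

With 2000 ms the consumer gets 15 heartbeat intervals per session instead of 6, so if the rebalances continue anyway, a shortage of heartbeats is probably not the cause.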
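As far as I could tell from searching the Kafka source, it happens in the poll() method of NetworkClient, then in handleDisconnections(responses, updatedNow), and at that point the selector already reports the node as disconnected.
What else could cause such a problem?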
2022-09-30 10:59:54.599 o.a.s.k.s.KafkaSpout Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] Polled [0] records from Kafka
2022-09-30 10:59:54.599 o.a.k.c.c.KafkaConsumer Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Resuming partitions []
2022-09-30 10:59:54.601 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] Topic partitions with entries ready to be retried [{}]
2022-09-30 10:59:54.601 o.a.k.c.c.KafkaConsumer Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Pausing partitions []
2022-09-30 10:59:54.614 o.a.s.g.LoadAwareShuffleGrouping refresh-load-timer [DEBUG] targetTask 1 is in LocalityScope RACK_LOCAL
2022-09-30 10:59:54.615 o.a.s.g.LoadAwareShuffleGrouping refresh-load-timer [DEBUG] targetTask 2 is in LocalityScope WORKER_LOCAL
2022-09-30 10:59:54.615 o.a.s.g.LoadAwareShuffleGrouping refresh-load-timer [DEBUG] targetTask 3 is in LocalityScope RACK_LOCAL
2022-09-30 10:59:54.615 o.a.s.g.LoadAwareShuffleGrouping refresh-load-timer [DEBUG] targetTask 4 is in LocalityScope WORKER_LOCAL
2022-09-30 10:59:54.630 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:54.630 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:54.680 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:54.680 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:54.731 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:54.731 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:54.781 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:54.781 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:54.801 o.a.s.k.s.KafkaSpout Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] Polled [0] records from Kafka
2022-09-30 10:59:54.801 o.a.k.c.c.KafkaConsumer Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Resuming partitions []
2022-09-30 10:59:54.802 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] Topic partitions with entries ready to be retried [{}]
2022-09-30 10:59:54.803 o.a.k.c.c.KafkaConsumer Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Pausing partitions []
2022-09-30 10:59:54.831 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:54.832 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:54.882 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:54.882 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:54.933 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:54.933 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:54.983 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:54.983 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:54.995 o.a.k.c.NetworkClient Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Received FETCH response from node 1003 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-segmentation.check.groupid-1, correlationId=2202): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=808668183, responses=[])
2022-09-30 10:59:54.995 o.a.k.c.FetchSessionHandler Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Node 1003 sent an incremental fetch response with throttleTimeMs = 0 for session 808668183 with 0 response partition(s), 1 implied partition(s)
2022-09-30 10:59:54.996 o.a.k.c.c.i.Fetcher Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Added READ_UNCOMMITTED fetch request for partition segmentation.check.queue-1 at position FetchPosition{offset=1136153, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka-broker2:9092 (id: 1003 rack: /rack2)], epoch=4}} to node kafka-broker2:9092 (id: 1003 rack: /rack2)
2022-09-30 10:59:54.996 o.a.k.c.FetchSessionHandler Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Built incremental fetch (sessionId=808668183, epoch=2152) for node 1003. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s), replaced 0 partition(s) out of 1 partition(s)
2022-09-30 10:59:54.996 o.a.k.c.c.i.Fetcher Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), toReplace=(), implied=(segmentation.check.queue-1), canUseTopicIds=False) to broker kafka-broker2:9092 (id: 1003 rack: /rack2)
2022-09-30 10:59:54.996 o.a.k.c.NetworkClient Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-segmentation.check.groupid-1, correlationId=2203) and timeout 60000 to node 1003: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=808668183, sessionEpoch=2152, topics=[], forgottenTopicsData=[], rackId='')
2022-09-30 10:59:55.003 o.a.s.k.s.KafkaSpout Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] Polled [0] records from Kafka
2022-09-30 10:59:55.003 o.a.k.c.c.KafkaConsumer Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Resuming partitions []
2022-09-30 10:59:55.005 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] Topic partitions with entries ready to be retried [{}]
2022-09-30 10:59:55.005 o.a.k.c.c.KafkaConsumer Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Pausing partitions []
2022-09-30 10:59:55.033 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:55.034 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:55.084 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:55.084 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:55.134 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:55.134 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:55.185 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:55.185 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:55.206 o.a.k.c.NetworkClient Thread-16-SegmentationCheck_Spout-executor[6, 6] [INFO] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Node 2147482646 disconnected.
2022-09-30 10:59:55.206 o.a.s.k.s.KafkaSpout Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] Polled [0] records from Kafka
2022-09-30 10:59:55.206 o.a.k.c.c.KafkaConsumer Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Resuming partitions []
2022-09-30 10:59:55.208 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] Topic partitions with entries ready to be retried [{}]
2022-09-30 10:59:55.208 o.a.k.c.c.KafkaConsumer Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Pausing partitions []
2022-09-30 10:59:55.208 o.a.k.c.c.i.ConsumerCoordinator Thread-16-SegmentationCheck_Spout-executor[6, 6] [INFO] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Group coordinator kafka-broker0:9092 (id: 2147482646 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
2022-09-30 10:59:55.208 o.a.k.c.c.i.ConsumerCoordinator Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Sending FindCoordinator request to broker kafka-broker0:9092 (id: 1001 rack: /rack0)
2022-09-30 10:59:55.209 o.a.k.c.NetworkClient Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='segmentation.check.queue')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node kafka-broker0:9092 (id: 1001 rack: /rack0)
2022-09-30 10:59:55.209 o.a.k.c.NetworkClient Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-segmentation.check.groupid-1, correlationId=2205) and timeout 60000 to node 1001: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='segmentation.check.queue')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
2022-09-30 10:59:55.210 o.a.k.c.NetworkClient Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-segmentation.check.groupid-1, correlationId=2204) and timeout 60000 to node 1001: FindCoordinatorRequestData(key='segmentation.check.groupid', keyType=0, coordinatorKeys=[])
2022-09-30 10:59:55.212 o.a.k.c.NetworkClient Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Received METADATA response from node 1001 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-segmentation.check.groupid-1, correlationId=2205): MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=1001, host='kafka-broker0', port=9092, rack='/rack0'), MetadataResponseBroker(nodeId=1003, host='kafka-broker2', port=9092, rack='/rack2'), MetadataResponseBroker(nodeId=1002, host='kafka-broker1', port=9092, rack='/rack1')], clusterId='fzb0hRraRb6mSwlmiWGOvA', controllerId=1001, topics=[MetadataResponseTopic(errorCode=0, name='segmentation.check.queue', topicId=AAAAAAAAAAAAAAAAAAAAAA, isInternal=false, partitions=[MetadataResponsePartition(errorCode=0, partitionIndex=0, leaderId=1002, leaderEpoch=3, replicaNodes=[1002, 1003, 1001], isrNodes=[1001, 1002, 1003], offlineReplicas=[]), MetadataResponsePartition(errorCode=0, partitionIndex=2, leaderId=1001, leaderEpoch=1, replicaNodes=[1001, 1002, 1003], isrNodes=[1001, 1002, 1003], offlineReplicas=[]), MetadataResponsePartition(errorCode=0, partitionIndex=1, leaderId=1003, leaderEpoch=4, replicaNodes=[1003, 1001, 1002], isrNodes=[1001, 1002, 1003], offlineReplicas=[])], topicAuthorizedOperations=0)], clusterAuthorizedOperations=0)
2022-09-30 10:59:55.212 o.a.k.c.Metadata Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Updating last seen epoch for partition segmentation.check.queue-0 from 3 to epoch 3 from new metadata
2022-09-30 10:59:55.212 o.a.k.c.Metadata Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Updating last seen epoch for partition segmentation.check.queue-2 from 1 to epoch 1 from new metadata
2022-09-30 10:59:55.212 o.a.k.c.Metadata Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Updating last seen epoch for partition segmentation.check.queue-1 from 4 to epoch 4 from new metadata
2022-09-30 10:59:55.213 o.a.k.c.Metadata Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Updated cluster metadata updateVersion 7 to MetadataCache{clusterId='fzb0hRraRb6mSwlmiWGOvA', nodes={1001=kafka-broker0:9092 (id: 1001 rack: /rack0), 1002=kafka-broker1:9092 (id: 1002 rack: /rack1), 1003=kafka-broker2:9092 (id: 1003 rack: /rack2)}, partitions=[PartitionMetadata(error=NONE, partition=segmentation.check.queue-0, leader=Optional[1002], leaderEpoch=Optional[3], replicas=1002,1003,1001, isr=1001,1002,1003, offlineReplicas=), PartitionMetadata(error=NONE, partition=segmentation.check.queue-2, leader=Optional[1001], leaderEpoch=Optional[1], replicas=1001,1002,1003, isr=1001,1002,1003, offlineReplicas=), PartitionMetadata(error=NONE, partition=segmentation.check.queue-1, leader=Optional[1003], leaderEpoch=Optional[4], replicas=1003,1001,1002, isr=1001,1002,1003, offlineReplicas=)], controller=kafka-broker0:9092 (id: 1001 rack: /rack0)}
2022-09-30 10:59:55.213 o.a.k.c.NetworkClient Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Received FIND_COORDINATOR response from node 1001 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-segmentation.check.groupid-1, correlationId=2204): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1001, host='kafka-broker0', port=9092, coordinators=[])
2022-09-30 10:59:55.214 o.a.k.c.c.i.ConsumerCoordinator Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Received FindCoordinator response ClientResponse(receivedTimeMs=1664535595213, latencyMs=5, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-segmentation.check.groupid-1, correlationId=2204), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1001, host='kafka-broker0', port=9092, coordinators=[]))
2022-09-30 10:59:55.214 o.a.k.c.c.i.ConsumerCoordinator Thread-16-SegmentationCheck_Spout-executor[6, 6] [INFO] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Discovered group coordinator kafka-broker0:9092 (id: 2147482646 rack: null)
2022-09-30 10:59:55.214 o.a.k.c.c.i.ConsumerCoordinator Thread-16-SegmentationCheck_Spout-executor[6, 6] [INFO] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Group coordinator kafka-broker0:9092 (id: 2147482646 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted.
2022-09-30 10:59:55.214 o.a.k.c.c.i.ConsumerCoordinator Thread-16-SegmentationCheck_Spout-executor[6, 6] [INFO] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Requesting disconnect from last known coordinator kafka-broker0:9092 (id: 2147482646 rack: null)

Answered on 2022-10-03 11:34:38
As I found out, if you set the log level to TRACE (instead of DEBUG), the reason becomes visible:
About to close the idle connection from 2147482646 due to being idle for 540005 millis
Node 2147482646 disconnected.
This is because:
connections.max.idle.ms 600000 (10 min) is the default for the server
connections.max.idle.ms 540000 (9 min) is the default for consumers
But I still do not understand why the connection is idle. I opened another question about that: When Kafka connection is idle?
https://stackoverflow.com/questions/73909501
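
As a small illustration of the numbers above, the sketch below does two things. First, it decodes the odd node id: the Java consumer labels its dedicated coordinator connection as Integer.MAX_VALUE minus the broker id, which matches the logs (2147482646 maps to broker 1001, kafka-broker0). Second, it models the idle check in a deliberately simplified way. The class and method names are hypothetical, and the idle check is an assumption-level model of the behaviour, not Kafka's actual implementation.

```java
public class IdleCoordinatorSketch {
    // Recover the broker id from the id the client uses for its coordinator connection.
    static int brokerIdFromCoordinatorConnection(int connectionId) {
        return Integer.MAX_VALUE - connectionId;
    }

    // Simplified model (assumption): a connection counts as expired once it has
    // been idle for longer than the configured connections.max.idle.ms.
    static boolean isIdleExpired(long lastActiveMs, long nowMs, long maxIdleMs) {
        return nowMs - lastActiveMs > maxIdleMs;
    }

    public static void main(String[] args) {
        // Node id from the logs
        System.out.println(brokerIdFromCoordinatorConnection(2147482646)); // prints 1001

        // Idle time from the TRACE log against the consumer default (540000 ms)
        System.out.println(isIdleExpired(0L, 540005L, 540000L)); // prints true

        // The same idle time against the broker default (600000 ms)
        System.out.println(isIdleExpired(0L, 540005L, 600000L)); // prints false
    }
}
```

Under this model the consumer-side limit trips first, which is consistent with the client, not the broker, logging that it is about to close the idle connection.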