我正在构建一个websocket后端,它将连接到一个主题(只有一个分区),并从最早的位置消费数据,并不断消费新数据,直到websocket连接断开。一次可以存在多个websocket连接。
为了确保从头开始的所有数据都被使用,每次建立websocket连接时,我都会创建一个新的消费者组并订阅该主题
const Kafka = require('node-rdkafka')
const { v4: uuidv4 } = require('uuid')
const kafkaConfig = (uuid) => ({
'group.id': `my-topic-${uuid}`,
'metadata.broker.list': KAFKA_URL,
})
const topicName= 'test-topic'
const consumer = new Kafka.KafkaConsumer(kafkaConfig(uuidv4()), {
'auto.offset.reset': 'earliest',
})
console.log('attempting to connect to topic')
consumer.connect({ topic: topicName, timeout: 300 }, (err) => {
if (err) {
console.log('error connecting consumer to topic', topicName)
throw err
}
console.log(`consumer connected to topic ${topicName}`)
consumer.subscribe([topicName])
consumer.consume((_err, data) => {
// send data to websocket
})
})这似乎像预期的那样工作得很好。但是,当我尝试将消费者/消费者组的数量超过4个时,消费者连接似乎在无限期地等待。在上面的代码片段中,我会看到日志“正在尝试连接”,但之后什么也看不到。
我读了Kafka的文档,它看起来对消费者群体的数量没有限制。
我在本地主机上的docker容器中运行Kafka/Zookeper,并且我没有对主题设置任何限制。
我的dockerfile
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
labels:
- 'custom.project=faster-cms'
- 'custom.service=kafka'
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG4J_ROOT_LOGLEVEL: INFO
KAFKA_LOG4J_LOGGERS: 'kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO'
CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'我的问题是,为什么连接会无限期地等待,当连接不确定地被卡住时,我如何提高消费者限制或抛出错误。
发布于 2021-11-16 10:18:11
显然,这是node-rdkafka包中的一个限制。默认的消费者/生产者组限制是5。如果你想增加限制,在.env文件中设置环境变量UV_THREADPOOL_SIZE,包会增加组的限制。
https://stackoverflow.com/questions/69955840
复制相似问题