首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在kafka (node-rdkafka)中等待every连接到新消费群的topic

在kafka (node-rdkafka)中等待every连接到新消费群的topic
EN

Stack Overflow用户
提问于 2021-11-13 15:48:23
回答 1查看 75关注 0票数 0

我正在构建一个websocket后端,它将连接到一个主题(只有一个分区),并从最早的位置消费数据,并不断消费新数据,直到websocket连接断开。一次可以存在多个websocket连接。

为了确保从头开始的所有数据都被使用,每次建立websocket连接时,我都会创建一个新的消费者组并订阅该主题

代码语言:javascript
复制
const Kafka = require('node-rdkafka')
const { v4: uuidv4 } = require('uuid')

const kafkaConfig = (uuid) => ({
  'group.id': `my-topic-${uuid}`,
  'metadata.broker.list': KAFKA_URL,
})
const topicName= 'test-topic'
const consumer = new Kafka.KafkaConsumer(kafkaConfig(uuidv4()), {
  'auto.offset.reset': 'earliest',
})

console.log('attempting to connect to topic')
consumer.connect({ topic: topicName, timeout: 300 }, (err) => {
   if (err) {
      console.log('error connecting consumer to topic', topicName)
      throw err
   }
   console.log(`consumer connected to topic ${topicName}`)
   consumer.subscribe([topicName])
   consumer.consume((_err, data) => {
       // send data to websocket 
   })
})

这似乎像预期的那样工作得很好。但是,当我尝试将消费者/消费者组的数量超过4个时,消费者连接似乎在无限期地等待。在上面的代码片段中,我会看到日志“正在尝试连接”,但之后什么也看不到。

我读了Kafka的文档,它看起来对消费者群体的数量没有限制。

我在本地主机上的docker容器中运行Kafka/Zookeper,并且我没有对主题设置任何限制。

我的dockerfile

代码语言:javascript
复制
zookeeper:
  image: confluentinc/cp-zookeeper:latest
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000

kafka:
    image: confluentinc/cp-kafka:latest
    labels:
      - 'custom.project=faster-cms'
      - 'custom.service=kafka'
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG4J_ROOT_LOGLEVEL: INFO
      KAFKA_LOG4J_LOGGERS: 'kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO'
      CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'

我的问题是,为什么连接会无限期地等待,当连接不确定地被卡住时,我如何提高消费者限制或抛出错误。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-11-16 10:18:11

显然,这是node-rdkafka包中的一个限制。默认的消费者/生产者组限制是5。如果你想增加限制,在.env文件中设置环境变量UV_THREADPOOL_SIZE,包会增加组的限制。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69955840

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档