在卡夫卡,org.apache.kafka.clients.consumer.KafkaConsumer已经取代了SimpleConsumer。然而,它没有发送(.)函数。如何使用新的KafkaConsumer重写下面的代码?
import scala.concurrent.duration._
import kafka.api.TopicMetadataRequest
import kafka.consumer.SimpleConsumer
....
val consumer = new SimpleConsumer(
host = "127.0.0.1",
port = 9092,
soTimeout = 2.seconds.toMillis.toInt,
bufferSize = 1024,
clientId = "health-check")
// this will fail if Kafka is unavailable
consumer.send(new TopicMetadataRequest(Nil, 1))发布于 2017-09-29 15:58:10
您可以使用.partitionsFor和.listTopics获取主题元数据。
发布于 2017-09-29 15:52:58
没有方法的直接替代,这取决于您想要做什么。如果您需要所有分区信息,新api中就有该consumer.partitionFor(主题)的方法。
https://stackoverflow.com/questions/46491340
复制相似问题