我花了一些时间发现我的Go应用程序连接到Kafka 0.11集群使用的是旧的0.8.2版本的库,它在响应中缺少时间戳值。
然后我发现Kafka 0.11.x API/版本不受支持(但他们正在努力)。
我现在有两个解决方案。
首先是在我的应用程序中明确设置所需的版本。其次是“调优”Sarama代码以使用版本0.10.x作为最低版本,使我能够使用所有0.10.x API/功能。
我仍然在想,为什么这个版本不是从我连接的Kafka代理中获取的?
我不能从代码中理解它是如何工作的。我清楚地看到了在sarama.Config.Version中设置或定义的版本,但是一旦连接到代理,我就找不到任何东西来更新这个值?
我知道Python是这样做的:
from kafka import BrokerConnection
broker=BrokerConnection("localhost",9092,0)
broker.connect()
broker.check_version()(0,11,0)
我遗漏了什么?
发布于 2017-09-26 04:10:22
到目前为止,我不确定Sarama自己是否能处理broker版本的搜索。
在我看来,需要在生产者/代理/客户端的配置参数中定义要使用的API版本,如下所示:
config := sarama.NewConfig()
config.Version=sarama.V0_10_2_0此外,Sarama还不支持0.11.0 (2017年9月),因此请使用0.10.2.0访问最新的API。
最后,要使用第二种解决方案,请编辑文件Shopify/sarama/utils.go并在末尾添加您的版本:
V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
V0_11_0 = newKafkaVersion(0, 11, 0, 0)
minVersion = V0_11_0_0https://stackoverflow.com/questions/46409425
复制相似问题