首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡流中的metadataForKey方法为连接到同一组的多个应用实例提供错误信息。

卡夫卡流中的metadataForKey方法为连接到同一组的多个应用实例提供错误信息。
EN

Stack Overflow用户
提问于 2020-08-03 09:38:49
回答 1查看 324关注 0票数 0

我正在实现一种机制,通过在本地存储或请求远程Kafka实例来提供一些元数据信息。

我正在使用Scala和kafka-streams-scala库的2.4.1版本

我试着给你们简单的例子来说明我在做什么

  1. ,我正在运行Kafka集群,它创建了一个带有两个分区的测试主题。
  2. 也运行了一个Kafka流实例,正如我前面提到的,它实现了从存储中请求本地或远程元数据的机制,它保存所有分区信息,直到没有任何其他实例连接到同一个组为止。
  3. 将一些记录推入测试主题

中。

代码语言:javascript
复制
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "1", "01"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "2", "02"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "3", "03"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "4", "04"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "5", "15"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "6", "16"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "7", "17"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "8", "18"));

我运行第二个连接到同一个组的Kafka流实例,我看到了重新平衡和分区重新分配的过程,并且我很好地理解这两个应用程序之间应该在分区之间共享,例如Kafka流应用程序1应该使用分区0,Kafka流应用程序2应该使用分区1,反之亦然,在重新平衡和reassignment.之后。

下一步,为了确保Kafka流以这种方式工作,正如我在步骤4中所描述的那样,我正在运行以下代码。

代码语言:javascript
复制
val it: KeyValueIterator[String, String] = streams.store(TEST_REQUEST_STORE, QueryableStoreTypes.keyValueStore[String, String]).all()

while (it.hasNext) {
  val keyValue: KeyValue[String, String] = it.next();
  println(keyValue)
}

很酷,我看到了我想要的。我在本地主机上运行的Kafka在重新平衡和分区重新分配后持有分区1。

代码语言:javascript
复制
KeyValue(5, 15)
KeyValue(6, 16)
KeyValue(7, 17)
KeyValue(8, 18)

但是,当我运行这小部分代码时,从我的角度来看,我看到了完全出乎意料的输出。

代码语言:javascript
复制
println(streams.metadataForKey(TEST_REQUEST_STORE, "1", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "2", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "3", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "4", stringSerializer))
println()
println(streams.metadataForKey(TEST_REQUEST_STORE, "5", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "6", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "7", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "8", stringSerializer))
println()
代码语言:javascript
复制
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}

StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}

正如我所理解的,我应该期待这样的事情

代码语言:javascript
复制
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}

StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-08-06 09:10:53

首先,我想注意的是,即使您在存储中没有任何记录,metadataForKey也会给您一些信息,并且似乎这个密钥托管的信息是随机的。

我意识到这个问题与版本完全无关,而是与序列化程序有关。

我使用StringSerializer将记录推入主题,从scala将记录推入主题,我尝试使用Serdes.String.serializer()查询元数据,这给了我一些不符合现实的随机结果。

我被创建了另一种方法,可以使用scala和GenericPrimitiveSerdeString键序列化程序以及metadataForKey的相同序列化程序将数据推入主题,这一次我感到意外的是,这一次的效果与预期一样。

因此,对于那些将要使用metadataForKey的人来说,为了使这个方法能够正常工作,要注意键序列化器。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63227040

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档