我正在实现一种机制,通过在本地存储或请求远程Kafka实例来提供一些元数据信息。
我正在使用Scala和kafka-streams-scala库的2.4.1版本
我试着给你们简单的例子来说明我在做什么
中。
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "1", "01"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "2", "02"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "3", "03"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "4", "04"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "5", "15"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "6", "16"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "7", "17"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "8", "18"));我运行第二个连接到同一个组的Kafka流实例,我看到了重新平衡和分区重新分配的过程,并且我很好地理解这两个应用程序之间应该在分区之间共享,例如Kafka流应用程序1应该使用分区0,Kafka流应用程序2应该使用分区1,反之亦然,在重新平衡和reassignment.之后。
下一步,为了确保Kafka流以这种方式工作,正如我在步骤4中所描述的那样,我正在运行以下代码。
val it: KeyValueIterator[String, String] = streams.store(TEST_REQUEST_STORE, QueryableStoreTypes.keyValueStore[String, String]).all()
while (it.hasNext) {
val keyValue: KeyValue[String, String] = it.next();
println(keyValue)
}很酷,我看到了我想要的。我在本地主机上运行的Kafka在重新平衡和分区重新分配后持有分区1。
KeyValue(5, 15)
KeyValue(6, 16)
KeyValue(7, 17)
KeyValue(8, 18)但是,当我运行这小部分代码时,从我的角度来看,我看到了完全出乎意料的输出。
println(streams.metadataForKey(TEST_REQUEST_STORE, "1", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "2", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "3", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "4", stringSerializer))
println()
println(streams.metadataForKey(TEST_REQUEST_STORE, "5", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "6", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "7", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "8", stringSerializer))
println()StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}正如我所理解的,我应该期待这样的事情
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}发布于 2020-08-06 09:10:53
首先,我想注意的是,即使您在存储中没有任何记录,metadataForKey也会给您一些信息,并且似乎这个密钥托管的信息是随机的。
我意识到这个问题与版本完全无关,而是与序列化程序有关。
我使用StringSerializer将记录推入主题,从scala将记录推入主题,我尝试使用Serdes.String.serializer()查询元数据,这给了我一些不符合现实的随机结果。
我被创建了另一种方法,可以使用scala和GenericPrimitiveSerdeString键序列化程序以及metadataForKey的相同序列化程序将数据推入主题,这一次我感到意外的是,这一次的效果与预期一样。
因此,对于那些将要使用metadataForKey的人来说,为了使这个方法能够正常工作,要注意键序列化器。
https://stackoverflow.com/questions/63227040
复制相似问题