我正在尝试测试一个拓扑,作为最后一个节点,它有一个KTable。我的测试是使用一个完整的卡夫卡集群(通过合流的码头图像),所以我是,而不是使用TopologyTestDriver的。
我的拓扑有键值类型String -> Customer和String -> CustomerMapped输出的输入。serdes、架构和与架构注册表的集成都按预期工作。
我正在使用Scala、Kafka2.2.0、汇合平台5.2.1和kafka-streams-scala。我的拓扑尽可能简化,如下所示:
val otherBuilder = new StreamsBuilder()
otherBuilder
.table[String,Customer](source)
.mapValues(c => CustomerMapped(c.surname, c.age))
.toStream.to(target) (所有隐式serdes、Produced、Consumed等都是默认的,并且是正确找到的)
我的测试包括将几条记录(data)同步和不间断地发送到source主题,并从target主题中读取,将结果与expected进行比较。
val data: Seq[(String, Customer)] = Vector(
"key1" -> Customer(0, "Obsolete", "To be overridden", 0),
"key1" -> Customer(0, "Obsolete2", "To be overridden2", 0),
"key1" -> Customer(1, "Billy", "The Man", 32),
"key2" -> Customer(2, "Tommy", "The Guy", 31),
"key3" -> Customer(3, "Jenny", "The Lady", 40)
)
val expected = Vector(
"key1" -> CustomerMapped("The Man", 32),
"key2" -> CustomerMapped("The Guy", 31),
"key3" -> CustomerMapped("The Lady", 40)
)我构建了Kafka Stream应用程序,在其他设置之间设置了以下两个设置:
p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000")
val s: Long = 50L * 1024 * 1024
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, s.toString)因此,我期望KTable使用缓存,在提交和缓存大小为50 my之间间隔为5秒(对于我的场景来说已经足够了)。
我的问题是,我从target主题中读取的结果总是包含key1的多个条目。我本以为Obsolete和“`Obsolete1 1”的记录不会发出任何事件。实际产出如下:
Vector(
"key1" -> CustomerMapped("To be overridden", 0),
"key1" -> CustomerMapped("To be overridden2", 0),
"key1" -> CustomerMapped("The Man", 32),
"key2" -> CustomerMapped("The Guy", 31),
"key3" -> CustomerMapped("The Lady", 40)
)最后要提到的一点是:在我将Kafka从2.1.0升级到2.2.0之前,这个测试一直按照预期的方式工作。我再次验证了这一点,降低了我的申请级别。
我很困惑,有人能指出2.2.x版本中KTables的行为是否发生了变化吗?或者现在有新的设置,我必须设置来控制事件的发射?
发布于 2019-04-16 08:48:40
在Kafka 2.2中引入了一种优化,以减少Kafka流的资源占用。如果计算不需要KTable,则它不一定是物化的。这适用于您的情况,因为mapValues()可以实时计算.因为KTable没有物化,所以没有缓存,因此每个输入记录产生一个输出记录。
比较:https://issues.apache.org/jira/browse/KAFKA-6036
如果要强制KTable物化,可以将Materilized.as("someStoreName")传入StreamsBuilder#table()方法。
https://stackoverflow.com/questions/55687101
复制相似问题