首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >应该由KTable发出的事件

应该由KTable发出的事件
EN

Stack Overflow用户
提问于 2019-04-15 10:23:14
回答 1查看 434关注 0票数 2

我正在尝试测试一个拓扑,作为最后一个节点,它有一个KTable。我的测试是使用一个完整的卡夫卡集群(通过合流的码头图像),所以我是,而不是使用TopologyTestDriver

我的拓扑有键值类型String -> CustomerString -> CustomerMapped输出的输入。serdes、架构和与架构注册表的集成都按预期工作。

我正在使用Scala、Kafka2.2.0、汇合平台5.2.1和kafka-streams-scala。我的拓扑尽可能简化,如下所示:

代码语言:javascript
复制
val otherBuilder = new StreamsBuilder()

otherBuilder
     .table[String,Customer](source)
     .mapValues(c => CustomerMapped(c.surname, c.age))
     .toStream.to(target)   

(所有隐式serdes、ProducedConsumed等都是默认的,并且是正确找到的)

我的测试包括将几条记录(data)同步和不间断地发送到source主题,并从target主题中读取,将结果与expected进行比较。

代码语言:javascript
复制
val data: Seq[(String, Customer)] = Vector(
   "key1" -> Customer(0, "Obsolete", "To be overridden", 0),
   "key1" -> Customer(0, "Obsolete2", "To be overridden2", 0),
   "key1" -> Customer(1, "Billy", "The Man", 32),
   "key2" -> Customer(2, "Tommy", "The Guy", 31),
   "key3" -> Customer(3, "Jenny", "The Lady", 40)
)
val expected = Vector(
   "key1" -> CustomerMapped("The Man", 32),
   "key2" -> CustomerMapped("The Guy", 31),
   "key3" -> CustomerMapped("The Lady", 40)
)

我构建了Kafka Stream应用程序,在其他设置之间设置了以下两个设置:

代码语言:javascript
复制
p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000")
val s: Long = 50L * 1024 * 1024
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, s.toString)

因此,我期望KTable使用缓存,在提交和缓存大小为50 my之间间隔为5秒(对于我的场景来说已经足够了)。

我的问题是,我从target主题中读取的结果总是包含key1的多个条目。我本以为Obsolete和“`Obsolete1 1”的记录不会发出任何事件。实际产出如下:

代码语言:javascript
复制
Vector(
    "key1" -> CustomerMapped("To be overridden", 0),
    "key1" -> CustomerMapped("To be overridden2", 0),
    "key1" -> CustomerMapped("The Man", 32),
    "key2" -> CustomerMapped("The Guy", 31),
    "key3" -> CustomerMapped("The Lady", 40)
)

最后要提到的一点是:在我将Kafka从2.1.0升级到2.2.0之前,这个测试一直按照预期的方式工作。我再次验证了这一点,降低了我的申请级别。

我很困惑,有人能指出2.2.x版本中KTables的行为是否发生了变化吗?或者现在有新的设置,我必须设置来控制事件的发射?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-04-16 08:48:40

在Kafka 2.2中引入了一种优化,以减少Kafka流的资源占用。如果计算不需要KTable,则它不一定是物化的。这适用于您的情况,因为mapValues()可以实时计算.因为KTable没有物化,所以没有缓存,因此每个输入记录产生一个输出记录。

比较:https://issues.apache.org/jira/browse/KAFKA-6036

如果要强制KTable物化,可以将Materilized.as("someStoreName")传入StreamsBuilder#table()方法。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55687101

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档