首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >KStream-KTable LeftJoin,当KTable未完全加载时发生连接

KStream-KTable LeftJoin,当KTable未完全加载时发生连接
EN

Stack Overflow用户
提问于 2020-04-27 21:06:08
回答 1查看 1.2K关注 0票数 0

我试图使用KStream来用主题B丰富主题A中的项目。主题A是我的KStream,主题B是我的KTtable,有大约2300万条记录。这两个主题的密钥都不是设定好的,所以我必须使用还原剂将KStream(主题B)转换为KTable。

这是我的代码:

代码语言:javascript
复制
KTable<String, String> ktable = streamsBuilder
     .stream("TopicB", Consumed.withTimestampExtractor(new customTimestampsExtractor()))
     .filter((key, value) -> {...})
     .transform(new KeyTransformer()) // generate new key
     .groupByKey()
     .reduce((aggValue, newValue) -> {...});

streamBuilder
     .stream("TopicA")
     .filter((key, value) -> {...})
     .transform(...)
     .leftJoin(ktable, new ValueJoiner({...}))
     .transform(...)
     .to("result")

1) KTable初始化速度慢。(大约2000毫希/秒),这正常吗?我的主题是只有一个分区。有什么办法可以提高性能吗?我试图将下面的设置设置为reduec写吞吐量,但似乎并没有提高很多。

代码语言:javascript
复制
CACHE_MAX_BYTES_BUFFERING_CONFIG = 10 * 1024 * 1024
COMMIT_INTERVAL_MS_CONFIG = 15 * 1000

2)连接发生在KTable未从主题B加载完时发生。这里是连接发生时的偏移量(当前偏移/日志结束偏移)。

代码语言:javascript
复制
   Topic A: 32725/32726 (Lag 1)
   Topic B: 1818686/23190390 (Lag 21371704)

我检查了主题A记录失败的时间戳,这是4天前的记录,最后一次处理主题B的记录是6天前。据我的理解,基于时间戳的kstream进程记录,我不明白为什么在我的示例中,KStream(主题A)没有等到KTable(主题B)完全加载到4天前触发连接时为止。

我还尝试设置时间戳提取器返回0,但它也不起作用。

Updated:当将时间戳设置为0时,我将得到以下错误:

代码语言:javascript
复制
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerID are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. 

我还尝试将max.task.idle.ms设置为>0(3秒30分钟),但仍然得到相同的错误。

更新:我修正了“UnknownProducerIdException”错误,在6天前将customTimestampsExtractor设置为6天前,这比主题A中的记录还要早。但是,在ktable完成加载之前,join仍然无法工作。为什么会这样呢?

我正在使用2.3.0。

我在这里做错什么了吗?非常感谢。

EN

回答 1

Stack Overflow用户

发布于 2020-04-28 03:32:30

1. KTable初始化速度慢。(大约2000毫希/秒),这正常吗?

这取决于您的网络,我认为关联是TopicB的消耗率,您使用的两个配置CACHE_MAX_BYTES_BUFFERING_CONFIGCOMMIT_INTERVAL_MS_CONFIG是在您想要生成的KTable的输出量(因为KTable变更量是修订流)和您在将KTable更新到底层主题和下游处理器时所接受的延迟之间进行权衡。详细查看状态存储的Kafka流缓存配置这个博客part Tables, Not Triggers

我认为增加TopicB的消耗率的好方法是增加更多的分区。

  1. KStream.leftJoin(KTable,...)总是表查找,它总是用KTable上最新更新的记录加入当前流记录,在决定是否加入时不考虑流时间。如果您想在加入时考虑流时间,请查看KStream-KStream连接

在您的示例中,这种滞后是TopicB的滞后,并不意味着KTable没有完全加载。在实际运行流应用程序之前,当您的KTable处于状态恢复过程时,当它从KTable的基本变更主题读取以恢复当前状态时,它不会被完全加载,只是在情况下,您将无法执行连接,因为在状态完全恢复之前,流应用程序不会运行。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61468460

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档