首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache状态管理

Apache状态管理
EN

Stack Overflow用户
提问于 2021-04-17 14:40:24
回答 1查看 59关注 0票数 0

我在同一份flink工作中读到两个卡夫卡的主题。

  • Stream1:消息来自第一个主题被保存到rocksdb,然后它将与stream2.
  • Stream2:合并消息来自于第二个主题被stream1保存的状态丰富,然后它将与stream1.

结合。

Topic1和主题2是不同的源,但是两个源的输出基本上是相同的。我必须丰富来自topic2的数据和来自topic1的数据。

这是流动;

代码语言:javascript
复制
val stream1 = readKafkaTopic1().keyBy(_.memberId).map(saveMemberDetailsToRocksDB)
val stream2 = readKafkaTopic2().keyBy(_.memberId).map(readMemberDetailsAndEnrich)
stream1.union(stream2).addSink(kafkaProducer)

以下是问题;

  1. 是好的吗?
  2. 可以访问stream1为同一个
  3. 保存的状态。
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-04-17 16:25:28

似乎您应该能够通过使用KeyedCoProcessFunction来实现您想要的结果。这或多或少地希望:

代码语言:javascript
复制
stream1
.keyBy(_.memberId)
.connect(stream2.keyBy(_.memberId))
.process(new CustomKeyedCoProcessFunction())

通过这种方式,您可以将状态保持在单个KeyedCoProcessFunction中,这样您就可以访问stream1stream2的状态。

因此,对于processElement1,您可以在map中为stream1做相同的事情,而在processElement2中,您可以做与在map for stream2中所做的相同的事情。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67139378

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档