我在同一份flink工作中读到两个卡夫卡的主题。
Stream1:消息来自第一个主题被保存到rocksdb,然后它将与stream2.Stream2:合并消息来自于第二个主题被stream1保存的状态丰富,然后它将与stream1.结合。
Topic1和主题2是不同的源,但是两个源的输出基本上是相同的。我必须丰富来自topic2的数据和来自topic1的数据。
这是流动;
val stream1 = readKafkaTopic1().keyBy(_.memberId).map(saveMemberDetailsToRocksDB)
val stream2 = readKafkaTopic2().keyBy(_.memberId).map(readMemberDetailsAndEnrich)
stream1.union(stream2).addSink(kafkaProducer)以下是问题;
stream1为同一个发布于 2021-04-17 16:25:28
似乎您应该能够通过使用KeyedCoProcessFunction来实现您想要的结果。这或多或少地希望:
stream1
.keyBy(_.memberId)
.connect(stream2.keyBy(_.memberId))
.process(new CustomKeyedCoProcessFunction())通过这种方式,您可以将状态保持在单个KeyedCoProcessFunction中,这样您就可以访问stream1和stream2的状态。
因此,对于processElement1,您可以在map中为stream1做相同的事情,而在processElement2中,您可以做与在map for stream2中所做的相同的事情。
https://stackoverflow.com/questions/67139378
复制相似问题