我从四条动态流中读取数据。每个流中的数据是不同的数据类型。在读取所有四个流之后,我分配时间戳和水印,并从每个流中聚合数据。四个聚合的结果都是使用相同的泛型对象输出的。我希望合并来自四个流的结果,这样我就可以将统一的流发送到一个ProcessFunction。这基本上允许我像使用ProcessFunction一样使用CoProcessFunction,但我可以处理来自两个以上流的数据(在这种情况下,ProcessFunction将接收来自所有四个单独流的聚合)。
然而,我担心的是,这可能与水印不太好。如果一个流需要更长的时间来处理,或者在某种程度上是落后的,那么如果所有的水印都在联合中向前传递,而其中的一个流在其他流的前面,那么它的聚合可能不会到达进程函数。如果是这样的话,那么进程函数的水印将是它从四个单独的流中看到的水印的最大值。
我的问题是:如何在联合运算符中处理水印,以及联合的下游操作符如何处理这些水印?
另外:如果泛型对象的合并由于水印问题而无法工作,那么当Flink只支持两个流的CoProcessFunction时,合并四种不同聚合的结果的最佳方法是什么?
发布于 2020-02-25 13:04:50
另一种连接超过2个流的方法是建立一棵树,在所有的流都连接在一起之前进行成对的连接。或者作为一棵平衡的树,就像这样:
A--->
A+B---->
B--->
A+B+C+D------------>
C--->
C+D---->
D--->或者一次添加一个流,如下所示:
a--->
a+b--->
b--->
a+b+c--->
c----->
a+b+c+d--->
d------->FWIW,翻转-92是一种向Flink添加n进制流运算符的提议,但即使实现了,它可能也不会是用户可见的,至少在一开始是这样的。
发布于 2020-02-25 11:25:16
使用Union的水印就像并行流的水印一样工作。这意味着水印始终是来自所有输入流的水印的min。同样代表下游运算符,它们的水印将是所有输入流的min。
老实说,我不认为工会在任何方面都依赖于水印。但是,如果您出于任何原因想要使用CoProcessFunction,我可以提供这种有点麻烦的方式。您可以创建已生成的流的Seq,然后:
//Streams defined
val seq = Seq(stream, stream2, stream3, stream4)
seq.reduce((stream1, stream2) => stream1.connect(stream2).process(...))https://stackoverflow.com/questions/60383993
复制相似问题