首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在KeyedStream.process之后assignTimestampsAndWatermarks不起作用

在KeyedStream.process之后assignTimestampsAndWatermarks不起作用
EN

Stack Overflow用户
提问于 2019-04-17 05:23:26
回答 1查看 871关注 0票数 1

keyBy工作前的assignTimestampsAndWatermarks

代码语言:javascript
复制
DataStream<Trip> trips =
        env.addSource(consumer).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
            @Override
            public long extractTimestamp(Trip trip) {
                return trip.endTime.getTime();
            }
        });
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization());
AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
        featurizedUserTrips.timeWindowAll(Time.days(7),
                Time.days(1));

但不是在keyByprocess之后

代码语言:javascript
复制
DataStream<Trip> trips = env.addSource(consumer);
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
DataStream<FeaturizedTrip> featurizedUserTrips =
        userTrips.process(new Featurization()).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
            @Override
            public long extractTimestamp(FeaturizedTrip trip) {
                return trip.endTime.getTime();
            }
        });
AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
        featurizedUserTrips.timeWindowAll(Time.days(7),
                Time.days(1));

Windows永远不会触发。

这是一个bug还是预期的行为?如果是后者,它的文档在哪里?

EN

回答 1

Stack Overflow用户

发布于 2020-07-02 01:13:00

我也遇到过类似的问题。在我的例子中,我有一个读取文件的源,它的每个实例都读取一个单独的文件。在将一个文件发送到数据流的最简单测试中,只有一个源实例正在生成事件,而其他实例处于空闲状态。所以,在应用了水印分配器之后,当我使用AssignerWithPeriodicWatermarks时,只有一个水印分配器在发送新的水印,而其他的没有,或者它们在发送Long.MIN_VALUE

应用keyBy()运算符后,空闲事件流与活动事件流混合。由于在任何操作符处计算水印的规则都是取每个传入流中可用的最小值,因此链中下一个操作符process()的每个实例都停留在水印Long.MIN_VALUE处。因此,在应用keyBy()之后,水印看起来永远不会有进展。

我的解决方案是在分配水印之前对事件进行混洗。在水印分配运算符上,规则是相反的-始终采用并重新发送计算出的最高水印。通过这种方式,我们保证水印分配器的所有实例都在发送水印,即使某些源实例是空闲的:

代码语言:javascript
复制
stream.rebalance().assignTimestampsAndWatermarks(...);

这里还有一个您必须避免的陷阱。如果您决定在源实例未收集数据时将其声明为空闲:

代码语言:javascript
复制
context.markAsTemporarilyIdle();

你会发现水印再次丢失了,即使上面的rebalance()攻击已经就位了。这是因为markAsTemporarilyIdle()将相应的流设置为idle,并且在代码的这一部分(来自Flink 1.7):

代码语言:javascript
复制
org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark()

水印已停止,因为条件streamStatusProvider.getStreamStatus().isActive()为false。因此,结论是在所描述的情况下不使用context.markAsTemporarilyIdle()

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55716807

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档