keyBy工作前的assignTimestampsAndWatermarks:
DataStream<Trip> trips =
env.addSource(consumer).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
@Override
public long extractTimestamp(Trip trip) {
return trip.endTime.getTime();
}
});
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization());
AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
featurizedUserTrips.timeWindowAll(Time.days(7),
Time.days(1));但不是在keyBy和process之后
DataStream<Trip> trips = env.addSource(consumer);
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
DataStream<FeaturizedTrip> featurizedUserTrips =
userTrips.process(new Featurization()).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
@Override
public long extractTimestamp(FeaturizedTrip trip) {
return trip.endTime.getTime();
}
});
AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
featurizedUserTrips.timeWindowAll(Time.days(7),
Time.days(1));Windows永远不会触发。
这是一个bug还是预期的行为?如果是后者,它的文档在哪里?
发布于 2020-07-02 01:13:00
我也遇到过类似的问题。在我的例子中,我有一个读取文件的源,它的每个实例都读取一个单独的文件。在将一个文件发送到数据流的最简单测试中,只有一个源实例正在生成事件,而其他实例处于空闲状态。所以,在应用了水印分配器之后,当我使用AssignerWithPeriodicWatermarks时,只有一个水印分配器在发送新的水印,而其他的没有,或者它们在发送Long.MIN_VALUE。
应用keyBy()运算符后,空闲事件流与活动事件流混合。由于在任何操作符处计算水印的规则都是取每个传入流中可用的最小值,因此链中下一个操作符process()的每个实例都停留在水印Long.MIN_VALUE处。因此,在应用keyBy()之后,水印看起来永远不会有进展。
我的解决方案是在分配水印之前对事件进行混洗。在水印分配运算符上,规则是相反的-始终采用并重新发送计算出的最高水印。通过这种方式,我们保证水印分配器的所有实例都在发送水印,即使某些源实例是空闲的:
stream.rebalance().assignTimestampsAndWatermarks(...);这里还有一个您必须避免的陷阱。如果您决定在源实例未收集数据时将其声明为空闲:
context.markAsTemporarilyIdle();你会发现水印再次丢失了,即使上面的rebalance()攻击已经就位了。这是因为markAsTemporarilyIdle()将相应的流设置为idle,并且在代码的这一部分(来自Flink 1.7):
org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark()水印已停止,因为条件streamStatusProvider.getStreamStatus().isActive()为false。因此,结论是在所描述的情况下不使用context.markAsTemporarilyIdle()。
https://stackoverflow.com/questions/55716807
复制相似问题