I am trying to implement a tumbling window in Flink that performs its calculations based on event time.
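So I have a KeyedBroadcastProcessFunction that does all the work. With processing-time timers, everything works as expected in the unit tests. Now I have changed the code to use event-time timers, but the timers never fire. (I do register the timers with registerEventTimeTimer.)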
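For reference, a timer-driven tumbling event-time window typically registers one timer per window, at the last millisecond that belongs to that window. The sketch below is plain Java with no Flink dependency; the class and method names are hypothetical, and the start formula is assumed to mirror Flink's TimeWindow.getWindowStartWithOffset with an offset of 0. Whatever timestamp this returns is the value the operator's watermark has to reach before the timer can fire.

```java
public class TumblingWindowTimer {
    // Start of the tumbling window containing `timestamp` (offset 0).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp + windowSize) % windowSize;
    }

    // Timestamp to hand to registerEventTimeTimer: the window's last millisecond,
    // i.e. start + size - 1 (Flink's own windows use window.maxTimestamp() the same way).
    static long timerTimestamp(long timestamp, long windowSize) {
        return windowStart(timestamp, windowSize) + windowSize - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L;              // one-minute windows
        long event = 1_632_758_425_000L;  // hypothetical event timestamp in epoch millis
        System.out.println(windowStart(event, size));    // 1632758400000
        System.out.println(timerTimestamp(event, size)); // 1632758459999
    }
}
```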
Basically, the test looks like this:
@Test
public void evaluateFormular_ShouldSumOnTimer() throws Exception {
    long minutesToWait = 1;
    var definition = createTestCondition("Test", String.format("%s(_var)", method), "_var", "Test",
            "1", minutesToWait);
    var message = new CalculationControlMessage();
    message.setAction(ControlMessageAction.Create);
    message.setCalculationDefinition(definition);
    harness.processBroadcastElement(message, 100L);
    //harness.processBroadcastWatermark(5L);
    this.processValues(harness, values);
    // advance the watermark so that the timer can fire
    harness.processWatermark(Time.minutes(minutesToWait).toMilliseconds() + 1);
    assertEquals(harness.numEventTimeTimers(), 1);
    assertEquals("there should be a formular evaluated", 1, harness.extractOutputValues().size());
    harness.extractOutputValues().forEach(datapoint -> {
        assertEquals(datapoint.getValue(), expected, 0d);
    });
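}
As far as I understand, the watermark is advanced manually so that the timer can fire, and the watermark is definitely higher than the timestamps of the events.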
The harness is set up like this:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.util.KeyedBroadcastOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;

public class FlinkHarness {
    public static KeyedBroadcastOperatorTestHarness<String, DataPointEvent, CalculationControlMessage, DataPointEvent> createForCalculations()
            throws Exception {
        var dynamicCalculationFunction = new DynamicCalculationFunction();
        var harness = ProcessFunctionTestHarnesses
                .forKeyedBroadcastProcessFunction(dynamicCalculationFunction, new KeySelector<>() {
                    private static final long serialVersionUID = 1337L;

                    @Override
                    public String getKey(DataPointEvent value) throws Exception {
                        return value.getDataPointKey();
                    }
                }, Types.STRING, CalculationDescriptors.calculation);
        harness.setTimeCharacteristic(TimeCharacteristic.EventTime);
        harness.getExecutionConfig().setAutoWatermarkInterval(50L);
        harness.open();
        return harness;
}
I don't understand why the timer does not fire. Am I missing something?
Posted on 2021-09-27 16:51:45
this.processValues(harness, values);
Are these values StreamRecords that have timestamps assigned?
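harness.processWatermark(Time.minutes(minutesToWait).toMilliseconds() + 1);
The watermark is an absolute timestamp. This call sends a watermark of 60001 ms, not one that is one minute past the timestamps of your events.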
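Because the watermark is absolute, Time.minutes(1).toMilliseconds() + 1 is simply the instant 60001 ms after the epoch. A minimal plain-Java sketch of the comparison, assuming (as Flink's timer service does) that an event-time timer fires once the watermark is greater than or equal to the timer's timestamp. The class name and both timer values are hypothetical: one stands for events stamped close to 0, the other for events stamped with real epoch milliseconds.

```java
public class AbsoluteWatermark {
    // An event-time timer fires once the watermark has reached its timestamp.
    static boolean fires(long timerTimestamp, long watermark) {
        return watermark >= timerTimestamp;
    }

    public static void main(String[] args) {
        long watermark = 60_000L + 1;               // value of Time.minutes(1).toMilliseconds() + 1
        long timerNearZero = 59_999L;               // hypothetical: events stamped close to 0
        long timerEpochMillis = 1_632_758_459_999L; // hypothetical: events stamped with epoch millis
        System.out.println(fires(timerNearZero, watermark));    // true
        System.out.println(fires(timerEpochMillis, watermark)); // false
    }
}
```

If processValues stamps the records with wall-clock epoch times, the watermark in the test would have to be derived from those timestamps (for example, the last event timestamp plus the window size) rather than from the window length alone.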
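assertEquals(harness.numEventTimeTimers(), 1);
Firing the timer by sending a watermark should decrease the number of timers, so once the timer has fired this count would be expected to drop rather than stay at 1.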
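To illustrate why the count goes down: a fired event-time timer is removed from the set of pending timers. This is a toy, single-key model in plain Java, not Flink's implementation; the class name is hypothetical and the method names only echo registerEventTimeTimer and numEventTimeTimers for readability. A TreeSet is used so that registering the same timestamp twice keeps a single timer, similar to how Flink deduplicates timers with the same key, namespace and timestamp.

```java
import java.util.TreeSet;

public class TimerQueueSketch {
    // Pending event-time timers for a single key, ordered by timestamp.
    private final TreeSet<Long> eventTimeTimers = new TreeSet<>();
    private int firedCount = 0;

    void registerEventTimeTimer(long timestamp) {
        eventTimeTimers.add(timestamp); // same timestamp twice -> still one timer
    }

    // Removes ("fires") every timer whose timestamp is <= the new watermark.
    void advanceWatermark(long watermark) {
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.first() <= watermark) {
            eventTimeTimers.pollFirst();
            firedCount++; // stands in for calling onTimer(...)
        }
    }

    int numEventTimeTimers() {
        return eventTimeTimers.size();
    }

    int firedCount() {
        return firedCount;
    }

    public static void main(String[] args) {
        TimerQueueSketch timers = new TimerQueueSketch();
        timers.registerEventTimeTimer(59_999L);
        System.out.println(timers.numEventTimeTimers()); // 1
        timers.advanceWatermark(60_001L);
        System.out.println(timers.numEventTimeTimers()); // 0
        System.out.println(timers.firedCount());         // 1
    }
}
```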
Posted on 2021-09-28 14:20:22
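For me it was not clear that the harness has two methods for advancing the watermark on an operator with multiple input streams.
Yes, that makes sense. For operators with multiple input channels, the overall watermark is always the minimum of all incoming watermarks, so you have to advance both watermarks.
In this case, when testing the KeyedBroadcastProcessFunction, you always need to advance all incoming watermarks.
So when calling harness.processBroadcastElement(message, 0); or harness.processElement(new StreamRecord<DataPointEvent>(event, event.getTimestamp()));, both watermarks also need to be advanced with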
harness.processBroadcastWatermark(timestamp);
harness.processWatermark(timestamp);
Then the expected timer can fire.
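The minimum rule can be modelled in a few lines. This plain-Java sketch is hypothetical (it is not the harness or operator code); it assumes the behaviour of Flink's two-input operators, which keep one watermark per input, start both at Long.MIN_VALUE, and only forward the minimum when it increases. Advancing only the keyed side therefore leaves the operator's watermark at Long.MIN_VALUE, and the timer cannot fire until the broadcast side is advanced as well.

```java
public class TwoInputWatermarkSketch {
    // One watermark per input; both start at Long.MIN_VALUE.
    private long keyedWatermark = Long.MIN_VALUE;     // non-broadcast input
    private long broadcastWatermark = Long.MIN_VALUE; // broadcast input
    private long combinedWatermark = Long.MIN_VALUE;  // what the timer service sees

    // Hypothetical counterpart of harness.processWatermark(...)
    long processWatermark(long watermark) {
        keyedWatermark = watermark;
        return updateCombined();
    }

    // Hypothetical counterpart of harness.processBroadcastWatermark(...)
    long processBroadcastWatermark(long watermark) {
        broadcastWatermark = watermark;
        return updateCombined();
    }

    // The operator watermark is the minimum of its inputs and never moves backwards.
    private long updateCombined() {
        long min = Math.min(keyedWatermark, broadcastWatermark);
        if (min > combinedWatermark) {
            combinedWatermark = min;
        }
        return combinedWatermark;
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch op = new TwoInputWatermarkSketch();
        long timer = 59_999L; // hypothetical timer timestamp

        long wm = op.processWatermark(60_001L);
        System.out.println(wm >= timer); // false: broadcast side still at Long.MIN_VALUE

        wm = op.processBroadcastWatermark(60_001L);
        System.out.println(wm >= timer); // true: both inputs have advanced
    }
}
```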
https://stackoverflow.com/questions/69343000