首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在testharness中,闪烁事件计时器不会触发

在testharness中,闪烁事件计时器不会触发
EN

Stack Overflow用户
提问于 2021-09-27 07:54:05
回答 2查看 106关注 0票数 0

我正在尝试用flink实现一个基于事件时间特征计算的翻滚窗口。

因此,我有一个KeyedBroadcastProcessFunction来做所有的工作。有了ProcessTimers,在单元测试中,一切都会按预期进行。现在,我更改了代码以使用事件计时器,但时间不会触发。(我确实向registerEventTimeTimer注册了计时器)

基本上,测试是这样的

代码语言:javascript
复制
  @Test
  public void evaluateFormular_ShouldSumOnTimer() throws Exception {
    long minutesToWait = 1;
    var definition = createTestCondition("Test", String.format("%s(_var)", method), "_var", "Test",
        "1", minutesToWait);

    var message = new CalculationControlMessage();
    message.setAction(ControlMessageAction.Create);
    message.setCalculationDefinition(definition);

    harness.processBroadcastElement(message, 100l);
    //harness.processBroadcastWatermark(5l);
    this.processValues(harness, values);
    // advance the watermark so that the timer can fire
    harness.processWatermark(Time.minutes(minutesToWait).toMilliseconds() + 1);

    assertEquals(harness.numEventTimeTimers(), 1);
    assertEquals("there should be a formular evaluated", 1, harness.extractOutputValues().size());
    harness.extractOutputValues().forEach(datapoint -> {
      assertEquals(datapoint.getValue(), expected, 0d);
    });
  }

据我所知,水印是手动提前的,这样计时器就可以触发了。水印肯定比事件中的水印高。

线束是这样设置的

代码语言:javascript
复制
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.util.KeyedBroadcastOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;

public class FlinkHarness {
  public static KeyedBroadcastOperatorTestHarness<String, DataPointEvent, CalculationControlMessage, DataPointEvent> createForCalculations()
      throws Exception {
    var dynamicCalculationFunction = new DynamicCalculationFunction();
    
    var harness = ProcessFunctionTestHarnesses
        .forKeyedBroadcastProcessFunction(dynamicCalculationFunction, new KeySelector<>() {
          private static final long serialVersionUID = 1337L;

          @Override
          public String getKey(DataPointEvent value) throws Exception {
            return value.getDataPointKey();
          }

        }, Types.STRING, CalculationDescriptors.calculation);

    harness.setTimeCharacteristic(TimeCharacteristic.EventTime);
    harness.getExecutionConfig().setAutoWatermarkInterval(50l);
    harness.open();
    return harness;
  }

我不明白为什么定时器不能触发。我是不是遗漏了什么?

EN

回答 2

Stack Overflow用户

发布于 2021-09-27 16:51:45

代码语言:javascript
复制
this.processValues(harness, values);

这些值是分配了时间戳的StreamRecords吗?

代码语言:javascript
复制
harness.processWatermark(Time.minutes(minutesToWait).toMilliseconds() + 1);

水印是绝对时间戳。

代码语言:javascript
复制
assertEquals(harness.numEventTimeTimers(), 1);

通过水印发送来触发计时器应该会减少计时器的数量。

票数 1
EN

Stack Overflow用户

发布于 2021-09-28 14:20:22

对我来说,不清楚在上是否有两种方法可以在具有多个输入流的操作符上推进水印。

是的,这是有道理的。对于具有多个输入通道的运算符,总体水印始终是所有传入水印中的最小值。所以你必须把这两个水印

在这种情况下,使用测试中的KeyedBroadcastProcessFunction时,您总是需要将所有传入的水印

因此,在调用harness.processBroadcastElement(message, 0);harness.processElement(new StreamRecord<DataPointEvent>(event, event.getTimestamp()));时,这两个水印都需要使用

代码语言:javascript
复制
harness.processBroadcastWatermark(timestamp);
harness.processWatermark(timestamp);

然后,所需的计时器就可以触发

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69343000

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档