我想定义函数MAX_BY,它接受类型T的值和类型Number的排序参数,并根据排序从窗口返回最大元素(类型为T)。我试过了
public class MaxBy<T> extends AggregateFunction<T, Tuple2<T, Number>> {
@Override
public T getValue(Tuple2<T, Number> tuple) {
return tuple.f0;
}
@Override
public Tuple2<T, Number> createAccumulator() {
return Tuple2.of(null, 0L);
}
public void accumulate(Tuple2<T, Number> acc, T value, Number order) {
if (order.doubleValue() > acc.f1.doubleValue()) {
acc.f0 = value;
acc.f1 = order;
}
}
}但是我不能使用TableEnvironment.registerFunction注册这样的函数。在底层,Flink使用TypeInformation来匹配SQL query中的类型,并且使用这样的定义,它不能确定类型(至少我是这么认为的)。我看到可以提供几个accumulate函数,但我仍然认为每个重载方法的返回类型必须相同。
内置聚合函数的工作方式类似于我想要实现的功能-- MAX可以接受任意列类型并返回相同的类型。这就是为什么我想我也应该能够做到这一点。
发布于 2020-05-29 23:20:28
不幸的是,Flink不支持具有灵活返回类型的聚合函数。对于MAX函数,内部实现独立于类型定义核心逻辑,然后为每个支持的类型(see code)创建一个实现。
在内部,根据类型将MAX映射到正确的实现。
如果您将函数定义并注册为用户定义的聚合函数,我认为这是不可能的。
https://stackoverflow.com/questions/62027612
复制相似问题