首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用spark udaf实现条件窗口计数?

如何使用spark udaf实现条件窗口计数?
EN

Stack Overflow用户
提问于 2022-01-25 09:44:03
回答 1查看 125关注 0票数 1

我有一个有列的表:时间戳、id和条件,我希望每隔一段时间(例如10秒)计算每个id的数量。

如果条件为true,则count++返回前一个值。

非洲发展新议程的代码如下:

代码语言:javascript
复制
public class MyCount extends UserDefinedAggregateFunction {

    @Override
    public StructType inputSchema() {
        return DataTypes.createStructType(
                Arrays.asList(
                        DataTypes.createStructField("condition", DataTypes.BooleanType, true),
                        DataTypes.createStructField("timestamp", DataTypes.LongType, true),
                        DataTypes.createStructField("interval", DataTypes.IntegerType, true)
                )
        );
    }

    @Override
    public StructType bufferSchema() {
        return DataTypes.createStructType(
                Arrays.asList(
                        DataTypes.createStructField("timestamp", DataTypes.LongType, true),
                        DataTypes.createStructField("count", DataTypes.LongType, true)
                )
        );
    }

    @Override
    public DataType dataType() {
        return DataTypes.LongType;
    }

    @Override
    public boolean deterministic() {
        return true;
    }

    @Override
    public void initialize(MutableAggregationBuffer mutableAggregationBuffer) {
        mutableAggregationBuffer.update(0, 0L);
        mutableAggregationBuffer.update(1, 0L);
    }

    public void update(MutableAggregationBuffer mutableAggregationBuffer, Row row) {
        long timestamp = mutableAggregationBuffer.getLong(0);
        long count = mutableAggregationBuffer.getLong(1);
        long event_time = row.getLong(1);
        int interval = row.getInt(2);
        if (event_time > timestamp + interval) {
            timestamp = event_time - event_time % interval;
            count = 0;
        }
        if (row.getBoolean(0)) {
            count++;
        }
        mutableAggregationBuffer.update(0, timestamp);
        mutableAggregationBuffer.update(1, count);
    }
    
    @Override
    public void merge(MutableAggregationBuffer mutableAggregationBuffer, Row row) {

    }

    @Override
    public Object evaluate(Row row) {
        return row.getLong(1);
    }
}

然后我总结了一个类似于以下的sql:

代码语言:javascript
复制
select timestamp, id, MyCount(true, timestamp, 10) over(PARTITION BY id ORDER BY timestamp) as count from xxx.xxx

结果是:

代码语言:javascript
复制
timestamp    id     count
1642760594    0        1
1642760596    0        2
1642760599    0        3
1642760610    0        2 --duplicate
1642760610    0        2
1642760613    0        3
1642760594    1        1
1642760597    1        2
1642760600    1        1
1642760603    1        2
1642760606    1        4 --duplicate
1642760606    1        4
1642760608    1        5

当时间戳被重复时,我得到了1,2,4,4,5而不是1,2,3,4,5,如何修正它呢?

另一个要求是什么时候执行联合发展新议程的合并方法?我清空,实现它,但它正常运行。我试图在方法中添加日志,但我还没有看到这个日志。真的有必要吗?

有一个类似的问题:Apache Spark SQL UDAF over window showing odd behaviour with duplicate input

但是,row_number()没有这样的问题。row_number()是一个蜂箱,然后我尝试创建一个蜂箱。但我也有问题.Why hive udaf row_number() terminate() returns 'ArrayList'? I create my udaf row_number2() by copying its code then I got list return?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-02-15 13:25:32

最后,我通过spark aggregateWindowFunction解决了这个问题:

代码语言:javascript
复制
case class Count(condition: Expression) extends AggregateWindowFunction with Logging {

  override def prettyName: String = "myCount"

  override def dataType: DataType = LongType

  override def children: Seq[Expression] = Seq(condition)

  private val zero = Literal(0L)
  private val one = Literal(1L)

  private val count = AttributeReference("count", LongType, nullable = false)()

  private val increaseCount = If(condition, Add(count, one), count)

  override val initialValues: Seq[Expression] = zero :: Nil
  override val updateExpressions: Seq[Expression] = increaseCount :: Nil
  override val evaluateExpression: Expression = count

  override val aggBufferAttributes: Seq[AttributeReference] = count :: Nil

然后使用spark_session.functionRegistry.registerFunction注册它。

代码语言:javascript
复制
"select myCount(true) over(partition by window(timestamp, '10 seconds'), id order by timestamp) as count from xxx"
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70846427

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档