首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >PyFlink UDAF InternalRow与行

PyFlink UDAF InternalRow与行
EN

Stack Overflow用户
提问于 2021-06-18 05:41:00
回答 1查看 95关注 0票数 1

我正在尝试通过PyFlink中的自定义UDAF调用外部函数。我使用的函数要求数据在字典对象中。我尝试使用row(t.rowtime, t.b, t.c).cast(schema)来达到这样的效果。

在UDAF之外,这个表达式工作得很好。在UDAF中,此表达式被转换为InternalRow,而不能转换为字典对象。

有没有办法强制使用Row而不是InternalRow

代码语言:javascript
复制
from pyflink.common import Row
from pyflink.table import EnvironmentSettings, TableEnvironment, AggregateFunction, DataTypes
from pyflink.table.expressions import row, col, lit, row_interval
from pyflink.table.window import Tumble
from pyflink.table.udf import udaf

from datetime import datetime, date, time

class TestAccumulator(AggregateFunction):
    def create_accumulator(self):
        return Row(last_type="")

    def accumulate(self, accumulator, value):
        accumulator["last_type"] = str(type(value))

    def get_value(self, accumulator):
        return accumulator["last_type"]

    def get_result_type(self):
        return DataTypes.STRING()

    def get_accumulator_type(self):
        return DataTypes.ROW([
            DataTypes.FIELD("last_type", DataTypes.STRING()),
        ])


if __name__ == "__main__":
    # create a blink streaming TableEnvironment
    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    table_env = TableEnvironment.create(env_settings)

    schema = DataTypes.ROW([
        DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3)),
        DataTypes.FIELD("b", DataTypes.STRING()),
        DataTypes.FIELD("c", DataTypes.STRING()),
        ])


    my_udaf = udaf(TestAccumulator())

    t = table_env.from_elements([(datetime(1970, 1, 1, 0, 0, 0), 'Hi', 'Hello'),
                                 (datetime(1970, 1, 1, 1, 0, 0), 'Hi', 'hi'),
                                 (datetime(1970, 1, 1, 2, 0, 0), 'Hi2', 'hi'),
                                 (datetime(1970, 1, 1, 3, 0, 0), 'Hi', 'Hello'),
                                 (datetime(1970, 1, 1, 4, 0, 0), 'Hi', 'Hello')], schema=schema)

    print(
        t.select( my_udaf(row(t.rowtime, t.b, t.c).cast(schema)).alias("udaf")).to_pandas().values
    )

输出:

代码语言:javascript
复制
[["<class 'pyflink.fn_execution.coder_impl_fast.InternalRow'>"]]
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-06-23 19:11:51

感谢您报告此问题。这是一个bug。我已经创建了一个JIRA https://issues.apache.org/jira/browse/FLINK-23121来修复它。它将在版本1.13.2中修复

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68026832

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档