首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >我有一个kafka数据仓库,想用pyflink对数据做一些计算,我该怎么做呢?

我有一个kafka数据仓库,想用pyflink对数据做一些计算,我该怎么做呢?
EN

Stack Overflow用户
提问于 2022-04-19 06:23:57
回答 1查看 108关注 0票数 -2

我要计算的是平均功率x=chx*((60/5)*100/500)瓦和=chx/500千瓦时

ch1-ch6是能量测量值,我将附上卡夫卡数据的图像,它将在终端上显示。单击以查看数据(图像)

EN

回答 1

Stack Overflow用户

发布于 2022-04-20 21:40:25

我想卡夫卡的数据看起来是这样的:

代码语言:javascript
复制
{"gw_id": "sn001", "t": 1650270347, "ch1": 38, "ch2": 0, "ch3": 1, "ch4": 2, "ch5": 0, "ch6": 50}

我还假设您希望对每个ch[x]值执行无状态计算,例如:

代码语言:javascript
复制
p[x] = ch[x]*((60/5)*100/500)
e[x] = ch[x]/500

所谓无状态,我的意思是计算的输出不需要以前的chx值。

有两种方法可以在用例中使用Flink :使用DataStream API表/SQL API。Python还描述了如何在PyFlink环境中使用这些API。

SQL方法更简单--如果您确实需要非常定制的处理或以非关系的方式处理数据,那么您可以考虑使用DataStream API,但这里只考虑使用SQL。

注意:我没有尝试运行下面的代码,因此很可能会出现语法错误。

第一步是使用适合于数据源(即Kafka流)的连接器定义输入表。确保您使用的是卡夫卡表连接器,而不是DataStream。

代码语言:javascript
复制
def create_input():
    return """
        CREATE TABLE input (
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `t` TIMESTAMP_LTZ(3)
        ) WITH (
          'connector' = 'kafka',
          'format' = 'json',
          <see docs for other props>
        )
    """

现在,我们定义了一个将执行处理的函数,下面是一个只返回输入的例子,我们将该函数放在一个单独的模块中,以便更容易地进行测试:

代码语言:javascript
复制
# within module mymodule
@udf(result_type=DataTypes.BIGINT())
def my_func(i):
    return i



def create_some_function():
    return """
    CREATE TEMPORARY FUNCTION my_func AS 'mymodule.my_func' LANGUAGE PYTHON
    """

现在创建您的输出表,为此我们将只使用打印连接器

代码语言:javascript
复制
def create_output():
    return """
        CREATE TABLE output (
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `ch1_mod` BIGINT,
          `ch2_mod` BIGINT,
          `ch3_mod` BIGINT,
          `ch4_mod` BIGINT,
          `ch5_mod` BIGINT,
          `ch6_mod` BIGINT,
          `t` TIMESTAMP_LTZ(3)
        ) WITH (
          'connector' = 'print'
        )
    """

现在我们可以定义out环境,然后加入输入和输出表:

代码语言:javascript
复制
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

table_env.execute_sql(create_input())
table_env.execute_sql(create_some_function())
table_env.execute_sql(create_output())
table_env.execute_sql("INSERT INTO output SELECT gw_id, t, ch1, my_func(ch1), ch2, my_func(ch2), ch3, my_func(ch3), ch4, my_func(ch4), ch5, my_func(ch5), ch6, my_func(ch6) FROM input").wait()

可以定义函数来执行数据的任意处理,包括接受多个参数。您可以使用Flink的窗口功能来执行有状态计算,例如在一个时间窗口中对一系列值进行平均,请参见下面的查询以获得更多详细信息(https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/overview/)

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71920490

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档