我要计算的是平均功率x=chx*((60/5)*100/500)瓦和=chx/500千瓦时
ch1-ch6是能量测量值,我将附上卡夫卡数据的图像,它将在终端上显示。单击以查看数据(图像)
发布于 2022-04-20 21:40:25
我想卡夫卡的数据看起来是这样的:
{"gw_id": "sn001", "t": 1650270347, "ch1": 38, "ch2": 0, "ch3": 1, "ch4": 2, "ch5": 0, "ch6": 50}我还假设您希望对每个ch[x]值执行无状态计算,例如:
p[x] = ch[x]*((60/5)*100/500)
e[x] = ch[x]/500所谓无状态,我的意思是计算的输出不需要以前的chx值。
有两种方法可以在用例中使用Flink :使用DataStream API或表/SQL API。Python还描述了如何在PyFlink环境中使用这些API。
SQL方法更简单--如果您确实需要非常定制的处理或以非关系的方式处理数据,那么您可以考虑使用DataStream API,但这里只考虑使用SQL。
注意:我没有尝试运行下面的代码,因此很可能会出现语法错误。
第一步是使用适合于数据源(即Kafka流)的连接器定义输入表。确保您使用的是卡夫卡表连接器,而不是DataStream。
def create_input():
return """
CREATE TABLE input (
`gw_id` VARCHAR,
`ch1` BIGINT,
`ch2` BIGINT,
`ch3` BIGINT,
`ch4` BIGINT,
`ch5` BIGINT,
`ch6` BIGINT,
`t` TIMESTAMP_LTZ(3)
) WITH (
'connector' = 'kafka',
'format' = 'json',
<see docs for other props>
)
"""现在,我们定义了一个将执行处理的函数,下面是一个只返回输入的例子,我们将该函数放在一个单独的模块中,以便更容易地进行测试:
# within module mymodule
@udf(result_type=DataTypes.BIGINT())
def my_func(i):
return i
def create_some_function():
return """
CREATE TEMPORARY FUNCTION my_func AS 'mymodule.my_func' LANGUAGE PYTHON
"""现在创建您的输出表,为此我们将只使用打印连接器
def create_output():
return """
CREATE TABLE output (
`gw_id` VARCHAR,
`ch1` BIGINT,
`ch2` BIGINT,
`ch3` BIGINT,
`ch4` BIGINT,
`ch5` BIGINT,
`ch6` BIGINT,
`ch1_mod` BIGINT,
`ch2_mod` BIGINT,
`ch3_mod` BIGINT,
`ch4_mod` BIGINT,
`ch5_mod` BIGINT,
`ch6_mod` BIGINT,
`t` TIMESTAMP_LTZ(3)
) WITH (
'connector' = 'print'
)
"""现在我们可以定义out环境,然后加入输入和输出表:
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql(create_input())
table_env.execute_sql(create_some_function())
table_env.execute_sql(create_output())
table_env.execute_sql("INSERT INTO output SELECT gw_id, t, ch1, my_func(ch1), ch2, my_func(ch2), ch3, my_func(ch3), ch4, my_func(ch4), ch5, my_func(ch5), ch6, my_func(ch6) FROM input").wait()可以定义函数来执行数据的任意处理,包括接受多个参数。您可以使用Flink的窗口功能来执行有状态计算,例如在一个时间窗口中对一系列值进行平均,请参见下面的查询以获得更多详细信息(https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/overview/)
https://stackoverflow.com/questions/71920490
复制相似问题