我想知道Apache Flink一般对顶氮查询和表使用了多少状态。
首先,我使用Flink SQL处理来自Kafka主题的消息:
CREATE TABLE purchases (
country STRING,
product STRING
) WITH (
'connector' = 'kafka',
'topic' = 'purchases',
'properties.bootstrap.servers' = 'kafka:29092',
'value.format' = 'json',
'properties.group.id' = '1',
'scan.startup.mode' = 'earliest-offset'
);我还初始化了一个JDBC连接器:
CREATE TABLE aggregations (
`country` STRING,
`product` STRING,
`purchases` BIGINT NOT NULL,
PRIMARY KEY (`country`, `product`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://postgres:5432/postgres?&user=postgres&password=postgres',
'table-name' = 'aggregations'
);最后,我开始聚合:
insert into aggregations
SELECT `country`, `product`, `purchases`
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY country ORDER BY `purchases` DESC) AS row_num
FROM (select country, product, count(*) as `purchases` from purchases group by country, product))
WHERE row_num <= 3;来自Flink状态管理的文档是这样说的:
从概念上讲,源表从未完全保持在状态中。实现者处理逻辑表(即动态表)。它们的国家需求取决于使用的操作。
那么,我是否正确地理解Flink没有从purchases连接器中保存这些行的表?
更重要的是,在聚合中:
select country, product, count(*) as `purchases` from purchases group by country, productFlink是否保留了每一个国家,产品的关键所在?
发布于 2021-12-13 04:10:49
Flink将把SQL/Table API转换为DataStream/DataSet操作符。例如,对于SQL中的purchases表,它将在DataStream中转换为FlinkKafkaConsumer。
你是正确的。Flinks没有将数据从Flink保存到状态,而是将Kafka保存到状态。
对于select and group by语句,Flink将以状态保存键和值(Count)。
发布于 2021-12-13 07:42:21
当使用Flink SQL或table时,传入流将转换为动态表。top-n是一个连续查询,它累加状态。这在表/中有更详细的解释。
Top查询积累状态,如https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/topn/上所解释的那样。还有一个窗口Top,正如在https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-topn/中解释的那样。后者提到了Moreover, window Top-N purges all intermediate state when no longer needed.。与Top相比,窗口Top更加友好。
https://stackoverflow.com/questions/70318076
复制相似问题