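Suppose there is a stream of multi-stage requests, each asking that a document with a given ID be retrieved from a source, contrast-adjusted, OCRed, word-counted, placed in a destination directory, and so on (the exact stages may vary from request to request). The stages of a request must be completed in order, and intermediate results held in a cache should be reused (don't keep OCRing the same document over and over again). I would like to create a stream of requests for each of the individual applications (the document retrieval app, the OCR app, etc.). How can I do this with ksqlDB?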
Posted on 2020-06-17 14:45:51
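Let's represent both requests and products as the array of steps needed to create them. In the worst case, assuming every step can carry custom information, the type of that representation should be ARRAY<MAP<STRING,STRING>>. ksqlDB does not currently support using arrays as keys, but we can "cheat" by casting the array to a string and using that string as the key.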
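To make the key trick concrete, here is a minimal Python sketch of the same idea: a request is a list of stage maps, its key is a string serialization of that list, and its prerequisite is the same list without the last stage. The helper names `stages_key` and `prereq_of` are hypothetical, and the JSON text produced here is only an illustration; the exact string that ksqlDB's CAST produces will likely look different.

```python
import json


def stages_key(stages):
    """Serialize a list of stage dicts into a deterministic string key.

    Assumption: JSON with sorted keys stands in for ksqlDB's CAST(... AS STRING).
    """
    return json.dumps(stages, sort_keys=True, separators=(",", ":"))


def prereq_of(stages):
    """Return every stage except the last: the product that must exist first."""
    return stages[:-1]


request = [{"p": "x"}, {"r": "y"}]
request_id = stages_key(request)
prereq_id = stages_key(prereq_of(request))
print(request_id)
print(prereq_id)
```

A single-stage request has an empty prerequisite, which serializes to `[]`. That is the same sentinel the answer's queries compare against when they check `prereq_id = '[]'`.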
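Let's break down the logic first:
- Any new request should trigger a request for any missing prerequisite.
- Any new request that matches an existing product should be silently ignored.
- Any new request with no matching product but with a matching prerequisite product should be forwarded to the application task stream.
- Any new request with neither a matching product nor a matching prerequisite product should trigger the creation of a new request for the prerequisite product.
- Any new product that is a prerequisite of a request should trigger forwarding that request to the application task stream.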
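The rules for an incoming request can be read as a small decision function. The sketch below is an illustration only, not the ksqlDB implementation: the function name, the action labels, and the use of tuples of strings for stages are all assumptions made for the example. The "wait" outcome covers the case the list leaves implicit, where the prerequisite has been requested but is not finished yet.

```python
def classify_request(stages, products, requests):
    """Decide what to do with a new request.

    stages   -- ordered stage labels for the request (hypothetical string labels)
    products -- set of tuples: products that already exist
    requests -- set of tuples: requests that have already been issued
    """
    key = tuple(stages)
    prereq = key[:-1]  # everything except the final stage
    if key in products:
        return "ignore"  # the product already exists
    if not prereq or prereq in products:
        return "forward_to_tasks"  # nothing missing, ready to be worked on
    if prereq not in requests:
        return "request_prerequisite"  # nobody has asked for the prerequisite yet
    return "wait_for_prerequisite"  # prerequisite requested but not finished


products = {("retrieve",)}
requests = {("retrieve",), ("retrieve", "ocr")}
print(classify_request(["retrieve"], products, requests))
print(classify_request(["retrieve", "ocr"], products, requests))
print(classify_request(["retrieve", "ocr", "count"], products, requests))
print(classify_request(["fetch", "ocr", "count"], products, requests))
```

The last rule in the list, which reacts to a newly arrived product, is event-driven on the product side and is therefore not part of this function.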
First, let's put together our request streams and tables.
-- Raw requests: each request is the ordered array of stages needed to build a product.
CREATE STREAM requests (stages ARRAY<MAP<STRING,STRING>>) WITH (kafka_topic='requests', value_format='json', partitions=1);
-- Re-key each request by its stringified stages; the prerequisite is the same array without its last stage.
CREATE STREAM keyed_requests WITH (kafka_topic='keyed_requests', value_format='json', partitions=1) AS
  SELECT
    CAST(stages AS STRING) AS request_id,
    CAST(SLICE(stages,1,ARRAY_LENGTH(stages)-1) AS STRING) AS prereq_id,
    stages,
    SLICE(stages,1,ARRAY_LENGTH(stages)-1) AS prereq
  FROM requests
  PARTITION BY CAST(stages AS STRING);
-- Table view over the same topic, so that requests can be looked up by key.
CREATE TABLE tbl_requests (ROWKEY STRING PRIMARY KEY, request_id STRING, prereq_id STRING, stages ARRAY<MAP<STRING,STRING>>, prereq ARRAY<MAP<STRING,STRING>>) WITH (kafka_topic='keyed_requests', value_format='json');
CREATE TABLE tbl_requests_copy AS SELECT * FROM tbl_requests;
-- For each prerequisite, the set of requests that depend on it.
CREATE TABLE prereq_requests AS SELECT prereq_id, COLLECT_SET(request_id) AS requests FROM keyed_requests GROUP BY prereq_id;
-- Issue a new request for any prerequisite that has not been requested yet.
INSERT INTO requests
  SELECT
    keyed_requests.prereq AS stages
  FROM keyed_requests
  LEFT JOIN tbl_requests_copy ON keyed_requests.prereq_id = tbl_requests_copy.ROWKEY
  WHERE tbl_requests_copy.ROWKEY IS NULL;
Next, let's create the stream/table of finished products:
CREATE STREAM products (ROWKEY STRING KEY) WITH (kafka_topic='products', value_format='json', partitions=1);
CREATE TABLE tbl_products (ROWKEY STRING PRIMARY KEY) WITH (kafka_topic='products', value_format='json');
Now, let's create a new stream for requests whose prerequisites have been met:
CREATE STREAM completed_prereq_requests (ROWKEY STRING KEY, request_id STRING) WITH (kafka_topic='completed_prereq_requests', value_format='json', partitions=1);
Let's insert new requests into the stream:
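-- A new request is ready if it has no prerequisite ('[]') or its prerequisite product already exists.
INSERT INTO completed_prereq_requests
  SELECT
    request_id
  FROM keyed_requests
  LEFT JOIN tbl_products ON keyed_requests.prereq_id = tbl_products.ROWKEY
  WHERE keyed_requests.prereq_id = '[]' OR tbl_products.ROWKEY IS NOT NULL
  EMIT CHANGES;
Let's also insert completed products into the stream: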
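-- A new product releases every request that was waiting on it as a prerequisite.
INSERT INTO completed_prereq_requests
  SELECT
    EXPLODE(prereq_requests.requests) AS request_id
  FROM products
  INNER JOIN prereq_requests ON products.ROWKEY = prereq_requests.ROWKEY
  EMIT CHANGES;
Now we have a stream of the IDs of requests whose prerequisites are complete, but it still includes requests that have themselves already been completed. Also, the request array itself is not in this stream. Let's filter out the completed requests and enrich the stream with the array object in a new stream: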
-- Look up the stages of each ready request and drop requests whose product already exists.
CREATE STREAM tasks AS
  SELECT
    tbl_requests.stages AS stages
  FROM completed_prereq_requests
  LEFT JOIN tbl_requests ON completed_prereq_requests.request_id = tbl_requests.ROWKEY
  LEFT JOIN tbl_products ON completed_prereq_requests.request_id = tbl_products.ROWKEY
  WHERE tbl_products.ROWKEY IS NULL
  PARTITION BY completed_prereq_requests.request_id;
To test this, select from the tasks stream in one ksqlDB window using SELECT * FROM tasks EMIT CHANGES;. Then open another ksqlDB window, create a request, and start producing products. Watch what happens on the other screen.
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p':='x'),MAP('r':='y')]);
INSERT INTO products (ROWKEY) VALUES (CAST(ARRAY[MAP('p':='x')] AS STRING));
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p':='x'),MAP('r':='y2')]);
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p1':='x1','p2':='x2'),MAP('r':='y'),MAP('n':='z')]);
INSERT INTO products (ROWKEY) VALUES (CAST(ARRAY[MAP('p1':='x1','p2':='x2')] AS STRING));
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p1':='x1','p2':='x2'),MAP('r':='y'),MAP('n':='z2')]);
INSERT INTO products (ROWKEY) VALUES (CAST(ARRAY[MAP('p1':='x1','p2':='x2'),MAP('r':='y')] AS STRING));
https://stackoverflow.com/questions/62422962
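To reason about the test statements without a running cluster, the rules can be approximated with a small in-memory model. This is a rough sketch under stated assumptions, not a reproduction of ksqlDB: the `Pipeline` class and its method names are hypothetical, stages are shortened to plain string labels, and real join timing and ordering in ksqlDB may differ from what this model does.

```python
class Pipeline:
    """In-memory approximation of the request/product rules (not ksqlDB itself)."""

    def __init__(self):
        self.requests = set()  # every request key seen so far
        self.products = set()  # every finished product key
        self.waiting = {}      # prerequisite key -> request keys waiting on it
        self.tasks = []        # request keys forwarded to the task stream, in order

    def submit_request(self, stages):
        key = tuple(stages)
        prereq = key[:-1]
        if key in self.products:
            return  # product already exists: ignore silently
        self.requests.add(key)
        pending = self.waiting.setdefault(prereq, [])
        if key not in pending:
            pending.append(key)
        if not prereq or prereq in self.products:
            self.tasks.append(key)       # prerequisite satisfied: forward
        elif prereq not in self.requests:
            self.submit_request(prereq)  # ask for the missing prerequisite

    def complete_product(self, stages):
        key = tuple(stages)
        self.products.add(key)
        # Release every request that was waiting on this product.
        for request in self.waiting.get(key, []):
            if request not in self.products:
                self.tasks.append(request)


# Same sequence as the INSERT statements, with stages abbreviated to labels.
pipeline = Pipeline()
pipeline.submit_request(["p=x", "r=y"])
pipeline.complete_product(["p=x"])
pipeline.submit_request(["p=x", "r=y2"])
pipeline.submit_request(["p1=x1,p2=x2", "r=y", "n=z"])
pipeline.complete_product(["p1=x1,p2=x2"])
pipeline.submit_request(["p1=x1,p2=x2", "r=y", "n=z2"])
pipeline.complete_product(["p1=x1,p2=x2", "r=y"])

for task in pipeline.tasks:
    print(task)
```

In this model a task is forwarded only once its prerequisite product exists, and each shared prerequisite is requested a single time, which is the caching behavior the question asks for.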