我们通过疾病控制中心从DB2 (表-1)发送数据到Kafka主题(主题-1)。我们需要在DB2数据和卡夫卡主题之间进行协调。我们有两个选择-
a)将所有卡夫卡主题数据放入DB2 (作为表-1-拷贝),然后左转连接(表-1和表-1-拷贝之间)以查看不匹配的记录,创建增量并将其推回kafka。problem:可伸缩性--我们的数据集大约有10亿条记录,我不确定DB2 DBA是否会让我们运行如此庞大的连接操作(这种操作可以轻松持续15-20分钟)。
( b)将DB2再推回并行的kafka主题(Top-1-Copy),然后执行一些基于kafka流的解决方案,在kafka主题-1和Top-1-拷贝之间做左外连接。我仍然围绕卡夫卡流和离开外部连接我的头。我不确定(使用卡夫卡流中的窗口系统),我将能够比较整个内容的主题-1和主题-1拷贝。
更糟糕的是,卡夫卡中的主题-1是一个紧凑的主题,所以当我们把数据从DB2推回卡夫卡主题-1-拷贝,我们不能确定地启动卡夫卡主题压缩循环,以确保主题-1和主题-1-拷贝在对它们进行任何比较操作之前都是完全压缩的。
( c)我们是否有其他架构方案可供考虑?
理想的解决方案必须对任何大小的数据进行缩放。
发布于 2020-06-01 14:46:02
我看不出你为什么不能在Kafka流或KSQL中做到这一点。两个支持表-表连接。这是假设数据的格式是支持的。
键压缩不会影响结果,因为流和KSQL都将构建连接两个表的正确的最终状态。如果压缩已运行,则需要处理的数据量可能较少,但结果将是相同的。
例如,在ksqlDB中,您可以将这两个主题作为表导入,然后执行一个联接,然后通过topic-1表null进行筛选,以查找缺少的行列表。
-- example using 0.9 ksqlDB, assuming a INT primary key:
-- create table from main topic:
CREATE TABLE_1
(ROWKEY INT PRIMARY KEY, <other column defs>)
WITH (kafka_topic='topic-1', value_format='?');
-- create table from second topic:
CREATE TABLE_2
(ROWKEY INT PRIMARY KEY, <other column defs>)
WITH (kafka_topic='topic-1-copy', value_format='?');
-- create a table containing only the missing keys:
CREATE MISSING AS
SELECT T2.* FROM TABLE_2 T2 LEFT JOIN TABLE_1 T1
WHERE T1.ROWKEY = null;这种方法的好处是,缺失行的MISSING表将自动更新:当您从源DB2实例中提取缺失的行并将它们生成到topic-1时,“缺失”表中的行将被删除,也就是说,您将看到正在为MISSING主题生成墓碑。
您甚至可以扩展此方法以查找topic-1中不再存在于源db中的行:
-- using the same DDL statements for TABLE_1 and TABLE_2 from above
-- perform the join:
CREATE JOINED AS
SELECT * FROM TABLE_2 T2 FULL OUTER JOIN TABLE_1 T1;
-- detect rows in the DB that aren't in the topic:
CREATE MISSING AS
SELECT * FROM JOINED
WHERE T1_ROWKEY = null;
-- detect rows in the topic that aren't in the DB:
CREATE EXTRA AS
SELECT * FROM JOINED
WHERE T2_ROWKEY = null;当然,您需要相应地调整集群的大小。ksqlDB集群越大,处理数据的速度就越快。它还需要磁盘上的容量来实现该表。
可以通过主题上的分区数来设置的最大并行化量。如果您只有一个分区,那么数据将按顺序处理。如果运行100个分区,则可以使用100个CPU核心处理数据,前提是运行足够多的ksqlDB实例。(默认情况下,每个ksqlDB节点将在每个查询中创建4个流处理线程(但如果服务器有更多的核心,则可以增加这一点!)。
https://stackoverflow.com/questions/52129604
复制相似问题