首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何比较两个kafka流或数据库表之间的(10亿条记录)数据

如何比较两个kafka流或数据库表之间的(10亿条记录)数据
EN

Stack Overflow用户
提问于 2018-09-01 15:32:05
回答 1查看 2.4K关注 0票数 3

我们通过疾病控制中心从DB2 (表-1)发送数据到Kafka主题(主题-1)。我们需要在DB2数据和卡夫卡主题之间进行协调。我们有两个选择-

a)将所有卡夫卡主题数据放入DB2 (作为表-1-拷贝),然后左转连接(表-1和表-1-拷贝之间)以查看不匹配的记录,创建增量并将其推回kafka。problem:可伸缩性--我们的数据集大约有10亿条记录,我不确定DB2 DBA是否会让我们运行如此庞大的连接操作(这种操作可以轻松持续15-20分钟)。

( b)将DB2再推回并行的kafka主题(Top-1-Copy),然后执行一些基于kafka流的解决方案,在kafka主题-1和Top-1-拷贝之间做左外连接。我仍然围绕卡夫卡流和离开外部连接我的头。我不确定(使用卡夫卡流中的窗口系统),我将能够比较整个内容的主题-1和主题-1拷贝。

更糟糕的是,卡夫卡中的主题-1是一个紧凑的主题,所以当我们把数据从DB2推回卡夫卡主题-1-拷贝,我们不能确定地启动卡夫卡主题压缩循环,以确保主题-1和主题-1-拷贝在对它们进行任何比较操作之前都是完全压缩的。

( c)我们是否有其他架构方案可供考虑?

理想的解决方案必须对任何大小的数据进行缩放。

EN

回答 1

Stack Overflow用户

发布于 2020-06-01 14:46:02

我看不出你为什么不能在Kafka流或KSQL中做到这一点。两个支持表-表连接。这是假设数据的格式是支持的。

键压缩不会影响结果,因为流和KSQL都将构建连接两个表的正确的最终状态。如果压缩已运行,则需要处理的数据量可能较少,但结果将是相同的。

例如,在ksqlDB中,您可以将这两个主题作为表导入,然后执行一个联接,然后通过topic-1null进行筛选,以查找缺少的行列表。

代码语言:javascript
复制
-- example using 0.9 ksqlDB, assuming a INT primary key:

-- create table from main topic:
CREATE TABLE_1 
   (ROWKEY INT PRIMARY KEY, <other column defs>) 
   WITH (kafka_topic='topic-1', value_format='?');

-- create table from second topic:
CREATE TABLE_2 
   (ROWKEY INT PRIMARY KEY, <other column defs>) 
   WITH (kafka_topic='topic-1-copy', value_format='?');

-- create a table containing only the missing keys:
CREATE MISSING AS
   SELECT T2.* FROM TABLE_2 T2 LEFT JOIN TABLE_1 T1
   WHERE T1.ROWKEY = null;

这种方法的好处是,缺失行的MISSING表将自动更新:当您从源DB2实例中提取缺失的行并将它们生成到topic-1时,“缺失”表中的行将被删除,也就是说,您将看到正在为MISSING主题生成墓碑。

您甚至可以扩展此方法以查找topic-1中不再存在于源db中的行:

代码语言:javascript
复制
-- using the same DDL statements for TABLE_1 and TABLE_2 from above

-- perform the join:
CREATE JOINED AS
   SELECT * FROM TABLE_2 T2 FULL OUTER JOIN TABLE_1 T1;

-- detect rows in the DB that aren't in the topic:
CREATE MISSING AS
   SELECT * FROM JOINED
   WHERE T1_ROWKEY = null;

-- detect rows in the topic that aren't in the DB:
CREATE EXTRA AS
   SELECT * FROM JOINED
   WHERE T2_ROWKEY = null;

当然,您需要相应地调整集群的大小。ksqlDB集群越大,处理数据的速度就越快。它还需要磁盘上的容量来实现该表。

可以通过主题上的分区数来设置的最大并行化量。如果您只有一个分区,那么数据将按顺序处理。如果运行100个分区,则可以使用100个CPU核心处理数据,前提是运行足够多的ksqlDB实例。(默认情况下,每个ksqlDB节点将在每个查询中创建4个流处理线程(但如果服务器有更多的核心,则可以增加这一点!)。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52129604

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档