我有一个长时间运行的管道,它有一些失败的项(在进程结束时没有加载的项,因为它们失败了数据库验证或类似的东西)。
我想重新运行管道,但只处理上次运行时导入失败的项。
我有一个系统,在那里我检查每个项目ID (从外部来源收到)。我在装载机里做这个检查。如果数据库中已有该项ID,则跳过在数据库中加载/插入该项。
这个很好用。但是,这很慢,因为我提取了每个项的转换加载,只有在加载时,我才查询数据库(每个项一个查询)并比较项ID。
我想尽快把这些记录过滤掉。如果我用变压器做的话,我只能再做一次。它看起来可能是提取器的位置,或者我可以将记录分批传递给转换器,然后在(第一个)转换器中filter+explode项目。
这里有什么更好的方法?
我也在考虑我的提取器的可重用性,但我想我可以接受这样一个事实:一个提取器同时进行提取和过滤。我认为最好的解决办法是能够连锁多个萃取器。然后我会有一个提取数据,另一个过滤数据。
编辑:也许我可以这样做:
already_imported_item_ids = Items.pluck(:item_id)
Kiba.run(
Kiba.parse do
source(...)
transform do |item|
next if already_imported_item_ids.include?(item)
item
end
transform(...)
destination(...)
end
)我想这能行吗?
发布于 2020-07-03 12:36:50
以下几点提示:
如果
pre_process块开始时加载完整的ids列表(主要是您在代码示例中想到的内容),然后在源代码之后进行比较。显然,它不会无限缩放,但是它可以根据数据集的大小而长时间工作。如果您需要更高的规模,我建议您要么使用缓冲转换(分组N行)来实现单个查询,以验证目标数据库中所有N行in的存在,要么与行组一起工作。
https://stackoverflow.com/questions/62695849
复制相似问题