我们使用Pipeline DB将数据接收到一个流表中,在两个流视图中,在一个视图中,过滤掉类型转换验证错误失败的记录,在另一个视图中,过滤类型转换错误失败的记录。理想情况下,我们试图将好的记录与坏的记录分开,并将它们具体化到两个最终的表中。
例如,系统配置为以YYYY/MM/DD HH24:MI:SS格式从第三方接收数据,但由于某些原因,值显示在日和月的反转位置。在PipelineDB中,由于使用PostGres SQL "to_timestamp(mycolumn,'YYYY/MM/DD HH24:MI:SS')“,如果"mycolumn”中的文本类似于'2019/15/05 13:10:24‘,则会抛出一个硬错误。并且该事务内馈送到流中的任何记录都被回滚。(因此,如果使用PG Copy,则材料化流视图失败的一条记录将导致零条记录一起插入。在数据自动化中,这不是一个理想的场景,因为第三方自动化系统可能不太关心我们处理其数据的问题。)
从我所看到的:- PostGres没有“原生SQL”方式来做“尝试解析”- PipelineDB不支持用户定义的函数(如果我们编写一个函数有两个输出,一个用来解析值,另一个返回布尔值"is_valid“列)。(我的假设是函数驻留在服务器上,而pipelinedb作为外部服务器执行,这完全是另一回事。)
理想情况下,函数返回类型转换值和布尔标志(如果有效),并且可以在流式视图的WHERE子句中使用它来从坏记录派生出好记录。但我似乎不能解决这个问题?有什么想法吗?
发布于 2019-05-16 21:59:02
过了很长时间,我找到了解决这个问题的办法。我不喜欢它,但它会起作用的。
当我意识到整个问题是建立在以下基础上时,我才意识到:
http://docs.pipelinedb.com/continuous-transforms.html“您可以将连续转换看作是传入流数据之上的触发器,其中对连续转换输出的每个新行执行触发器函数。在内部,该函数作为每个行触发器的AFTER INSERT执行,因此没有旧行,而新行包含连续转换输出的行。”
我花了几个小时试图弄清楚:“为什么我编写的自定义函数不能为传入数据流”尝试解析“数据类型?物化视图或输出表中什么都不会显示? PipelineDB也没有抛出任何硬错误?几个小时后,我意识到问题与PipelineDB不能处理用户定义函数有关,而是在连续转换中,表示为SQL的转换在”插入行之后“发生。因此,从根本上说,更改物化流中数据字段的类型转换在开始之前就失败了。
解决方案(这不是很优雅)是:1-将类型转换逻辑或任何可能导致错误的SQL逻辑移到触发器函数中2-在触发器函数中创建"EXCEPTION WHEN others“部分3-确保RETURN NEW;在转换成功和失败的两种情况下都会发生。4-将连续转换作为不应用逻辑的简单传递,它只是调用触发器。(在这种情况下,它在某种程度上否定了使用PipelineDB解决这个初始数据暂存问题的全部意义。但是,我离题了。)
这样,我创建了一个表来捕获错误,并通过确保上面列出的所有3个步骤都得到了实现,从而确保事务将会成功。
这一点很重要,因为如果没有这样做,并且“你在异常中获取和异常”,或者你没有优雅地处理异常,那么将不会加载任何记录。
这支持了策略:我们只是试图使数据处理“在河中分叉”,以单向路由成功转换为一个表(或流表)的记录,以及未能转换为错误表的记录。
下面我展示了一个POC,我们将记录作为流处理,并将它们物化到物理表中。(它也可能是另一个流)。实现这一点的关键是:
errors表使用的文本列触发器函数捕获尝试的转换中的错误,并将它们写入errors表,其中包含从系统返回的错误的基本描述。
我提到我“不喜欢”这个解决方案,但这是我在几个小时内找到的最好的解决方案,可以绕过PipelineDB作为触发器post-insert执行操作的限制,因此无法捕获insert上的失败,并且pipelinedb没有内置的处理能力:-在失败时继续处理事务中的流-在行级优雅地失败,并提供一种更容易的机制将失败的转换路由到错误表
DROP SCHEMA IF EXISTS pdb CASCADE;
CREATE SCHEMA IF NOT EXISTS pdb;
DROP TABLE IF EXISTS pdb.lis_final;
CREATE TABLE pdb.lis_final(
edm___row_id bigint,
edm___created_dtz timestamp with time zone DEFAULT current_timestamp,
edm___updatedat_dtz timestamp with time zone DEFAULT current_timestamp,
patient_id text,
encounter_id text,
order_id text,
sample_id text,
container_id text,
result_id text,
orderrequestcode text,
orderrequestname text,
testresultcode text,
testresultname text,
testresultcost text,
testordered_dt timestamp,
samplereceived_dt timestamp,
testperformed_dt timestamp,
testresultsreleased_dt timestamp,
extractedfromsourceat_dt timestamp,
birthdate_d date
);
DROP TABLE IF EXISTS pdb.lis_errors;
CREATE TABLE pdb.lis_errors(
edm___row_id bigint,
edm___errorat_dtz timestamp with time zone default current_timestamp,
edm___errormsg text,
patient_id text,
encounter_id text,
order_id text,
sample_id text,
container_id text,
result_id text,
orderrequestcode text,
orderrequestname text,
testresultcode text,
testresultname text,
testresultcost text,
testordered_dt text,
samplereceived_dt text,
testperformed_dt text,
testresultsreleased_dt text,
extractedfromsourceat_dt text,
birthdate_d text
);
DROP FOREIGN TABLE IF EXISTS pdb.lis_streaming_table CASCADE;
CREATE FOREIGN TABLE pdb.lis_streaming_table (
edm___row_id serial,
patient_id text,
encounter_id text,
order_id text,
sample_id text,
container_id text,
result_id text,
orderrequestcode text,
orderrequestname text,
testresultcode text,
testresultname text,
testresultcost text,
testordered_dt text,
samplereceived_dt text,
testperformed_dt text,
testresultsreleased_dt text,
extractedfromsourceat_dt text,
birthdate_d text
)
SERVER pipelinedb;
CREATE OR REPLACE FUNCTION insert_into_t()
RETURNS trigger AS
$$
BEGIN
INSERT INTO pdb.lis_final
SELECT
NEW.edm___row_id,
current_timestamp as edm___created_dtz,
current_timestamp as edm___updatedat_dtz,
NEW.patient_id,
NEW.encounter_id,
NEW.order_id,
NEW.sample_id,
NEW.container_id,
NEW.result_id,
NEW.orderrequestcode,
NEW.orderrequestname,
NEW.testresultcode,
NEW.testresultname,
NEW.testresultcost,
to_timestamp(NEW.testordered_dt,'YYYY/MM/DD HH24:MI:SS') as testordered_dt,
to_timestamp(NEW.samplereceived_dt,'YYYY/MM/DD HH24:MI:SS') as samplereceived_dt,
to_timestamp(NEW.testperformed_dt,'YYYY/MM/DD HH24:MI:SS') as testperformed_dt,
to_timestamp(NEW.testresultsreleased_dt,'YYYY/MM/DD HH24:MI:SS') as testresultsreleased_dt,
to_timestamp(NEW.extractedfromsourceat_dt,'YYYY/MM/DD HH24:MI:SS') as extractedfromsourceat_dt,
to_date(NEW.birthdate_d,'YYYY/MM/DD') as birthdate_d;
-- Return new as nothing happens
RETURN NEW;
EXCEPTION WHEN others THEN
INSERT INTO pdb.lis_errors
SELECT
NEW.edm___row_id,
current_timestamp as edm___errorat_dtz,
SQLERRM as edm___errormsg,
NEW.patient_id,
NEW.encounter_id,
NEW.order_id,
NEW.sample_id,
NEW.container_id,
NEW.result_id,
NEW.orderrequestcode,
NEW.orderrequestname,
NEW.testresultcode,
NEW.testresultname,
NEW.testresultcost,
NEW.testordered_dt,
NEW.samplereceived_dt,
NEW.testperformed_dt,
NEW.testresultsreleased_dt,
NEW.extractedfromsourceat_dt,
NEW.birthdate_d;
-- Return new back to the streaming view as we don't want that process to error. We already routed the record above to the errors table as text.
RETURN NEW;
END;
$$
LANGUAGE plpgsql;
DROP VIEW IF EXISTS pdb.lis_tryparse CASCADE;
CREATE VIEW pdb.lis_tryparse WITH (action=transform, outputfunc=insert_into_t) AS
SELECT
edm___row_id,
patient_id,
encounter_id,
order_id,
sample_id,
container_id,
result_id,
orderrequestcode,
orderrequestname,
testresultcode,
testresultname,
testresultcost,
testordered_dt,
samplereceived_dt,
testperformed_dt,
testresultsreleased_dt,
extractedfromsourceat_dt,
birthdate_d
FROM pdb.lis_streaming_table as st;https://stackoverflow.com/questions/56155291
复制相似问题