首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >尝试解析PipelineDB中的数据类型并将错误流到失败的文本表?

尝试解析PipelineDB中的数据类型并将错误流到失败的文本表?
EN

Stack Overflow用户
提问于 2019-05-16 02:08:09
回答 1查看 14关注 0票数 0

我们使用Pipeline DB将数据接收到一个流表中,在两个流视图中,在一个视图中,过滤掉类型转换验证错误失败的记录,在另一个视图中,过滤类型转换错误失败的记录。理想情况下,我们试图将好的记录与坏的记录分开,并将它们具体化到两个最终的表中。

例如,系统配置为以YYYY/MM/DD HH24:MI:SS格式从第三方接收数据,但由于某些原因,值显示在日和月的反转位置。在PipelineDB中,由于使用PostGres SQL "to_timestamp(mycolumn,'YYYY/MM/DD HH24:MI:SS')“,如果"mycolumn”中的文本类似于'2019/15/05 13:10:24‘,则会抛出一个硬错误。并且该事务内馈送到流中的任何记录都被回滚。(因此,如果使用PG Copy,则材料化流视图失败的一条记录将导致零条记录一起插入。在数据自动化中,这不是一个理想的场景,因为第三方自动化系统可能不太关心我们处理其数据的问题。)

从我所看到的:- PostGres没有“原生SQL”方式来做“尝试解析”- PipelineDB不支持用户定义的函数(如果我们编写一个函数有两个输出,一个用来解析值,另一个返回布尔值"is_valid“列)。(我的假设是函数驻留在服务器上,而pipelinedb作为外部服务器执行,这完全是另一回事。)

理想情况下,函数返回类型转换值和布尔标志(如果有效),并且可以在流式视图的WHERE子句中使用它来从坏记录派生出好记录。但我似乎不能解决这个问题?有什么想法吗?

EN

回答 1

Stack Overflow用户

发布于 2019-05-16 21:59:02

过了很长时间,我找到了解决这个问题的办法。我不喜欢它,但它会起作用的。

当我意识到整个问题是建立在以下基础上时,我才意识到:

http://docs.pipelinedb.com/continuous-transforms.html“您可以将连续转换看作是传入流数据之上的触发器,其中对连续转换输出的每个新行执行触发器函数。在内部,该函数作为每个行触发器的AFTER INSERT执行,因此没有旧行,而新行包含连续转换输出的行。”

我花了几个小时试图弄清楚:“为什么我编写的自定义函数不能为传入数据流”尝试解析“数据类型?物化视图或输出表中什么都不会显示? PipelineDB也没有抛出任何硬错误?几个小时后,我意识到问题与PipelineDB不能处理用户定义函数有关,而是在连续转换中,表示为SQL的转换在”插入行之后“发生。因此,从根本上说,更改物化流中数据字段的类型转换在开始之前就失败了。

解决方案(这不是很优雅)是:1-将类型转换逻辑或任何可能导致错误的SQL逻辑移到触发器函数中2-在触发器函数中创建"EXCEPTION WHEN others“部分3-确保RETURN NEW;在转换成功和失败的两种情况下都会发生。4-将连续转换作为不应用逻辑的简单传递,它只是调用触发器。(在这种情况下,它在某种程度上否定了使用PipelineDB解决这个初始数据暂存问题的全部意义。但是,我离题了。)

这样,我创建了一个表来捕获错误,并通过确保上面列出的所有3个步骤都得到了实现,从而确保事务将会成功。

这一点很重要,因为如果没有这样做,并且“你在异常中获取和异常”,或者你没有优雅地处理异常,那么将不会加载任何记录。

这支持了策略:我们只是试图使数据处理“在河中分叉”,以单向路由成功转换为一个表(或流表)的记录,以及未能转换为错误表的记录。

下面我展示了一个POC,我们将记录作为流处理,并将它们物化到物理表中。(它也可能是另一个流)。实现这一点的关键是:

errors表使用的文本列触发器函数捕获尝试的转换中的错误,并将它们写入errors表,其中包含从系统返回的错误的基本描述。

我提到我“不喜欢”这个解决方案,但这是我在几个小时内找到的最好的解决方案,可以绕过PipelineDB作为触发器post-insert执行操作的限制,因此无法捕获insert上的失败,并且pipelinedb没有内置的处理能力:-在失败时继续处理事务中的流-在行级优雅地失败,并提供一种更容易的机制将失败的转换路由到错误表

代码语言:javascript
复制
DROP SCHEMA IF EXISTS pdb CASCADE;
CREATE SCHEMA IF NOT EXISTS pdb;


DROP TABLE IF EXISTS pdb.lis_final;
CREATE TABLE pdb.lis_final(
    edm___row_id bigint,
    edm___created_dtz timestamp with time zone DEFAULT current_timestamp,
    edm___updatedat_dtz timestamp with time zone DEFAULT current_timestamp,
    patient_id text,
    encounter_id text,
    order_id text,
    sample_id text,
    container_id text,
    result_id text,
    orderrequestcode text,
    orderrequestname text,
    testresultcode text,
    testresultname text,
    testresultcost text,
    testordered_dt timestamp,
    samplereceived_dt timestamp,
    testperformed_dt timestamp,
    testresultsreleased_dt timestamp,
    extractedfromsourceat_dt timestamp,
    birthdate_d date
);

DROP TABLE IF EXISTS pdb.lis_errors;
CREATE TABLE pdb.lis_errors(
    edm___row_id bigint,
    edm___errorat_dtz timestamp with time zone default current_timestamp,
    edm___errormsg text,
    patient_id text,
    encounter_id text,
    order_id text,
    sample_id text,
    container_id text,
    result_id text,
    orderrequestcode text,
    orderrequestname text,
    testresultcode text,
    testresultname text,
    testresultcost text,
    testordered_dt text,
    samplereceived_dt text,
    testperformed_dt text,
    testresultsreleased_dt text,
    extractedfromsourceat_dt text,
    birthdate_d text
);


DROP FOREIGN TABLE IF EXISTS pdb.lis_streaming_table CASCADE;
CREATE FOREIGN TABLE pdb.lis_streaming_table (
    edm___row_id serial,
    patient_id text,
    encounter_id text,
    order_id text,
    sample_id text,
    container_id text,
    result_id text,
    orderrequestcode text,
    orderrequestname text,
    testresultcode text,
    testresultname text,
    testresultcost text,
    testordered_dt text,
    samplereceived_dt text,
    testperformed_dt text,
    testresultsreleased_dt text,
    extractedfromsourceat_dt text,
    birthdate_d text
)
SERVER pipelinedb;


CREATE OR REPLACE FUNCTION insert_into_t()
  RETURNS trigger AS
  $$
  BEGIN

    INSERT INTO pdb.lis_final
    SELECT
        NEW.edm___row_id,
        current_timestamp as edm___created_dtz,
        current_timestamp as edm___updatedat_dtz,
        NEW.patient_id,
        NEW.encounter_id,
        NEW.order_id,
        NEW.sample_id,
        NEW.container_id,
        NEW.result_id,
        NEW.orderrequestcode,
        NEW.orderrequestname,
        NEW.testresultcode,
        NEW.testresultname,
        NEW.testresultcost,
        to_timestamp(NEW.testordered_dt,'YYYY/MM/DD HH24:MI:SS') as testordered_dt,
        to_timestamp(NEW.samplereceived_dt,'YYYY/MM/DD HH24:MI:SS') as samplereceived_dt,
        to_timestamp(NEW.testperformed_dt,'YYYY/MM/DD HH24:MI:SS') as testperformed_dt,
        to_timestamp(NEW.testresultsreleased_dt,'YYYY/MM/DD HH24:MI:SS') as testresultsreleased_dt,
        to_timestamp(NEW.extractedfromsourceat_dt,'YYYY/MM/DD HH24:MI:SS') as extractedfromsourceat_dt,
        to_date(NEW.birthdate_d,'YYYY/MM/DD') as birthdate_d;

    -- Return new as nothing happens
    RETURN NEW;

    EXCEPTION WHEN others THEN

        INSERT INTO pdb.lis_errors
        SELECT
            NEW.edm___row_id,
            current_timestamp as edm___errorat_dtz,
            SQLERRM as edm___errormsg,
            NEW.patient_id,
            NEW.encounter_id,
            NEW.order_id,
            NEW.sample_id,
            NEW.container_id,
            NEW.result_id,
            NEW.orderrequestcode,
            NEW.orderrequestname,
            NEW.testresultcode,
            NEW.testresultname,
            NEW.testresultcost,
            NEW.testordered_dt,
            NEW.samplereceived_dt,
            NEW.testperformed_dt,
            NEW.testresultsreleased_dt,
            NEW.extractedfromsourceat_dt,
            NEW.birthdate_d;

        -- Return new back to the streaming view as we don't want that process to error.  We already routed the record above to the errors table as text.
        RETURN NEW;

  END;
  $$
  LANGUAGE plpgsql;


DROP VIEW IF EXISTS pdb.lis_tryparse CASCADE;
CREATE VIEW pdb.lis_tryparse WITH (action=transform, outputfunc=insert_into_t) AS
SELECT
    edm___row_id,
    patient_id,
    encounter_id,
    order_id,
    sample_id,
    container_id,
    result_id,
    orderrequestcode,
    orderrequestname,
    testresultcode,
    testresultname,
    testresultcost,
    testordered_dt,
    samplereceived_dt,
    testperformed_dt,
    testresultsreleased_dt,
    extractedfromsourceat_dt,
    birthdate_d
FROM pdb.lis_streaming_table as st;
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56155291

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档