给出这样的桌子:
╔══════════════════════════════════════╤════════╤═══════════╗
║ message_id │ type │ json_data ║
╠══════════════════════════════════════╪════════╪═══════════╣
║ 2c8c86b7-4867-494a-88bf-1e6b17dd121f │ type-a │ {} ║
╟──────────────────────────────────────┼────────┼───────────╢
║ 767d6fbf-84cf-4baa-9e33-46b15f0c8594 │ type-b │ {} ║
╟──────────────────────────────────────┼────────┼───────────╢
║ 298dcedb-b51d-4623-89f8-fabec44663c8 │ type-c │ {} ║
╚══════════════════════════════════════╧════════╧═══════════╝一种类似于以下的类型:
CREATE TYPE public.new_stream_message AS (
message_id UUID,
"type" VARCHAR(128),
json_data VARCHAR,
json_metadata VARCHAR
);以及一个函数,它将其中的一个数组作为参数之一。
在这个函数中,我将这个数组写到一个表中。如果存在唯一的约束冲突,我希望将数组中的message_id与该表中的select语句进行比较。如果一切都匹配,那么这是一个幂等的写,我可以安全地忽略异常。我该怎么做?
发布于 2018-05-09 10:53:13
最后,我执行了以下操作:使用游标,迭代返回的行,并将它们附加到循环中的数组中。然后,检查a) _new_stream_messages的长度等于读回来的内容,b)两者的内容是相同的。我确信有更好的方法来做这件事,但我还没有找到。
代码如下:
CREATE OR REPLACE FUNCTION __schema__.enforce_idempotent_append(
_stream_id CHAR(42),
_start INT,
_check_length BOOLEAN,
_new_stream_messages __schema__.new_stream_message [])
RETURNS VOID AS $F$
DECLARE
_message_id_record RECORD;
_message_id_cursor REFCURSOR;
_message_ids UUID [] = '{}' :: UUID [];
BEGIN
_message_id_cursor = (
SELECT *
FROM __schema__.read(_stream_id, cardinality(_new_stream_messages), _start, true, false)
OFFSET 1
);
FETCH FROM _message_id_cursor
INTO _message_id_record;
WHILE FOUND LOOP
_message_ids = array_append(_message_ids, _message_id_record.message_id);
FETCH FROM _message_id_cursor
INTO _message_id_record;
END LOOP;
IF (_check_length AND cardinality(_new_stream_messages) > cardinality(_message_ids))
THEN
RAISE EXCEPTION 'WrongExpectedVersion'
USING HINT = 'Wrong message count';
END IF;
IF _message_ids <> (
SELECT ARRAY(
SELECT n.message_id
FROM unnest(_new_stream_messages) n
))
THEN
RAISE EXCEPTION 'WrongExpectedVersion';
END IF;
END;
$F$
LANGUAGE 'plpgsql'发布于 2018-05-09 03:39:51
像这样的东西呢
CREATE OR REPLACE FUNCTION idempotentAppend(streamId varchar, new_stream_message NewMessage[]) RETURNS AppendResult AS $$
DECLARE
...
BEGIN
...
foreach message in ARRAY newMessages
loop
BEGIN
INSERT INTO public.Messages (message_id, Type, JsonData, json_metadata)
SELECT message.*;
EXCEPTION WHEN unique_violation THEN
SELECT * FROM public.Messages into conflict WHERE public.Messages.Id = Id;
IF message.type != conflict.type OR message.json_data != conflict.json_data THEN
RAISE unique_violation USING MESSAGE = 'Duplicate message ID: ' || Id;
END IF;
END;
end loop;
...
END;
$$ LANGUAGE plpgsql;顺便说一句,我在ddd es松弛的#sql-流存储通道中发现了这一点,并且一直在缓慢地开发java版本。
编辑: Whelp...looks,就像我应该读更多的松弛通道。看来你已经解决了
https://stackoverflow.com/questions/49530471
复制相似问题