首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >[RIAK]InternalAPI-in-CommitHook

[RIAK]InternalAPI-in-CommitHook
EN

Stack Overflow用户
提问于 2017-03-01 09:45:55
回答 1查看 56关注 0票数 1

我对erlang和riak很陌生。几个月前我开始用riak作为kv商店。现在,我想实现一个提交挂钩的riak,这样riak可以帮助我做一些统计。我阅读了一些文档并编写了预挂钩脚本,这些脚本将获取对象键并将其存储到一个集合中。如果只有一个客户端写到riak,那么这个钩子工作得很好,但是如果我增加到riak写入的连接,我发现它在集合中丢失了一些元素。看起来crdt_op没有做合并operation.And,日志文件中没有明显的错误。

谁能帮我找出发生了什么或者我错过了什么。

我用的是riak 2.1.3

谢谢大家!

下面是钩子脚本:

代码语言:javascript
复制
-module(myhook).
-export([pretest/1]).

now_to_local_string({MegaSecs, Secs, MicroSecs}) ->
    LocalTime = calendar:now_to_local_time({MegaSecs, Secs, MicroSecs}),
    {{Year, Month, Day}, {Hour, Minute, _}} = LocalTime,
    TimeStr = lists:flatten(io_lib:format("~4..0w~2..0w~2..0w~2..0w~2..0w",
                [Year, Month, Day, Hour, Minute])),
    TimeStr.

is_deleted(Object)->
    case dict:find(<<"X-Riak-Deleted">>,riak_object:get_metadata(Object)) of
        {ok,_} ->
            true;
        _ ->
            false
    end.

pretest(Object) ->
    % timer:sleep(10000),
    try
        ObjBucket = riak_object:bucket(Object),
  %     riak_object:bucket(Obj).

        Bucket = element(2, ObjBucket),
        BucketType = element(1, ObjBucket),

        ObjKey = riak_object:key(Object),
        % Key = binary_to_list(ObjKey),
        % ObjData = riak_object:get_value(Object),
        % Msg = binary_to_list(ObjData),
        CommitItem = iolist_to_binary(mochijson2:encode({struct, [{b, Bucket}, {k, ObjKey}, {t, BucketType}]})),

        case is_deleted(Object) of
            true ->
                KeyPrefix = "delete";
            _ ->
                KeyPrefix = "update"
        end,

        CurMin = now_to_local_string(os:timestamp()),
        IndexKey = binary:list_to_bin(io_lib:format("~s-~s", [CurMin, KeyPrefix])),

        %% Get a riak client
        {ok, C} = riak:local_client(),
        % get node obj
        ThisNode = atom_to_binary(node(), latin1),

        % get index obj and set context
        BType = <<"archive">>,
        B = <<"local-test">>,

        {SetObj, Context} = case C:get({BType, B}, IndexKey) of
            {error, notfound} -> 
                ThisSetObj = riak_kv_crdt:new({BType, B}, IndexKey, riak_dt_orswot),
                {ThisSetObj, undefined};
            {ok, ThisSetObj} ->
                % The datatype update requires the context if the value exists
                {{Ctx, _}, _} = riak_kv_crdt:value(ThisSetObj, riak_dt_orswot),
                {ThisSetObj, Ctx}
        end,

        UpdateIndex = [{add, CommitItem}],
        UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, Context},
        % UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, undefined},
        NewObj = riak_kv_crdt:update(SetObj, ThisNode, UpdateOp),

        error_logger:info_msg("Updating index for ~s,to set ~s~n", [binary:bin_to_list(CommitItem), IndexKey]),

        C:put(NewObj),
        Object
    catch
        error:Error ->
            {fail, lists:flatten(io_lib:format("[PREHOOKEXCEPTION]~p",[Error]))}
    end.

这是固定的水桶道具

代码语言:javascript
复制
active: true
allow_mult: true
basic_quorum: false
big_vclock: 50
chash_keyfun: {riak_core_util,chash_std_keyfun}
claimant: 'riak@192.168.100.2'
datatype: set
dvv_enabled: true
dw: quorum
last_write_wins: false
linkfun: {modfun,riak_kv_wm_link_walker,mapreduce_linkfun}
n_val: 3
notfound_ok: true
old_vclock: 86400
postcommit: []
pr: 0
precommit: []
pw: 0
r: quorum
rw: quorum
small_vclock: 50
w: quorum
young_vclock: 20
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-03-02 07:43:08

您的put C:put(NewObj)在处理CRDT时没有Riak使用的相同的选项。

Riak KV使用此函数从客户端请求中更新CRDT:

crdt.erl#L162-L175

代码语言:javascript
复制
maybe_update({true, true}, Req, State0) ->
    #dtupdatereq{bucket=B, key=K, type=BType,
                 include_context=InclCtx,
                 context=Ctx} = Req,
    #state{client=C, mod=Mod, op=Op} = State0,
    {Key, ReturnKey} = get_key(K),
    O = riak_kv_crdt:new({BType, B}, Key, Mod),
    Options0 = make_options(Req),
    CrdtOp = make_operation(Mod, Op, Ctx),
    Options = [{crdt_op, CrdtOp},
               {retry_put_coordinator_failure, false}] ++ Options0,
    Resp =  C:put(O, Options),
    State = State0#state{return_key=ReturnKey, return_ctx=InclCtx},
    process_update_response(Resp, State);

注意,它显式地传递crdt_opretry_put_coordinator_failure选项。

作为crdt_op值传递的记录是从这个函数生成的:

代码语言:javascript
复制
make_operation(Mod, Op, Ctx) ->
    #crdt_op{mod=Mod, op=Op, ctx=Ctx}.
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42528322

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档