首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Akka持久力和至少一次交付

Akka持久力和至少一次交付
EN

Stack Overflow用户
提问于 2017-07-06 07:02:49
回答 1查看 407关注 0票数 1

我有一些消息传递应用程序,需要至少一次保证。正如我从文档中理解的: akka-persistence -是关于参与者状态的。它使用一些逐层的意识形态来处理这种状态。

在文档中,我发现一些似乎提出了这个保证。但现在我对这个概念有了一些怀疑:

我的演员是一个简单的发送者,我所需要的就是交付保证。因此,我实际上并不关心参与者的状态和所有这些消耗宝贵内存的分层。大量的期刊会成为问题的原因吗?

演员:

代码语言:javascript
复制
public class SafeSenderActor extends AbstractPersistentActorWithAtLeastOnceDelivery {

    private String persistenceId;
    private ActorSelection destination;

    public SafeSenderActor() {
        System.out.println("SafeSenderActor created");
        this.persistenceId = "safe-persistent-actor-id-" + UUID.randomUUID();
        destination = context().actorSelection("/user/safeReceiverRouter");
    }

    @Override
    public String persistenceId() {
        return persistenceId;
    }

    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(SenderTaskMessage.class, msg -> {
                    persistAsync(new MsgSentEvent(msg.getTestMessage()), this::updateState);
                })
                .match(ConfirmRobustMessageDelivery.class, ack -> {
                    persistAsync(new MsgConfirmEvent(ack.getMessageId(), ack.getLocalMessageNumber()), this::updateState);
                })
                .build();
    }

    @Override
    public Receive createReceiveRecover() {
        return receiveBuilder().match(Object.class, this::updateState).build();
    }

    private void updateState(Object event) {
        if (event instanceof MsgSentEvent) {
            MsgSentEvent ev = (MsgSentEvent) event;
            deliver(destination, deliveryId -> new RobustTestMessage(deliveryId, ev.getMessage()));
        } else if (event instanceof MsgConfirmEvent) {
            MsgConfirmEvent ev = (MsgConfirmEvent) event;
            confirmDelivery(ev.getDeliveryId());
        }
    }

}

在一些连续的消息传递之后,我得到了错误:

代码语言:javascript
复制
[ERROR] [07/06/2017 01:40:33.446] [sender-system-akka.actor.default-dispatcher-50] [akka://sender-system@127.0.0.1:6666/user/safeSendersRouter/$d] Failed to persist event type [com.test.common.events.MsgSentEvent] with sequence number [358698] for persistenceId [safe-persistent-actor-id-648ec66d-7b7f-4291-b3a2-9bd395d92dc7]. (akka.pattern.CircuitBreaker$$anon$1: Circuit Breaker Timed out.)

我正在使用leveldb作为日志。

EN

回答 1

Stack Overflow用户

发布于 2020-11-25 01:16:42

这可能需要很长一段时间,但也许回答这个问题仍然有用。您可以覆盖方法onPersistFailureonPersistRejected来调查您提到的错误。如果持久化失败,则调用onPersistFailure方法。这位演员将被贬低。BEst的做法是在一段时间后再次启动参与者,并使用回退管理程序。如果JOURNAl无法持久化事件,则调用onPersistRejected方法。将恢复执行元。

代码语言:javascript
复制
public class SafeSenderActor extends AbstractPersistentActorWithAtLeastOnceDelivery implements ActorLogging {
    ...
    ...
    ...
    @Override
    public void onPersistFailure(Throwable cause, Object event, long seqNr) {
        log().error("fail to persist $event because of: {}", cause);
        super.onPersistFailure(cause, event, seqNr);
    }

    @Override
    public void onPersistRejected(Throwable cause, Object event, long seqNr) {
        log().error("persist rejected for {} because of: {}", event, cause);
        super.onPersistRejected(cause, event, seqNr);
    }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44937309

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档