首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Akka分布的Pub/Sub背压

Akka分布的Pub/Sub背压
EN

Stack Overflow用户
提问于 2017-12-11 01:17:45
回答 1查看 969关注 0票数 5

我正在使用Akka分布式Pub/Sub,并且有一个发布者和一个订阅者。我的发行者比订阅者快得多。有什么办法让出版商在某一点之后放慢速度吗?

出版者代码:

代码语言:javascript
复制
public class Publisher extends AbstractActor {
    private ActorRef mediator;

    static public Props props() {
        return Props.create(Publisher.class, () -> new Publisher());
    }

    public Publisher () {
        this.mediator = DistributedPubSub.get(getContext().system()).mediator();
        this.self().tell(0, ActorRef.noSender());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(Integer.class, msg -> {
                // Sending message to Subscriber
                mediator.tell(
                    new DistributedPubSubMediator.Send(
                        "/user/" + Subscriber.class.getName(),
                        msg.toString(),
                        false),
                    getSelf());

                getSelf().tell(++msg, ActorRef.noSender());
            })
            .build();
    }
}

订户代码:

代码语言:javascript
复制
public class Subscriber extends AbstractActor {
    static public Props props() {
        return Props.create(Subscriber.class, () -> new Subscriber());
    }

    public Subscriber () {
        ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
        mediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(String.class, msg -> {
                System.out.println("Subscriber message received: " + msg);
                Thread.sleep(10000);
            })
            .build();
    }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-12-11 13:30:53

不幸的是,按照目前的设计,我认为没有办法为原始发件人提供“背压”。由于您使用ActorRef.tellmediator发送消息,因此无法获得下游接收器正在备份的信号。这是因为您使用的方法tell返回一个void

交换机询问

如果您将tell切换到ask,则可以设置一个适当的Timeout值,该值至少会在特定时间内没有收到响应时通知您。

切换到流

“背压”是阿克卡流的主要特征之一。。因此,通过切换到流实现,您将能够实现您想要的目标。

如果可以从原始数据创建流Source,那么可以使用Sink.actorRefmediator创建Sink,并使用Flow.throttle控制流向中介的速度。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47745256

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档