首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用反应性流的基于反应拉的BackPressure

使用反应性流的基于反应拉的BackPressure
EN

Stack Overflow用户
提问于 2017-10-17 09:51:35
回答 1查看 635关注 0票数 2

这就是我对这个问题的理解。

有发行者和订阅者。

发布服务器和订阅服务器的伪代码类似于,

代码语言:javascript
复制
Publisher{
    Subscriber s;
    subscribe(Subscriber s){
        this.s = s;
        s.onSubscribe(new Subscription(){
            onRequest(int n){
                List<Message> messages = service.getMessages(n);
                s.onNext(messages);
            }

            onCancel(){
                s.onComplete();
            }
        });
    }
}

Subscriber{
    Subscription s;
    onSubscribe(Subscription s){
        this.s = s;
        s.request(5);
    }

    onNext(List<Message> messages){
        messages.stream().parallel().map(this::process).collect(toList());
        s.request(5);
    }

    onComplete(){}

    onError(e){}

    private boolean process(Message m){
        //process message and return true/false according to whether it passed/failed.
    }
}

我的理解是,根据应用程序的能力,订阅者将调用请求。当应用程序处于健康状态时,订阅服务器可以更快地处理并请求更多的消息。如果应用程序处于加载状态,订阅服务器将在处理当前批处理后才请求下一批处理。如果处理需要时间,则请求更多消息的次数较少。消息流将根据应用程序的能力而定。

我的理解正确吗?

它与简单的循环有什么不同?

代码语言:javascript
复制
while(true){
    List<Message> messages = service.getMessages(5);
    messages.stream().parallel().map(this:process).collect(toList());
}

在这种情况下,下一批消息在并发处理消息之后才会被读取。在这种情况下,当应用程序运行良好时,将读取更多消息。如果慢消息的读取频率会降低。

这两种方法有何不同?是否所有的不同类型的调度程序是可用的?我不明白,这里的好处到底是什么?

更新1

好的,我理解基于反应拉力的方法相对于简单循环的一些优点。

如果订阅服务器请求n项,那么发布服务器是否需要在订阅服务器上调用onNext() n次?或者,如果发布服务器使用n个元素列表(如前一个片段中的内容)调用订阅服务器一次,那么也可以吗?如果需要进行n个onNext()调用,订户就会变得更加复杂。

代码语言:javascript
复制
Publisher{
    Subscriber s;
    subscribe(Subscriber s){
        this.s = s;
        s.onSubscribe(new Subscription(){
            request(int n){
                service.getMessagesAsyc(n, (List<Message> messages) -> messages.stream().forEach(s::onNext));
            }

            onCancel(){
                s.onComplete();
            }
        });
    }
}

Subscriber{
    Subscription s;
    COUNT = 5;
    volatile int i = COUNT;

    onSubscribe(Subscription s){
        this.s = s;
        s.request(COUNT);
    }

    onNext(Message message){
        CompletableFuture.runAsync(() -> process(message));
        requestMessagesIfNeeded();
    }

    private synchronized requestmessagesIfNeeded(){
        if(0 == i--){
            i = COUNT;
            s.request(COUNT);
        }
    }

    private boolean process(Message m){
        //process message and return true/false according to whether it passed/failed.
    }
}

如果可以向订阅者传递n条消息列表,则也没有其他优势。假设订阅者只需要确认成功处理的消息,那么在使用批处理应答API的第一种方法中很容易做到这一点。

代码语言:javascript
复制
    Map<Boolean, List<Message>> partitioned = messages.stream().parallel().collect(partitioningBy(this::process));
service.ackowledge(partitioned.get(true));
s.request(5);   

第二种方法,在onNext()上各得到一条消息,看起来要实现它要困难得多。

EN

回答 1

Stack Overflow用户

发布于 2017-10-17 10:07:31

onRequest(int ){列表消息= service.getMessages(n);s.onNext(消息);}

这是对反应流的错误看法。request告诉Publisher它可以执行n onNext()调用。这通常意味着表示源和使用者之间的活动连接的Subscription实现将处理request调用。

它与简单的循环有什么不同?

反应性流允许非阻塞消耗;您的示例阻塞一个线程,直到getMessages()可以撤回消息的List。使用Publisher的优点是,无论源Publisher是阻塞还是非阻塞,您都不必以不同的方式处理事件。给出了两种情况下的统一规划模型。如果有一天getMessages()收到一个非阻塞的变体,它的包装器Publisher的下游消费就不需要改变了。

是否所有的不同类型的调度程序是可用的?

Scheduler表示对异步边界的抽象:生成事件的线程和消耗这些事件的线程。不同的Scheduler实现以不同的方式管理它们的线程,这取决于这些线程的使用情况。一些线程将执行计算密集型任务,一些线程将由于与非反应性源的交互而阻塞。调度器类给出了各种标准实现的描述。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46787315

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档