I'm trying to build a Flowable with backpressure. My idea is that the Flowable should not emit a new item until one of the items currently in flight has finished processing. I use a ResourceSubscriber and the subscribeWith() method to achieve this.
Each element of the Flowable is processed asynchronously on a separate thread pool (implemented via flatMap/subscribeOn).
I expect each element after the second to be emitted only after the subscriber's onNext method has been called. However, when I run this code, the Flowable emits elements without any throttling; backpressure has no effect.
Here is code that reproduces the problem:
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.ResourceSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;

public class RxTest2 {
    private static final Logger log = LoggerFactory.getLogger(RxTest.class);

    static AtomicInteger integer = new AtomicInteger();

    public static void main(String[] args) {
        Flowable.generate(emitter -> {
            final int i1 = integer.incrementAndGet();
            if (i1 >= 20) {
                Thread.sleep(10000);
                System.exit(0);
            }
            emitter.onNext(i1);
        })
                .doOnNext(i -> log.info("Published: " + i))
                .flatMap(i -> Flowable.defer(() -> {
                    log.info("Starting consuming {}", i);
                    Thread.sleep(100);
                    log.info("Finished consuming {}", i);
                    return Flowable.just(i);
                }).subscribeOn(Schedulers.computation()))
                .doOnNext(i -> log.info("Consuming finished, result: " + i))
                .subscribeWith(new BackpressureSubscriber(2));
    }
}

class BackpressureSubscriber extends ResourceSubscriber<Object> {
    private static final Logger log = LoggerFactory.getLogger(BackpressureSubscriber.class);

    private final long initialRequest;

    public BackpressureSubscriber(final long initialRequest) {
        this.initialRequest = initialRequest;
    }

    @Override
    protected void onStart() {
        super.onStart();
        log.info("Starting execution with {} initial requests", initialRequest);
        request(initialRequest);
    }

    @Override
    public void onNext(final Object message) {
        log.info("On next for {}", message);
        request(1);
    }

    @Override
    public void onError(final Throwable throwable) {
        log.error("Unhandled error: ", throwable);
    }

    @Override
    public void onComplete() {
        log.info("On Complete");
    }
}
The expected output looks something like:
[main] INFO RxTest - Published: 1
[main] INFO RxTest - Published: 2
[RxComputationThreadPool-1] INFO RxTest - Starting consuming 1
[RxComputationThreadPool-1] INFO RxTest - Finished consuming 1
[RxComputationThreadPool-2] INFO RxTest - Starting consuming 2
[RxComputationThreadPool-1] INFO RxTest - On next for 1
[main] INFO RxTest - Published: 3
[RxComputationThreadPool-1] INFO RxTest - Finished consuming 2
Actual output:
11:30:32.166 [main] INFO BackpressureSubscriber - Starting execution with 2 initial requests
11:30:32.170 [main] INFO RxTest - Published: 1
11:30:32.189 [main] INFO RxTest - Published: 2
11:30:32.189 [RxComputationThreadPool-1] INFO RxTest - Starting consuming 1
11:30:32.189 [RxComputationThreadPool-2] INFO RxTest - Starting consuming 2
11:30:32.189 [main] INFO RxTest - Published: 3
11:30:32.190 [main] INFO RxTest - Published: 4
11:30:32.190 [RxComputationThreadPool-3] INFO RxTest - Starting consuming 3
11:30:32.190 [main] INFO RxTest - Published: 5
11:30:32.190 [RxComputationThreadPool-4] INFO RxTest - Starting consuming 4
11:30:32.190 [main] INFO RxTest - Published: 6
11:30:32.190 [RxComputationThreadPool-5] INFO RxTest - Starting consuming 5
11:30:32.190 [main] INFO RxTest - Published: 7
11:30:32.191 [RxComputationThreadPool-6] INFO RxTest - Starting consuming 6
11:30:32.191 [main] INFO RxTest - Published: 8
11:30:32.191 [RxComputationThreadPool-7] INFO RxTest - Starting consuming 7
11:30:32.191 [main] INFO RxTest - Published: 9
11:30:32.191 [RxComputationThreadPool-8] INFO RxTest - Starting consuming 8
11:30:32.191 [main] INFO RxTest - Published: 10
11:30:32.191 [RxComputationThreadPool-9] INFO RxTest - Starting consuming 9
11:30:32.191 [main] INFO RxTest - Published: 11
11:30:32.191 [RxComputationThreadPool-10] INFO RxTest - Starting consuming 10
11:30:32.192 [main] INFO RxTest - Published: 12
11:30:32.192 [RxComputationThreadPool-11] INFO RxTest - Starting consuming 11
11:30:32.192 [main] INFO RxTest - Published: 13
11:30:32.192 [main] INFO RxTest - Published: 14
11:30:32.192 [RxComputationThreadPool-12] INFO RxTest - Starting consuming 12
11:30:32.192 [main] INFO RxTest - Published: 15
11:30:32.192 [main] INFO RxTest - Published: 16
11:30:32.192 [main] INFO RxTest - Published: 17
11:30:32.192 [main] INFO RxTest - Published: 18
11:30:32.192 [main] INFO RxTest - Published: 19
11:30:32.294 [RxComputationThreadPool-2] INFO RxTest - Finished consuming 2
11:30:32.294 [RxComputationThreadPool-1] INFO RxTest - Finished consuming 1
11:30:32.294 [RxComputationThreadPool-1] INFO RxTest - Consuming finished, result: 1
11:30:32.294 [RxComputationThreadPool-1] INFO BackpressureSubscriber - On next for 1
Tested on library versions: 2.2.19, 2.1.2
As far as my understanding of the ReactiveX documentation goes, I think this is a bug in Rx. However, I may well be wrong, and I would appreciate it if you could point that out.
Posted on 2020-06-25 13:47:00
flatMap actually requests from upstream in batches and buffers items until the downstream requests them. That fact alone is enough to explain the behavior you are seeing. If you set bufferSize to 1, you should see the expected behavior; there is an overload that lets you set bufferSize.
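As I recall the RxJava 2 API, the relevant overloads are `flatMap(mapper, maxConcurrency)` and a four-argument form that also takes `bufferSize`; a hedged sketch of how the question's pipeline might be changed (parameter positions worth double-checking against the javadoc):

```java
// Sketch: limit flatMap to one in-flight inner stream at a time.
// flatMap(mapper, maxConcurrency) is the two-argument overload; the
// four-argument form flatMap(mapper, delayErrors, maxConcurrency, bufferSize)
// additionally bounds the per-source prefetch buffer.
.flatMap(i -> Flowable.defer(() -> {
    log.info("Starting consuming {}", i);
    Thread.sleep(100);
    log.info("Finished consuming {}", i);
    return Flowable.just(i);
}).subscribeOn(Schedulers.computation()), 1)   // maxConcurrency = 1
```

With maxConcurrency = 1, the merge step subscribes to at most one inner Flowable at a time, so the next item is only requested from upstream once the current one terminates.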
Also, flatMap has a maxConcurrent parameter that is easier to reason about once you realize that flatMap is really a map followed by a merge applied to the streams the map produces. merge subscribes to only a limited number of sources at a time, and that limit is maxConcurrent. The defaults for both bufferSize and maxConcurrent are 128.
Keep in mind that when the merge step receives a request from downstream, it cannot know how many streams it needs to subscribe to in order to satisfy that request (remember, we are dealing with streams here)! The first 10 streams might not emit any values at all. If the first stream emits nothing and does not complete for an hour, and we have maxConcurrent=1, then for that first hour we will receive no events whatsoever, even though streams 2 and 3 are ready to hand us data. For these reasons, general-purpose defaults had to be chosen for bufferSize and maxConcurrent: values that tend to optimize performance in common benchmark scenarios and minimize problems in many edge cases.
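The gating described above can be illustrated without RxJava at all. The following stand-alone sketch (my own simplification, not RxJava's actual implementation) models maxConcurrent as a semaphore: the "merge" loop acquires a permit before starting an inner task and the task releases it when it terminates, so no more than maxConcurrent inner sources are ever active at once:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MaxConcurrentDemo {

    // Runs `items` simulated inner sources on a thread pool, but never lets
    // more than `maxConcurrent` of them be "subscribed" at once. Returns the
    // highest number of inner sources that were ever active simultaneously.
    static int runDemo(int maxConcurrent, int items) {
        Semaphore permits = new Semaphore(maxConcurrent);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < items; i++) {
                permits.acquire();               // upstream request gated here
                pool.execute(() -> {
                    int now = active.incrementAndGet();
                    maxActive.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(50);        // simulated per-item work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        active.decrementAndGet();
                        permits.release();       // lets the next source start
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxActive.get();
    }

    public static void main(String[] args) {
        System.out.println("max simultaneously active: " + runDemo(2, 8));
    }
}
```

This is exactly why a larger default exists: with a permit count of 1, a slow (or silent) first source blocks everything behind it, while a larger count trades that risk for prefetching and buffering.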
https://stackoverflow.com/questions/62566721