我正在学习使用Vert.x的Rx-Java2 2,我想用一些并行任务链接一个成功的配置检索。
我创建了一个搜索配置并返回单个订阅的方法,它运行良好。但我怀疑接下来的任务在哪里和如何进行:
public void start(Future<Void> startFuture) throws Exception {
Single<JsonObject> configSingle = prepareConfigurationAsync();
configSingle.subscribe(onSuccess -> {
System.out.println(onSuccess);
--> Single<Boolean> task1 = prepareLongAsyncTask1(onSuccess).subscribe(...);
--> Completable task2 = prepareLongAsyncTask2(onSuccess)..subscribe(...);
}, onError -> {
startFuture.fail(onError);
}));我所做的似乎是在工作,但没有并行性。我怎样才能做到这一点?
我应该如何和在哪里处理这些订阅?
发布于 2018-04-03 13:31:21
继续使用其他源通常是通过flatMap完成的。并行工作通常是用zip或merge完成的。在您的例子中,我认为您不需要内部Single的值作为输出的一部分,所以您可以尝试如下:
Completable config = prepareConfigurationAsync()
.flatMapCompletable(success ->
System.out.println(success);
return Completable.mergeArray (
prepareLongAsyncTask1(success)
.doOnSuccess(innerSuccess -> /* ... */)
.toCompletable(),
prepareLongAsyncTask2(success)
.doOnComplete(() -> /* ... */)
)
);
config
.subscribe( () -> /* completed */, error -> /* error'd */);https://stackoverflow.com/questions/49630327
复制相似问题