首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在平面地图中停止一些流

如何在平面地图中停止一些流
EN

Stack Overflow用户
提问于 2017-08-07 14:39:30
回答 1查看 533关注 0票数 0

我正在开发一个文件下载器,我正在构建我的流,大致如下

代码语言:javascript
复制
someobservable          ----------------------------  this stream can generate 1000 of                                                                                     
                                                         downloadable urls
   .flatmap(urltodownload -> {
           downloadStream(value); ---------------------  this steam can pause resume and 
                                                         maybe cancel
   }.observeOn(AndroidScheduler.mainthread())
   .subcribe();

如何在flatmap中暂停或取消某些可观察到的创建

EN

回答 1

Stack Overflow用户

发布于 2017-08-07 16:36:39

我有一个不优雅的解决方案。我使用CombineLast通过一个dispose Disposable来停止任务的Observable。请参见代码:

代码语言:javascript
复制
package xdean.stackoverflow.rx;

import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;

import java.util.List;
import java.util.Random;
import java.util.function.IntPredicate;

import xdean.jex.extra.Pair;

public class Q45540738 {
  static Random random = new Random();

  public static void main(String[] args) throws InterruptedException {
    Single<List<Pair<Integer, Disposable>>> tasks = Observable.range(0, 50)
        .map(i -> Pair.of(i, download(i).subscribe()))
        .toList();
    Subject<IntPredicate> stopers = BehaviorSubject.createDefault(i -> true);
    Observable.combineLatest(tasks.toObservable(), stopers, (l, s) -> {
      l.removeIf(p -> {
        if (s.test(p.getLeft()) == false) {
          p.getRight().dispose();
        }
        return p.getRight().isDisposed();
      });
      return l;
    }).subscribe();
    Thread.sleep(2000);
    stopers.onNext(i -> i % 2 == 0);
    Thread.sleep(10000);
  }

  static Observable<Integer> download(int id) {
    return Observable.just(random.nextInt(1000) + 500)
        .observeOn(Schedulers.computation())
        .doOnNext(t -> Thread.sleep(t))
        .doOnDispose(() -> System.err.printf("%d task stoped on %s\n", id, Thread.currentThread()))
        .doOnNext(t -> System.out.printf("%d task done on %s\n", id, Thread.currentThread()));
  }
}

和输出:

代码语言:javascript
复制
0 task done on Thread[RxComputationThreadPool-1,5,main]
5 task done on Thread[RxComputationThreadPool-6,5,main]
6 task done on Thread[RxComputationThreadPool-7,5,main]
2 task done on Thread[RxComputationThreadPool-3,5,main]
1 task done on Thread[RxComputationThreadPool-2,5,main]
3 task done on Thread[RxComputationThreadPool-4,5,main]
4 task done on Thread[RxComputationThreadPool-5,5,main]
7 task done on Thread[RxComputationThreadPool-8,5,main]
8 task done on Thread[RxComputationThreadPool-1,5,main]
9 task stoped on Thread[main,5,main]
11 task stoped on Thread[main,5,main]
13 task stoped on Thread[main,5,main]
14 task done on Thread[RxComputationThreadPool-7,5,main]
15 task stoped on Thread[main,5,main]
17 task stoped on Thread[main,5,main]
19 task stoped on Thread[main,5,main]
21 task stoped on Thread[main,5,main]
23 task stoped on Thread[main,5,main]
25 task stoped on Thread[main,5,main]
27 task stoped on Thread[main,5,main]
29 task stoped on Thread[main,5,main]
31 task stoped on Thread[main,5,main]
33 task stoped on Thread[main,5,main]
35 task stoped on Thread[main,5,main]
37 task stoped on Thread[main,5,main]
39 task stoped on Thread[main,5,main]
41 task stoped on Thread[main,5,main]
43 task stoped on Thread[main,5,main]
45 task stoped on Thread[main,5,main]
47 task stoped on Thread[main,5,main]
49 task stoped on Thread[main,5,main]
10 task done on Thread[RxComputationThreadPool-3,5,main]
16 task done on Thread[RxComputationThreadPool-1,5,main]
12 task done on Thread[RxComputationThreadPool-5,5,main]
22 task done on Thread[RxComputationThreadPool-7,5,main]
24 task done on Thread[RxComputationThreadPool-1,5,main]
18 task done on Thread[RxComputationThreadPool-3,5,main]
30 task done on Thread[RxComputationThreadPool-7,5,main]
20 task done on Thread[RxComputationThreadPool-5,5,main]
32 task done on Thread[RxComputationThreadPool-1,5,main]
26 task done on Thread[RxComputationThreadPool-3,5,main]
40 task done on Thread[RxComputationThreadPool-1,5,main]
38 task done on Thread[RxComputationThreadPool-7,5,main]
34 task done on Thread[RxComputationThreadPool-3,5,main]
28 task done on Thread[RxComputationThreadPool-5,5,main]
48 task done on Thread[RxComputationThreadPool-1,5,main]
42 task done on Thread[RxComputationThreadPool-3,5,main]
46 task done on Thread[RxComputationThreadPool-7,5,main]
36 task done on Thread[RxComputationThreadPool-5,5,main]
44 task done on Thread[RxComputationThreadPool-5,5,main]
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45540738

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档