首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RxJava2并行下载

RxJava2并行下载
EN

Stack Overflow用户
提问于 2018-07-01 08:11:48
回答 2查看 265关注 0票数 0

嗨,我正在尝试使用rxjava2并行下载文件,但是下面的代码是不工作的:

代码语言:javascript
复制
private Flowable<BinaryDownlodable> downloadFile(List<BinaryDownlodable> binaryDownlodables) throws IOException {
    return Flowable.fromIterable(binaryDownlodables)
        .flatMap((BinaryDownlodable downlodable) -> {
          return Flowable.fromCallable(new Callable<BinaryDownlodable>() {

            @Override
            public BinaryDownlodable call() throws Exception {
              System.out.println("Starting: " + downlodable.remote());
              final Request request = new Request.Builder()
                  .cacheControl(CacheControl.FORCE_NETWORK)
                  .url(downlodable.remote())
                  .build();
              final Response response = okHttpClient.newCall(request).execute();
              final InputStream inputStream = response.body().byteStream();
              final File newFile = new File(downlodable.local());
              final byte[] buff = new byte[4096];
              long downloaded = 0;
              final long target = response.body().contentLength();
              final String totalSize = FileUtils.readableFileSize(target);
              try (OutputStream outStream = new FileOutputStream(newFile)) {
                while (true) {
                  int read = inputStream.read(buff);
                  if (read == -1) {
                    break;
                  }
                  outStream.write(buff, 0, read);
                  //write buff
                  downloaded += read;
                }
                updateDbItem(downlodable, downloaded, target);
              }
              return downlodable;
            }
          });
        }, 3);
  }

BinaryDownloadable.java

代码语言:javascript
复制
public final class BinaryDownlodable implements Downloable {



  private String urlLocal;
  private String urlRemote;
  private boolean completed;
  private Object item;

  public BinaryDownlodable(String urlLocal, String urlRemote) {
    this.urlLocal = urlLocal;
    this.urlRemote = urlRemote;
  }



  public Object getItem() {
    return item;
  }

  public void setItem(Object item) {
    this.item = item;
  }

  @Override
  public String local() {
    return urlLocal;
  }

  @Override
  public String remote() {
    return urlRemote;
  }

  @Override
  public void completed(boolean completed) {
    this.completed = completed;
  }

  public String getUrlLocal() {
    return urlLocal;
  }

  public String getUrlRemote() {
    return urlRemote;
  }

  public boolean isCompleted() {
    return completed;
  }



  @Override
  public String toString() {
    return "BinaryDownlodable{" +
        "urlLocal='" + urlLocal + '\'' +
        ", urlRemote='" + urlRemote + '\'' +
        '}';
  }
}

我就是这样打电话的:

代码语言:javascript
复制
downloadFile(binaryDownlodables)
                  .subscribeOn(Schedulers.io())
                  .observeOn(AndroidSchedulers.mainThread())
                  .subscribe(new Consumer<BinaryDownlodable>() {
                    @Override
                    public void accept(BinaryDownlodable binaryDownlodable) throws Exception {
                      System.out.println("Accepted");
                    }
                  });
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-07-01 08:22:34

很抱歉,在这里不耐烦是怎么解决的,我忘了添加.subscribeOn(Schedulers.io())

代码语言:javascript
复制
private Flowable<BinaryDownlodable> downloadFile(List<BinaryDownlodable> binaryDownlodables) throws IOException {
    return Flowable.fromIterable(binaryDownlodables)
        .flatMap((BinaryDownlodable downlodable) -> {
          return Flowable.fromCallable(new Callable<BinaryDownlodable>() {

            @Override
            public BinaryDownlodable call() throws Exception {
              System.out.println("Starting: " + downlodable.remote());
              final Request request = new Request.Builder()
                  .cacheControl(CacheControl.FORCE_NETWORK)
                  .url(downlodable.remote())
                  .build();
              final Response response = okHttpClient.newCall(request).execute();
              final InputStream inputStream = response.body().byteStream();
              final File newFile = new File(downlodable.local());
              final byte[] buff = new byte[4096];
              long downloaded = 0;
              final long target = response.body().contentLength();
              final String totalSize = FileUtils.readableFileSize(target);
              try (OutputStream outStream = new FileOutputStream(newFile)) {
                while (true) {
                  int read = inputStream.read(buff);
                  if (read == -1) {
                    break;
                  }
                  outStream.write(buff, 0, read);
                  //write buff
                  downloaded += read;
                }
                updateDbItem(downlodable, downloaded, target);
              }
              return downlodable;
            }
          })
              .retryWhen(new RetryWithDelay(5, 3000))
              .subscribeOn(Schedulers.io());
        }, 3);
  }
票数 0
EN

Stack Overflow用户

发布于 2018-07-01 08:33:08

从我的评论中发帖,因为这对你有帮助,

试着给Flowable.fromCallable(...).subscribeOn(Schedulers.newThread())。我认为Flowables中的所有flatMap都是在后台IO线程中依次执行的。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51121532

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档