Hi, I'm trying to download files in parallel using RxJava 2, but the code below does not work:
private Flowable<BinaryDownlodable> downloadFile(List<BinaryDownlodable> binaryDownlodables) throws IOException {
    return Flowable.fromIterable(binaryDownlodables)
            .flatMap((BinaryDownlodable downlodable) -> {
                return Flowable.fromCallable(new Callable<BinaryDownlodable>() {
                    @Override
                    public BinaryDownlodable call() throws Exception {
                        System.out.println("Starting: " + downlodable.remote());
                        final Request request = new Request.Builder()
                                .cacheControl(CacheControl.FORCE_NETWORK)
                                .url(downlodable.remote())
                                .build();
                        final Response response = okHttpClient.newCall(request).execute();
                        final InputStream inputStream = response.body().byteStream();
                        final File newFile = new File(downlodable.local());
                        final byte[] buff = new byte[4096];
                        long downloaded = 0;
                        final long target = response.body().contentLength();
                        final String totalSize = FileUtils.readableFileSize(target);
                        try (OutputStream outStream = new FileOutputStream(newFile)) {
                            while (true) {
                                int read = inputStream.read(buff);
                                if (read == -1) {
                                    break;
                                }
                                outStream.write(buff, 0, read);
                                //write buff
                                downloaded += read;
                            }
                            updateDbItem(downlodable, downloaded, target);
                        }
                        return downlodable;
                    }
                });
            }, 3);
}

BinaryDownloadable.java
public final class BinaryDownlodable implements Downloable {
    private String urlLocal;
    private String urlRemote;
    private boolean completed;
    private Object item;

    public BinaryDownlodable(String urlLocal, String urlRemote) {
        this.urlLocal = urlLocal;
        this.urlRemote = urlRemote;
    }

    public Object getItem() {
        return item;
    }

    public void setItem(Object item) {
        this.item = item;
    }

    @Override
    public String local() {
        return urlLocal;
    }

    @Override
    public String remote() {
        return urlRemote;
    }

    @Override
    public void completed(boolean completed) {
        this.completed = completed;
    }

    public String getUrlLocal() {
        return urlLocal;
    }

    public String getUrlRemote() {
        return urlRemote;
    }

    public boolean isCompleted() {
        return completed;
    }

    @Override
    public String toString() {
        return "BinaryDownlodable{" +
                "urlLocal='" + urlLocal + '\'' +
                ", urlRemote='" + urlRemote + '\'' +
                '}';
    }
}

This is how I call it:
downloadFile(binaryDownlodables)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<BinaryDownlodable>() {
            @Override
            public void accept(BinaryDownlodable binaryDownlodable) throws Exception {
                System.out.println("Accepted");
            }
        });

Posted on 2018-07-01 08:22:34
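Sorry for being impatient. Here is how it was resolved: I had forgotten to add .subscribeOn(Schedulers.io()) to the inner Flowable returned from flatMap: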
private Flowable<BinaryDownlodable> downloadFile(List<BinaryDownlodable> binaryDownlodables) throws IOException {
    return Flowable.fromIterable(binaryDownlodables)
            .flatMap((BinaryDownlodable downlodable) -> {
                return Flowable.fromCallable(new Callable<BinaryDownlodable>() {
                    @Override
                    public BinaryDownlodable call() throws Exception {
                        System.out.println("Starting: " + downlodable.remote());
                        final Request request = new Request.Builder()
                                .cacheControl(CacheControl.FORCE_NETWORK)
                                .url(downlodable.remote())
                                .build();
                        final Response response = okHttpClient.newCall(request).execute();
                        final InputStream inputStream = response.body().byteStream();
                        final File newFile = new File(downlodable.local());
                        final byte[] buff = new byte[4096];
                        long downloaded = 0;
                        final long target = response.body().contentLength();
                        final String totalSize = FileUtils.readableFileSize(target);
                        try (OutputStream outStream = new FileOutputStream(newFile)) {
                            while (true) {
                                int read = inputStream.read(buff);
                                if (read == -1) {
                                    break;
                                }
                                outStream.write(buff, 0, read);
                                //write buff
                                downloaded += read;
                            }
                            updateDbItem(downlodable, downloaded, target);
                        }
                        return downlodable;
                    }
                })
                .retryWhen(new RetryWithDelay(5, 3000))
                .subscribeOn(Schedulers.io());
            }, 3);
}

Posted on 2018-07-01 08:33:08
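Posting this from my comment as an answer, since it helped you.
Try adding .subscribeOn(Schedulers.newThread()) to the inner Flowable.fromCallable(...). I think that, without it, all the inner Flowables created in flatMap are executed sequentially on the single background IO thread.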
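As a rough mental model only, here is a plain-Java analogy that does not use RxJava at all. The class name ConcurrencyAnalogy, the method maxOverlap, and the 50 ms sleep standing in for a download are all made up for illustration. With one worker thread the blocking tasks can only run one after another, which resembles a single subscribeOn on the outer chain. With three workers, up to three tasks overlap, which resembles giving each inner Flowable its own subscribeOn together with a flatMap limit of 3.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrencyAnalogy {

    /**
     * Runs taskCount fake downloads on a pool of `workers` threads and
     * returns the highest number of tasks observed running at the same time.
     */
    static int maxOverlap(int taskCount, int workers) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<Integer>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                Callable<Integer> fakeDownload = () -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // record the peak overlap
                    Thread.sleep(50); // hypothetical stand-in for blocking network I/O
                    running.decrementAndGet();
                    return id;
                };
                futures.add(pool.submit(fakeDownload));
            }
            for (Future<Integer> f : futures) {
                f.get(); // wait for completion, rethrowing any task failure
            }
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        // One worker: tasks never overlap, so the peak is 1.
        System.out.println("one worker, peak overlap: " + maxOverlap(6, 1));
        // Three workers: the peak can reach, but never exceed, 3.
        System.out.println("three workers, peak overlap: " + maxOverlap(6, 3));
    }
}
```

The analogy only covers where the blocking work runs. It says nothing about RxJava's retry, backpressure, or observeOn behaviour.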
https://stackoverflow.com/questions/51121532