首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RxJava collect() & takeUntil()

RxJava collect() & takeUntil()
EN

Stack Overflow用户
提问于 2015-07-12 10:17:26
回答 2查看 8K关注 0票数 4

我有一个未知大小的用户列表。我想要的是查询前30并更新UI。然后,我想用步骤100的偏移量来查询所有其他用户,直到我得到最后一包用户--我应该在这里使用takeUntil吗?)当我得到-我通过添加剩余的用户来更新UI (结合reduce(),我相信)。

这是我的密码:

代码语言:javascript
复制
final int INITIAL_OFFSET = 0;
final int INITIAL_LIMIT = 30;
// Loading first 30 users to immediately update UI (better UX)
getServerApi().getAllFriends(userId, "photo_50", INITIAL_OFFSET, INITIAL_LIMIT)
        // Loading remaining users 100 by 100 and updating UI after all users been loaded
        .flatMap(users -> {
            AtomicInteger newOffset = new AtomicInteger(INITIAL_LIMIT);
            return Observable.just(users)
                    .flatMap(users1 -> getServerApi().getAllFriends(userId, "photo_50", newOffset.get(), Config.DEFAULT_FRIEND_REQUEST_COUNT))
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io())
                    .collect(() -> new ArrayList<User>(), (b, s) -> {
                        b.addAll(s);
                        newOffset.set(newOffset.get() + Config.DEFAULT_FRIEND_REQUEST_COUNT);
                    })
                    .repeat()
                    .takeUntil(friends -> friends.size() == 0);
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(users -> getView().appendAllFriends(users),
                throwable -> getView().setError(processFail(throwable, ServerApi.Action.GET_ALL_FRIENDS), false));

但是似乎我做错了什么,因为每次进行更新调用时都会调用onNext。

EN

回答 2

Stack Overflow用户

发布于 2015-07-12 21:28:12

回答我自己的问题。阿黛尔的回答很好,但我需要一个单一的订阅(我使用的是核MVP文库),我想使用collect()和takeUntil()而不是while循环(后者需要阻塞修改接口方法)。

花了几个小时终于得到了它:

代码语言:javascript
复制
final int INITIAL_LIMIT = 30;
// Loading first 30 users to immediately update UI (better UX)
getServerApi().getAllFriends(userId, "photo_50", null, INITIAL_LIMIT)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        // Updating UI 1st time or show error
        .doOnNext(users -> getView().appendAllFriends(users))
        .doOnError(throwable -> getView().setError(processFail(throwable, ServerApi.Action.GET_ALL_FRIENDS), false))
        // Loading remaining users 100 by 100 and updating UI after all users been loaded
        .flatMap(users -> {
            AtomicInteger newOffset = new AtomicInteger(INITIAL_LIMIT);
            ArrayList<User> remainingUsers = new ArrayList<>();
            AtomicBoolean hasMore = new AtomicBoolean(true);
            return Observable.just(users)
                    .flatMap(users1 -> getServerApi().getAllFriends(userId, "photo_50", newOffset.get(), Config.DEFAULT_FRIEND_REQUEST_COUNT))
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .collect(() -> remainingUsers, (b, s) -> {
                        // Needed for takeUntil
                        hasMore.set(b.addAll(s));
                        newOffset.set(newOffset.get() + Config.DEFAULT_FRIEND_REQUEST_COUNT);
                    })
                    .repeat()
                    .takeUntil(friends -> !hasMore.get())
                    // Grab all items emitted by collect()
                    .last()
                    // Updating UI last time
                    .doOnNext(users2 -> getView().appendAllFriends(users2));
        })
        .subscribe();

也许这对使用的其他人是有用的。

票数 3
EN

Stack Overflow用户

发布于 2015-07-12 19:08:27

代码语言:javascript
复制
// cache() will ensure that we load the first pack only once
Observable<Users> firstPack = firstPack().cache();

// this subscription is for updating the UI on the first batch
firstPack
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(x -> draw(x), e -> whoops(e));

// this subscription is for collecting all the stuff
// do whatever tricks you need to do with your backend API to get the full list of stuff
firstPack
  .flatMap(fp -> rest(fp))
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(x -> allUsers(x), e -> whoops(e));

// I would do this in a simple while loop
Observable<List<User>> rest(List<User> firstPack) {
  return Observable.create(sub -> {
    final List<User> total = firstPack;
    try {
      while (!sub.isUnsubscribed()) {
        final List<User> friends = api.getFriendsBlocking(total.size());
        if (friends.isEmpty()) {
          sub.onNext(total);
          sub.onCompleted();
        } else {
          total.addAll(friends);
        }
      }
    } catch(IOException e) {
      sub.onError(e);
    }
  })
}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31366699

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档