首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >避免使用RxJS5可观测到的递归

避免使用RxJS5可观测到的递归
EN

Stack Overflow用户
提问于 2016-12-29 21:45:46
回答 1查看 587关注 0票数 3

好的,我想避免使用可观察的递归,使用外部和内部事件的组合,而不是回忆相同的方法/函数。

现在我有一个:

代码语言:javascript
复制
Queue.prototype.deq = function (opts) {

    opts = opts || {};

    const noOfLines = opts.noOfLines || opts.count || 1;
    const isConnect = opts.isConnect !== false;

    let $dequeue = this.init()
        .flatMap(() => {
            return acquireLock(this)
                .flatMap(obj => {
                    if(obj.error){

                    // if there is an error acquiring the lock we
                    // retry after 100 ms, which means using recursion
                    // because we call "this.deq()" again

                        return Rx.Observable.timer(100)
                            .flatMap(() => this.deq(opts));
                    }
                    else{
                        return makeGenericObservable()
                          .map(() => obj);
                    }
                })

        })
        .flatMap(obj => {
            return removeOneLine(this)
                .map(l => ({l: l, id: obj.id}))
        })
        .flatMap(obj => {
            return releaseLock(this, obj.id)
                .map(() => obj.l)
        })
        .catch(e => {
            console.error(e.stack || e);
            return releaseLock(this);
        });

    if (isConnect) {
        $dequeue = $dequeue.publish();
        $dequeue.connect();
    }

    return $dequeue;

};

与上面使用递归(希望是正确的)不同,我想使用一种更公平的方法。如果有一个从acquireLock函数传回的错误,我想每100‘s重试一次,一旦它成功了,我就不知道该如何做,而且我很难测试it....is,对吧?

代码语言:javascript
复制
Queue.prototype.deq = function (opts) {

    // ....

    let $dequeue = this.init()
        .flatMap(() => {
            return acquireLock(this)
                .flatMap(obj => {
                    if(obj.error){
                        return Rx.Observable.interval(100)
                            .takeUntil(
                                acquireLock(this)
                                .filter(obj => !obj.error)
                            )
                    }
                    else{

                        // this is just an "empty" observable
                        // which immediately fires onNext()

                        return makeGenericObservable()
                              .map(() => obj);
                    }
                })

        })

     // ...

    return $dequeue;

};

有什么办法能让这句话更简洁吗?我也只想重试5次左右。我的主要问题是--如何在间隔旁边创建一个计数,以便每100 ms重试一次,直到获得锁或计数达到5为止?

我需要这样的东西:

代码语言:javascript
复制
.takeUntil(this or that)

也许我可以简单地连锁takeUntils,如下所示:

代码语言:javascript
复制
                   return Rx.Observable.interval(100)
                    .takeUntil(
                        acquireLock(this)
                        .filter(obj => !obj.error)
                    )
                    .takeUntil(++count < 5);

我可以这么做:

代码语言:javascript
复制
                return Rx.Observable.interval(100)
                    .takeUntil(
                        acquireLock(this)
                        .filter(obj => !obj.error)
                    )
                    .takeUntil( Rx.Observable.timer(500));

但可能是在寻找一些不那么疯狂的东西

但是我不知道在哪里存储/跟踪count变量.

此外,还希望使它更简洁,并可能检查它的正确性。

我不得不说,如果这个东西有效的话,它是非常强大的编码结构。

EN

回答 1

Stack Overflow用户

发布于 2016-12-30 08:13:09

有两个操作符可以帮助您:重试retryWhen。两个都在源上重新订阅,可以观察到,因此重试失败的操作。

在这个示例中,我们可以观察到在第一次count订阅上失败的情况:

代码语言:javascript
复制
let getObs = (count) => {
  return Rx.Observable.create((subs) => {
    console.log('Subscription count = ', count);

    if(count) {
      count--;
      subs.error("ERROR");
    } else {
      subs.next("SUCCESS");
      subs.complete();
    }
  
    return () => {};
  });
};

getObs(2).subscribe(console.log, console.log);
getObs(2).retry(2).subscribe(console.log, console.log);
getObs(3).retry(2).subscribe(console.log, console.log);
代码语言:javascript
复制
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

如你所见:

  • 如果我们这样称呼它,它就会失败。
  • 有了retry,我们可以,嗯,重试几次,пуе成功响应
  • 如果可观察到的失败太多,retry将放弃并沿链传播错误。

您实际上需要的是retryWhen,因为retry试图立即再次执行操作。

代码语言:javascript
复制
let getObs = (count) => {
  return Rx.Observable.create((subs) => {
    if(count) {
      count--;
      subs.error("ERROR");
    } else {
      subs.next("SUCCESS");
      subs.complete();
    }
  
    return () => {};
  });
};

getObs(2).retryWhen(errors => errors.delay(100))
  .subscribe(console.log, console.log);
getObs(4).retryWhen(errors => errors.delay(100))
  .subscribe(console.log, console.log);
代码语言:javascript
复制
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

retryWhen中添加延迟很容易,但在多次尝试之后强迫它失败是很困难的:

代码语言:javascript
复制
let getObs = (count) => {
  return Rx.Observable.create((subs) => {
    if(count) {
      count--;
      subs.error("ERROR");
    } else {
      subs.next("SUCCESS");
      subs.complete();
    }
  
    return () => {};
  });
};

getObs(2)
  .retryWhen(errors => {
    return errors.delay(100).scan((errorCount, err) => {
      if(!errorCount) {
        throw err;
      }
      return --errorCount;
    }, 2);
  })
  .subscribe(console.log, console.log);

getObs(4)
  .retryWhen(errors => {
    return errors.delay(100).scan((errorCount, err) => {
      if(!errorCount) {
        throw err;
      }
      return --errorCount;
    }, 2);
  })
  .subscribe(console.log, console.log);
代码语言:javascript
复制
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

最后,两次重试都会引发错误,因此在获取锁时需要这样做:

代码语言:javascript
复制
    .flatMap(() => {
        return acquireLock(this)
            .switchMap(obj => {
              if(obj.error) {
                return Rx.Observable.throw(obj.error);
              } else {
                Rx.Observable.of(obj);
              }
            })
            .retryWhen(...)
    })
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41387633

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档