首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >rxjs5合并和错误处理

rxjs5合并和错误处理
EN

Stack Overflow用户
提问于 2017-08-17 14:57:37
回答 3查看 3.4K关注 0票数 6

我希望组合/合并多个可观测值,当它们完成时,执行一个finally函数。merge操作符似乎并行执行每个订阅,这正是我所需要的,但是如果其中任何一个抛出错误,执行就会停止。

RxJS version 4有一个操作符mergeDelayError,它应该让所有订阅都执行,直到它们全部完成,但是这个操作符不是在版本5中实现的。

我应该换个接线员吗?

代码语言:javascript
复制
var source1 = Rx.Observable.of(1,2,3).delay(3000);
var source2 = Rx.Observable.throw(new Error('woops'));
var source3 = Rx.Observable.of(4,5,6).delay(1000);

// Combine the 3 sources into 1 

var source = Rx.Observable
  .merge(source1, source2, source3)
  .finally(() => {

    // finally is executed before all 
    // subscriptions are completed.

    console.log('finally');

  }); 

var subscription = source.subscribe(
  x => console.log('next:', x),
  e => console.log('error:', e),
  () => console.log('completed'));

JSBin

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2017-08-17 15:01:26

我认为您可以使用catch()模拟相同的行为。您只需将其附加到每个可以观察到的源中:

代码语言:javascript
复制
const sources = [source1, source2, source3].map(obs => 
  obs.catch(() => Observable.empty())
);

Rx.Observable
  .merge(sources)
  .finally(...)
  ...
票数 2
EN

Stack Overflow用户

发布于 2018-02-28 23:13:42

我们可以通过收集错误并在最后发出错误来避免阻塞流。

代码语言:javascript
复制
function mergeDelayError(...sources) {
  const errors = [];
  const catching = sources.map(obs => obs.catch(e => {
    errors.push(e);
    return Rx.Observable.empty();
  }));
  return Rx.Observable
    .merge(...catching)
    .concat(Rx.Observable.defer(
      () => errors.length === 0 ? Rx.Observable.empty() : Rx.Observable.throw(errors)));
}


const source1 = Rx.Observable.of(1,2,3);
const source2 = Rx.Observable.throw(new Error('woops'));
const source3 = Rx.Observable.of(4,5,6);

mergeDelayError(source1, source2, source3).subscribe(
  x => console.log('next:', x),
  e => console.log('error:', e),
  () => console.log('completed'));
票数 5
EN

Stack Overflow用户

发布于 2017-11-15 17:00:01

如果你不想吞下你的错误,却想把它们推迟到最后,你可以:

代码语言:javascript
复制
const mergeDelayErrors = [];
const sources = [source1, source2, source3].map(obs => obs.catch((error) => {
  mergeDelayErrors.push(error);
  return Rx.Observable.empty();
}));

return Rx.Observable
  .merge(...sources)
  .toArray()
  .flatMap(allEmissions => {
    let spreadObs = Rx.Observable.of(...allEmissions);
    if (mergeDelayErrors.length) {
      spreadObs = spreadObs.concat(Rx.Observable.throw(mergeDelayErrors));
    }
    return spreadObs;
  })

您可能只想抛出第一个错误,或者创建一个CompositeError。我不知道当抛出多个错误时,mergeDelayErrors最初是如何表现的。

不幸的是,由于此实现必须等到所有可观察的完成后才会发出错误,所以它也要等到所有可观察的完成后才发出下一个。这很可能不是mergeDelayError最初的行为,它应该以流的形式发出,而不是在最后全部释放出来。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45738571

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档