首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >具有RxJS 5可观测值的延迟模式

具有RxJS 5可观测值的延迟模式
EN

Stack Overflow用户
提问于 2016-06-30 17:11:33
回答 2查看 3.3K关注 0票数 16

对于任意的承诺实现,延迟模式(不要与反模式混淆)可能如下所示:

代码语言:javascript
复制
const deferred = new Deferred;
...
// scopes where `deferred` object reference was passed before promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
...
deferred.resolve(...);
// doesn't affect promise state
deferred.reject();
...
// after promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });

deferred对象具有不确定的承诺,可以通过引用传递到其他函数范围。所有的承诺链都将在承诺和解时执行,无论deferred.promise是在与then链接之前还是之后被解决都不重要。承诺的状态在解决后是不能改变的。

正如答案所示,最初的选择是ReplaySubjectAsyncSubject

对于给定的设置(演示)

代码语言:javascript
复制
var subject = new Rx.AsyncSubject;
var deferred = subject.first();

deferred.subscribe(
  console.log.bind(console, 'Early result'),
  console.log.bind(console, 'Early error')
);

setTimeout(() => {
  deferred.subscribe(
    console.log.bind(console, 'Late result'),
    console.log.bind(console, 'Late error')
  );
});

这样做的结果是,人们表现出了良好的行为:

代码语言:javascript
复制
subject.error('one');
subject.next('two');

早期错误一 迟误一

这会导致不良行为:

代码语言:javascript
复制
subject.error('one');
subject.next('two');
subject.complete();

早期错误一 晚结果二

这会导致不良行为:

代码语言:javascript
复制
subject.next('two');
subject.complete();
subject.next('three');

早期结果二 晚结果三

ReplaySubject的结果不同,但仍然与预期的结果不一致。next值和error错误分别处理,complete不阻止观察者接收新数据。这可能适用于单个next/error,问题是nexterror可能被无意中多次调用。

之所以使用first(),是因为subscribe是一次性订阅,我想删除它们以避免泄漏。

应该如何用RxJS可观测值来实现它?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-06-30 22:58:47

您可能正在寻找一个Rx.ReplaySubject(1) (或一个Rx.AsyncSubject(),取决于您的用例)。

有关主题的更详细说明,请参见不同RxJS主题的语义是什么?

基本上,一个主题可以通过引用来传递,就像一个被推迟的主题。只要您持有该引用,您就可以向它发出值(解析将是'next' (Rxjs v5)或'onNext' (Rxjs v4),后面是'complete''onCompleted()')。

可以有任意数量的订阅者订阅某个主题,类似于延迟订阅的then。如果使用replaySubject(1),任何订阅者都将收到最后发出的值,该值将响应您的it doesn't matter if deferred.promise was settled before chaining with then or after.。在Rxjs v4中,replaySubject将在它完成后向订阅者发出它的最后一个值。我不确定Rxjs v5中的行为。

更新

以下代码与Rxjs v4一起执行:

代码语言:javascript
复制
var subject = new Rx.AsyncSubject();
var deferred = subject;

deferred.subscribe(
  console.log.bind(console, 'First result'),
  console.log.bind(console, 'First error')
);

setTimeout(() => {
  deferred.subscribe(
    console.log.bind(console, 'Second result'),
    console.log.bind(console, 'Second error')
  );
});

subject.onNext('one');
subject.onCompleted();
subject.onNext('two');
subject.onNext('three');
subject.onNext('four');

产生以下输出:

代码语言:javascript
复制
First result one
Second result one

但是,在Rxjs v5 不会中执行的代码相同:

代码语言:javascript
复制
First result one
Second result four

因此,这基本上意味着主题的语义在Rxjs v5中发生了变化!这确实是一个需要注意的重大变化。无论如何,您可以考虑回到Rxjs v4,或者使用artur在他的回答中建议的转变。您还可以在github站点上提交一个问题。我相信,这种改变是有意的,但在出现这种情况时,提出这一问题可能有助于澄清情况。在任何情况下,无论选择什么行为,都必须正确地记录在案。

主题语义学问题具有一个链接,显示异步主题与多次订阅和延迟订阅的关系。

票数 3
EN

Stack Overflow用户

发布于 2016-07-03 08:10:09

正如@ used 3743222所写的那样,AsyncSubject可能用于deferred实现,但问题是它必须是private,并且不受多个resolves /reject的影响。

下面是一个可能的实现镜像resolve-reject-promise结构:

代码语言:javascript
复制
const createDeferred = () => {
  const pending = new Rx.AsyncSubject(); // caches last value / error
  const end = (result) => {
    if (pending.isStopped) {
      console.warn('Deferred already resloved/rejected.'); // optionally throw
      return;
    }
    
    if (result.isValue) {
      pending.next(result.value);
      pending.complete();
    } else {
      pending.error(result.error);
    }
  }
  return {
    resolve: (value) => end({isValue: true, value: value }),
    reject: (error) => end({isValue: false, error: error }),
    observable: pending.asObservable() // hide subject
  };
}

// sync example
let def = createDeferred();
let obs = def.observable;
obs.subscribe(n => console.log('BEFORE-RESOLVE'));
def.resolve(1);
def.resolve(2); // warn - no action
def.reject('ERROR') // warn - no action
def.observable.subscribe(n => console.log('AFTER-RESOLVE'));

// async example
def = createDeferred();
def.observable.subscribe(() => console.log('ASYNC-BEFORE-RESOLVE'));
setTimeout(() => {
  def.resolve(1);
  setTimeout(() => {
    def.observable.subscribe(() => console.log('ASYNC-AFTER-RESOLVE'));
    def.resolve(2); // warn
    def.reject('err'); // warn
  }, 1000)
}, 1000);

// async error example
const def3 = createDeferred();
def3.observable.subscribe(
  (n) => console.log(n, 'ERROR-BEFORE-REJECTED (I will not be called)'),
  (err) => console.error('ERROR-BEFORE-REJECTED', err));
setTimeout(() => {
  def3.reject('ERR');
  setTimeout(() => {
    def3.observable.subscribe(
      (n) => console.log(n, 'ERROR-AFTER-REJECTED (I will not be called)'),
      (err) => console.error('ERROR-AFTER-REJECTED', err));
    def3.resolve(2); // warn
    def3.reject('err'); // warn
  }, 1000)
}, 3000);
代码语言:javascript
复制
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.9/Rx.umd.js"></script>

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38129467

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档