首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Rx.Subject创建Subject.create,允许无需订阅的onNext

使用Rx.Subject创建Subject.create,允许无需订阅的onNext
EN

Stack Overflow用户
提问于 2016-01-18 01:03:39
回答 1查看 7.5K关注 0票数 1

当使用Rx.Subject创建Subject.create(observer, observable)时,Subject太懒了。当我尝试在没有订阅的情况下使用subject.onNext时,它不会传递消息。如果我先使用subject.subscribe(),那么我可以在后面立即使用onNext

假设我有一个Observer,创建如下所示:

代码语言:javascript
复制
function createObserver(socket) {
  return Observer.create(msg => {
    socket.send(msg);
  }, err => {
    console.error(err);
  }, () => {
    socket.removeAllListeners();
    socket.close();
  });
}

然后,我创建了一个可观察到的接收消息的方法:

代码语言:javascript
复制
function createObservable(socket) {
  return Observable.fromEvent(socket, 'message')
                   .map(msg => {
                     // Trim out unnecessary data for subscribers
                     delete msg.blobs;
                     // Deep freeze the message
                     Object.freeze(msg);
                     return msg;
                   })
                   .publish()
                   .refCount();
}

主题是使用这两个函数创建的。

代码语言:javascript
复制
observer = createObserver(socket);
observable = createObservable(socket);
subject = Subject.create(observer, observable);

使用此设置,我无法立即subject.onNext (即使我不关心订阅)。这是故意的吗?什么是好的解决办法?

这些实际上是TCP套接字,这就是为什么我没有依赖超级光滑的websocket主题。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-01-18 19:12:26

基本的解决方案是,在使用ReplaySubject订阅之前缓存连接:

我认为你想做的就是用一个ReplaySubject作为你的观察者。

代码语言:javascript
复制
const { Observable, Subject, ReplaySubject } = Rx;

const replay = new ReplaySubject();

const observable = Observable.create(observer => {
  replay.subscribe(observer);
});

const mySubject = Subject.create(replay, observable);


mySubject.onNext(1);
mySubject.onNext(2);
mySubject.onNext(3);

mySubject.subscribe(x => console.log(x));

mySubject.onNext(4);
mySubject.onNext(5);

在以下方面的成果:

代码语言:javascript
复制
1
2
3
4
5

套接字实现(例如,不要使用)

..。但是如果您想做一个套接字实现,它会变得更加复杂。这里是一个可以工作的套接字实现,,但我不建议您使用它。相反,我建议您在雷克斯多姆中使用社区支持的实现之一(如果您是RxJS 4或更低版本的话),或者作为RxJS 5的一部分使用,这两种实现我都参与了。

代码语言:javascript
复制
function createSocketSubject(url) {
  let replay = new ReplaySubject();
  let socket;

  const observable = Observable.create(observer => {
    socket = new WebSocket(url);

    socket.onmessage = (e) => {
      observer.onNext(e);
    };

    socket.onerror = (e) => {
      observer.onError(e);
    };

    socket.onclose = (e) => {
      if (e.wasClean) {
        observer.onCompleted();
      } else {
        observer.onError(e);
      }
    }

    let sub;
    socket.onopen = () => {
      sub = replay.subscribe(x => socket.send(x));      
    };
    return () => {
      socket && socket.readyState === 1 && socket.close();
      sub && sub.dispose();
    }
  });

  return Subject.create(replay, observable);
}

const socket = createSocketSubject('ws://echo.websocket.org');

socket.onNext('one');
socket.onNext('two');
socket.subscribe(x => console.log('response: ' + x.data));
socket.onNext('three');
socket.onNext('four');

这是必修的JsBin

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/34845778

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档