当使用Rx.Subject创建Subject.create(observer, observable)时,Subject太懒了。当我尝试在没有订阅的情况下使用subject.onNext时,它不会传递消息。如果我先使用subject.subscribe(),那么我可以在后面立即使用onNext。
假设我有一个Observer,创建如下所示:
function createObserver(socket) {
return Observer.create(msg => {
socket.send(msg);
}, err => {
console.error(err);
}, () => {
socket.removeAllListeners();
socket.close();
});
}然后,我创建了一个可观察到的接收消息的方法:
function createObservable(socket) {
return Observable.fromEvent(socket, 'message')
.map(msg => {
// Trim out unnecessary data for subscribers
delete msg.blobs;
// Deep freeze the message
Object.freeze(msg);
return msg;
})
.publish()
.refCount();
}主题是使用这两个函数创建的。
observer = createObserver(socket);
observable = createObservable(socket);
subject = Subject.create(observer, observable);使用此设置,我无法立即subject.onNext (即使我不关心订阅)。这是故意的吗?什么是好的解决办法?
这些实际上是TCP套接字,这就是为什么我没有依赖超级光滑的websocket主题。
发布于 2016-01-18 19:12:26
基本的解决方案是,在使用ReplaySubject订阅之前缓存连接:
我认为你想做的就是用一个ReplaySubject作为你的观察者。
const { Observable, Subject, ReplaySubject } = Rx;
const replay = new ReplaySubject();
const observable = Observable.create(observer => {
replay.subscribe(observer);
});
const mySubject = Subject.create(replay, observable);
mySubject.onNext(1);
mySubject.onNext(2);
mySubject.onNext(3);
mySubject.subscribe(x => console.log(x));
mySubject.onNext(4);
mySubject.onNext(5);在以下方面的成果:
1
2
3
4
5套接字实现(例如,不要使用)
..。但是如果您想做一个套接字实现,它会变得更加复杂。这里是一个可以工作的套接字实现,,但我不建议您使用它。相反,我建议您在雷克斯多姆中使用社区支持的实现之一(如果您是RxJS 4或更低版本的话),或者作为RxJS 5的一部分使用,这两种实现我都参与了。
function createSocketSubject(url) {
let replay = new ReplaySubject();
let socket;
const observable = Observable.create(observer => {
socket = new WebSocket(url);
socket.onmessage = (e) => {
observer.onNext(e);
};
socket.onerror = (e) => {
observer.onError(e);
};
socket.onclose = (e) => {
if (e.wasClean) {
observer.onCompleted();
} else {
observer.onError(e);
}
}
let sub;
socket.onopen = () => {
sub = replay.subscribe(x => socket.send(x));
};
return () => {
socket && socket.readyState === 1 && socket.close();
sub && sub.dispose();
}
});
return Subject.create(replay, observable);
}
const socket = createSocketSubject('ws://echo.websocket.org');
socket.onNext('one');
socket.onNext('two');
socket.subscribe(x => console.log('response: ' + x.data));
socket.onNext('three');
socket.onNext('four');这是必修的JsBin
https://stackoverflow.com/questions/34845778
复制相似问题