我对反应性编程很陌生,很难使用“一切都可以是流”的咒语。我正在考虑以下场景--我有一个websocket事件流定义如下:
Rx.Observable.create((observer) => {
io.on('connect', function(socket){
socket.on("enroll", function(player) {
observer.next({
event: 'enroll',
player,
socket
});
});
socket.on('resign', function(player){
observer.next({
event: 'resign',
player,
socket
});
});
});
return {
dispose: io.close
};
});然后我就能做些像
enrollmentStream = events$
.filter(find({ event: "enroll" }))
.map(pick('player'));同样的
resignationStream = events$
.filter(find({ event: "resign" }))
.map(pick('player'));我想收集已注册的玩家在一个流,这将批他们在4,但很明显,这应该是只为用户,在注册流,但不是在resignationStream或至少最后一个事件是注册。我该怎么做?
这是大理石图。

有5名球员报名。游戏开始时,有4名球员登记。请注意,第二个玩家(紫罗兰一号)加入,但随后退出,所以游戏不以蓝色大理石开始,而是与下一个黄色-因为只有在那之后,真正有4个球员准备好了。
也许应该有一些像“没有”这样的流操作..。在那里吗?
发布于 2017-08-10 07:50:07
我认为在这个场景中,您可以使用combineLatest()和scan()操作符,然后自己列出未辞职的玩家:
const bufferedEnrollment = enrollmentStream.scan((acc, val) => { acc.push(val); return acc; }, []);
const bufferedResignation = enrollmentStream.scan((acc, val) => { acc.push(val); return acc; }, []);
Observable.combineLatest(bufferedEnrollment, bufferedResignation)
.map(values => {
const enrolled = values[0];
const resigned = values[1];
// remove resigned players from `enrolled` array
return enrolled;
})
.filter(players => players.length === 4)
.subscribe(...)scan()运算符仅用于将播放器收集到数组中。例如,如果您希望能够重置数组,则可以将其与另一个可观察到的数组合并。
enrollmentStream
.merge(resetStream)
.scan((acc, val) => {
if (!val) {
return [];
}
acc.push(val);
return acc;
}, []);(出于明显的原因,我没有测试这段代码)。
https://stackoverflow.com/questions/45596405
复制相似问题