我正在使用Rx.Observable.fromEvent在NodeJS中捕获应用程序中的事件。使用请求(https://www.npmjs.com/package/request)将这些信息发送到另一台服务器。为了避免高网络负载,我需要在发送请求之间的给定超时时间缓冲这些事件。
问题
使用bufferWithTime(200)将使节点进程保持运行,我不知道应用程序何时完成关闭流。
是否有任何方法可以使用Rx缓冲区说:
如果没有推送任何元素,那么就不会启动定时器,这将使进程退出。
我最初的做法是:
Rx.Observable
.fromEvent(eventEmitter, 'log')
.bufferWithTime(200) // this is the issue
.map(addEventsToRequestOption)
.map(request)
.flatMap(Promise.resolve)
.subscribe(log('Response received'))发布于 2015-10-29 15:56:19
一个建议的实现,使用delay操作符:
function emits(who){
return function (x) { console.log([who, "emits"].join(" ") + " " + x + " click(s)");};
}
var source = Rx.Observable.fromEvent(document.body, 'click');
console.log("running");
var delayedSource$ = source.delay(1200);
var buffered$ = source
.buffer(function () { return delayedSource$;}).map(function(clickBuffer){return clickBuffer.length;})
buffered$.subscribe(emits("buffer"));jsbin这里:http://jsbin.com/wilurivehu/edit?html,js,console,output
发布于 2015-10-29 15:37:45
您可能需要拆分流并使用第二部分触发第一部分。
var source = Rx.Observable.fromEvent(eventEmitter, 'log');
var closer = source.flatMapFirst(Rx.Observable.timer(2000));
source
.buffer(closer)
.map(addEventsToRequestOption)
.flatMap(function(x) { Promise.resolve(request(x)); })
//I assume this log method returns a function?
.subscribe(log('Response received'));source.flatMapFirst(Rx.Observable.timer(2000))是这里的重要线路。它创建了一个可以观察到的定时器,该定时器将在2000 ms之后触发。当第一个事件出现时,它将启动计时器。只要计时器运行,flatMapFirst将忽略后续事件。当计时器最终发出时,它将触发缓冲区发出其当前缓冲区并重新启动。
参见具有可观察边界的文档 on buffer
https://stackoverflow.com/questions/33402737
复制相似问题