我有这样的代码,它只是从.csv文件中读取数据,并将其转换为json并记录数据:
const fs = require('fs');
const path = require('path');
const sd = path.resolve(__dirname + '/fixtures/SampleData.csv');
const strm = fs.createReadStream(sd).setEncoding('utf8');
const Rx = require('rxjs/Rx');
const csv2json = require('csv2json');
const dest = strm
.pipe(csv2json({
separator: ','
}));
dest.on('error', function(e){
console.error(e.stack || e);
})
const obs = Rx.Observable.fromEvent(dest, 'data')
.flatMap(d => Rx.Observable.timer(100).mapTo(d))
obs.subscribe(v => {
console.log(String(v));
})代码所做的是在延迟100 ms之后记录所有数据。实际上,我想延迟每一行数据,并在一小部分延迟后记录每一行。
上面的代码没有做到这一点--控制数据记录速度的最佳方法是什么?
假设:所有的数据行几乎同时输入,因此所有数据都延迟了100 ms,因此它们最终几乎是在同一时间被打印出来的。我只需要开始延迟下一行后的前一个记录。
下面的代码似乎与使用上面的计时器执行相同的操作:
const obs = Rx.Observable.fromEvent(dest, 'data')
.delay(100)发布于 2017-02-02 22:08:21
假设:所有的数据行几乎同时输入,因此所有数据都延迟了100 ms,因此它们最终几乎是在同一时间被打印出来的。我只需要开始延迟下一行后的前一个记录。
你的假设是正确的
解决方案
用.flatMap()交换原始解决方案中的.concatMap()
Rx.Observable.from([1,2,3,4])
.mergeMap(i => Rx.Observable.timer(500).mapTo(i))
.subscribe(val => console.log('mergeMap value: ' + val));
Rx.Observable.from([1,2,3,4])
.concatMap(i => Rx.Observable.timer(500).mapTo(i))
.subscribe(val => console.log('concatMap value: ' + val));<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
这将确保每个发射在下一个发射被订阅之前完成,并开始延迟其值。
发布于 2017-02-02 08:25:18
我无法在RxJS库中找到我需要的功能(尽管它可能在那里,但我就是找不到它,请告诉我是否有更好、更实用的方法)。
所以我写了这个,这似乎就是我的工作:
const fs = require('fs');
const path = require('path');
const sd = path.resolve(__dirname + '/fixtures/SampleData.csv');
const strm = fs.createReadStream(sd).setEncoding('utf8');
const Rx = require('rxjs/Rx');
const csv2json = require('csv2json');
const p = Rx.Observable.prototype;
p.eachWait = function(timeout){
const source = this;
const values = [];
let flipped = true;
const onNext = function (sub){
flipped = false;
setTimeout(() => {
var c = values.pop();
if(c) sub.next(c);
if(values.length > 0){
onNext(sub);
}
else{
flipped = true;
}
}, timeout);
}
return Rx.Observable.create(sub => {
return source.subscribe(
function next(v){
values.unshift(v);
if(flipped){
onNext(sub);
}
},
sub.error.bind(sub),
sub.complete.bind(sub)
);
});
}
const dest = strm
.pipe(csv2json({
separator: ','
}));
dest.on('error', function(e){
console.error(e.stack || e);
});
const obs = Rx.Observable.fromEvent(dest, 'data')
.eachWait(1000)
obs.subscribe(v => {
console.log(String(v));
});我想这是尽你所能做到的,只有一个定时器在任何时刻都应该运行。
https://stackoverflow.com/questions/41995884
复制相似问题