首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用RxJS5设定速率

使用RxJS5设定速率
EN

Stack Overflow用户
提问于 2017-02-02 06:51:37
回答 2查看 51关注 0票数 1

我有这样的代码,它只是从.csv文件中读取数据,并将其转换为json并记录数据:

代码语言:javascript
复制
const fs = require('fs');
const path = require('path');

const sd = path.resolve(__dirname + '/fixtures/SampleData.csv');
const strm = fs.createReadStream(sd).setEncoding('utf8');

const Rx = require('rxjs/Rx');
const csv2json = require('csv2json');


const dest = strm
  .pipe(csv2json({
    separator: ','
  }));

dest.on('error', function(e){
    console.error(e.stack || e);
})

const obs = Rx.Observable.fromEvent(dest, 'data')
          .flatMap(d => Rx.Observable.timer(100).mapTo(d))

obs.subscribe(v => {
    console.log(String(v));
})

代码所做的是在延迟100 ms之后记录所有数据。实际上,我想延迟每一行数据,并在一小部分延迟后记录每一行。

上面的代码没有做到这一点--控制数据记录速度的最佳方法是什么?

假设:所有的数据行几乎同时输入,因此所有数据都延迟了100 ms,因此它们最终几乎是在同一时间被打印出来的。我只需要开始延迟下一行后的前一个记录。

下面的代码似乎与使用上面的计时器执行相同的操作:

代码语言:javascript
复制
const obs = Rx.Observable.fromEvent(dest, 'data')
      .delay(100)
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-02-02 22:08:21

假设:所有的数据行几乎同时输入,因此所有数据都延迟了100 ms,因此它们最终几乎是在同一时间被打印出来的。我只需要开始延迟下一行后的前一个记录。

你的假设是正确的

解决方案

.flatMap()交换原始解决方案中的.concatMap()

代码语言:javascript
复制
Rx.Observable.from([1,2,3,4])
  .mergeMap(i => Rx.Observable.timer(500).mapTo(i))
  .subscribe(val => console.log('mergeMap value: ' + val));

Rx.Observable.from([1,2,3,4])
  .concatMap(i => Rx.Observable.timer(500).mapTo(i))
  .subscribe(val => console.log('concatMap value: ' + val));
代码语言:javascript
复制
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

这将确保每个发射在下一个发射被订阅之前完成,并开始延迟其值。

票数 2
EN

Stack Overflow用户

发布于 2017-02-02 08:25:18

我无法在RxJS库中找到我需要的功能(尽管它可能在那里,但我就是找不到它,请告诉我是否有更好、更实用的方法)。

所以我写了这个,这似乎就是我的工作:

代码语言:javascript
复制
const fs = require('fs');
const path = require('path');

const sd = path.resolve(__dirname + '/fixtures/SampleData.csv');
const strm = fs.createReadStream(sd).setEncoding('utf8');

const Rx = require('rxjs/Rx');
const csv2json = require('csv2json');

const p = Rx.Observable.prototype;

p.eachWait = function(timeout){

    const source = this;
    const values = [];
    let flipped = true;

    const onNext = function (sub){

          flipped = false;

          setTimeout(() => {

            var c = values.pop();
            if(c)  sub.next(c);

            if(values.length > 0){
               onNext(sub);
            }
            else{
               flipped = true;
            }

         }, timeout);
    }

      return Rx.Observable.create(sub => {

          return source.subscribe(

                function next(v){

                         values.unshift(v);

                         if(flipped){
                             onNext(sub);
                         }

                 },
              sub.error.bind(sub),
              sub.complete.bind(sub)
          );

      });

}


const dest = strm
  .pipe(csv2json({
    separator: ','
  }));

dest.on('error', function(e){
    console.error(e.stack || e);
});

const obs = Rx.Observable.fromEvent(dest, 'data')
      .eachWait(1000)

obs.subscribe(v => {
  console.log(String(v));
});

我想这是尽你所能做到的,只有一个定时器在任何时刻都应该运行。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41995884

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档