首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >合并可观测序列,其中一些序列是异步的,并保持顺序

合并可观测序列,其中一些序列是异步的,并保持顺序
EN

Stack Overflow用户
提问于 2015-05-19 14:58:59
回答 1查看 3K关注 0票数 3

例如,我有两个可观测序列,"data1“和"data2”,我希望将它们合并成一个可观测序列,同时保持初始顺序。

当两个可观测序列没有异步工作时--这里用一个小延迟建模--这在Rx.Observable.merge中是很容易做到的。但是,使用这种方法,任何异步工作都会破坏顺序。

是否有可能合并可观测序列,传递值,如果他们的已知值和等待值还不知道,使用内置运算符?如果没有,我应该建造哪种类型的操作员?

代码语言:javascript
复制
'use strict';
var Rx = require('rx');
var EventEmitter = require('events').EventEmitter;

var eventEmitter = new EventEmitter();

var end = Rx.Observable.fromEvent(eventEmitter, 'end');

var data1 = Rx.Observable.fromEvent(eventEmitter, 'data1')
  .takeUntil(end);
var data2 = Rx.Observable.fromEvent(eventEmitter, 'data2')
  .takeUntil(end)
  .delay(1000);

Rx.Observable
  .merge(data1, data2)
  .reduce(function(acc, str) {
    return acc + str + ';';
  }, '')
  .subscribe(function(data) {
    console.log(data);
  });

eventEmitter.emit('data1', '1');
eventEmitter.emit('data2', '2');
eventEmitter.emit('data1', '3');
eventEmitter.emit('data1', '4');
eventEmitter.emit('data2', '5');
eventEmitter.emit('end');

// expected: "1;2;3;4;5;"
// actual: "1;3;4;2;5;"

谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-05-19 15:40:11

一种选择:在完成工作之前合并流,以便捕获订单,然后执行工作(使用concat维护订单)

代码语言:javascript
复制
// synchronous work...but return it as an Observable so that
// concatMap() can use it correctly
var data1Work = function (data) { return Rx.Observable.of(data); };
// asynchronous work...returns an observable with the result
var data2Work = function (data) {
    // cold observable (wont start until concatMap hits it)
    var work = Rx.Observable.of(data).delay(1000);
    // return work;

    // return hot observable (starts immediately)
    hotwork = work.replay();
    hotwork.connect(); // start it now
    return hotwork;


    // alternatively you can start your async operation and return
    // a Promise that will resolve when it is complete if that
    // pattern feels better to you since Promises are always hot
    // and Rx knows how to consume promises
    // return someFuncThatReturnsPromise(data);
};

var data1 = Rx.Observable.fromEvent(eventEmitter, "data1")
    .map(function (d) { return { type: "data1", data: d }; });
var data2 = Rx.Observable.fromEvent(eventEmitter, "data2")
    .map(function (d) { return { type: "data1", data: d }; });

var results = Rx.Observable
    .merge(data1, data2)
    .concatMap(function (d) {
        var workFunc = d.type === "data1" ? data1Work : data2Work;
        return workFunc(d.data);
    });
票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/30329228

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档