首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RxJS/ReactiveX适当模块通信

RxJS/ReactiveX适当模块通信
EN

Stack Overflow用户
提问于 2016-11-03 17:14:16
回答 2查看 87关注 0票数 0

我对反应性编程很陌生,但我已经爱上了。然而,我的大脑仍然很难切换到它。我试图遵循所有的建议,如“避免使用主题”和“避免使用不纯的函数”,当然还有“避免命令式代码”。

我发现很难实现的是简单的跨模块通信,其中一个模块可以注册"action"/observable,而另一个模块可以订阅和响应它。一个简单的消息总线可能会工作,但这将强制使用主题和命令式代码风格,这是我试图避免的。

这里是我玩的一个简单的起点:

代码语言:javascript
复制
    // some sandbox
class Api {
  constructor() {
    this.actions = {};
  }

  registerAction(actionName, action) {
    // I guess this part will have to be changed
    this.actions[actionName] = action.publishReplay(10).refCount();
    //this.actions[actionName].connect();
  }

  getAction(actionName) {
    return this.actions[actionName];
  }
}

const api = new Api();

// -------------------------------------------------------------------
// module 1
let myAction = Rx.Observable.create((obs) => {
  console.log("EXECUTING");
  obs.next("42 " + Date.now());
  obs.complete();
});

api.registerAction("myAction", myAction);

let myTrigger = Rx.Observable.interval(1000).take(2);

let executedAction = myTrigger
.flatMap(x => api.getAction("myAction"))
.subscribe(
  (x) => { console.log(`executed action: ${x}`); },
  (e) => {}, 
  () => { console.log("completed");});

// -------------------------------------------------------------------
// module 2
api.getAction("myAction")
  .subscribe(
  (x) => { console.log(`SECOND executed action: ${x}`); },
  (e) => {}, 
  () => { console.log("SECOND completed");});

因此,目前第二个模块订阅它“触发”了可观察到的"myAction“。在现实生活中,这可能是一个ajax调用。有没有办法使所有订阅者延迟/等待"myAction“被正确地从module1调用?再一次--用科目来做这件事很容易,但我试图按照推荐的方法来做。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-11-10 07:48:14

所以这里有一个比我想的要简单得多的解决方案。简单地使用两个可观察到的。调度程序和subscribeOn也可以实现类似的效果。

代码语言:javascript
复制
    // some sandbox
class Action {
  constructor(name, observable) {
    this.name = name;
    this.observable = observable;
    this.replay = new Rx.ReplaySubject(10);
  }
}

function actionFactory(action, param) {

  return Rx.Observable.create(obs => {
    action.observable
    .subscribe(x => {
        obs.next(x);
        action.replay.next(x);
    }, (e) => {}, () => obs.complete);
  }); 
}

class Api {
  constructor() {
    this.actions = {};
  }

  registerAction(actionName, action) {
    let generatedAction = new Action(actionName, action);

    this.actions[actionName] = generatedAction;

    return actionFactory.bind(null, generatedAction);
  }

  getAction(actionName) {
    return this.actions[actionName].replay;
  }
}

const api = new Api();

// -------------------------------------------------------------------
// module 1
let myAction = Rx.Observable.create((obs) => {
  obs.next("42 " + Date.now());
  obs.complete();
});

let myRegisteredAction$ = api.registerAction("myAction", myAction);

let myTrigger = Rx.Observable.interval(1000).take(1).delay(1000);

let executedAction = myTrigger
.map(x => { return { someValue: x} })
.concatMap(x => myRegisteredAction$(x))
.subscribe(
  (x) => { console.log(`MAIN: ${x}`); },
  (e) => { console.log("error", e)}, 
  () => { console.log("MAIN: completed");});


// -------------------------------------------------------------------
// module 2
 var sub = api.getAction("myAction")
  .subscribe(
  (x) => { console.log(`SECOND: ${x}`); },
  (e) => {console.log("error : " + e)}, 
  () => { console.log("SECOND: completed");});
票数 0
EN

Stack Overflow用户

发布于 2016-11-03 18:45:14

如果我正确地理解了您,您希望确保,如果您调用了api.getAction,您希望在该可观察的值中的下一个值等待对getAction的调用完成。在处理其他值之前。

使用concatMap可以很容易地实现这一点。ConcatMap将接受一个返回可观察的函数(在您的例子中是对getAction的调用)。ConcatMap将等待开始处理下一个值,直到函数中的可观察返回完成。

因此,如果您这样修改代码,它应该会工作(如果我理解正确的话)。

代码语言:javascript
复制
let executedAction = myTrigger
.concatMap(x => api.getAction("myAction"))
.subscribe(
  (x) => { console.log(`executed action: ${x}`); },
  (e) => {}, 
  () => { console.log("completed");});

如果myTrigger有一个新值,则在可观察的api.getAction返回完成之前不会处理它。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40407540

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档