首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kefir/Bacon.js中的流处理

Kefir/Bacon.js中的流处理
EN

Stack Overflow用户
提问于 2015-10-27 00:32:41
回答 1查看 245关注 0票数 0

我一直在使用Kefir (或Bacon.js,选择您的最爱)进行一个涉及实时数据的个人项目,并且已经到了需要在数据库中记录数据以附加id的地步,然后用id将对象传递到链下。实际上,将数据插入数据库(NeDB)并不是问题,而是它在将记录插入数据库时使用回调和继续执行的问题,以及如何处理此行为。

过度简化的示例:

假设有几个设备将解析的数据转储到总线/池中:

代码语言:javascript
复制
function Position(data) {
    this.id = null;
    this.longitude = data.longitude;
    this.latitude = data.latitude;
}

self.positionDataPool.map(function(position)) {  // is this even what really needs to be done?
    // unsure what to do here {
        self.db.insert {
            longitude: position.longitude
            , latitude: position.latitude
        }, function(e, newRecord) {
            if(e) { ... }
            , else {
                position.id = newRecord._id;
                return position;
            }
        }
    //}
})
.filter(function(position) {
    // the position without an id is passed here
    ...
});

我怀疑这是对map函数的不正确或不恰当的使用,但是在尝试了几件事情之后,我发现了一些想法。任何想法、建议或帮助都将不胜感激。

我的解决方案

在做了大量的阅读和实验(在我已经做过的工作之上)并回到每天处理流处理的日子之后,我想出了以下解决方案。虽然这可能不是最有效的,但此解决方案通过将多个事件源插入数据池(未显示)从多个源获取输入。创建一个全新的流来对对象/数据执行单个操作。虽然可扩展性不是这里的目标,但这允许多个源监视从流中传出的数据,而不是直接将其转储到过滤器中。最后,对来自处理流的数据进行过滤,只显示我们想要的结果。

代码语言:javascript
复制
self.savedPositionDataStream = Kefir.stream(function(emitter) {
    self.positionDataPool.onValue(function(val) {
        self.db.insert {
            longitude: position.longitude
            , latitude: position.latitude
        }, function(e, newRecord) {
            val.id = newRecord._id;
            emitter.emit(val);
        }
    });
});

self.filteredPositionData = savedPositionDataStream.filter(...);
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-10-28 07:17:40

至少在Bacon.js中,您可以使用Bacon.fromNodeCallback将插入调用结果封装到流中。喜欢

代码语言:javascript
复制
Bacon.fromNodeCallback(self.db, "insert", dataToBeInserted)

当然,您可以使用Bacon.fromBinder或类似的Kefir.stream来实现这一点,但是fromNodeCallback助手使这更容易,因为它会自动处理成功/错误值,并将它们适当地转换为流事件。

然后是flatMap而不是map,执行插入操作,并以流的形式提供结果:

代码语言:javascript
复制
let insertionResultE = self.positionDataPool.flatMap(val => 
  Bacon.fromNodeCallback(self.db, "insert", val).map("._id")
)
insertionResultE.log("insertion result")

同样的方法也适用于Kefir。关键是您不能在map中执行异步计算,也可能无法执行失败的计算,但是可以在flapMap中完成。

这里只需要为insertionResultE添加至少一个订阅服务器才能激活它。在上面的例子中,log会这样做。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/33358007

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档