我一直在使用Kefir (或Bacon.js,选择您的最爱)进行一个涉及实时数据的个人项目,并且已经到了需要在数据库中记录数据以附加id的地步,然后用id将对象传递到链下。实际上,将数据插入数据库(NeDB)并不是问题,而是它在将记录插入数据库时使用回调和继续执行的问题,以及如何处理此行为。
过度简化的示例:
假设有几个设备将解析的数据转储到总线/池中:
function Position(data) {
this.id = null;
this.longitude = data.longitude;
this.latitude = data.latitude;
}
self.positionDataPool.map(function(position)) { // is this even what really needs to be done?
// unsure what to do here {
self.db.insert {
longitude: position.longitude
, latitude: position.latitude
}, function(e, newRecord) {
if(e) { ... }
, else {
position.id = newRecord._id;
return position;
}
}
//}
})
.filter(function(position) {
// the position without an id is passed here
...
});我怀疑这是对map函数的不正确或不恰当的使用,但是在尝试了几件事情之后,我发现了一些想法。任何想法、建议或帮助都将不胜感激。
我的解决方案
在做了大量的阅读和实验(在我已经做过的工作之上)并回到每天处理流处理的日子之后,我想出了以下解决方案。虽然这可能不是最有效的,但此解决方案通过将多个事件源插入数据池(未显示)从多个源获取输入。创建一个全新的流来对对象/数据执行单个操作。虽然可扩展性不是这里的目标,但这允许多个源监视从流中传出的数据,而不是直接将其转储到过滤器中。最后,对来自处理流的数据进行过滤,只显示我们想要的结果。
self.savedPositionDataStream = Kefir.stream(function(emitter) {
self.positionDataPool.onValue(function(val) {
self.db.insert {
longitude: position.longitude
, latitude: position.latitude
}, function(e, newRecord) {
val.id = newRecord._id;
emitter.emit(val);
}
});
});
self.filteredPositionData = savedPositionDataStream.filter(...);发布于 2015-10-28 07:17:40
至少在Bacon.js中,您可以使用Bacon.fromNodeCallback将插入调用结果封装到流中。喜欢
Bacon.fromNodeCallback(self.db, "insert", dataToBeInserted)当然,您可以使用Bacon.fromBinder或类似的Kefir.stream来实现这一点,但是fromNodeCallback助手使这更容易,因为它会自动处理成功/错误值,并将它们适当地转换为流事件。
然后是flatMap而不是map,执行插入操作,并以流的形式提供结果:
let insertionResultE = self.positionDataPool.flatMap(val =>
Bacon.fromNodeCallback(self.db, "insert", val).map("._id")
)
insertionResultE.log("insertion result")同样的方法也适用于Kefir。关键是您不能在map中执行异步计算,也可能无法执行失败的计算,但是可以在flapMap中完成。
这里只需要为insertionResultE添加至少一个订阅服务器才能激活它。在上面的例子中,log会这样做。
https://stackoverflow.com/questions/33358007
复制相似问题