在我当前的项目中,我正在处理大量的数值数据和转换,这些数据和转换必须以数据流可编程的方式进行。
我偶然发现了传感器的概念,它可以解决处理大数组上的多个转换的困难。好像换能器不适合我想要解决的问题。
我正在寻找一个模式/概念的传感器,它只收集一个需要的数量回溯,然后处理一个结果。类似于tensorflow,reaktor,max-msp的浏览器版本(输入输出、流程图、基于节点的可视化编程)
这些模块中的大多数都应该连接到源,但也应该能够充当将这些模块链接到其他模块的源。
source ( a stream ) =[new-value]|=> module1 => module2 => ...
|=> module3 => module4 // branch off here to a new chain据我所知,在大多数博客中解释过的换能器采用了整个数组,并通过选择的变压器为每个个体的值提供信息。
然而,我的模块/变压器不需要这么多数据才能工作,一个简单的移动平均线的例子,回顾了4个步骤。
我设想这个模块可以收集足够的数据,直到它开始输出为止。我也不需要将整个数组保存在内存中,我应该只处理所需的确切数量。结果/输出将可选地存储在数据库中。
stream =[sends-1-value]=> module[collects-values-until-processing-starts] =[sends-one-value]=>...还应该可以将多个源连接到一个模块(传感器似乎没有提供这个模块)。
这里的换能器模式是否仍然适用,或者是其他的东西?
老实说,每个程序员都有一个想法来完成这个工作,但是我要求一些既定的方法来做,就像换能器的出现一样。
发布于 2020-05-21 22:10:54
换能器模式当然适用于这里。您可以创建一个浮点处理器,其换能器与正确的数据结构匹配。我将给出一个基线示例,其中有一个假设:
考虑一个简单的队列
function SimpleQueue({ size }) {
this.size = size
this.buffer = []
}
SimpleQueue.prototype.push = function(item) {
this.buffer.push(item)
if (this.buffer.length > this.size) {
this.buffer.shift()
}
return this
}
SimpleQueue.prototype[Symbol.iterator] = function*() {
for (const item of this.buffer) {
yield item
}
}我们的简单队列有一个方法push,它将一个项推入其内部缓冲区(一个数组)。简单队列也是可迭代的,因此可以执行for (const x of simpleQueue) {/* stuff */}。
现在我们将在浮点处理器中使用我们的SimpleQueue。
const average = iterable => {
let sum = 0, count = 0
for (const item of iterable) {
sum += item
count += 1
}
return sum / count
}
const floatingPointAverage = ({ historySize }) => {
const queue = new SimpleQueue({ size: historySize })
return item => {
queue.push(item)
const avg = average(queue)
console.log(queue, avg) // this shows the average as the process runs
return avg
}
}floatingPointAverage接受一项,将其推入SimpleQueue中,并返回队列中当前项的平均值。
最后,我们可以实现和消耗我们的传感器。
const { pipe, map, transform } = require('rubico')
const numbersStream = {
[Symbol.asyncIterator]: async function*() {
for (let i = 0; i < 1000; i++) yield i
},
}
transform(
pipe([
map(floatingPointAverage({ historySize: 4 })),
/* transducers that do stuff with floating point average here */
]),
null,
)(numbersStream)这种情况下的换能器是map(floatingPointAverage({ historySize: 4 }))。这个转换器是由rubico提供的,这是我为解决自己的异步问题而编写的一个库。我在rubico 这里的上下文中写了有关传感器的文章
您的输出应该如下所示
SimpleQueue { size: 4, buffer: [ 0 ] } 0
SimpleQueue { size: 4, buffer: [ 0, 1 ] } 0.5
SimpleQueue { size: 4, buffer: [ 0, 1, 2 ] } 1
SimpleQueue { size: 4, buffer: [ 0, 1, 2, 3 ] } 1.5
SimpleQueue { size: 4, buffer: [ 1, 2, 3, 4 ] } 2.5
SimpleQueue { size: 4, buffer: [ 2, 3, 4, 5 ] } 3.5
SimpleQueue { size: 4, buffer: [ 3, 4, 5, 6 ] } 4.5
SimpleQueue { size: 4, buffer: [ 4, 5, 6, 7 ] } 5.5
SimpleQueue { size: 4, buffer: [ 5, 6, 7, 8 ] } 6.5
SimpleQueue { size: 4, buffer: [ 6, 7, 8, 9 ] } 7.5https://stackoverflow.com/questions/60960080
复制相似问题