首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >数据流项目的Javascript转换器

数据流项目的Javascript转换器
EN

Stack Overflow用户
提问于 2020-03-31 21:21:13
回答 1查看 96关注 0票数 1

在我当前的项目中,我正在处理大量的数值数据和转换,这些数据和转换必须以数据流可编程的方式进行。

我偶然发现了传感器的概念,它可以解决处理大数组上的多个转换的困难。好像换能器不适合我想要解决的问题。

我正在寻找一个模式/概念的传感器,它只收集一个需要的数量回溯,然后处理一个结果。类似于tensorflow,reaktor,max-msp的浏览器版本(输入输出、流程图、基于节点的可视化编程)

这些模块中的大多数都应该连接到源,但也应该能够充当将这些模块链接到其他模块的源。

代码语言:javascript
复制
source ( a stream ) =[new-value]|=> module1 => module2 => ...
                                |=> module3 => module4 // branch off here to a new chain

据我所知,在大多数博客中解释过的换能器采用了整个数组,并通过选择的变压器为每个个体的值提供信息。

然而,我的模块/变压器不需要这么多数据才能工作,一个简单的移动平均线的例子,回顾了4个步骤。

我设想这个模块可以收集足够的数据,直到它开始输出为止。我也不需要将整个数组保存在内存中,我应该只处理所需的确切数量。结果/输出将可选地存储在数据库中。

代码语言:javascript
复制
stream =[sends-1-value]=> module[collects-values-until-processing-starts] =[sends-one-value]=>...

还应该可以将多个源连接到一个模块(传感器似乎没有提供这个模块)。

这里的换能器模式是否仍然适用,或者是其他的东西?

老实说,每个程序员都有一个想法来完成这个工作,但是我要求一些既定的方法来做,就像换能器的出现一样。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-05-21 22:10:54

换能器模式当然适用于这里。您可以创建一个浮点处理器,其换能器与正确的数据结构匹配。我将给出一个基线示例,其中有一个假设:

  1. 您使用的流实现了Symbol.asyncIterator

考虑一个简单的队列

代码语言:javascript
复制
function SimpleQueue({ size }) {
  this.size = size
  this.buffer = []
}

SimpleQueue.prototype.push = function(item) {
  this.buffer.push(item)
  if (this.buffer.length > this.size) {
    this.buffer.shift()
  }
  return this
}

SimpleQueue.prototype[Symbol.iterator] = function*() {
  for (const item of this.buffer) {
    yield item
  }
}

我们的简单队列有一个方法push,它将一个项推入其内部缓冲区(一个数组)。简单队列也是可迭代的,因此可以执行for (const x of simpleQueue) {/* stuff */}

现在我们将在浮点处理器中使用我们的SimpleQueue

代码语言:javascript
复制
const average = iterable => {
  let sum = 0, count = 0
  for (const item of iterable) {
    sum += item
    count += 1
  }
  return sum / count
}

const floatingPointAverage = ({ historySize }) => {
  const queue = new SimpleQueue({ size: historySize })
  return item => {
    queue.push(item)
    const avg = average(queue)
    console.log(queue, avg) // this shows the average as the process runs
    return avg
  }
}

floatingPointAverage接受一项,将其推入SimpleQueue中,并返回队列中当前项的平均值。

最后,我们可以实现和消耗我们的传感器。

代码语言:javascript
复制
const { pipe, map, transform } = require('rubico')

const numbersStream = {
  [Symbol.asyncIterator]: async function*() {
    for (let i = 0; i < 1000; i++) yield i
  },
}

transform(
  pipe([
    map(floatingPointAverage({ historySize: 4 })),
    /* transducers that do stuff with floating point average here */
  ]),
  null,
)(numbersStream)

这种情况下的换能器map(floatingPointAverage({ historySize: 4 }))。这个转换器是由rubico提供的,这是我为解决自己的异步问题而编写的一个库。我在rubico 这里的上下文中写了有关传感器的文章

您的输出应该如下所示

代码语言:javascript
复制
SimpleQueue { size: 4, buffer: [ 0 ] } 0
SimpleQueue { size: 4, buffer: [ 0, 1 ] } 0.5
SimpleQueue { size: 4, buffer: [ 0, 1, 2 ] } 1
SimpleQueue { size: 4, buffer: [ 0, 1, 2, 3 ] } 1.5
SimpleQueue { size: 4, buffer: [ 1, 2, 3, 4 ] } 2.5
SimpleQueue { size: 4, buffer: [ 2, 3, 4, 5 ] } 3.5
SimpleQueue { size: 4, buffer: [ 3, 4, 5, 6 ] } 4.5
SimpleQueue { size: 4, buffer: [ 4, 5, 6, 7 ] } 5.5
SimpleQueue { size: 4, buffer: [ 5, 6, 7, 8 ] } 6.5
SimpleQueue { size: 4, buffer: [ 6, 7, 8, 9 ] } 7.5
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60960080

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档