我正在读取NodeJS中的流中的数据,然后使用转换流中的异步函数处理该数据。我希望这个转换流并行地启动对异步函数的几个调用,但它似乎一次只调用一个。
为了说明我的期望,我在下面编写了一个小程序,它从0生成数字到limit - 1,然后通过转换流将每个数字以小延迟递增。如果您运行下面的程序,数字1到20将被登录序列,所有与一个小的延迟。
由于默认的highWaterMark是16,所以我本以为它们会被记录在16 +4的块中。有可能得到我想要的行为吗?如果有,怎么做?
即读取流生成数据的速度非常快,转换速度较慢,但应接收到高水标,然后等待其数据被处理后,再从读取流中索取更多数据。
const stream = require('stream')
const limit = 20
let index = 0
const numberStream = new stream.Readable({
objectMode: true,
read (amount) {
const innerLimit = Math.min(index + amount, limit)
while (index < innerLimit) {
this.push(index++)
}
if (index === limit) {
this.push(null)
}
},
})
const delayedIncStream = new stream.Transform({
objectMode: true,
transform (item, _, cb) {
setTimeout(() => cb(null, item + 1), 100)
},
})
const resultStream = numberStream.pipe(delayedIncStream)
resultStream.on('data', console.log)发布于 2019-05-21 07:12:04
答案是否定的,正如本部分文档的最后部分所解释的:回调
transform._transform()从不并行调用;流实现队列机制,要接收下一个块,必须同步或异步地调用回调。
发布于 2019-09-06 10:04:39
您可以使用nodejs包并行变换流来实现这一点,同时保持转换数据的顺序。
然后,您的示例可以重写如下,以并行转换所有数字:
const stream = require('stream')
const ParallelTransform = require('parallel-transform-stream').default
const limit = 20
let index = 0
const numberStream = new stream.Readable({
objectMode: true,
read (amount) {
const innerLimit = Math.min(index + amount, limit)
while (index < innerLimit) {
this.push(index++)
}
if (index === limit) {
this.push(null)
}
},
})
const delayedIncStream = new (ParallelTransform.create((item, _, cb) => {
setTimeout(() => cb(null, item + 1), 100)
}))({
objectMode: true,
maxParallel: 20
})
const resultStream = numberStream.pipe(delayedIncStream)
resultStream.on('data', console.log)https://stackoverflow.com/questions/56185587
复制相似问题