我有一个由sourceStream对象组成的BaseData。
我想将这个流分叉到不同流的n-amount中,然后过滤和转换每个BaseData对象。
最后,我希望n流只包含特定类型,分叉流的长度可能会有所不同,因为将来可能会删除或添加数据。
我想我可以通过fork来设置这个
import * as _ from 'highland';
interface BaseData {
id: string;
data: string;
}
const sourceStream = _([
{id: 'foo', data: 'poit'},
{id: 'foo', data: 'fnord'},
{id: 'bar', data: 'narf'}]);
const partners = [
'foo',
'bar',
];
partners.forEach((partner: string) => {
const partnerStream = sourceStream.fork();
partnerStream.filter((baseData: BaseData) => {
return baseData.id === partner;
});
partnerStream.each(console.log);
});我希望现在有两个流,foo-stream包含两个元素:
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }和包含一个元素的bar-stream:
{ id: 'bar', data: 'narf' }然而,我却得到了一个错误:
/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
throw new Error(
^
Error: Stream already being consumed, you must either fork() or observe()
at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
at Array.forEach (native)
at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
at Module._compile (module.js:570:32)
at Object.Module._extensions..js (module.js:579:10)
at Module.load (module.js:487:32)
at tryModuleLoad (module.js:446:12)如何将流分成多个流?
我也尝试将调用链接起来,但是我只得到了一个流的结果:
partners.forEach((partner: string) => {
console.log(partner);
const partnerStream = sourceStream
.fork()
.filter((item: BaseData) => {
return item.id === partner;
});
partnerStream.each((item: BaseData) => {
console.log(item);
});
});只印刷:
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar而不是预期的:
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{id: 'bar', data: 'narf'}这也可能是我误解fork的全部原因。根据它的文档条目
Stream.fork()对流进行分叉,允许您在共享的背压下添加额外的使用者。向多个消费者分叉的流只会以最慢的使用者所能处理的速度从其源中提取值。 注意:不要依赖于叉子之间一致的执行顺序。此转换只保证所有分叉在任何分叉处理第二个值栏之前都会处理值foo。它不能保证分叉处理foo的顺序。 提示:在分叉中修改流值时要小心(或者使用这样做的库)。由于相同的值将传递给每个分叉,在它之后执行的任何叉子中都可以看到在一个叉子中所做的更改。添加了不一致的执行顺序,最终可能会出现一些微妙的数据损坏错误。如果需要修改任何值,则应复制并修改副本。 弃用警告:当前可以在使用流后(例如,通过转换)发送流。在下一个主要版本中,这将不再可能。如果你要分叉一个流,总是叫它叉子。
所以,而不是“如何岔开一条小溪?”我的实际问题可能是:如何将一条高地河流复制成不同的溪流?
发布于 2017-02-07 09:59:28
一个人必须记住,在所有的叉子被创造之前,不要消耗分叉的溪流。就像一个人消耗了一个分叉的流,它和它的“父”将被消耗,使任何后续的叉子从一个空的流中分叉。
const partnerStreams: Array<Stream<BaseData>> = [];
partners.forEach((partner: string) => {
const partnerStream = sourceStream
.fork()
.filter((item: BaseData) => {
return item.id === partner;
}
);
partnerStreams.push(partnerStream);
});
partnerStreams.forEach((stream, index) => {
console.log(index, stream);
stream.toArray((foo) => {
console.log(index, foo);
});
});它打印:
0 [ { id: 'foo', data: 'poit' }, { id: 'foo', data: 'fnord' } ]
1 [ { id: 'bar', data: 'narf' } ]发布于 2017-02-07 08:44:42
partnerStream.filter()返回一个新流。然后,您将再次使用partnerStream使用partnerStream.each(),而不调用fork()或observe()。因此,要么链接partnerStream.filter().each()调用,要么将partnerStream.filter()的返回值赋给一个变量,然后调用该变量的.each()。
https://stackoverflow.com/questions/42073925
复制相似问题