首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用highland.js分流?

如何使用highland.js分流?
EN

Stack Overflow用户
提问于 2017-02-06 17:38:34
回答 2查看 691关注 0票数 1

我有一个由sourceStream对象组成的BaseData

我想将这个流分叉到不同流的n-amount中,然后过滤和转换每个BaseData对象。

最后,我希望n流只包含特定类型,分叉流的长度可能会有所不同,因为将来可能会删除或添加数据。

我想我可以通过fork来设置这个

代码语言:javascript
复制
import * as _ from 'highland';

interface BaseData {
    id: string;
    data: string;
}

const sourceStream = _([
    {id: 'foo', data: 'poit'},
    {id: 'foo', data: 'fnord'},
    {id: 'bar', data: 'narf'}]);

const partners = [
    'foo',
    'bar',
];

partners.forEach((partner: string) => {
    const partnerStream = sourceStream.fork();

    partnerStream.filter((baseData: BaseData) => {
        return baseData.id === partner;
    });

    partnerStream.each(console.log);
});

我希望现在有两个流,foo-stream包含两个元素:

代码语言:javascript
复制
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }

和包含一个元素的bar-stream:

代码语言:javascript
复制
{ id: 'bar', data: 'narf' }

然而,我却得到了一个错误:

代码语言:javascript
复制
/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
        throw new Error(
        ^

Error: Stream already being consumed, you must either fork() or observe()
    at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
    at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
    at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
    at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
    at Array.forEach (native)
    at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
    at Module._compile (module.js:570:32)
    at Object.Module._extensions..js (module.js:579:10)
    at Module.load (module.js:487:32)
    at tryModuleLoad (module.js:446:12)

如何将流分成多个流?

我也尝试将调用链接起来,但是我只得到了一个流的结果:

代码语言:javascript
复制
partners.forEach((partner: string) => {
    console.log(partner);
    const partnerStream = sourceStream
        .fork()
        .filter((item: BaseData) => {
            return item.id === partner;
        });

    partnerStream.each((item: BaseData) => {
        console.log(item);
    });
});

只印刷:

代码语言:javascript
复制
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar

而不是预期的:

代码语言:javascript
复制
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{id: 'bar', data: 'narf'}

这也可能是我误解fork的全部原因。根据它的文档条目

Stream.fork()对流进行分叉,允许您在共享的背压下添加额外的使用者。向多个消费者分叉的流只会以最慢的使用者所能处理的速度从其源中提取值。 注意:不要依赖于叉子之间一致的执行顺序。此转换只保证所有分叉在任何分叉处理第二个值栏之前都会处理值foo。它不能保证分叉处理foo的顺序。 提示:在分叉中修改流值时要小心(或者使用这样做的库)。由于相同的值将传递给每个分叉,在它之后执行的任何叉子中都可以看到在一个叉子中所做的更改。添加了不一致的执行顺序,最终可能会出现一些微妙的数据损坏错误。如果需要修改任何值,则应复制并修改副本。 弃用警告:当前可以在使用流后(例如,通过转换)发送流。在下一个主要版本中,这将不再可能。如果你要分叉一个流,总是叫它叉子。

所以,而不是“如何岔开一条小溪?”我的实际问题可能是:如何将一条高地河流复制成不同的溪流?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-02-07 09:59:28

一个人必须记住,在所有的叉子被创造之前,不要消耗分叉的溪流。就像一个人消耗了一个分叉的流,它和它的“父”将被消耗,使任何后续的叉子从一个空的流中分叉。

代码语言:javascript
复制
const partnerStreams: Array<Stream<BaseData>> = [];

partners.forEach((partner: string) => {
    const partnerStream = sourceStream
        .fork()
        .filter((item: BaseData) => {
            return item.id === partner;
         }
    );

    partnerStreams.push(partnerStream);
});

partnerStreams.forEach((stream, index) => {
    console.log(index, stream);
    stream.toArray((foo) => {
        console.log(index, foo);
    });
});

它打印:

代码语言:javascript
复制
0 [ { id: 'foo', data: 'poit' }, { id: 'foo', data: 'fnord' } ]
1 [ { id: 'bar', data: 'narf' } ]
票数 0
EN

Stack Overflow用户

发布于 2017-02-07 08:44:42

partnerStream.filter()返回一个新流。然后,您将再次使用partnerStream使用partnerStream.each(),而不调用fork()observe()。因此,要么链接partnerStream.filter().each()调用,要么将partnerStream.filter()的返回值赋给一个变量,然后调用该变量的.each()

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42073925

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档