首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >沉到java列表可能与Hazelcast喷气机?

沉到java列表可能与Hazelcast喷气机?
EN

Stack Overflow用户
提问于 2018-07-12 20:46:01
回答 2查看 199关注 0票数 1

我有一个帐户列表,并对ticks执行哈希连接,并使用ticks数据返回帐户。但是在哈希联接之后,我有了drainTo lListJet,然后用DistributedStream读取它并返回它。

代码语言:javascript
复制
public List<Account> populateTicksInAccounts(List<Account> accounts) {
    ...
    ...
    Pipeline p = Pipeline.create();
    BatchSource<Tick> ticksSource = Sources.list(TICKS_LIST_NAME);
    BatchSource<Account> accountSource = Sources.fromProcessor(AccountProcessor.of(accounts));

    p.drawFrom(ticksSource)
        .hashJoin(p.drawFrom(accountSource), JoinClause.joinMapEntries(Tick::getTicker), accountMapper())
        .drainTo(Sinks.list(TEMP_LIST));

    jet.newJob(p).join();
    IListJet<Account> list = jet.getList(TEMP_LIST);
    return DistributedStream.fromList(list).collect(DistributedCollectors.toIList());
}

在执行哈希联接之后,drainTo可以用java List代替lListJet吗?

像下面这样的事情是可能的?

代码语言:javascript
复制
IListJet<Account> accountWithTicks = new ArrayList<>();
p.drawFrom(ticksSource)
    .hashJoin(p.drawFrom(accountSource), JoinClause.joinMapEntries(Tick::getTicker), accountMapper())
    .drainTo(<CustomSinkProcessor(accountWithTicks)>);
return accountWithTicks;

在CustomSinkProcessor中,哪里会有空的java列表并返回帐户?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-07-13 06:54:59

请记住,提交到Jet执行的代码在您提交它的过程之外运行。虽然理论上可以提供您所要求的API,但在幕后,它只需执行一些技巧就可以对集群的每个成员运行代码,让所有成员将其结果发送到一个位置,并填写一个列表返回给您。这将违背分布式计算的本质。

如果您认为它将有助于代码的可读性,则可以编写一个助手方法,如下所示:

代码语言:javascript
复制
public <T, R> List<R> drainToList(GeneralStage<T> stage) {
    String tmpListName = randomListName();
    SinkStage sinkStage = stage.drainTo(Sinks.list(tmpListName));
    IListJet<R> tmpList = jet.getList(tmpListName);
    try {
        jet.newJob(sinkStage.getPipeline()).join();
        return new ArrayList<>(tmpList);
    } finally {
        tmpList.destroy();
    }
}

特别要注意这一行

代码语言:javascript
复制
return new ArrayList<>(tmpList);

而不是你的

代码语言:javascript
复制
IListJet<Account> list = jet.getList(TEMP_LIST);
return DistributedStream.fromList(list).collect(DistributedCollectors.toIList());

这只是将一个Hazelcast列表复制到另一个列表并返回一个句柄。现在,您已经在Jet集群中泄露了两个列表。当你停止使用它们时,它们不会自动消失。

即使是我提供的代码也可能存在漏洞。运行它的JVM进程可以在Job.join()期间死亡,而不会到达finally。然后,暂时的名单还在继续。

票数 2
EN

Stack Overflow用户

发布于 2018-07-13 06:00:16

不,不是,因为喷气机的分布性质。接收器将在多个并行处理器(工作人员)中执行。它不能添加到普通的Collection中。接收器必须能够在多个群集成员上插入项。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51314175

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档