我有一个帐户列表,并对ticks执行哈希连接,并使用ticks数据返回帐户。但是在哈希联接之后,我有了drainTo lListJet,然后用DistributedStream读取它并返回它。
public List<Account> populateTicksInAccounts(List<Account> accounts) {
...
...
Pipeline p = Pipeline.create();
BatchSource<Tick> ticksSource = Sources.list(TICKS_LIST_NAME);
BatchSource<Account> accountSource = Sources.fromProcessor(AccountProcessor.of(accounts));
p.drawFrom(ticksSource)
.hashJoin(p.drawFrom(accountSource), JoinClause.joinMapEntries(Tick::getTicker), accountMapper())
.drainTo(Sinks.list(TEMP_LIST));
jet.newJob(p).join();
IListJet<Account> list = jet.getList(TEMP_LIST);
return DistributedStream.fromList(list).collect(DistributedCollectors.toIList());
}在执行哈希联接之后,drainTo可以用java List代替lListJet吗?
像下面这样的事情是可能的?
IListJet<Account> accountWithTicks = new ArrayList<>();
p.drawFrom(ticksSource)
.hashJoin(p.drawFrom(accountSource), JoinClause.joinMapEntries(Tick::getTicker), accountMapper())
.drainTo(<CustomSinkProcessor(accountWithTicks)>);
return accountWithTicks;在CustomSinkProcessor中,哪里会有空的java列表并返回帐户?
发布于 2018-07-13 06:54:59
请记住,提交到Jet执行的代码在您提交它的过程之外运行。虽然理论上可以提供您所要求的API,但在幕后,它只需执行一些技巧就可以对集群的每个成员运行代码,让所有成员将其结果发送到一个位置,并填写一个列表返回给您。这将违背分布式计算的本质。
如果您认为它将有助于代码的可读性,则可以编写一个助手方法,如下所示:
public <T, R> List<R> drainToList(GeneralStage<T> stage) {
String tmpListName = randomListName();
SinkStage sinkStage = stage.drainTo(Sinks.list(tmpListName));
IListJet<R> tmpList = jet.getList(tmpListName);
try {
jet.newJob(sinkStage.getPipeline()).join();
return new ArrayList<>(tmpList);
} finally {
tmpList.destroy();
}
}特别要注意这一行
return new ArrayList<>(tmpList);而不是你的
IListJet<Account> list = jet.getList(TEMP_LIST);
return DistributedStream.fromList(list).collect(DistributedCollectors.toIList());这只是将一个Hazelcast列表复制到另一个列表并返回一个句柄。现在,您已经在Jet集群中泄露了两个列表。当你停止使用它们时,它们不会自动消失。
即使是我提供的代码也可能存在漏洞。运行它的JVM进程可以在Job.join()期间死亡,而不会到达finally。然后,暂时的名单还在继续。
发布于 2018-07-13 06:00:16
不,不是,因为喷气机的分布性质。接收器将在多个并行处理器(工作人员)中执行。它不能添加到普通的Collection中。接收器必须能够在多个群集成员上插入项。
https://stackoverflow.com/questions/51314175
复制相似问题