首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >第二次计算中第一次计算的重用结果

第二次计算中第一次计算的重用结果
EN

Stack Overflow用户
提问于 2015-04-30 14:15:36
回答 2查看 489关注 0票数 4

我试图用Flink编写一个计算,它需要两个阶段。

在第一阶段,我从文本文件开始,并执行一些参数估计,从而获得一个表示数据统计模型的Java对象。

在第二阶段,我想使用这个对象来生成模拟数据。

我不知道该怎么做。我尝试使用LocalCollectionOutputFormat,它在本地工作,但是当我在集群上部署作业时,我得到了一个NullPointerException --这并不奇怪。

做这件事的方式是什么?

这是我的代码:

代码语言:javascript
复制
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
GlobalConfiguration.includeConfiguration(configuration);

// Phase 1: read file and estimate model
DataSource<Tuple4<String, String, String, String>> source = env
        .readCsvFile(args[0])
        .types(String.class, String.class, String.class, String.class);

List<Tuple4<Bayes, Bayes, Bayes, Bayes>> bayesResult = new ArrayList<>();
// Processing here...
....output(new LocalCollectionOutputFormat<>(bayesResult));

env.execute("Bayes");

DataSet<BTP> btp = env
        .createInput(new BayesInputFormat(bayesResult.get(0)))
// Phase 2: BayesInputFormat generates data for further calculations
// ....

这是我得到的例外:

代码语言:javascript
复制
Error: The program execution failed: java.lang.NullPointerException
    at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
    at java.lang.Thread.run(Thread.java:745)

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.NullPointerException
    at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
    at java.lang.Thread.run(Thread.java:745)

    at org.apache.flink.client.program.Client.run(Client.java:328)
    at org.apache.flink.client.program.Client.run(Client.java:294)
    at org.apache.flink.client.program.Client.run(Client.java:288)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:55)
    at it.list.flink.test.Test01.main(Test01.java:62)
    ...
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-04-30 14:50:46

随着最新的版本(0.9-里程碑-1),collect()方法被添加到Flink中。

代码语言:javascript
复制
public List<T> collect()

DataSet<T>作为List<T>获取到驱动程序。collect()还将触发程序的立即执行(不需要调用ExecutionEnvironment.execute())。现在,数据集的大小限制在10 MB左右。

如果不对驱动程序中的模型进行评估,还可以将两个程序链接在一起,并通过附加数据接收器将模型发送到侧面。这将更有效,因为数据不会在客户端机器上执行往返操作。

票数 3
EN

Stack Overflow用户

发布于 2015-05-01 16:30:29

如果在0.9之前使用Flink,则可以使用以下代码段将数据集收集到本地集合:

代码语言:javascript
复制
val dataJavaList = new ArrayList[K]
val outputFormat = new LocalCollectionOutputFormat[K](dataJavaList)
dataset.output(outputFormat)
env.execute("collect()")

其中K是要收集的对象的类型。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/29970164

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档