我为Apache创建了一个简单的作业,它使用了Gelly提供的PageRank实现。
在本地,在IDE中运行,一切都很好。但是,我尝试使用JobManager web接口将带有作业的JAR提交到机器中运行的Flink实例。但是,Flink没有为作业获得正确的计划并执行PageRank,而是呈现并执行一个非常奇怪的计划,该计划只计算图的顶点数。
我做了一些研究和调试,发现Gelly提供的PageRank实现开始计算图的顶点数,当它没有作为算法的参数时:
if (numberOfVertices == 0) {
numberOfVertices = network.numberOfVertices();
}此计算意味着嵌入作业。由于运算符是懒惰的,因此不会触发计算。在Flink服务器中,第一件事是获得作业计划。这是由一个特殊的环境OptimizerPlanEnvironment完成的,它提供了以下result方法:
public JobExecutionResult execute(String jobName) throws Exception {
Plan plan = createProgramPlan(jobName);
this.optimizerPlan = compiler.compile(plan);
// do not go on with anything now!
throw new ProgramAbortException();
}问题就在这里。一旦抛出ProgramAbortException,程序就返回到目前为止计算出来的计划。但是只计算了内部作业计划,因此从不计算或执行主作业计划。
这是我使用的代码:
public class Job {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Double, Double> graph = Graph.fromDataSet(
PageRankData.getDefaultEdgeDataSet(env), new VertexInit(), env);
graph.run(new PageRank<Long>(0.85, 10)).print();
}
private static class VertexInit implements MapFunction<Long, Double> {
@Override
public Double map(Long value) throws Exception { return 1.0; }
}
}如果提供顶点数,例如graph.run(new PageRank<Long>(0.85, 5, 10)),就没有问题,计划被正确计算,PageRank被计算出来。
我的问题是:我做错了什么?或者这是Flink中的一个真正的bug?
发布于 2016-03-14 10:18:49
问题是,正如您已经说过的,network.numberOfVertices内部调用顶点数据集上的count。这将触发一个独立的Flink作业,该作业计算计数值。此值通常将由main方法检索。但是,在web客户端提交的情况下,由于OptimizerPlanEnvironment只允许编译单个Flink作业,这是行不通的。该行为类似于分离的执行模式,该模式也不支持急切的计划执行。
这是目前Flink的网络客户端的一个限制。造成这种行为的原因是Flink不想阻塞Netty通道处理程序线程,这是等待count操作结果所必需的。阻塞操作将使线程池饥饿,并使此会话的web接口在解除阻塞之前没有响应。
https://stackoverflow.com/questions/35977601
复制相似问题