我想通过以下代码在本地使用Flink-API读取CSV文件:
csvPath="data/weather.csv";
List<Tuple2<String, Double>> csv= env.readCsvFile(csvPath)
.types(String.class,Double.class).collect();我尝试了一些不同大小的文件(从800mb到6 6gb)。由于以下超时异常,操作有时成功完成,有时不成功:
Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.ready(package.scala:169)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.shutdown(FlinkMiniCluster.scala:439)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.stop(FlinkMiniCluster.scala:408)
at org.apache.flink.client.LocalExecutor.stop(LocalExecutor.java:127)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:195)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:923)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.simpleCSV.run(simpleCSV.java:83)我如何解决这个问题?是否以编程方式增加此超时时间?或者我应该在某个地方放一个配置文件?是否有一个特定的堆大小,我应该根据文件大小来设置?
发布于 2017-01-11 20:41:39
collect()将数据从群集传输到本地客户端。这只适用于非常小的数据集(< 10MB)。
如果您有更大的数据集,您需要在集群上处理它们,并通过输出格式发出结果,例如,将其写入文件。
发布于 2017-10-25 14:20:17
如果您正在调试这个程序,您可以在org.apache.flink.api.java.LocalEnvironment的构造函数(带有config的构造函数)上设置一个断点,然后运行以下命令将超时时间更改为200秒( IntelliJ Idea中的Alt+F8):
config.setString("akka.ask.timeout", "200 s") 要在IntelliJ Idea中找到LocalEnvironment类,请按Ctr+n,并选中"Include non-project classes in the pop“,然后在编辑框中键入"LocalEnvironment”。
https://stackoverflow.com/questions/41589732
复制相似问题