首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >星火群集在更大的输入上失败,对于小的输入很好。

星火群集在更大的输入上失败,对于小的输入很好。
EN

Stack Overflow用户
提问于 2013-05-30 09:26:25
回答 2查看 7.5K关注 0票数 11

我在玩星火。它是默认的,预置发行版(0.7.0)从网站,默认配置,集群模式,一个工人(我的本地主机)。我读了有关安装的文档,看起来一切都很好。

我有一个CSV文件(不同大小,1000-100万行)。如果我用小输入文件(例如1000行)运行我的应用程序,一切都很好,程序将在几秒钟内完成,并产生预期的输出。但是,当我提供一个更大的文件(100.000行,即100万行)时,执行失败。我试着挖掘原木,但没有多大帮助(它重复了整个过程约9-10次,然后退出失败。此外,还有一些错误与从某个空源获取失败有关)。

第一个JavaRDD返回的结果对我来说是可疑的。如果我返回一个硬编码的单例列表(比如res.add(“某某物”);返回res;),那么一切都很好,即使有一百万行。但是,如果我添加了我想要的所有键(28串长6-20个字符),进程只会在大输入下失败。问题是,我需要所有这些密钥,这是实际的业务逻辑。

我使用的是Linux amd64,四核,8GB内存。最新的甲骨文Java7 JDK。火花配置:

代码语言:javascript
复制
SPARK_WORKER_MEMORY=4g
SPARK_MEM=3g
SPARK_CLASSPATH=$SPARK_CLASSPATH:/my/super/application.jar

我必须指出,当我开始这项计划时,它说:

代码语言:javascript
复制
13/05/30 11:41:52 WARN spark.Utils: Your hostname, *** resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface eth1)
13/05/30 11:41:52 WARN spark.Utils: Set SPARK_LOCAL_IP if you need to bind to another address

这是我的节目。它基于最小修改的JavaWordCount示例。

代码语言:javascript
复制
public final class JavaWordCount
{
    public static void main(final String[] args) throws Exception
    {
        final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
            System.getenv("SPARK_HOME"), new String[] {"....jar" });

        final JavaRDD<String> words = ctx.textFile(args[1], 1).flatMap(new FlatMapFunction<String, String>() {

            @Override
            public Iterable<String> call(final String s)
            {
                // parsing "s" as the line, computation, building res (it's a List<String>)
                return res;
            }
        });

        final JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(final String s)
            {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        final JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override
            public Integer call(final Integer i1, final Integer i2)
            {
                return i1 + i2;
            }
        });

        counts.collect();

        for (Tuple2<?, ?> tuple : counts.collect()) {
            System.out.println(tuple._1 + ": " + tuple._2);
        }
    }
}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2013-05-30 10:56:00

通过将属性spark.mesos.coarse设置为true,我成功地修复了它。更多信息,这里

更新:我已经玩了几个小时了。这些设置对我有点帮助,但似乎几乎不可能在一台机器上处理1000万行文本。

代码语言:javascript
复制
System.setProperty("spark.serializer", "spark.KryoSerializer"); // kryo is much faster
System.setProperty("spark.kryoserializer.buffer.mb", "256"); // I serialize bigger objects
System.setProperty("spark.mesos.coarse", "true"); // link provided
System.setProperty("spark.akka.frameSize", "500"); // workers should be able to send bigger messages
System.setProperty("spark.akka.askTimeout", "30"); // high CPU/IO load

注意事项:增加帧大小似乎特别有助于防止:org.apache.spark.SparkException: Error communicating with MapOutputTracker

票数 13
EN

Stack Overflow用户

发布于 2014-08-15 02:40:38

在较新的火花版本中,应使用:

代码语言:javascript
复制
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

根据http://spark.apache.org/docs/latest/tuning.html#data-serialization

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/16832429

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档