首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark NotSerializableException

Spark NotSerializableException
EN

Stack Overflow用户
提问于 2015-06-14 19:03:14
回答 6查看 2.7K关注 0票数 5

在我的Spark代码中,我尝试从csv文件创建一个IndexedRowMatrix。但是,我得到以下错误:

代码语言:javascript
复制
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext

下面是我的代码:

代码语言:javascript
复制
sc = new JavaSparkContext("local", "App",
              "/srv/spark", new String[]{"target/App.jar"});

JavaRDD<String> csv = sc.textFile("data/matrix.csv").cache();


JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(
              new  Function<scala.Tuple2<String, Long>, IndexedRow>() {
                /**
                 * 
                **/ 
                private static final long serialVersionUID = 4795273163954440089L;

                @Override
                public IndexedRow call(Tuple2<String, Long> tuple)
                        throws Exception {
                    String line = tuple._1;
                    long index = tuple._2;
                    String[] strings = line.split(",");
                    double[] doubles = new double[strings.length];
                     for (int i = 0; i < strings.length; i++) {
                         doubles[i] = Double.parseDouble(strings[i]);
                     }
                     Vector v = new DenseVector(doubles);
                     return new IndexedRow(index, v);
                }
            });
EN

回答 6

Stack Overflow用户

发布于 2016-02-26 04:59:07

我也有同样的问题。它驱使我绕过转弯。这是Java对匿名实例和可序列化的限制。我的解决方案是将函数的匿名实例声明为实现Serializable的命名静态类,并实例化它。我基本上声明了一个函数库,它是一个外部类,包含我想要使用的函数的静态内部类定义。

当然,如果您用Scala编写它,它将是一个代码更整洁的文件,但在这种情况下,这对您没有帮助。

票数 3
EN

Stack Overflow用户

发布于 2015-06-14 20:24:25

有些东西有点可疑,如果你给我们展示更多的代码,也许我们可以给出更好的答案。

无论如何,您可以尝试在表示您的映射器函数的单独文件中创建一个公共类:

代码语言:javascript
复制
public class Mapper implements Function<Tuple2<String,Long>, IndexedRow> {

  @Override
  public IndexedRow call(Tuple2<String, Long> tuple) throws Exception {
    String line = tuple._1();
    long index = tuple._2();
    String[] strings = line.split(",");
    double[] doubles = new double[strings.length];
    for (int i = 0; i < strings.length; i++) {
      doubles[i] = Double.parseDouble(strings[i]);
    }
    Vector v = new DenseVector(doubles);
    return new IndexedRow(index, v);
  }
}

然后用它来映射你的JavaRDD:

代码语言:javascript
复制
JavaRDD<String> csv = jsc.textFile("data/matrix.csv").cache();
JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(new Mapper());

这样,对于map()调用,Spark只需要序列化Mapper类,该类中没有任何不可序列化的属性。

然而,作业可能会因为其他我们不知道的原因而失败,因为我们看不到所有涉及的代码。

票数 2
EN

Stack Overflow用户

发布于 2015-08-05 22:25:49

当您遇到序列化问题时,添加以下参数总是很好的:-Dsun.io.serialization.extendedDebugInfo=true这样您就可以更准确地看到它失败的地方。

现在,以下是您的代码中可能发生的事情。JavaSparkContext确实是不可序列化的(原因很多,你可以在网上找到)。在您的代码中,您没有直接序列化它,但是您确实持有对它的引用,因为您的Function不是静态的,因此它持有对封闭类的引用。因此,当您发送映射时,基本上发生的是,它将尝试序列化包含不可序列化的JavaSparkContext的封闭类,这就是您的异常应该来自的地方。您可以尝试静态地重写此函数,或者将您的函数编写为非嵌套类,或者使JavaSparkContext成为本地函数,这样它就不会被序列化。

如果可能,我建议您采用最新的选项,原因很简单:最佳实践是在本地创建此JavaSparkContext,否则,由于您可能持有的类中的每个引用(有时很难找到),您将会遇到数百个不可序列化的问题。例如,您可以通过在主类中实例化JavaSparkContext来做到这一点:

代码语言:javascript
复制
public static void main(String[] args) {
   JavaSparkContext sc = new JavaSparkContext();

   // do whatever you need to do, if you need sc inside other classes,
   // store this sc into a static class, say Registry.set(sc) and Registry.getJSC()

   JavaRDD<String> csv = sc.textFile("data/matrix.csv").cache();
   JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(
          new  Function<scala.Tuple2<String, Long>, IndexedRow>() {
            private static final long serialVersionUID = 4795273163954440089L; // won't be serialized

            @Override
            public IndexedRow call(Tuple2<String, Long> tuple)
                    throws Exception {
                String line = tuple._1;
                long index = tuple._2;
                String[] strings = line.split(",");
                double[] doubles = new double[strings.length];
                 for (int i = 0; i < strings.length; i++) {
                     doubles[i] = Double.parseDouble(strings[i]);
                 }
                 Vector v = new DenseVector(doubles);
                 return new IndexedRow(index, v);
            }
        });
}

还要注意,静态字段并不与实例相关联,而是与类相关联,所以我认为您的serialVersionUID也没有被序列化(以防它在某个时候成为您的问题)。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/30828595

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档