我有个琐碎的星火计划。我已经将输入缩减为一个文件,其中包含一行。所以我相信这不是传统的记忆压力。
Exception in thread "main" com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 32749568, required: 34359296
at com.esotericsoftware.kryo.io.Output.require(Output.java:138)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at carbonite.serializer$write_map.invoke(serializer.clj:69)我可以设置spark.kryoserializer.buffer.mb,但我认为我只是推迟了这个问题。我想了解它。
我不认为这个计划有什么不合标准的地方。如果我删除一行(似乎是随机的),错误就会消失。
看来我达到了某种固定的极限。但事实上,我的输入文件非常小,我正在做的唯一操作是可预测的maps和reduceByKey,我怀疑还有其他问题。
我使用的是Flambo Clojure 0.4.0库(但我不认为这是造成这种情况的原因)和Spark 2.10。
这是一个最小的工作例子。对不起,有点含糊不清,但我已经把所有无关的东西都去掉了。
(ns mytest.core
(:require [flambo.conf :as conf])
(:require [flambo.api :as f]))
(def sc (f/spark-context (-> (conf/spark-conf)
(conf/master "local")
(conf/app-name "test")
(conf/set "spark.driver.memory" "1g")
(conf/set "spark.executor.memory" "1g"))))
(defn -main
[& args]
(let [logfile (f/text-file sc "file://tmp/one-line-file")
a (f/map logfile (f/fn [u] nil))
b (f/map logfile (f/fn [u] nil))
c (f/map logfile (f/fn [u] nil))
d (f/map logfile (f/fn [u] nil))
e (f/map logfile (f/fn [u] nil))
g (f/map logfile (f/fn [u] nil))
h (f/map logfile (f/fn [u] nil))
i (f/map logfile (f/fn [u] nil))
j (f/map logfile (f/fn [u] nil))
k (f/map logfile (f/fn [u] nil))
l (f/map logfile (f/fn [u] nil))
m (f/map logfile (f/fn [u] nil))
n (f/map logfile (f/fn [u] nil))
o (f/map logfile (f/fn [u] nil))
p (f/map logfile (f/fn [u] nil))
q (f/map logfile (f/fn [u] nil))
r (f/map logfile (f/fn [u] nil))
s (f/map logfile (f/fn [u] nil))
t (f/map logfile (f/fn [u] nil))
]))编辑
如果我将其分割成两个块并重新创建延迟文件流,它就会工作:
(defn get-inputs []
(f/text-file sc "file://tmp/one-line-file"))
(defn -main
[& args]
(let [logfile (get-inputs)
a (f/map logfile (f/fn [u] nil))
b (f/map logfile (f/fn [u] nil))
c (f/map logfile (f/fn [u] nil))
d (f/map logfile (f/fn [u] nil))
e (f/map logfile (f/fn [u] nil))
g (f/map logfile (f/fn [u] nil))
h (f/map logfile (f/fn [u] nil))
i (f/map logfile (f/fn [u] nil))])
(let [logfile (get-inputs)
j (f/map logfile (f/fn [u] nil))
k (f/map logfile (f/fn [u] nil))
l (f/map logfile (f/fn [u] nil))
m (f/map logfile (f/fn [u] nil))
n (f/map logfile (f/fn [u] nil))
o (f/map logfile (f/fn [u] nil))
p (f/map logfile (f/fn [u] nil))
q (f/map logfile (f/fn [u] nil))
r (f/map logfile (f/fn [u] nil))
s (f/map logfile (f/fn [u] nil))
t (f/map logfile (f/fn [u] nil))]))在Java中,这相当于创建两个本地作用域(例如,两个单独的方法)。get-inputs只是一个返回新构造的文本文件对象的方法。
我认为textFile方法将创建一个可以(重新)多次读取的惰性流,因此这两个示例不应该有太大的不同。
发布于 2014-12-11 10:39:44
将此添加到星火上下文conf中:
conf.set("spark.kryoserializer.buffer.mb","128")https://stackoverflow.com/questions/27403732
复制相似问题