我收到一条奇怪的错误信息
15/01/26 13:05:12 INFO spark.SparkContext: Created broadcast 0 from wholeTextFiles at NativeMethodAccessorImpl.java:-2
Traceback (most recent call last):
File "/home/user/inverted-index.py", line 78, in <module>
print sc.wholeTextFiles(data_dir).flatMap(update).top(10)#groupByKey().map(store)
File "/home/user/spark2/python/pyspark/rdd.py", line 1045, in top
return self.mapPartitions(topIterator).reduce(merge)
File "/home/user/spark2/python/pyspark/rdd.py", line 715, in reduce
vals = self.mapPartitions(func).collect()
File "/home/user/spark2/python/pyspark/rdd.py", line 676, in collect
bytesInJava = self._jrdd.collect().iterator()
File "/home/user/spark2/python/pyspark/rdd.py", line 2107, in _jrdd
pickled_command = ser.dumps(command)
File "/home/user/spark2/python/pyspark/serializers.py", line 402, in dumps
return cloudpickle.dumps(obj, 2)
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 816, in dumps
cp.dump(obj)
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 133, in dump
return pickle.Pickler.dump(self, obj)
File "/usr/lib/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 562, in save_tuple
save(element)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 254, in save_function
self.save_function_tuple(obj, [themodule])
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
save(element)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 600, in save_list
self._batch_appends(iter(obj))
File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
save(x)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 254, in save_function
self.save_function_tuple(obj, [themodule])
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
save(element)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 600, in save_list
self._batch_appends(iter(obj))
File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
save(x)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 254, in save_function
self.save_function_tuple(obj, [themodule])
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
save(element)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 600, in save_list
self._batch_appends(iter(obj))
File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
save(tmp[0])
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 249, in save_function
self.save_function_tuple(obj, modList)
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 309, in save_function_tuple
save(f_globals)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
pickle.Pickler.save_dict(self, obj)
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 650, in save_reduce
save(state)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
pickle.Pickler.save_dict(self, obj)
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 650, in save_reduce
save(state)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
pickle.Pickler.save_dict(self, obj)
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 650, in save_reduce
save(state)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
pickle.Pickler.save_dict(self, obj)
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 547, in save_inst
self.save_inst_logic(obj)
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 537, in save_inst_logic
save(stuff)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
pickle.Pickler.save_dict(self, obj)
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 547, in save_inst
self.save_inst_logic(obj)
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 537, in save_inst_logic
save(stuff)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
pickle.Pickler.save_dict(self, obj)
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 616, in save_reduce
save(cls)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 467, in save_global
d),obj=obj)
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 631, in save_reduce
save(args)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
save(element)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
pickle.Pickler.save_dict(self, obj)
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 616, in save_reduce
save(cls)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 442, in save_global
raise pickle.PicklingError("Can't pickle builtin %s" % obj)
pickle.PicklingError: Can't pickle builtin <type 'method_descriptor'>我的更新函数返回一个(key, (value1, value2))类型的元组列表,它们都是字符串,如下所示:
def update(doc):
doc_id = doc[0][path_len:-ext_len] #actual file name
content = doc[1].lower()
new_fi = regex.split(content)
old_fi = fi_table.row(doc_id)
fi_table.put(doc_id, {'cf:col': ",".join(new_fi)})
if not old_fi:
return [(term, ('add', doc_id)) for term in new_fi]
else:
new_fi = set(new_fi)
old_fi = set(old_fi['cf:col'].split(','))
return [(term, ('add', doc_id)) for term in new_fi - old_fi] + \
[(term, ('del', doc_id)) for term in old_fi - new_fi]编辑:问题在于这两个hbase函数、行和put。当我对它们进行注释时,这两种代码都可以工作(将old_fi设置为一个空字典),但如果其中一个运行,则会产生上述错误。我在python中使用粒子基来操作hbase。有人能解释我出了什么问题吗?
发布于 2015-03-09 13:10:23
Spark试图序列化connect对象,以便它可以在执行器中使用,这肯定会失败,因为反序列化的db connect对象不能将读/写权限授予另一个范围(甚至计算机)。这个问题可以通过广播connect对象来再现。对于这个实例,序列化i/o对象时出现了问题。
通过连接到地图函数中的数据库,部分解决了这个问题。由于map函数中的每个RDD元素都有太多的连接,所以我不得不切换到分区处理,以便将db连接从20k减少到大约8-64个(基于分区的数量)。Spark开发人员应该考虑为执行器创建一个初始化函数/脚本,以避免这种死胡同问题。
假设我让每个节点执行这个init函数,那么每个节点都将连接到数据库(一些conn池,或者单独的zookeeper节点),因为init函数和map函数将共享相同的作用域,然后问题就消失了,所以您编写的代码比我找到的解决方案要快。在执行结束时,火花将释放/卸载这些定义的变量,程序将结束。
发布于 2015-01-26 07:31:10
如果这确实是MethodDescriptorType的泡菜问题,您可以注册如何对MethodDescriptorType进行腌制,如下所示:
def _getattr(objclass, name, repr_str):
# hack to grab the reference directly
try:
attr = repr_str.split("'")[3]
return eval(attr+'.__dict__["'+name+'"]')
except:
attr = getattr(objclass,name)
if name == '__dict__':
attr = attr[name]
return attar
def save_wrapper_descriptor(pickler, obj):
pickler = Pickler(file, protocol)
pickler.save_reduce(_getattr, (obj.__objclass__, obj.__name__,
obj.__repr__()), obj=obj)
return
# register the following "type" with:
# Pickler.dispatch[MethodDescriptorType] = save_wrapper_descriptor
MethodDescriptorType = type(type.__dict__['mro'])然后,如果将上面的内容注册到spark使用的酸洗调度表(如上面所示,或使用copy_reg),则可能会避免酸洗错误。
https://stackoverflow.com/questions/28142578
复制相似问题