
Unable to write with Spark: pickle.PicklingError: Could not serialize object

Stack Overflow user
Asked on 2017-06-02 05:03:42
1 answer, 1K views, 0 followers, 0 votes

I am trying to write to a MySQL table using Spark's write.jdbc() function inside a function (test) that is called from a foreachPartition task, but I am getting a pickling error.

I am not sure whether the problem is caused by already being inside a Spark task, with Spark then running write.jdbc() as a task itself; as I understand it, that is not allowed. I could return the list "row" from the test() function and call write.jdbc() in main(), but I do not want to collect the data structure back to the driver. Code and error:

Code:

def test(partition_iter):
    row = []
    row.append({'col1': 26, 'col2': 12, 'col3': 153.49353894392, 'col4': 1})
    # createDataFrame() and write.jdbc() run here inside an executor task
    df_row = SPARK.createDataFrame(row)
    df_row.write.jdbc(url="jdbc:mysql://rds-url/db_name", table="db_name", properties={"driver":"com.mysql.jdbc.Driver","user":"user", "password":"password"}, mode="append")

def main():
    SPARK.sparkContext.parallelize([1, 2, 3, 4]).foreachPartition(test)

main()

Error:

Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 107, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib64/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 568, in save_tuple
    save(element)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 214, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 251, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 554, in save_tuple
    save(element)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
    save(tmp[0])
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 208, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 256, in save_function_tuple
    save(f_globals)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 692, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 563, in save_reduce
    save(state)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 323, in get_return_value
    format(target_id, ".", name, value))
Py4JError: An error occurred while calling o47.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)


Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in main
  File "/usr/lib/spark/python/pyspark/rdd.py", line 809, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2439, in _jrdd
    self._jrdd_deserializer, profiler)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2372, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2358, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/usr/lib/spark/python/pyspark/serializers.py", line 440, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 667, in dumps
    cp.dump(obj)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 120, in dump
    raise pickle.PicklingError(msg)
pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o47.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745) 

1 Answer

Stack Overflow user

Accepted answer

Posted on 2017-06-02 09:50:31

The problem is that you are calling the Spark context from within a task, which is not allowed. When Spark ships test() to the executors it has to pickle the function's closure, and the SparkSession the closure references wraps a py4j JavaObject that cannot be pickled; that is the __getnewargs__ failure shown in the traceback.

Edit, following the comments:

There are two ways to partition the work of creating the many rows to be written to the DB:

# use mapPartitions to get an RDD of rows

def test(partition_iter):
    rows = [{'col1': 26, 'col2': 12, 'col3': 153.49353894392, 'col4': 1} for i in partition_iter]
    return rows

def main():
    rows = SPARK.sparkContext.parallelize([1, 2, 3, 4]).mapPartitions(test)

    # createDataFrame() and write.jdbc() run on the driver, but the rows stay distributed in the RDD
    df = SPARK.createDataFrame(rows)
    df.write.jdbc(url="jdbc:mysql://rds-url/db_name", table="db_name", properties={"driver":"com.mysql.jdbc.Driver","user":"user", "password":"password"}, mode="append")

main()
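
A side note on this first approach: building the rows as pyspark.sql.Row objects instead of plain dicts makes the schema explicit and avoids the deprecation warning newer Spark versions emit when inferring a schema from dicts. A minimal sketch of that variant, reusing the col1..col4 names from the example above:

from pyspark.sql import Row

def test(partition_iter):
    # Row field names become the DataFrame columns, so no dict-based schema inference is needed
    return [Row(col1=26, col2=12, col3=153.49353894392, col4=1) for _ in partition_iter]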

Or, as the second approach:

# use pymysql in each partition instead of spark sql as per your original attempt

def test(partition_iter):
    row = []
    row.append({'col1': 26, 'col2': 12, 'col3': 153.49353894392, 'col4': 1})
    # TODO: pymysql code goes here:
    # mysqldriver = ???

def main():
    SPARK.sparkContext.parallelize([1, 2, 3, 4]).foreachPartition(test)

main()
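
To fill in the TODO above, here is a minimal sketch of a per-partition pymysql write, assuming the same hypothetical host, database, table, and credentials as the write.jdbc() call, and that pymysql is installed on every worker node. The connection is opened once per partition rather than once per row:

import pymysql

def test(partition_iter):
    rows = [(26, 12, 153.49353894392, 1) for _ in partition_iter]
    if not rows:
        return
    # One connection per partition; per-row connections would overwhelm the DB
    conn = pymysql.connect(host="rds-url", user="user", password="password", database="db_name")
    try:
        with conn.cursor() as cursor:
            cursor.executemany(
                "INSERT INTO db_name (col1, col2, col3, col4) VALUES (%s, %s, %s, %s)",
                rows)
        conn.commit()
    finally:
        conn.close()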
Votes: 1
The original content of this page is provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/44321158
