
PySpark action not executed due to PicklingError

Stack Overflow user
Asked on 2018-08-09 14:49:21
1 answer · 133 views · 0 followers · score 1

I want my `execute` method to actually run, so to get around Spark's lazy evaluation I added an action (`saveAsTextFile`), as shown in the code:

def execute(line1):
    line = line1.split(',')    
    print('Hi')
    session = driver.session()
    # check whether the nodes exist yet, and create them if not
    session.run("MERGE (n:Person {Tel: {v1} }) MERGE (m:Person {Tel: {v5}}) CREATE (n)-[:EMIT]->(c:Call {location:{v2}, start:{v3}, duration:{v4}})-[:RECEIVE]->(m) ", {'v1':line[0], 'v2':line[1], 'v3':line[3], 'v4':line[4], 'v5':line[5]})    
    session.close()
    return line

def toCSVLine(data):
    return ','.join(str(d) for d in data)

if __name__ == '__main__':
    sc = SparkContext()
    csvData = sc.textFile(sys.argv[1]).cache()    
    csvData.map(execute).map(toCSVLine).saveAsTextFile("doc")

But I get the following error:

  File "/home/josyanne/Documents/test/./appSpark.py", line 21, in <module>
    csvData.map(execute).map(toCSVLine).saveAsTextFile("doc")
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1585, in saveAsTextFile
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2489, in _jrdd
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2422, in _wrap_function
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2408, in _prepare_for_python_RDD
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 568, in dumps
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 918, in dumps
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 249, in dump
_pickle.PicklingError: Could not serialize object: TypeError: Cannot serialize socket object

Could someone please help me resolve this error?
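For context, the error occurs because Spark pickles the `execute` closure to ship it to workers, and that closure captures the Neo4j `driver`, which holds an open network socket. A minimal sketch (no Spark or Neo4j needed) reproducing the underlying serialization failure:

```python
import pickle
import socket

# Socket objects wrap OS-level resources and cannot be pickled;
# a Spark closure that captures one fails in exactly the same way.
s = socket.socket()
try:
    pickle.dumps(s)
    outcome = "pickled"
except TypeError as exc:
    outcome = "TypeError"
finally:
    s.close()

print(outcome)  # → TypeError
```

Anything referenced inside a function passed to `map` must be serializable, which is why the fix below moves the driver creation into the function that runs on the workers.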


1 Answer

Stack Overflow user

Accepted answer

Posted on 2018-08-09 15:31:50

The driver has to be initialized on each node. For that, you should use `mapPartitions` or `foreachPartition` (the latter if you are using `saveAsTextFile` only to force evaluation):

def execute(lines):
    # initialize the driver once per partition, on the worker itself,
    # so it is never pickled and shipped from the driver program
    driver = GraphDatabase.driver(...)

    for line in lines:
        line = line.split(',')
        session = driver.session()
        session.run(...)
        session.close()
        yield line

csvData.mapPartitions(execute).map(toCSVLine).saveAsTextFile("doc")
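The per-partition pattern above can be sketched without Spark or Neo4j. In this illustration a hypothetical dictionary stands in for the driver, showing that the resource is created once per partition while every line still flows through the generator:

```python
def execute(lines):
    # Created once per partition, not once per line
    # (hypothetical stand-in for GraphDatabase.driver(...)).
    connection = {"opened": True}
    for line in lines:
        fields = line.split(',')
        # session.run(...) would go here, using `connection`
        yield fields
    connection["opened"] = False  # stand-in for driver.close()

def toCSVLine(data):
    return ','.join(str(d) for d in data)

# Simulate mapPartitions on a single partition of raw CSV lines.
partition = ["123,NY,a,10,456", "789,LA,b,20,123"]
out = [toCSVLine(fields) for fields in execute(partition)]
print(out)  # → ['123,NY,a,10,456', '789,LA,b,20,123']
```

Because `execute` is a generator, Spark pickles only the function itself; the connection object exists solely on the worker that runs the partition.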
Score: 1
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/51760495
