I want to run the `execute` method, so to get around Spark's lazy evaluation I trigger an action (saveAsTextFile), as shown in the code:
def execute(line1):
    line = line1.split(',')
    print('Hi')
    session = driver.session()
    # check whether the nodes exist yet, and create them if not
    session.run("MERGE (n:Person {Tel: {v1} }) MERGE (m:Person {Tel: {v5}}) CREATE (n)-[:EMIT]->(c:Call {location:{v2}, start:{v3}, duration:{v4}})-[:RECEIVE]->(m) ", {'v1': line[0], 'v2': line[1], 'v3': line[3], 'v4': line[4], 'v5': line[5]})
    session.close()
    return line
def toCSVLine(data):
    return ','.join(str(d) for d in data)
if __name__ == '__main__':
    sc = SparkContext()
    csvData = sc.textFile(sys.argv[1]).cache()
    csvData.map(execute).map(toCSVLine).saveAsTextFile("doc")

But I get the following error:
File "/home/josyanne/Documents/test/./appSpark.py", line 21, in <module>
csvData.map(execute).map(toCSVLine).saveAsTextFile("doc")
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1585, in saveAsTextFile
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2489, in _jrdd
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2422, in _wrap_function
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2408, in _prepare_for_python_RDD
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 568, in dumps
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 918, in dumps
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 249, in dump
_pickle.PicklingError: Could not serialize object: TypeError: Cannot serialize socket object

Could someone please help me resolve this error?
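The root cause is that the Neo4j `driver` object, created once in the main process, wraps an open network socket, and Spark must pickle everything the `execute` closure references when it ships the function to the workers. A minimal sketch of the same failure, using only the standard library (no Spark or Neo4j involved):

```python
import pickle
import socket

# A Neo4j driver/session holds an open network socket internally.
# Anything that (transitively) contains a socket cannot be pickled,
# which is what cloudpickle attempts when shipping a closure to workers.
s = socket.socket()

try:
    pickle.dumps(s)
except TypeError as e:
    print("pickling failed:", e)
finally:
    s.close()
```

The fix is therefore not to serialize the connection at all, but to create it on the worker side.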
Posted on 2018-08-09 15:31:50
The driver must be initialized on each worker node. To do that, use mapPartitions (or foreachPartition, if you are only calling saveAsTextFile to force evaluation):
def execute(lines):
    driver = GraphDatabase.driver(...)
    for line in lines:
        line = line.split(',')
        session = driver.session()
        session.run(...)
        session.close()
        yield line

csvData.mapPartitions(execute).map(toCSVLine).saveAsTextFile("doc")

https://stackoverflow.com/questions/51760495
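For illustration, here is a self-contained sketch of the per-partition pattern above. A hypothetical `FakeDriver` stands in for neo4j's `GraphDatabase.driver` so it runs without Spark or a database; in the real job the driver is created inside `execute`, i.e. once per partition on the worker, so the unpicklable connection never has to cross process boundaries:

```python
class FakeSession:
    """Hypothetical stand-in for a Neo4j session."""
    def run(self, query, params=None):
        pass  # a real session would send the Cypher query here
    def close(self):
        pass

class FakeDriver:
    """Hypothetical stand-in for neo4j.GraphDatabase.driver(...)."""
    def session(self):
        return FakeSession()

def execute(lines):
    # One driver per partition, created on the worker itself.
    driver = FakeDriver()
    for line in lines:
        fields = line.split(',')
        session = driver.session()
        session.run("MERGE ...", {'v1': fields[0]})
        session.close()
        yield fields

# Simulate Spark handing one partition (an iterator of lines) to execute:
print(list(execute(["123,loc,x,10,5,456", "789,loc,y,20,7,012"])))
# → [['123', 'loc', 'x', '10', '5', '456'], ['789', 'loc', 'y', '20', '7', '012']]
```

Opening the driver once per partition rather than once per line (as in the original `execute`) also avoids paying the connection cost on every record.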