我正在尝试并行化一些工作,这些工作在我的 Mac 上运行正常(MacOS 10.7 下的 Python 3.2.2),但在 Linux 集群上运行时出现以下错误。我在集群上获得了 4 个内核,并可以访问 Python 3.2。错误消息会一直持续,直到我手动中断执行。
Exception in thread Thread-2:
Traceback (most recent call last):
File "/n/sw/python-3.2/lib/python3.2/threading.py", line 736, in _bootstrap_inner
self.run()
File "/n/sw/python-3.2/lib/python3.2/threading.py", line 689, in run
self._target(*self._args, **self._kwargs)
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/pool.py", line 338, in _handle_tasks
put(task)
_pickle.PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed
Process PoolWorker-2:
Process PoolWorker-4:
Traceback (most recent call last):
Traceback (most recent call last):
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 259, in _bootstrap
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 259, in _bootstrap
Process PoolWorker-1:
Traceback (most recent call last):
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 259, in _bootstrap
Process PoolWorker-12:
Traceback (most recent call last):
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 259, in _bootstrap
self.run()
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/pool.py", line 102, in worker
Process PoolWorker-11:
Traceback (most recent call last):
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 259, in _bootstrap
self.run()
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/pool.py", line 102, in worker

当然,作为参考,下面是我的代码的一部分。我不明白这个 multiprocessing.Pool 调用为什么会导致这些错误,特别是编号大于 4 的 PoolWorker 进程。谢谢你的任何想法!
import csv
import networkx as nx
import time
import shutil
import datetime
import pydot
import os
import re
import logging
from operator import itemgetter
import numpy as np
from multiprocessing import Pool
import itertools
# Dictionary for edge attributes in projected graph:
# 0: overlap_length
# 1: overlap_start
# 2: overlap_end
# 3: cell
# 4: level
def chunks(l, n):
    """Yield successive tuples of at most `n` items drawn from iterable `l`.

    The final chunk may be shorter than `n`. If `n` is 0, nothing is
    yielded (each slice of size 0 is empty, which ends the generator).
    """
    it = iter(l)
    chunk = tuple(itertools.islice(it, n))
    while chunk:
        yield chunk
        chunk = tuple(itertools.islice(it, n))
def overlaps(G, B, u, nbrs2):
    """Return the list of overlap edges between node `u` and each node in `nbrs2`.

    `B` is the bipartite multigraph (node -> cell edges carrying spell data,
    where spell[1] is the start time and spell[2] the end time); `G` is the
    projected multigraph built so far.  For every cell shared by `u` and a
    candidate `v`, each pair of time spells that overlaps produces one edge
    tuple ``(u, v, {0: length_days, 1: start, 2: end, 3: cell})`` — unless an
    edge between `u` and `v` with the same start time is already in `G`.
    """
    edges = []
    for v in nbrs2:
        for cell in set(B[u]) & set(B[v]):
            v_spells = list(B.get_edge_data(v, cell).values())
            for u_spell in B.get_edge_data(u, cell).values():
                u_start, u_end = u_spell[1], u_spell[2]
                for v_spell in v_spells:
                    v_start, v_end = v_spell[1], v_spell[2]
                    # Spells overlap iff each starts before the other ends.
                    if not (u_end > v_start and v_end > u_start):
                        continue
                    o_start = max(u_start, v_start)
                    o_end = min(u_end, v_end)
                    # Skip if G already holds a u-v edge with this start time.
                    if v in G[u] and o_start in [d[1] for d in G[u][v].values()]:
                        continue
                    edges.append((u, v, {0: (o_end - o_start + 1) / 86400,
                                         1: o_start,
                                         2: o_end,
                                         3: cell}))
    return edges
def _pmap1(arg_tuple):
    """Adapter for Pool.map, which passes a single argument per task.

    Unpacks a ``(G, B, u, nbrs2)`` tuple and delegates to :func:`overlaps`.
    """
    graph, bipartite, node, neighbours = (arg_tuple[0], arg_tuple[1],
                                          arg_tuple[2], arg_tuple[3])
    return overlaps(graph, bipartite, node, neighbours)
def time_overlap_projected_graph_parallel(B, nodes):
    """Build the time-overlap projection of bipartite multigraph `B` onto `nodes`.

    For each node `u` in `nodes`, its second neighbours (nodes sharing at
    least one cell with `u`) are split into chunks that are scored in
    parallel by :func:`overlaps` via a worker pool; the resulting edge lists
    are merged into the returned :class:`networkx.MultiGraph`.

    Note: each task pickles `G` and `B`, so worker payloads grow as the
    projection fills in; functions sent to the pool must stay module-level
    top-level definitions to remain picklable.
    """
    G = nx.MultiGraph()
    # Copy the node attribute dicts from B for the projected nodes.
    G.add_nodes_from((n, B.node[n]) for n in nodes)
    n_workers = 4
    p = Pool(processes=n_workers)  # was reading the private p._pool attribute
    try:
        for u in nodes:
            unbrs = set(B[u])
            nbrs2 = set(n for nbr in unbrs for n in B[nbr]) - set([u])
            if not nbrs2:
                continue
            # Chunk size must be at least 1: the old int(len/4) was 0 for
            # nodes with fewer than 4 second-neighbours, making chunks()
            # yield nothing and silently dropping those nodes' edges.
            chunk_size = max(1, len(nbrs2) // n_workers)
            node_chunks = list(chunks(nbrs2, chunk_size))
            num_chunks = len(node_chunks)
            # Parallelize over subsets of u's second neighbours.
            pedgelists = p.map(_pmap1,
                               zip([G] * num_chunks,
                                   [B] * num_chunks,
                                   [u] * num_chunks,
                                   node_chunks))
            # Flatten the per-chunk edge lists and add them in one step.
            ll = []
            for l in pedgelists:
                ll.extend(l)
            G.add_edges_from(ll)
    finally:
        # The pool was previously leaked; release the worker processes.
        p.close()
        p.join()
    return G
# (Posted 2011-09-18 01:06:10)
好吧,我“无意中”试图在集群上用 cProfile 对并行代码进行性能分析,而我本来只是想让测试离线运行。代码本身运行得很好,但对并行脚本做性能分析一直是有问题的——这正是出错的原因。它与集群或 LSF 无关。抱歉。
https://stackoverflow.com/questions/7449099
复制相似问题