首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >多处理并行处理比顺序处理慢。

多处理并行处理比顺序处理慢。
EN

Stack Overflow用户
提问于 2014-10-08 13:56:34
回答 3查看 4K关注 0票数 2

我有需要并行化的代码。代码本身没有问题。代码是python类的方法。例如,

代码语言:javascript
复制
class test:
     def __init__(self):
         <...>
     def method(self):
         <...>

我是这样写的,因为全码的细节可能不相关,而且很长。在开始时,我尝试并行化这段代码(只有两个实例):

代码语言:javascript
复制
t1=test()
t2=test()
pr1=Process(target=t1.method, args=(,))
pr2=Process(target=t2.method, args=(,))
pr1.start()
pr2.start()
pr1.join()
pr2.join()

但这不起作用。不仅它的运行速度比一个实例和另一个实例慢得多,而且还存在未修改类变量的问题。由于这条线中@MattDMo的回答,解决了最后一个问题,方法是创建一个共享名称空间、共享变量和共享列表,其中包括:

代码语言:javascript
复制
import multiprocessing as mp
<...>
self.manager=mp.Manager()
self.shared=self.manager.Namespace()
self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0])
self.shared.V=V

但它的运行速度仍然非常缓慢。

一开始,我想,因为我在一个有两个内核的笔记本电脑上执行代码,这两个核就饱和了,但是两个实例和计算机变得慢了,因为不能快速执行任何其他任务。因此,我决定在一个6核的桌面PC (也是一个linux系统)中尝试这些代码。它不能解决这个问题。不过,并行化的版本要慢得多。另一方面,桌面计算机的CPU并不像我用多线程执行C编译代码时那样热。有人知道发生了什么事吗?

完整代码这里,包括以下内容:

代码语言:javascript
复制
from math import exp
from pylab import *
from scipy.stats import norm
from scipy.integrate import ode
from random import gauss,random
from numpy import dot,fft
from time import time

import multiprocessing as mp
from multiprocessing import Pool
from multiprocessing import Process
from multiprocessing import Queue, Pipe
from multiprocessing import Lock, current_process


#Global variables

sec_steps=1000 #resolution (steps per second)
DT=1/float(sec_steps)
stdd=20 #standard deviation for retina random input
stdd2=20 #standard deviation for sigmoid

#FUNCTION TO APPROXIMATE NORMAL CUMULATIVE DISTRIBUTION FUNCTION

def sigmoid(x,mu,sigma):
    beta1=-0.0004406
    beta2=0.0418198
    beta3=0.9
    z=(x-mu)/sigma
    if z>8:
        return 1
    elif z<-8:
        return 0
    else:
        return 1/(1+exp(-sqrt(pi)*(beta1*z**5+beta2*z**3+beta3*z)))

#CLASSES

class retina: ##GAUSSIAN WHITE NOISE GENERATOR
    def __init__(self,mu,sigma):
        self.mu=mu
        self.sigma=sigma
    def create_pulse(self):
        def pulse():
            return gauss(self.mu,self.sigma)
            #return uniform(-1,1)*sqrt(3.)*self.sigma+self.mu
        return pulse
        def test_white_noise(self,N): #test frequency spectrum of random number generator for N seconds
                noise=[]
                pulse=self.create_pulse()
        steps=sec_steps*N+1
        t=linspace(0,N,steps)
                for i in t:
                        noise.append(pulse())
                X=fft(noise)
                X=[abs(x)/(steps/2.0) for x in X]
        xlim([0,steps/N])
        xlabel("freq. (Hz)")
        ylabel("Ampl. (V)")
                plot((t*steps/N**2)[1:],X[1:],color='black')
        #savefig('./wnoise.eps', format='eps', dpi=1000)
                show()
        return noise


class cleft: #object: parent class for a synaptic cleft
    def __init__(self):
        self.shared=manager.Namespace()
        self.shared.preV=0.0 #pre-synaptic voltage
        self.shared.r=0.0 #proportion of channels opened
    Tmax=1.0  #mM
    mu=-35.0  #mV
    sigma=stdd2 #mV

    def T(self): #Receives presynaptic Voltage preV, returns concentration of the corresponding neurotransmitter.
        return self.Tmax*sigmoid(self.shared.preV,self.mu,self.sigma)
    def r_next(self): #Solves kinematic ode  -analytical solution- to find r after one time step DT (needs T and alfa and beta parameters)
        """ 
        runs the ode for one unit of time dt, as specified
        updates the previous r taken as initial condition
        """
        tau=1.0/(self.alfa*self.T()+self.beta)
        r_inf=self.alfa*self.T()*tau
        self.shared.r=r_inf+(self.shared.r-r_inf)*exp(-DT/tau)
    def DI(self,postV): #Receives PSP and computes resulting change in PSC
        return self.g*self.shared.r*(postV-self.restV)

class ampa_cleft(cleft): #Child class for ampa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5 #initial condition for r
        self.alfa=2.0
        self.beta=0.1
        self.restV=0.0
        self.g=0.1


class gaba_a_cleft(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.shared=manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.alfa=2.0
        self.beta=0.08
        self.restV=-75.0
        self.g=0.2

class gaba_a_cleft_trnTOtrn(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.alfa=2.0
        self.beta=0.08
        self.restV=-75.0
        self.g=0.2

class gaba_a_cleft_inTOin(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.alfa=2.0
        self.beta=0.08
        self.restV=-75.0
        self.g=0.2

class gaba_a_cleft_trnTOtcr(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.alfa=2.0
        self.beta=0.08
        self.restV=-85.0
        self.g=0.1

class gaba_a_cleft_inTOtcr(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.alfa=2.0
        self.beta=0.08
        self.restV=-85.0
        self.g=0.1

class gaba_b_cleft(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.shared.R=0.5
        self.shared.X=0.5
        self.alfa_1=0.02
        self.alfa_2=0.03
        self.beta_1=0.05
        self.beta_2=0.01
        self.restV=-100.0
        self.g=0.06

        self.n=4
        self.Kd=100 #Dissociation constant
    def r_next(self): #Solves kinematic ode  SECOND MESSENGER -analytical solution- to find r after one time step DT (needs T and alfa and beta parameters)
        """ 
        runs the ode for one unit of time dt, as specified
        updates the previous r taken as initial condition
        """
        Q1=self.alfa_1*self.T()
        Q2=-Q1-self.beta_1
        R0=self.shared.R
        X0=self.shared.X
        self.shared.R=(Q1*(exp(Q2*DT)-1)+Q2*R0*exp(Q2*DT))/Q2
        self.shared.X=(exp(-self.beta_2*DT)*(self.alfa_2*(self.beta_2*(exp(DT*(self.beta_2+Q2))*(Q1+Q2*R0)+Q1*(-exp(self.beta_2*DT))-Q2*R0)-Q1*Q2*(exp(self.beta_2*DT)-1))+self.beta_2*Q2*X0*(self.beta_2+Q2)))/(self.beta_2*Q2*(self.beta_2+Q2))
        self.shared.r=self.shared.X**self.n/(self.shared.X**self.n+self.Kd)

#######################################################################################################################################################

class neuronEnsemble:
    def __init__(self,V):  #Parent class for a Neuron ensemble
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0])  #Variables to store changes in PSC produced by synaptic connection 
        self.shared.V=V    #Actual state of the membrane potential
    kappa=1.0 #conductance
    def V_next(self):  #ode analitycally for a single time step DT 
        K1=self.C[0]*self.g/self.kappa
        K2=(-dot(self.C,self.I)+self.C[0]*self.g*self.restV)/self.kappa
        self.shared.V=K2/K1+(self.shared.V-K2/K1)*exp(-K1*DT)

class TCR_neuronEnsemble(neuronEnsemble):
    def __init__(self,V):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0])  #Variables to store changes in PSC produced by synaptic connection 
        self.shared.V=V    #Actual state of the membrane potential
        self.g=0.01 #conductance of leak
        self.restV=-55.0 #rest of leak
        self.C=(1.0,7.1,1.0/2.0*30.9/4.0,1.0/2.0*3.0*30.9/4.0,1.0/2.0*30.9)     #Cleak,C2,C3,C4,C7!!  #connectivity constants to the ensemble
                        #First one is Cleak, the others in same order as in diagram

class TRN_neuronEnsemble(neuronEnsemble):
    def __init__(self,V):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0])  #Variables to store changes in PSC produced by synaptic connection 
        self.shared.V=V    #Actual state of the membrane potential
        self.g=0.01 #conductance of leak
        self.restV=-72.5 #rest of leak
        self.C=(1.0,15.0,35.0,0.0,0.0)  #Cleak,C5,C8  #connectivity constants to the ensemble
                        #First one is Cleak, the others in same order as in diagram

class IN_neuronEnsemble(neuronEnsemble): #!!! update all parameters !!!
    def __init__(self,V):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0])  #Variables to store changes in PSC produced by synaptic connection 
        self.shared.V=V    #Actual state of the membrane potential
        self.g=0.01 #conductance of leak
        self.restV=-70.0 #rest of leak
        self.C=(1.0,47.4,23.6,0.0,0.0)  #Cleak,C1,C6!!  #connectivity constants to the ensemble
                                #First one is Cleak, the others in same order as in diagram
######################################INSTANCE GROUP#################################################################
class group:
    def __init__(self,tcr_V0,trn_V0,in_V0):
        #Declarations of instances
        ####################

        #SYNAPTIC CLEFTS
        self.cleft_ret_in=ampa_cleft() #cleft between retina and IN ensemble
        self.cleft_ret_tcr=ampa_cleft() #cleft between retina and TCR ensemble
        self.cleft_in_in=gaba_a_cleft_inTOin() #cleft between IN and IN ensembles
        self.cleft_in_tcr=gaba_a_cleft_inTOtcr() #cleft between IN and TCR ensembles
        self.cleft_tcr_trn=ampa_cleft() #cleft between TCR and TRN ensembles
        self.cleft_trn_trn=gaba_a_cleft_trnTOtrn() #cleft between TRN and TRN ensembles
        self.cleft_trn_tcr_a=gaba_a_cleft_trnTOtcr() #cleft between TRN and TCR ensembles GABAa
        self.cleft_trn_tcr_b=gaba_b_cleft() #cleft between TRN and TCR ensembles GABAb
        #POPULATIONS    
        self.in_V0=in_V0 #mV i.c excitatory potential
        self.IN=IN_neuronEnsemble(self.in_V0) #create instance of IN ensemble

        self.tcr_V0=tcr_V0 #mV i.c excitatory potential
        self.TCR=TCR_neuronEnsemble(self.tcr_V0) #create instance of TCR ensemble

        self.trn_V0=trn_V0 #mV i.c inhibitory potential
        self.TRN=TRN_neuronEnsemble(self.trn_V0) #create instance of TCR ensemble
    def step(self,p): #makes a step of the circuit for the given instance
        #UPDATE TRN
        self.cleft_tcr_trn.shared.preV=self.TCR.shared.V #cleft takes presynaptic V
        self.cleft_tcr_trn.r_next()   #cleft updates r
        self.TRN.I[2]=self.cleft_tcr_trn.DI(self.TRN.shared.V) #update PSC TCR--->TRN 

        self.cleft_trn_trn.shared.preV=self.TRN.shared.V #cleft takes presynaptic V
        self.cleft_trn_trn.r_next()   #cleft updates r
        self.TRN.I[1]=self.cleft_trn_trn.DI(self.TRN.shared.V) #update PSC TRN--->TRN

        self.TRN.V_next()  #update PSP in TRN

        #record retinal pulse ------|> IN AND TCR
        self.cleft_ret_in.shared.preV=self.cleft_ret_tcr.shared.preV=p

        #UPDATE TCR
        self.cleft_ret_tcr.r_next()      #cleft updates r
        self.TCR.I[1]=self.cleft_ret_tcr.DI(self.TCR.shared.V) #update PSC RET---|> TCR

        self.cleft_trn_tcr_b.shared.preV=self.TRN.shared.V #cleft takes presynaptic V
        self.cleft_trn_tcr_b.r_next()   #cleft updates r
        self.TCR.I[2]=self.cleft_trn_tcr_b.DI(self.TCR.shared.V)  #update PSC

        self.cleft_trn_tcr_a.shared.preV=self.TRN.shared.V #cleft takes presynaptic V
        self.cleft_trn_tcr_a.r_next()   #cleft updates r
        self.TCR.I[3]=self.cleft_trn_tcr_a.DI(self.TCR.shared.V) #cleft updates r

        self.cleft_in_tcr.shared.preV=self.IN.shared.V #cleft takes presynaptic V
        self.cleft_in_tcr.r_next()   #cleft updates r
        self.TCR.I[4]=self.cleft_in_tcr.DI(self.TCR.shared.V) #update PSC

        self.TCR.V_next()

        #UPDATE IN

        self.cleft_ret_in.r_next()       #cleft updates r
        self.IN.I[1]=self.cleft_ret_in.DI(self.IN.shared.V) #update PSC

        self.cleft_in_in.shared.preV=self.IN.shared.V #cleft takes presynaptic V
        self.cleft_in_in.r_next()   #cleft updates r
        self.IN.I[2]=self.cleft_in_in.DI(self.IN.shared.V)  #update PSC

        self.IN.V_next()
                #----------------------------------------
    def stepN(self, p, N, data_Vtcr, data_Vtrn, data_Vin): #makes N steps, receives a vector of N retinal impulses and output lists
        data_Vtcr.append(self.tcr_V0)
        data_Vtrn.append(self.trn_V0)
        data_Vin.append(self.in_V0)
        for i in xrange(N):
            self.step(p[i])
            data_Vtcr.append(self.TCR.shared.V)     #write to output list
            data_Vtrn.append(self.TRN.shared.V)
            data_Vin.append(self.IN.shared.V)
            name=current_process().name
            print name+" "+str(i)

######################################################################################################################
############################### CODE THAT RUNS THE SIMULATION OF THE MODEL ###########################################
######################################################################################################################

def run(exec_t): 
    """
    runs the simulation for t=exec_t seconds
    """
    t_0=time()
    mu=-45.0 #mV
    sigma=stdd  #20.0 #mV
    ret=retina(mu,sigma) #create instance of white noise generator
    #initial conditions
    tcr_V0=-61.0 #mV i.c excitatory potential
    trn_V0=-84.0 #mV i.c inhibitory potential
    in_V0=-70.0 #mV i.c excitatory potential
    ###########################LISTS FOR STORING DATA POINTS################################
    t=linspace(0.0,exec_t,exec_t*sec_steps+1)
#   data_Vtcr=[]
#   data_Vtcr.append(tcr_V0)
#
#   data_Vtrn=[]
#   data_Vtrn.append(trn_V0)
#
#   data_Vin=[]
#   data_Vin.append(in_V0)
#   ###NUMBER OF INSTANCES
#   N=2
#   pulse=ret.create_pulse()
#   #CREATE INSTANCES
#   groupN=[]
#   for i in xrange(N):
#       g=group(in_V0,tcr_V0,trn_V0)
#       groupN.append(g)
#
#   for i in t[1:]:
#       p=pulse()
#       proc=[]
#       for j in xrange(N):
#           pr=Process(name="group_"+str(j),target=groupN[j].step, args=(p,))
#           pr.start()
#           proc.append(pr)
#       for j in xrange(N):
#           proc[j].join(N)
#
#       data_Vtcr.append((groupN[0].TCR.shared.V+groupN[1].TCR.shared.V)*0.5)     #write to output list
#       data_Vtrn.append((groupN[0].TRN.shared.V+groupN[1].TRN.shared.V)*0.5)
#       data_Vin.append((groupN[0].IN.shared.V+groupN[1].IN.shared.V)*0.5)
#############FOR LOOPING INSIDE INSTANCE ---FASTER#############################################
    #CREATE p vector of retinal pulses
    p=[]
    pulse=ret.create_pulse()
    for k in xrange(len(t)-1):
        p.append(pulse())   
    #CREATE INSTANCES
    N=2
    groupN=[]
    proc=[]

    manager=mp.Manager() #creating a shared namespace

    data_Vtcr_0 = manager.list()
    data_Vtrn_0 = manager.list()
    data_Vin_0  = manager.list()

    data_Vtcr_1 = manager.list()
    data_Vtrn_1 = manager.list()
    data_Vin_1  = manager.list()

    data_Vtcr=[data_Vtcr_0, data_Vtcr_1]
    data_Vtrn=[data_Vtrn_0, data_Vtrn_1]
    data_Vin=[data_Vin_0, data_Vin_1]

    for j in xrange(N):
        g=group(tcr_V0,trn_V0,in_V0)
        groupN.append(g)
    for j in xrange(N):
        pr=Process(name="group_"+str(j),target=groupN[j].stepN, args=(p, len(t)-1, data_Vtcr[j], data_Vtrn[j], data_Vin[j],))
        pr.start()
        proc.append(pr)
    for j in xrange(N):
        proc[j].join()
    data_Vtcr_av=[0.5*i for i in map(add, data_Vtcr[0], data_Vtcr[1])]
    data_Vtrn_av=[0.5*i for i in map(add, data_Vtrn[0], data_Vtrn[1])]
    data_Vin_av =[0.5*i for i in map(add, data_Vin[0],  data_Vin[1])]

    print len(t), len(data_Vtcr[0]), len(data_Vtcr_av)
    ##Plotting#####################################
    subplot(3,1,1)
    xlabel('t')
    ylabel('tcr - mV')
    plot(t[50*sec_steps:],array(data_Vtcr_av)[50*sec_steps:], color='black')

    subplot(3,1,2)
    xlabel('t')
    ylabel('trn - mV')
    plot(t[50*sec_steps:],array(data_Vtrn_av)[50*sec_steps:], color='magenta')

    subplot(3,1,3)
    xlabel('t')
    ylabel('IN - mV')
    plot(t[50*sec_steps:],array(data_Vin_av)[50*sec_steps:], color='red')

    #savefig('./v_tcr.eps', format='eps', dpi=1000)
    ###############################################

    t_1=time() #measure elapsed time
    print "elapsed time: ", t_1-t_0, " seconds."
    #save data to file
    FILE=open("./output.dat","w")
    FILE.write("########################\n")
    FILE.write("# t                                                           V       #\n")
    FILE.write("########################\n")
    for k in range(len(t)):
        FILE.write(str(t[k]).zfill(5)+"\t"*3+repr(data_Vtcr_av[k])+"\n")
    FILE.close()
    #################
    show()
    return t,array(data_Vtcr)

######################################################################################################################
######################################################################################################################
if __name__ == "__main__":
    run(60)    #run simulation for 60 seconds
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2014-10-13 05:30:12

您的问题是,您过于依赖multiprocessing.Manager Proxy对象来进行数学计算。我试图警告您,multiprocessing.Manager in 我的回答对于您最初的问题的缺点是什么,但是我的措辞不够有力。我说过:

请记住,multiprocessing.Manager启动一个子进程来管理您创建的所有共享实例,而且每次访问其中一个Proxy实例时,您实际上都在对Manager进程进行IPC调用。

我应该补充一句:“在同一进程中,IPC调用比正常访问要昂贵得多”。您的原始问题并没有真正说明您将广泛地使用Manager实例,所以我不想强调它。

考虑下面这个简单地从循环中的one Proxy变量读取的示例:

代码语言:javascript
复制
>>> timeit.timeit("for _ in range(1000): x = v + 2", setup="v = 0", number=1000)
0.040110111236572266
>>> timeit.timeit("for _ in range(1000): x = shared.v + 2", 
                  setup="import multiprocessing ; m = multiprocessing.Manager() ; shared = m.Namespace(); shared.v = 0", 
                  number=1000)
15.048354864120483

当您引入共享变量时,它几乎慢了400倍。现在,这个例子有点极端,因为我们在一个紧密的循环中访问共享变量,但关键是;访问Proxy变量是slow。你在你的节目里做了很多。访问Proxy的额外开销比同时运行两个进程所获得的开销要高得多。

您需要对这段代码进行显著的重构,以便将Proxy变量的使用降到最低。您可能会发现将multiprocessing.Namespace的大部分用法替换为multiprocessing.Value更成功,后者存储在共享内存中,而不是单独的进程中。这使得它们更快(尽管仍然比常规变量慢得多):

代码语言:javascript
复制
>>> timeit.timeit("for _ in range(1000): x = v.value + 2", setup="import multiprocessing ; v = multiprocessing.Value('i', 0)", number=1000)
0.29022717475891113

如果你用lock=False初始化它,事情会变得更快

代码语言:javascript
复制
>>> timeit.timeit("for _ in range(1000): x = v.value + 2", setup="import multiprocessing ; v = multiprocessing.Value('i', 0, lock=False)", number=1000)
0.06386399269104004

但是,Value不再是自动进程安全的。如果可能同时在两个进程中更改变量,则需要显式创建和接受multiprocessing.Lock来同步对变量的访问。

multiprocessing.Value的唯一其他限制是您仅限于ctypes模块支持的类型。实际上,这对您来说应该是可以的,因为您主要使用的是ints和浮点数。作为Proxy实例,您可能需要保留的唯一部分是列表,尽管您可能也可以使用multiprocessing.Array

票数 9
EN

Stack Overflow用户

发布于 2014-10-08 14:32:03

每个进程运行多长时间--它们做了多少工作?启动一个单独的进程有一个开销--当您这样做时,必须创建一个新的子进程,并且必须将父进程的环境复制到它。如果您处理的任务非常短,那么它很可能比单个进程、单线程替代方案慢。

如果你的个人工作是短期的,试着增加他们正在做的工作量,看看你的速度是否有了提高。

票数 1
EN

Stack Overflow用户

发布于 2014-10-12 23:51:57

一些数字python模块,比如numpy,可以改变python解释器的cpu亲和力(解释器可以同时使用多少核心)。当python模块与某些多线程BLAS库链接时,通常会发生这种情况。此问题可能导致多个python进程仅在一个核心上运行,使其比单线程版本慢,特别是在处理共享状态时。

检查您的程序是否没有使用所有的核心。如果没有全部使用它们,则可以通过使用适当的参数对os.system命令执行系统调用(使用taskset )来更改cpu关联。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/26258728

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档