我很难实现用于生成数组列表的并行化。在这种情况下,每个数组都是独立生成的,然后附加到一个列表中。不知何故,当我向multiprocessing.apply_asynch()提供复杂的参数时,它会输出一个空数组。
更具体地说,为了给出上下文,我正在尝试使用并行化实现一个机器学习算法。这个想法是这样的:我有一个“系统”,以及一个在系统上执行操作的“代理”。为了教智能体(在这种情况下是神经网络)如何优化行为(相对于我在这里省略的某个奖励方案),智能体需要通过对其应用操作来生成系统的轨迹。然后,从执行动作所获得的奖励中,智能体学习做什么和不做什么。请注意,重要的是,代码中可能的操作被称为整数,其中包含:
possible_actions = [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]所以我在这里尝试使用多进程来生成许多这样的轨迹(抱歉,代码在这里不能运行,因为它需要许多其他文件,但我希望有人能发现这个问题):
from quantum_simulator_EC import system
from reinforce_keras_EC import Agent
import multiprocessing as mp
s = system(1200, N=3)
s.set_initial_state([0,0,1])
agent = Agent(alpha=0.0003, gamma=0.95, n_actions=len( s.actions ))
def get_result(result):
global action_batch
action_batch.append(result)
def generate_trajectory(s, agent):
sequence_of_actions = []
for k in range( 5 ):
net_input = s.generate_net_input_FULL(6)
action = agent.choose_action( net_input )
sequence_of_actions.append(action)
return sequence_of_actions
action_batch = []
pool = mp.Pool(2)
for i in range(0, batch_size):
pool.apply_async(generate_trajectory, args=(s,agent), callback=get_result)
pool.close()
pool.join()
print(action_batch)问题是代码返回一个空数组[]。有人能给我解释一下问题出在哪里吗?我可以传递给apply_asynch的参数类型有限制吗?在这个例子中,我传递了我的系统's‘和我的'agent',这两个都是复杂的对象。我之所以提到这一点,是因为当我使用像整数或矩阵这样的简单参数来测试代码时,而不是使用agent和system,它工作得很好。如果没有明显的原因导致它不能工作,如果有人有一些调试代码的技巧,那也会很有帮助。
请注意,如果我不使用多处理,将最后一部分替换为:
action_batch = []
for i in range(0, batch_size):
get_result( generate_sequence(s,agent) )
print(action_batch)在本例中,这里的输出如预期的那样,是一个包含5个操作的序列列表:
[[4, 2, 1, 1, 7], [8, 2, 2, 12, 1], [8, 1, 9, 11, 9], [7, 10, 6, 1, 0]]发布于 2021-01-24 03:29:42
最终的结果可以直接追加到主进程中的列表中,不需要创建回调函数。然后,您可以对池执行close和join操作,最后使用get检索所有结果。
请参阅以下两个使用apply_async和starmap_async的示例(有关区别,请参阅this post )。
解决方案apply
import multiprocessing as mp
import time
def func(s, agent):
print(f"Working on task {agent}")
time.sleep(0.1) # some task
return (s, s, s)
if __name__ == '__main__':
agent = "My awesome agent"
with mp.Pool(2) as pool:
results = []
for s in range(5):
results.append(pool.apply_async(func, args=(s, agent)))
pool.close()
pool.join()
print([result.get() for result in results])解决方案starmap
import multiprocessing as mp
import time
def func(s, agent):
print(f"Working on task {agent}")
time.sleep(0.1) # some task
return (s, s, s)
if __name__ == '__main__':
agent = "My awesome agent"
with mp.Pool(2) as pool:
result = pool.starmap_async(func, [(s, agent) for s in range(5)])
pool.close()
pool.join()
print(result.get())输出
Working on task My awesome agent
Working on task My awesome agent
Working on task My awesome agent
Working on task My awesome agent
Working on task My awesome agent
[(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]https://stackoverflow.com/questions/65848446
复制相似问题