首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在pygmo中使用队列进行函数计算

在pygmo中使用队列进行函数计算
EN

Stack Overflow用户
提问于 2017-09-26 14:55:31
回答 1查看 114关注 0票数 1

我试图使用通过Anaconda3安装的Anaconda3优化库和一些现有代码,这些代码通过一个可执行文件封装参数向量的异步和分布式评估,该可执行文件可以进行轨迹优化(如果有人感兴趣的话是POST2 )。为了方便这一点,我在网络中使用multiprocessing.SyncManager和multiprocessing.Queues来传递输入,接收输出和日志记录消息。因此,在这种情况下,pygmo将选择要尝试的向量,并且支持代码将其传递到一个输入队列中,一些分布式工作人员将获取输入队列,通过可执行文件进行计算,并将结果传回,这些结果最终将被返回给用于评估的任何pygmo.algorithm。

我的问题是,当pygmo初始化一个问题时,它会对提供的类进行深度复制,在我的示例中和下面提供的示例代码中,该类包含多个队列。在执行深度复制时,我会得到错误

代码语言:javascript
复制
  File "pygmo_testing.py", line 121, in <module>
    main()
  File "pygmo_testing.py", line 108, in main
    prob = pg.problem(my_prob)
  File "C:\Anaconda3\lib\copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "C:\Anaconda3\lib\copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "C:\Anaconda3\lib\copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "C:\Anaconda3\lib\copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "C:\Anaconda3\lib\copy.py", line 169, in deepcopy
    rv = reductor(4)
  File "C:\Anaconda3\lib\multiprocessing\queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "C:\Anaconda3\lib\multiprocessing\context.py", line 356, in assert_spawning
    ' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance

有办法绕道吗?我需要保持该代码所使用的其他方法的异步和分布式执行风格。我还尝试过queue.Queue和multiprocessing.Manager.Queue (这两种代码都不能与其他现有代码一起工作)以确保完整性,但这始终取决于深度复制。

谢谢大家!

代码语言:javascript
复制
"""
****************************** Import Statements ******************************
"""
import pygmo as pg
from multiprocessing import Pool, Queue

"""
****************************** Utility Functions ******************************
"""  
def sphere_fitness(x):
    return sum(x*x)

def worker(inp_q, out_q):

    while True:

        x = inp_q.get()
        print("got {}".format(x))
        if x == False:
            break
        else:
            fit = sphere_fitness(x)
            print("x: {} f: {}".format(x, fit))
            out_q.put_nowait(fit)
            print("submitted {}".format(x))        
"""
********************************** Class(es) ************************************
"""
class distributed_submit(object):
    """ Class for pygmo Problem"""

    def __init__(self, dim, inp_q, out_q):
        self.dim = dim
        self._inp_q = inp_q
        self._out_q = out_q

    def _submit(self, inp_q, x):
        self._inp_q.put_nowait(x)
        print("x delivered")

    def _receive(self, out_q):
        return self._out_q.get()

    def fitness(self, x):
        self._submit(x)
        print("put in {}".format(x))
        fit = self._receive()
        print("got {}".format(fit))
        return [fit]

    def get_bounds(self):
        return ([-1]*self.dim, [1]*self.dim)

    def get_name(self):
        return "Sphere Function"

    def get_extra_info(self):
        return "\tDimensions: {}".format(self.dim)


"""
******************************* Main Function ********************************
"""
def main():

    # Queues from multiprocessing
    _inp_q   = Queue()
    _out_q   = Queue()

    _workers = Pool(initializer=worker,
                    initargs=(_inp_q, _out_q))
    _workers.close()

    my_prob = distributed_submit(3, _inp_q, _out_q)

    prob = pg.problem(my_prob)

    algo = pg.algorithm(pg.bee_colony(gen=20, limit=20))

    pop = pg.population(prob, 10)
    print(pop)

    pop = algo.evolve(pop)

    print(pop.champion_f)


if __name__ == "__main__":
    main()
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-10-26 17:20:06

对于任何发现这个并需要答案的人,collections.deque支持深度复制。

https://docs.python.org/3/library/collections.html#collections.deque

除上述外,德克支持..。copy.copy(d),copy.deepcopy(d),

但是,collections.deque不了解进程,因此不能用于分发。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46429779

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档