首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Python多处理代码运行良好,但不终止。

Python多处理代码运行良好,但不终止。
EN

Stack Overflow用户
提问于 2016-11-22 16:58:42
回答 1查看 100关注 0票数 1

我有这个代码(很抱歉,它几乎完全是我工作代码的拷贝粘贴。)我不知道问题出在哪里,所以我把整个问题放在这里):

代码语言:javascript
复制
def init(Q):
    """Serves to initialize the queue across all child processes"""
    global q
    q = Q

def queue_manager(q):
    """Listens on the queue, and writes pushed data to file"""
    while True:
        data = q.get()
        if data is None:
            break
        key, preds = data
        with pd.HDFStore(hdf_out, mode='a', complevel=5, complib='blosc') as out_store:
            out_store.append(key, preds)

def writer(message):
    """Pushes messages to queue"""
    q.put(message)

def reader(key):
    """Reads data from store, selects required days, processes it"""
    try:
        # Read the data
        with pd.HDFStore(hdf_in, mode='r') as in_store:
            df = in_store[key]
    except KeyError as ke:
        # Almost guaranteed to not happen
        return (key, pd.DataFrame())
    else:
        # Executes only if exception is not raised
        fit_df = df[(df.index >= '2016-09-11') & \
                    (df.index < '2016-09-25') & \
                    (df.index.dayofweek < 5)].copy()
        pre_df = df[(df.index >= '2016-09-18') & \
                    (df.index < '2016-10-2') & \
                    (df.index.dayofweek < 5)].copy()
        del df
        # model_wrapper below is a custom function in another module.
        # It works fine.
        models, preds = model_wrapper(fit_df=fit_df, pre_df=pre_df)
        if preds is not None:
            writer((key, preds))
            del preds
    return (key, models)

def main():
    sensors = pd.read_csv('sens_metadata.csv', index_col=[0])
    nprocs = int(cpu_count() - 0)
    maxproc = 10
    q = Queue()
    t = Thread(target=queue_manager, args=(q,))

    print("Starting process at\t{}".format(dt.now().time()))
    sys.stdout.flush()
    t.start()
    with Pool(processes=nprocs, maxtasksperchild=maxproc, initializer=init,
              initargs=(q,)) as p:
        models = p.map(reader, sensors.index.tolist(), 1)
    print("Processing done at\t{}".format(dt.now().time()))
    print("\nJoining Thread, and finishing writing predictions")
    sys.stdout.flush()
    q.put(None)
    t.join()
    print("Thread joined successfully at\t{}".format(dt.now().time()))
    print("\nConcatenating models and serializing to pickle")
    sys.stdout.flush()
    pd.concat(dict(models)).to_pickle(path + 'models.pickle')
    print("Pickled successfully at\t{}".format(dt.now().time()))

if __name__ == '__main__':
    main()

这段代码的行为就像一个严重偏颇的硬币投掷。大多数情况下,它不起作用,有时,它起作用。当它运行时,我知道完成整个数据(所有keys)大约需要2.5个小时。9/ 10运行,它将处理所有数据,我在hdf_out文件中看到数据,但多处理池不连接。所有子进程都是活动的,但不执行任何工作。我只是不明白为什么程序会像这样挂起来。

当发生这种情况时,我看不到"Processing done at ...""Joining Thread, ..."消息。另外,如果我给它更小的数据集,它就完成了。如果不计算preds,它就完成了。我不能排除models的计算而不进行大量的修改,这将不利于项目的其余部分。

我不知道为什么会发生这种事。我正在使用Linux (Kubuntu16.04)。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-11-23 17:58:22

显然,放弃maxtaskperchild kwag解决了这个问题。为什么有些事我不明白。我认为这与fork进程(Linux上的默认进程)和派生进程(Windows上唯一的选项)之间的区别有关。

对于叉进程,maxtaskperchild显然不是必需的,因为没有它的性能会更好。我注意到通过删除maxtaskperchild提高了内存的使用。子进程不占用内存,而是从父进程共享内存。然而,在我不得不使用Windows的时候,maxtaskperchild是防止子进程膨胀的关键方法,特别是当运行内存密集型任务时,任务列表很长。

谁知道发生了什么更好,请随时编辑这个答案。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40747540

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档