首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >concurrent.futures不并行写入

concurrent.futures不并行写入
EN

Stack Overflow用户
提问于 2016-07-19 10:51:31
回答 1查看 2.3K关注 0票数 0

我有一个列表dataframe_chunk,其中包含一个非常大的熊猫的数据块,我想把每一个块写到不同的csv中,并并行地这样做。但是,我看到文件是按顺序写入的,我不知道为什么会这样。下面是代码:

代码语言:javascript
复制
import concurrent.futures as cfu

def write_chunk_to_file(chunk, fpath):  
    chunk.to_csv(fpath, sep=',', header=False, index=False)

pool = cfu.ThreadPoolExecutor(N_CORES)

futures = []
for i in range(N_CORES):
    fpath = '/path_to_files_'+str(i)+'.csv'
    futures.append(pool.submit( write_chunk_to_file(dataframe_chunk[i], fpath)))

for f in cfu.as_completed(futures):
    print("finished at ",time.time())

有什么线索吗?

EN

回答 1

Stack Overflow用户

发布于 2016-07-19 12:34:00

docs中,但在3.x文档中没有说明的一点是,Python无法使用threading库实现真正的并行性--一次只执行一个线程。

您应该尝试将concurrent.futuresProcessPoolExecutor一起使用,后者为每个作业使用单独的进程,因此可以在多核CPU上实现真正的并行性。

更新

下面是适合使用multiprocessing库的程序:

代码语言:javascript
复制
#!/usr/bin/env python3

from multiprocessing import Process

import os
import time

N_CORES = 8

def write_chunk_to_file(chunk, fpath):  
    with open(fpath, "w") as f:
      for x in range(10000000):
        f.write(str(x))

futures = []

print("my pid:", os.getpid())
input("Hit return to start:")

start = time.time()
print("Started at:", start)

for i in range(N_CORES):
    fpath = './tmp/file-'+str(i)+'.csv'
    p = Process(target=write_chunk_to_file, args=(i,fpath))
    futures.append(p)

for p in futures:
  p.start()

print("All jobs started.")

for p in futures:
  p.join()

print("All jobs finished at ",time.time())

您可以在另一个窗口中使用此shell命令监视作业:

代码语言:javascript
复制
while true; do clear; pstree 12345; ls -l tmp; sleep 1; done

(用脚本发出的pid替换12345。)

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38456458

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档