我有一个列表dataframe_chunk,其中包含一个非常大的熊猫的数据块,我想把每一个块写到不同的csv中,并并行地这样做。但是,我看到文件是按顺序写入的,我不知道为什么会这样。下面是代码:
import concurrent.futures as cfu
def write_chunk_to_file(chunk, fpath):
chunk.to_csv(fpath, sep=',', header=False, index=False)
pool = cfu.ThreadPoolExecutor(N_CORES)
futures = []
for i in range(N_CORES):
fpath = '/path_to_files_'+str(i)+'.csv'
futures.append(pool.submit( write_chunk_to_file(dataframe_chunk[i], fpath)))
for f in cfu.as_completed(futures):
print("finished at ",time.time())有什么线索吗?
发布于 2016-07-19 12:34:00
在 docs中,但在3.x文档中没有说明的一点是,Python无法使用threading库实现真正的并行性--一次只执行一个线程。
您应该尝试将concurrent.futures与ProcessPoolExecutor一起使用,后者为每个作业使用单独的进程,因此可以在多核CPU上实现真正的并行性。
更新
下面是适合使用multiprocessing库的程序:
#!/usr/bin/env python3
from multiprocessing import Process
import os
import time
N_CORES = 8
def write_chunk_to_file(chunk, fpath):
with open(fpath, "w") as f:
for x in range(10000000):
f.write(str(x))
futures = []
print("my pid:", os.getpid())
input("Hit return to start:")
start = time.time()
print("Started at:", start)
for i in range(N_CORES):
fpath = './tmp/file-'+str(i)+'.csv'
p = Process(target=write_chunk_to_file, args=(i,fpath))
futures.append(p)
for p in futures:
p.start()
print("All jobs started.")
for p in futures:
p.join()
print("All jobs finished at ",time.time())您可以在另一个窗口中使用此shell命令监视作业:
while true; do clear; pstree 12345; ls -l tmp; sleep 1; done(用脚本发出的pid替换12345。)
https://stackoverflow.com/questions/38456458
复制相似问题