I am trying to use multiprocessing to append to a CSV file. I have multiple CSV files that I loop over. This function works with a normal for loop, but not with multiprocessing. I hope someone can shed some light on this.
My function code is as follows:
```python
def read_write2(j, lock):
    #i = 2
    with open('C:\\Users\\user\\Documents\\filereader\\FileFolder\\sample_new{}.csv'.format(j), "r") as a_file:  # input file
        #i = i + 1
        with open('samples2.csv', 'a') as file:  # output file
            for line in a_file:
                lock.acquire()
                stripped_line = line.strip()
                a = len(stripped_line)
                if "©" in stripped_line or "flow" in stripped_line or a > 254:
                    pass
                else:
                    file.write(stripped_line)
                    file.write("\n")
                lock.release()
```
My multiprocessing code is as follows:
```python
if __name__ == "__main__":
    lock = Lock()
    processes = []
    for i in range(2, fileno + 1):
        print(i)
        process = Process(target=read_write2, args=(i, lock))  # creating a new process
        processes.append(process)  # appending process to a processes list
    for process in processes:
        print(process)
        process.start()
    for process in processes:  # loop over list to join process
        process.join()  # process will finish before moving on with the script
```
The output is as follows:
```
7
2
3
4
5
6
7
<Process name='Process-1' parent=24328 initial>
<Process name='Process-2' parent=24328 initial>
<Process name='Process-3' parent=24328 initial>
<Process name='Process-4' parent=24328 initial>
<Process name='Process-5' parent=24328 initial>
<Process name='Process-6' parent=24328 initial>
7
7
7
7
7
7
```
Thanks.
Posted on 2022-05-30 02:12:02
Hmm. That's not going to work. Each thread has a different "handle" on the file, because you have opened it multiple times. You need to open it once and pass it to the threads.
Posted on 2022-05-30 03:31:28
As mentioned, you are opening and writing to the same file multiple times, with no file locking or synchronization to prevent trouble. The likely cause is that the file position is not updated across processes, so one process does not know that another has written to the file and starts writing from the same position as the other process(es). There are better ways to do this, but to keep the changes to your code minimal, I suggest opening, writing to, and closing the output file while holding the lock, so the sequence becomes:
```python
with open('C:\\Users\\user\\Documents\\filereader\\FileFolder\\sample_new{}.csv'.format(j), "r") as a_file:  # input file
    for line in a_file:
        if ...:
            ...
        else:
            lock.acquire()
            with open('samples2.csv', 'a') as file:  # output file
                ...
            lock.release()
```
Although this adds considerable disk I/O overhead, it should be the minimal change to your code that makes it work with multiprocessing. The whole function would then be:
```python
def read_write2(j, lock):
    with open('C:\\Users\\user\\Documents\\filereader\\FileFolder\\sample_new{}.csv'.format(j), "r") as a_file:  # input file
        for line in a_file:
            stripped_line = line.strip()
            a = len(stripped_line)
            if "©" in stripped_line or "flow" in stripped_line or a > 254:
                pass
            else:
                lock.acquire()
                with open('samples2.csv', 'a') as file:  # output file
                    file.write(stripped_line)
                    file.write("\n")
                lock.release()
```
Depending on the number of files, their sizes, the number of output lines, and many other factors, it may be more efficient for each process to write to its own file and then collate the outputs into a single file in the main loop. This saves a lot of file opening/closing and eliminates the need for the lock. For example, rewrite the function as follows:
```python
def read_write2(j):
    with open('C:\\Users\\user\\Documents\\filereader\\FileFolder\\sample_new{}.csv'.format(j), "r") as a_file:  # input file
        with open('samples2_{}.csv'.format(j), 'a') as file:  # output file
            for line in a_file:
                stripped_line = line.strip()
                a = len(stripped_line)
                if "©" in stripped_line or "flow" in stripped_line or a > 254:
                    pass
                else:
                    file.write(stripped_line)
                    file.write("\n")
```
Then, in the main code (under `if __name__ == "__main__":`), replace this:
```python
for process in processes:  # loop over list to join process
    process.join()  # process will finish before moving on with the script
```
with this:
```python
with open('samples2.csv', 'w') as out_f:
    for e, process in enumerate(processes):
        process.join()
        with open('samples2_{}.csv'.format(e + 2), 'r') as in_f:
            out_f.write(in_f.read())  # NOTE: this is highly inefficient, and may consume too much memory. But that's not relevant to the question at hand.
```
Posted on 2022-05-30 06:09:36
Thanks everyone for the input. It helped me find the answer.
The answer was actually to put everything under main. It seems to work well and resolved the error. I am checking 1000s and 1000s of URLs.
I put all the function declarations under `if __name__ == "__main__":` and was able to solve it.
Thanks again, everyone. :)
https://stackoverflow.com/questions/72428509