
Using multiprocessing with a loop iterator in Python

Stack Overflow user
Asked on 2022-07-27 13:39:31
1 answer · 51 views · 0 followers · 0 votes

I have an iterator that retrieves varying numbers of lines from a very large (>20 GB) file, depending on certain features. The iterator works fine, but I can only use one thread to process the results. I would like to feed the value from each iteration to multiple threads/processes.

I used a text file with 9 lines to simulate my data; my code is below. I have been struggling with how to create the feedback, so that when one process finishes, it fetches the next iteration:

from multiprocessing import Process, Manager
import time

# Iterator
class read_file(object):
    def __init__(self, filePath):
        self.file = open(filePath, 'r')

    def __iter__(self):
        return self

    def __next__(self):
        line = self.file.readline()
        if line:
            return line
        else:
            raise StopIteration

# worker for one process
def print_worker(a, n, stat):
    print(a)
    stat[n] = True  # Set the finished status as True
    return None

# main
def main():
    file_path = 'tst_mp.txt'  # the txt file with 9 lines
    n_worker = 2
    file_handle = read_file(file_path)
    workers = []
    
    # Create a shared list to store the dereplicated dicts and a progress counter
    manager = Manager()
    status = manager.list([False] * 2)  # one boolean status flag per worker
    
    # Initiate the workers
    for i in range(n_worker):
        workers.append(Process(target=print_worker, args=(file_handle.__next__(), i, status,)))
    for worker in workers:
        worker.start()
    
    block = file_handle.__next__() # The next block (line)
    while block:  # continue while there is still a block left
        print(status)
        time.sleep(1)  # for every second
        for i in range(2):
            if status[i]:  # Worker i finished
                workers[i].join()
                # workers[i].close()
                workers[i] = Process(target=print_worker, args=(block, i, status,))
                status[i] = False  # Set worker i as busy (False)
                workers[i].start()  # Start worker i
                try:  # try to get the next item in the iterator
                    block = file_handle.__next__()
                except StopIteration:
                    block = False
   
if __name__ == '__main__':
    main()                

The code is clumsy, but it did print out the sequence; however, some errors also appeared when I ran the code:

1
2
3
4
5
6
7
8
9

Process Process-10:
Traceback (most recent call last):
  File "/home/zewei/mambaforge/lib/python3.9/multiprocessing/managers.py", line 802, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/zewei/mambaforge/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/zewei/mambaforge/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/zewei/share/paf_depth/test_multiprocess.py", line 31, in print_worker
    stat[n] = True # Set the finished status as True
  File "<string>", line 2, in __setitem__
  File "/home/zewei/mambaforge/lib/python3.9/multiprocessing/managers.py", line 806, in _callmethod
    self._connect()
  File "/home/zewei/mambaforge/lib/python3.9/multiprocessing/managers.py", line 794, in _connect
    dispatch(conn, None, 'accept_connection', (name,))
  File "/home/zewei/mambaforge/lib/python3.9/multiprocessing/managers.py", line 90, in dispatch
    kind, result = c.recv()
  File "/home/zewei/mambaforge/lib/python3.9/multiprocessing/connection.py", line 255, in recv
    buf = self._recv_bytes()
  File "/home/zewei/mambaforge/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes
    buf = self._recv(4)
  File "/home/zewei/mambaforge/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
    chunk = read(handle, remaining)
ConnectionResetError: [Errno 104] Connection reset by peer

This is where I got stuck, and I would like to know whether there is any fix, or a more elegant way to solve this?

Thanks!


1 Answer

Stack Overflow user

Posted on 2022-07-27 15:25:52

Here is a better way to do what you are doing, using a Pool:

from multiprocessing import Pool
import time

# ... (read_file class as defined in the question) ...

# worker for one process
def print_worker(a):
    print(a)
    return None

def main():
    file_path = r''  # the txt file with 9 lines
    n_worker = 2
    file_handle = read_file(file_path)
    results = []

    with Pool(n_worker) as pool:
        for result in pool.imap(print_worker, file_handle):
            results.append(result)

        print(results)



if __name__ == '__main__':
    main()

Here, the imap function iterates over the iterator lazily, so the whole file is never read into memory. The Pool automatically distributes the tasks across the processes it starts (n_worker of them), so you don't have to manage that yourself.

Votes: 0
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/73139007
