首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Python中的多处理和多线程

Python中的多处理和多线程
EN

Stack Overflow用户
提问于 2020-10-26 13:57:40
回答 5查看 877关注 0票数 3

我有一个python程序,它1)从磁盘读取一个非常大的文件(~95%的时间),然后2)进程,并提供相对较小的输出(~5%的时间)。此程序将在文件的TeraBytes上运行。

现在我希望通过利用多处理和多线程来优化这个程序。我正在运行的平台是一个在虚拟机上有4个处理器的虚拟机。

我计划有一个调度器进程,它将执行4个进程(与处理器相同),然后每个进程应该有一些线程,因为大部分是I/O。每个线程将处理一个文件,并将结果报告给主线程,主线程再通过IPC将结果报告回调度器进程。调度器可以对这些数据进行排队,并最终以有序的方式将它们写入磁盘

所以想知道如何决定为这种情况创建的进程和线程的数量?有没有一种数学方法可以计算出最好的混合。

谢谢你

EN

回答 5

Stack Overflow用户

发布于 2020-11-04 02:27:20

我想我会安排它与你正在做的事情相反。也就是说,我将创建一个特定大小的线程池,它将负责生成结果。提交到此池的任务将作为参数传递给处理器池,该处理器池可由工作线程用来提交工作的CPU绑定部分。换句话说,线程池工作者将主要执行所有与磁盘相关的操作,并将任何CPU密集型工作交给处理器池。

处理器池的大小应该是您的环境中拥有的处理器数量。很难给出线程池的确切大小;这取决于在回报递减规律生效之前它可以处理多少并发磁盘操作。这还取决于您的内存:池越大,占用的内存资源就越多,特别是在必须将整个文件读入内存进行处理的情况下。因此,您可能需要尝试使用此值。下面的代码概述了这些想法。从线程池中获得的是I/O操作的重叠,而不是仅仅使用较小的处理器池:

代码语言:javascript
复制
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
import os

def cpu_bound_function(arg1, arg2):
    ...
    return some_result



def io_bound_function(process_pool_executor, file_name):
    with open(file_name, 'r') as f:
        # Do disk related operations:
        . . . # code omitted
        # Now we have to do a CPU-intensive operation:
        future = process_pool_executor.submit(cpu_bound_function, arg1, arg2)
        result = future.result() # get result
        return result
    
file_list = [file_1, file_2, file_n]
N_FILES = len(file_list)
MAX_THREADS = 50 # depends on your configuration on how well the I/O can be overlapped
N_THREADS = min(N_FILES, MAX_THREADS) # no point in creating more threds than required
N_PROCESSES = os.cpu_count() # use the number of processors you have

with ThreadPoolExecutor(N_THREADS) as thread_pool_executor:
    with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
        results = thread_pool_executor.map(partial(io_bound_function, process_pool_executor), file_list)

重要说明

另一种简单得多的方法是只有一个处理器池,它的大小大于CPU处理器的数量,例如,25个。工作进程将同时执行I/O和CPU操作。即使您有比CPU更多的进程,许多进程仍将处于等待状态,等待I/O完成,从而允许CPU密集型工作运行。

这种方法的缺点是创建N个进程的开销远远大于创建N个线程+少量进程的开销。但是,随着提交到池的任务的运行时间变得越来越长,这种增加的开销在总运行时间中所占的百分比越来越小。因此,如果您的任务不是微不足道的,那么这可能是一个合理的性能简化。

更新:两种方法的基准测试

我对这两种方法处理了24个大小约为10000KB的文件进行了一些基准测试(实际上,这只是3个不同的文件,每个文件处理8次,所以可能已经进行了一些缓存):

方法1(线程池+处理器池)

代码语言:javascript
复制
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
import os
from math import sqrt
import timeit


def cpu_bound_function(b):
    sum = 0.0
    for x in b:
        sum += sqrt(float(x))
    return sum

def io_bound_function(process_pool_executor, file_name):
    with open(file_name, 'rb') as f:
        b = f.read()
        future = process_pool_executor.submit(cpu_bound_function, b)
        result = future.result() # get result
        return result

def main():
    file_list = ['/download/httpd-2.4.16-win32-VC14.zip'] * 8 + ['/download/curlmanager-1.0.6-x64.exe'] * 8 + ['/download/Element_v2.8.0_UserManual_RevA.pdf'] * 8
    N_FILES = len(file_list)
    MAX_THREADS = 50 # depends on your configuration on how well the I/O can be overlapped
    N_THREADS = min(N_FILES, MAX_THREADS) # no point in creating more threds than required
    N_PROCESSES = os.cpu_count() # use the number of processors you have

    with ThreadPoolExecutor(N_THREADS) as thread_pool_executor:
        with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
            results = list(thread_pool_executor.map(partial(io_bound_function, process_pool_executor), file_list))
            print(results)

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=1, globals=globals()))

方法2(仅限处理器池)

代码语言:javascript
复制
from concurrent.futures import ProcessPoolExecutor
from math import sqrt
import timeit


def cpu_bound_function(b):
    sum = 0.0
    for x in b:
        sum += sqrt(float(x))
    return sum

def io_bound_function(file_name):
    with open(file_name, 'rb') as f:
        b = f.read()
        result = cpu_bound_function(b)
        return result

def main():
    file_list = ['/download/httpd-2.4.16-win32-VC14.zip'] * 8 + ['/download/curlmanager-1.0.6-x64.exe'] * 8 + ['/download/Element_v2.8.0_UserManual_RevA.pdf'] * 8
    N_FILES = len(file_list)
    MAX_PROCESSES = 50 # depends on your configuration on how well the I/O can be overlapped
    N_PROCESSES = min(N_FILES, MAX_PROCESSES) # no point in creating more threds than required

    with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
        results = list(process_pool_executor.map(io_bound_function, file_list))
        print(results)

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=1, globals=globals()))

结果:

(我有8个内核)

线程池+处理器池: 13.5秒单独处理器池: 13.3秒

结论:我会先尝试更简单的方法,即只使用处理器池来处理所有事情。现在,棘手的部分是决定要创建的最大进程数,这是原始问题的一部分,当它所做的全部工作都是CPU密集型计算时,它有一个简单的答案。如果您正在读取的文件数量不是太多,那么这一点是没有意义的;每个文件可以有一个进程。但是,如果您有数百个文件,您将不希望在您的池中有数百个进程(您可以创建的进程数也有一个上限,而且还有那些令人讨厌的内存约束)。我没有办法给你一个确切的数字。如果您确实有大量的文件,请从较小的池大小开始,然后不断递增,直到没有进一步的好处为止(当然,您可能不希望处理的文件超过这些测试的最大数量,否则您将永远运行下去,只为实际运行选择一个好的池大小)。

票数 8
EN

Stack Overflow用户

发布于 2020-10-31 16:53:44

对于并行处理:我看到了this question,并引用了被接受的答案:

在实践中,可能很难找到最优的线程数量,甚至这个数量可能在每次运行程序时都会有所不同。因此,从理论上讲,最优的线程数量将是您的机器上的核心数量。如果你的核心是“超线程”( Intel称之为超线程),它可以在每个核心上运行2个线程。那么,在这种情况下,最优的线程数量是您机器上核心数量的两倍。

对于多处理:有人问了一个类似的问题here,公认的答案是这样的:

如果你所有的线程/进程都是受CPU限制的,那么你运行的进程数应该和CPU报告的核心数一样多。由于HyperThreading,每个物理CPU核心可能能够呈现多个虚拟核心。调用multiprocessing.cpu_count获取虚拟核数。如果1个线程中只有p个是受CPU限制的,那么您可以通过乘以p来调整这个数字。例如,如果您的一半进程是受CPU限制的(p = 0.5),并且您有两个CPU,每个CPU有4个核心,2x HyperThreading,那么您应该启动0.5 *2*4*2=8个进程。

这里的关键是了解您使用的是哪台机器,从中您可以选择近乎最佳的线程/进程数量来拆分代码的执行。我说几乎是最优的,因为每次运行脚本时,它都会有一点变化,所以很难从数学的角度来预测这个最优值。

对于您的特定情况,如果您的机器有4个内核,我建议您最多创建4个线程,然后拆分它们:

  • 1到主线程。
  • 3用于文件读取和处理。
票数 0
EN

Stack Overflow用户

发布于 2020-10-31 18:00:51

使用多个进程来提高IO性能可能不是一个好主意,请检查this及其下面的sample code,看看它是否有帮助

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64532146

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档