
Python multiprocess.Pool.map can't handle large arrays

Stack Overflow user
Asked on 2018-04-24 21:15:08
1 answer · 458 views · 0 followers · 6 votes

This is the code I use to parallelize an apply function over the rows of a pandas.DataFrame object:

Code language: python
import numpy as np
import pandas as pd
from pandas import DataFrame
from multiprocessing import cpu_count, Pool
from functools import partial

# apply_wrapper is defined elsewhere in the original code
def parallel_applymap_df(df: DataFrame, func, num_cores=cpu_count(), **kargs):
    partitions = np.linspace(0, len(df), num_cores + 1, dtype=np.int64)
    df_split = [df.iloc[partitions[i]:partitions[i + 1]] for i in range(num_cores)]
    pool = Pool(num_cores)
    series = pd.concat(pool.map(partial(apply_wrapper, func=func, **kargs), df_split))
    pool.close()
    pool.join()

    return series
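The np.linspace call in the function above determines the row boundaries assigned to each worker. A tiny standalone sketch (toy numbers, not from the post) shows how the boundaries fall:

```python
import numpy as np

# 10 rows split across 4 workers -> 5 boundary indices
num_cores = 4
partitions = np.linspace(0, 10, num_cores + 1, dtype=np.int64)
print(partitions.tolist())  # [0, 2, 5, 7, 10]

# Consecutive boundary pairs give each worker's row range
slices = [(int(partitions[i]), int(partitions[i + 1])) for i in range(num_cores)]
print(slices)  # [(0, 2), (2, 5), (5, 7), (7, 10)]
```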

It works on a subsample of 200,000 rows, but when I try the full 200,000,000 rows I get the following error message:

Code language: python
~/anaconda3/lib/python3.6/site-packages/multiprocess/connection.py in _send_bytes(self, buf)
394         n = len(buf)
395         # For wire compatibility with 3.2 and lower
—> 396         header = struct.pack("!i", n)
397         if n > 16384:
398             # The payload is large so Nagle's algorithm won't be triggered

error: 'i' format requires -2147483648 <= number <= 2147483647

generated by this line:

Code language: python
series = pd.concat(pool.map(partial(apply_wrapper, func=func, **kargs), df_split))

This is very strange, because I use a slightly different version to parallelize operations that are not vectorized in pandas (like Series.dt.time), and it works on the same number of rows. This is the version that works:

Code language: python
def parallel_map_df(df: DataFrame, func, num_cores=cpu_count()):
    partitions = np.linspace(0, len(df), num_cores + 1, dtype=np.int64)
    df_split = [df.iloc[partitions[i]:partitions[i + 1]] for i in range(num_cores)]
    pool = Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()

    return df

1 Answer

Stack Overflow user

Answered on 2021-01-07 21:40:06

The error itself comes from the fact that multiprocessing sets up connections between the different worker processes in the pool. To send data to and from a worker, that data has to be serialized to bytes. The first step is to build a header for the message that will be sent to the worker; this header encodes the length of the buffer as a signed 32-bit integer. If the buffer is longer than what that integer can represent, the code fails with the error you are seeing.
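That 32-bit limit can be reproduced in isolation with the same struct format the traceback shows, independent of multiprocessing:

```python
import struct

# "!i" = network byte order, signed 32-bit int: the header format
# multiprocessing.connection uses for message lengths.
struct.pack("!i", 2**31 - 1)  # largest representable payload length

try:
    struct.pack("!i", 2**31)  # one byte too many for the header
except struct.error as exc:
    print(exc)  # 'i' format requires -2147483648 <= number <= 2147483647
```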

We are missing the data and quite some code needed to reproduce the problem, so I will provide a minimal working example instead:

Code language: python
import numpy
import pandas
import random

from typing import List
from multiprocessing import cpu_count, Pool


def parallel_applymap_df(
    input_dataframe: pandas.DataFrame, func, num_cores: int = cpu_count(), **kwargs
) -> pandas.DataFrame:

    # Create splits in the dataframe of equal size (one split will be processed by one core)
    partitions = numpy.linspace(
        0, len(input_dataframe), num_cores + 1, dtype=numpy.int64
    )
    splits = [
        input_dataframe.iloc[partitions[i] : partitions[i + 1]]
        for i in range(num_cores)
    ]

    # Just for debugging, add metadata to each split
    for index, split in enumerate(splits):
        split.attrs["split_index"] = index

    # Create a pool of workers
    with Pool(num_cores) as pool:

        # Map the splits in the dataframe to workers in the pool
        result: List[pandas.DataFrame] = pool.map(func, splits, **kwargs)

    # Combine all results of the workers into a new dataframe
    return pandas.concat(result)


if __name__ == "__main__":

    # Create some test data
    df = pandas.DataFrame([{"A": random.randint(0, 100)} for _ in range(200000000)])

    def worker(df: pandas.DataFrame) -> pandas.DataFrame:

        # Print the length of the dataframe being processed (for debugging)
        print("Working on split #", df.attrs["split_index"], "Length:", len(df))

        # Do some arbitrary stuff to the split of the dataframe
        df["B"] = df.apply(lambda row: f"test_{row['A']}", axis=1)

        # Return the result
        return df

    # Create a new dataframe by applying the worker function to the dataframe in parallel
    df = parallel_applymap_df(df, worker)
    print(df)

Note that this is probably not the fastest way to do this. For faster alternatives, have a look at swifter or dask.
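One way to stay under the header limit, sketched here with a hypothetical small_splits helper and toy data (not from the post), is to cut the frame into enough partitions that each pickled chunk is comfortably below 2**31 - 1 bytes; pickle.dumps lets you verify this before handing the chunks to pool.map:

```python
import pickle

import numpy as np
import pandas as pd

INT32_MAX = 2**31 - 1  # largest length struct.pack("!i", n) can encode

def small_splits(df: pd.DataFrame, num_splits: int):
    """Split df into num_splits row-wise chunks (hypothetical helper)."""
    bounds = np.linspace(0, len(df), num_splits + 1, dtype=np.int64)
    return [df.iloc[bounds[i]:bounds[i + 1]] for i in range(num_splits)]

df = pd.DataFrame({"A": np.arange(1000)})
splits = small_splits(df, 8)

# Every chunk must pickle to fewer bytes than the 32-bit header allows,
# otherwise pool.map fails with the struct.error from the question.
assert all(len(pickle.dumps(s)) <= INT32_MAX for s in splits)
# No rows are lost or duplicated by the splitting
assert sum(len(s) for s in splits) == len(df)
```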

0 votes
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/50002665
