首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将Python函数应用于Pandas分组DataFrame --加快计算速度的最有效方法是什么?

将Python函数应用于Pandas分组DataFrame --加快计算速度的最有效方法是什么?
EN

Stack Overflow用户
提问于 2020-02-24 11:38:01
回答 2查看 1.1K关注 0票数 12

我正在处理相当大的Pandas DataFrame --我的数据集类似于下面的df设置:

代码语言:javascript
复制
import pandas as pd
import numpy  as np

#--------------------------------------------- SIZING PARAMETERS :
R1 =                    20        # .repeat( repeats = R1 )
R2 =                    10        # .repeat( repeats = R2 )
R3 =                541680        # .repeat( repeats = [ R3, R4 ] )
R4 =                576720        # .repeat( repeats = [ R3, R4 ] )
T  =                 55920        # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 = np.arange( 0, 2883600, 100 ) # ~ 20x re-used

#--------------------------------------------- DataFrame GENERATION :
df = pd.DataFrame.from_dict(
         { 'measurement_id':        np.repeat( [0, 1], repeats = [ R3, R4 ] ), 
           'time':np.concatenate( [ np.repeat( A1,     repeats = R1 ),
                                    np.repeat( A2,     repeats = R1 ) ] ), 
           'group':        np.tile( np.repeat( [0, 1], repeats = R2 ), T ),
           'object':       np.tile( np.arange( 0, R1 ),                T )
           }
        )

#--------------------------------------------- DataFrame RE-PROCESSING :
df = pd.concat( [ df,
                  df                                                  \
                    .groupby( ['measurement_id', 'time', 'group'] )    \
                    .apply( lambda x: np.random.uniform( 0, 100, 10 ) ) \
                    .explode()                                           \
                    .astype( 'float' )                                    \
                    .to_frame( 'var' )                                     \
                    .reset_index( drop = True )
                  ], axis = 1
                )

注意:为了获得一个最小的示例,可以很容易地对其进行细分(例如,使用df.loc[df['time'] <= 400, :]),但是由于我模拟了数据,所以我认为原始大小会提供更好的概述。

对于['measurement_id', 'time', 'group']定义的每个组,我需要调用以下函数:

代码语言:javascript
复制
from sklearn.cluster import SpectralClustering
from pandarallel     import pandarallel

def cluster( x, index ):
    if len( x ) >= 2:
        data = np.asarray( x )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters   =  5,
                                         random_state = 42
                                         ).fit( data )
        return pd.Series( clustering.labels_ + 1, index = index )
    else:
        return pd.Series( np.nan, index = index )

为了提高性能,我尝试了两种方法:

潘达列包装

第一种方法是使用pandarallel包并行计算:

代码语言:javascript
复制
pandarallel.initialize( progress_bar = True )
df \
  .groupby( ['measurement_id', 'time', 'group'] ) \
  .parallel_apply( lambda x: cluster( x['var'], x['object'] ) )

但是,这似乎是次优的,因为它消耗了大量的内存,而且并不是所有的核都用于计算(尽管在pandarallel.initialize()方法中显式地指定了核的数量)。此外,有时计算会因各种错误而终止,尽管我还没有机会找到原因(可能是缺少RAM?)。

PySpark Pandas

我也给了火花熊猫UDF一次尝试,虽然我是完全陌生的火花。以下是我的尝试:

代码语言:javascript
复制
import findspark;  findspark.init()

from pyspark.sql           import SparkSession
from pyspark.conf          import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types     import *

spark = SparkSession.builder.master( "local" ).appName( "test" ).config( conf = SparkConf() ).getOrCreate()
df = spark.createDataFrame( df )

@pandas_udf( StructType( [StructField( 'id', IntegerType(), True )] ), functionType = PandasUDFType.GROUPED_MAP )
def cluster( df ):
    if len( df['var'] ) >= 2:
        data = np.asarray( df['var'] )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters   =  5,
                                         random_state = 42
                                         ).fit( data )
        return pd.DataFrame( clustering.labels_ + 1,
                             index = df['object']
                             )
    else:
        return pd.DataFrame( np.nan,
                             index = df['object']
                             )

res = df                                           \
        .groupBy( ['id_half', 'frame', 'team_id'] ) \
        .apply( cluster )                            \
        .toPandas()

不幸的是,性能也不尽如人意,从我所读到的主题来看,这可能只是使用用Python编写的UDF函数的负担,以及将所有Python对象转换为Spark对象和返回的相关需求。

,下面是我的问题:

  1. 可以调整我的任何一种方法以消除可能的瓶颈并提高性能吗?(例如PySpark设置,调整次优操作等)
  2. -它们有什么更好的选择吗?从performance?

的角度看,它们与所提供的解决方案相比如何?

EN

回答 2

Stack Overflow用户

发布于 2020-02-27 18:10:46

这不是一个答案但是..。

如果你跑

代码语言:javascript
复制
df.groupby(['measurement_id', 'time', 'group']).apply(
    lambda x: cluster(x['var'], x['object']))

(也就是说,单独使用Pandas ),您会注意到您已经使用了几个内核。这是因为sklearn默认使用joblib来并行工作。您可以将调度器替换为Dask,并且在线程之间共享数据可能会获得更高的效率,但是只要您所做的工作是这样的CPU绑定,您就无法加快它的速度。

简而言之,这是一个算法问题:在尝试考虑不同的计算框架之前,先找出真正需要计算的内容。

票数 0
EN

Stack Overflow用户

发布于 2020-03-04 10:32:20

我不是Dask方面的专家,但我提供了以下代码作为基线:

代码语言:javascript
复制
import dask.dataframe as ddf

df = ddf.from_pandas(df, npartitions=4) # My PC has 4 cores

task = df.groupby(["measurement_id", "time", "group"]).apply(
    lambda x: cluster(x["var"], x["object"]),
    meta=pd.Series(np.nan, index=pd.Series([0, 1, 1, 1])),
)

res = task.compute()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60375142

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档