首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在使用pandas_udf和Parquet序列化时内存泄漏?

在使用pandas_udf和Parquet序列化时内存泄漏?
EN

Stack Overflow用户
提问于 2019-05-27 23:45:01
回答 2查看 1.9K关注 0票数 14

我目前正在使用PySpark开发我的第一个完整系统,我遇到了一些奇怪的、与内存相关的问题。在其中一个阶段中,我想要类似于拆分-应用-组合策略来修改DataFrame。也就是说,我想将一个函数应用于由给定列定义的每个组,并最终将它们组合在一起。问题是,我想要应用的函数是一种拟合模型的预测方法,该模型“说”出了Pandas成语,即它是矢量化的,并以Pandas系列作为输入。

然后,我设计了一个迭代策略,遍历组并手动应用pandas_udf.Scalar来解决问题。组合部分是通过增量调用DataFrame.unionByName()来完成的。我决定不使用GroupedMap类型的pandas_udf,因为文档规定内存应该由用户管理,并且当其中一个组可能太大而无法将其保存在内存中或由Pandas DataFrame表示时,您应该特别小心。

主要的问题是,所有的处理看起来都运行得很好,但最终我想将最终的DataFrame序列化为一个Parquet文件。正是在这一点上,我收到了许多关于DataFrameWriter的类似Java的错误,或者内存不足异常。

我已经在Windows和Linux机器上尝试过这些代码。我设法避免错误的唯一方法是增加机器中的--driver-memory值。最小值在每个平台上都是不同的,并且取决于问题的大小,这在某种程度上让我怀疑内存泄漏。

直到我开始使用pandas_udf,这个问题才出现。我认为在使用pandas_udf时,在整个pyarrow序列化过程中可能存在内存泄漏。

我已经创建了一个最小的可重现的例子。如果我直接使用Python运行这个脚本,它会产生错误。使用spark-submit并增加大量驱动内存,可以使其正常工作。

代码语言:javascript
复制
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as spktyp


# Dummy pandas_udf -------------------------------------------------------------
@F.pandas_udf(spktyp.DoubleType())
def predict(x):
    return x + 100.0


# Initialization ---------------------------------------------------------------
spark = pyspark.sql.SparkSession.builder.appName(
        "mre").master("local[3]").getOrCreate()

sc = spark.sparkContext

# Generate a dataframe ---------------------------------------------------------
out_path = "out.parquet"

z = 105
m = 750000

schema = spktyp.StructType(
    [spktyp.StructField("ID", spktyp.DoubleType(), True)]
)

df = spark.createDataFrame(
    [(float(i),) for i in range(m)],
    schema
)

for j in range(z):
    df = df.withColumn(
        f"N{j}",
        F.col("ID") + float(j)
    )

df = df.withColumn(
    "X",
    F.array(
        F.lit("A"),
        F.lit("B"),
        F.lit("C"),
        F.lit("D"),
        F.lit("E")
    ).getItem(
        (F.rand()*3).cast("int")
    )
)

# Set the column names for grouping, input and output --------------------------
group_col = "X"
in_col = "N0"
out_col = "EP"

# Extract different group ids in grouping variable -----------------------------
rows = df.select(group_col).distinct().collect()
groups = [row[group_col] for row in rows]
print(f"Groups: {groups}")

# Split and treat the first id -------------------------------------------------
first, *others = groups

cur_df = df.filter(F.col(group_col) == first)
result = cur_df.withColumn(
    out_col,
    predict(in_col)
)

# Traverse the remaining group ids ---------------------------------------------
for i, other in enumerate(others):
    cur_df = df.filter(F.col(group_col) == other)
    new_df = cur_df.withColumn(
        out_col,
        predict(in_col)
    )

    # Incremental union --------------------------------------------------------
    result = result.unionByName(new_df)

# Save to disk -----------------------------------------------------------------
result.write.mode("overwrite").parquet(out_path)

令人震惊的是(至少对我来说),如果我在序列化语句之前调用repartition(),这个问题似乎就消失了。

代码语言:javascript
复制
result = result.repartition(result.rdd.getNumPartitions())
result.write.mode("overwrite").parquet(out_path)

将这一行放到适当的位置后,我可以大大降低驱动程序内存配置,并且脚本运行良好。我几乎不能理解所有这些因素之间的关系,尽管我怀疑代码的懒惰评估和pyarrow序列化可能是相关的。

这是我当前用于开发的环境:

代码语言:javascript
复制
arrow-cpp                 0.13.0           py36hee3af98_1    conda-forge
asn1crypto                0.24.0                py36_1003    conda-forge
astroid                   2.2.5                    py36_0
atomicwrites              1.3.0                      py_0    conda-forge
attrs                     19.1.0                     py_0    conda-forge
blas                      1.0                         mkl
boost-cpp                 1.68.0            h6a4c333_1000    conda-forge
brotli                    1.0.7             he025d50_1000    conda-forge
ca-certificates           2019.3.9             hecc5488_0    conda-forge
certifi                   2019.3.9                 py36_0    conda-forge
cffi                      1.12.3           py36hb32ad35_0    conda-forge
chardet                   3.0.4                 py36_1003    conda-forge
colorama                  0.4.1                    py36_0
cryptography              2.6.1            py36hb32ad35_0    conda-forge
dill                      0.2.9                    py36_0
docopt                    0.6.2                    py36_0
entrypoints               0.3                      py36_0
falcon                    1.4.1.post1     py36hfa6e2cd_1000    conda-forge
fastavro                  0.21.21          py36hfa6e2cd_0    conda-forge
flake8                    3.7.7                    py36_0
future                    0.17.1                py36_1000    conda-forge
gflags                    2.2.2                ha925a31_0
glog                      0.3.5                h6538335_1
hug                       2.5.2            py36hfa6e2cd_0    conda-forge
icc_rt                    2019.0.0             h0cc432a_1
idna                      2.8                   py36_1000    conda-forge
intel-openmp              2019.3                      203
isort                     4.3.17                   py36_0
lazy-object-proxy         1.3.1            py36hfa6e2cd_2
libboost                  1.67.0               hd9e427e_4
libprotobuf               3.7.1                h1a1b453_0    conda-forge
lz4-c                     1.8.1.2              h2fa13f4_0
mccabe                    0.6.1                    py36_1
mkl                       2018.0.3                      1
mkl_fft                   1.0.6            py36hdbbee80_0
mkl_random                1.0.1            py36h77b88f5_1
more-itertools            4.3.0                 py36_1000    conda-forge
ninabrlong                0.1.0                     dev_0    <develop>
nose                      1.3.7                 py36_1002    conda-forge
nose-exclude              0.5.0                      py_0    conda-forge
numpy                     1.15.0           py36h9fa60d3_0
numpy-base                1.15.0           py36h4a99626_0
openssl                   1.1.1b               hfa6e2cd_2    conda-forge
pandas                    0.23.3           py36h830ac7b_0
parquet-cpp               1.5.1                         2    conda-forge
pip                       19.0.3                   py36_0
pluggy                    0.11.0                     py_0    conda-forge
progressbar2              3.38.0                     py_1    conda-forge
py                        1.8.0                      py_0    conda-forge
py4j                      0.10.7                   py36_0
pyarrow                   0.13.0           py36h8c67754_0    conda-forge
pycodestyle               2.5.0                    py36_0
pycparser                 2.19                     py36_1    conda-forge
pyflakes                  2.1.1                    py36_0
pygam                     0.8.0                      py_0    conda-forge
pylint                    2.3.1                    py36_0
pyopenssl                 19.0.0                   py36_0    conda-forge
pyreadline                2.1                      py36_1
pysocks                   1.6.8                 py36_1002    conda-forge
pyspark                   2.4.1                      py_0
pytest                    4.5.0                    py36_0    conda-forge
pytest-runner             4.4                        py_0    conda-forge
python                    3.6.6                hea74fb7_0
python-dateutil           2.8.0                    py36_0
python-hdfs               2.3.1                      py_0    conda-forge
python-mimeparse          1.6.0                      py_1    conda-forge
python-utils              2.3.0                      py_1    conda-forge
pytz                      2019.1                     py_0
re2                       2019.04.01       vc14h6538335_0  [vc14]  conda-forge
requests                  2.21.0                py36_1000    conda-forge
requests-kerberos         0.12.0                   py36_0
scikit-learn              0.20.1           py36hb854c30_0
scipy                     1.1.0            py36hc28095f_0
setuptools                41.0.0                   py36_0
six                       1.12.0                   py36_0
snappy                    1.1.7                h777316e_3
sqlite                    3.28.0               he774522_0
thrift-cpp                0.12.0            h59828bf_1002    conda-forge
typed-ast                 1.3.1            py36he774522_0
urllib3                   1.24.2                   py36_0    conda-forge
vc                        14.1                 h0510ff6_4
vs2015_runtime            14.15.26706          h3a45250_0
wcwidth                   0.1.7                      py_1    conda-forge
wheel                     0.33.1                   py36_0
win_inet_pton             1.1.0                    py36_0    conda-forge
wincertstore              0.2              py36h7fe50ca_0
winkerberos               0.7.0                    py36_1
wrapt                     1.11.1           py36he774522_0
xz                        5.2.4                h2fa13f4_4
zlib                      1.2.11               h62dcd97_3
zstd                      1.3.3                hfe6a214_0

任何提示或帮助都将不胜感激。

EN

回答 2

Stack Overflow用户

发布于 2019-06-05 23:59:28

我想评论你的帖子,但我的名声太低了。

根据我的经验,udf会极大地降低您的性能,特别是如果您用python (或pandas?)编写它们的话。有一篇文章,为什么你不应该使用python udfs而使用scala udfs:https://medium.com/wbaa/using-scala-udfs-in-pyspark-b70033dd69b9

在我的例子中,使用内置函数是可能的,即使它相当复杂,与以前相比,运行时间减少到大约5%。

对于你的OOM错误和为什么重新分区能为你工作,我没有任何解释。我能给你的唯一建议是尽量避免UDF,尽管在你的情况下似乎没有那么容易。

票数 7
EN

Stack Overflow用户

发布于 2020-04-04 23:55:11

这个线程有点老了,但我偶然发现了完全相同的问题,并花了相当多的时间在上面。因此,我只是想解释一下我是如何解决这个问题的,希望它能为将来遇到同样问题的其他人节省一些时间。

这里的问题与pandas_udf或parquet无关,而是与使用withColumn生成列有关。当向数据帧中添加多个列时,使用select方法要高效得多。This article解释了其中的原因。

所以举个例子,不是

代码语言:javascript
复制
for j in range(z):
   df = df.withColumn(
       f"N{j}",
       F.col("ID") + float(j)
   )

你应该写下

代码语言:javascript
复制
df = df.select(
    *df.columns,
    *[(F.col("ID") + float(j)).alias(f"N{j}") for j in range(z)]
)

重写的脚本如下所示(请注意,我仍然必须将驱动程序内存增加到2 2GB,但至少是相当合理的内存量)

代码语言:javascript
复制
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as spktyp


# Dummy pandas_udf -------------------------------------------------------------
@F.pandas_udf(spktyp.DoubleType())
def predict(x):
    return x + 100.0


# Initialization ---------------------------------------------------------------
spark = (pyspark.sql.SparkSession.builder
        .appName("mre")
        .config("spark.driver.memory", "2g")
        .master("local[3]").getOrCreate())

sc = spark.sparkContext

# Generate a dataframe ---------------------------------------------------------
out_path = "out.parquet"

z = 105
m = 750000

schema = spktyp.StructType(
    [spktyp.StructField("ID", spktyp.DoubleType(), True)]
)

df = spark.createDataFrame(
    [(float(i),) for i in range(m)],
    schema
)


df = df.select(
    *df.columns,
    *[(F.col("ID") + float(j)).alias(f"N{j}") for j in range(z)]
)

df = df.withColumn(
    "X",
    F.array(
        F.lit("A"),
        F.lit("B"),
        F.lit("C"),
        F.lit("D"),
        F.lit("E")
    ).getItem(
        (F.rand()*3).cast("int")
    )
)

# Set the column names for grouping, input and output --------------------------
group_col = "X"
in_col = "N0"
out_col = "EP"

# Extract different group ids in grouping variable -----------------------------
rows = df.select(group_col).distinct().collect()
groups = [row[group_col] for row in rows]
print(f"Groups: {groups}")

# Split and treat the first id -------------------------------------------------
first, *others = groups

cur_df = df.filter(F.col(group_col) == first)
result = cur_df.withColumn(
    out_col,
    predict(in_col)
)

# Traverse the remaining group ids ---------------------------------------------
for i, other in enumerate(others):
    cur_df = df.filter(F.col(group_col) == other)
    new_df = cur_df.select(
        *cur_df.columns,
        predict(in_col).alias(out_col)
    )
    # Incremental union --------------------------------------------------------
    result = result.unionByName(new_df)

# Save to disk -----------------------------------------------------------------
result.write.mode("overwrite").parquet(out_path)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56329093

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档