首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark executor在binaryFiles读取时挂起

Spark executor在binaryFiles读取时挂起
EN

Stack Overflow用户
提问于 2018-10-03 22:14:44
回答 1查看 248关注 0票数 1

我们在Yarn上使用Spark 2.1.0进行多行记录的批量细化。我们的工作是用Pyspark编写的,每天运行一次。输入文件夹包含大约45000个非常小的文件(每个文件的范围是1kB-100kB ),总大小约为2 2GB。

每个文件包含不同数量的多行记录。记录的第一行有一个标准模式,一个时间戳,后面跟着一个希腊µ和一些其他信息。例如:

代码语言:javascript
复制
28/09/2018 08:54:22µfirst record metadata
first record content with
undefined
number of
lines
28/09/2018 08:57:12µsecond record metadata
second record content
with a different
number of lines

这是我们在Dataframe中读取文件的方式:

代码语言:javascript
复制
df=spark.sparkContext.binaryFiles(input_path).toDF(['filename', 'content'])
raw = df.select('filename', explode(split(df.content, r'(?=\d{2}\/\d{2}\/\d{4} \d{2}:\d{2}:\d{2}µ)'))).cache()

第一行的输出是一个数据帧,每个文件有一个条目,第二行的输出是一个数据帧,每个记录有一个条目。然后缓存Dataframe并执行其他操作。

我们实际上是在测试解决方案,这是作业的当前部署模式(但是,内存需求过大):

代码语言:javascript
复制
spark2-submit --master yarn \
  --conf spark.kryoserializer.buffer.max=1g \
  --deploy-mode cluster \
  --driver-memory 16g \
  --driver-cores 1 \
  --conf spark.yarn.driver.memoryOverhead=1g \
  --num-executors 20 \
  --executor-memory 16g \
  --executor-cores 1 \
  --conf spark.yarn.executor.memoryOverhead=1g \
  spark_etl.py

该作业几乎每天都运行良好,它在10-15分钟内执行所有操作,并将结果写入HDFS。

问题是,每隔7-10天,大约45000个输入文件中的一个与其他文件的大小完全不同: 100MB到1 1GB (无论如何,小于2 1GB)。在这种情况下,我们的工作(特别是其中一个执行者)挂起,似乎在整个时间内什么都不做。在第一分钟之后没有新的日志行。这需要几个小时,我们从来没有看到这些工作的结束,因为我们必须在几个小时前杀死他们。我们怀疑这是因为“大”文件,事实上,如果我们从输入文件夹中删除它,作业运行得很好。这些是我们上次运行的截图:

Pyspark文档说明:“小文件是首选,大文件也是允许的,但可能会导致性能不佳。”我们可以接受性能恶化,但我们认为情况并非如此,因为在我们看来,这项工作在整个时间内似乎什么都不做。

从Spark的角度来看,200MB的文件真的是一个“大文件”吗?如果是,我们如何提高我们工作的绩效,或者至少了解它是否真的在做一些事情?

谢谢

EN

回答 1

Stack Overflow用户

发布于 2019-06-26 11:48:15

也许你应该提高你的执行器核心数量。binaryFiles创建BinaryFileRDD,BinaryFileRDD根据CPU处理器获取分区数量。

代码语言:javascript
复制
// setMinPartitions below will call FileInputFormat.listStatus(), which can be quite slow when
// traversing a large number of directories and files. Parallelize it.
conf.setIfUnset(FileInputFormat.LIST_STATUS_NUM_THREADS,
      Runtime.getRuntime.availableProcessors().toString)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52629062

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档