首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用HDFS存储的Spark作业

使用HDFS存储的Spark作业
EN

Stack Overflow用户
提问于 2019-10-01 19:53:42
回答 2查看 533关注 0票数 4

我有一个长期运行的Spark Structured Streaming Job,它运行在Google Cloud Dataproc上,使用Kafka作为源和接收器。我还将我的检查点保存在Google云存储中。

运行一周后,我注意到它正在稳定地消耗所有100 GB的磁盘存储空间,将文件保存到/hadoop/dfs/data/current/BP-315396706-10.128.0.26-1568586969675/current/finalized/...

我的理解是,我的Spark作业不应该对本地磁盘存储有任何依赖。

我完全误解了这一点吗?

我像这样提交了我的作业:

代码语言:javascript
复制
(cd  app/src/packages/ &&  zip -r mypkg.zip mypkg/ ) && mv app/src/packages/mypkg.zip build
gcloud dataproc jobs submit pyspark \
    --cluster cluster-26aa \
    --region us-central1 \
    --properties ^#^spark.jars.packages=org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 \
    --py-files build/mypkg.zip \
    --max-failures-per-hour 10 \
    --verbosity info \
    app/src/explode_rmq.py

以下是我工作的相关部分:

来源:

代码语言:javascript
复制
 spark = SparkSession \
        .builder \
        .appName("MyApp") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    spark.sparkContext.addPyFile('mypkg.zip')

    df = spark \
        .readStream \
        .format("kafka") \
        .options(**config.KAFKA_PARAMS) \
        .option("subscribe", "lsport-rmq-12") \
        .option("startingOffsets", "earliest") \
        .load() \
        .select(f.col('key').cast(t.StringType()), f.col('value').cast(t.StringType()))

接收器:

代码语言:javascript
复制
    sink_kafka_q = sink_df \
        .writeStream \
        .format("kafka") \
        .options(**config.KAFKA_PARAMS) \
        .option("topic", "my_topic") \
        .option("checkpointLocation", "gs://my-bucket-data/checkpoints/my_topic") \
        .start()
EN

回答 2

Stack Overflow用户

发布于 2019-10-01 21:17:59

如果内存不足,Spark将在本地磁盘上持久化信息。您可以像这样关闭磁盘上的持久化:

代码语言:javascript
复制
df.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)

或者,您可以尝试序列化信息以占用更少的内存,如下所示

代码语言:javascript
复制
df.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)

读取序列化的数据将占用更多的CPU资源。

每个数据帧都有其独特的序列化级别。

欲了解更多信息,请访问:https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

票数 1
EN

Stack Overflow用户

发布于 2019-10-10 01:24:01

您能否通过SSH连接到主节点并运行以下命令,以找出谁在消耗HDFS空间?

代码语言:javascript
复制
hdfs df -du -h /

我测试了一个简单的Spark Pi工作,

在运行作业之前:

代码语言:javascript
复制
$ hdfs dfs -du /
34       /hadoop
0        /tmp
2107947  /user

作业完成后:

代码语言:javascript
复制
$ hdfs dfs -du /user/
0        /user/hbase
0        /user/hdfs
0        /user/hive
0        /user/mapred
0        /user/pig
0        /user/root
2107947  /user/spark
0        /user/yarn
0        /user/zookeeper

$ hdfs dfs -du /user/spark/
2107947  /user/spark/eventlog

它似乎被Spark eventlog消耗掉了,参见spark.eventLog.dir。您可以考虑使用spark.eventLog.compress=true压缩事件日志或使用spark.eventLog.enabled=false禁用它

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58184273

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档