
Azure Databricks - autoloader option with foreach() gives java.lang.OutOfMemoryError: Java heap space

Stack Overflow user
Asked on 2022-09-08 05:19:35
1 answer · 76 views · 0 votes

I am trying to copy files from one location to another using the BinaryFile option and foreach(copy) in Auto Loader. It works fine for smaller files (up to 150 MB), but fails for larger files with the exception below:

22/09/07 10:25:51 INFO FileScanRDD: Reading File path: dbfs:/mnt/soefile.csv, range: 0-1652464461, partition values: [empty row], modificationTime: 1662542176000
22/09/07 10:25:52 ERROR Utils: Uncaught exception in thread stdout writer for /databricks/python/bin/python
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.getBinary(UnsafeRow.java:416)
	at org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader.read(SpecializedGettersReader.java:75)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.get(UnsafeRow.java:333)
	at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:58)
	at org.apache.spark.sql.execution.python.PythonForeachWriter.$anonfun$inputByteIterator$1(PythonForeachWriter.scala:43)
	at org.apache.spark.sql.execution.python.PythonForeachWriter$$Lambda$1830/1643360976.apply(Unknown Source)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:92)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:82)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:82)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:442)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:871)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:573)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$Lambda$2008/2134044540.apply(Unknown Source)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2275)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:365)
22/09/07 10:25:52 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for /databricks/python/bin/python,5,main]
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.getBinary(UnsafeRow.java:416)
	... (same stack trace as above)

Below is the high-level code snippet, for reference.

The cluster has 2 workers and 1 driver, each with 14 GB of memory and 4 cores.

import shutil

# Auto Loader (cloudFiles) options: file-notification mode, reading files as BinaryFile
cloudfile_options = {
    "cloudFiles.subscriptionId": subscription_ID,
    "cloudFiles.connectionString": queue_SAS_connection_string,
    "cloudFiles.format": "BinaryFile",
    "cloudFiles.tenantId": tenant_ID,
    "cloudFiles.clientId": client_ID,
    "cloudFiles.clientSecret": client_secret,
    "cloudFiles.useNotifications": "true",
}

def copy(row):
    # Called once per row by foreach(); copies the file at row['path']
    source = row['path']
    destination = "somewhere"
    shutil.copy(source, destination)

# Parentheses make the multi-line method chain valid Python
(spark.readStream.format("cloudFiles")
    .options(**cloudfile_options)
    .load(storage_input_path)
    .writeStream
    .foreach(copy)
    .option("checkpointLocation", checkpoint_location)
    .trigger(once=True)
    .start())

I have also tested shutil.copy with a huge file (20 GB) outside of foreach(), and it seemed to work fine; a sketch of that test follows below.
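The question does not show that standalone test, so the following is a minimal sketch of what it would look like; both paths are placeholder assumptions (/dbfs/... being the FUSE mount Databricks exposes for DBFS):

import shutil

# Hypothetical standalone test, run directly on the driver without Spark or foreach().
# Both paths are placeholders for this sketch.
source = "/dbfs/mnt/input/huge_file.bin"         # e.g. a ~20 GB file
destination = "/dbfs/mnt/output/huge_file.bin"

# shutil.copy streams the file in fixed-size chunks, so it never holds the
# whole file in memory - which is why it works even for very large files.
shutil.copy(source, destination)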

Any pointers on this would be greatly appreciated.


1 Answer

Stack Overflow user
Answered on 2022-09-08 07:01:18

This happens because you are passing the full row, which includes the file content that has to be serialized from the JVM to Python. If all you are doing is copying the file, just add .select("path") before .writeStream, so that only the file path is passed to Python, not its content:
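The answer's code snippet did not survive the page extraction; below is a minimal sketch of the suggested change, reconstructed from the description above and reusing the names from the question's snippet. Only the .select("path") call is new:

(spark.readStream.format("cloudFiles")
    .options(**cloudfile_options)
    .load(storage_input_path)
    .select("path")  # drop the binary "content" column; only the path crosses the JVM-to-Python boundary
    .writeStream
    .foreach(copy)
    .option("checkpointLocation", checkpoint_location)
    .trigger(once=True)
    .start())

With the BinaryFile format, each row carries a content column holding the entire file as bytes; selecting only path keeps those bytes out of the Python foreach writer, which is what was exhausting the Java heap.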

1 vote
Original content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/73644042
