I'm trying to copy files from one location to another using the BinaryFile option with Auto Loader and a foreach (copy) sink. It works fine for smaller files (up to ~150 MB), but for larger files it fails with the exception below:
22/09/07 10:25:51 INFO FileScanRDD: Reading File path: dbfs:/mnt/soefile.csv, range: 0-1652464461, partition values: [empty row], modificationTime: 1662542176000
22/09/07 10:25:52 ERROR Utils: Uncaught exception in thread stdout writer for /databricks/python/bin/python
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.getBinary(UnsafeRow.java:416)
	at org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader.read(SpecializedGettersReader.java:75)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.get(UnsafeRow.java:333)
	at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:58)
	at org.apache.spark.sql.execution.python.PythonForeachWriter.$anonfun$inputByteIterator$1(PythonForeachWriter.scala:43)
	at org.apache.spark.sql.execution.python.PythonForeachWriter$$Lambda$1830/1643360976.apply(Unknown Source)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:92)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:82)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:82)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:442)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:871)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:573)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$Lambda$2008/2134044540.apply(Unknown Source)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2275)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:365)
22/09/07 10:25:52 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread stdout writer for /databricks/python/bin/python
java.lang.OutOfMemoryError: Java heap space
	(same stack trace as above)
Here is the high-level code snippet for reference:
The cluster has 2 workers and 1 driver, each with 14 GB of memory and 4 cores.
cloudfile_options = {
    "cloudFiles.subscriptionId": subscription_ID,
    "cloudFiles.connectionString": queue_SAS_connection_string,
    "cloudFiles.format": "BinaryFile",
    "cloudFiles.tenantId": tenant_ID,
    "cloudFiles.clientId": client_ID,
    "cloudFiles.clientSecret": client_secret,
    "cloudFiles.useNotifications": "true"
}
import shutil

def copy(row):
    source = row['path']
    destination = "somewhere"
    shutil.copy(source, destination)
(spark.readStream.format("cloudFiles")
    .options(**cloudfile_options)
    .load(storage_input_path)
    .writeStream
    .foreach(copy)
    .option("checkpointLocation", checkpoint_location)
    .trigger(once=True)
    .start())

I also tested shutil.copy outside of foreach() with a huge file (20 GB), and it seems to run fine.
Any pointers on this would be greatly appreciated.
Posted on 2022-09-08 07:01:18
This happens because you're passing the full row, which includes the file content, and that content has to be serialized from the JVM to Python. If all you're doing is copying the file, just add .select("path") before .writeStream, so only the file path is passed to Python, without the content:
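To illustrate why passing only the path is cheap, here is a minimal local sketch (the temp directories and file name below are hypothetical stand-ins for the mounted storage paths): the foreach callback receives just a path string, and shutil.copy streams the bytes itself, so the file content is never pickled across the JVM/Python boundary.

```python
import os
import shutil
import tempfile

# Stand-in directories for the mounted source and destination locations
# (hypothetical; the real job would use dbfs:/mnt/... paths).
src_dir = tempfile.mkdtemp()
dest_dir = tempfile.mkdtemp()

src_file = os.path.join(src_dir, "example.csv")
with open(src_file, "w") as f:
    f.write("a,b,c\n1,2,3\n")

def copy(row):
    # Only the path string crosses into Python; shutil.copy reads the
    # file itself, so the content is never serialized row by row.
    source = row["path"]
    shutil.copy(source, os.path.join(dest_dir, os.path.basename(source)))

# A Row from .select("path") supports row["path"] access like this dict.
copy({"path": src_file})
print(os.path.exists(os.path.join(dest_dir, "example.csv")))  # True
```

With .select("path") in place, each row handed to foreach is a few bytes instead of the whole file, which is why the heap no longer blows up on large inputs.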
https://stackoverflow.com/questions/73644042