获取以下PySpark代码的OutOfMemory错误:(写入一定数量的行后失败。如果我尝试写入hadoop文件系统而不是使用s3a,则不会发生这种情况,所以我认为我已经将问题缩小到s3a。)-写入s3a的最终目标。我想知道是否有一个最优的s3a配置,这样对于非常大的表,我就不会耗尽内存。
df = spark.sql("SELECT * FROM my_big_table")
df.repartition(1).write.option("header", "true").csv("s3a://mycsvlocation/folder/")我的s3a配置(电子病历默认设置):
('fs.s3a.attempts.maximum', '10')
('fs.s3a.buffer.dir', '${hadoop.tmp.dir}/s3a')
('fs.s3a.connection.establish.timeout', '5000')
('fs.s3a.connection.maximum', '15')
('fs.s3a.connection.ssl.enabled', 'true')
('fs.s3a.connection.timeout', '50000')
('fs.s3a.fast.buffer.size', '1048576')
('fs.s3a.fast.upload', 'true')
('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
('fs.s3a.max.total.tasks', '1000')
('fs.s3a.multipart.purge', 'false')
('fs.s3a.multipart.purge.age', '86400')
('fs.s3a.multipart.size', '104857600')
('fs.s3a.multipart.threshold', '2147483647')
('fs.s3a.paging.maximum', '5000')
('fs.s3a.threads.core', '15')
('fs.s3a.threads.keepalivetime', '60')
('fs.s3a.threads.max', '256')
('mapreduce.fileoutputcommitter.algorithm.version', '2')
('spark.authenticate', 'true')
('spark.network.crypto.enabled', 'true')
('spark.network.crypto.saslFallback', 'true')
('spark.speculation', 'false')堆栈跟踪的基础:
Caused by: java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.hadoop.fs.s3a.S3AFastOutputStream.write(S3AFastOutputStream.java:194)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:60)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
at com.univocity.parsers.common.input.WriterCharAppender.writeCharsAndReset(WriterCharAppender.java:152)
at com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:808)
... 16 more发布于 2020-09-01 11:07:29
这里的问题是,默认的s3a上传不支持上传大于2 2GB或2147483647字节的单个大文件。
('fs.s3a.multipart.threshold', '2147483647')我的电子病历版本比最近的版本旧,所以multipart.threshold参数只是一个整数,因此单个“部分”或文件的限制是2147483647字节。较新的版本使用long而不是int,并且可以支持更大的单个文件大小限制。
我将使用一种变通方法,将文件写到本地hdfs,然后通过单独的java程序将其移动到s3。
https://stackoverflow.com/questions/63650472
复制相似问题