我正在尝试从S3中将大文件读入块中,而不会为了并行处理而削减任何行。
让我举个例子来解释: S3上有1G大小的文件。我想把这个文件分成64 MB的块。这很简单,我可以这样做:
S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));
InputStream stream = s3object.getObjectContent();
byte[] content = new byte[64*1024*1024];
while (stream.read(content) != -1) {
//process content here
}但是chunk的问题是它可能有100个完整的行和一个不完整的行。但是我不能处理不完整的行,也不想丢弃它。
有没有办法处理这种情况?意味着所有的chucks都没有分割线。
发布于 2017-11-20 03:21:59
我通常的方法(InputStream -> BufferedReader.lines() -> batches of lines -> CompletableFuture)在这里不起作用,因为对于大型文件,底层S3ObjectInputStream最终会超时。
因此,我创建了一个新的类S3InputStream,它不关心打开多长时间,并使用短暂的AWS SDK调用按需读取字节块。您提供了一个将被重用的byte[]。new byte[1 << 24] (16Mb)似乎工作正常。
package org.harrison;
import java.io.IOException;
import java.io.InputStream;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
/**
* An {@link InputStream} for S3 files that does not care how big the file is.
*
* @author stephen harrison
*/
public class S3InputStream extends InputStream {
private static class LazyHolder {
private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();
}
private final String bucket;
private final String file;
private final byte[] buffer;
private long lastByteOffset;
private long offset = 0;
private int next = 0;
private int length = 0;
public S3InputStream(final String bucket, final String file, final byte[] buffer) {
this.bucket = bucket;
this.file = file;
this.buffer = buffer;
this.lastByteOffset = LazyHolder.S3.getObjectMetadata(bucket, file).getContentLength() - 1;
}
@Override
public int read() throws IOException {
if (next >= length) {
fill();
if (length <= 0) {
return -1;
}
next = 0;
}
if (next >= length) {
return -1;
}
return buffer[this.next++];
}
public void fill() throws IOException {
if (offset >= lastByteOffset) {
length = -1;
} else {
try (final InputStream inputStream = s3Object()) {
length = 0;
int b;
while ((b = inputStream.read()) != -1) {
buffer[length++] = (byte) b;
}
if (length > 0) {
offset += length;
}
}
}
}
private InputStream s3Object() {
final GetObjectRequest request = new GetObjectRequest(bucket, file).withRange(offset,
offset + buffer.length - 1);
return LazyHolder.S3.getObject(request).getObjectContent();
}
}发布于 2017-06-06 20:04:41
aws-java-sdk已经为您的S3对象提供了流功能。你必须调用"getObject“,结果将是一个InputStream。
1) AmazonS3Client.getObject(GetObjectRequest getObjectRequest) -> S3Object
2) S3Object.getObjectContent()
注意:该方法是一个简单的getter方法,并不实际创建流。如果检索S3Object,则应该尽快关闭此输入流,因为对象内容不会在内存中缓冲,也不会直接从Amazon S3中流出来。此外,如果不能关闭此流,可能会导致请求池被阻塞。
aws java docs
发布于 2017-07-07 09:24:22
100完整的行和一个不完整的行
你的意思是你需要逐行阅读这篇文章吗?如果是这样的话,不要使用InputStream,而是尝试使用BufferedReader读取s3对象流,这样您就可以逐行读取流,但我认为这会比按块读取稍慢一些。
S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));
BufferedReader in = new BufferedReader(new InputStreamReader(s3object.getObjectContent()));
String line;
while ((line = in.readLine()) != null) {
//process line here
}https://stackoverflow.com/questions/44389194
复制相似问题