我正在优化一个类,用于解压质量效应3的.sfar文件。.sfar文件是归档文件,此函数从.sfar内部解压缩文件并将其写入所提供的流。
有时文件没有被压缩,所以只会被复制到输出流中。然而,大多数情况下,文件会被分割成许多单独压缩的块(偶尔也有未压缩的块)。
我想我可以通过并行解压缩这些块来加速这个过程。与原始的单线程同步函数相比,它确实导致了相当大的加速。不过,我想知道这是否是个好办法。还有什么我错过的东西能让这件事更快吗?
// Reads exactly buffer.Length bytes from source. Stream.ReadAsync may return
// fewer bytes than requested even before end-of-stream, so a single call is
// not enough; loop until the buffer is completely filled.
private static async Task ReadExactlyAsync(Stream source, byte[] buffer)
{
    int offset = 0;
    while (offset < buffer.Length)
    {
        int read = await source.ReadAsync(buffer, offset, buffer.Length - offset).ConfigureAwait(continueOnCapturedContext: false);
        if (read == 0)
            throw new EndOfStreamException("Unexpected end of archive while reading a block.");
        offset += read;
    }
}

/// <summary>
/// Decompresses the archive entry at <paramref name="Index"/> and writes the
/// uncompressed bytes to <paramref name="output"/>. Compressed blocks are
/// decompressed in parallel on the thread pool; results are written in the
/// original block order.
/// </summary>
/// <param name="Index">Index of the entry in <c>Files</c>.</param>
/// <param name="output">Destination stream for the uncompressed data.</param>
public async Task DecompressEntryAsync(int Index, Stream output)
{
    FileEntryStruct e = Files[Index];
    using (FileStream fs = new FileStream(FileName, FileMode.Open, FileAccess.Read, FileShare.None, 4096, useAsync: true))
    {
        fs.Seek(e.BlockOffsets[0], SeekOrigin.Begin);
        if (e.BlockSizeIndex == 0xFFFFFFFF)
        {
            // Entry is stored uncompressed: a straight copy to the output.
            byte[] buff = new byte[e.RealUncompressedSize];
            await ReadExactlyAsync(fs, buff).ConfigureAwait(continueOnCapturedContext: false);
            await output.WriteAsync(buff, 0, buff.Length).ConfigureAwait(continueOnCapturedContext: false);
        }
        else
        {
            uint count = 0;
            long left = e.RealUncompressedSize;
            // One task per block; the order of this list is the output order.
            List<Task<byte[]>> tasks = new List<Task<byte[]>>();
            while (left > 0)
            {
                uint compressedBlockSize = e.BlockSizes[count];
                if (compressedBlockSize == 0)
                    compressedBlockSize = Header.MaxBlockSize;
                if (compressedBlockSize == Header.MaxBlockSize || compressedBlockSize == left)
                {
                    // Block is stored uncompressed; wrap it in a completed task
                    // so the ordered write loop below can treat all blocks alike.
                    left -= compressedBlockSize;
                    byte[] buff = new byte[compressedBlockSize];
                    await ReadExactlyAsync(fs, buff).ConfigureAwait(continueOnCapturedContext: false);
                    tasks.Add(Task.FromResult(buff));
                }
                else
                {
                    var uncompressedBlockSize = Math.Min(left, Header.MaxBlockSize);
                    left -= uncompressedBlockSize;
                    // A valid LZMA block carries at least a 5-byte property header.
                    if (compressedBlockSize < 5)
                    {
                        throw new Exception("compressed block size smaller than 5");
                    }
                    byte[] inputBlock = new byte[compressedBlockSize];
                    await ReadExactlyAsync(fs, inputBlock).ConfigureAwait(continueOnCapturedContext: false);
                    // Decompress on the thread pool while we keep reading the next block.
                    tasks.Add(SevenZipHelper.DecompressAsync(inputBlock, (int)uncompressedBlockSize));
                }
                count++;
            }
            await Task.WhenAll(tasks).ConfigureAwait(continueOnCapturedContext: false);
            // All tasks are complete, so .Result cannot block here.
            foreach (var task in tasks)
            {
                byte[] buff = task.Result;
                await output.WriteAsync(buff, 0, buff.Length).ConfigureAwait(continueOnCapturedContext: false);
            }
        }
    }
}下面是SevenZipHelper.DecompressAsync函数:
// Decompresses a single LZMA block on a thread-pool thread.
// The first five bytes of inputBytes are the LZMA property header;
// the remainder is the compressed payload. The result must decompress
// to exactly outSize bytes, otherwise an exception is thrown.
public static Task<byte[]> DecompressAsync(byte[] inputBytes, int outSize)
{
    return Task.Run(() =>
    {
        var source = new MemoryStream(inputBytes);
        var props = new byte[5];
        if (source.Read(props, 0, props.Length) != props.Length)
            throw (new Exception("input .lzma is too short"));
        var lzma = new Decoder();
        lzma.SetDecoderProperties(props);
        var destination = new MemoryStream();
        lzma.Code(source, destination, source.Length - source.Position, outSize, null);
        if (destination.Length != outSize)
            throw new Exception("Decompression Error");
        return destination.ToArray();
    });
}发布于 2016-08-04 13:05:17
这对TPL数据流库来说是一个完美的任务(由微软开发),可在 NuGet 上获取。
使用数据流,您可以连续发布任务,并行处理任务,并按顺序完成任务。这达到了@thesyndarn评论中的目标。一旦处理了第一个块,它就会立即写入输出。完成无序处理的块将等待它们被写入。
要设置数据流,需要定义两个块,一个解压缩块和一个输出写入块,并将它们链接到一起:
using System.Threading.Tasks.Dataflow;
...
// A unit of work for the decompression pipeline: the raw bytes of one
// archive block, plus the size it should decompress to — or
// Uncompressed (-1) when the block is stored without compression.
internal class InputBlock
{
    public const long Uncompressed = -1;

    public byte[] Data { get; }
    public long UncompressedSize { get; }

    // A positive target size marks the data as LZMA-compressed.
    public bool IsCompressed { get { return UncompressedSize > 0; } }

    public InputBlock(byte[] data, long uncompressedSize)
    {
        Data = data;
        UncompressedSize = uncompressedSize;
    }
}
...
// TransformBlock runs the delegate on the thread pool with unbounded
// parallelism, but hands results downstream in the order items were posted.
var decompressor = new TransformBlock<InputBlock, byte[]>(
input => input.IsCompressed
? SevenZipHelper.Decompress(input.Data, input.UncompressedSize)
: input.Data
, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }
);
// ActionBlock is single-threaded by default, so writes happen one at a time,
// in posting order.
var outputWriter = new ActionBlock<byte[]>(
data => output.Write(data, 0, data.Length)
);
// PropagateCompletion lets Complete() (and faults) flow from the
// decompressor through to the writer.
decompressor.LinkTo(outputWriter, new DataflowLinkOptions { PropagateCompletion = true });
...
decompressor.Post(new InputBlock(compressedData, uncompressedBlockSize));将数据发布到解压缩器,它将并行处理数据,并以与发布相同的顺序传递给outputWriter。
这需要一个同步Decompress:
// Synchronously decompresses one LZMA block. inputBytes holds the
// five-byte LZMA property header followed by the compressed payload;
// decompressedSize is the exact number of bytes expected out.
public static byte[] Decompress(byte[] inputBytes, long decompressedSize)
{
    var source = new MemoryStream(inputBytes);
    var header = new byte[5];
    if (source.Read(header, 0, header.Length) != header.Length)
    {
        throw (new Exception("input .lzma is too short"));
    }
    var lzma = new Decoder();
    lzma.SetDecoderProperties(header);
    var sink = new MemoryStream();
    lzma.Code(source, sink, source.Length - source.Position, decompressedSize, null);
    if (sink.Length != decompressedSize)
        throw new Exception("Decompression Error");
    return sink.ToArray();
}我(认为我)使变量名更容易读懂。
现在只使用数据流管道而不是触发任务:
// Reads exactly buffer.Length bytes from source. Stream.ReadAsync may return
// fewer bytes than requested even before end-of-stream, so loop until the
// buffer is completely filled.
private static async Task FillBufferAsync(Stream source, byte[] buffer)
{
    int filled = 0;
    while (filled < buffer.Length)
    {
        int read = await source.ReadAsync(buffer, filled, buffer.Length - filled).ConfigureAwait(continueOnCapturedContext: false);
        if (read == 0)
            throw new EndOfStreamException("Unexpected end of archive while reading a block.");
        filled += read;
    }
}

/// <summary>
/// Decompresses the archive entry at <paramref name="index"/> into
/// <paramref name="output"/> using a TPL Dataflow pipeline: blocks are
/// decompressed in parallel but written in their original order, and each
/// block is written as soon as it (and all earlier blocks) are ready.
/// </summary>
/// <param name="index">Index of the entry in <c>Files</c>.</param>
/// <param name="output">Destination stream for the uncompressed data.</param>
public async Task DecompressEntryAsync(int index, Stream output)
{
    var entry = Files[index];
    using (var fs = new FileStream(FileName, FileMode.Open, FileAccess.Read, FileShare.None, 4096, useAsync: true))
    {
        fs.Seek(entry.BlockOffsets[0], SeekOrigin.Begin);
        var isUncompressed = entry.BlockSizeIndex == 0xFFFFFFFF;
        if (isUncompressed)
        {
            // Whole entry is stored raw: copy it straight through.
            var uncompressed = new byte[entry.RealUncompressedSize];
            await FillBufferAsync(fs, uncompressed).ConfigureAwait(continueOnCapturedContext: false);
            await output.WriteAsync(uncompressed, 0, uncompressed.Length).ConfigureAwait(continueOnCapturedContext: false);
            return;
        }
        // Parallel, order-preserving decompression stage.
        var decompressor = new TransformBlock<InputBlock, byte[]>(
            input => input.IsCompressed
                ? SevenZipHelper.Decompress(input.Data, input.UncompressedSize)
                : input.Data
            , new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }
        );
        // Sequential writer. The Func<byte[], Task> overload is used so the
        // write awaits WriteAsync instead of blocking a pool thread on I/O.
        var outputWriter = new ActionBlock<byte[]>(
            data => output.WriteAsync(data, 0, data.Length)
        );
        decompressor.LinkTo(outputWriter, new DataflowLinkOptions { PropagateCompletion = true });
        uint count = 0;
        long left = entry.RealUncompressedSize;
        while (left > 0)
        {
            uint compressedBlockSize = entry.BlockSizes[count];
            if (compressedBlockSize == 0)
            {
                compressedBlockSize = Header.MaxBlockSize;
            }
            if (compressedBlockSize == Header.MaxBlockSize ||
                compressedBlockSize == left)
            {
                // Block stored uncompressed: pass it through unchanged.
                left -= compressedBlockSize;
                var uncompressedData = new byte[compressedBlockSize];
                await FillBufferAsync(fs, uncompressedData).ConfigureAwait(continueOnCapturedContext: false);
                decompressor.Post(new InputBlock(uncompressedData, InputBlock.Uncompressed));
            }
            else
            {
                var uncompressedBlockSize = Math.Min(left, Header.MaxBlockSize);
                left -= uncompressedBlockSize;
                // A valid LZMA block carries at least a 5-byte property header.
                if (compressedBlockSize < 5)
                {
                    throw new Exception("compressed block size smaller than 5");
                }
                var compressedData = new byte[compressedBlockSize];
                await FillBufferAsync(fs, compressedData).ConfigureAwait(continueOnCapturedContext: false);
                decompressor.Post(new InputBlock(compressedData, uncompressedBlockSize));
            }
            count++;
        }
        // No more blocks; wait until the writer has flushed everything.
        // Awaiting Completion also surfaces any decompression exception.
        decompressor.Complete();
        await outputWriter.Completion;
    }
}https://codereview.stackexchange.com/questions/136726
复制相似问题