
Multithreaded decompression

Asked by a Code Review user on 2016-08-03 07:28:45
1 answer · 1.1K views · 0 followers · 7 votes

I am optimizing a class that decompresses Mass Effect 3's .sfar files. A .sfar file is an archive; this function extracts a file from inside the .sfar and writes it to the supplied stream.

Sometimes the file is not compressed, so it is simply copied to the output stream. Most of the time, however, the file is split into many individually compressed blocks (the occasional block is stored uncompressed).

I figured I could speed this up by decompressing the blocks in parallel, and it does give a considerable speedup over the original single-threaded synchronous function. Still, I wonder whether this is a good approach. Is there anything I have missed that could make it faster?

public async Task DecompressEntryAsync(int Index, Stream output)
{
    FileEntryStruct e = Files[Index];
    using (FileStream fs = new FileStream(FileName, FileMode.Open, FileAccess.Read, FileShare.None, 4096, useAsync: true))
    {
        fs.Seek(e.BlockOffsets[0], SeekOrigin.Begin);
        byte[] buff;
        if (e.BlockSizeIndex == 0xFFFFFFFF)
        {
            buff = new byte[e.RealUncompressedSize];
            await fs.ReadAsync(buff, 0, buff.Length).ConfigureAwait(continueOnCapturedContext: false);
            await output.WriteAsync(buff, 0, buff.Length).ConfigureAwait(continueOnCapturedContext: false);
        }
        else
        {
            uint count = 0;
            byte[] inputBlock;
            long left = e.RealUncompressedSize;
            List<Task<byte[]>> tasks = new List<Task<byte[]>>();
            while (left > 0)
            {
                uint compressedBlockSize = e.BlockSizes[count];
                if (compressedBlockSize == 0)
                    compressedBlockSize = Header.MaxBlockSize;
                if (compressedBlockSize == Header.MaxBlockSize || compressedBlockSize == left)
                {
                    left -= compressedBlockSize;
                    buff = new byte[compressedBlockSize];
                    await fs.ReadAsync(buff, 0, buff.Length).ConfigureAwait(continueOnCapturedContext: false);
                    tasks.Add(Task.FromResult(buff));
                }
                else
                {
                    var uncompressedBlockSize = Math.Min(left, Header.MaxBlockSize);
                    left -= uncompressedBlockSize;
                    if (compressedBlockSize < 5)
                    {
                        throw new Exception("compressed block size smaller than 5");
                    }
                    inputBlock = new byte[compressedBlockSize];

                    await fs.ReadAsync(inputBlock, 0, (int)compressedBlockSize).ConfigureAwait(continueOnCapturedContext: false);

                    tasks.Add(SevenZipHelper.DecompressAsync(inputBlock, (int)uncompressedBlockSize));
                }
                count++;
            }
            await Task.WhenAll(tasks).ConfigureAwait(continueOnCapturedContext: false);
            foreach (var task in tasks)
            {
                buff = task.Result;
                await output.WriteAsync(buff, 0, buff.Length).ConfigureAwait(continueOnCapturedContext: false);
            }
        } 
    }
}

Here is the SevenZipHelper.DecompressAsync function:

public static Task<byte[]> DecompressAsync(byte[] inputBytes, int outSize)
{
    return Task.Run(() =>
    {
        MemoryStream newInStream = new MemoryStream(inputBytes);

        Decoder decoder = new Decoder();

        newInStream.Seek(0, 0);

        byte[] properties2 = new byte[5];
        if (newInStream.Read(properties2, 0, 5) != 5)
            throw (new Exception("input .lzma is too short"));
        decoder.SetDecoderProperties(properties2);

        long compressedSize = newInStream.Length - newInStream.Position;
        MemoryStream newOutStream = new MemoryStream();
        decoder.Code(newInStream, newOutStream, compressedSize, outSize, null);
        if (newOutStream.Length != outSize)
            throw new Exception("Decompression Error");
        return newOutStream.ToArray();
    });
}

1 Answer

Answered by a Code Review user (accepted), posted 2016-08-04 13:05:17

This is a perfect task for the TPL Dataflow library (written by Microsoft, available on NuGet).

With Dataflow you can post items continuously, have them processed in parallel, and have them completed in order. That achieves the goal from @thesyndarn's comment: as soon as the first block has been processed it is written to the output, while blocks that finish processing out of order wait until it is their turn to be written.
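The ordering guarantee described above can be seen in isolation with a minimal, self-contained sketch (my illustration, not part of the answer's code): a TransformBlock runs its delegate in parallel, yet LinkTo hands results downstream in the order the items were posted. It requires the System.Threading.Tasks.Dataflow NuGet package.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class OrderingDemo
{
    static async Task Main()
    {
        var worker = new TransformBlock<int, int>(
            n =>
            {
                // Make earlier items slower, so they would finish last
                // if ordering were not preserved.
                Thread.Sleep(100 - n * 20);
                return n * n;
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        var printer = new ActionBlock<int>(result => Console.WriteLine(result));
        worker.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 5; i++)
            worker.Post(i);

        worker.Complete();
        await printer.Completion;   // results come out in posted order: 0, 1, 4, 9, 16
    }
}
```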

To set up the dataflow you define two blocks, a decompressor block and an output-writer block, and link them together:

using System.Threading.Tasks.Dataflow;
...
internal class InputBlock
{
    public const long Uncompressed = -1;

    public byte[] Data { get; }
    public long UncompressedSize { get; }
    public bool IsCompressed { get; }

    public InputBlock(byte[] data, long uncompressedSize)
    {
        Data = data;
        UncompressedSize = uncompressedSize;
        IsCompressed = uncompressedSize > 0;
    }
}
...
var decompressor = new TransformBlock<InputBlock, byte[]>(
    input => input.IsCompressed
        ? SevenZipHelper.Decompress(input.Data, input.UncompressedSize)
        : input.Data
    , new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }
    );

var outputWriter = new ActionBlock<byte[]>(
    data => output.Write(data, 0, data.Length)
    );

decompressor.LinkTo(outputWriter, new DataflowLinkOptions { PropagateCompletion = true });
...
decompressor.Post(new InputBlock(compressedData, uncompressedBlockSize));

Post data to the decompressor; it will process it in parallel and pass it to the outputWriter in the same order in which it was posted.

This requires a synchronous Decompress:

public static byte[] Decompress(byte[] inputBytes, long decompressedSize)
{
    var compressed = new MemoryStream(inputBytes);
    var decoder = new Decoder();

    var properties2 = new byte[5];
    if (compressed.Read(properties2, 0, 5) != 5)
    {
        throw (new Exception("input .lzma is too short"));
    }

    decoder.SetDecoderProperties(properties2);

    var compressedSize = compressed.Length - compressed.Position;
    var decompressed = new MemoryStream();
    decoder.Code(compressed, decompressed, compressedSize, decompressedSize, null);

    if (decompressed.Length != decompressedSize)
        throw new Exception("Decompression Error");

    return decompressed.ToArray();
}

I have (I think) made the variable names easier to read.

Now use the dataflow pipeline instead of firing off tasks:

public async Task DecompressEntryAsync(int index, Stream output)
{
    var entry = Files[index];
    using (var fs = new FileStream(FileName, FileMode.Open, FileAccess.Read, FileShare.None, 4096, useAsync: true))
    {
        fs.Seek(entry.BlockOffsets[0], SeekOrigin.Begin);

        var isUncompressed = entry.BlockSizeIndex == 0xFFFFFFFF;
        if (isUncompressed)
        {
            var uncompressed = new byte[entry.RealUncompressedSize];
            await fs.ReadAsync(uncompressed, 0, uncompressed.Length).ConfigureAwait(continueOnCapturedContext: false);
            await output.WriteAsync(uncompressed, 0, uncompressed.Length).ConfigureAwait(continueOnCapturedContext: false);

            return;
        }

        var decompressor = new TransformBlock<InputBlock, byte[]>(
            input => input.IsCompressed
                ? SevenZipHelper.Decompress(input.Data, input.UncompressedSize)
                : input.Data
            , new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }
            );

        var outputWriter = new ActionBlock<byte[]>(
            data => output.Write(data, 0, data.Length)
            );

        decompressor.LinkTo(outputWriter, new DataflowLinkOptions { PropagateCompletion = true });

        uint count = 0;
        long left = entry.RealUncompressedSize;
        while (left > 0)
        {
            uint compressedBlockSize = entry.BlockSizes[count];
            if (compressedBlockSize == 0)
            {
                compressedBlockSize = Header.MaxBlockSize;
            }

            if (compressedBlockSize == Header.MaxBlockSize ||
                compressedBlockSize == left)
            {
                left -= compressedBlockSize;
                var uncompressedData = new byte[compressedBlockSize];
                await fs.ReadAsync(uncompressedData, 0, uncompressedData.Length).ConfigureAwait(continueOnCapturedContext: false);
                decompressor.Post(new InputBlock(uncompressedData, InputBlock.Uncompressed));
            }
            else
            {
                var uncompressedBlockSize = Math.Min(left, Header.MaxBlockSize);
                left -= uncompressedBlockSize;
                if (compressedBlockSize < 5)
                {
                    throw new Exception("compressed block size smaller than 5");
                }

                var compressedData = new byte[compressedBlockSize];
                await fs.ReadAsync(compressedData, 0, (int)compressedBlockSize).ConfigureAwait(continueOnCapturedContext: false);

                decompressor.Post(new InputBlock(compressedData, uncompressedBlockSize));
            }
            count++;
        }

        decompressor.Complete();
        await outputWriter.Completion;
    }
}
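One refinement worth considering (my suggestion, not part of the accepted answer): with unbounded parallelism and no capacity limit, the reader can run far ahead of the writer and buffer every block of a large entry in memory at once. The DataflowBlockOptions.BoundedCapacity option, combined with SendAsync instead of Post, adds backpressure. A sketch of the changed lines, reusing the answer's InputBlock and SevenZipHelper types:

```csharp
// Bound the block's input queue so reads pause when the pipeline is full.
var decompressor = new TransformBlock<InputBlock, byte[]>(
    input => input.IsCompressed
        ? SevenZipHelper.Decompress(input.Data, input.UncompressedSize)
        : input.Data,
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount,
        BoundedCapacity = Environment.ProcessorCount * 2  // tune as needed
    });

// SendAsync awaits until the block has room, where Post would return false.
await decompressor.SendAsync(new InputBlock(compressedData, uncompressedBlockSize));
```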
Votes: 3
Original content provided by Code Review Stack Exchange:
https://codereview.stackexchange.com/questions/136726