首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >TPL线程泄漏和内存泄漏

TPL线程泄漏和内存泄漏
EN

Code Review用户
提问于 2013-01-11 15:16:34
回答 1查看 4.5K关注 0票数 1

我试图在我的一个程序中查找我认为是内存/线程泄漏的内容。

我的程序使用一个函数将文件上传到窗口透光存储块。为了使该函数对各种瞬态错误条件(如间歇性网络故障等)具有弹性,我想使用企业库中的用于Windows的瞬态故障处理应用程序块 "topaz",它包括一种可配置的重试机制。

此外,为了避免处理大文件时的超时条件,并改进和缩小在失败情况下应该重复的上传部分的范围,我尝试使用以块形式上载指定的文件流。,使每个上传都可以独立上传。

最后,为了坚持使用Windows存储的最佳实践,我在处理Windows时尽可能多地使用异步操作,以提高应用程序的可伸缩性。

下面是生成的片段函数:

代码语言:javascript
复制
    public static Task ChunkedUploadStreamAsync(CloudBlockBlob blob, Stream source, BlobRequestOptions options, int chunkSize, RetryPolicy policy)
    {
        var blockids = new List<string>();
        var blockid = 0;

        var count = 0;
        var bytes = new byte[chunkSize];

        // first create a list of TPL Tasks for uploading blocks asynchronously

        var tasks = new List<Task>();

        while ((count = source.Read(bytes, 0, bytes.Length)) != 0)
        {
            var id = Convert.ToBase64String(BitConverter.GetBytes(++blockid));

            Func<Task> uploadTaskFunc = () => new TaskFactory()
                .FromAsync(
                    (asyncCallback, state) => blob.BeginPutBlock(id, new MemoryStream(bytes, 0, count), null, null, null, null, asyncCallback, state)
                    , blob.EndPutBlock
                    , null
                )

                .ContinueWith(antecedent => blockids.Add(id), TaskContinuationOptions.NotOnFaulted);

            tasks.Add(policy.ExecuteAsync(uploadTaskFunc));
        }

        return new TaskFactory().ContinueWhenAll(
            tasks.ToArray(),
            array =>
                {
                    // propagate exceptions and make all faulted Tasks as observed
                    Task.WaitAll(array);

                    // create continuation task for committing uploaded blocks
                    Func<Task> commitTaskFunc = () => new TaskFactory()
                        .FromAsync(
                            (asyncCallback, state) => blob.BeginPutBlockList(blockids, asyncCallback, state)
                            , blob.EndPutBlockList
                            , null);

                    policy
                        .ExecuteAsync(commitTaskFunc)
                        .Wait();
                });
    }

使用Performance,我可以观察到在调用此函数之后,我的程序使用的线程数和内存量显著增加。例如,下面是这个函数的一个调用的快照:

拜托,有人能告诉我哪里做错了什么吗?如有必要,请提出更好的设计建议。

EN

回答 1

Code Review用户

回答已采纳

发布于 2013-01-11 16:49:16

代码中有几个问题使其容易出错:

  • 您正在使用单个字节数组来输入所有内存流,因此可能会以不同的块发送相同的数据。同样是count的意思。
  • blockids中块的顺序可能与您发送的块顺序不同,因此生成的BLOB块的顺序可能不正确。
  • blockids上可能存在线程争用,因为不同的线程可能同时添加一个新元素,从而导致各种问题,如结果列表中缺少块、块ID不正确等。
  • 使用Task.Factory而不是new TaskFactory()
  • 如果您创建了一个任务并立即等待它,那么您就不需要该任务了,所以您可以用同步版本替换最后一个BeginPutBlockList

除此之外,我没有从您提供的图表中看到内存泄漏。线程可能仍然存在于线程池中,内存可能还没有被GC收集。

下面是我得到的结果(请注意,它可能消耗更多的内存,因为您需要存储当前正在内存中发送的所有块):

代码语言:javascript
复制
    private static Task PutBlockAsync(CloudBlockBlob blob, string id, Stream stream, RetryPolicy policy)
    {
        Func<Task> uploadTaskFunc = () => Task.Factory
            .FromAsync(
                (asyncCallback, state) => blob.BeginPutBlock(id, stream, null, null, null, null, asyncCallback, state)
                , blob.EndPutBlock
                , null
            );
        return policy.ExecuteAsync(uploadTaskFunc);
    }

    public static Task ChunkedUploadStreamAsync(CloudBlockBlob blob, Stream source, BlobRequestOptions options, int chunkSize, RetryPolicy policy)
    {
        var blockids = new List<string>();
        var blockid = 0;

        int count;

        // first create a list of TPL Tasks for uploading blocks asynchronously
        var tasks = new List<Task>();

        var bytes = new byte[chunkSize];
        while ((count = source.Read(bytes, 0, bytes.Length)) != 0)
        {
            var id = Convert.ToBase64String(BitConverter.GetBytes(++blockid));
            blockids.Add(id);
            tasks.Add(PutBlockAsync(blob, id, new MemoryStream(bytes, 0, count), policy));
            bytes = new byte[chunkSize]; //need a new buffer to avoid overriding previous one
        }

        return Task.Factory.ContinueWhenAll(
            tasks.ToArray(),
            array =>
            {
                // propagate exceptions and make all faulted Tasks as observed
                Task.WaitAll(array);
                policy.ExecuteAction(() => blob.PutBlockList(blockids));
            });
    }
票数 4
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/20429

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档