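I am trying to track down what I believe is a memory/thread leak in one of my programs.
My program uses a function to upload a file to a Windows Azure Storage blob. To make this function resilient to various transient error conditions (such as intermittent network failures), I wanted to use the Transient Fault Handling Application Block for Windows Azure ("Topaz") from the Enterprise Library, which includes a configurable retry mechanism.
In addition, to avoid timeouts when handling large files, and to narrow down the part of an upload that has to be repeated after a failure, I upload the given file stream in blocks, so that each block can be uploaded independently.
Finally, to follow the best practices for Windows Azure Storage, I use asynchronous operations wherever possible when talking to Windows Azure, to improve the scalability of the application.
Here is the resulting function: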
public static Task ChunkedUploadStreamAsync(CloudBlockBlob blob, Stream source, BlobRequestOptions options, int chunkSize, RetryPolicy policy)
{
    var blockids = new List<string>();
    var blockid = 0;
    var count = 0;
    var bytes = new byte[chunkSize];
    // first create a list of TPL Tasks for uploading blocks asynchronously
    var tasks = new List<Task>();
    while ((count = source.Read(bytes, 0, bytes.Length)) != 0)
    {
        var id = Convert.ToBase64String(BitConverter.GetBytes(++blockid));
        Func<Task> uploadTaskFunc = () => new TaskFactory()
            .FromAsync(
                (asyncCallback, state) => blob.BeginPutBlock(id, new MemoryStream(bytes, 0, count), null, null, null, null, asyncCallback, state)
                , blob.EndPutBlock
                , null
            )
            .ContinueWith(antecedent => blockids.Add(id), TaskContinuationOptions.NotOnFaulted);
        tasks.Add(policy.ExecuteAsync(uploadTaskFunc));
    }
    return new TaskFactory().ContinueWhenAll(
        tasks.ToArray(),
        array =>
        {
            // propagate exceptions and mark all faulted Tasks as observed
            Task.WaitAll(array);
            // create continuation task for committing uploaded blocks
            Func<Task> commitTaskFunc = () => new TaskFactory()
                .FromAsync(
                    (asyncCallback, state) => blob.BeginPutBlockList(blockids, asyncCallback, state)
                    , blob.EndPutBlockList
                    , null);
            policy
                .ExecuteAsync(commitTaskFunc)
                .Wait();
        });
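}
Using Performance Monitor, I can observe that the number of threads and the amount of memory used by my program increase significantly after this function is called. For example, here is a snapshot from one invocation of the function:
[Performance Monitor snapshot]
Can someone please tell me what I am doing wrong? If necessary, please suggest a better design.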
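Answered 2013-01-11 16:49:16
There are several problems in the code that make it error-prone:
- The `bytes` buffer is shared by all blocks, so a later `Read` can overwrite data before an earlier block has been sent (this is why the revised code below allocates a new buffer per block). The same goes for `count`.
- The order of blocks in `blockids` may differ from the order in which you send them, so the blocks of the resulting blob may end up in the wrong order.
- There may be thread contention on `blockids`, because different threads may add a new element at the same time. This can cause various problems, such as blocks missing from the resulting list, incorrect block IDs, and so on.
- Use `Task.Factory` instead of `new TaskFactory()`.
- There is little point in wrapping `BeginPutBlockList` in an asynchronous task if you immediately wait on it; the revised code below calls the synchronous `PutBlockList` instead.
Other than that, I do not see a memory leak in the chart you provided. The threads may simply still be alive in the thread pool, and the memory may not have been collected by the GC yet.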
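Why a fresh buffer per block matters can be shown without any Azure dependency. The following is a minimal sketch, not part of the original code: `BufferAliasingDemo` and `ReadChunks` are hypothetical names, and the "upload" is simulated by reading the chunk streams back only after the read loop has finished. It relies on the fact that `new MemoryStream(byte[], int, int)` wraps the given array rather than copying it.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

public static class BufferAliasingDemo
{
    // Hypothetical helper: splits `source` into chunks the same way the upload loop does,
    // but reads the chunk streams back only after the loop has finished. That delay stands
    // in for uploads that run later than the Read calls.
    public static string[] ReadChunks(Stream source, int chunkSize, bool freshBufferPerChunk)
    {
        var chunks = new List<MemoryStream>();
        var bytes = new byte[chunkSize];
        int count;
        while ((count = source.Read(bytes, 0, bytes.Length)) != 0)
        {
            // This constructor wraps the array; it does not copy it.
            chunks.Add(new MemoryStream(bytes, 0, count));
            if (freshBufferPerChunk)
            {
                // Give the next Read its own array so earlier chunks stay intact.
                bytes = new byte[chunkSize];
            }
        }
        return chunks.Select(c => Encoding.ASCII.GetString(c.ToArray())).ToArray();
    }

    public static void Main()
    {
        var data = Encoding.ASCII.GetBytes("AAAABBBBCC");

        // Shared buffer: every chunk sees whatever the last Read left in the array.
        Console.WriteLine(string.Join(",", ReadChunks(new MemoryStream(data), 4, false)));
        // prints "CCBB,CCBB,CC"

        // Fresh buffer per chunk: each chunk keeps its own data.
        Console.WriteLine(string.Join(",", ReadChunks(new MemoryStream(data), 4, true)));
        // prints "AAAA,BBBB,CC"
    }
}
```

The cost of the fix is visible here as well: with a fresh buffer per chunk, every chunk that has not yet been consumed holds its own `chunkSize`-byte array in memory.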
Here is what I came up with (note that it may consume more memory, because all blocks that are currently being sent have to be kept in memory):
private static Task PutBlockAsync(CloudBlockBlob blob, string id, Stream stream, RetryPolicy policy)
{
    Func<Task> uploadTaskFunc = () => Task.Factory
        .FromAsync(
            (asyncCallback, state) => blob.BeginPutBlock(id, stream, null, null, null, null, asyncCallback, state)
            , blob.EndPutBlock
            , null
        );
    return policy.ExecuteAsync(uploadTaskFunc);
}
public static Task ChunkedUploadStreamAsync(CloudBlockBlob blob, Stream source, BlobRequestOptions options, int chunkSize, RetryPolicy policy)
{
    var blockids = new List<string>();
    var blockid = 0;
    int count;
    // first create a list of TPL Tasks for uploading blocks asynchronously
    var tasks = new List<Task>();
    var bytes = new byte[chunkSize];
    while ((count = source.Read(bytes, 0, bytes.Length)) != 0)
    {
        var id = Convert.ToBase64String(BitConverter.GetBytes(++blockid));
        // record the ID in read order, on the calling thread, before the upload starts
        blockids.Add(id);
        tasks.Add(PutBlockAsync(blob, id, new MemoryStream(bytes, 0, count), policy));
        bytes = new byte[chunkSize]; // need a new buffer to avoid overwriting the previous one
    }
    return Task.Factory.ContinueWhenAll(
        tasks.ToArray(),
        array =>
        {
            // propagate exceptions and mark all faulted Tasks as observed
            Task.WaitAll(array);
            policy.ExecuteAction(() => blob.PutBlockList(blockids));
        });
}
https://codereview.stackexchange.com/questions/20429