我正在从上游API获得项目,这是相当慢的。我试图通过使用TPL Dataflow来创建多个连接并将这些连接结合在一起来加快速度;
class Stuff
{
int Id { get; }
}
async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();
async Task<IEnumerable<Stuff>> GetLotsOfStuff(IEnumerable<int> ids)
{
var bagOfStuff = new ConcurrentBag<Stuff>();
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5
};
var processor = new ActionBlock<int>(async id =>
{
bagOfStuff.Add(await GetStuffById(id));
}, options);
foreach (int id in ids)
{
processor.Post(id);
}
processor.Complete();
await processor.Completion;
return bagOfStuff.ToArray();
}问题是,我必须等到查询完整个Stuff集合后才能将其返回给调用方。我希望的是,每当多个并行查询中的任何一个返回一个项时,我都会以yield return的方式返回该项。因此,我不需要返回一个sync Task<IEnumerable<Stuff>>,我只需要返回一个IEnumerable<Stuff>,调用方在任何项目返回时都会提前迭代。
我试过这样做;
IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5
};
var processor = new ActionBlock<int>(async id =>
{
yield return await GetStuffById(id);
}, options);
foreach (int id in ids)
{
processor.Post(id);
}
processor.Complete();
processor.Completion.Wait();
yield break;
}但我犯了个错误
无法在匿名方法或lambda表达式中使用收益率语句。
我如何重组我的代码?
发布于 2019-10-25 19:37:08
您可以返回一个IEnumerable,但是要这样做,您必须阻止当前线程。您需要一个TransformBlock来处理ids,还需要一个馈线任务,它将用ids异步地向TransformBlock提供信息。最后,当前线程将进入一个阻塞循环,等待产生的东西产生:
static IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
using var completionCTS = new CancellationTokenSource();
var processor = new TransformBlock<int, Stuff>(async id =>
{
return await GetStuffById(id);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5,
BoundedCapacity = 50, // Avoid buffering millions of ids
CancellationToken = completionCTS.Token
});
var feederTask = Task.Run(async () =>
{
try
{
foreach (int id in ids)
if (!await processor.SendAsync(id)) break;
}
finally { processor.Complete(); }
});
try
{
while (processor.OutputAvailableAsync().Result)
while (processor.TryReceive(out var stuff))
yield return stuff;
}
finally // This runs when the caller exits the foreach loop
{
completionCTS.Cancel(); // Cancel the TransformBlock if it's still running
}
Task.WaitAll(feederTask, processor.Completion); // Propagate all exceptions
}不需要ConcurrentBag,因为TransformBlock有一个内部输出缓冲区。棘手的部分是处理这样一种情况,即调用方将通过提前中断或被异常所阻碍而放弃IEnumerable<Stuff>枚举。在这种情况下,您不想让馈线任务一直使用ids抽吸IEnumerable<int>直到结束。幸运的是有个解决办法。将屈服循环封装在try/finally块中,可以接收到该事件的通知,从而可以及时终止馈线任务。
另一种实现可以消除对馈线任务的需求,方法是将ids、输入块和在单个循环中生成的东西组合在一起。在这种情况下,您可能希望在抽水和屈服之间出现滞后。要实现这一点,MoreLinq的Lag (或Lead)扩展方法可能很方便。
更新:这里的是一个不同的实现,它在同一个循环中枚举并产生结果。为了实现期望的滞后,源枚举是正确的填充一些虚拟元素,在数量上与并发度相等。
此实现接受泛型类型,而不是int和Stuff。
public static IEnumerable<TResult> Transform<TSource, TResult>(
IEnumerable<TSource> source, Func<TSource, Task<TResult>> taskFactory,
int degreeOfConcurrency)
{
var processor = new TransformBlock<TSource, TResult>(async item =>
{
return await taskFactory(item);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = degreeOfConcurrency
});
var paddedSource = source.Select(item => (item, true))
.Concat(Enumerable.Repeat((default(TSource), false), degreeOfConcurrency));
int index = -1;
bool completed = false;
foreach (var (item, hasValue) in paddedSource)
{
index++;
if (hasValue) { processor.Post(item); }
else if (!completed) { processor.Complete(); completed = true; }
if (index >= degreeOfConcurrency)
{
if (!processor.OutputAvailableAsync().Result) break; // Blocking call
if (!processor.TryReceive(out var result))
throw new InvalidOperationException(); // Should never happen
yield return result;
}
}
processor.Completion.Wait();
}用法示例:
IEnumerable<Stuff> lotsOfStuff = Transform(ids, GetStuffById, 5);可以对这两个实现进行轻微的修改,以返回IAsyncEnumerable而不是IEnumerable,以避免阻塞调用线程。
发布于 2019-10-25 15:53:24
可能有几种不同的方法可以根据特定的用例来处理这个问题。但是要根据TPL来处理条目,您需要将源块更改为TransformBlock<,>,并将项目流到另一个块以处理您的项目。请注意,现在您可以摆脱收集ConcurrentBag,并确保将EnsureOrdered设置为false,如果您不关心您收到的商品的顺序。此外,链接块和传播完成,以确保您的管道完成一旦所有项目被检索和随后的处理。
class Stuff
{
int Id { get; }
}
public class GetStuff
{
async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();
async Task GetLotsOfStuff(IEnumerable<int> ids)
{
//var bagOfStuff = new ConcurrentBag<Stuff>();
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5,
EnsureOrdered = false
};
var processor = new TransformBlock<int, Stuff>(id => GetStuffById(id), options);
var handler = new ActionBlock<Stuff>(s => throw new NotImplementedException());
processor.LinkTo(handler, new DataflowLinkOptions() { PropagateCompletion = true });
foreach (int id in ids)
{
processor.Post(id);
}
processor.Complete();
await handler.Completion;
}
}其他选项可以使您的方法从TransformBlock中流出来,或者使用IAsyncEnumerable到yield return和异步get方法。
https://stackoverflow.com/questions/58560732
复制相似问题