首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >基于IEnumerable和TPL数据流的数据流

基于IEnumerable和TPL数据流的数据流
EN

Stack Overflow用户
提问于 2019-10-25 14:43:54
回答 2查看 592关注 0票数 1

我正在从上游API获得项目,这是相当慢的。我试图通过使用TPL Dataflow来创建多个连接并将这些连接结合在一起来加快速度;

代码语言:javascript
复制
class Stuff
{
    int Id { get; }
}

async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();

async Task<IEnumerable<Stuff>> GetLotsOfStuff(IEnumerable<int> ids)
{
    var bagOfStuff = new ConcurrentBag<Stuff>();

    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5
    };

    var processor = new ActionBlock<int>(async id =>
    {
        bagOfStuff.Add(await GetStuffById(id));
    }, options);

    foreach (int id in ids)
    {
        processor.Post(id);
    }

    processor.Complete();
    await processor.Completion;

    return bagOfStuff.ToArray();
}

问题是,我必须等到查询完整个Stuff集合后才能将其返回给调用方。我希望的是,每当多个并行查询中的任何一个返回一个项时,我都会以yield return的方式返回该项。因此,我不需要返回一个sync Task<IEnumerable<Stuff>>,我只需要返回一个IEnumerable<Stuff>,调用方在任何项目返回时都会提前迭代。

我试过这样做;

代码语言:javascript
复制
IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5
    };

    var processor = new ActionBlock<int>(async id =>
    {
        yield return await GetStuffById(id);
    }, options);

    foreach (int id in ids)
    {
        processor.Post(id);
    }

    processor.Complete();
    processor.Completion.Wait();

    yield break;
}

但我犯了个错误

无法在匿名方法或lambda表达式中使用收益率语句。

我如何重组我的代码?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-10-25 19:37:08

您可以返回一个IEnumerable,但是要这样做,您必须阻止当前线程。您需要一个TransformBlock来处理ids,还需要一个馈线任务,它将用ids异步地向TransformBlock提供信息。最后,当前线程将进入一个阻塞循环,等待产生的东西产生:

代码语言:javascript
复制
static IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
    using var completionCTS = new CancellationTokenSource();

    var processor = new TransformBlock<int, Stuff>(async id =>
    {
        return await GetStuffById(id);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5,
        BoundedCapacity = 50, // Avoid buffering millions of ids
        CancellationToken = completionCTS.Token
    });

    var feederTask = Task.Run(async () =>
    {
        try
        {
            foreach (int id in ids)
                if (!await processor.SendAsync(id)) break;
        }
        finally { processor.Complete(); }
    });

    try
    {
        while (processor.OutputAvailableAsync().Result)
            while (processor.TryReceive(out var stuff))
                yield return stuff;
    }
    finally // This runs when the caller exits the foreach loop
    {
        completionCTS.Cancel(); // Cancel the TransformBlock if it's still running
    }

    Task.WaitAll(feederTask, processor.Completion); // Propagate all exceptions
}

不需要ConcurrentBag,因为TransformBlock有一个内部输出缓冲区。棘手的部分是处理这样一种情况,即调用方将通过提前中断或被异常所阻碍而放弃IEnumerable<Stuff>枚举。在这种情况下,您不想让馈线任务一直使用ids抽吸IEnumerable<int>直到结束。幸运的是有个解决办法。将屈服循环封装在try/finally块中,可以接收到该事件的通知,从而可以及时终止馈线任务。

另一种实现可以消除对馈线任务的需求,方法是将ids、输入块和在单个循环中生成的东西组合在一起。在这种情况下,您可能希望在抽水和屈服之间出现滞后。要实现这一点,MoreLinqLag (或Lead)扩展方法可能很方便。

更新:这里的是一个不同的实现,它在同一个循环中枚举并产生结果。为了实现期望的滞后,源枚举是正确的填充一些虚拟元素,在数量上与并发度相等。

此实现接受泛型类型,而不是intStuff

代码语言:javascript
复制
public static IEnumerable<TResult> Transform<TSource, TResult>(
    IEnumerable<TSource> source, Func<TSource, Task<TResult>> taskFactory,
    int degreeOfConcurrency)
{
    var processor = new TransformBlock<TSource, TResult>(async item =>
    {
        return await taskFactory(item);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = degreeOfConcurrency
    });

    var paddedSource = source.Select(item => (item, true))
        .Concat(Enumerable.Repeat((default(TSource), false), degreeOfConcurrency));
    int index = -1;
    bool completed = false;
    foreach (var (item, hasValue) in paddedSource)
    {
        index++;
        if (hasValue) { processor.Post(item); }
        else if (!completed) { processor.Complete(); completed = true; }
        if (index >= degreeOfConcurrency)
        {
            if (!processor.OutputAvailableAsync().Result) break; // Blocking call
            if (!processor.TryReceive(out var result))
                throw new InvalidOperationException(); // Should never happen
            yield return result;
        }
    }
    processor.Completion.Wait();
}

用法示例:

代码语言:javascript
复制
IEnumerable<Stuff> lotsOfStuff = Transform(ids, GetStuffById, 5);

可以对这两个实现进行轻微的修改,以返回IAsyncEnumerable而不是IEnumerable,以避免阻塞调用线程。

票数 1
EN

Stack Overflow用户

发布于 2019-10-25 15:53:24

可能有几种不同的方法可以根据特定的用例来处理这个问题。但是要根据TPL来处理条目,您需要将源块更改为TransformBlock<,>,并将项目流到另一个块以处理您的项目。请注意,现在您可以摆脱收集ConcurrentBag,并确保将EnsureOrdered设置为false,如果您不关心您收到的商品的顺序。此外,链接块和传播完成,以确保您的管道完成一旦所有项目被检索和随后的处理。

代码语言:javascript
复制
class Stuff
{
    int Id { get; }
}

public class GetStuff
{
    async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();

    async Task GetLotsOfStuff(IEnumerable<int> ids)
    {
        //var bagOfStuff = new ConcurrentBag<Stuff>();

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            EnsureOrdered = false
        };

        var processor = new TransformBlock<int, Stuff>(id => GetStuffById(id), options);

        var handler = new ActionBlock<Stuff>(s => throw new NotImplementedException());

        processor.LinkTo(handler, new DataflowLinkOptions() { PropagateCompletion = true });

        foreach (int id in ids)
        {
            processor.Post(id);
        }

        processor.Complete();
        await handler.Completion;
    }
}

其他选项可以使您的方法从TransformBlock中流出来,或者使用IAsyncEnumerableyield return和异步get方法。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58560732

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档