Task.WaitSubset / Task.WaitN?

Stack Overflow user
Asked 2014-02-09 23:52:51
4 answers · 231 views · 0 followers · 2 votes

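There is Task.WaitAll, which waits for all tasks, and Task.WaitAny, which waits for one task. How do I wait for any N tasks?

Use case: search result pages are downloaded, and each result needs a separate task to download and process it. If I use WaitAll to wait for the subtasks before fetching the next search result page, I will not use all available resources (one long task will delay the rest). Not waiting at all can queue up thousands of tasks, which is not the best idea either.

So, how do I wait for a subset of tasks to complete? Or, alternatively, how do I wait until the task scheduler queue holds only N tasks?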


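4 Answers

Stack Overflow user

Accepted answer

Posted 2014-02-10 02:10:03

This looks like an excellent problem for TPL Dataflow, which lets you control parallelism and buffering so that processing runs at maximum speed.

Here is some (untested) code to show what I mean: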

// requires: using System.Threading.Tasks.Dataflow;
static void Process()
{
    var searchReader =
        new TransformManyBlock<SearchResult, SearchResult>(async uri =>
    {
        // return a list of search results at uri.

        return new[]
        {
            new SearchResult
            {
                IsResult = true,
                Uri = "http://foo.com"
            },
            new SearchResult
            {
                // return the next search result page here.
                IsResult = false,
                Uri = "http://google.com/next"
            }
        };
    }, new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 8, // restrict buffer size.
        MaxDegreeOfParallelism = 4 // control parallelism.
    });

    // link "next" pages back to the searchReader.
    searchReader.LinkTo(searchReader, x => !x.IsResult);

    var resultActor = new ActionBlock<SearchResult>(async uri =>
    {
        // do something with the search result.
    }, new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 64,
        MaxDegreeOfParallelism = 16
    });

    // link search results into resultActor.
    searchReader.LinkTo(resultActor, x => x.IsResult);

    // put in the first piece of input.
    searchReader.Post(new SearchResult { Uri = "http://google/first" });
}

struct SearchResult
{
    public bool IsResult { get; set; }
    public string Uri { get; set; }
}
2 votes

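Stack Overflow user

Posted 2014-02-10 11:28:02

I think you should limit the number of parallel download tasks and the number of concurrent result-processing tasks independently. I would do it with two SemaphoreSlim objects, as shown below. This version does not use the synchronous SemaphoreSlim.Wait (thanks to @svick for making the point). It has only been lightly tested, and the exception handling can be improved; substitute your own DownloadNextPageAsync and ProcessResults: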

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Console_21666797
{
    partial class Program
    {
        // the actual download method
        // async Task<string> DownloadNextPageAsync(string url) { ... }

        // the actual process methods
        // void ProcessResults(string data) { ... }

        // download and process all pages
        async Task DownloadAndProcessAllAsync(
            string startUrl, int maxDownloads, int maxProcesses)
        {
            // max parallel downloads
            var downloadSemaphore = new SemaphoreSlim(maxDownloads);
            // max parallel processing tasks
            var processSemaphore = new SemaphoreSlim(maxProcesses);

            var tasks = new HashSet<Task>();
            var complete = false;
            var protect = new Object(); // protect tasks

            var page = 0;

            // do the page
            Func<string, Task> doPageAsync = async (url) =>
            {
                bool downloadSemaphoreAcquired = true;
                try
                {
                    // download the page
                    var data = await DownloadNextPageAsync(
                        url).ConfigureAwait(false);

                    if (String.IsNullOrEmpty(data))
                    {
                        Volatile.Write(ref complete, true);
                    }
                    else
                    {
                        // enable the next download to happen
                        downloadSemaphore.Release();
                        downloadSemaphoreAcquired = false;

                        // process this download 
                        await processSemaphore.WaitAsync();
                        try
                        {
                            await Task.Run(() => ProcessResults(data));
                        }
                        finally
                        {
                            processSemaphore.Release();
                        }
                    }
                }
                catch (Exception)
                {
                    Volatile.Write(ref complete, true);
                    throw;
                }
                finally
                {
                    if (downloadSemaphoreAcquired)
                        downloadSemaphore.Release();
                }
            };

            // do the page and save the task
            Func<string, Task> queuePageAsync = async (url) =>
            {
                var task = doPageAsync(url);

                lock (protect)
                    tasks.Add(task);

                await task;

                lock (protect)
                    tasks.Remove(task);
            }; 

            // process pages in a loop until complete is true 
            while (!Volatile.Read(ref complete))
            {
                page++;

                // acquire the download semaphore asynchronously
                await downloadSemaphore.WaitAsync().ConfigureAwait(false);

                // do the page 
                var task = queuePageAsync(startUrl + "?page=" + page);
            }

            // await completion of the pending tasks
            Task[] pendingTasks;
            lock (protect)
                pendingTasks = tasks.ToArray();
            await Task.WhenAll(pendingTasks);
        }

        static void Main(string[] args)
        {
            new Program().DownloadAndProcessAllAsync("http://google.com", 10, 5).Wait();
            Console.ReadLine();
        }
    }
}
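
The core of the approach above is the SemaphoreSlim.WaitAsync / Release pairing. As a minimal sketch of just that pattern, assuming a single limit rather than the two separate ones used in the answer (the names SemaphoreThrottle, ForEachAsync and RunAsync are hypothetical and not part of the original answer):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreThrottle
{
    // Starts work(item) for each item, never letting more than
    // maxConcurrency invocations run at the same time.
    public static async Task ForEachAsync<T>(
        IEnumerable<T> items, Func<T, Task> work, int maxConcurrency)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = new List<Task>();
            foreach (var item in items)
            {
                // Wait asynchronously for a free slot before starting more work.
                await gate.WaitAsync().ConfigureAwait(false);
                tasks.Add(RunAsync(gate, work, item));
            }

            // WhenAll completes only after every task has finished,
            // so the semaphore is no longer in use when it is disposed.
            await Task.WhenAll(tasks).ConfigureAwait(false);
        }
    }

    private static async Task RunAsync<T>(
        SemaphoreSlim gate, Func<T, Task> work, T item)
    {
        try
        {
            await work(item).ConfigureAwait(false);
        }
        finally
        {
            // Always free the slot, even if the work item throws.
            gate.Release();
        }
    }
}
```

Releasing in a finally block keeps a failed item from permanently consuming a slot. Note that this sketch keeps every task in the list until the end, which may not suit an unbounded stream of pages.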
2 votes

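Stack Overflow user

Posted 2014-02-10 02:48:17

Something like this should work. There may be some edge cases, but on the whole it should ensure that at least n tasks have completed.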

public static async Task WhenN(IEnumerable<Task> tasks, int n, CancellationTokenSource cts = null)
{
    var pending = new HashSet<Task>(tasks);

    if (n > pending.Count)
    {
        n = pending.Count;

        // or throw
    }

    var completed = 0;

    while (completed != n)
    {
        var completedTask = await Task.WhenAny(pending);

        pending.Remove(completedTask);

        completed++;
    }

    if (cts != null)
    {
        cts.Cancel();
    }
}

Usage:

static void Main(string[] args)
{
    var tasks = new List<Task>();
    var completed = 0;
    var cts = new CancellationTokenSource();

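    for (int i = 0; i < 100; i++)
    {
        var temp = i; // copy the loop variable so each task captures its own value
        tasks.Add(Task.Run(async () =>
        {
            await Task.Delay(temp * 100, cts.Token);
            Console.WriteLine("Completed task {0}", temp);
            Interlocked.Increment(ref completed); // tasks run concurrently
        }, cts.Token));
    }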

    Extensions.WhenN(tasks, 30, cts).Wait();

    Console.WriteLine(completed);

    Console.ReadLine();
}
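
For the use case in the question (not letting thousands of tasks pile up), the same Task.WhenAny loop can also be used to cap the number of tasks in flight rather than to wait for a fixed count. The following is only a sketch under that assumption; Throttle, RunThrottledAsync, work and maxInFlight are hypothetical names that do not appear in the answer above:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class Throttle
{
    // Runs work(item) for every item while keeping at most
    // maxInFlight tasks in the pending set at any time.
    public static async Task RunThrottledAsync<T>(
        IEnumerable<T> items, Func<T, Task> work, int maxInFlight)
    {
        if (maxInFlight < 1)
            throw new ArgumentOutOfRangeException(nameof(maxInFlight));

        var pending = new HashSet<Task>();

        foreach (var item in items)
        {
            // When the window is full, wait for any one task to finish.
            if (pending.Count >= maxInFlight)
            {
                var finished = await Task.WhenAny(pending);
                pending.Remove(finished);

                // Rethrows if that task faulted or was cancelled; the
                // remaining pending tasks are not awaited in that case.
                await finished;
            }

            pending.Add(work(item));
        }

        // Drain whatever is still running.
        await Task.WhenAll(pending);
    }
}
```

A call such as `await Throttle.RunThrottledAsync(urls, DownloadAndProcessAsync, 8)` would then start a new download only when fewer than eight are outstanding, where `urls` and `DownloadAndProcessAsync` stand in for your own data and method.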
0 votes
Original page content provided by Stack Overflow. Translation support provided by the Tencent Cloud Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/21666797