首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >带有TaskScheduler和TaskFactory的第三方物流TaskFactory

带有TaskScheduler和TaskFactory的第三方物流TaskFactory
EN

Stack Overflow用户
提问于 2012-12-06 15:04:12
回答 3查看 1.1K关注 0票数 5

我试图结合使用TaskFactory.FromAsync创建一个任务管道/排序调度程序。

我希望能够触发web服务请求(使用FromAsync来使用I/O完成端口),但是可以维护它们的顺序,并且在任何时候只有一个命令在执行。

目前,我不使用FromAsync,因此我可以执行TaskFactory.StartNew(()=>api.DoSyncWebServiceCall()),并依赖于TaskFactory使用的OrderedTaskScheduler来确保只有一个请求未完成。

我假设在使用FromAsync方法时,这种行为将保持不变,但它不会:

代码语言:javascript
复制
TaskFactory<Stuff> taskFactory = new TaskFactory<Stuff>(new OrderedTaskScheduler());
var t1 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));
var t2 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));
var t3 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));

所有这些beginGetStuff方法都是在FromAsync调用中调用的(因此,尽管它们是按顺序分派的,但n api调用是同时发生的)。

有一个过载的FromAsync需要一个TaskScheduler:

代码语言:javascript
复制
public Task FromAsync(
    IAsyncResult asyncResult,
    Action<IAsyncResult> endMethod,
    TaskCreationOptions creationOptions,
    TaskScheduler scheduler
)

但医生说:

用于调度执行end方法的任务的TaskScheduler。

正如您所看到的,它采用的是已经构建的IAsyncResult,而不是Func<IAsyncResult>

这是否需要一个自定义的FromAsync方法,还是我遗漏了什么?有人能建议从哪里开始这个实现吗?

干杯,

编辑:

我希望将此行为从调用方中抽象出来,因此,根据TaskFactory的行为(使用专门的TaskScheduler),我需要立即返回该任务--该任务不仅将封装FromAsync任务,而且还将在等待执行时将该任务的排队情况封装起来。

一个可能的解决办法是:

代码语言:javascript
复制
class TaskExecutionQueue
{
    private readonly OrderedTaskScheduler _orderedTaskScheduler;
    private readonly TaskFactory _taskFactory;
    public TaskExecutionQueue(OrderedTaskScheduler orderedTaskScheduler)
    {
        _orderedTaskScheduler = orderedTaskScheduler;
        _taskFactory = new TaskFactory(orderedTaskScheduler);

    }

    public Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator)
    {
        return _taskFactory.StartNew(taskGenerator).Unwrap();
    }
}

然而,在发生FromAsync调用时,这利用了一个线程。理想情况下,我不必那么做。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2012-12-07 17:05:47

我已经决定了一个定制的解决方案..。锁是凌乱和不想要的,但目前,这是我想要的工作。

代码语言:javascript
复制
public interface ITaskExecutionQueue
{
    Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator);
    Task<TResult> QueueTask<TResult>(Task<Task<TResult>> taskGenerator);
    int OutstandingTaskCount { get; }
    event EventHandler OutstandingTaskCountChanged;
}

/// This class ensures that only a single Task is executed at any one time.  They are executed sequentially in order being queued.
/// The advantages of this class over OrderedTaskScheduler is that you can use any type of Task such as FromAsync (I/O Completion ports) 
/// which are not able to be scheduled using a traditional TaskScheduler.
/// Ensure that the `outer` tasks you queue are unstarted.  E.g. <![CDATA[
/// _taskExeQueue.QueueTask(new Task<Task<TResult>>(() => StartMyRealTask()));
/// ]]>
class OrderedTaskExecutionQueue : ITaskExecutionQueue
{
    private readonly Queue<Task> _queuedTasks = new Queue<Task>();
    private Task _currentTask;
    private readonly object _lockSync = new object();

    /// <summary>
    /// Queues a task for execution
    /// </summary>
    /// <typeparam name="TResult"></typeparam>
    /// <param name="taskGenerator">An unstarted Task that creates your started real-work task</param>
    /// <returns></returns>
    public Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator)
    {
        return QueueTask(new Task<Task<TResult>>(taskGenerator));
    }

    public Task<TResult> QueueTask<TResult>(Task<Task<TResult>> taskGenerator)
    {
        Task<TResult> unwrapped = taskGenerator.Unwrap();
        unwrapped.ContinueWith(_ =>
                               {
                                   EndTask();
                                   StartNextTaskIfQueued();
                               }, TaskContinuationOptions.ExecuteSynchronously);

        lock (_lockSync)
        {
            _queuedTasks.Enqueue(taskGenerator);

            if (_currentTask == null)
            {
                StartNextTaskIfQueued();
            }
        }

        TaskCompletionSource<TResult> tcs = new TaskCompletionSource<TResult>();
        tcs.TrySetFromTaskIncomplete(unwrapped);

        OutstandingTaskCountChanged.Raise(this);

        return tcs.Task;
    }

    private void EndTask()
    {
        lock (_lockSync)
        {
            _currentTask = null;
            _queuedTasks.Dequeue();
        }

        OutstandingTaskCountChanged.Raise(this);
    }

    private void StartNextTaskIfQueued()
    {
        lock (_lockSync)
        {
            if (_queuedTasks.Count > 0)
            {
                _currentTask = _queuedTasks.Peek();

                _currentTask.RunSynchronously();
            }
        }
    }

    /// <summary>
    /// Includes the currently executing task.
    /// </summary>
    public int OutstandingTaskCount
    {
        get
        {
            lock (_lockSync)
            {
                return _queuedTasks.Count;
            }
        }
    }

    public event EventHandler OutstandingTaskCountChanged;
}

接收未启动的Task<Task<TResult>> --这允许队列决定何时执行它并开始FromAsync调用(这是内部任务)。用法:

代码语言:javascript
复制
Task<Task<TResult>> queueTask = new Task<Task<TResult>>(() => Task.Factory.FromAsync(beginAction, endAction));
Task<TResult> asyncCallTask = _taskExecutionQueue.QueueTask(queueTask);
票数 0
EN

Stack Overflow用户

发布于 2012-12-06 17:22:57

不能调度IO任务,因为它们没有与其关联的线程。Windows内核提供无线程的IO操作。启动这些IOs不涉及托管代码,TaskScheduler类也不起作用。

因此,您必须延迟启动IO,直到确定您确实希望网络被击中为止。您可以使用SemaphoreSlim.WaitAsync来控制当前运行的任务的数量。在启动个人IO之前等待该方法的结果,并等待该结果。

票数 2
EN

Stack Overflow用户

发布于 2012-12-06 18:11:02

最简单的方法是使用TPL数据流

您可以定义一个“块”,该“块”接收异步委托流,并一次执行它们(在启动下一个委托之前等待每个块完成):

代码语言:javascript
复制
var block = new ActionBlock<Func<Task>>(func => func());

然后,要发出web服务请求:

代码语言:javascript
复制
block.Post(() => Task.Factory.FromAsync(...));

或者(我更喜欢):

代码语言:javascript
复制
block.Post(() => client.GetStuffAsync(a, b, c));

如果您只想执行任务,那么ActionBlock方法是可以的。如果您想要生成一个输出流,那么请查看TransformBlock

代码语言:javascript
复制
var block = new TransformBlock<Func<Task<Stuff>>, Stuff>(func => func());

以同样的方式发出请求,并且可以通过调用ReceiveReceiveAsync来获得结果。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/13746421

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档