我尝试按顺序对异步任务进行排队,因此我为此创建了一个类:
public class TaskSequentialQueue : IDisposable, ITaskSequentialQueue
{
public delegate void OnExeptionDelegate(Exception ex);
private readonly Queue<Task> m_queue = new Queue<Task>();
private readonly Object m_lock = new Object();
private readonly CancellationTokenSource m_CancelToken = new CancellationTokenSource();
private readonly OnExeptionDelegate m_onExeptionDelegate = null;
private Task m_currentTask = null;
private bool m_isDisposed = false;
public TaskSequentialQueue(OnExeptionDelegate expDelegate = null)
{
m_onExeptionDelegate = expDelegate;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool isDisposing)
{
if (m_isDisposed)
{
return;
}
if (isDisposing)
{
lock (m_lock)
{
m_isDisposed = true;
m_queue.Clear();
}
m_CancelToken.Cancel();
m_CancelToken.Dispose();
}
}
public void EnqueueTask( Task task)
{
lock (m_lock)
{
if (m_isDisposed)
throw new ObjectDisposedException("TaskSequentialQueue");
m_queue.Enqueue(task);
}
StartNextTask();
}
public void EnqueueTask( Func<Task> task)
{
EnqueueTask(new Task<Task>(task));
}
public Task EnqueueTaskAndWait( Task task)
{
TaskCompletionSource<int> taskSource = new TaskCompletionSource<int>();
lock (m_lock)
{
if (m_isDisposed)
throw new ObjectDisposedException("TaskSequentialQueue");
Func<Task> toDo = async () =>
{
var waitabletask = task.ContinueWith( antecedent =>
{
taskSource.SetResult(0);
if (antecedent.Exception != null)
throw antecedent.Exception;
});
task.Start();
await waitabletask;
};
this.EnqueueTask(toDo);
}
StartNextTask();
return taskSource.Task; //TODO! propagate the exception correctly ?
}
private void StartNextTask()
{
Task theTask = null;
lock(m_lock)
{
if (m_currentTask == null && m_queue.Count > 0 && !m_isDisposed)
{
m_currentTask = m_queue.Dequeue();
theTask = m_currentTask;
}
}
if (theTask != null)
{
theTask.Start();
theTask.ContinueWith( (antecedent) =>
{
Exception theEx = antecedent.Exception;
if (theEx == null && antecedent is Task<Task>)
theEx = (antecedent as Task<Task>)?.Result.Exception;
if (m_onExeptionDelegate != null && theEx != null)
{
try { m_onExeptionDelegate(theEx); } catch(Exception) {}
}
lock(m_lock)
{
m_currentTask = null;
}
Task.Run( () => StartNextTask() );
}
}
}
}我是这样使用它的:
Func<Task> action = async () =>
{
Log("Entered");
await Task.Delay(5000);
Log("Exited");
}
m_taskSequentialQueue.EnqueueTask( action );
m_taskSequentialQueue.EnqueueTask( action );我希望我的日志是:
Entered
Exited
Entered
Exited相反,我得到的是:
Entered
Entered
Exited
Exited我不确定我做错了什么。
谢谢
发布于 2016-09-01 23:29:50
当您在StartNextTask中执行theTask.ContinueWith(时,您继续执行的是内部任务的开始,而不是内部任务的完成。一旦内部任务命中第一个await,theTask任务就会被认为是完成的,因为返回的是函数。
作为创可贴,你可以这样做
if (theTask != null)
{
theTask.Start();
if(theTask is Task<Task>)
{
theTask = ((Task<Task>)theTask).Unwrap();
}
theTask.ContinueWith(...然而,我认为你使用“冷任务”的整个方法是有缺陷的。您应该只使用Queue<Func<Task>>而不是Queue<Task>,这将使您的代码变得更简单。
发布于 2016-09-01 23:31:43
Func<Task> action = async () =>
{
lock (lock_x)
{
Console.WriteLine("Entered");
Thread.Sleep(5000);
Console.WriteLine("Exited");
}
};应该可以工作,因为您的队列实现是按顺序提交它们的。
https://stackoverflow.com/questions/39275185
复制相似问题