有了TPL,我们就有了CancellationTokenSource,它提供令牌,用于协同取消当前任务(或其启动)。
问题:
需要多长时间才能将取消请求传播到所有挂起的运行任务?有什么地方,在那里,代码可以检查:“从现在起”每一个感兴趣的Task,会发现取消的请求?
为什么有必要这样做?
我想要有稳定的单元测试,以显示取消工作在我们的代码。
问题详情:
我们有生成任务的"Executor“,这些任务包装了一些长时间运行的操作。执行者的主要任务是限制启动了多少并发操作。所有这些任务都可以单独取消,而且这些操作将在内部尊重CancellationToken。
我想提供单元测试,这表明当任务等待时隙开始给定操作时,当取消发生时,该任务将取消自身(最终),并且不会开始执行给定的操作。
因此,我们的想法是用单一槽来制备LimitingExecutor。然后开始阻塞操作,这将在解除阻塞时请求取消。然后是"enqueue“测试操作,在执行时应该会失败。使用该设置,测试将调用unblock,然后断言测试操作任务将在等待时抛出TaskCanceledException。
[Test]
public void RequestPropagationTest()
{
using (var setupEvent = new ManualResetEvent(initialState: false))
using (var cancellation = new CancellationTokenSource())
using (var executor = new LimitingExecutor())
{
// System-state setup action:
var cancellingTask = executor.Do(() =>
{
setupEvent.WaitOne();
cancellation.Cancel();
}, CancellationToken.None);
// Main work action:
var actionTask = executor.Do(() =>
{
throw new InvalidOperationException(
"This action should be cancelled!");
}, cancellation.Token);
// Let's wait until this `Task` starts, so it will got opportunity
// to cancel itself, and expected later exception will not come
// from just starting that action by `Task.Run` with token:
while (actionTask.Status < TaskStatus.Running)
Thread.Sleep(millisecondsTimeout: 1);
// Let's unblock slot in Executor for the 'main work action'
// by finalizing the 'system-state setup action' which will
// finally request "global" cancellation:
setupEvent.Set();
Assert.DoesNotThrowAsync(
async () => await cancellingTask);
Assert.ThrowsAsync<TaskCanceledException>(
async () => await actionTask);
}
}
public class LimitingExecutor : IDisposable
{
private const int UpperLimit = 1;
private readonly Semaphore _semaphore
= new Semaphore(UpperLimit, UpperLimit);
public Task Do(Action work, CancellationToken token)
=> Task.Run(() =>
{
_semaphore.WaitOne();
try
{
token.ThrowIfCancellationRequested();
work();
}
finally
{
_semaphore.Release();
}
}, token);
public void Dispose()
=> _semaphore.Dispose();
}这个问题的可执行演示(通过NUnit)可以在GitHub上找到。
然而,该测试的实现有时失败(没有预期的TaskCanceledException),在我的机器上解决这个问题的一种“解决方案”是在请求取消后插入Thread.Sleep。即使有了3秒的睡眠,这个测试有时也会失败(在20次跑步后发现),并且当它通过时,这种长时间的等待通常是不必要的(我猜)。有关参考资料,请参阅比较。
“其他问题”是确保取消来自“等待时间”,而不是来自Task.Run,因为ThreadPool可能很忙(其他正在执行的测试),并且在请求取消之后冷推迟第二项任务的开始--这将使这个测试变得“错误-绿色”。“容易修复的黑客”是积极地等待到第二个任务开始-它的Status变成TaskStatus.Running。请检查这个分支下的版本,看看没有这个黑客的测试有时会是“绿色的”--这样,典型的bug就可以通过它。
发布于 2018-10-07 13:38:05
您的测试方法假设cancellingTask总是在LimitingExecutor中的插槽(输入信号量)位于actionTask之前。不幸的是,这个假设是错误的,LimitingExecutor并没有保证这一点,这只是运气问题,这两个任务中哪一个占据了这个位置(实际上,在我的计算机上,它只发生在大约5%的运行中)。
要解决这个问题,您需要另一个ManualResetEvent,它将允许主线程等待,直到cancellingTask实际占据了插槽:
using (var slotTaken = new ManualResetEvent(initialState: false))
using (var setupEvent = new ManualResetEvent(initialState: false))
using (var cancellation = new CancellationTokenSource())
using (var executor = new LimitingExecutor())
{
// System-state setup action:
var cancellingTask = executor.Do(() =>
{
// This is called from inside the semaphore, so it's
// certain that this task occupies the only available slot.
slotTaken.Set();
setupEvent.WaitOne();
cancellation.Cancel();
}, CancellationToken.None);
// Wait until cancellingTask takes the slot
slotTaken.WaitOne();
// Now it's guaranteed that cancellingTask takes the slot, not the actionTask
// ...
}.NET框架不提供API来检测到Running状态的任务转换,因此如果您不喜欢在循环中轮询State属性+ Thread.Sleep(),则需要修改LimitingExecutor.Do()以提供此信息,可能需要使用另一个ManualResetEvent,例如:
public Task Do(Action work, CancellationToken token, ManualResetEvent taskRunEvent = null)
=> Task.Run(() =>
{
// Optional notification to the caller that task is now running
taskRunEvent?.Set();
// ...
}, token);https://stackoverflow.com/questions/52683874
复制相似问题