更新(输出解释)
我有多个任务正在运行,所有这些任务都试图完成一个任务,但是我只想要一个任务正在执行或等待完成该任务。
在一种情况下,如果任务正在进行中,其他试图执行相同任务的任务将看到它已经在进行中,并且跳过尝试完成它并立即继续工作,因为他们不需要知道它是否或何时真正完成。
在另一种情况下,如果作业正在进行中,其他任务应该等待作业完成后再继续,但不要尝试自己完成任务,因为它只是由他们正在等待的任务完成的。一旦任务完成,所有等待的任务都可以继续进行,但是他们中的任何一个都不应该尝试完成刚刚完成的任务。
原始问题
我的目标是创建一个async Task扩展方法,该方法将生成类似于此的结果:(并不是因为每次SKIP和WAIT-TO-SKIP的顺序都不完全相同)
使用一个线程运行时:
[Task 1] LOCKED
[Task 1] WORK-START
[Task 1] WORK-END
[Task 1] UNLOCKED当使用五个线程运行时:
[Task 1] LOCKED
[Task 1] WORK-START
[Task 2] WAIT-TO-SKIP
[Task 3] WAIT-TO-SKIP
[Task 4] WAIT-TO-SKIP
[Task 5] WAIT-TO-SKIP
[Task 1] WORK-END
[Task 1] UNLOCKED
[Task 2] SKIP
[Task 3] SKIP
[Task 4] SKIP
[Task 5] SKIP它还应该有一个参数,允许它这样做:
[Task 1] LOCKED
[Task 1] WORK-START
[Task 2] SKIP
[Task 3] SKIP
[Task 4] SKIP
[Task 5] SKIP
[Task 1] WORK-END
[Task 1] UNLOCKED下面是我想出的方法:
public static async Task FirstThreadAsync(IFirstThread obj, Func<Task> action, TaskCompletionSource? waitTaskSource, string threadName = "")
{
if (obj.Locked)
{
if (waitTaskSource != null && !waitTaskSource.Task.IsCompleted)
{
Log.Debug(Logger, $"[{threadName}] WAIT-TO-SKIP");
await waitTaskSource.Task;
}
Log.Debug(Logger, $"[{threadName}] SKIP-1");
return;
}
var lockWasTaken = false;
var temp = obj;
try
{
if (waitTaskSource == null || waitTaskSource.Task.IsCompleted == false)
{
Monitor.TryEnter(temp, ref lockWasTaken);
if (lockWasTaken) obj.Locked = true;
}
}
finally
{
if (lockWasTaken) Monitor.Exit(temp);
}
if (waitTaskSource?.Task.IsCompleted == true)
{
Log.Debug(Logger, $"[{threadName}] SKIP-3");
return;
}
if (waitTaskSource != null && !lockWasTaken)
{
if (!waitTaskSource.Task.IsCompleted)
{
Log.Debug(Logger, $"[{threadName}] WAIT-TO-SKIP (LOCKED)");
await waitTaskSource.Task;
}
Log.Debug(Logger, $"[{threadName}] SKIP-2");
return;
}
Log.Debug(Logger, $"[{threadName}] LOCKED");
try
{
Log.Debug(Logger, $"[{threadName}] WORK-START");
await action.Invoke().ConfigureAwait(false);
Log.Debug(Logger, $"[{threadName}] WORK-END");
}
catch (Exception ex)
{
waitTaskSource?.TrySetException(ex);
throw;
}
finally
{
obj.Locked = false;
Log.Debug(Logger, $"[{threadName}] UNLOCKED");
waitTaskSource?.TrySetResult();
}
}下面是接口和Example类:
public interface IFirstThread
{
bool Locked { get; set; }
}
public class Example : IFirstThread
{
public bool Locked { get; set; }
public async Task DoWorkAsync(string taskName)
{
for (var i = 0; i < 10; i++)
{
await Task.Delay(5);
}
}
}下面是一个单元测试,在这个测试中,Task1 - Task5都在尝试同时运行,而Task End在它们都完成之后运行:
[TestMethod]
public async Task DoWorkOnce_AsyncX()
{
var waitTaskSource = new TaskCompletionSource();
var example = new Methods.Example();
var tasks = new List<Task>
{
FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task 1"),
FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task 2"),
FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task 3"),
FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task 4"),
FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task 5"),
};
await Task.WhenAll(tasks);
await FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task End"),
//code to get and compare the output
}我遇到的问题是,99%的时间它按预期工作,但有时它允许两个线程同时运行,我似乎不知道为什么或如何阻止它。下面是一个单元测试输出的例子--当它允许两个线程同时运行时,这是非常罕见的情况:
[Task 5] LOCKED,
[Task 4] WAIT-TO-SKIP,
[Task 2] LOCKED,
[Task 3] WAIT-TO-SKIP,
[Task 1] WAIT-TO-SKIP,
[Task 5] WORK-START,
[Task 2] WORK-START,
[Task 2] WORK-END,
[Task 5] WORK-END,
[Task 5] UNLOCKED,
[Task 2] UNLOCKED,
[Task 1] SKIP-1,
[Task 4] SKIP-1,
[Task 3] SKIP-1,
[Task End] LOCKED,
[Task End] WORK-START,
[Task End] WORK-END,
[Task End] UNLOCKED正如您所看到的,Task 5和Task 2都是在其中一个应该被锁定时被锁定的。Task End只是测试的一部分,以验证在同时进行调用时类是否处于未锁定状态。另外,threadName参数是完全不必要的,只包含在其中,因此我可以判断哪个任务是用于测试目的的。
发布于 2022-11-04 18:31:45
您使用Monitor.Exit太早了。在一个线程中
Monitor.TryEnter(temp, ref lockWasTaken);之后执行
if (lockWasTaken) Monitor.Exit(temp);在另一个线程中执行,两者都可以具有lockWasTaken true。
您不需要在用作受保护资源的对象上使用Locked属性。Monitor类直接在原子物质中在对象中设置内部标记。您可以通过只依赖Monitor功能来简化您的逻辑,而不需要自己的单独标志。
另外,正如另一个用户所指出的,我们没有看到您为这些任务创建显式的单独线程。这些任务可以在同一个线程上运行,然后当多个任务在同一个线程上运行时,Monitor.TryEnter将允许多个任务同时进入。
关于为什么在异步编程中应该使用SempahoreSlim而不是Monitor,请参见SempahoreSlim。
下面是一些用SemaphoreSlim解决任务的示例代码。我看不出更深层次的目的;)
private async void button1_Click(object sender, EventArgs e)
{
TaskCompletionSource? waitTaskSource = null; // or new TaskCompletionSource();
var example = new Example();
var sempahore = new SemaphoreSlim(1);
var tasks = new List<Task>
{
FirstThreadAsync(sempahore, example.DoWorkAsync, waitTaskSource, "Task 1"),
FirstThreadAsync(sempahore, example.DoWorkAsync, waitTaskSource, "Task 2"),
FirstThreadAsync(sempahore, example.DoWorkAsync, waitTaskSource, "Task 3"),
FirstThreadAsync(sempahore, example.DoWorkAsync, waitTaskSource, "Task 4"),
FirstThreadAsync(sempahore, example.DoWorkAsync, waitTaskSource, "Task 5"),
};
await Task.WhenAll(tasks);
await FirstThreadAsync(sempahore, example.DoWorkAsync, waitTaskSource, "Task End");
}
public static async Task FirstThreadAsync(SemaphoreSlim semaphore, Func<Task> action, TaskCompletionSource? waitTaskSource, string threadName = "")
{
// Try to acquire lock
bool lockAcquired = await semaphore.WaitAsync(TimeSpan.Zero);
if (lockAcquired)
{
// Lock acquired -> Do Work
Debug.WriteLine($"[{threadName}] LOCKED");
try
{
Debug.WriteLine($"[{threadName}] WORK-START");
await action.Invoke();
Debug.WriteLine($"[{threadName}] WORK-END");
semaphore.Release();
Debug.WriteLine($"[{threadName}] UNLOCKED");
}
catch (Exception ex)
{
waitTaskSource?.TrySetException(ex);
}
finally
{
waitTaskSource?.TrySetResult();
}
}
else // No lock acquired
{
// When source is specified, await it.
if(waitTaskSource != null)
{
Debug.WriteLine($"[{threadName}] WAIT-TO-SKIP");
await waitTaskSource.Task;
}
Debug.WriteLine($"[{threadName}] SKIP");
}
}
public class Example
{
public async Task DoWorkAsync()
{
for (var i = 0; i < 10; i++)
{
await Task.Delay(5);
}
}
}我刚刚看到了你的要求,那就是工作包应该只做一次。要实现这一点,您可以跳过释放信号量。
皮草进一步的讨论,我们需要知道更多的细节,关于实际的要求。您当前的一组要求似乎有点奇怪。为什么有多个任务,但只有一个工作包?
https://stackoverflow.com/questions/74321229
复制相似问题