由于我对C#'s任务异步编程和一般并发非常天真,而且测试非常困难,所以我担心这段代码的安全性。很可能这些主题,或者对我来说更深奥的问题(例如处理器特定的问题,比如波动性规则),让我忽略了一些对你来说很明显的事情。请帮助我理解我在这里错过了什么!
给定一个不支持并发的Func<T>,我们可以安全地同时激发该函数,并接收Task<T>。并发初始化被“连接”到函数的两次迭代中的一次,即活动迭代或单个未决迭代。调用方可能不允许加入正在进行的迭代,以避免结果不一致。
ExclusiveFunc类需要一个Func<T>,它是在构造时设置的,不能更改。ExclusiveFunc有一个公共实例方法Task<T> Run(bool joinLate = false),用于启动或“触发”Func<T>。ExclusiveFunc对于多线程的使用是安全的.Func<T>的多个实例永远不会从对Run的任何并发调用中并发运行。当通过ExclusiveFunc.Run进行代理时,Func<T>受到保护,不受计划任务或其他并发使用可能导致的自并发问题的影响。Func<T>。也就是说,最多只有一个Func<T>在运行,另一个在等待运行。必须实现Func<T>,以便对所有挂起的调用者来说,任何未来迭代的副作用和返回值都足够。Task<T>,无论是加入正在运行的还是挂起的操作。如果并发调用者将使用结果,则T必须是并发使用的安全的。Func<T>已经运行时,默认操作是将Func<T>的一个迭代设置为在活动操作完成时运行。在此默认操作中,保证调用方在发出请求后一段时间内启动Func<T>。joinLate。ExclusiveFunc.Run。但是,这种实现对于大量使用来说并不理想,因为每次对Run的调用都会在其自己的线程上对结果进行内部await。Func<T>的任何迭代中加入的所有调用者都将收到他们等待的特定迭代引发的异常。Func<T>的线程上,而是在排队的线程上运行。ExclusiveFunc类不容易受到内部转换争用条件的影响。如果Func<T>抛出或队列耗尽,它将不会停止。lock-less实现引起的跨平台问题?例如,应该将queue标记为volatile吗?using System;
using System.Threading;
using System.Threading.Tasks;
public class ExclusiveFunc<T>
{
public ExclusiveFunc(Func<T> func)
{
queue = new Queue(() => {
try {
return func();
}
finally {
queue = queue.Next();
}
});
}
public async Task<T> Run(bool joinLate = false)
{
var a = queue;
if (a.Current.Acquire())
{
a.Current.Acquired.Start();
return await a.Current.Acquired;
}
if (joinLate) {
return await a.Current.Acquired;
}
if (a.Pending.Acquire()) {
await a.Current.Acquired;
a.Pending.Acquired.Start();
}
return await a.Pending.Acquired;
}
private Queue queue;
private class Queue
{
public readonly State Current;
public readonly State Pending;
public Queue(Func<T> func) : this(func, new State(func)) { }
public Queue Next()
{
return new Queue(func, Pending);
}
private readonly Func<T> func;
private Queue(Func<T> func, State pending)
{
this.func = func;
Current = pending;
Pending = new State(func);
}
}
private class State
{
public Task<T> Acquired;
public State(Func<T> func)
{
this.func = func;
}
public bool Acquire()
{
return Interlocked.CompareExchange(ref Acquired, new Task<T>(func), null) == null;
}
private readonly Func<T> func;
}
}using System.Threading;
using System.Threading.Tasks;
using Xunit;
public class ExclusiveIncrementer {
private int locked = 0;
private int count = 0;
public int Slow() {
Assert.Equal(0, Interlocked.Exchange(ref locked, 1));
Thread.Sleep(100);
Assert.Equal(1, Interlocked.Exchange(ref locked, 0));
return Interlocked.Increment(ref count);
}
}
public class ExclusiveFuncTest_WithoutThreads {
protected delegate Task<int> RunEf(bool joinLate = false);
protected virtual RunEf GetRun() {
return new ExclusiveFunc<int>(new ExclusiveIncrementer().Slow).Run;
}
[Fact]
public async Task ConcurrentRequestCanJoinOngoing() {
var run = GetRun();
var master = run();
var slave = run(true);
Assert.Equal(1, await master);
Assert.Equal(1, await slave);
}
[Fact]
public async Task ConcurrentRequestCanQueueIfOngoing() {
var run = GetRun();
var immediate = run();
var queued = run();
Assert.Equal(1, await immediate);
Assert.Equal(2, await queued);
}
[Fact]
public async Task ProceedsAfterQueueEmpty() {
var run = GetRun();
var first = run();
Assert.Equal(1, await first);
var second = run();
Assert.Equal(2, await second);
}
[Fact]
public async Task FireAndForgetCompletes() {
var run = GetRun();
var first = run();
var second = run();
Assert.Equal(2, await second);
}
[Fact]
public async Task OrderDeterminedByCallNotAwait() {
var run = GetRun();
var first = run();
var second = run();
Assert.Equal(2, await second);
Assert.Equal(1, await first);
}
[Fact]
public async Task MultiplePendingShareOperation() {
var run = GetRun();
var blocking = run();
var firstPending = run();
var secondPending = run();
Assert.Equal(2, await firstPending);
Assert.Equal(2, await secondPending);
}
[Fact]
public async Task JoinWillStartIfRequired() {
var run = GetRun();
var only = run(true);
Assert.Equal(1, await only);
}
}
public class ExclusiveFuncTest_WithThreads : ExclusiveFuncTest_WithoutThreads {
protected override RunEf GetRun() {
var run = base.GetRun();
return runThread;
Task<int> runThread(bool joinLate = false) {
// We enforce order with Sleep, to allow human-readable test outcomes
Thread.Sleep(30);
return Task.Run(() => run(joinLate));
}
}
}背景(不是主要问题,但欢迎ofc评论):
我想将锁定逻辑从系统中的几个可调度任务中分离出来(例如,预定的电话会议设置、到期的电子邮件)。虽然不太可能,但非常紧凑的任务可能会同时运行。与其人为地将调度解析限制为任意的“几乎肯定是安全的”值,我希望确保最多只有一个在运行。调用方可以确定加入正在进行的运行是否足够。
我知道特定域的幂等效应/同步通常更好。
相关:( Node.js) 并发触发和忘记异步任务队列
发布于 2019-06-17 19:29:38
我很感激你一次又一次地修改你的问题,以澄清你的目标。一开始,我以为您正在使用任何给定时间可用的线程来制作同步任务调度程序,这也很酷!实际上,如果允许的话,您精心设计的API可以扩展为一个API:
Func<T>的特定Run实例我没有发现任何与日程安排有关的重大问题。我只想说几句小话。
设计私有类使您能够在定义访问修饰符和验证内部状态和方法的参数方面发挥更大的作用。但是,在单元测试的早期,我会包括一些只对验证参数进行调试的检查,以检测错误的设计。
private class State
{
public Task<T> Acquired; // <- OK, since your API does not allow public access to it
public State(Func<T> func)
{
Debug.Assert(func != null); // or use Contracts API in Debug Mode
this.func = func;
}
}公共API应该使用参数检查。
public ExclusiveFunc(Func<T> func)
{
func = func ?? throw new ArgumentNullException(nameof(func)); // <- a MUST in public API
queue = new Queue(() => {
try {
return func();
}
finally {
queue = queue.Next();
}
});
}发布于 2019-06-21 14:55:36
一些修正/可能的改进:
上崩溃
这是我发现的最重要的。负责启动挂起任务的线程首先等待当前的任务完成,但异常将中止此延续,使队列处于挂起状态。这个例外应该被抓住。
if (a.Pending.Acquire()) {
try {
await a.Current.Acquired;
}
catch {}
a.Pending.Acquired.Start();
}(在发现这个缺陷后,我首先错误地编辑了代码,版主向我指出了这一点。我已经恢复了修复,并把它移到了这个答案。)
待决与当前
之间的分歧
https://codereview.stackexchange.com/questions/222463
复制相似问题