首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >异步并发预防类

异步并发预防类
EN

Code Review用户
提问于 2019-06-17 14:35:51
回答 2查看 225关注 0票数 5

由于我对C#'s任务异步编程和一般并发非常天真,而且测试非常困难,所以我担心这段代码的安全性。很可能这些主题,或者对我来说更深奥的问题(例如处理器特定的问题,比如波动性规则),让我忽略了一些对你来说很明显的事情。请帮助我理解我在这里错过了什么!

意图

给定一个不支持并发的Func<T>,我们可以安全地同时激发该函数,并接收Task<T>。并发初始化被“连接”到函数的两次迭代中的一次,即活动迭代或单个未决迭代。调用方可能不允许加入正在进行的迭代,以避免结果不一致。

合同

  • 这个ExclusiveFunc类需要一个Func<T>,它是在构造时设置的,不能更改。
  • ExclusiveFunc有一个公共实例方法Task<T> Run(bool joinLate = false),用于启动或“触发”Func<T>
  • ExclusiveFunc对于多线程的使用是安全的.Func<T>的多个实例永远不会从对Run的任何并发调用中并发运行。当通过ExclusiveFunc.Run进行代理时,Func<T>受到保护,不受计划任务或其他并发使用可能导致的自并发问题的影响。
  • 也不会有超过一个挂起的Func<T>。也就是说,最多只有一个Func<T>在运行,另一个在等待运行。必须实现Func<T>,以便对所有挂起的调用者来说,任何未来迭代的副作用和返回值都足够。
  • 调用者在他们加入的迭代中接收Task<T>,无论是加入正在运行的还是挂起的操作。如果并发调用者将使用结果,则T必须是并发使用的安全的。
  • 因为“加入运行晚了”可能会导致不可接受的肮脏/陈旧信息或错过处理,因此当Func<T>已经运行时,默认操作是将Func<T>的一个迭代设置为在活动操作完成时运行。在此默认操作中,保证调用方在发出请求后一段时间内启动Func<T>
  • 如果对正在进行的迭代(包括其返回值和副作用)有足够的了解,并且不需要启动额外的迭代,调用方可以选择joinLate
  • 如果需要“着火和遗忘”的实现,则不需要等待ExclusiveFunc.Run。但是,这种实现对于大量使用来说并不理想,因为每次对Run的调用都会在其自己的线程上对结果进行内部await
  • Func<T>的任何迭代中加入的所有调用者都将收到他们等待的特定迭代引发的异常。
  • 等待调用者不会不必要地等待(例如以后的运行),特别是,队列请求不会被强制放到正在运行Func<T>的线程上,而是在排队的线程上运行。
  • ExclusiveFunc类不容易受到内部转换争用条件的影响。如果Func<T>抛出或队列耗尽,它将不会停止。

特定关注事项

  • 是否存在lock-less实现引起的跨平台问题?例如,应该将queue标记为volatile吗?
  • 有异常处理问题吗?例如,调用方不会接收异常,或者异常会导致队列崩溃。
  • 在工作发生的地方是否有导致意外结果的缺陷?例如,一个任务从未完成,因为它的线程也运行挂起的请求。
  • 缺少任务方法选项是一个问题吗?例如,这是否导致不正确的SynchronizationContext?
  • 这有可能导致线程池问题吗?例如,使用调用方期望的两倍的线程?
  • 使用一个任务以这种方式同步多个线程是否有问题?

代码语言:javascript
复制
using System;
using System.Threading;
using System.Threading.Tasks;

public class ExclusiveFunc<T>
{
    public ExclusiveFunc(Func<T> func)
    {
        queue = new Queue(() => {
            try {
                return func();
            }
            finally {
                queue = queue.Next();
            }
        });
    }

    public async Task<T> Run(bool joinLate = false)
    {
        var a = queue;
        if (a.Current.Acquire())
        {
            a.Current.Acquired.Start();
            return await a.Current.Acquired;
        }
        if (joinLate) {
            return await a.Current.Acquired;
        }
        if (a.Pending.Acquire()) {
            await a.Current.Acquired;
            a.Pending.Acquired.Start();
        }
        return await a.Pending.Acquired;
    }

    private Queue queue;

    private class Queue
    {
        public readonly State Current;
        public readonly State Pending;
        public Queue(Func<T> func) : this(func, new State(func)) { }
        public Queue Next()
        {
            return new Queue(func, Pending);
        }

        private readonly Func<T> func;
        private Queue(Func<T> func, State pending)
        {
            this.func = func;
            Current = pending;
            Pending = new State(func);
        }
    }

    private class State
    {
        public Task<T> Acquired;
        public State(Func<T> func)
        {
            this.func = func;
        }
        public bool Acquire()
        {
            return Interlocked.CompareExchange(ref Acquired, new Task<T>(func), null) == null;
        }

        private readonly Func<T> func;
    }
}

测试

代码语言:javascript
复制
using System.Threading;
using System.Threading.Tasks;
using Xunit;

public class ExclusiveIncrementer {
    private int locked = 0;
    private int count = 0;
    public int Slow() {
        Assert.Equal(0, Interlocked.Exchange(ref locked, 1));
        Thread.Sleep(100);
        Assert.Equal(1, Interlocked.Exchange(ref locked, 0));
        return Interlocked.Increment(ref count);
    }
}

public class ExclusiveFuncTest_WithoutThreads {
    protected delegate Task<int> RunEf(bool joinLate = false);

    protected virtual RunEf GetRun() {
        return new ExclusiveFunc<int>(new ExclusiveIncrementer().Slow).Run;
    }

    [Fact]
    public async Task ConcurrentRequestCanJoinOngoing() {
        var run = GetRun();
        var master = run();
        var slave = run(true);
        Assert.Equal(1, await master);
        Assert.Equal(1, await slave);
    }

    [Fact]
    public async Task ConcurrentRequestCanQueueIfOngoing() {
        var run = GetRun();
        var immediate = run();
        var queued = run();
        Assert.Equal(1, await immediate);
        Assert.Equal(2, await queued);
    }

    [Fact]
    public async Task ProceedsAfterQueueEmpty() {
        var run = GetRun();
        var first = run();
        Assert.Equal(1, await first);
        var second = run();
        Assert.Equal(2, await second);
    }

    [Fact]
    public async Task FireAndForgetCompletes() {
        var run = GetRun();
        var first = run();
        var second = run();
        Assert.Equal(2, await second);
    }

    [Fact]
    public async Task OrderDeterminedByCallNotAwait() {
        var run = GetRun();
        var first = run();
        var second = run();
        Assert.Equal(2, await second);
        Assert.Equal(1, await first);
    }

    [Fact]
    public async Task MultiplePendingShareOperation() {
        var run = GetRun();
        var blocking = run();
        var firstPending = run();
        var secondPending = run();
        Assert.Equal(2, await firstPending);
        Assert.Equal(2, await secondPending);
    }

    [Fact]
    public async Task JoinWillStartIfRequired() {
        var run = GetRun();
        var only = run(true);
        Assert.Equal(1, await only);
    }
}

public class ExclusiveFuncTest_WithThreads : ExclusiveFuncTest_WithoutThreads {
    protected override RunEf GetRun() {
        var run = base.GetRun();
        return runThread;

        Task<int> runThread(bool joinLate = false) {
            // We enforce order with Sleep, to allow human-readable test outcomes
            Thread.Sleep(30);
            return Task.Run(() => run(joinLate));
        }
    }
}

背景(不是主要问题,但欢迎ofc评论):

我想将锁定逻辑从系统中的几个可调度任务中分离出来(例如,预定的电话会议设置、到期的电子邮件)。虽然不太可能,但非常紧凑的任务可能会同时运行。与其人为地将调度解析限制为任意的“几乎肯定是安全的”值,我希望确保最多只有一个在运行。调用方可以确定加入正在进行的运行是否足够。

我知道特定域的幂等效应/同步通常更好。

相关:( Node.js) 并发触发和忘记异步任务队列

EN

回答 2

Code Review用户

发布于 2019-06-17 19:29:38

前言

我很感激你一次又一次地修改你的问题,以澄清你的目标。一开始,我以为您正在使用任何给定时间可用的线程来制作同步任务调度程序,这也很酷!实际上,如果允许的话,您精心设计的API可以扩展为一个API:

  • 将待执行的处决链接起来
  • 每个Func<T>的特定Run实例

评论

我没有发现任何与日程安排有关的重大问题。我只想说几句小话。

设计私有类使您能够在定义访问修饰符和验证内部状态和方法的参数方面发挥更大的作用。但是,在单元测试的早期,我会包括一些只对验证参数进行调试的检查,以检测错误的设计。

代码语言:javascript
复制
private class State
{
     public Task<T> Acquired; // <- OK, since your API does not allow public access to it
     public State(Func<T> func)
     {
          Debug.Assert(func != null); // or use Contracts API in Debug Mode
          this.func = func;
     }
}

公共API应该使用参数检查。

代码语言:javascript
复制
public ExclusiveFunc(Func<T> func)
{
    func = func ?? throw new ArgumentNullException(nameof(func)); // <- a MUST in public API
    queue = new Queue(() => {
        try {
            return func();
        }
        finally {
            queue = queue.Next();
        }
    });
}
票数 3
EN

Code Review用户

发布于 2019-06-21 14:55:36

一些修正/可能的改进:

调度器在异常

上崩溃

这是我发现的最重要的。负责启动挂起任务的线程首先等待当前的任务完成,但异常将中止此延续,使队列处于挂起状态。这个例外应该被抓住。

代码语言:javascript
复制
if (a.Pending.Acquire()) {
    try {
        await a.Current.Acquired;
    }
    catch {}
    a.Pending.Acquired.Start();
}

(在发现这个缺陷后,我首先错误地编辑了代码,版主向我指出了这一点。我已经恢复了修复,并把它移到了这个答案。)

契约声明的线程分配不匹配

待决与当前

SynchronizationContext

之间的分歧

票数 0
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/222463

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档