首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >后台工作者内部异步等待中介死锁--如何检测线程调用本身

后台工作者内部异步等待中介死锁--如何检测线程调用本身
EN

Stack Overflow用户
提问于 2020-05-12 17:45:07
回答 2查看 2.4K关注 0票数 1

我有一个中介器,最近我需要它在后台线程上一次同步一个消息分发,但是它是锁定的,如下所示。

我向队列发送命令并从TaskCompletionSource返回一个任务:

代码语言:javascript
复制
public Task<object> Send(object command, CancellationToken cancellationToken)
{
    var item = new CommandItem() { Command = request, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken };            
    this.queue.Writer.WriteAsync(item); // just write and immediatly return the tcs
    return item.Tcs.Task;
}

然后从后台工作者中获取它,并创建处理程序:

代码语言:javascript
复制
var item = await this.queue.Reader.ReadAsync(cancellationToken);
// work out command  type snipped
var command = item.Command as LockMeGoodCommand;
var handler = new LockMeGoodCommandHandler();
var result = await handler.Handle(command, item.Ct);
item.Tcs.SetResult(result);

然后,当命令处理程序被发送到命令处理程序中时(当使用后台线程时,但在线程中,它是可以的),然后使用下面的锁定:

代码语言:javascript
复制
public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
{
   Console.WriteLine(command.GetType().Name);

   // this would get the result but will lock forever when using background worker bus implementation
   var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken);

   // perform some action based on the result - but we never get here
   Console.WriteLine("otherResult is " + otherResult);

   return 3;
}

**问题和可能的解决办法**

我相信我们可以避免死锁,方法是检测后台线程是否从它的线程内部(通过命令处理程序调用Send()来发送一个新命令)发送到它自己,如果是这样的话,它不应该使用任何线程机制(post到命令队列或TaskCompletionSource),而应该直接处理任务。

我试图检测线程,但它不起作用,所以在我的处理程序中,在var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken, true)和中,我将手动标记设置为true,我可以确认它正常工作,并且避免了死锁,避免了。

在这个补丁里有什么警告吗?如何检测后台线程是否请求发送命令(线程如何检测自身),以及如何完成以下代码(从DispatchOnBackgroundThread.Send()中包含此自调用检测(因此我可以取消isSameThread标志)?

这似乎更复杂,因为每个等待都会提供不同的线程ID。

代码语言:javascript
复制
// in thread start we set the thread id of the background thread
this.workerThreadId = System.Threading.Thread.CurrentThread.ManagedThreadId;

public Task<object> Send(object command, CancellationToken cancellationToken, bool isSameThread = false)
{
    Console.WriteLine($"this.workerThreadId: {this.workerThreadId}, Thread.CurrentThread.ManagedThreadId: {Thread.CurrentThread.ManagedThreadId}");

    // below doesnt work gives different numbers so i use flag instead
    // this.workerThreadId == Thread.CurrentThread.ManagedThreadId
    if (isSameThread == true)
    {
        if (command is BoringCommand boringCommand)
        {
            var handler = new BoringCommandHandler();
            return handler.Handle(boringCommand, cancellationToken).ContinueWith(t => (object)t);

        }
        else if (command is LockMeGoodCommand lockMeGoodCommand)
        {
            var handler = new LockMeGoodCommandHandler(this);
            return handler.Handle(lockMeGoodCommand, cancellationToken).ContinueWith(t => (object)t);
        }
        else
            throw new Exception("unknown");
    }
    else
    {
        var item = new CommandItem() { Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken };
        this.queue.Writer.WriteAsync(item); // just write and immediatly return the cts
        return item.Tcs.Task;
    }
}

**展示问题的守则**

代码语言:javascript
复制
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace TestDeadlock
{
    class BoringCommand { }
    class LockMeGoodCommand { }    

    class BoringCommandHandler
    {
        public Task<int> Handle(BoringCommand command, CancellationToken cancellationToken)
        {
            Console.WriteLine(command.GetType().Name);         
            return Task.FromResult(1);
        }
    }
    class LockMeGoodCommandHandler
    {
        private readonly DispatchOnBackgroundThread commandBus;

        public LockMeGoodCommandHandler(DispatchOnBackgroundThread commandBus) => this.commandBus = commandBus;

        public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
        {
            Console.WriteLine(command.GetType().Name);

            // this locks forever
            var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken);
            Console.WriteLine("otherResult is " + otherResult);
            return 3;
        }
    }

    public class DispatchOnBackgroundThread
    {
        private readonly Channel<CommandItem> queue = Channel.CreateUnbounded<CommandItem>();
        private Task worker = null;

        class CommandItem
        {
            public object Command { get; set; }
            public CancellationToken Ct { get; set; }
            public TaskCompletionSource<object> Tcs { get; set; }
        }

        public Task<object> Send(object command, CancellationToken cancellationToken)
        {
            var item = new CommandItem()
            { Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken };            
            this.queue.Writer.WriteAsync(item); // just write and immediatly return the tcs
            return item.Tcs.Task;
        }

        public void Start(CancellationToken cancellationToken)
        {
            this.worker = Task.Factory.StartNew(async () =>
            {
                try
                {                    
                    while (cancellationToken.IsCancellationRequested == false)
                    {
                        var item = await this.queue.Reader.ReadAsync(cancellationToken);

                        // simplified DI container magic to static invocation
                        if (item.Command is BoringCommand boringCommand)
                        {
                            var handler = new BoringCommandHandler();
                            var result = await handler.Handle(boringCommand, item.Ct);
                            item.Tcs.SetResult(result);
                        }
                        if (item.Command is LockMeGoodCommand lockMeGoodCommand)
                        {
                            var handler = new LockMeGoodCommandHandler(this);
                            var result = await handler.Handle(lockMeGoodCommand, item.Ct);
                            item.Tcs.SetResult(result);
                        }
                    }
                }
                catch (TaskCanceledException) { }
            },
            TaskCreationOptions.LongRunning)
            .Unwrap();
        }

        public async Task StopAsync()
        {
            this.queue.Writer.Complete();
            await this.worker;
        }
    }

    class Program
    {
        static async Task Main(string[] args)
        {
            var cts = new CancellationTokenSource();
            var threadStrategy = new DispatchOnBackgroundThread();
            threadStrategy.Start(cts.Token);

            var result1 = await threadStrategy.Send(new BoringCommand(), cts.Token);
            var result2 = await threadStrategy.Send(new LockMeGoodCommand(), cts.Token);

            cts.Cancel();
            await threadStrategy.StopAsync();
        }
    }
}

**不需要锁定的简单非线程中介实现**

代码语言:javascript
复制
public class DispatchInCallingThread
{
    public async Task<object> Send(object request, CancellationToken cancellationToken)
    {
        // simplified DI container magic to static invocation
        if (request is BoringCommand boringCommand)
        {
            var handler = new BoringCommandHandler();
            return await handler.Handle(boringCommand, cancellationToken);
        }
        else if (request is LockMeGoodCommand lockMeGoodCommand)
        {
            var handler = new LockMeGoodCommandHandler(this);
            return await handler.Handle(lockMeGoodCommand, cancellationToken);
        }
        else
            throw new Exception("unknown");
    }
}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-05-13 18:41:30

谢谢斯蒂芬的回答和彼得的评论,当你说谢谢的时候,这是非常清楚的,

有一个负责处理队列的代码循环(不是一个特定的线程,请参阅下面)。当它处理每个命令时,它将等待该命令的处理程序。

有一个命令处理程序正在等待另一个要处理的命令。但是,这是无法工作的,因为将不再处理其他命令;在此命令完成之前,代码循环将不会对下一个命令进行排队列。

考虑到上述情况,我找到了一种无需任何线程攻击(检测堆栈/重入深度等)或调度程序的方法。

在下面的示例中,我“注入”给处理程序,而不是循环调用类,而是一种不同类型的命令处理程序dispatcher,它不执行任何队列,而是直接在线程中处理。

下面是从线程循环中调用的,然后不存在相互依赖关系:

代码语言:javascript
复制
public class DispatchInCallingThread: ICommandBus
{
    public async Task<object> Send(object request, CancellationToken cancellationToken)
    {
        // simplified DI container magic to static invocation
        if (request is BoringCommand boringCommand)
        {
            var handler = new BoringCommandHandler();
            return await handler.Handle(boringCommand, cancellationToken);
        }
        else if (request is LockMeGoodCommand lockMeGoodCommand)
        {
            var handler = new LockMeGoodCommandHandler(this);
            return await handler.Handle(lockMeGoodCommand, cancellationToken);
        }
        else
            throw new Exception("cough furball");
    }

    public void Start(CancellationToken cancellationToken) { }

    public Task StopAsync() { return Task.CompletedTask; }
}

在后台线程中,这是对实例化命令处理程序的注入:

代码语言:javascript
复制
else if (item.Command is LockMeGoodCommand lockMeGoodCommand)
{
    var handler = new LockMeGoodCommandHandler(this.dispatchInCallingThread);
    var result = await handler.Handle(lockMeGoodCommand, item.Ct);
    item.Tcs.SetResult(result);
}

现在,代码将永远运行(需要为正在设置的取消令牌源实现适当的关闭逻辑):

代码语言:javascript
复制
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace TestDeadlock
{
    class BoringCommand { }
    class LockMeGoodCommand { }    

    class BoringCommandHandler
    {
        public Task<int> Handle(BoringCommand command, CancellationToken cancellationToken)
        {
            Console.WriteLine(command.GetType().Name);         
            return Task.FromResult(1);
        }
    }

    class LockMeGoodCommandHandler
    {
        private readonly ICommandBus commandBus;

        public LockMeGoodCommandHandler(ICommandBus commandBus) => this.commandBus = commandBus;

        public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
        {            
            Console.WriteLine(command.GetType().Name);
            var otherResult =  await this.commandBus.Send(new BoringCommand(), cancellationToken);
            var otherResult2 = await this.commandBus.Send(new BoringCommand(), cancellationToken);
            return 3;
        }
    }

    public interface ICommandBus
    {
        Task<object> Send(object request, CancellationToken cancellationToken);
        void Start(CancellationToken cancellationToken);
        Task StopAsync();
    }

    public class DispatchOnBackgroundThread : ICommandBus
    {
        private readonly Channel<CommandItem> queue = Channel.CreateUnbounded<CommandItem>();
        private Task worker = null;
        private readonly DispatchInCallingThread dispatchInCallingThread = new DispatchInCallingThread();

        class CommandItem
        {
            public object Command { get; set; }
            public CancellationToken Ct { get; set; }
            public TaskCompletionSource<object> Tcs { get; set; }
        }

        public Task<object> Send(object command, CancellationToken cancellationToken)
        {
            var item = new CommandItem() { Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken };
            this.queue.Writer.WriteAsync(item, cancellationToken); // just write and immediatly return the cts
            return item.Tcs.Task;            
        }

        public void Start(CancellationToken cancellationToken)
        {
            var scheduler = new ConcurrentExclusiveSchedulerPair();

            this.worker = Task.Factory.StartNew(async () =>
            {
                CommandItem item = null;
                try
                {                
                    while (cancellationToken.IsCancellationRequested == false)
                    {
                        item = await this.queue.Reader.ReadAsync(cancellationToken);

                        // simplified DI container magic to static invocation
                        if (item.Command is BoringCommand boringCommand)
                        {
                            var handler = new BoringCommandHandler();
                            var result = handler.Handle(boringCommand, item.Ct);
                            item.Tcs.SetResult(result);

                        }
                        else if (item.Command is LockMeGoodCommand lockMeGoodCommand)
                        {
                            var handler = new LockMeGoodCommandHandler(this.dispatchInCallingThread);
                            var result = await handler.Handle(lockMeGoodCommand, item.Ct);
                            item.Tcs.SetResult(result);
                        }
                        else
                            throw new Exception("unknown");
                    }
                }
                catch (TaskCanceledException)
                {
                    if (item != null)
                        item.Tcs.SetCanceled();
                }
                Console.WriteLine("exit background thread");
            })
            .Unwrap();  

        }

        public async Task StopAsync()
        {
            this.queue.Writer.Complete();
            await this.worker;
        }
    }

    public class DispatchInCallingThread: ICommandBus
    {
        public async Task<object> Send(object request, CancellationToken cancellationToken)
        {
            // simplified DI container magic to static invocation
            if (request is BoringCommand boringCommand)
            {
                var handler = new BoringCommandHandler();
                return await handler.Handle(boringCommand, cancellationToken);
            }
            else if (request is LockMeGoodCommand lockMeGoodCommand)
            {
                var handler = new LockMeGoodCommandHandler(this);
                return await handler.Handle(lockMeGoodCommand, cancellationToken);
            }
            else
                throw new Exception("unknown");
        }

        public void Start(CancellationToken cancellationToken) { }
        public Task StopAsync() { return Task.CompletedTask; }
    }

    class Program
    {
        static async Task Main(string[] args)
        {
            await TestDispatchOnBackgroundThread();
        }

        static async Task TestDispatchOnBackgroundThread()
        {
            var cts = new CancellationTokenSource();

            Console.CancelKeyPress += delegate {
                Console.WriteLine("setting cts.Cancel()");
                cts.Cancel();
            };

            var threadStrategy = new DispatchOnBackgroundThread();
            threadStrategy.Start(cts.Token);

            while (cts.IsCancellationRequested == false)
            {
                Console.WriteLine("***************** sending new batch ****************");                
                var result1 = await threadStrategy.Send(new BoringCommand(), cts.Token);
                var result3 = await threadStrategy.Send(new LockMeGoodCommand(), cts.Token);
                Thread.Sleep(1000);
            }
            await threadStrategy.StopAsync();
        }
    }
}

关于进一步的信息,使用依赖项注入的实际实现是在这里https://stackoverflow.com/a/61791817/915839,它能够动态地切换到工作线程内的线程内分派。

票数 0
EN

Stack Overflow用户

发布于 2020-05-13 02:35:11

死锁的原因相当简单:

  • 有一个负责处理队列的代码循环(不是一个特定的线程;请参阅下面)。当它处理每个命令时,它会await该命令的处理程序。
  • 有一个命令处理程序,await是要处理的另一个命令。但是,这是无法工作的,因为将不再处理其他命令;在此命令完成之前,代码循环不会对下一个命令进行排队列。

换句话说,如果一次只能执行一个命令,那么一个命令执行另一个命令在逻辑上是不可能的。

解决这个问题有几种可能的方法。我不推荐“可重入者”的方法;重入是导致许多微妙的逻辑错误的原因。我建议的方法如下:

  1. 更改Send语义,使其成为“队列”语义。这意味着不可能获得命令结果;结果必须以消息的形式通过某些mediator.
  2. Have发送--代码循环而不是命令处理程序await,允许它返回并获取下一个命令。这意味着它不是“一次同步一个”--任何more.
  3. Redefine“一次同步一个”,而是“一次一个同步”,但是如果它是awaiting,那么它就不算一个。在这种情况下,您可能可以使用类似于ConcurrentExclusiveSchedulerPairNito.AsyncEx.AsyncContext的东西一次运行一个块的方法。

附带注意:LongRunning不做您认为它正在做的事情。StartNew is not async-aware,因此LongRunning标志只应用于第一个await之前的代码;之后,该lambda中的代码将在任意线程池线程上运行(没有设置LongRunning )。将StartNew替换为Task.Run将使代码更加清晰。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61758446

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档