我问了here一个问题,为什么使用Thread.Run启动一个进程执行的并发请求没有我预期的那么多。
这个问题背后的原因是,我试图创建一个类,它可以从rabbitmq队列中拉出消息,并并发处理它们,直到并发消息的最大数量。
为此,我在EventingBasicConsumer类的Received处理程序中使用了以下代码。
async void Handle(EventArgs e)
{
await _semaphore.WaitAsync();
var thread = new Thread(() =>
{
Process(e);
_semaphore.Release();
_channel.BasicAck(....);
});
thread.Start();
} 然而,上一篇文章的评论是,除非执行CPU限制的工作,否则不会启动线程。
上面的处理程序不知道工作是否会受到CPU、网络、磁盘或其他方面的限制。(Process是一种抽象方法)。
尽管如此,我认为我必须在这里启动一个线程或任务,否则Process方法会阻塞rabbitmq线程,并且事件处理程序在完成之前不会再次调用。所以我一次只能处理一个方法。
在这里开始一个新的Thread可以吗?最初我使用的是Task.Run,但这并没有产生想要的那么多的工人。请参阅其他帖子。
仅供参考。通过在信号量上设置InitialCount来限制并发线程的数量。
发布于 2017-02-21 04:17:31
正如在链接的问题中所说的,大量的线程并不能保证性能,因为如果它们的数量超过了逻辑核心的数量,你就会得到一个没有实际工作的thread starvation情况。
但是,如果您仍然需要处理并发操作的数量,您可以尝试使用TPL Dataflow库,在MaxDegreeOfParallelism上进行设置,就像在this tutorial中一样。
var workerBlock = new ActionBlock<EventArgs>(
// Process event
e => Process(e),
// Specify a maximum degree of parallelism.
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = InitialCount
});
var bufferBlock = new BufferBlock();
// link the blocks for automatically propagading the messages
bufferBlock.LinkTo(workerBlock);
// asynchronously send the message
await bufferBlock.SendAsync(...);
// synchronously send the message
bufferBlock.Post(...);BufferBlock是一个队列,因此消息的顺序将被保留。此外,您还可以通过使用filter lambda链接块来添加不同的处理程序(具有不同的并行度):
bufferBlock.LinkTo(cpuWorkerBlock, e => e is CpuEventArgs);
bufferBlock.LinkTo(networkWorkerBlock, e => e is NetworkEventArgs);
bufferBlock.LinkTo(diskWorkerBlock, e => e is DiskEventArgs);但在这种情况下,您应该在链的末尾设置一个默认处理程序,这样消息就不会消失(为此,您可以使用NullTarget块):
bufferBlock.LinkTo(DataflowBlock.NullTarget<EventArgs>);此外,代码块可以是观察者,因此它们可以在UI端与Reactive Extensions完美地协作。
https://stackoverflow.com/questions/42285680
复制相似问题