首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >并发处理rabbitmq消息

并发处理rabbitmq消息
EN

Stack Overflow用户
提问于 2017-02-17 06:14:15
回答 1查看 360关注 0票数 0

我问了here一个问题,为什么使用Thread.Run启动一个进程执行的并发请求没有我预期的那么多。

这个问题背后的原因是,我试图创建一个类,它可以从rabbitmq队列中拉出消息,并并发处理它们,直到并发消息的最大数量。

为此,我在EventingBasicConsumer类的Received处理程序中使用了以下代码。

代码语言:javascript
复制
async void Handle(EventArgs e) 
{
    await _semaphore.WaitAsync();

    var thread = new Thread(() =>
    {
        Process(e);
        _semaphore.Release(); 
        _channel.BasicAck(....);
    });
    thread.Start();
} 

然而,上一篇文章的评论是,除非执行CPU限制的工作,否则不会启动线程。

上面的处理程序不知道工作是否会受到CPU、网络、磁盘或其他方面的限制。(Process是一种抽象方法)。

尽管如此,我认为我必须在这里启动一个线程或任务,否则Process方法会阻塞rabbitmq线程,并且事件处理程序在完成之前不会再次调用。所以我一次只能处理一个方法。

在这里开始一个新的Thread可以吗?最初我使用的是Task.Run,但这并没有产生想要的那么多的工人。请参阅其他帖子。

仅供参考。通过在信号量上设置InitialCount来限制并发线程的数量。

EN

回答 1

Stack Overflow用户

发布于 2017-02-21 04:17:31

正如在链接的问题中所说的,大量的线程并不能保证性能,因为如果它们的数量超过了逻辑核心的数量,你就会得到一个没有实际工作的thread starvation情况。

但是,如果您仍然需要处理并发操作的数量,您可以尝试使用TPL Dataflow库,在MaxDegreeOfParallelism上进行设置,就像在this tutorial中一样。

代码语言:javascript
复制
var workerBlock = new ActionBlock<EventArgs>(
    // Process event
    e => Process(e),
    // Specify a maximum degree of parallelism.
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = InitialCount
    });
var bufferBlock = new BufferBlock();
// link the blocks for automatically propagading the messages
bufferBlock.LinkTo(workerBlock);

// asynchronously send the message
await bufferBlock.SendAsync(...);
// synchronously send the message
bufferBlock.Post(...);

BufferBlock是一个队列,因此消息的顺序将被保留。此外,您还可以通过使用filter lambda链接块来添加不同的处理程序(具有不同的并行度):

代码语言:javascript
复制
bufferBlock.LinkTo(cpuWorkerBlock, e => e is CpuEventArgs);
bufferBlock.LinkTo(networkWorkerBlock, e => e is NetworkEventArgs);
bufferBlock.LinkTo(diskWorkerBlock, e => e is DiskEventArgs);

但在这种情况下,您应该在链的末尾设置一个默认处理程序,这样消息就不会消失(为此,您可以使用NullTarget块):

代码语言:javascript
复制
bufferBlock.LinkTo(DataflowBlock.NullTarget<EventArgs>);

此外,代码块可以是观察者,因此它们可以在UI端与Reactive Extensions完美地协作。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42285680

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档