首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >BrokeredMessage在调用OnMessage()后自动释放

BrokeredMessage在调用OnMessage()后自动释放
EN

Stack Overflow用户
提问于 2015-05-26 20:05:27
回答 5查看 2.3K关注 0票数 3

我正在尝试排队从Azure服务总线上的项目,以便我可以批量处理它们。我知道Azure服务总线有一个ReceiveBatch(),但是它似乎有问题,原因如下:

  • 我一次只能得到最多256条消息,即使这样,基于消息大小也可以是随机的。
  • 即使我偷看有多少条消息在等待,我也不知道要打多少个RequestBatch电话,因为我不知道每个电话会给我多少条消息。由于消息将源源不断地传入,所以我不能继续发出请求,直到请求为空,因为它永远不会是空的。

我决定只使用消息侦听器,这比做浪费的窥视更便宜,并将给我更多的控制。

基本上,我试着让一定数量的消息积累起来,然后立即处理它们。我使用计时器强制延迟,但我需要能够排队我的项目,因为他们进来。

根据我的计时器要求,阻塞集合似乎不是一个好的选择,所以我尝试使用ConcurrentBag。

代码语言:javascript
复制
var batchingQueue = new ConcurrentBag<BrokeredMessage>();
myQueueClient.OnMessage((m) =>
{
    Console.WriteLine("Queueing message");
    batchingQueue.Add(m);
});

while (true)
{
    var sw = WaitableStopwatch.StartNew();
    BrokeredMessage msg;
    while (batchingQueue.TryTake(out msg)) // <== Object is already disposed
    {
        ...do this until I have a thousand ready to be written to DB in batch
        Console.WriteLine("Completing message");
        msg.Complete(); // <== ERRORS HERE
    }

    sw.Wait(MINIMUM_DELAY);
}

但是,一旦我访问了OnMessage管道之外的消息,它就会显示已被释放的BrokeredMessage。

--我想这一定是OnMessage的一些自动行为,除了立即处理它之外,我找不到任何方法来处理它,而我不想这样做。

EN

回答 5

Stack Overflow用户

发布于 2015-05-27 00:11:27

这在BlockingCollection中是非常容易做到的。

代码语言:javascript
复制
var batchingQueue = new BlockingCollection<BrokeredMessage>();

myQueueClient.OnMessage((m) =>
{
    Console.WriteLine("Queueing message");
    batchingQueue.Add(m);
});

还有你的消费者线索:

代码语言:javascript
复制
foreach (var msg in batchingQueue.GetConsumingEnumerable())
{
    Console.WriteLine("Completing message");
    msg.Complete();
}

GetConsumingEnumerable返回一个迭代器,该迭代器使用队列中的项,直到设置了IsCompleted属性并且队列为空。如果队列是空的,但IsCompletedFalse,则它会对下一项进行非繁忙的等待。

要取消使用者线程(即关闭程序),您可以停止向队列中添加内容,并让主线程调用batchingQueue.CompleteAdding。使用者将清空队列,查看IsCompleted属性为True,然后退出。

在这里使用BlockingCollection比使用ConcurrentBagConcurrentQueue更好,因为BlockingCollection接口更容易使用。特别是,GetConsumingEnumerable的使用使您不必担心检查计数或执行繁忙的等待(轮询循环)。只是起作用了。

还请注意,ConcurrentBag有一些相当奇怪的删除行为。特别是,移除项的顺序取决于哪个线程移除项。创建包的线程以与其他线程不同的顺序删除项。详细信息请参见使用ConcurrentBag集合

您还没有说明为什么要对输入中的项进行批处理。除非有压倒一切的性能理由这样做,否则用这种批处理逻辑使代码复杂化似乎不是一个特别好的主意。

如果您想对数据库进行批处理写入,那么我建议使用一个简单的List<T>来缓冲这些项。如果您必须在将项目写入数据库之前对其进行处理,那么请使用我前面演示的技术来处理它们。然后,将项目添加到列表中,而不是直接写入数据库。当列表获得1,000项,或给定的时间间隔时,分配一个新列表并启动将旧列表写入数据库的任务。如下所示:

代码语言:javascript
复制
// at class scope

// Flush every 5 minutes.
private readonly TimeSpan FlushDelay = TimeSpan.FromMinutes(5);
private const int MaxBufferItems = 1000;

// Create a timer for the buffer flush.
System.Threading.Timer _flushTimer = new System.Threading.Timer(TimedFlush, FlushDelay.TotalMilliseconds, Timeout.Infinite);  

// A lock for the list. Unless you're getting hundreds of thousands
// of items per second, this will not be a performance problem.
object _listLock = new Object();

List<BrokeredMessage> _recordBuffer = new List<BrokeredMessage>();

那么,在你的消费者中:

代码语言:javascript
复制
foreach (var msg in batchingQueue.GetConsumingEnumerable())
{
    // process the message
    Console.WriteLine("Completing message");
    msg.Complete();
    lock (_listLock)
    {
        _recordBuffer.Add(msg);
        if (_recordBuffer.Count >= MaxBufferItems)
        {
            // Stop the timer
            _flushTimer.Change(Timeout.Infinite, Timeout.Infinite);

            // Save the old list and allocate a new one
            var myList = _recordBuffer;
            _recordBuffer = new List<BrokeredMessage>();

            // Start a task to write to the database
            Task.Factory.StartNew(() => FlushBuffer(myList));

            // Restart the timer
            _flushTimer.Change(FlushDelay.TotalMilliseconds, Timeout.Infinite);
        }
    }
}

private void TimedFlush()
{
    bool lockTaken = false;
    List<BrokeredMessage> myList = null;

    try
    {
        if (Monitor.TryEnter(_listLock, 0, out lockTaken))
        {
            // Save the old list and allocate a new one
            myList = _recordBuffer;
            _recordBuffer = new List<BrokeredMessage>();
        }
    }
    finally
    {
        if (lockTaken)
        {
            Monitor.Exit(_listLock);
        }
    }

    if (myList != null)
    {
        FlushBuffer(myList);
    }

    // Restart the timer
    _flushTimer.Change(FlushDelay.TotalMilliseconds, Timeout.Infinite);
}

这里的想法是,将旧列表去掉,分配一个新列表,以便继续处理,然后将旧列表的项写入数据库。锁在那里是为了防止计时器和记录计数器互相踩在一起。如果没有锁,事情可能会在一段时间内正常工作,然后你就会在不可预测的时候发生奇怪的崩溃。

我喜欢这个设计,因为它消除了用户的轮询。我唯一不喜欢的是,使用者必须知道计时器(也就是说,它必须停止,然后重新启动计时器)。再多想一想,我就可以取消这个要求了。但它的写作方式很好。

票数 3
EN

Stack Overflow用户

发布于 2016-12-06 22:14:21

转到OnMessageAsync解决了我的问题

代码语言:javascript
复制
_queueClient.OnMessageAsync(async receivedMessage =>
票数 2
EN

Stack Overflow用户

发布于 2016-11-23 09:42:55

我联系了微软,了解到BrokeredMessage在MSDN上被处理的问题,这是我的回应:

非常基本的规则,我不确定这是否被记录在案。接收到的消息需要在回调函数的生存期内处理。在您的示例中,异步回调完成时将处理消息,这就是为什么您在另一个线程中使用ObjectDisposedException完成尝试失败的原因。 我看不出用于进一步处理的排队消息对吞吐量有何帮助。这无疑会给客户增加更多的负担。尝试在异步回调中处理消息,该回调应该具有足够的性能。

在我的例子中,这意味着我不能以我想要的方式使用ServiceBus,我必须重新思考我想要的东西是如何工作的。该死的。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/30467896

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档