首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从不同线程访问后释放的BrokeredMessage

从不同线程访问后释放的BrokeredMessage
EN

Stack Overflow用户
提问于 2016-11-22 18:11:35
回答 1查看 214关注 0票数 0

这可能是this question的副本,但这与批处理数据库更新的说法混淆了,仍然没有正确的答案。

在一个使用Azure服务总线队列的简单示例中,我不能在BrokeredMessage被放到队列中之后访问它;如果我从另一个线程读取队列,它总是会被释放。

示例代码:

代码语言:javascript
复制
class Program {
    private static string _serviceBusConnectionString = "XXX";

    private static BlockingCollection<BrokeredMessage> _incomingMessages = new BlockingCollection<BrokeredMessage>();
    private static CancellationTokenSource _cancelToken = new CancellationTokenSource();

    private static QueueClient _client;

    static void Main(string[] args) {

        // Set up a few listeners on different threads
        Task.Run(async () => {
            while (!_cancelToken.IsCancellationRequested) {
                var msg = _incomingMessages.Take(_cancelToken.Token);
                if (msg != null) {
                    try {
                        await msg.CompleteAsync();
                        Console.WriteLine($"Completed Message Id: {msg.MessageId}");
                    } catch (ObjectDisposedException) {
                        Console.WriteLine("Message was disposed!?");
                    }
                }
            }
        });


        // Now set up our service bus reader
        _client = GetQueueClient("test");

        _client.OnMessageAsync(async (message) => {
            await Task.Run(() => _incomingMessages.Add(message));
        },
        new OnMessageOptions() {
            AutoComplete = false
        });

        // Now start sending
        Task.Run(async () => {
            int sent = 0;
            while (!_cancelToken.IsCancellationRequested) {
                var msg = new BrokeredMessage();
                await _client.SendAsync(msg);
                Console.WriteLine($"Sent {++sent}");
                await Task.Delay(1000);
            }
        });

        Console.ReadKey();
        _cancelToken.Cancel();

    }

    private static QueueClient GetQueueClient(string queueName) {

        var namespaceManager = NamespaceManager.CreateFromConnectionString(_serviceBusConnectionString);
        if (!namespaceManager.QueueExists(queueName)) {
            var settings = new QueueDescription(queueName);
            settings.MaxDeliveryCount = 10;
            settings.LockDuration = TimeSpan.FromSeconds(5);
            settings.EnableExpress = true;
            settings.EnablePartitioning = true;
            namespaceManager.CreateQueue(settings);
        }

        var factory = MessagingFactory.CreateFromConnectionString(_serviceBusConnectionString);
        factory.RetryPolicy = new RetryExponential(minBackoff: TimeSpan.FromSeconds(0.1), maxBackoff: TimeSpan.FromSeconds(30), maxRetryCount: 100);
        var queueClient = factory.CreateQueueClient(queueName);

        return queueClient;
    }
}

我已经尝试了设置,但不能让它工作。有什么想法吗?

EN

回答 1

Stack Overflow用户

发布于 2016-11-23 17:37:28

回答我自己的问题,来自Serkant Karaca @ Microsoft here

非常基本的规则,我不确定这是否记录在案。接收到的消息需要在回调函数的生存期内进行处理。在您的情况下,消息将在异步回调完成时被处理,这就是为什么您的完整尝试在另一个线程中使用ObjectDisposedException失败的原因。

我真的看不出排队等待进一步处理对吞吐量有什么帮助。这肯定会给客户增加更多负担。尝试在异步回调中处理消息,这应该是足够高的性能。

该死的。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40739063

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档