首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用bunny gem,如何在收到消息或超时之前阻塞

使用bunny gem,如何在收到消息或超时之前阻塞
EN

Stack Overflow用户
提问于 2016-01-05 10:25:28
回答 3查看 1.3K关注 0票数 2

我正在使用bunny ruby gem向rabbitmq服务器发送和接收消息。如何同步地将消息从队列中弹出,同时设置等待时间的超时(即,如果在3秒后没有消息到达,则停止阻塞)?

一种显而易见的解决方案是在pop调用上循环,直到超时到期或收到消息,但这似乎非常低效。有没有更优雅的解决方案?我查看了bunny的文档以及rabbitmq站点上的教程,但我没有找到适用于此特定场景的解决方案。

EN

回答 3

Stack Overflow用户

发布于 2016-02-01 17:28:17

为了实现这样的特性,a被迫重写了基本方法subscribe。我想我们可以为通道设置超时时间,但是函数中没有这样的输入参数。

代码语言:javascript
复制
response = nil

subscribe(block: true, timeout: 10) do |delivery_info, properties, payload|
  Rails.logger.info "got message #{payload}"
  response = payload
  @channel.consumers[delivery_info.consumer_tag].cancel
end



def subscribe(opts = {block: false}, &block)
    ctag = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
    consumer = Bunny::Consumer.new(@channel,@response_queue,ctag)

    consumer.on_delivery(&block)
    @channel.basic_consume_with(consumer)

    if opts[:block]
      @channel.work_pool.join(opts[:timeout])
    end
end
票数 2
EN

Stack Overflow用户

发布于 2016-08-05 00:39:59

我没有找到一种使用Bunny轻松完成的方法,并且我在这里提出的内容会在没有超时的情况下阻塞。但它确实支持每次调用检索一条消息的语义。考虑到Bunny在内部使用线程池来接收消息,我认为一种更简单的方法可能是使用阻塞队列,例如Ruby的Queue类,用于将消息从Bunny线程池传输到调用线程。类似于以下内容:

代码语言:javascript
复制
# Set up your internal queue somewhere (in your class's initialize maybe?)
@internal_queue = Queue.new

# In the main thread that needs to block
...
# the call to subscribe is non-blocking
queue.subscribe do |delivery_info, properties, payload|
  @internal_queue.enq(payload)  # this runs inside Bunny's pool
end

# the call to deq is blocking
response = @internal_queue.deq  # this blocks the main thread till a
                                # message is pushed to the internal_q

您可以为每个需要侦听的AMQP通道维护一个@internal_queue。您可以将这些部分分解到单独的方法中,并创建一个整洁的阻塞API,一次返回一条消息。

后来,我创建了一个TimedWaitableQueue类,用监视器MonitorMixin包装了一个简单的数组,然后使用互斥锁+条件变量语义。这允许在超时的情况下阻塞出队调用。

票数 1
EN

Stack Overflow用户

发布于 2019-07-19 23:18:12

作为@Ilya:https://stackoverflow.com/a/35126963/448858对上述代码的一个细微变化,我发现我必须创建一个线程来超时,然后关闭通道的工作池

代码语言:javascript
复制
module Bunny
  class Queue
    def subscribe(opts = { block: false, timeout: 1000 }, &block)
      ctag = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
      consumer = Consumer.new(@channel, self, ctag)
      consumer.on_delivery(&block)
      @channel.basic_consume_with(consumer)
      if opts[:block]
        Thread.new do
          sleep(opts[:timeout]/1000.0)
          @channel.work_pool.shutdown
        end
        @channel.work_pool.join
      end
    end
  end
end
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/34603250

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档