我正在使用bunny ruby gem向rabbitmq服务器发送和接收消息。如何同步地将消息从队列中弹出,同时设置等待时间的超时(即,如果在3秒后没有消息到达,则停止阻塞)?
一种显而易见的解决方案是在pop调用上循环,直到超时到期或收到消息,但这似乎非常低效。有没有更优雅的解决方案?我查看了bunny的文档以及rabbitmq站点上的教程,但我没有找到适用于此特定场景的解决方案。
发布于 2016-02-01 17:28:17
为了实现这样的特性,a被迫重写了基本方法subscribe。我想我们可以为通道设置超时时间,但是函数中没有这样的输入参数。
response = nil
subscribe(block: true, timeout: 10) do |delivery_info, properties, payload|
Rails.logger.info "got message #{payload}"
response = payload
@channel.consumers[delivery_info.consumer_tag].cancel
end
def subscribe(opts = {block: false}, &block)
ctag = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
consumer = Bunny::Consumer.new(@channel,@response_queue,ctag)
consumer.on_delivery(&block)
@channel.basic_consume_with(consumer)
if opts[:block]
@channel.work_pool.join(opts[:timeout])
end
end发布于 2016-08-05 00:39:59
我没有找到一种使用Bunny轻松完成的方法,并且我在这里提出的内容会在没有超时的情况下阻塞。但它确实支持每次调用检索一条消息的语义。考虑到Bunny在内部使用线程池来接收消息,我认为一种更简单的方法可能是使用阻塞队列,例如Ruby的Queue类,用于将消息从Bunny线程池传输到调用线程。类似于以下内容:
# Set up your internal queue somewhere (in your class's initialize maybe?)
@internal_queue = Queue.new
# In the main thread that needs to block
...
# the call to subscribe is non-blocking
queue.subscribe do |delivery_info, properties, payload|
@internal_queue.enq(payload) # this runs inside Bunny's pool
end
# the call to deq is blocking
response = @internal_queue.deq # this blocks the main thread till a
# message is pushed to the internal_q您可以为每个需要侦听的AMQP通道维护一个@internal_queue。您可以将这些部分分解到单独的方法中,并创建一个整洁的阻塞API,一次返回一条消息。
后来,我创建了一个TimedWaitableQueue类,用监视器MonitorMixin包装了一个简单的数组,然后使用互斥锁+条件变量语义。这允许在超时的情况下阻塞出队调用。
发布于 2019-07-19 23:18:12
作为@Ilya:https://stackoverflow.com/a/35126963/448858对上述代码的一个细微变化,我发现我必须创建一个线程来超时,然后关闭通道的工作池
module Bunny
class Queue
def subscribe(opts = { block: false, timeout: 1000 }, &block)
ctag = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
consumer = Consumer.new(@channel, self, ctag)
consumer.on_delivery(&block)
@channel.basic_consume_with(consumer)
if opts[:block]
Thread.new do
sleep(opts[:timeout]/1000.0)
@channel.work_pool.shutdown
end
@channel.work_pool.join
end
end
end
endhttps://stackoverflow.com/questions/34603250
复制相似问题