首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RabbitMQ/bunny:如果在线程中没有调用订阅块

RabbitMQ/bunny:如果在线程中没有调用订阅块
EN

Stack Overflow用户
提问于 2018-02-07 23:05:03
回答 1查看 704关注 0票数 2

如果在线程中执行,我很难获得队列订阅块。

来自红兔/交换的示例如预期的那样工作。但是,如果与线程中的使用者部分相适应,则订阅服务器块似乎不会执行。

我尝试过几个简单的变体,包括设置一个共享变量标志,但都没有成功。

我遗漏了什么?

代码

代码语言:javascript
复制
#!/usr/bin/env ruby
require "bunny"

quit = false

consumer = Thread.new do
  puts "consumer start"

  cnx = Bunny.new
  cnx.start
  cn  = cnx.create_channel
  ex = cn.topic("weathr", :auto_delete => true)

  q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
  q.subscribe do |delivery_info, properties, payload|
    puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
  end

  loop {
    sleep 1
    break if quit
  }

  cnx.close
  puts "consumer done"
end

connection = Bunny.new
connection.start
connection  = connection.create_channel
exchange = connection.topic("weathr", :auto_delete => true)
exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
  publish("Berkeley update",         :routing_key => "americas.north.us.ca.berkeley").
  publish("San Francisco update",    :routing_key => "americas.north.us.ca.sanfrancisco").
  publish("New York update",         :routing_key => "americas.north.us.ny.newyork").
  publish("São Paolo update",        :routing_key => "americas.south.brazil.saopaolo").
  publish("Hong Kong update",        :routing_key => "asia.southeast.hk.hongkong").
  publish("Kyoto update",            :routing_key => "asia.southeast.japan.kyoto").
  publish("Shanghai update",         :routing_key => "asia.southeast.prc.shanghai").
  publish("Rome update",             :routing_key => "europe.italy.roma").
  publish("Paris update",            :routing_key => "europe.france.paris")

sleep 5
connection.close

quit = true
consumer.join

实际输出

代码语言:javascript
复制
consumer start
consumer done

预期产出

代码语言:javascript
复制
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
An update for North America: New York update, routing key is americas.north.us.ny.newyork
consumer done
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-02-08 17:55:15

线程的订阅块没有执行,因为队列根本没有接收到任何消息。为了详细说明,在本例中,队列将在消息发布后创建。

这可以通过将消息切换到:mandatory => true并使用Bunny::Exchange#on_return来可视化。

代码

代码语言:javascript
复制
#!/usr/bin/env ruby
require "bunny"

quit = false

connection = Bunny.new
connection.start

consumer = Thread.new do
  puts "consumer start"
  cn  = connection.create_channel
  ex = cn.topic("weathr", :auto_delete => true)

  q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
  q.subscribe do |delivery_info, properties, payload|
    puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
  end

  sleep 1 while !quit

  cn.close
  puts "consumer done"
end

channel = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.on_return do |basic_return, properties, payload|
  puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
end

exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
  publish("Berkeley update",         :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
  publish("San Francisco update",    :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
  publish("New York update",         :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
  publish("São Paolo update",        :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
  publish("Hong Kong update",        :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
  publish("Kyoto update",            :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
  publish("Shanghai update",         :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
  publish("Rome update",             :mandatory => true, :routing_key => "europe.italy.roma").
  publish("Paris update",            :mandatory => true, :routing_key => "europe.france.paris")

channel.close
sleep 5

quit = true
consumer.join
connection.close

输出

代码语言:javascript
复制
consumer start
San Diego update was returned! reply_code = 312, reply_text = NO_ROUTE
Berkeley update was returned! reply_code = 312, reply_text = NO_ROUTE
San Francisco update was returned! reply_code = 312, reply_text = NO_ROUTE
New York update was returned! reply_code = 312, reply_text = NO_ROUTE
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done

正如我们所看到的,所有消息最终都会以NO_ROUTE返回。

在消息发布之前强制队列(和路由)存在的简单解决方案:

代码语言:javascript
复制
#!/usr/bin/env ruby
require "bunny"

quit = false
consumer_queued = false

connection = Bunny.new
connection.start

consumer = Thread.new do
  puts "consumer start"
  cn  = connection.create_channel
  ex = cn.topic("weathr", :auto_delete => true)

  q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
  consumer_queued = true
  q.subscribe do |delivery_info, properties, payload|
    puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
    $stdout.flush
  end

  sleep 1 while !quit

  cn.close
  puts "consumer done"
end

# ensure queue is ready
sleep 0.125  while !consumer_queued

channel = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.on_return do |basic_return, properties, payload|
  puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
  $stdout.flush
end

exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
  publish("Berkeley update",         :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
  publish("San Francisco update",    :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
  publish("New York update",         :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
  publish("São Paolo update",        :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
  publish("Hong Kong update",        :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
  publish("Kyoto update",            :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
  publish("Shanghai update",         :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
  publish("Rome update",             :mandatory => true, :routing_key => "europe.italy.roma").
  publish("Paris update",            :mandatory => true, :routing_key => "europe.france.paris")

channel.close
sleep 5

quit = true
consumer.join
connection.close

输出(附有退货通知书)

代码语言:javascript
复制
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: New York update, routing key is americas.north.us.ny.newyork
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done

接收到预期的消息,并返回其余的消息。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48674793

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档