我不知道如何在本地订阅我的RabbitMQ (当我使用CloudAMPQ时,它可以工作)。我怀疑问题是我的SendPriceJob没有连接到正确的连接/通道/交换机,但我不确定。
我有一个worker,它每隔一分钟从API获取数据。
class FetchPriceJob
@queue = :update_price
def self.perform
# Do some stuff
FetchPriceJob.new.publish(response.to_json)
end
def publish(data)
channel.default_exchange.publish(data, routing_key: queue.name)
connection.close
end
def connection
@conn ||= begin
conn = Bunny.new(host: "localhost", vhost: "/", user: "guest", password: "guest")
conn.start
end
end
def channel
@channel ||= connection.create_channel
end
def queue
@queue ||= channel.queue('current_prices')
end
end我有另一个worker,它打开一个连接并进行侦听。
module SendPriceJob
@queue = :price_serve
def self.perform
conn = Bunny.new(host: "localhost", vhost: "/", user: "guest", password: "guest")
conn.start
ch = conn.create_channel
x = ch.default_exchange
q = ch.queue('current_prices')
begin
q.subscribe(block: true) do |_, _, body|
ActionCable.server.broadcast 'prices', body
end
rescue Interrupt => _
ch.close
conn.close
end
end
end这两个工作进程是由流程管理器启动的。
# Procfile
elastic: elasticsearch
redis: redis-server
web: rails server -p 3000
send_worker: QUEUE=price_serve rake resque:work
fetch_worker: QUEUE=update_price rake resque:work
scheduler: rake resque:scheduler我正在运行我的RabbitMQ服务器:http://prntscr.com/i6rcwv。我在连接/通道/交换:http://prntscr.com/i6rd7q上成功地对消息进行了排队。从我的日志中可以看出,我正在运行的调度器以及生产者:http://prntscr.com/i6rdxf都工作正常。
这是我第一次使用message queues,所以我可能做了一些完全错误的事情。我觉得我应该很接近,因为它是和CloudAMQP一起工作的。唯一的区别是Bunny.new被配置为连接到外部API。
发布于 2018-01-29 03:02:52
发布于 2018-01-29 02:08:17
在此方法中:
def publish(data)
channel.default_exchange.publish(data, routing_key: queue.name)
connection.close
end你能不能试试这个:
queue.publish(data, routing_key: queue.name)文档:http://rubybunny.info/articles/exchanges.html#default_exchange
https://stackoverflow.com/questions/48488478
复制相似问题