我正在使用streadway公司的amqp库来连接一个rabbitmq服务器。库提供了一个channel.Consume()函数,它返回"<- chan传递“。它还提供了一个channel.Get()函数,该函数除其他外返回一个“传递”。
我必须实现一个channel.Get(). pop()功能,并且我使用的是然而,文档说:
"In almost all cases, using Channel.Consume will be preferred."这里的首选是否意味着推荐?在channel.Get()之上使用channel.Consume()有什么缺点吗?如果是,如何使用channel.Consume()实现Pop()函数?
发布于 2013-06-18 09:06:36
据我从医生那里得知,是的,“首选”的意思是“推荐的”。
channel.Get()似乎不像channel.Consume()那样提供更多的特性,而且由于它返回的是一个chan of Delivery,所以在并发代码中更容易使用,而不是每个单独的Delivery。
提到的额外特性是exclusive、noLocal和noWait,以及一个可选的args Table,它“具有队列或服务器的特定语义”。
要使用Pop()实现channel.Consume()函数,可以链接到来自amqp示例消费者的一些代码片段,创建一个函数,创建一个实际实现Pop()功能的Delivery,然后是goroutine。
关键是如果没有接收到,通道(在链接示例中)将在发送时阻塞。在本例中,handle() func使用range处理整个通道,直到它为空为止。您的Pop()功能可能更好地由一个函数提供,该函数只接收来自chan的最后一个值并返回它。每次运行时,它都会返回最新的Delivery。
编辑:示例函数从通道接收最新的值并对其进行处理(这可能不适用于您的用例,如果该函数将另一个Delivery上的chan发送到另一个要处理的函数中,则可能更有用。另外,我还没有测试下面的代码,它可能充满了错误)
func handle(deliveries <-chan amqp.Delivery, done chan error) {
select {
case d = <-deliveries:
// Do stuff with the delivery
// Send any errors down the done chan. for example:
// done <- err
default:
done <- nil
}
}发布于 2013-06-25 07:30:35
这真的取决于你想做什么。如果您只想从队列中获取一条消息(第一条消息),那么您可能应该使用basic.get,如果您计划处理来自队列的所有传入消息-- basic.consume就是您想要的。
可能,它不是平台或库的特定问题,而是协议理解问题。
UPD
我不太熟悉它的语言,所以我将尝试给您简要介绍AMQP的细节,并描述用例。
有时,您可能会遇到麻烦,并与basic.consume产生一定的开销:
使用basic.consume 您有这样的工作流:
basic.consume方法通知代理您希望接收消息basic.consume-ok消息
使用basic.get 您有这样的工作流:
basic.get发送到代理basic.get-ok方法,该方法保存消息或basic.empty方法,表示服务器上没有消息可用的情况
关于同步和异步方法的注意:预期同步将有一些响应,无论异步是否不响应
关于basic.qos方法prefetch-count属性的注意:当在basic.consume或basic.get上设置no-ack属性时,它将被忽略。
Spec在basic.get上有一个注释:“此方法使用为特定类型的应用程序设计的同步对话直接访问队列中的消息,在这些应用程序中,同步功能比性能更重要”,后者适用于连续消息消耗。
我的个人测试表明,使用basic.get (0.38659715652466)获取1000条消息要比在RabbitMQ 3.0.1上逐个(0.47398710250854)获取1000条消息要快,Erlang R14B04平均超过15%。
如果只使用主线程中的一条消息是您的情况--可能您必须使用basic.get。
您仍然只能异步地使用一条消息,例如在单独的线程中或使用某种事件机制。有时这对您的机器资源来说是更好的解决方案,但是您必须注意队列中没有消息可用的情况。
如果您必须一个一个地处理消息,显然应该使用basic.consume,我认为
https://stackoverflow.com/questions/17163252
复制相似问题