我正试图在Go中写一个RabbitMQ消费者。它一次从队列中取出5个对象并对它们进行处理。此外,如果成功地处理了否则发送到死信队列5次,然后丢弃,那么它应该被无限地运行,并处理使用者的取消事件。我有几个问题:
BasicConsumer -go 参考文献中有关于EventingBasicConsumer和EventingBasicConsumer的概念吗?Model在RabbitMQ中是什么,在RabbitMq中是什么?ttl后再次对其进行重新队列consumerTag参数在ch.Consume函数中的意义是什么?channel.Get()还是channel.Consume()?为了满足上述要求,我需要在下面的代码中做哪些修改。我这么问是因为我找不到像样的RabbitMq文档。
func main() {
consumer()
}
func consumer() {
objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}
initializeConn(&objConsumerConn.conn)
ch, err := objConsumerConn.conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
msgs, err := ch.Consume(
objConsumerConn.queueName, // queue
"demo1", // consumerTag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
k := new(EventCaptureData)
b := bytes.Buffer{}
b.Write(d.Body)
dec := gob.NewDecoder(&b)
err := dec.Decode(&k)
d.Ack(true)
if err != nil { fmt.Println("failed to fetch the data from consumer", err); }
fmt.Println(k)
}
}()
log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
<-forever
}编辑问题:
按照链接link1 link2中的建议,我延迟了消息的处理。但问题是,即使在ttl之后,消息也会从死信队列返回到原来的队列。我正在使用RabbitMQ 3.0.0。有人能指出问题出在哪里吗?
发布于 2016-04-06 17:26:39
在“兔go go”中是否有BasicConsumer和EventingBasicConsumer的概念?
不完全是这样,但Channel.Get和Channel.Consume调用提供了类似的概念。对于Channel.Get,您有一个非阻塞调用,如果有可用的消息,可以获取第一个消息,或者返回ok=false。使用Channel.Consume,排队的消息被传递到一个通道。
在RabbitMQ中什么是模型,在RabbitMq中有什么?
如果您指的是IModel和Connection.CreateModel in C# RabbitMQ,那么这是来自C#库,而不是RabbitMQ本身。这只是试图从RabbitMQ的“频道”术语中抽象化出来,但从未流行过。
如何在死信队列失败时发送对象,并在ttl之后重新对其进行重新队列。
将delivery.Nack方法与requeue=false一起使用。
在下面的代码中,consumerTag参数在ch.Consume函数中的意义是什么?
ConsumerTag只是一个消费者标识符。它可以用channel.Cancel取消通道,并识别负责交付的消费者。与channel.Consume一起传递的所有消息都将设置ConsumerTag字段。
对于这个场景,我们应该使用
channel.Get()还是channel.Consume()?
我认为channel.Get()几乎从来没有比channel.Consume()更可取。使用channel.Get,您将轮询队列并浪费CPU什么都不做,这在Go中是没有意义的。
为了满足上述要求,我需要在下面的代码中做哪些修改。
multiple=true并为批处理调用一次。一旦消息进入死信队列,您就必须检查delivery.Headers["x-death"]头的死信次数,并在它被重试5次时调用delivery.Reject。https://stackoverflow.com/questions/36419994
复制相似问题