首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >围棋中的RabbitMQ消费者

围棋中的RabbitMQ消费者
EN

Stack Overflow用户
提问于 2016-04-05 07:38:50
回答 1查看 6.1K关注 0票数 4

我正试图在Go中写一个RabbitMQ消费者。它一次从队列中取出5个对象并对它们进行处理。此外,如果成功地处理了否则发送到死信队列5次,然后丢弃,那么它应该被无限地运行,并处理使用者的取消事件。我有几个问题:

  1. 在RabbitMq-go BasicConsumer -go 参考文献中有关于EventingBasicConsumerEventingBasicConsumer的概念吗?
  2. Model在RabbitMQ中是什么,在RabbitMq中是什么?
  3. 如何在死信队列失败时发送对象,并在ttl后再次对其进行重新队列
  4. 在下面的代码中,consumerTag参数在ch.Consume函数中的意义是什么?
  5. 对于这个场景,我们应该使用channel.Get()还是channel.Consume()

为了满足上述要求,我需要在下面的代码中做哪些修改。我这么问是因为我找不到像样的RabbitMq文档。

代码语言:javascript
复制
   func main() {

        consumer()        
    }

    func consumer() {

        objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}      
        initializeConn(&objConsumerConn.conn)


        ch, err := objConsumerConn.conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        msgs, err := ch.Consume(
                objConsumerConn.queueName, // queue
                "demo1",     // consumerTag
                false,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
            for d := range msgs {                  
                k := new(EventCaptureData)
                b := bytes.Buffer{}
                b.Write(d.Body)
                dec := gob.NewDecoder(&b)  
                err := dec.Decode(&k)
                d.Ack(true)  

                if err != nil { fmt.Println("failed to fetch the data from consumer", err); }
                    fmt.Println(k)                        
            }
        }()      

        log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
        <-forever

    }

编辑问题:

按照链接link1 link2中的建议,我延迟了消息的处理。但问题是,即使在ttl之后,消息也会从死信队列返回到原来的队列。我正在使用RabbitMQ 3.0.0。有人能指出问题出在哪里吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-04-06 17:26:39

在“兔go go”中是否有BasicConsumer和EventingBasicConsumer的概念?

不完全是这样,但Channel.GetChannel.Consume调用提供了类似的概念。对于Channel.Get,您有一个非阻塞调用,如果有可用的消息,可以获取第一个消息,或者返回ok=false。使用Channel.Consume,排队的消息被传递到一个通道。

在RabbitMQ中什么是模型,在RabbitMq中有什么?

如果您指的是IModelConnection.CreateModel in C# RabbitMQ,那么这是来自C#库,而不是RabbitMQ本身。这只是试图从RabbitMQ的“频道”术语中抽象化出来,但从未流行过。

如何在死信队列失败时发送对象,并在ttl之后重新对其进行重新队列。

delivery.Nack方法与requeue=false一起使用。

在下面的代码中,consumerTag参数在ch.Consume函数中的意义是什么?

ConsumerTag只是一个消费者标识符。它可以用channel.Cancel取消通道,并识别负责交付的消费者。与channel.Consume一起传递的所有消息都将设置ConsumerTag字段。

对于这个场景,我们应该使用channel.Get()还是channel.Consume()

我认为channel.Get()几乎从来没有比channel.Consume()更可取。使用channel.Get,您将轮询队列并浪费CPU什么都不做,这在Go中是没有意义的。

为了满足上述要求,我需要在下面的代码中做哪些修改。

  1. 因为您一次批处理5次,所以您可以拥有一个从使用者通道接收的goroutine,一旦它获得了5次传递,您就调用另一个函数来处理它们。
  2. 要确认或发送到死信队列,您将使用delivery.Ackdelivery.Nack函数。您可以使用multiple=true并为批处理调用一次。一旦消息进入死信队列,您就必须检查delivery.Headers["x-death"]头的死信次数,并在它被重试5次时调用delivery.Reject
  3. 使用channel.NotifyCancel处理取消事件。
票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/36419994

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档