首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何解决围棋-Stomp读出超时

如何解决围棋-Stomp读出超时
EN

Stack Overflow用户
提问于 2016-10-07 09:41:42
回答 2查看 2K关注 0票数 2

尝试订阅ActiveMQ(阿波罗)使用围棋,但我有一个读取超时错误。我的应用程序应该每天24小时运行,以处理收到的消息。

问题

  1. 是否有一种方法来保持订阅,尽管队列中不存在更多的消息?试图让ConnOpt.HeartBeat也不起作用
  2. 为什么在读取超时之后,我似乎仍然接受另一条消息?

以下是我的步骤:

  • 我将1000条消息放在输入队列中进行测试。
  • 运行订阅服务器,代码如下
  • 订阅者在2-3秒后读取完1000条消息,看到错误“2016/10/07 17:12:44订阅1: /queue/hflc-in: error message: reading”。
  • 另外放置1000条消息,但订阅似乎已经关闭,因此没有消息未被处理。

我的守则:

代码语言:javascript
复制
  var(
   serverAddr   = flag.String("server", "10.92.10.10:61613", "STOMP server    endpoint")
   messageCount = flag.Int("count", 10, "Number of messages to send/receive")
   inputQ       = flag.String("inputq", "/queue/hflc-in", "Input queue")
)

var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
   stomp.ConnOpt.Login("userid", "userpassword"),
   stomp.ConnOpt.Host("mybroker"),
   stomp.ConnOpt.HeartBeat(360*time.Second, 360*time.Second), // I put this but seems no impact
}

func main() {
  flag.Parse()
  jobschan := make(chan bean.Request, 10)
  //my init setup
  go getInput(1, jobschan)
}

func getInput(id int, jobschan chan bean.Request) {
   conn, err := stomp.Dial("tcp", *serverAddr, options...)

   if err != nil {
      println("cannot connect to server", err.Error())
      return
   }
   fmt.Printf("Connected %v \n", id)

   sub, err := conn.Subscribe(*inputQ, stomp.AckClient)
   if err != nil {
     println("cannot subscribe to", *inputQ, err.Error())
     return
   }

   fmt.Printf("Subscribed %v \n", id)
   var messageCount int
   for {
    msg := <-sub.C
    //expectedText := fmt.Sprintf("Message #%d", i)
    if msg != nil {

        actualText := string(msg.Body)
        
        var req bean.Request
        if actualText != "SHUTDOWN" {
            messageCount = messageCount + 1
            var err2 = easyjson.Unmarshal([]byte(actualText), &req)
            if err2 != nil {
                log.Error("Unable unmarshall", zap.Error(err))
                println("message body %v", msg.Body) // what is [0/0]0x0 ?
            } else {
                fmt.Printf("Subscriber %v received message, count %v \n  ", id, messageCount)
                jobschan <- req
            }
        } else {
            logchan <- "got some issue"
        }
    }
   }
  }

错误:

2016/10/07 17:12:44订阅1: /queue/hflc-in:错误消息:读取超时 E 2016-10-07T09:12:44Z无法启航 消息正文%v0/00x0

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-10-10 03:47:07

通过添加以下几行来解决:

在“阿波罗”中,注意到队列在空了几秒钟后就被删除了,所以把auto_delete_after放在apollo.xml的几个小时内,例如:

代码语言:javascript
复制
<queue id="hflc-in" dlq="dlq-in" nak_limit="3" auto_delete_after="7200"/>
<queue id="hflc-log" dlq="dlq-log" nak_limit="3" auto_delete_after="7200"/>
<queue id="hflc-out" dlq="dlq-out" nak_limit="3" auto_delete_after="7200"/>

在Go中,注意到go-stomp在队列中找不到任何消息后就会立即放弃,所以在conn选项中添加HeartBeat错误

代码语言:javascript
复制
var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
   //.... original configuration
   stomp.ConnOpt.HeartBeatError(360 * time.Second),
}

然而,对第二个问题仍然感到困惑。

票数 1
EN

Stack Overflow用户

发布于 2021-12-18 22:32:29

关于#2,我遇到了同样的问题,因为我将我的过程包装在一个无限循环中。我发现,在“最后一条消息”中,您使用空消息从通道订阅获得响应,但包含超时错误。这是为了能够优雅地处理断开。下面是我如何在我的应用程序上实现它

代码语言:javascript
复制
func process(subscription *stomp.Subscription) (error) {
    log.Println("Waiting for a message")
    msg := <- subscription.C
    if msg.Body != nil {
        log.Println("Message from the queue: ", string(msg.Body))
    } else {
        log.Println("message is empty")
        log.Println("error consuming more messages", msg.Err.Error())
        return msg.Err
    }
    return nil
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39914161

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档