首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Golang Nats订阅问题

Golang Nats订阅问题
EN

Stack Overflow用户
提问于 2017-08-26 01:05:11
回答 2查看 3K关注 0票数 3

我目前的工作是一个微服务架构。在我将NATS插入到我的项目中之前,我想用它测试一些简单的场景。

在一个场景中,我有一个简单的发布器,它在一个运行在localhost:4222上的基本Nats服务器上的for循环中发布100.000条消息。

它最大的问题是订阅者。当他收到30.000 - 40.000条消息时,我的整个main.go程序和所有其他go例程都会停止,什么也不做。我可以直接用ctrl + c退出,但是发布者仍然在发送消息。当我打开一个新的终端并启动一个新的订户实例时,所有的一切都工作得很好,直到订户收到大约30000条消息。最糟糕的是,甚至没有一个错误出现,服务器上也没有日志,所以我不知道发生了什么。

在那之后,我尝试用QueueSubscribe-method替换Subscribe-method,一切工作正常。

Subscribe和QueueSubscribe的主要区别是什么?

NATS-Streaming是一个更好的机会吗?或者在哪种情况下我应该更喜欢流媒体,在哪种情况下标准NATS服务器

下面是我的代码:

出版商:

代码语言:javascript
复制
package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/go-nats"
)

func main() {
    go createPublisher()

    for {

    }
}

func createPublisher() {

    log.Println("pub started")

    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    msg := make([]byte, 16)

    for i := 0; i < 100000; i++ {
        nc.Publish("alenSub", msg)
        if (i % 100) == 0 {
            fmt.Println("i", i)
        }
        time.Sleep(time.Millisecond)
    }

    log.Println("pub finish")

    nc.Flush()

}

订阅者:

代码语言:javascript
复制
package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/go-nats"
)

var received int64

func main() {
    received = 0

    go createSubscriber()
    go check()

    for {

    }
}

func createSubscriber() {

    log.Println("sub started")

    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    nc.Subscribe("alenSub", func(msg *nats.Msg) {
        received++
    })
    nc.Flush()

    for {

    }
}

func check() {
    for {
        fmt.Println("-----------------------")
        fmt.Println("still running")
        fmt.Println("received", received)
        fmt.Println("-----------------------")
        time.Sleep(time.Second * 2)
    }
}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-08-28 22:40:41

无限的for循环可能会让垃圾收集器饿死:https://github.com/golang/go/issues/15442#issuecomment-214965471

我只需要运行出版商就可以重现这个问题。要解决此问题,我建议使用sync.WaitGroup。下面是我如何更新评论中链接到的代码,以使其完成:

代码语言:javascript
复制
package main

import (
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/nats-io/go-nats"
)

// create wait group
var wg sync.WaitGroup

func main() {
    // add 1 waiter
    wg.Add(1)
    go createPublisher()

    // wait for wait group to complete
    wg.Wait()
}

func createPublisher() {

    log.Println("pub started")
    // mark wait group done after createPublisher completes
    defer wg.Done()

    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    msg := make([]byte, 16)

    for i := 0; i < 100000; i++ {
        if errPub := nc.Publish("alenSub", msg); errPub != nil {
            panic(errPub)
        }

        if (i % 100) == 0 {
            fmt.Println("i", i)
        }
        time.Sleep(time.Millisecond * 1)
    }

    log.Println("pub finish")

    errFlush := nc.Flush()
    if errFlush != nil {
        panic(errFlush)
    }

    errLast := nc.LastError()
    if errLast != nil {
        panic(errLast)
    }

}

我建议以类似的方式更新上面的订阅者代码。

SubscribeQueueSubscriber之间的主要区别在于,在Subscribe中,所有订阅者的所有消息都来自。而在QueueSubscribe中,每条消息只向QueueGroup中的一个订户发送。

关于NATS流的其他功能的一些详细信息在这里:https://nats.io/documentation/streaming/nats-streaming-intro/

我们看到NATS和NATS流都用于从数据管道到控制平面的各种用例中。您的选择应该由您的用例需求驱动。

票数 1
EN

Stack Overflow用户

发布于 2018-10-02 13:35:37

如上所述,删除for{}循环。替换为runtime.Goexit()。

对于订阅者,您不需要在Go例程中创建订阅者。异步订阅者已经有了自己的回调Go例程。

还用原子或互斥保护了接收到的变量。

也可以在这里查看示例。

https://github.com/nats-io/go-nats/tree/master/examples

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45886359

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档