首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >PubSub不承认消息

PubSub不承认消息
EN

Stack Overflow用户
提问于 2020-04-24 12:47:50
回答 1查看 443关注 0票数 0

我有一个pubsub订阅(除了go例程的数目为1000),所有默认设置都是这样的,而且由于某种原因,消息永远不会被确认,因此重新发送。退货需要1到2分钟。在收到消息后不到1秒,我就给message.Ack()打电话,所以我不知道发生了什么。这不应该是因为应用程序和pubsub本身之间的延迟,因为在将消息发布到主题之后,消息实际上是立即传递的。

订阅的确认截止时间为10秒。我试着把这个增加到120个,但是同样的问题还是发生了。我想不出为什么这些消息没有被确认,因此被重新传递。

供参考的代码:

代码语言:javascript
复制
if err := pubsubSubscription(client).Receive(ctx, func(lctx context.Context, message *pubsub.Message) {
    log.Println("Received message") // occurs < 1s after publishing
    ack := message.Ack  
    if err := adapters.Handle(conn, id, gatewayAddr, message.Data); err != nil {
        log.Println("Will nack message")
        ack = message.Nack // not reached (in this context/example)
        cancel()
    }
    log.Println("Will ack message") // occurs ~200µs after message receipt
    ack()
}); err != nil {
    return fmt.Errorf("unable to subscribe to PubSub messages: %s", err)
}

为了澄清,我只向这个主题发布了一条消息,但是这个回调被无限地称为每1到2分钟一次。

编辑

只有当订阅接收设置中的go例程设置设置为高于runtime.NumCPU()的数字时,才会发生这种情况。这是预期的行为吗?如果是这样的话,如何与Kubernetes (我正在使用的)一起工作?

编辑2 --请求复制的完整代码

代码语言:javascript
复制
const (
    DefaultMaxOutstandingMessages = 1000000
    DefaultMaxOutstandingBytes    = 1e9
)

func SubscribeToTables(id int) error {
    var opts []option.ClientOption
    if sa := os.Getenv("SERVICE_ACCOUNT"); sa != "" {
        opts = append(opts, option.WithCredentialsJSON([]byte(sa)))
    }

    ctx := context.Background()
    projectID := os.Getenv("PROJECT_ID")
    client, err := pubsub.NewClient(ctx, projectID, opts...)
    if err != nil {
        return fmt.Errorf("error creating GCP PubSub client: %s", err)
    }

    cctx, cancel := context.WithCancel(ctx)
    go func() {
        qch := make(chan os.Signal)
        signal.Notify(qch, os.Interrupt, syscall.SIGTERM)
        <-qch
        cancel()
    }()

    mch := make(chan *pubsub.Message)
    gatewayAddr := os.Getenv("GATEWAY_ADDRESS")
    conn, err := adapters.GetGatewayConn(gatewayAddr)
    if err != nil {
        return fmt.Errorf("unable to connect to Gateway: %s", err)
    }
    go func() {
        for {
            select {
            case message := <-mch:
                if err := adapters.Handle(conn, id, gatewayAddr, message.Data); err != nil {
                    cancel()
                    return
                }
                message.Ack()
            case <-ctx.Done():
                return
            }
        }
    }()
    if err := pubsubSubscription(client).Receive(cctx, func(_ context.Context, message *pubsub.Message) {
        mch <- message
    }); err != nil {
        return fmt.Errorf("unable to subscribe to PubSub messages: %s", err)
    }
    return nil
}

func pubsubSubscription(client *pubsub.Client) *pubsub.Subscription {
    sub := client.Subscription(os.Getenv("SUBSCRIPTION_ID"))
    sub.ReceiveSettings = pubsub.ReceiveSettings{
        MaxExtension:       pubsub.DefaultReceiveSettings.MaxExtension,
        MaxExtensionPeriod: pubsub.DefaultReceiveSettings.MaxExtensionPeriod,
        MaxOutstandingMessages: parsePubSubReceiveSetting(
            "MAX_OUTSTANDING_MESSAGES",
            "max outstanding messages",
            DefaultMaxOutstandingMessages,
        ),
        MaxOutstandingBytes: parsePubSubReceiveSetting(
            "MAX_OUTSTANDING_BYTES",
            "max outstanding bytes",
            DefaultMaxOutstandingBytes,
        ),
        NumGoroutines: parsePubSubReceiveSetting( // if this is higher than runtimie.NumCPU(), the aforementioned issue occurs 
            "NUM_GO_ROUTINES",
            "Go-routines",
            1000, 
        ),
    }
    return sub
}

func parsePubSubReceiveSetting(env, name string, defaultValue int) int {
    e := os.Getenv(env)
    i, err := strconv.Atoi(e)
    if err != nil {
        log.Printf("Unable to parse number of GCP PubSub %s. Can't parse '%s' as int", name, e)
        log.Printf("Using default number of %s (%d)", name, defaultValue)
        return defaultValue
    }
    return i
}
EN

回答 1

Stack Overflow用户

发布于 2020-04-24 19:46:29

我怀疑你退出代码太快了。您必须使用cancel()上下文来停止Receive循环并将数据刷新回PubSub。

尝试在您的cancel()之后添加ack()

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61408720

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档