首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Nats Jetstream恰好一次交付

Nats Jetstream恰好一次交付
EN

Stack Overflow用户
提问于 2022-06-30 10:50:24
回答 1查看 1K关注 0票数 4

我想用Nats Jetstream实现一个精确的一次交付系统。文档中说Jetstream有这个选项,但是没有关于它的工作方式和客户端如何实现这个选项的示例或细节。我知道在publisher端,我们可以在创建Stream时设置MsgId并指定复制窗口,但是消费者端呢?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-06-30 11:43:52

这是准确-一次交货的文档。这有点用词不当,因为实际需要的(以及这个特性提供的)完全是一次处理。

正如您所指出的,它是服务器在接收已发布消息时的去复制和接收消息的订阅的双重ack调用(如果需要的话再加重试)的组合。

下面是一个示例(为简洁而省略的过量错误处理)。在启用JetStream:nats-server --js的情况下启动服务器,然后运行此代码(假设为nats.go v1.16+)。

代码语言:javascript
复制
package main

import (
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func failOnErr(err error) {
    if err != nil {
        log.Fatal(err)
    }
}

func main() {
    // Connect and get the JetStream context.
    nc, _ := nats.Connect(nats.DefaultURL)
    js, _ := nc.JetStream()

    // Create a test stream.
    _, err := js.AddStream(&nats.StreamConfig{
        Name:       "test",
        Storage:    nats.MemoryStorage,
        Subjects:   []string{"test.>"},
        Duplicates: time.Minute,
    })
    failOnErr(err)

    defer js.DeleteStream("test")

    // Publish some messages with duplicates.
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))

    // Create an explicit pull consumer on the stream.
    _, err = js.AddConsumer("test", &nats.ConsumerConfig{
        Durable:       "test",
        AckPolicy:     nats.AckExplicitPolicy,
        DeliverPolicy: nats.DeliverAllPolicy,
    })
    failOnErr(err)
    defer js.DeleteConsumer("test", "test")

    // Create a subscription on the pull consumer.
    // Subject can be empty since it defaults to all subjects bound to the stream.
    sub, err := js.PullSubscribe("", "test", nats.BindStream("test"))
    failOnErr(err)

    // Only two should be delivered.
    batch, _ := sub.Fetch(10)
    log.Printf("%d messages", len(batch))

    // AckSync both to ensure the server received the ack.
    batch[0].AckSync()
    batch[1].AckSync()

    // Should be zero.
    batch, _ = sub.Fetch(10, nats.MaxWait(time.Second))
    log.Printf("%d messages", len(batch))
}

值得注意的是,如果一个AckSync确实失败了(可以从它返回一个错误),那么它就会在此代码上重新尝试ack,直到收到响应为止。客户端的冗馀攻击是不可行的。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72814502

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档