我想用Nats Jetstream实现一个精确的一次交付系统。文档中说Jetstream有这个选项,但是没有关于它的工作方式和客户端如何实现这个选项的示例或细节。我知道在publisher端,我们可以在创建Stream时设置MsgId并指定复制窗口,但是消费者端呢?
发布于 2022-06-30 11:43:52
这是准确-一次交货的文档。这有点用词不当,因为实际需要的(以及这个特性提供的)完全是一次处理。
正如您所指出的,它是服务器在接收已发布消息时的去复制和接收消息的订阅的双重ack调用(如果需要的话再加重试)的组合。
下面是一个示例(为简洁而省略的过量错误处理)。在启用JetStream:nats-server --js的情况下启动服务器,然后运行此代码(假设为nats.go v1.16+)。
package main
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
func failOnErr(err error) {
if err != nil {
log.Fatal(err)
}
}
func main() {
// Connect and get the JetStream context.
nc, _ := nats.Connect(nats.DefaultURL)
js, _ := nc.JetStream()
// Create a test stream.
_, err := js.AddStream(&nats.StreamConfig{
Name: "test",
Storage: nats.MemoryStorage,
Subjects: []string{"test.>"},
Duplicates: time.Minute,
})
failOnErr(err)
defer js.DeleteStream("test")
// Publish some messages with duplicates.
js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
js.Publish("test.2", []byte("world"), nats.MsgId("2"))
js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
js.Publish("test.2", []byte("world"), nats.MsgId("2"))
js.Publish("test.2", []byte("world"), nats.MsgId("2"))
// Create an explicit pull consumer on the stream.
_, err = js.AddConsumer("test", &nats.ConsumerConfig{
Durable: "test",
AckPolicy: nats.AckExplicitPolicy,
DeliverPolicy: nats.DeliverAllPolicy,
})
failOnErr(err)
defer js.DeleteConsumer("test", "test")
// Create a subscription on the pull consumer.
// Subject can be empty since it defaults to all subjects bound to the stream.
sub, err := js.PullSubscribe("", "test", nats.BindStream("test"))
failOnErr(err)
// Only two should be delivered.
batch, _ := sub.Fetch(10)
log.Printf("%d messages", len(batch))
// AckSync both to ensure the server received the ack.
batch[0].AckSync()
batch[1].AckSync()
// Should be zero.
batch, _ = sub.Fetch(10, nats.MaxWait(time.Second))
log.Printf("%d messages", len(batch))
}值得注意的是,如果一个AckSync确实失败了(可以从它返回一个错误),那么它就会在此代码上重新尝试ack,直到收到响应为止。客户端的冗馀攻击是不可行的。
https://stackoverflow.com/questions/72814502
复制相似问题