首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >NATS JetStream侦听器

NATS JetStream侦听器
EN

Stack Overflow用户
提问于 2022-01-28 04:49:50
回答 2查看 375关注 0票数 2

是否有人可以帮助您在spring异步引导中配置NATS jet流订阅:寻找类似@kafkalistener等类似于Nats jetstream的注释

我能够使用端点来提取消息,但是当尝试使用pushSubscription来提取消息时,不会调用dispatcherhandler。需要知道如何使侦听器处于活动状态,并在消息发布到主题后立即使用消息。

/examples对此的任何见解都将是有帮助的,谢谢。

EN

回答 2

Stack Overflow用户

发布于 2022-06-19 17:04:58

我不知道你的JetStream保留政策是什么,也不知道你想订阅的方式。但是我有WorkQueuePolicy推送订阅的示例代码,希望这会对您有所帮助。

代码语言:javascript
复制
public static void subscribe(String streamName, String subjectKey,
                             String queueName, IMessageHandler iMessageHandler) throws IOException,
        InterruptedException, JetStreamApiException {
    long s = System.currentTimeMillis();
    Connection nc = Nats.connect(options);
    long e = System.currentTimeMillis();
    logger.info("Nats Connect in " + (e - s) + " ms");
    JetStream js = nc.jetStream();
    Dispatcher disp = nc.createDispatcher();
    MessageHandler handler = (msg) -> {
        try {
            iMessageHandler.onMessageReceived(msg);
        } catch (Exception exc) {
            msg.nak();
        }
    };
    ConsumerConfiguration cc = ConsumerConfiguration.builder()
            .durable(queueName)
            .deliverGroup(queueName)
            .maxDeliver(3)
            .ackWait(Duration.ofMinutes(2))
            .build();
    PushSubscribeOptions so = PushSubscribeOptions.builder()
            .stream(streamName)
            .configuration(cc)
            .build();
    js.subscribe(subjectKey, disp, handler, false, so);
    System.out.println("NatsUtil: " + durableName + "subscribe");
}

IMessageHandler是我处理nats.io接收消息的自定义接口。

票数 0
EN

Stack Overflow用户

发布于 2022-11-18 16:40:11

首先,配置NATS连接。在这里,您将指定所有连接细节,如服务器地址(Es)、身份验证选项、连接级回调等。

代码语言:javascript
复制
Connection natsConnection = Nats.connect(
            new Options.Builder()
                    .server("nats://localhost:4222")
                    .connectionListener((connection, eventType) -> {})
                    .errorListener(new ErrorListener(){})
                    .build());

然后构造一个JetStream实例

代码语言:javascript
复制
JetStream jetStream = natsConnection.jetStream();

现在你可以订阅主题了。请注意,JetStream使用者可以是持久的,也可以是短暂的,可以根据推挽逻辑工作。请参考NATS文档(https://docs.nats.io/nats-concepts/jetstream/consumers)为您的特定用例做出适当的选择。下面的示例构造持久的推送消费者:

代码语言:javascript
复制
    //Subscribe to a subject.
    String subject = "my-subject";

    //queues are analogous to Kafka consumer groups, i.e. consumers belonging
    //to the same queue (or, better to say, reading the same queue) will get
    //only one instance of each message from the corresponding subject
    //and only one of those consumers will be chosen to process the message
    String queueName = "my-queue";

    //Choosing delivery policy is analogous to setting the current offset
    //in a partition for a consumer or consumer group in Kafka.
    DeliverPolicy deliverPolicy = DeliverPolicy.New;
    PushSubscribeOptions subscribeOptions = ConsumerConfiguration.builder()
            .durable(queueName)
            .deliverGroup(queueName)
            .deliverPolicy(deliverPolicy)
            .buildPushSubscribeOptions();
    Subscription subscription = jetStream.subscribe(
            subject,
            queueName,
            natsConnection.createDispatcher(),
            natsMessage -> {
                //This callback will be called for incoming messages
                //asynchronously. Every subscription configured this
                //way will be backed by its own thread, that will be
                //used to call this callback.
            },
            true,  //true if you want received messages to be acknowledged
                   //automatically, otherwise you will have to call
                   //natsMessage.ack() manually in the above callback function
            subscribeOptions);

至于声明性API (即某种形式的@NatsListener注释,类似于Apache项目的Spring @KafkaListener ),Spring中没有现成的。如果您觉得自己绝对需要它,您可以自己编写一个,如果您熟悉s或其他可以帮助实现该功能的扩展机制。或者,你可以参考第三方利己人,看上去一群人(包括我自己)在从卡夫卡转向北约时感觉有点不舒服,所以他们试图从卡夫卡世界中拿出与他们一起做事的惯常方式。在github上可以找到一些例子:

也许还有其他人。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70888947

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档