是否有人可以帮助您在spring异步引导中配置NATS jet流订阅:寻找类似@kafkalistener等类似于Nats jetstream的注释
我能够使用端点来提取消息,但是当尝试使用pushSubscription来提取消息时,不会调用dispatcherhandler。需要知道如何使侦听器处于活动状态,并在消息发布到主题后立即使用消息。
/examples对此的任何见解都将是有帮助的,谢谢。
发布于 2022-06-19 17:04:58
我不知道你的JetStream保留政策是什么,也不知道你想订阅的方式。但是我有WorkQueuePolicy推送订阅的示例代码,希望这会对您有所帮助。
public static void subscribe(String streamName, String subjectKey,
String queueName, IMessageHandler iMessageHandler) throws IOException,
InterruptedException, JetStreamApiException {
long s = System.currentTimeMillis();
Connection nc = Nats.connect(options);
long e = System.currentTimeMillis();
logger.info("Nats Connect in " + (e - s) + " ms");
JetStream js = nc.jetStream();
Dispatcher disp = nc.createDispatcher();
MessageHandler handler = (msg) -> {
try {
iMessageHandler.onMessageReceived(msg);
} catch (Exception exc) {
msg.nak();
}
};
ConsumerConfiguration cc = ConsumerConfiguration.builder()
.durable(queueName)
.deliverGroup(queueName)
.maxDeliver(3)
.ackWait(Duration.ofMinutes(2))
.build();
PushSubscribeOptions so = PushSubscribeOptions.builder()
.stream(streamName)
.configuration(cc)
.build();
js.subscribe(subjectKey, disp, handler, false, so);
System.out.println("NatsUtil: " + durableName + "subscribe");
}IMessageHandler是我处理nats.io接收消息的自定义接口。
发布于 2022-11-18 16:40:11
首先,配置NATS连接。在这里,您将指定所有连接细节,如服务器地址(Es)、身份验证选项、连接级回调等。
Connection natsConnection = Nats.connect(
new Options.Builder()
.server("nats://localhost:4222")
.connectionListener((connection, eventType) -> {})
.errorListener(new ErrorListener(){})
.build());然后构造一个JetStream实例
JetStream jetStream = natsConnection.jetStream();现在你可以订阅主题了。请注意,JetStream使用者可以是持久的,也可以是短暂的,可以根据推挽逻辑工作。请参考NATS文档(https://docs.nats.io/nats-concepts/jetstream/consumers)为您的特定用例做出适当的选择。下面的示例构造持久的推送消费者:
//Subscribe to a subject.
String subject = "my-subject";
//queues are analogous to Kafka consumer groups, i.e. consumers belonging
//to the same queue (or, better to say, reading the same queue) will get
//only one instance of each message from the corresponding subject
//and only one of those consumers will be chosen to process the message
String queueName = "my-queue";
//Choosing delivery policy is analogous to setting the current offset
//in a partition for a consumer or consumer group in Kafka.
DeliverPolicy deliverPolicy = DeliverPolicy.New;
PushSubscribeOptions subscribeOptions = ConsumerConfiguration.builder()
.durable(queueName)
.deliverGroup(queueName)
.deliverPolicy(deliverPolicy)
.buildPushSubscribeOptions();
Subscription subscription = jetStream.subscribe(
subject,
queueName,
natsConnection.createDispatcher(),
natsMessage -> {
//This callback will be called for incoming messages
//asynchronously. Every subscription configured this
//way will be backed by its own thread, that will be
//used to call this callback.
},
true, //true if you want received messages to be acknowledged
//automatically, otherwise you will have to call
//natsMessage.ack() manually in the above callback function
subscribeOptions);至于声明性API (即某种形式的@NatsListener注释,类似于Apache项目的Spring @KafkaListener ),Spring中没有现成的。如果您觉得自己绝对需要它,您可以自己编写一个,如果您熟悉s或其他可以帮助实现该功能的扩展机制。或者,你可以参考第三方利己人,看上去一群人(包括我自己)在从卡夫卡转向北约时感觉有点不舒服,所以他们试图从卡夫卡世界中拿出与他们一起做事的惯常方式。在github上可以找到一些例子:
也许还有其他人。
https://stackoverflow.com/questions/70888947
复制相似问题