首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Nats的Java请求/答复

Nats的Java请求/答复
EN

Stack Overflow用户
提问于 2020-03-10 11:48:19
回答 1查看 944关注 0票数 0

我有以前的卡夫卡知识,我一直在玩Nats.io,这似乎是一个非常可靠的消息选择。

特别是,我对详细记录在案的请求/应答机制很感兴趣,但我在用Jnats驱动程序正确实现它时遇到了困难。

这是我的连接器:

代码语言:javascript
复制
 // Single server nats connection
    @PostConstruct
    public void connect() throws ExternalServiceUnavailableException {

        Options options = new Options.Builder()
                .server(connectionString)
                .maxReconnects(20)
                .reconnectWait(Duration.ofSeconds(5))
                .connectionTimeout(Duration.ofSeconds(5))
                .connectionListener((conn, type) -> {
                    if (type == ConnectionListener.Events.CONNECTED) {
                        LOG.info("Connected to Nats Server");
                    } else if (type == ConnectionListener.Events.RECONNECTED) {
                        LOG.info("Reconnected to Nats Server");
                    } else if (type == ConnectionListener.Events.DISCONNECTED) {
                        LOG.error("Disconnected to Nats Server, reconnect attempt in seconds");
                    } else if (type == ConnectionListener.Events.CLOSED) {
                        LOG.info("Closed connection with Nats Server");
                    }
                })
                .build();


        try {
            connection = Nats.connect(options);

        } catch (Exception e) {
            LOG.error("Unable to connect to Nats Server");
            throw new ExternalServiceUnavailableException(ExternalServiceUnavailableException.Service.NATS);
        }

    }

这是请求方法(用于测试的等待时间非常长):

代码语言:javascript
复制
 public Optional<String> asyncRequest(String topic, String message) throws ExternalServiceUnavailableException {

        Future<Message> reply = natsConnector.getConnection().request(topic, message.getBytes());
        try {

            Message msg = reply.get(10L, TimeUnit.SECONDS);

            LOG.info(new String(msg.getData()));

            return Optional.of(new String(msg.getData(), StandardCharsets.UTF_8));

        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
            LOG.error("Unable to retrieve response for the sent request: " + message);
            throw new ExternalServiceUnavailableException(ExternalServiceUnavailableException.Service.NATS);
        }

    }

这是具有应答机制的响应处理程序:

代码语言:javascript
复制
 @PostConstruct
    private void init() {
        Dispatcher dispatcher = natsConnector.getConnection().createDispatcher(message -> {
        });

        Subscription assetsInfo = dispatcher.subscribe("assets-info", message -> {
        JSONObject requestMessage = new JSONObject(new String(message.getData(), StandardCharsets.UTF_8));

            if (requestMessage.getString("requestType").equals("stock-status")) {

                if (requestMessage.getString("of").equals("all")) {

                    JSONObject response = assetQuery.retrieveYesterdayStockStatus();
                    LOG.info("response ready");
                    natsOperation.publishEvent("assets-info", response);
                    LOG.info("message sent");
                }
            }
        });
    }

我的两个独立服务通过一个被篡改的Nats.io进行通信,并且我可以通过Nats客户端正确地检查两个服务是否就同一主题发送了消息。

不幸的是,当调用asyncRequest函数时,"Requestor“并不完全处理答复,即使在reply.get(...)中等待很高。

当我试图在调试模式下计算reply对象时,它中没有任何数据,并显示了一个TimeoutException

msg.getData(),程序崩溃。

你们有什么线索给我吗?谢谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-03-11 17:15:44

您应该更改您的“应答器”代码,以便从原始消息发布到replyTo主题。

代码语言:javascript
复制
@PostConstruct
    private void init() {
        Dispatcher dispatcher = natsConnector.getConnection().createDispatcher(message -> {
        });

        Subscription assetsInfo = dispatcher.subscribe("assets-info", message -> {
        JSONObject requestMessage = new JSONObject(new String(message.getData(), StandardCharsets.UTF_8));

            if (requestMessage.getString("requestType").equals("stock-status")) {

                if (requestMessage.getString("of").equals("all")) {

                    JSONObject response = assetQuery.retrieveYesterdayStockStatus();
                    LOG.info("response ready");
                    //See Change Here
                    natsOperation.publish(message.getReplyTo(), response);
                    LOG.info("message sent");
                }
            }
        });
    }

请求应答机制正在寻找生成的replyTo主题上的单个响应。

请参阅https://docs.nats.io/nats-concepts/reqreply

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60617016

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档