首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RX Java Single未从Single.merge返回

RX Java Single未从Single.merge返回
EN

Stack Overflow用户
提问于 2020-01-02 17:32:38
回答 2查看 176关注 0票数 0

我有几个api调用(Rx单调),我想将它们组合成一个单调。我正在使用Single.merge尝试合并这些调用的结果,但是当我订阅响应时,我得到一个空数组,因为订阅已经发生了。我调用HealthChecker,希望subscribe返回结果列表:

代码语言:javascript
复制
     new HealthChecker(vertx)
        .getHealthChecks(endpoints)
        .subscribe(messages -> {
            log.info("Completed health check {}", messages);
            routingContext.response()
                          .putHeader("content-type", "text/json")
                          .end(messages.toString());
        });

运行状况检查器类执行以下逻辑:

代码语言:javascript
复制
public class HealthChecker {

    private static Logger log = LoggerFactory.getLogger(HealthChecker.class);

    private Vertx vertx;
    private WebClient client;

    public HealthChecker(Vertx vertx) {
        this.vertx = vertx;
        client = WebClient.create(vertx);
    }

    public Single<List<String>> getHealthChecks(JsonArray endpoints) {
        return Single.fromCallable(() -> {

            List<Single<String>> healthChecks = endpoints
                .stream()
                .map(endpoint -> getHealthStatus(client, endpoint.toString()))
                .collect(Collectors.toList());

            return consumeHealthChecks(healthChecks).blockingGet();

        });
    }

    private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
        return Single.fromCallable(() -> {
            List<String> messages = new ArrayList<>();

            Single.merge(healthChecks)
                  .timeout(1500, TimeUnit.MILLISECONDS)
                  .subscribe(message -> {
                      log.info("Got health check {}", message);
                      messages.add(message);
                  }, error -> {
                      log.info("Timeout - could not get health check");

                  });

            return messages;
        });
    }

    private Single<String> getHealthStatus(WebClient client, String endpoint) {
        log.info("getting endpoint {}", endpoint);

        return client
            .getAbs(endpoint)
            .rxSend()
            .map(HttpResponse::bodyAsString)
            .map(response -> response);

    }
}

我希望返回值是一个列表,除非我得到的是一个空列表,然后返回结果。日志如下:

代码语言:javascript
复制
09:12:06.235 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5000/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5001/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5002/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5003/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5004/status
09:12:06.300 [vert.x-eventloop-thread-1] INFO  sys.health.HealthCheckVerticle - Completed health check []
09:12:06.688 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:06.844 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:06.898 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":false}
09:12:07.072 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:07.255 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":true}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-01-02 18:35:51

为什么要使用fromCallableblockingGet?此外,您实际上没有等待merge运行完成,就启动了它,因此出现了一个空列表。相反,应在内部Singles上编写:

代码语言:javascript
复制
public Single<List<String>> getHealthChecks(JsonArray endpoints) {
    return Single.defer(() -> {

        List<Single<String>> healthChecks = endpoints
            .stream()
            .map(endpoint -> getHealthStatus(client, endpoint.toString()))
            .collect(Collectors.toList());

        return consumeHealthChecks(healthChecks);
    });
}

private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
    return Single.merge(healthChecks)
                 .timeout(1500, TimeUnit.MILLISECONDS)
                 .toList();
}
票数 1
EN

Stack Overflow用户

发布于 2020-01-02 18:42:42

你的问题在这里:

代码语言:javascript
复制
    private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
        return Single.fromCallable(() -> {
            List<String> messages = new ArrayList<>();

            Single.merge(healthChecks)
                .timeout(1500, TimeUnit.MILLISECONDS)
                .subscribe(message -> {
                    log.info("Got health check {}", message);
                    messages.add(message);
                }, error -> {
                    log.info("Timeout - could not get health check");
                });

            return messages;
    });
}

您正在创建一个空列表,然后从lambda返回它,这样从consumeHealthChecks返回的Single就是一个空列表……

我假设你想要做的事情是这样的:

代码语言:javascript
复制
    private Single<List<String>> mergeHealthChecks(List<Single<String>> healthChecks) {
      return Single.merge(healthChecks)
                .timeout(1500, TimeUnit.MILLISECONDS);
    }

然后像这样使用它:

代码语言:javascript
复制
    private void consumeHealthChecks() {
        Single<List<String>> healthChecks = new HealthChecker(vertx)
                .getHealthChecks(endpoints);

        mergeHealthChecks(healthChecks)
                .subscribe(message -> {
                    log.info("Merged and consumed all health checks. Final health check: ", message);
                }, error ->  {
                    log.info("Timeout - could not merge and consume health checks");
                });
    }

注意,当您使用Single.merge时,您将只能在subscribe成功回调中获得最终合并的单个消息的结果,因此,如果您希望在成功消费消息时记录每条消息,则需要在您的subscribe调用之前使用doOnSuccess挂接一个副作用调用来记录消息。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59561036

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档