首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用ReactiveX进行Http调用

使用ReactiveX进行Http调用
EN

Stack Overflow用户
提问于 2017-02-24 23:18:19
回答 1查看 8.3K关注 0票数 8

我是ReactiveX for Java的新手,我有下面的代码块来进行外部http调用,但它不是异步的。我们使用的是rxjava 1.2和Java1.8

代码语言:javascript
复制
  private ResponseEntity<String> callExternalUrl(String url, String json, HttpMethod method) {

    RestTemplate restTemplate;
    HttpEntity request;

      request = new HttpEntity(jsonContent, httpHeaders);

    return restTemplate.exchange(url, httpMethod, request, String.class);

  }

我在网上找到了下面的代码块,但我无法完全理解它以及如何将它应用到我的代码库中。

代码语言:javascript
复制
private RxClient<RxObservableInvoker> httpClient;
public <T> Observable<T> fetchResult(String url, Func1<Response, T> mapper) {

    return httpClient.target(url)
        .request()
        .rx()
        .get()
        .subscribeOn(Schedulers.io())
        .map(mapper);
  }
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-03-07 02:09:18

如果我正确地理解了你,你需要这样的东西来包装你现有的callExternalUrl

代码语言:javascript
复制
static Observable<String> callExternalUrlAsync(String url, String json, HttpMethod method)
{
    return Observable.fromCallable(() -> callExternalUrl(url, json, method))
            .subscribeOn(Schedulers.io())
            .flatMap(re -> {
                         if (re.hasBody())
                             return Observable.just(re.getBody());
                         else
                             return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                     },
                     e -> Observable.error(e),
                     (Func0<Observable<? extends String>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
            .observeOn(Schedulers.computation());
}

守则简介:

  1. 它调度现有callExternalUrlSchedulers.io上的执行。
  2. ResponseEntity<T>转换为成功的T和错误案例。它也发生在io调度程序上,但它并不重要,因为它确实很短。(如果callExternalUrl内部有异常,则按原样传递。)
  3. 使订阅者订阅要在Schedulers.computation上执行的结果。

警告

  1. 您可能希望为subscribeOnobserveOn使用自定义调度程序。
  2. 您可能希望在传递给flatMap的第一个lambda中有一些更好的逻辑来区分成功和错误,当然还需要一些更具体的异常类型。

高阶幻象

如果您愿意使用更高级的函数,并以少量的性能换取更少的代码重复,那么您可以这样做:

代码语言:javascript
复制
// Universal wrapper method
static <T> Observable<T> wrapCallExternalAsAsync(Func3<String, String, HttpMethod, ResponseEntity<T>> externalCall, String url, String json, HttpMethod method)
{
    return Observable.fromCallable(() -> externalCall.call(url, json, method))
            .subscribeOn(Schedulers.io())
            .flatMap(re -> {
                         if (re.hasBody())
                             return Observable.just(re.getBody());
                         else
                             return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                     },
                     e -> Observable.error(e),
                     (Func0<Observable<? extends T>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
            .observeOn(Schedulers.computation());
}

static Observable<String> callExternalUrlAsync_HigherOrder(String url, String json, HttpMethod method)
{
    return wrapCallExternalAsAsync(MyClass::callExternalUrl, url, json, method);
}

MyClass在哪里,无论您的callExternalUrl在哪里。

更新(仅异步调用)

私有静态RxClient httpClient = Rx.newClient(RxObservableInvoker.class);//在这里可以传递自定义ExecutorService

代码语言:javascript
复制
private <T> Observable<String> executeHttpAsync(String url, String httpMethod, Entity<T> entity) {
    return httpClient.target(url)
            .request()
            .headers(httpHeaders) // assuming httpHeaders is something global as in your example
            .rx()
            .method(httpMethod, entity)
            .map(resp -> {
                if (200 != resp.getStatus()) {
                    throw new RuntimeException("Bad status code " + resp.getStatus());
                } else {
                    if (!resp.hasEntity()) {
                        // return null; // or error?
                        throw new RuntimeException("Empty response"); // or empty?
                    } else {
                        try {
                            return resp.readEntity(String.class);
                        } catch (Exception ex) {
                            throw new RuntimeException(ex); // wrap exception into unchecked
                        }
                    }
                }
            })
            .observeOn(Schedulers.computation());
}

private Observable<String> executeGetAsync(String url) {
    return executeHttpAsync(url, "GET", null);
}

private Observable<String> executePostAsync(String url, String json) {
    return executeHttpAsync(url, "POST", Entity.json(json));
}

同样,类似的警告也适用于

  1. 您可能希望为newClient调用和observeOn使用自定义调度程序。
  2. 您可能希望有一些更好的错误处理逻辑,而不仅仅是检查它是否是HTTP 200,而且您肯定需要一些更具体的异常类型。但这都是特定的业务逻辑,所以这取决于你。

此外,从您的示例中也不清楚请求(HttpEntity)的主体是如何构建的,以及您是否始终希望String作为响应,就像在您最初的示例中一样。但我还是照搬了你的逻辑。如果您需要更多的东西,您可能应该参考https://jersey.java.net/documentation/2.25/media.html#json的文档

票数 8
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42449904

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档