首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何为不同的Apache路由设置几个不同的WebFlux客户端属性?

如何为不同的Apache路由设置几个不同的WebFlux客户端属性?
EN

Stack Overflow用户
提问于 2021-01-16 14:27:52
回答 1查看 642关注 0票数 1

在所设置的路由中,我们调用在声明路由之前设置WebClient.build():

代码语言:javascript
复制
@Override
  public void configure() {
    createSubscription(activeProfile.equalsIgnoreCase("RESTART"));
    from(String.format("reactive-streams:%s", streamName))
        .to("log:camel.proxy?level=INFO&groupInterval=500000")
        .to(String.format("kafka:%s?brokers=%s", kafkaTopic, kafkaBrokerUrls));
  }

  private void createSubscription(boolean restart) {
    WebClient.builder()
        .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_XML_VALUE)
        .build()
        .post()
        .uri(initialRequestUri)
        .body(BodyInserters.fromObject(restart ? String.format(restartRequestBody, ZonedDateTime.now(ZoneId.of("UTC")).toString().replace("[UTC]", "")) : initialRequestBody))
        .retrieve()
        .bodyToMono(String.class)
        .map(initResp ->
            new JSONObject(initResp)
                .getJSONObject("RESPONSE")
                .getJSONArray("RESULT")
                .getJSONObject(0)
                .getJSONObject("INFO")
                .getString("SSEURL")
        )
        .flatMapMany(url -> {
          log.info(url);
          return WebClient.create()
              .get()
              .uri(url)
              .retrieve()
              .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
              })
              .flatMap(sse -> {
                    val data = new JSONObject(sse.data())
                        .getJSONObject("RESPONSE")
                        .getJSONArray("RESULT")
                        .getJSONObject(0)
                        .getJSONArray(apiName);
                    val list = new ArrayList<String>();
                    for (int i = 0; i < data.length(); i++) {
                      list.add(data.getJSONObject(i).toString());
                    }
                    return Flux.fromIterable(list);
                  }
              );
            }
        )
        .onBackpressureBuffer()
        .flatMap(msg -> camelReactiveStreamsService.toStream(streamName, msg, String.class))
        .doFirst(() -> log.info(String.format("Reactive stream %s was %s", streamName, restart ? "restarted" : "started")))
        .doOnError(err -> {
          log.error(String.format("Reactive stream %s has terminated with error, restarting", streamName), err);
          createSubscription(true);
        })
        .doOnComplete(() -> {
          log.warn(String.format("Reactive stream %s has completed, restarting", streamName));
          createSubscription(true);
        })
        .subscribe();
  }

据我所知,WebClient设置是为整个Spring应用程序创建的,而不是Apache的特定路由(它不会以某种方式弯曲到特定的路由id或url ),这就是为什么使用其他url的新反应流和具有头/初始消息的其他需求的新路由也会设置这个路由,这是不需要的。

因此,这里的问题是,是否可以设置特定的WebClient,而不是与整个应用程序相关联,而是与特定的路由相关联,并使其应用于路由?

这种配置可以使用Spring吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-01-21 08:33:45

在那里应用的方式相当复杂:

  1. 创建2条路由,第一条路径首先执行一次,并且只执行一次,并触发特定bean的特定方法,传递带有方法参数的WebClient.builder()设置,并执行对WebFlux的订阅。是的,建立的反应性流是在Spring应用程序的Spring上下文中完成的,而不是Apache上下文。因此,它与路由没有直接关联,而不是在特定路由启动时被调用。所以路由看起来像:

  1. 提供豆子。我把它放到Spring应用程序中,而不是像下面这样的Apache上下文。这里的缺点是,我必须把它放在这里,无论具体路线是可行的,还是现在。所以它总是在记忆中。

进口org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;进口org.json.JSONArray;进口org.json.JSONObject;进口org.slf4j.Logger;进口org.slf4j.LoggerFactory;进口org.springframework.core.ParameterizedTypeReference;进口org.springframework.http.HttpHeaders;进口org.springframework.http.MediaType;进口org.springframework.http.codec.ServerSentEvent;进口org.springframework.stereotype.Component;进口org.springframework.web.reactive.function.BodyInserters;导入org.springframework.web.reactive.function.client.WebClient;reactor.core.publisher.Flux;导入java.time.ZoneId;导入java.time.ZonedDateTime;导入java.util.ArrayList;@Component public class WebFluxSetUp {专用最终日志记录器= LoggerFactory.getLogger(WebFluxSetUp.class);}公共executeWebfluxSetup(布尔重新启动,字符串initialRequestUri,字符串restartRequestBody,字符串initialRequestBody,字符串apiName,字符串streamName) {{initialRequestUri?( ZonedDateTime.now(ZoneId.of("UTC")).toString().replace("UTC",“”):initialRequestBody)).retrieve().bodyToMono(String.class).map(initResp -> new JSONObject(initResp).getJSONObject("RESPONSE").getJSONArray("RESULT").getJSONObject(0).getJSONObject("INFO").getString("SSEURL")).flatMapMany(url -> { logger.info(url);返回WebClient.create().get().uri(url).retrieve().bodyToFlux(new ParameterizedTypeReference() { }).flatMap(sse -> { JSONArray data = new JSONObject(sse.data()).getJSONObject("RESPONSE").getJSONArray("RESULT").getJSONObject(0).getJSONArray(apiName);ArrayList list = new ArrayList();对于(int i= 0;i< data.length();i++) {list.add(data.getJSONObject(I).toString();}返回Flux.fromIterable(list);});( }).onBackpressureBuffer().flatMap(msg -> .flatMap,String.class)).doFirst() -> logger.info(String.format(“反应性流%s为%s”),streamName,重新启动?“重新启动”:“开始”).doOnError( -> {String.format(“反应流%s已终止,重新启动”,streamName),err);executeWebfluxSetup(真,initialRequestUri,restartRequestBody,initialRequestBody,apiName,streamName);}.doOnComplete(() -> {logger.warn(“反应流%s已完成,重新启动”,streamName));executeWebfluxSetup(true,initialRequestUri,restartRequestBody,initialRequestBody,apiName,streamName);}).subscribe();}

  1. 还有其他缺点--当路由停止时,WebFlux客户端仍然试图垃圾处理反应性流url。并且没有与路由相关的api/事件处理程序来阻止它并使其不被编码到特定的路由.

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65750756

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档