在所设置的路由中,我们调用在声明路由之前设置WebClient.build():
@Override
public void configure() {
createSubscription(activeProfile.equalsIgnoreCase("RESTART"));
from(String.format("reactive-streams:%s", streamName))
.to("log:camel.proxy?level=INFO&groupInterval=500000")
.to(String.format("kafka:%s?brokers=%s", kafkaTopic, kafkaBrokerUrls));
}
private void createSubscription(boolean restart) {
WebClient.builder()
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_XML_VALUE)
.build()
.post()
.uri(initialRequestUri)
.body(BodyInserters.fromObject(restart ? String.format(restartRequestBody, ZonedDateTime.now(ZoneId.of("UTC")).toString().replace("[UTC]", "")) : initialRequestBody))
.retrieve()
.bodyToMono(String.class)
.map(initResp ->
new JSONObject(initResp)
.getJSONObject("RESPONSE")
.getJSONArray("RESULT")
.getJSONObject(0)
.getJSONObject("INFO")
.getString("SSEURL")
)
.flatMapMany(url -> {
log.info(url);
return WebClient.create()
.get()
.uri(url)
.retrieve()
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
})
.flatMap(sse -> {
val data = new JSONObject(sse.data())
.getJSONObject("RESPONSE")
.getJSONArray("RESULT")
.getJSONObject(0)
.getJSONArray(apiName);
val list = new ArrayList<String>();
for (int i = 0; i < data.length(); i++) {
list.add(data.getJSONObject(i).toString());
}
return Flux.fromIterable(list);
}
);
}
)
.onBackpressureBuffer()
.flatMap(msg -> camelReactiveStreamsService.toStream(streamName, msg, String.class))
.doFirst(() -> log.info(String.format("Reactive stream %s was %s", streamName, restart ? "restarted" : "started")))
.doOnError(err -> {
log.error(String.format("Reactive stream %s has terminated with error, restarting", streamName), err);
createSubscription(true);
})
.doOnComplete(() -> {
log.warn(String.format("Reactive stream %s has completed, restarting", streamName));
createSubscription(true);
})
.subscribe();
}据我所知,WebClient设置是为整个Spring应用程序创建的,而不是Apache的特定路由(它不会以某种方式弯曲到特定的路由id或url ),这就是为什么使用其他url的新反应流和具有头/初始消息的其他需求的新路由也会设置这个路由,这是不需要的。
因此,这里的问题是,是否可以设置特定的WebClient,而不是与整个应用程序相关联,而是与特定的路由相关联,并使其应用于路由?
这种配置可以使用Spring吗?
发布于 2021-01-21 08:33:45
在那里应用的方式相当复杂:
WebClient.builder()设置,并执行对WebFlux的订阅。是的,建立的反应性流是在Spring应用程序的Spring上下文中完成的,而不是Apache上下文。因此,它与路由没有直接关联,而不是在特定路由启动时被调用。所以路由看起来像:进口org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;进口org.json.JSONArray;进口org.json.JSONObject;进口org.slf4j.Logger;进口org.slf4j.LoggerFactory;进口org.springframework.core.ParameterizedTypeReference;进口org.springframework.http.HttpHeaders;进口org.springframework.http.MediaType;进口org.springframework.http.codec.ServerSentEvent;进口org.springframework.stereotype.Component;进口org.springframework.web.reactive.function.BodyInserters;导入org.springframework.web.reactive.function.client.WebClient;reactor.core.publisher.Flux;导入java.time.ZoneId;导入java.time.ZonedDateTime;导入java.util.ArrayList;@Component public class WebFluxSetUp {专用最终日志记录器= LoggerFactory.getLogger(WebFluxSetUp.class);}公共executeWebfluxSetup(布尔重新启动,字符串initialRequestUri,字符串restartRequestBody,字符串initialRequestBody,字符串apiName,字符串streamName) {{initialRequestUri?( ZonedDateTime.now(ZoneId.of("UTC")).toString().replace("UTC",“”):initialRequestBody)).retrieve().bodyToMono(String.class).map(initResp -> new JSONObject(initResp).getJSONObject("RESPONSE").getJSONArray("RESULT").getJSONObject(0).getJSONObject("INFO").getString("SSEURL")).flatMapMany(url -> { logger.info(url);返回WebClient.create().get().uri(url).retrieve().bodyToFlux(new ParameterizedTypeReference() { }).flatMap(sse -> { JSONArray data = new JSONObject(sse.data()).getJSONObject("RESPONSE").getJSONArray("RESULT").getJSONObject(0).getJSONArray(apiName);ArrayList list = new ArrayList();对于(int i= 0;i< data.length();i++) {list.add(data.getJSONObject(I).toString();}返回Flux.fromIterable(list);});( }).onBackpressureBuffer().flatMap(msg -> .flatMap,String.class)).doFirst() -> logger.info(String.format(“反应性流%s为%s”),streamName,重新启动?“重新启动”:“开始”).doOnError( -> {String.format(“反应流%s已终止,重新启动”,streamName),err);executeWebfluxSetup(真,initialRequestUri,restartRequestBody,initialRequestBody,apiName,streamName);}.doOnComplete(() -> {logger.warn(“反应流%s已完成,重新启动”,streamName));executeWebfluxSetup(true,initialRequestUri,restartRequestBody,initialRequestBody,apiName,streamName);}).subscribe();}
。
https://stackoverflow.com/questions/65750756
复制相似问题