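I have a SearchRequest object that holds the complete Elasticsearch (ES) query. I can't use the RestHighLevelClient for my use case because it requires the endpoint to be passed at instantiation, and I obtain the ES endpoint dynamically based on certain conditions. One option is to create a new RestHighLevelClient every time, which would be inefficient. The other is to create a static CloseableHttpClient at service startup and send HttpPost requests to the dynamic endpoint. I would like to take the latter approach, but I don't know how to convert the SearchRequest object into a JSON query string.
Any code references or snippets would be very helpful.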
private final CloseableHttpClient client;

public GenericElasticSearchResponse search(@Nonnull final SearchRequest searchRequest,
                                           @Nonnull final RoutingConfig route) {
    final URIBuilder builder = new URIBuilder()
            .setScheme(route.getScheme())
            .setHost(route.getESEndpoint())
            .setPort(Optional.ofNullable(route.getPort())
                    .orElse(80))
            .setPath("/sessions*/_search");
    final URI uri = builder.build();
    final ContentType contentType = ContentType.create("application/json", "UTF-8");
    final HttpPost httpPost = new HttpPost(uri);
    // 'entity' is not defined yet: it is the missing piece, the SearchRequest serialized as a JSON body.
    httpPost.setEntity(entity);
    final CloseableHttpResponse response = client.execute(httpPost);
    final String responseEntity;
    try (final Reader reader = new InputStreamReader(response.getEntity().getContent(), Charsets.UTF_8)) {
        responseEntity = CharStreams.toString(reader);
    }
    final SearchResponse searchResponse = objectMapper.readValue(responseEntity, SearchResponse.class);
    return new ElasticSearchResponse(searchResponse);
}

Posted on 2022-01-12 04:32:24
I found that searchRequest.source().toString() actually returns the SearchRequest in JSON format. Below is the complete code snippet for querying ES through the Apache HTTP client.
// The original snippet starts inside the method body; the signature below is taken from the question.
public GenericElasticSearchResponse search(@Nonnull final SearchRequest searchRequest,
                                           @Nonnull final RoutingConfig route) {
    try {
        // Resolve the target endpoint for this request only.
        final EndpointConfig endpoint = route.getEndpoint();
        final URIBuilder builder = new URIBuilder()
                .setScheme(endpoint.getScheme())
                .setHost(endpoint.getHost())
                .setPort(Optional.ofNullable(endpoint.getPort())
                        .orElse(HTTPS_PORT))
                .setPath(Optional.ofNullable(endpoint.getQueryPath())
                        .orElse(StringUtils.EMPTY));
        final URI uri = builder.build();
        final ContentType contentType = ContentType.create("application/json", "UTF-8");
        // source() is the SearchSourceBuilder; its toString() yields the query as JSON.
        final String queryString = searchRequest.source().toString();
        final StringEntity entity = new StringEntity(queryString, contentType);
        final HttpPost httpPost = new HttpPost(uri);
        httpPost.setEntity(entity);
        final CloseableHttpResponse response = sendRequest(httpPost);
        final String responseEntity;
        try (final Reader reader = new InputStreamReader(response.getEntity().getContent(), Charsets.UTF_8)) {
            responseEntity = CharStreams.toString(reader);
        }
        log.info("ElasticSearchClient response: Code: {}, Entity {}", response.getCode(), responseEntity);
        SearchResponse searchResponse = null;
        if (Objects.nonNull(responseEntity)) {
            searchResponse = parseResponse(responseEntity, searchRequest, response.getCode());
            log.info("ElasticSearchClient searchResponse- {} ", searchResponse);
        }
        return new ElasticSearchResponse(searchResponse);
    } catch (final URISyntaxException e) {
        throw new IllegalStateException(
                String.format("Invalid URI. host: %s", route.getEndpoint()), e);
    } catch (final IOException e) {
        throw new IllegalStateException("ElasticSearch Request failed.", e);
    }
}
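
The idea behind the method above (one long-lived client, with the endpoint resolved per request) does not depend on a particular HTTP library. As a rough, JDK-only sketch of the same pattern using java.net.http: the class name, the method names, the host es.example.com, and the default port 443 are all assumptions for illustration, and the JSON string stands in for what searchRequest.source().toString() would produce.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

public class DynamicEndpointSketch {
    // Created once and reused for every request, whatever the target host is.
    private static final HttpClient CLIENT = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(5))
            .build();

    // Builds the URI for one request from an endpoint chosen at call time.
    public static URI searchUri(String scheme, String host, Integer port, String path) {
        int effectivePort = (port != null) ? port : 443; // assumed default port
        try {
            return new URI(scheme, null, host, effectivePort, path, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Invalid URI. host: " + host, e);
        }
    }

    // Wraps an already serialized JSON query in a POST request.
    public static HttpRequest searchRequest(URI uri, String jsonQuery) {
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json; charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(jsonQuery))
                .build();
    }

    // Sends the request with the shared client and returns the raw response body.
    public static String send(HttpRequest request) throws IOException, InterruptedException {
        return CLIENT.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) {
        String json = "{\"query\":{\"match_all\":{}}}"; // placeholder query body
        URI uri = searchUri("https", "es.example.com", null, "/sessions*/_search");
        HttpRequest request = searchRequest(uri, json);
        // Only prints the request line; send(request) would perform the actual call.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Because the client is shared and only the URI changes, adding another cluster needs no new client instance, only a different endpoint passed to searchUri.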
private SearchResponse parseResponse(@Nonnull final String responseEntity,
                                     @Nonnull final SearchRequest searchRequest,
                                     final int responseCode) {
    if (responseCode >= 400 || responseCode < 200) {
        log.info("ES error response - {} ", responseEntity);
        // ESErrorResponse is the author's own class; the parsed object can be used for a richer message.
        final ESErrorResponse response = GSON.fromJson(responseEntity, ESErrorResponse.class);
        // Include the status code and raw body so the failure can be diagnosed.
        throw new IllegalStateException(
                String.format("ES request failed. Code: %d, Entity: %s", responseCode, responseEntity));
    }
    final NamedXContentRegistry registry = new NamedXContentRegistry(getDefaultNamedXContents());
    // XContentParser is Closeable, so try-with-resources releases it after parsing.
    try (XContentParser parser = JsonXContent.jsonXContent.createParser(registry,
            DeprecationHandler.THROW_UNSUPPORTED_OPERATION, responseEntity)) {
        return SearchResponse.fromXContent(parser);
    } catch (IOException e) {
        throw new IllegalStateException("Error while parsing response ", e);
    }
}
// Registers parsers only for the aggregation types used here (top hits and string terms);
// any other aggregation type in a response needs its own entry.
public static List<NamedXContentRegistry.Entry> getDefaultNamedXContents() {
    final Map<String, ContextParser<Object, ? extends Aggregation>> map = new HashMap<>();
    map.put(TopHitsAggregationBuilder.NAME, (p, c) -> ParsedTopHits.fromXContent(p, (String) c));
    map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c));
    return map.entrySet().stream()
            .map(entry -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(entry.getKey()), entry.getValue()))
            .collect(Collectors.toList());
}
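
To make the role of that registry more concrete: as far as I understand, SearchResponse.fromXContent picks an aggregation parser by type name, and it expects aggregation keys of the form type#name (for example sterms#by_user), which Elasticsearch returns when the search is sent with the typed_keys=true query parameter. The sketch below is JDK-only and is not Elasticsearch code; the class, the parser map, and the returned strings are hypothetical stand-ins that only illustrate the lookup by type prefix.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class TypedKeysSketch {
    // Hypothetical stand-in for the registry: aggregation type -> parser(name, rawJson).
    private static final Map<String, BiFunction<String, String, String>> PARSERS = new HashMap<>();

    static {
        PARSERS.put("sterms", (name, raw) -> "ParsedStringTerms[" + name + "]");
        PARSERS.put("top_hits", (name, raw) -> "ParsedTopHits[" + name + "]");
    }

    // Splits a typed key such as "sterms#by_user" and dispatches to the matching parser.
    public static String parseAggregation(String typedKey, String rawJson) {
        int delimiter = typedKey.indexOf('#');
        if (delimiter < 0) {
            throw new IllegalArgumentException("Missing type prefix in key: " + typedKey);
        }
        String type = typedKey.substring(0, delimiter);
        String name = typedKey.substring(delimiter + 1);
        BiFunction<String, String, String> parser = PARSERS.get(type);
        if (parser == null) {
            throw new IllegalArgumentException("No parser registered for aggregation type: " + type);
        }
        return parser.apply(name, rawJson);
    }

    public static void main(String[] args) {
        System.out.println(parseAggregation("sterms#by_user", "{}"));
    }
}
```

If that understanding is right, a request sent through a plain HTTP client would need typed_keys=true on the search URL whenever the query contains aggregations, and every aggregation type in the response would need a matching registry entry.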
private CloseableHttpResponse sendRequest(final HttpPost httpPost) throws IOException {
    return client.execute(httpPost);
}

https://stackoverflow.com/questions/70661702