对于HTTP组件的正确设置,我遇到了一些问题。目前,微服务从提供程序中提取JSON内容,对其进行处理,并将其发送到下一个服务以供进一步处理。主要问题是这个微服务创建了大量的CLOSE_WAIT套接字连接。我理解“保持活动”的整个概念将保持连接打开直到我关闭它,但是服务器可能会出于某些原因放弃连接并创建这个CLOSE_WAIT套接字。
我创建了一个用于调试/测试的小型服务,用于对Google进行GET调用,但是即使这个连接也会一直打开,直到我关闭程序为止。我尝试过许多不同的解决方案:
但我没有成功。或许你们中的一个能帮我:
谢谢大家的帮助
发布于 2019-01-25 17:00:46
不幸的是,在应用程序最终关闭之前,所有建议的答案都没有解决我这边的CLOSE_WAIT连接状态。
我用以下测试用例再现了这个问题:
public class HttpInvokationTest extends CamelSpringTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@EndpointInject(uri = "mock:success")
private MockEndpoint successEndpoint;
@EndpointInject(uri = "mock:failure")
private MockEndpoint failureEndpoint;
@Override
protected AbstractApplicationContext createApplicationContext() {
return new AnnotationConfigApplicationContext(ContextConfig.class);
}
@Configuration
@Import(HttpClientSpringTestConfig.class)
public static class ContextConfig extends CamelConfiguration {
@Override
public List<RouteBuilder> routes() {
List<RouteBuilder> routes = new ArrayList<>(1);
routes.add(new RouteBuilder() {
@Override
public void configure() {
from("direct:start")
.log(LoggingLevel.INFO, LOG, CONFIDENTIAL, "Invoking external URL: ${header[ERPEL_URL]}")
.setHeader("Connection", constant("close"))
.recipientList(header("TEST_URL"))
.log(LoggingLevel.DEBUG, "HTTP response code: ${header["+Exchange.HTTP_RESPONSE_CODE+"]}")
.bean(CopyBodyToHeaders.class)
.choice()
.when(header(Exchange.HTTP_RESPONSE_CODE).isGreaterThanOrEqualTo(300))
.to("mock:failure")
.otherwise()
.to("mock:success");
}
});
return routes;
}
}
@Test
public void testHttpInvocation() throws Exception {
successEndpoint.expectedMessageCount(1);
failureEndpoint.expectedMessageCount(0);
ProducerTemplate template = context.createProducerTemplate();
template.sendBodyAndHeader("direct:start", null, "TEST_URL", "http4://meta.stackoverflow.com");
successEndpoint.assertIsSatisfied();
failureEndpoint.assertIsSatisfied();
Exchange exchange = successEndpoint.getExchanges().get(0);
Map<String, Object> headers = exchange.getIn().getHeaders();
String body = exchange.getIn().getBody(String.class);
for (String key : headers.keySet()) {
LOG.info("Header: {} -> {}", key, headers.get(key));
}
LOG.info("Body: {}", body);
Thread.sleep(120000);
}
}并发出netstat -ab -p tcp | grep 151.101.129.69请求,其中IP是meta.stackoverflow.com的请求。
这给出的答复如下:
tcp4 0 0 192.168.0.10.52183 151.101.129.69.https ESTABLISHED 37562 2118
tcp4 0 0 192.168.0.10.52182 151.101.129.69.http ESTABLISHED 885 523在调用followeb之后
tcp4 0 0 192.168.0.10.52183 151.101.129.69.https CLOSE_WAIT 37562 2118
tcp4 0 0 192.168.0.10.52182 151.101.129.69.http CLOSE_WAIT 885 523响应,直到应用程序由于Connection: keep-alive头而关闭为止,即使配置如下:
@Configuration
@EnableConfigurationProperties(HttpClientSettings.class)
public class HttpClientSpringTestConfig {
private final static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Resource
private HttpClientSettings httpClientSettings;
@Resource
private CamelContext camelContext;
private SocketConfig httpClientSocketConfig() {
/*
socket timeout:
Monitors the time passed between two consecutive incoming messages over the connection and
raises a SocketTimeoutException if no message was received within the given timeout interval
*/
LOG.info("Creating a SocketConfig with a socket timeout of {} seconds", httpClientSettings.getSoTimeout());
return SocketConfig.custom()
.setSoTimeout(httpClientSettings.getSoTimeout() * 1000)
.setSoKeepAlive(false)
.setSoReuseAddress(false)
.build();
}
private RequestConfig httpClientRequestConfig() {
/*
connection timeout:
The time span the application will wait for a connection to get established. If the connection
is not established within the given amount of time a ConnectionTimeoutException will be raised.
*/
LOG.info("Creating a RequestConfig with a socket timeout of {} seconds and a connection timeout of {} seconds",
httpClientSettings.getSoTimeout(), httpClientSettings.getConTimeout());
return RequestConfig.custom()
.setConnectTimeout(httpClientSettings.getConTimeout() * 1000)
.setSocketTimeout(httpClientSettings.getSoTimeout() * 1000)
.build();
}
@Bean(name = "httpClientConfigurer")
public HttpClientConfigurer httpConfiguration() {
ConnectionKeepAliveStrategy myStrategy = new ConnectionKeepAliveStrategy() {
@Override
public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
return 5 * 1000;
}
};
PoolingHttpClientConnectionManager conMgr =
new PoolingHttpClientConnectionManager();
conMgr.closeIdleConnections(5, TimeUnit.SECONDS);
return builder -> builder.setDefaultSocketConfig(httpClientSocketConfig())
.setDefaultRequestConfig(httpClientRequestConfig())
.setConnectionTimeToLive(5, TimeUnit.SECONDS)
.setKeepAliveStrategy(myStrategy)
.setConnectionManager(conMgr);
}
@PostConstruct
public void init() {
LOG.debug("Initializing HTTP clients");
HttpComponent httpComponent = camelContext.getComponent("http4", HttpComponent.class);
httpComponent.setHttpClientConfigurer(httpConfiguration());
HttpComponent httpsComponent = camelContext.getComponent("https4", HttpComponent.class);
httpsComponent.setHttpClientConfigurer(httpConfiguration());
}
}或直接在相应的HttpComponent上定义设置。
在检查HttpClient代码中各自建议的方法时,很明显,这些方法是一次操作,而不是HttpClient内部每隔几毫秒就会检查一次的配置。
PoolingHttpClientConnectionManager进一步指出:
版本4.4中更改了对陈旧连接的处理。以前,代码默认情况下会检查每个连接,然后再使用它.代码现在只检查自上次使用连接以来所用的时间是否超过设置的超时时间。默认超时设置为2000 is。
只有在尝试重新使用连接时才会发生这种情况,这对于连接池来说是有意义的,特别是在通过同一个连接交换多条消息时。对于单次调用,这应该更像一个Connection: close,在一段时间内可能不会重用该连接,使连接打开或半关闭,因为不再尝试从该连接读取数据,从而认识到该连接可以关闭。
我注意到,我早就用传统的HttpClients解决了这个问题,并开始将这个解决方案移植到Camel上,这一解决方案很容易就实现了。
解决方案基本上包括向服务注册HttpClients,然后周期性地(在我的例子中是5秒)调用closeExpiredConnections()和closeIdleConnections(...)。
这个逻辑保存在一个单例枚举中,因为这实际上是一个库,几个应用程序都使用这个库,每个库在各自的JVM中运行。
/**
* This singleton monitor will check every few seconds for idle and stale connections and perform
* a cleanup on the connections using the registered connection managers.
*/
public enum IdleConnectionMonitor {
INSTANCE;
private final static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/** The execution service which runs the cleanup every 5 seconds **/
private ScheduledExecutorService executorService =
Executors.newScheduledThreadPool(1, new NamingThreadFactory());
/** The actual thread which performs the monitoring **/
private IdleConnectionMonitorThread monitorThread = new IdleConnectionMonitorThread();
IdleConnectionMonitor() {
// execute the thread every 5 seconds till the application is shutdown (or the shutdown method
// is invoked)
executorService.scheduleAtFixedRate(monitorThread, 5, 5, TimeUnit.SECONDS);
}
/**
* Registers a {@link HttpClientConnectionManager} to monitor for stale connections
*/
public void registerConnectionManager(HttpClientConnectionManager connMgr) {
monitorThread.registerConnectionManager(connMgr);
}
/**
* Request to stop the monitoring for stale HTTP connections.
*/
public void shutdown() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(3, TimeUnit.SECONDS)) {
LOG.warn("Connection monitor shutdown not finished after 3 seconds!");
}
} catch (InterruptedException iEx) {
LOG.warn("Execution service was interrupted while waiting for graceful shutdown");
}
}
/**
* Upon invocation, the list of registered connection managers will be iterated through and if a
* referenced object is still reachable {@link HttpClientConnectionManager#closeExpiredConnections()}
* and {@link HttpClientConnectionManager#closeIdleConnections(long, TimeUnit)} will be invoked
* in order to cleanup stale connections.
* <p/>
* This runnable implementation holds a weakly referable list of {@link
* HttpClientConnectionManager} objects. If a connection manager is only reachable by {@link
* WeakReference}s or {@link PhantomReference}s it gets eligible for garbage collection and thus
* may return null values. If this is the case, the connection manager will be removed from the
* internal list of registered connection managers to monitor.
*/
private static class IdleConnectionMonitorThread implements Runnable {
// we store only weak-references to connection managers in the list, as the lifetime of the
// thread may extend the lifespan of a connection manager and thus allowing the garbage
// collector to collect unused objects as soon as possible
private List<WeakReference<HttpClientConnectionManager>> registeredConnectionManagers =
Collections.synchronizedList(new ArrayList<>());
@Override
public void run() {
LOG.trace("Executing connection cleanup");
Iterator<WeakReference<HttpClientConnectionManager>> conMgrs =
registeredConnectionManagers.iterator();
while (conMgrs.hasNext()) {
WeakReference<HttpClientConnectionManager> weakConMgr = conMgrs.next();
HttpClientConnectionManager conMgr = weakConMgr.get();
if (conMgr != null) {
LOG.trace("Found connection manager: {}", conMgr);
conMgr.closeExpiredConnections();
conMgr.closeIdleConnections(30, TimeUnit.SECONDS);
} else {
conMgrs.remove();
}
}
}
void registerConnectionManager(HttpClientConnectionManager connMgr) {
registeredConnectionManagers.add(new WeakReference<>(connMgr));
}
}
private static class NamingThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("Connection Manager Monitor");
return t;
}
}
}如前所述,这个单例服务生成一个自己的线程,每5秒调用一次上面提到的方法。这些调用负责关闭连接,这些连接要么在一定时间内未使用,要么在规定的时间内空闲。
为了实现这项服务的骆驼化,可以使用EventNotifierSupport,以便让Camel在关闭监视线程时负责关闭它。
/**
* This Camel service with take care of the lifecycle management of {@link IdleConnectionMonitor}
* and invoke {@link IdleConnectionMonitor#shutdown()} once Camel is closing down in order to stop
* listening for stale connetions.
*/
public class IdleConnectionMonitorService extends EventNotifierSupport {
private final static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private IdleConnectionMonitor connectionMonitor;
@Override
public void notify(EventObject event) {
if (event instanceof CamelContextStartedEvent) {
LOG.info("Start listening for closable HTTP connections");
connectionMonitor = IdleConnectionMonitor.INSTANCE;
} else if (event instanceof CamelContextStoppingEvent){
LOG.info("Shutting down listener for open HTTP connections");
connectionMonitor.shutdown();
}
}
@Override
public boolean isEnabled(EventObject event) {
return event instanceof CamelContextStartedEvent || event instanceof CamelContextStoppingEvent;
}
public IdleConnectionMonitor getConnectionMonitor() {
return this.connectionMonitor;
}
}为了利用该服务,HttpClient Camel内部使用的连接管理器需要在服务中注册,这在下面的代码块中完成:
private void registerHttpClientConnectionManager(HttpClientConnectionManager conMgr) {
if (!getIdleConnectionMonitorService().isPresent()) {
// register the service with Camel so that on a shutdown the monitoring thread will be stopped
camelContext.getManagementStrategy().addEventNotifier(new IdleConnectionMonitorService());
}
IdleConnectionMonitor.INSTANCE.registerConnectionManager(conMgr);
}
private Optional<IdleConnectionMonitorService> getIdleConnectionMonitorService() {
for (EventNotifier eventNotifier : camelContext.getManagementStrategy().getEventNotifiers()) {
if (eventNotifier instanceof IdleConnectionMonitorService) {
return Optional.of((IdleConnectionMonitorService) eventNotifier);
}
}
return Optional.empty();
}最后但并非最不重要的一点是,在我的示例中,在httpConfiguration中定义的HttpClientSpringTestConfig中的连接管理器需要转移到引入的寄存器函数。
PoolingHttpClientConnectionManager conMgr = new PoolingHttpClientConnectionManager();
registerHttpClientConnectionManager(conMgr);这可能不是最漂亮的解决方案,但它确实关闭了我的机器上的半封闭连接。
@编辑
我刚刚了解到,您可以使用一个NoConnectionReuseStrategy,它将连接状态更改为TIME_WAIT,而不是CLOSE_WAIT,因此在短时间内删除连接。不幸的是,请求仍然以Connection: keep-alive头发出。这个策略将为每个请求创建一个新的连接,也就是说,如果您有一个301 Moved Permanently重定向响应,则重定向将发生在一个新的连接上。
为了使用上述策略,httpClientConfigurer bean需要更改为以下内容:
@Bean(name = "httpClientConfigurer")
public HttpClientConfigurer httpConfiguration() {
return builder -> builder.setDefaultSocketConfig(socketConfig)
.setDefaultRequestConfig(requestConfig)
.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE);
}发布于 2018-02-08 14:41:10
如果空闲连接在配置时间内处于空闲状态,则可以通过关闭空闲连接来完成此操作。您可以通过为Camel Http组件配置空闲连接超时来实现相同的目标。Camel Http提供了这样做的接口。
将org.apache.camel.component.http4.HttpComponent转换为PoolingHttpClientConnectionManager
PoolingHttpClientConnectionManager poolingClientConnectionManager = (PoolingHttpClientConnectionManager) httpComponent
.getClientConnectionManager();
poolingClientConnectionManager.closeIdleConnections(5000, TimeUnit.MILLISECONDS);访问这里[http://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/http/impl/conn/PoolingHttpClientConnectionManager.html#closeIdleConnections(long]
发布于 2019-05-23 12:46:20
首先,Roman Vottner,你的回答和你对寻找问题的纯粹的奉献帮助了我一卡车。我已经和CLOSE_WAIT做了两天的斗争了,你的回答是什么帮助了我。这就是我所做的。在我的CamelConfiguration类中添加了以下代码,本质上是在启动时对CamelContext进行篡改。
HttpComponent http4 = camelContext.getComponent("https4", HttpComponent.class);
http4.setHttpClientConfigurer(new HttpClientConfigurer() {
@Override
public void configureHttpClient(HttpClientBuilder builder) {
builder.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE);
}
});很有魅力。
https://stackoverflow.com/questions/38701588
复制相似问题