首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >无法在Camel HTTP组件中配置“保持活力”

无法在Camel HTTP组件中配置“保持活力”
EN

Stack Overflow用户
提问于 2016-08-01 14:36:43
回答 4查看 6.2K关注 0票数 6

对于HTTP组件的正确设置,我遇到了一些问题。目前,微服务从提供程序中提取JSON内容,对其进行处理,并将其发送到下一个服务以供进一步处理。主要问题是这个微服务创建了大量的CLOSE_WAIT套接字连接。我理解“保持活动”的整个概念将保持连接打开直到我关闭它,但是服务器可能会出于某些原因放弃连接并创建这个CLOSE_WAIT套接字。

我创建了一个用于调试/测试的小型服务,用于对Google进行GET调用,但是即使这个连接也会一直打开,直到我关闭程序为止。我尝试过许多不同的解决方案:

  • .setHeader(“连接”,常数(“关闭”))
  • -Dhttp.keepAlive=false作为VM参数
  • 从Camel-Http切换到Camel-Http 4
  • httpClient.soTimeout=500 (骆驼-HTTP),httpClient.socketTimeout=500和connectionTimeToLive=500 (骆驼-HTTP4)
  • .setHeader("Connection",simple(“备存”))和.setHeader(“保持活着”,simple("timeout=10")) (Camel-HTTP4 4)
  • 通过调试将DefaultConnectionKeepAliveStrategy的响应从-1 (永不结束)设置为Camel-HTTP4中的特定值--这是可行的,但我无法注入自己的策略。

但我没有成功。或许你们中的一个能帮我:

  • 我如何告诉Camel-HTTP,当一个特定的时间被传递时,它应该关闭连接?例如,服务每小时从内容提供者那里提取一次。在3-4小时后,HttpComponent应该关闭连接后,拉,并重新打开它,当下一个拉在那里。目前,每个连接都将返回到MultiThreadedHttpConnectionManager中,并且套接字仍处于打开状态。
  • 如果用Camel-HTTP无法做到这一点:我如何在创建路由时注入HttpClientBuilder?我知道通过httpClient选项应该是可能的,但我不理解文档的具体部分。

谢谢大家的帮助

EN

回答 4

Stack Overflow用户

发布于 2019-01-25 17:00:46

不幸的是,在应用程序最终关闭之前,所有建议的答案都没有解决我这边的CLOSE_WAIT连接状态。

我用以下测试用例再现了这个问题:

代码语言:javascript
复制
public class HttpInvokationTest extends CamelSpringTestSupport {

  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  @EndpointInject(uri = "mock:success")
  private MockEndpoint successEndpoint;
  @EndpointInject(uri = "mock:failure")
  private MockEndpoint failureEndpoint;

  @Override
  protected AbstractApplicationContext createApplicationContext() {
    return new AnnotationConfigApplicationContext(ContextConfig.class);
  }

  @Configuration
  @Import(HttpClientSpringTestConfig.class)
  public static class ContextConfig extends CamelConfiguration {

    @Override
    public List<RouteBuilder> routes() {
      List<RouteBuilder> routes = new ArrayList<>(1);
      routes.add(new RouteBuilder() {
        @Override
        public void configure() {
          from("direct:start")
            .log(LoggingLevel.INFO, LOG, CONFIDENTIAL, "Invoking external URL: ${header[ERPEL_URL]}")
            .setHeader("Connection", constant("close"))
            .recipientList(header("TEST_URL"))
            .log(LoggingLevel.DEBUG, "HTTP response code: ${header["+Exchange.HTTP_RESPONSE_CODE+"]}")
            .bean(CopyBodyToHeaders.class)
            .choice()
              .when(header(Exchange.HTTP_RESPONSE_CODE).isGreaterThanOrEqualTo(300))
                .to("mock:failure")
              .otherwise()
                .to("mock:success");
        }
      });
      return routes;
    }
  }

  @Test
  public void testHttpInvocation() throws Exception {
    successEndpoint.expectedMessageCount(1);
    failureEndpoint.expectedMessageCount(0);

    ProducerTemplate template = context.createProducerTemplate();

    template.sendBodyAndHeader("direct:start", null, "TEST_URL", "http4://meta.stackoverflow.com");

    successEndpoint.assertIsSatisfied();
    failureEndpoint.assertIsSatisfied();

    Exchange exchange = successEndpoint.getExchanges().get(0);
    Map<String, Object> headers = exchange.getIn().getHeaders();
    String body = exchange.getIn().getBody(String.class);
    for (String key : headers.keySet()) {
      LOG.info("Header: {} -> {}", key, headers.get(key));
    }
    LOG.info("Body: {}", body);

    Thread.sleep(120000);
  }
}

并发出netstat -ab -p tcp | grep 151.101.129.69请求,其中IP是meta.stackoverflow.com的请求。

这给出的答复如下:

代码语言:javascript
复制
tcp4       0      0  192.168.0.10.52183     151.101.129.69.https   ESTABLISHED      37562       2118
tcp4       0      0  192.168.0.10.52182     151.101.129.69.http    ESTABLISHED        885        523

在调用followeb之后

代码语言:javascript
复制
tcp4       0      0  192.168.0.10.52183     151.101.129.69.https   CLOSE_WAIT       37562       2118
tcp4       0      0  192.168.0.10.52182     151.101.129.69.http    CLOSE_WAIT         885        523

响应,直到应用程序由于Connection: keep-alive头而关闭为止,即使配置如下:

代码语言:javascript
复制
@Configuration
@EnableConfigurationProperties(HttpClientSettings.class)
public class HttpClientSpringTestConfig {

  private final static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  @Resource
  private HttpClientSettings httpClientSettings;

  @Resource
  private CamelContext camelContext;

  private SocketConfig httpClientSocketConfig() {
    /*
      socket timeout:
      Monitors the time passed between two consecutive incoming messages over the connection and
      raises a SocketTimeoutException if no message was received within the given timeout interval
     */
    LOG.info("Creating a SocketConfig with a socket timeout of {} seconds", httpClientSettings.getSoTimeout());
    return SocketConfig.custom()
        .setSoTimeout(httpClientSettings.getSoTimeout() * 1000)
        .setSoKeepAlive(false)
        .setSoReuseAddress(false)
        .build();
  }

  private RequestConfig httpClientRequestConfig() {
    /*
      connection timeout:
      The time span the application will wait for a connection to get established. If the connection
      is not established within the given amount of time a ConnectionTimeoutException will be raised.
     */
    LOG.info("Creating a RequestConfig with a socket timeout of {} seconds and a connection timeout of {} seconds",
             httpClientSettings.getSoTimeout(), httpClientSettings.getConTimeout());
    return RequestConfig.custom()
        .setConnectTimeout(httpClientSettings.getConTimeout() * 1000)
        .setSocketTimeout(httpClientSettings.getSoTimeout() * 1000)
        .build();
  }

  @Bean(name = "httpClientConfigurer")
  public HttpClientConfigurer httpConfiguration() {
    ConnectionKeepAliveStrategy myStrategy = new ConnectionKeepAliveStrategy() {
      @Override
      public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
        return 5 * 1000;
      }
    };

    PoolingHttpClientConnectionManager conMgr =
        new PoolingHttpClientConnectionManager();
    conMgr.closeIdleConnections(5, TimeUnit.SECONDS);

    return builder -> builder.setDefaultSocketConfig(httpClientSocketConfig())
                             .setDefaultRequestConfig(httpClientRequestConfig())
                             .setConnectionTimeToLive(5, TimeUnit.SECONDS)
                             .setKeepAliveStrategy(myStrategy)
                             .setConnectionManager(conMgr);
  }

  @PostConstruct
  public void init() {
    LOG.debug("Initializing HTTP clients");
    HttpComponent httpComponent = camelContext.getComponent("http4", HttpComponent.class);
    httpComponent.setHttpClientConfigurer(httpConfiguration());
    HttpComponent httpsComponent = camelContext.getComponent("https4", HttpComponent.class);
    httpsComponent.setHttpClientConfigurer(httpConfiguration());
  }
}

或直接在相应的HttpComponent上定义设置。

在检查HttpClient代码中各自建议的方法时,很明显,这些方法是一次操作,而不是HttpClient内部每隔几毫秒就会检查一次的配置。

PoolingHttpClientConnectionManager进一步指出:

版本4.4中更改了对陈旧连接的处理。以前,代码默认情况下会检查每个连接,然后再使用它.代码现在只检查自上次使用连接以来所用的时间是否超过设置的超时时间。默认超时设置为2000 is。

只有在尝试重新使用连接时才会发生这种情况,这对于连接池来说是有意义的,特别是在通过同一个连接交换多条消息时。对于单次调用,这应该更像一个Connection: close,在一段时间内可能不会重用该连接,使连接打开或半关闭,因为不再尝试从该连接读取数据,从而认识到该连接可以关闭。

我注意到,我早就用传统的HttpClients解决了这个问题,并开始将这个解决方案移植到Camel上,这一解决方案很容易就实现了。

解决方案基本上包括向服务注册HttpClients,然后周期性地(在我的例子中是5秒)调用closeExpiredConnections()closeIdleConnections(...)

这个逻辑保存在一个单例枚举中,因为这实际上是一个库,几个应用程序都使用这个库,每个库在各自的JVM中运行。

代码语言:javascript
复制
/**
 * This singleton monitor will check every few seconds for idle and stale connections and perform
 * a cleanup on the connections using the registered connection managers.
 */
public enum IdleConnectionMonitor {

  INSTANCE;

  private final static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** The execution service which runs the cleanup every 5 seconds **/
  private ScheduledExecutorService executorService =
      Executors.newScheduledThreadPool(1, new NamingThreadFactory());
  /** The actual thread which performs the monitoring **/
  private IdleConnectionMonitorThread monitorThread = new IdleConnectionMonitorThread();

  IdleConnectionMonitor() {
    // execute the thread every 5 seconds till the application is shutdown (or the shutdown method
    // is invoked)
    executorService.scheduleAtFixedRate(monitorThread, 5, 5, TimeUnit.SECONDS);
  }

  /**
   * Registers a {@link HttpClientConnectionManager} to monitor for stale connections
   */
  public void registerConnectionManager(HttpClientConnectionManager connMgr) {
    monitorThread.registerConnectionManager(connMgr);
  }

  /**
   * Request to stop the monitoring for stale HTTP connections.
   */
  public void shutdown() {
    executorService.shutdown();
    try {
      if (!executorService.awaitTermination(3, TimeUnit.SECONDS)) {
        LOG.warn("Connection monitor shutdown not finished after 3 seconds!");
      }
    } catch (InterruptedException iEx) {
      LOG.warn("Execution service was interrupted while waiting for graceful shutdown");
    }
  }

  /**
   * Upon invocation, the list of registered connection managers will be iterated through and if a
   * referenced object is still reachable {@link HttpClientConnectionManager#closeExpiredConnections()}
   * and {@link HttpClientConnectionManager#closeIdleConnections(long, TimeUnit)} will be invoked
   * in order to cleanup stale connections.
   * <p/>
   * This runnable implementation holds a weakly referable list of {@link
   * HttpClientConnectionManager} objects. If a connection manager is only reachable by {@link
   * WeakReference}s or {@link PhantomReference}s it gets eligible for garbage collection and thus
   * may return null values. If this is the case, the connection manager will be removed from the
   * internal list of registered connection managers to monitor.
   */
  private static class IdleConnectionMonitorThread implements Runnable {

    // we store only weak-references to connection managers in the list, as the lifetime of the
    // thread may extend the lifespan of a connection manager and thus allowing the garbage
    // collector to collect unused objects as soon as possible
    private List<WeakReference<HttpClientConnectionManager>> registeredConnectionManagers =
        Collections.synchronizedList(new ArrayList<>());

    @Override
    public void run() {

      LOG.trace("Executing connection cleanup");
      Iterator<WeakReference<HttpClientConnectionManager>> conMgrs =
          registeredConnectionManagers.iterator();
      while (conMgrs.hasNext()) {
        WeakReference<HttpClientConnectionManager> weakConMgr = conMgrs.next();
        HttpClientConnectionManager conMgr = weakConMgr.get();
        if (conMgr != null) {
          LOG.trace("Found connection manager: {}", conMgr);
          conMgr.closeExpiredConnections();
          conMgr.closeIdleConnections(30, TimeUnit.SECONDS);
        } else {
          conMgrs.remove();
        }
      }
    }

    void registerConnectionManager(HttpClientConnectionManager connMgr) {
      registeredConnectionManagers.add(new WeakReference<>(connMgr));
    }
  }

  private static class NamingThreadFactory implements ThreadFactory {

    @Override
    public Thread newThread(Runnable r) {
      Thread t = new Thread(r);
      t.setName("Connection Manager Monitor");
      return t;
    }
  }
}

如前所述,这个单例服务生成一个自己的线程,每5秒调用一次上面提到的方法。这些调用负责关闭连接,这些连接要么在一定时间内未使用,要么在规定的时间内空闲。

为了实现这项服务的骆驼化,可以使用EventNotifierSupport,以便让Camel在关闭监视线程时负责关闭它。

代码语言:javascript
复制
/**
 * This Camel service with take care of the lifecycle management of {@link IdleConnectionMonitor} 
 * and invoke {@link IdleConnectionMonitor#shutdown()} once Camel is closing down in order to stop
 * listening for stale connetions.
 */
public class IdleConnectionMonitorService extends EventNotifierSupport {

  private final static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private IdleConnectionMonitor connectionMonitor;

  @Override
  public void notify(EventObject event) {
    if (event instanceof CamelContextStartedEvent) {
      LOG.info("Start listening for closable HTTP connections");
      connectionMonitor = IdleConnectionMonitor.INSTANCE;
    } else if (event instanceof CamelContextStoppingEvent){
      LOG.info("Shutting down listener for open HTTP connections");
      connectionMonitor.shutdown();
    }
  }

  @Override
  public boolean isEnabled(EventObject event) {
    return event instanceof CamelContextStartedEvent || event instanceof CamelContextStoppingEvent;
  }

  public IdleConnectionMonitor getConnectionMonitor() {
    return this.connectionMonitor;
  }
}

为了利用该服务,HttpClient Camel内部使用的连接管理器需要在服务中注册,这在下面的代码块中完成:

代码语言:javascript
复制
private void registerHttpClientConnectionManager(HttpClientConnectionManager conMgr) {
  if (!getIdleConnectionMonitorService().isPresent()) {
    // register the service with Camel so that on a shutdown the monitoring thread will be stopped
    camelContext.getManagementStrategy().addEventNotifier(new IdleConnectionMonitorService());
  }
  IdleConnectionMonitor.INSTANCE.registerConnectionManager(conMgr);
}

private Optional<IdleConnectionMonitorService> getIdleConnectionMonitorService() {
  for (EventNotifier eventNotifier : camelContext.getManagementStrategy().getEventNotifiers()) {
    if (eventNotifier instanceof IdleConnectionMonitorService) {
      return Optional.of((IdleConnectionMonitorService) eventNotifier);
    }
  }
  return Optional.empty();
}

最后但并非最不重要的一点是,在我的示例中,在httpConfiguration中定义的HttpClientSpringTestConfig中的连接管理器需要转移到引入的寄存器函数。

代码语言:javascript
复制
PoolingHttpClientConnectionManager conMgr = new PoolingHttpClientConnectionManager();
registerHttpClientConnectionManager(conMgr);

这可能不是最漂亮的解决方案,但它确实关闭了我的机器上的半封闭连接。

@编辑

我刚刚了解到,您可以使用一个NoConnectionReuseStrategy,它将连接状态更改为TIME_WAIT,而不是CLOSE_WAIT,因此在短时间内删除连接。不幸的是,请求仍然以Connection: keep-alive头发出。这个策略将为每个请求创建一个新的连接,也就是说,如果您有一个301 Moved Permanently重定向响应,则重定向将发生在一个新的连接上。

为了使用上述策略,httpClientConfigurer bean需要更改为以下内容:

代码语言:javascript
复制
@Bean(name = "httpClientConfigurer")
public HttpClientConfigurer httpConfiguration() {
    return builder -> builder.setDefaultSocketConfig(socketConfig)
        .setDefaultRequestConfig(requestConfig)
        .setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE);
}
票数 3
EN

Stack Overflow用户

发布于 2018-02-08 14:41:10

如果空闲连接在配置时间内处于空闲状态,则可以通过关闭空闲连接来完成此操作。您可以通过为Camel Http组件配置空闲连接超时来实现相同的目标。Camel Http提供了这样做的接口。

将org.apache.camel.component.http4.HttpComponent转换为PoolingHttpClientConnectionManager

代码语言:javascript
复制
        PoolingHttpClientConnectionManager poolingClientConnectionManager = (PoolingHttpClientConnectionManager) httpComponent
                .getClientConnectionManager();

        poolingClientConnectionManager.closeIdleConnections(5000, TimeUnit.MILLISECONDS);

访问这里[http://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/http/impl/conn/PoolingHttpClientConnectionManager.html#closeIdleConnections(long]

票数 1
EN

Stack Overflow用户

发布于 2019-05-23 12:46:20

首先,Roman Vottner,你的回答和你对寻找问题的纯粹的奉献帮助了我一卡车。我已经和CLOSE_WAIT做了两天的斗争了,你的回答是什么帮助了我。这就是我所做的。在我的CamelConfiguration类中添加了以下代码,本质上是在启动时对CamelContext进行篡改。

代码语言:javascript
复制
    HttpComponent http4 = camelContext.getComponent("https4", HttpComponent.class);
    http4.setHttpClientConfigurer(new HttpClientConfigurer() {

        @Override
        public void configureHttpClient(HttpClientBuilder builder) {
            builder.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE);
        }
    });

很有魅力。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38701588

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档