Spring + Atmosphere + Tomcat thread leak

Stack Overflow user
Asked 2014-01-09 20:51:41
1 answer, 3.2K views, 0 followers, 1 vote

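I'm facing a strange Atmosphere issue and can't figure out how to solve it. I'm trying to implement server push notifications. Each notification should be broadcast to all connected clients (at most 10, since this is an intranet application), all of which are Android 4.2 browsers.

Notifications work fine and everything gets pushed, but a large number of Tomcat threads are created along the way, and after about 3-4k page requests it ends in a thread leak. Tomcat is 7.0.40, configured with the NIO connector, a maximum of 150 threads, and a timeout of 60000.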

web.xml:

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://java.sun.com/xml/ns/javaee" 
    xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    id="WebApp_ID" version="3.0">

    <context-param>
        <param-name>contextClass</param-name>
        <param-value>org.springframework.web.context.support.XmlWebApplicationContext</param-value>
    </context-param>
    <listener>
        <listener-class>org.springframework.security.web.session.HttpSessionEventPublisher</listener-class>
    </listener>
    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>
    <listener>
        <listener-class>org.springframework.web.context.request.RequestContextListener</listener-class>
    </listener>

    <!-- Hibernate -->
    <filter>
        <filter-name>hibernateFilter</filter-name>
        <filter-class>org.springframework.orm.hibernate4.support.OpenSessionInViewFilter</filter-class>
        <async-supported>true</async-supported>
        <init-param>
            <param-name>singleSession</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>hibernateFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <!-- SECURITY -->
    <filter>
        <filter-name>springSecurityFilterChain</filter-name>
        <filter-class>org.springframework.web.filter.DelegatingFilterProxy</filter-class>
        <async-supported>true</async-supported>
    </filter>
    <filter-mapping>
        <filter-name>springSecurityFilterChain</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <!-- char encoding -->
    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <async-supported>true</async-supported>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping> 

    <!-- Declare a DispatcherServlet as usual -->

    <servlet>
        <servlet-name>dispatcher</servlet-name>
        <servlet-class>org.atmosphere.cpr.MeteorServlet</servlet-class>
        <init-param>
            <param-name>org.atmosphere.servlet</param-name>
            <param-value>org.springframework.web.servlet.DispatcherServlet</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.asyncSupport</param-name>
            <param-value>org.atmosphere.container.Tomcat7BIOSupportWithWebSocket</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.broadcasterClass</param-name>
            <param-value>org.atmosphere.cpr.DefaultBroadcaster</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.broadcasterCacheClass</param-name>
            <param-value>org.atmosphere.cache.UUIDBroadcasterCache</param-value>
       </init-param>
<!--       <init-param> -->
<!--            <param-name>org.atmosphere.cpr.sessionSupport</param-name> -->
<!--            <param-value>true</param-value> -->
<!--        </init-param> -->
        <init-param>
            <param-name>org.atmosphere.resumeOnBroadcast</param-name>
            <param-value>true</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.broadcaster.shareableThreadPool</param-name>
            <param-value>true</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.broadcaster.maxProcessingThreads</param-name>
            <param-value>20</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.broadcaster.maxAsyncWriteThreads</param-name>
            <param-value>20</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.useNative</param-name>
            <param-value>true</param-value>
        </init-param>
<!--         <init-param> -->
<!--            <param-name>org.atmosphere.useBlocking</param-name> -->
<!--            <param-value>false</param-value> -->
<!--        </init-param> -->
        <init-param>
            <param-name>org.atmosphere.useStream</param-name>
            <param-value>true</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.AtmosphereInterceptor</param-name>
            <param-value>org.atmosphere.interceptor.HeartbeatInterceptor</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.interceptor.HeartbeatInterceptor.heartbeatFrequencyInSeconds</param-name>
            <param-value>60</param-value>
        </init-param>
        <init-param>
          <param-name>org.atmosphere.cpr.broadcasterLifeCyclePolicy</param-name>
          <param-value>EMPTY_DESTROY</param-value>
      </init-param>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>/WEB-INF/dispatcher-servlet.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
        <async-supported>true</async-supported>
    </servlet>

    <servlet-mapping>
        <servlet-name>dispatcher</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>

    <!-- Default page to serve -->
    <display-name>cielo-cp</display-name>
    <welcome-file-list>
        <welcome-file>/</welcome-file>
    </welcome-file-list>

    <session-config>
      <session-timeout>1440</session-timeout>
    </session-config>

    <error-page>
        <error-code>404</error-code>
        <location>/404</location>
    </error-page>
    <error-page>
        <error-code>403</error-code>
        <location>/accessDenied</location>
    </error-page>
</web-app>

Spring MVC controller:

@Controller
public class PushController extends AbstractController{

    @ResponseBody
    @RequestMapping(value="/push")
    public void pushAsync(AtmosphereResource atmosphereResource){
        AtmosphereUtils.suspend(atmosphereResource);
    }

}

BroadcasterService:

@Service
public class BroadcasterService {

    @Autowired
    private PushNotificationService service;

    private ObjectMapper mapper = new ObjectMapper();


    public void receiveMessage(@Observes PushEvent e) {
        List<PushableMessage> messages = service.pollMessages(e.getType());
        try {
            Broadcaster b = AtmosphereUtils.lookupBroadcaster(false);       
            try {   
                b.broadcast(mapper.writeValueAsString(messages));

            } catch(Exception ex){
                logger.error(ex.getMessage(), ex);
            }
        } catch (Throwable t){
            logger.debug(t.getMessage(), t);
        }
    }

}

@Service
public class PushNotificationService {

    private static Logger logger = Logger.getLogger(PushNotificationService.class);

    @Autowired
    private Event<PushEvent> event;


    public List<PushableMessage> pollMessages(TipologiaPush key) {
            ....
        return result;
    }


    public void pushMessage(PushableMessage message){
        ...
        queue.put(message);
        event.fire(message.getEvent());
    }

}

And the utils:

public final class AtmosphereUtils {

    public static AtmosphereResource getAtmosphereResource(HttpServletRequest request) {
        return getMeteor(request).getAtmosphereResource();
    }

    public static Meteor getMeteor(HttpServletRequest request) {
        return Meteor.build(request);
    }

    public static void suspend(final AtmosphereResource resource) {

        final CountDownLatch countDownLatch = new CountDownLatch(1);
        resource.addEventListener(new AtmosphereResourceEventListenerAdapter() {
            @Override
            public void onSuspend(AtmosphereResourceEvent event) {
                countDownLatch.countDown();
                logger.debug("Suspending Client..." + resource.uuid() + " with transport " + resource.transport());
                resource.removeEventListener(this);
            }

            @Override
            public void onDisconnect(AtmosphereResourceEvent event) {
                logger.debug("Disconnecting Client..." + resource.uuid());
                super.onDisconnect(event);
            }    
        });

        if (AtmosphereResource.TRANSPORT.LONG_POLLING.equals(resource.transport())) {
            resource.resumeOnBroadcast(true).suspend();
        } else {
            resource.suspend();
        }

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            logger.error("Interrupted while trying to suspend resource {}", resource);
        }

        AtmosphereUtils.lookupBroadcaster(true).addAtmosphereResource(resource);
    }

    public static Broadcaster lookupBroadcaster(boolean create) {
        Broadcaster b = BroadcasterFactory.getDefault().lookup("/*", create);
        return b;
    }

}

Other services call notificationService.pushMessage(....); and the JavaScript part is:

var socket = $.atmosphere;

function handleAtmosphere(url, handleResult) {

    var request = new $.atmosphere.AtmosphereRequest();
    request.transport = "websocket"; // "streaming is even worse";
    request.url = url;
    request.contentType = "application/json";
    request.fallbackTransport = "long-polling"; //for android 4.2, default browser don't support websocket

    request.onMessage = function(response){
        ....
    };


    var subSocket = socket.subscribe(request);

}

That said, I have tried many configurations, but some threads still stay alive. I'm using Spring 3.2.5, Spring Security 3.2 RC2 and Atmosphere 2.0.4.


1 Answer

Stack Overflow user

Answered 2014-01-09 22:03:10

Take a look at this: https://github.com/Atmosphere/atmosphere/issues/717

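A few things to note from that thread: with a shareable thread pool, you have to shut down the associated executor yourself. This is a framework limitation. Why the BroadcastConfig is blocked and not the AsyncWrite still needs to be investigated.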
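As a rough illustration of "shutting the executor down yourself", here is a minimal sketch that uses only `java.util.concurrent`. The class and method names (`ExecutorShutdown`, `shutdownGracefully`) are made up for this example, and the fixed pool in `main` is only a hypothetical stand-in for a shared pool; how to obtain the executor that Atmosphere actually uses is not covered here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ExecutorShutdown {

    /**
     * Stops an executor in two phases: first let already submitted work
     * finish, then interrupt whatever is still running after the grace
     * period. Returns true if the executor terminated.
     */
    static boolean shutdownGracefully(ExecutorService executor, long graceMillis) {
        executor.shutdown(); // reject new tasks, keep running submitted ones
        try {
            if (!executor.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow(); // interrupt tasks that are still running
                return executor.awaitTermination(graceMillis, TimeUnit.MILLISECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for a pool shared between broadcasters.
        ExecutorService shared = Executors.newFixedThreadPool(4);
        shared.submit(() -> System.out.println("broadcast delivered"));
        System.out.println("terminated = " + shutdownGracefully(shared, 2000));
    }
}
```

In a web application a helper like this would typically be called from a lifecycle hook such as `ServletContextListener.contextDestroyed`, so that the worker threads do not outlive an undeploy.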

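The running threads are associated with ExecutorServices. It is the ExecutorService, not Atmosphere, that decides when to terminate those threads. Since you are using a CachedThreadPool, this is somewhat expected. I tested with Tomcat 7.0.27, a shared thread pool, and the NIO connector. If you use the BIO connector, EMPTY_DESTROY will not work, because disconnects cannot be detected.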
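To see why a cached pool can look like a leak, the following sketch (plain JDK, no Atmosphere involved) builds a pool with the settings that the JDK documents for `Executors.newCachedThreadPool()`: no core threads, no upper bound, a 60 second idle timeout and a `SynchronousQueue`. The class name `CachedPoolGrowth` and the blocked tasks are illustrative assumptions that simulate writes stuck on unresponsive clients.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CachedPoolGrowth {

    /** Submits tasks that all block at once and returns the peak thread count. */
    static int peakThreads(int tasks) {
        // Same shape as a cached thread pool: unbounded, idle threads expire after 60 s.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // simulate a write that is stuck
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int peak = pool.getLargestPoolSize(); // one thread per blocked task
        release.countDown();                  // let every task finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak;
    }

    public static void main(String[] args) {
        System.out.println("peak threads = " + peakThreads(50));
    }
}
```

Because no worker is idle while the tasks block, every submission starts a new thread, so 50 blocked tasks mean 50 threads. They only disappear after sitting idle for the keep-alive time, which is the pool's decision rather than the framework's.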

So I suggest you use a bounded thread pool, so that there won't be too many threads running.
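A minimal sketch of what "bounded" means in plain `java.util.concurrent` terms, under the assumption that you construct the pool yourself. The names `BoundedPool` and `newBoundedPool`, the queue capacity of 50 and the 5 ms sleep are all illustrative. How Atmosphere maps settings such as `org.atmosphere.cpr.broadcaster.maxProcessingThreads` from the question's web.xml onto its pools is not shown here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPool {

    /** Builds a pool that never runs more than maxThreads workers. */
    static ThreadPoolExecutor newBoundedPool(int maxThreads, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads,                 // hard cap on worker threads
                30L, TimeUnit.SECONDS,                  // idle time before a worker exits
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy()); // caller runs the task when full
        pool.allowCoreThreadTimeOut(true);              // idle core threads may exit too
        return pool;
    }

    /** Pushes tasks through a bounded pool and returns the peak thread count. */
    static int peakThreads(int maxThreads, int tasks) {
        ThreadPoolExecutor pool = newBoundedPool(maxThreads, 50);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(5); // stand-in for writing to one client
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.getLargestPoolSize();
    }

    public static void main(String[] args) {
        System.out.println("peak threads = " + peakThreads(20, 200));
    }
}
```

The trade-off is that once both the workers and the queue are full, `CallerRunsPolicy` makes the submitting thread do the work itself, which slows producers down instead of creating more threads.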

See if that is what you are facing.

Votes: 0
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/21020819
