首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >多线程套接字服务器高负载

多线程套接字服务器高负载
EN

Code Review用户
提问于 2014-04-22 06:09:36
回答 1查看 9.1K关注 0票数 8

我试图为QuizUp做一个后端,就像应用程序一样:用户连接到服务器,发送凭据,并与另一个用户配对。在该服务器处理每一对之后,周期性地将服务器消息以一对的形式发送给每个用户,并在它们之间重定向用户的宅院。

服务器类:

代码语言:javascript
复制
private static class Server{

    private static final int NUM_THREADS = 2400;

    private ExecutorService executorService;

    private ServerSocket serverSocket; 

    private int listeningPort;

    public volatile boolean isRunning;  

    private Thread mainThread;

    private volatile Map<String, Conn> playRequests;

    public Server(int port){

        try {
            executorService = Executors.newFixedThreadPool(NUM_THREADS);
            listeningPort = port;
            serverSocket = new ServerSocket(listeningPort);
            isRunning = true;
            playRequests = new ConcurrentHashMap<String, Conn>();
            mainThread = new Thread(new Runnable(){

                @Override
                public void run() {
                    handleIncomingConnections();
                }
            });

        } catch (IOException e) {
            System.out.println(e.toString());
        }
    }

    public void run(){
        mainThread.start();
    }

    private void handleIncomingConnections(){
        while(isRunning){               
            try {
                final Socket client = serverSocket.accept();
                Runnable gameRunnable = new Runnable(){

                    @Override
                    public void run() {
                        try{
                            BufferedReader reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
                            PrintWriter writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(client.getOutputStream())), true);  

                            String read = null;
                            String id = null;
                            boolean isRequesting = false;
                            String rid = null;

                            while(!(read = reader.readLine()).equals("FIN_1")){
                                String[] str = read.split("#");
                                if(str[0].equals("id")){
                                    id = str[1];
                                }else if(str[0].equals("isRequesting")){
                                    isRequesting = (str[1].equals("1"));
                                }else if(str[0].equals("rid")){
                                    rid = str[1];
                                }
                            }
                            Conn connection = new Conn(client, isRequesting, id, writer, reader);

                            if(isRequesting){
                                playRequests.put(rid, connection);
                            }else{
                                if(playRequests.containsKey(id)){
                                    Conn conn = playRequests.get(id);
                                    playRequests.remove(id);
                                    handleGame(conn, connection);
                                }
                            }                               
                        }catch(Exception e){
                            System.out.println(e.toString());
                        }
                    }                       
                };

                executorService.execute(gameRunnable);

            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }           
        }
    }

    private void handleGame(Conn a, Conn b){

        new GameHandler(a, b).execute();

    }
}

GameHandler类:

代码语言:javascript
复制
private class GameHandler{

        private volatile Conn a;
        private volatile Conn b;    

        private Thread aReadThread;
        private Thread bReadThread;
        private Thread messageThread;

        private Runnable aReadRunnable;
        private Runnable bReadRunnable;
        private Runnable messageRunnable;

        private volatile PrintWriter aWriter;
        private volatile PrintWriter bWriter;

        private volatile BufferedReader aReader;
        private volatile BufferedReader bReader;

        private volatile boolean aIsReady;
        private volatile boolean bIsReady;

        private volatile boolean isGameRunning;
        public GameHandler(final Conn s1, final Conn s2){
            this.a = s1;            
            this.b = s2;
            isGameRunning = true;
            try {
                aWriter = a.writer;
                bWriter = b.writer;
                aReader = a.reader;
                bReader = b.reader;
            } catch (Exception e) {
                try {
                    isGameRunning = false;
                    a.close();
                    b.close();
                } catch (IOException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
                System.out.println(e.toString());
            }

            messageRunnable = new Runnable(){

                @Override
                public void run() {
                    System.out.println(a.id + " " + b.id);

                    messageThread = Thread.currentThread();
                    for(int i = 0; i < 6; i++){
                        if(isGameRunning){
                            try{
                                Thread.sleep(4000);
                            }catch(InterruptedException e){

                            }
                        }
                    }

                    //end game
                    isGameRunning = false;
                    try {
                        a.close();
                        b.close();
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

            };

            aReadRunnable = new Runnable(){

                @Override
                public void run() {     
                    aReadThread = Thread.currentThread();
                    String line = null;
                    try {
                        while (isGameRunning && (line = aReader.readLine()) != null &&  !(line = aReader.readLine()).equals("FIN")){
                            bWriter.println(line);
                        }
                        a.close();
                        System.out.println(a.id + " done");
                    } catch (Exception e) {
                        try {
                            isGameRunning = false;
                            a.close();
                            b.close();
                        } catch (IOException e1) {
                            // TODO Auto-generated catch block
                            e1.printStackTrace();
                        }
                        System.out.println(e.toString());
                    }
                }

            };

            bReadRunnable = new Runnable(){

                @Override
                public void run() {
                    bReadThread = Thread.currentThread();
                    String line = null;
                    try {
                        while (isGameRunning && (line = bReader.readLine()) != null &&  !(line = bReader.readLine()).equals("FIN")){
                            aWriter.println(line);
                        }
                        b.close();
                        System.out.println(b.id + " done");
                    } catch (Exception e) {
                        try {
                            isGameRunning = false;
                            a.close();
                            b.close();
                        } catch (IOException e1) {
                            // TODO Auto-generated catch block
                            e1.printStackTrace();
                        }
                        System.out.println(e.toString());
                    }
                }                   
            };
        }           

        public void execute() {
            executorService.execute(messageRunnable);
            executorService.execute(aReadRunnable);
            executorService.execute(bReadRunnable);         
        }           
    } 

和一个容器类,供每个用户保存打开的套接字、入/出流、凭据等:

代码语言:javascript
复制
private class Conn{
        public Socket s;
        public boolean isRequesting;
        public PrintWriter writer;
        public String id;
        public BufferedReader reader;
        Conn(Socket s, boolean isRequesting, String id, PrintWriter writer, BufferedReader reader){
            this.s = s;
            this.isRequesting = isRequesting;
            this.id = id;   
            this.writer = writer;
            this.reader = reader;
        }

        public void close() throws IOException{
            s.close();
        }
    }

其逻辑如下:

服务器有一个mainThread,它接受传入的连接并创建客户端套接字。对于每一个新的套接字,它都会创建一个gameRunnable,其中它侦听客户端的凭据(不管这个客户端是请求连接的那个客户端,它要连接的用户的id,它本身的id )。接收凭据后,服务器将创建一个新的Conn对象,存储所有信息( id,以及套接字和入/出流,因此以后不必再打开它),并将其放置在地图(playRequests)中,并以请求的用户id作为键。如果映射中有匹配的对,服务器将为这两个Conn对象创建一个新的GameHandler (所有这些仍然在gameRunnable中)。每个GameHandler包含三个Runnable:从服务器向两个用户发送消息的messageRunnable和从两个套接字读取传入数据的两个Runnable (aReadRunnablebReadRunnable)。因此,基本上,每个通信会话(游戏)需要4个线程(1个获得证书并启动游戏,3个线程在游戏结束前维护游戏)。以下是我要问的问题:

  1. 您在这里看到了什么设计/实现问题吗?请尽量挑剔,因为我真的不希望它在高负荷下坠毁。如果您看到smth,欢迎您给出您的解决方案。
  2. 我知道拥有大量和不受控制的线程是一种糟糕的做法,所以我使用带有固定线程池的executer来执行所有Runnable。但是,由于游戏的特性,我不能让请求连接的用户等待池中的空线程,如果我有大量的传入连接,那么显然会发生什么。那么这里线程池的使用是否合理呢?如果是的话,我应该使用多少线程,假设我每次需要4个线程,并且每个游戏大约持续2分钟。
  3. 我是否正确地关闭了所有的插座?有内存泄漏吗?

其他问题涉及服务器部署:

  1. 我计划在亚马逊的EC2上运行它。我应该为此使用Tomcat服务器,还是可以将其作为JVM上的普通java程序运行?
  2. 我在我的笔记本电脑上测试了它,并且有许多同时连接,堆大小不足以处理所有这些连接。我应该在午餐前尽量增加堆大小,还是会影响性能?
EN

回答 1

Code Review用户

回答已采纳

发布于 2014-04-22 18:48:02

阻塞I/O不能很好地扩展

阻塞I/O通常需要线程和流之间的1:1耦合。瘦客户机可以避免使用阻塞I/O,因为它们不会打开100+连接。对于连接持久的服务器来说,这是不可行的。

您已经注意到这一点,因为您的内存需求激增。考虑一下,如果一个线程在堆栈空间中只使用256 in (甚至1MB),并且运行2400个线程,那么只为您的执行器运行600 1MB(或2400 1MB)。

输入非阻塞I/O

Java1.4引入了非阻塞I/O (NIO)来绕过1:1线程到流的耦合。NIO的工作原理非常类似于GUI中的事件处理:您将流附加到选择器,轮询该选择器以查找有趣的事件(例如,流具有未读数据,连接已准备就绪),然后对该事件执行一些操作。就像GUI每个组件不需要单独的线程一样,NIO允许您运行一个服务器,服务器端基本上只有一个工作线程。

下面是一个在NIO下运行的东西的快速而肮脏的例子,除去执行器和简洁的清理:

代码语言:javascript
复制
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

public class Repeater {
  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel server = ServerSocketChannel.open();
    server.configureBlocking(false);
    server.socket().bind(null);
    server.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler());
    System.out.println("Listening on " + server.socket().getInetAddress() + " @ " + server.socket().getLocalPort());

    while ( selector.select() > 0 ) {
      Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
      while ( keys.hasNext() ) {
        SelectionKey key = keys.next();
        try {
          ((Handler) key.attachment()).handle(selector, key);
        } catch (Exception ex) {
          ex.printStackTrace();
          continue;
        } finally {
          keys.remove(); // [!]
        }
      }
    }
  }

  static interface Handler {
    void handle(Selector selector, SelectionKey key) throws IOException;
  }

  static final int BUFFER_SIZE_IN_BYTES = 140;
  static class AcceptHandler implements Handler {
    public void handle(Selector selector, SelectionKey key) throws IOException {
      if ( key.isAcceptable() ) {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel client = server.accept();
        client.configureBlocking(false);
        client.write(ByteBuffer.wrap(("Please leave a message no longer than " + BUFFER_SIZE_IN_BYTES + " bytes.\r\n").getBytes()));

        client.register(selector, SelectionKey.OP_READ, new ReadHandler());
      }
    }
  }

  static class ReadHandler implements Handler { 
    private final ByteBuffer myStorage = ByteBuffer.allocate(BUFFER_SIZE_IN_BYTES); 
    public void handle(Selector selector, SelectionKey key) throws IOException {
      if ( key.isReadable() ) {
        SocketChannel client = (SocketChannel) key.channel();
        client.read(myStorage);
        if ( !myStorage.hasRemaining() || new String(myStorage.array(), 0, myStorage.position()).endsWith("\n") ) {
          myStorage.flip();
          client.write(myStorage);
          myStorage.clear();
        } else {
          client.register(selector, SelectionKey.OP_READ, this);
        }
      }
    }
  }
}

在64位JVM上进行的快速测试,对于4000个连接,进程大小约为22 on,对于10000个连接,进程大小为36 on。实际使用会更多一些,因为这个示例很简单,但是您可以了解它是如何扩展的。

如果您需要在发送答复之前进行认真的处理,则仍然可以将信道处理委托给执行者。

...or考虑异步I/O

Java 7引入了名为NIO.2的异步I/O,看起来它可以取消手动控制选择器。我还没有以任何严肃的方式使用它,所以我不能评论它处理事情的能力或它的易用性,但是值得看一看哪一种方法更适合你的船。

网络库

我想你也是为了锻炼自己。如果不是,请考虑使用阿帕奇米娜来保护您不受某些复杂性的影响。

票数 8
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/47851

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档