我试图为QuizUp做一个后端,就像应用程序一样:用户连接到服务器,发送凭据,并与另一个用户配对。在该服务器处理每一对之后,周期性地将服务器消息以一对的形式发送给每个用户,并在它们之间重定向用户的宅院。
服务器类:
private static class Server{
private static final int NUM_THREADS = 2400;
private ExecutorService executorService;
private ServerSocket serverSocket;
private int listeningPort;
public volatile boolean isRunning;
private Thread mainThread;
private volatile Map<String, Conn> playRequests;
public Server(int port){
try {
executorService = Executors.newFixedThreadPool(NUM_THREADS);
listeningPort = port;
serverSocket = new ServerSocket(listeningPort);
isRunning = true;
playRequests = new ConcurrentHashMap<String, Conn>();
mainThread = new Thread(new Runnable(){
@Override
public void run() {
handleIncomingConnections();
}
});
} catch (IOException e) {
System.out.println(e.toString());
}
}
public void run(){
mainThread.start();
}
private void handleIncomingConnections(){
while(isRunning){
try {
final Socket client = serverSocket.accept();
Runnable gameRunnable = new Runnable(){
@Override
public void run() {
try{
BufferedReader reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
PrintWriter writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(client.getOutputStream())), true);
String read = null;
String id = null;
boolean isRequesting = false;
String rid = null;
while(!(read = reader.readLine()).equals("FIN_1")){
String[] str = read.split("#");
if(str[0].equals("id")){
id = str[1];
}else if(str[0].equals("isRequesting")){
isRequesting = (str[1].equals("1"));
}else if(str[0].equals("rid")){
rid = str[1];
}
}
Conn connection = new Conn(client, isRequesting, id, writer, reader);
if(isRequesting){
playRequests.put(rid, connection);
}else{
if(playRequests.containsKey(id)){
Conn conn = playRequests.get(id);
playRequests.remove(id);
handleGame(conn, connection);
}
}
}catch(Exception e){
System.out.println(e.toString());
}
}
};
executorService.execute(gameRunnable);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private void handleGame(Conn a, Conn b){
new GameHandler(a, b).execute();
}
}GameHandler类:
private class GameHandler{
private volatile Conn a;
private volatile Conn b;
private Thread aReadThread;
private Thread bReadThread;
private Thread messageThread;
private Runnable aReadRunnable;
private Runnable bReadRunnable;
private Runnable messageRunnable;
private volatile PrintWriter aWriter;
private volatile PrintWriter bWriter;
private volatile BufferedReader aReader;
private volatile BufferedReader bReader;
private volatile boolean aIsReady;
private volatile boolean bIsReady;
private volatile boolean isGameRunning;
public GameHandler(final Conn s1, final Conn s2){
this.a = s1;
this.b = s2;
isGameRunning = true;
try {
aWriter = a.writer;
bWriter = b.writer;
aReader = a.reader;
bReader = b.reader;
} catch (Exception e) {
try {
isGameRunning = false;
a.close();
b.close();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
System.out.println(e.toString());
}
messageRunnable = new Runnable(){
@Override
public void run() {
System.out.println(a.id + " " + b.id);
messageThread = Thread.currentThread();
for(int i = 0; i < 6; i++){
if(isGameRunning){
try{
Thread.sleep(4000);
}catch(InterruptedException e){
}
}
}
//end game
isGameRunning = false;
try {
a.close();
b.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
aReadRunnable = new Runnable(){
@Override
public void run() {
aReadThread = Thread.currentThread();
String line = null;
try {
while (isGameRunning && (line = aReader.readLine()) != null && !(line = aReader.readLine()).equals("FIN")){
bWriter.println(line);
}
a.close();
System.out.println(a.id + " done");
} catch (Exception e) {
try {
isGameRunning = false;
a.close();
b.close();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
System.out.println(e.toString());
}
}
};
bReadRunnable = new Runnable(){
@Override
public void run() {
bReadThread = Thread.currentThread();
String line = null;
try {
while (isGameRunning && (line = bReader.readLine()) != null && !(line = bReader.readLine()).equals("FIN")){
aWriter.println(line);
}
b.close();
System.out.println(b.id + " done");
} catch (Exception e) {
try {
isGameRunning = false;
a.close();
b.close();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
System.out.println(e.toString());
}
}
};
}
public void execute() {
executorService.execute(messageRunnable);
executorService.execute(aReadRunnable);
executorService.execute(bReadRunnable);
}
} 和一个容器类,供每个用户保存打开的套接字、入/出流、凭据等:
private class Conn{
public Socket s;
public boolean isRequesting;
public PrintWriter writer;
public String id;
public BufferedReader reader;
Conn(Socket s, boolean isRequesting, String id, PrintWriter writer, BufferedReader reader){
this.s = s;
this.isRequesting = isRequesting;
this.id = id;
this.writer = writer;
this.reader = reader;
}
public void close() throws IOException{
s.close();
}
}其逻辑如下:
服务器有一个mainThread,它接受传入的连接并创建客户端套接字。对于每一个新的套接字,它都会创建一个gameRunnable,其中它侦听客户端的凭据(不管这个客户端是请求连接的那个客户端,它要连接的用户的id,它本身的id )。接收凭据后,服务器将创建一个新的Conn对象,存储所有信息( id,以及套接字和入/出流,因此以后不必再打开它),并将其放置在地图(playRequests)中,并以请求的用户id作为键。如果映射中有匹配的对,服务器将为这两个Conn对象创建一个新的GameHandler (所有这些仍然在gameRunnable中)。每个GameHandler包含三个Runnable:从服务器向两个用户发送消息的messageRunnable和从两个套接字读取传入数据的两个Runnable (aReadRunnable和bReadRunnable)。因此,基本上,每个通信会话(游戏)需要4个线程(1个获得证书并启动游戏,3个线程在游戏结束前维护游戏)。以下是我要问的问题:
其他问题涉及服务器部署:
发布于 2014-04-22 18:48:02
阻塞I/O通常需要线程和流之间的1:1耦合。瘦客户机可以避免使用阻塞I/O,因为它们不会打开100+连接。对于连接持久的服务器来说,这是不可行的。
您已经注意到这一点,因为您的内存需求激增。考虑一下,如果一个线程在堆栈空间中只使用256 in (甚至1MB),并且运行2400个线程,那么只为您的执行器运行600 1MB(或2400 1MB)。
Java1.4引入了非阻塞I/O (NIO)来绕过1:1线程到流的耦合。NIO的工作原理非常类似于GUI中的事件处理:您将流附加到选择器,轮询该选择器以查找有趣的事件(例如,流具有未读数据,连接已准备就绪),然后对该事件执行一些操作。就像GUI每个组件不需要单独的线程一样,NIO允许您运行一个服务器,服务器端基本上只有一个工作线程。
下面是一个在NIO下运行的东西的快速而肮脏的例子,除去执行器和简洁的清理:
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;
public class Repeater {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.socket().bind(null);
server.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler());
System.out.println("Listening on " + server.socket().getInetAddress() + " @ " + server.socket().getLocalPort());
while ( selector.select() > 0 ) {
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while ( keys.hasNext() ) {
SelectionKey key = keys.next();
try {
((Handler) key.attachment()).handle(selector, key);
} catch (Exception ex) {
ex.printStackTrace();
continue;
} finally {
keys.remove(); // [!]
}
}
}
}
static interface Handler {
void handle(Selector selector, SelectionKey key) throws IOException;
}
static final int BUFFER_SIZE_IN_BYTES = 140;
static class AcceptHandler implements Handler {
public void handle(Selector selector, SelectionKey key) throws IOException {
if ( key.isAcceptable() ) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
client.write(ByteBuffer.wrap(("Please leave a message no longer than " + BUFFER_SIZE_IN_BYTES + " bytes.\r\n").getBytes()));
client.register(selector, SelectionKey.OP_READ, new ReadHandler());
}
}
}
static class ReadHandler implements Handler {
private final ByteBuffer myStorage = ByteBuffer.allocate(BUFFER_SIZE_IN_BYTES);
public void handle(Selector selector, SelectionKey key) throws IOException {
if ( key.isReadable() ) {
SocketChannel client = (SocketChannel) key.channel();
client.read(myStorage);
if ( !myStorage.hasRemaining() || new String(myStorage.array(), 0, myStorage.position()).endsWith("\n") ) {
myStorage.flip();
client.write(myStorage);
myStorage.clear();
} else {
client.register(selector, SelectionKey.OP_READ, this);
}
}
}
}
}在64位JVM上进行的快速测试,对于4000个连接,进程大小约为22 on,对于10000个连接,进程大小为36 on。实际使用会更多一些,因为这个示例很简单,但是您可以了解它是如何扩展的。
如果您需要在发送答复之前进行认真的处理,则仍然可以将信道处理委托给执行者。
Java 7引入了名为NIO.2的异步I/O,看起来它可以取消手动控制选择器。我还没有以任何严肃的方式使用它,所以我不能评论它处理事情的能力或它的易用性,但是值得看一看哪一种方法更适合你的船。
我想你也是为了锻炼自己。如果不是,请考虑使用阿帕奇米娜来保护您不受某些复杂性的影响。
https://codereview.stackexchange.com/questions/47851
复制相似问题