在使用了诸如Erlang等具有轻量级并发进程的语言之后,我发现很难理解这是如何转换成Java的。假设我使用的是单个核心计算机,是否有一种方法可以执行多个并发IO绑定操作(http)?
我发现的是下面的ExecutorService和CompletableFuture。我的问题是它们是基于线程池的。默认的线程池使用core# - 1,在我使用的单个核心计算机上,它没有并发性。解决方案是只提供一个具有更多线程的自定义Executor吗?或者,在Java中,在单核计算机上是否有一种更加惯用的IO绑定并发方式?
我是在一个只有一个核心的AWS Lambda上运行这段代码。
发布于 2020-10-12 18:57:17
“默认线程池使用core# - 1,在我使用的单个核心计算机上,它没有并发性。”--为什么?并发程序可以很好地在单个核心机器上运行。它与并行性无关。
当Java线程等待I/O时,内核的调度器会将其移动到等待队列,而需要CPU时间的其他线程将被运行。因此,您可以使用任意数量的线程创建线程池,调度程序将处理并发性。即使是在一台核心机器上,这也会很好。
这里唯一的限制是您要创建的线程数。线程的默认堆栈大小将b/w 512K更改为1M。因此,这不是很好的扩展,而且在某一时刻,您将耗尽线程。在我的系统上,我可以创建大约5k的它们。像Go这样的语言通过在有限数量的内核线程上复用多个goroutines来管理这个问题。这需要按Go运行时进行调度。
如果您想缓解这一问题,您应该研究一下NIO。我编写了一个快速程序,您可以使用该程序来找出实际可以以这种方式支持多少并发连接。这应该是在进口之后运行的:
public class ConcurrentBlockingServer {
private ExecutorService pool = Executors.newCachedThreadPool();
public static void main(String[] args) {
ConcurrentBlockingServer bs = new ConcurrentBlockingServer();
try {
bs.listen();
} catch (IOException e) {
e.printStackTrace();
}
}
private void listen() throws IOException {
int connectionId = 0;
ServerSocket ss = new ServerSocket(8080);
while (true) {
Socket s = ss.accept(); // blocking call, never null
System.out.println("Connection: " + (++connectionId));
process(s);
}
}
private void process(Socket s) {
Runnable task =
() -> {
try (InputStream is = s.getInputStream();
OutputStream os = s.getOutputStream()) {
int data;
// -1 is EOF, .read() is blocking
while (-1 != (data = is.read())) {
os.write(flipCase(data));
os.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
};
pool.submit(task);
}
private int flipCase(int input) {
if (input >= 65 && input <= 90) {
return input + 32;
} else if (input >= 97 && input <= 122) {
return input - 32;
} else {
return input;
}
}
}运行这个程序,看看你能建立多少连接。
public class RogueClient {
private static long noClients = 9000;
public static void main(String[] args) {
for (int i = 0; i < noClients; i++) {
try {
new Socket("localhost", 8080);
System.out.println("Connection No: " + i);
} catch (IOException e) {
System.err.println("Exception: " + e.getMessage() + ", for connection: " + i);
}
}
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}编辑:池的大小应该取决于程序的性质。如果这是一个I/O绑定任务,您可以继续创建多个线程。但是对于CPU绑定程序,线程的数量应该等于内核的数量。
发布于 2020-10-12 18:09:54
线程是Java中的基本(也是唯一的)并发机制。ExecutorService有用于创建具有任意数量线程的线程池的方法;此外,CompletableFuture不限于ForkJoinPool.commonPool。具有线程数=核数的线程数只是CPU绑定任务的方便方法。
发布于 2020-10-12 18:38:50
对于IO绑定工作,您可以像前面提到的那样分配更多线程,当它们执行IO时会被阻塞,因此其他线程仍然可以利用CPU。
您还可以查看项目反应堆,如果您不想手动处理这个特定调度程序的线程,它就会派上用场:https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html#boundedElastic--
https://stackoverflow.com/questions/64323108
复制相似问题