我正在用java做一些多线程工作,我有一个关于未来get方法的问题。
我有一个提交n个可调用的ExecutorSevice。提交返回一个添加到列表中的Future (稍后在某些代码中)。可调用的call方法循环遍历一个列表,对于列表的每一项,调用someMethod并返回一个CompletableFuture<T>。然后,将可完成的未来添加到地图中。
在循环的末尾,call返回包含一组CompletableFuture的映射。
因此,初始列表(uglyList)包含<String, CompletableFuture<String>>的n个映射。
我的问题是,当我对get的一个元素调用uglyList时,执行会被阻塞,直到访问了整个列表(作为参数传递给可调用的),并将所有的CompletableFutures插入到映射中,对吗?
因为,我有疑问,这个例子中的get是否也在等待地图中CompletableFutures的完成?我不知道怎么测试这个
public class A {
private class CallableC implements Callable<Map<String, CompletableFuture<String>>> {
private List<String> listString;
Map<String, CompletableFuture<String>> futureMap;
public CallableC(List<String> listString) throws Exception {
this.listString = listString;
this.futureMap = new HashMap<>();
}
@Override
public Map<String, CompletableFuture<String>> call() throws Exception {
for (int j = 0; j < listString.size(); ++j) {
CompletableFuture<String> future = someMethod();
futureMap.put(listString.get(i), future);
}
return futureMap;
}
}
public void multiThreadMethod(List<String> items) throws Exception {
List<Future<Map<String, CompletableFuture<String>>>> uglyList = new ArrayList<>();
int coresNumber = (Runtime.getRuntime().availableProcessors());
// This method split a List in n subList
List<List<String>> splittedList = split(items, coresNumber);
ExecutorService executor = Executors.newFixedThreadPool(coresNumber);
try {
for (int i = 0; i < coresNumber; ++i) {
threads.add(executor.submit(new CallableC(splittedList.get(i),)));
}
for (int i = 0; i < coresNumber; ++i) {
uglyList.get(i).get(); //here
}
} catch (Exception e) {
// YAY errors
}
finally {
executor.shutdown();
}
}
}发布于 2019-11-06 12:01:43
当我在uglyList的一个元素上调用get时,执行会被阻塞,直到访问了整个列表(作为参数传递给可调用的),并且所有的CompletableFutures都被插入到映射中了,对吗?
是的那是对的。get()将等待Callable的完成,在您的示例中,这意味着CallableC#call()方法的结束,该方法将在for循环结束时到达,此时CompletableFutures都被创建并插入到futureMap中,而不管这些CompletableFuture的状态/完成情况如何。
这个例子中的get可能也在等待地图中CompletableFutures的完成吗?
实现这一点是可能的,但没有附加代码是不可能的。
在代码中这样做的最小方法是在任务中对CompletableFuture添加一个连接调用。
@Override
public Map<String, CompletableFuture<String>> call() throws Exception {
for (int j = 0; j < listString.size(); ++j) {
CompletableFuture<String> future = someMethod();
futureMap.put(listString.get(j), future);
}
// This will wait for all the futures inside the map to terminate
try {
CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0])).join();
} catch (CompletionException e) {
// ignored - the status will be checked later on
}
return futureMap;
}另一种方法是在最后等待(为了简洁起见,我跳过异常处理):
for (int i = 0; i < coresNumber; ++i) {
threads.add(executor.submit(new CallableC(splittedList.get(i),)));
}
// This will wait for all the futures to terminate (thanks to f.get())
// And then, all the completable futures will be added to an array of futures that will also be waited upon
CompletableFuture.allOf(
uglyList.stream().flatMap(f -> f.get().values().stream()).toArray(CompletableFuture[]::new)
).join();但是,如果可能的话,我会简化整个代码,只使用可完成的未来,除非您有很好的理由以这样的方式中断任务(使用列表列表、专用执行器等等)。这个看起来像这样(也许几个字)
List<String> items = ...
ConcurrentMap<String, String> resultsByItem = new ConcurrentHashMap<>();
Future<Void> allWork = CompletableFuture.allOf(
items.stream()
.map(item -> someWork(item) // Get a completable future for this item
.andThen(result -> resultsByItem.put(item, result)) // Append the result to the map
).toArray(CompletableFuture[]::new)
);
allWork.join();
// Your futureMap is completed.这将取代问题中的整个代码。
https://stackoverflow.com/questions/58729011
复制相似问题