我对CompletionService有一些问题。我的任务是:要对大约300个html页面进行并行解析,只需要等待所有结果5秒,然后将结果返回到主代码。我决定为此使用CompletionService +可调用。问题是如何停止由CompletionService引起的所有线程,并从该页面返回已成功解析的结果?在此代码中删除了打印行,但我可以说5秒就足够了(结果很好,但是程序等待所有线程完成)。我的代码执行了大约2分钟。
我的呼叫代码:
Collection<Callable<HCard>> solvers = new ArrayList<Callable<HCard>>();
for (final String currentUrl : allUrls) {
solvers.add(new Callable<HCard>() {
public HCard call() throws ParserException {
HCard hCard = HCardParser.parseOne(currentUrl);
if (hCard != null) {
return hCard;
} else {
return null;
}
}
});
}
ExecutorService execService = Executors.newCachedThreadPool();
Helper helper = new Helper();
List<HCard> result = helper.solve(execService, solvers);
//then i do smth with result list我调用的代码:
public class Helper {
List<HCard> solve(Executor e, Collection<Callable<HCard>> solvers) throws InterruptedException {
CompletionService<HCard> cs = new ExecutorCompletionService<HCard>(e);
int n = solvers.size();
Future<HCard> future = null;
HCard hCard = null;
ArrayList<HCard> result = new ArrayList<HCard>();
for (Callable<HCard> s : solvers) {
cs.submit(s);
}
for (int i = 0; i < n; ++i) {
try {
future = cs.take();
hCard = future.get();
if (hCard != null) {
result.add(hCard);
}
} catch (ExecutionException e1) {
future.cancel(true);
}
}
return result;
}我试图使用:
(
请帮我了解一下我的代码。
提前感谢!
发布于 2009-07-08 15:00:47
您需要确保提交的任务对中断做出正确的响应,即检查Thread.isInterrupted()或被认为是“可中断的”。
我不确定你是否需要完成服务。
ExecutorService service = ...
// Submit all your tasks
for (Task t : tasks) {
service.submit(t);
}
service.shutdown();
// Wait for termination
boolean success = service.awaitTermination(5, TimeUnit.SECONDS);
if (!success) {
// awaitTermination timed out, interrupt everyone
service.shutdownNow();
}在这一点上,如果Task对象不响应中断,那么您就无能为力了
发布于 2009-07-17 13:12:11
问题是,您总是得到每个结果,因此代码将始终运行到完成。按照下面的代码,我将使用一个CountDownLatch来完成这个任务。
另外,不要使用Executors.newCachedThreadPool --这很可能会产生很多线程(如果您的任务花费大量的时间,则最多可以生成300个线程,因为执行器不会让空闲线程的数量降到零)。
类都是内联的,这样可以更容易地将整个代码块粘贴到一个名为LotsOfTasks的类中,并运行它。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class LotsOfTasks {
private static final int SIZE = 300;
public static void main(String[] args) throws InterruptedException {
String[] allUrls = generateUrls(SIZE);
Collection<Callable<HCard>> solvers = new ArrayList<Callable<HCard>>();
for (final String currentUrl : allUrls) {
solvers.add(new Callable<HCard>() {
public HCard call() {
HCard hCard = HCardParser.parseOne(currentUrl);
if (hCard != null) {
return hCard;
} else {
return null;
}
}
});
}
ExecutorService execService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // One thread per cpu, ideal for compute-bound
Helper helper = new Helper();
System.out.println("Starting..");
long start = System.nanoTime();
List<HCard> result = helper.solve(execService, solvers, 5);
long stop = System.nanoTime();
for (HCard hCard : result) {
System.out.println("hCard = " + hCard);
}
System.out.println("Took: " + TimeUnit.SECONDS.convert((stop - start), TimeUnit.NANOSECONDS) + " seconds");
}
private static String[] generateUrls(final int size) {
String[] urls = new String[size];
for (int i = 0; i < size; i++) {
urls[i] = "" + i;
}
return urls;
}
private static class HCardParser {
private static final Random random = new Random();
public static HCard parseOne(String currentUrl) {
try {
Thread.sleep(random.nextInt(1000)); // Wait for a random time up to 1 seconds per task (simulate some activity)
} catch (InterruptedException e) {
// ignore
}
return new HCard(currentUrl);
}
}
private static class HCard {
private final String currentUrl;
public HCard(String currentUrl) {
this.currentUrl = currentUrl;
}
@Override
public String toString() {
return "HCard[" + currentUrl + "]";
}
}
private static class Helper {
List<HCard> solve(ExecutorService e, Collection<Callable<HCard>> solvers, int timeoutSeconds) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(solvers.size());
final ConcurrentLinkedQueue<HCard> executionResults = new ConcurrentLinkedQueue<HCard>();
for (final Callable<HCard> s : solvers) {
e.submit(new Callable<HCard>() {
public HCard call() throws Exception {
try {
executionResults.add(s.call());
} finally {
latch.countDown();
}
return null;
}
});
}
latch.await(timeoutSeconds, TimeUnit.SECONDS);
final List<Runnable> unfinishedTasks = e.shutdownNow();
System.out.println("There were " + unfinishedTasks.size() + " urls not processed");
return Arrays.asList(executionResults.toArray(new HCard[executionResults.size()]));
}
}
}我系统上的典型输出如下所示:
Starting..
There were 279 urls not processed
hCard = HCard[0]
hCard = HCard[1]
hCard = HCard[2]
hCard = HCard[3]
hCard = HCard[5]
hCard = HCard[4]
hCard = HCard[6]
hCard = HCard[8]
hCard = HCard[7]
hCard = HCard[10]
hCard = HCard[11]
hCard = HCard[9]
hCard = HCard[12]
hCard = HCard[14]
hCard = HCard[15]
hCard = HCard[13]
hCard = HCard[16]
hCard = HCard[18]
hCard = HCard[17]
hCard = HCard[20]
hCard = HCard[19]
Took: 5 seconds发布于 2009-07-08 15:30:47
我从未使用过CompletionService,但我确信有一个轮询(时间单位,单位)调用来执行有限数量的等待。然后检查null。测量等待的时间,5秒后停止等待。大约:
public class Helper {
List<HCard> solve(Executor e, Collection<Callable<HCard>> solvers)
throws InterruptedException {
CompletionService<HCard> cs = new ExecutorCompletionService<HCard>(e);
int n = solvers.size();
Future<HCard> future = null;
HCard hCard = null;
ArrayList<HCard> result = new ArrayList<HCard>();
for (Callable<HCard> s : solvers) {
cs.submit(s);
}
long timeleft = 5000;
for (int i = 0; i < n; ++i) {
if (timeleft <= 0) {
break;
}
try {
long t = System.currentTimeMillis();
future = cs.poll(timeleft, TimeUnit.MILLISECONDS);
timeleft -= System.currentTimeMillis() - t;
if (future != null) {
hCard = future.get();
if (hCard != null) {
result.add(hCard);
}
} else {
break;
}
} catch (ExecutionException e1) {
future.cancel(true);
}
}
return result;
}不过还没测试过。
https://stackoverflow.com/questions/1096995
复制相似问题