我正在尝试使用ExecutorService和它的函数invokeAll用Java语言编写一个程序。我的问题是:invokeAll函数是否同时解决这些任务?我的意思是,如果我有两个处理器,会不会同时有两个工作者?因为aI不能让它正确缩放。如果我给newFixedThreadPool(2)或1,它需要同样的时间来完成这个问题。
List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
tasks.add(new Map(ps, keyWords));
}
list = executor.invokeAll(tasks);Map是一个实现Callable的类,wp是一个部分解决方案的向量,一个在不同时间保存一些信息的类。
为什么它不能扩展?可能的问题是什么?
这是PartialSolution的代码:
import java.util.HashMap;
import java.util.Vector;
public class PartialSolution
{
public String fileName;//the name of a file
public int b, e;//the index of begin and end of the fragment from the file
public String info;//the fragment
public HashMap<String, Word> hm;//here i retain the informations
public HashMap<String, Vector<Word>> hmt;//this i use for the final reduce
public PartialSolution(String name, int b, int e, String i, boolean ok)
{
this.fileName = name;
this.b = b;
this.e = e;
this.info = i;
hm = new HashMap<String, Word>();
if(ok == true)
{
hmt = new HashMap<String, Vector<Word>>();
}
else
{
hmt = null;
}
}
}这是Map的代码:
public class Map implements Callable<PartialSolution>
{
private PartialSolution ps;
private Vector<String> keyWords;
public Map(PartialSolution p, Vector<String> kw)
{
this.ps = p;
this.keyWords = kw;
}
@Override
public PartialSolution call() throws Exception
{
String[] st = this.ps.info.split("\\n");
for(int j = 0 ; j < st.length ; j++)
{
for(int i = 0 ; i < keyWords.size() ; i++)
{
if(keyWords.elementAt(i).charAt(0) != '\'')
{
int k = 0;
int index = 0;
int count = 0;
while((index = st[j].indexOf(keyWords.elementAt(i), k)) != -1)
{
k = index + keyWords.elementAt(i).length();
count++;
}
if(count != 0)
{
Word wr = this.ps.hm.get(keyWords.elementAt(i));
if(wr != null)
{
Word nw = new Word(ps.fileName);
nw.nrap = wr.nrap + count;
nw.lines = wr.lines;
int grep = count;
while(grep > 0)
{
nw.lines.addElement(ps.b + j);
grep--;
}
this.ps.hm.put(keyWords.elementAt(i), nw);
}
else
{
Word nw = new Word(ps.fileName);
nw.nrap = count;
int grep = count;
while(grep > 0)
{
nw.lines.addElement(ps.b + j);
grep--;
}
this.ps.hm.put(keyWords.elementAt(i), nw);
}
}
}
else
{
String regex = keyWords.elementAt(i).substring(1, keyWords.elementAt(i).length() - 1);
StringBuffer sb = new StringBuffer(regex);
regex = sb.toString();
Pattern pt = Pattern.compile(regex);
Matcher m = pt.matcher(st[j]);
int count = 0;
while(m.find())
{
count++;
}
if(count != 0)
{
Word wr = this.ps.hm.get(keyWords.elementAt(i));
if(wr != null)
{
Word nw = new Word(this.ps.fileName);
nw.nrap = wr.nrap + count;
nw.lines = wr.lines;
int grep = count;
while(grep > 0)
{
nw.lines.addElement(ps.b + j);
grep--;
}
this.ps.hm.put(keyWords.elementAt(i), nw);
}
else
{
Word nw = new Word(this.ps.fileName);
nw.nrap = count;
int grep = count;
while(grep > 0)
{
nw.lines.addElement(ps.b + j);
grep--;
}
this.ps.hm.put(keyWords.elementAt(i), nw);
}
}
}
}
}
this.ps.info = null;
return this.ps;
}
}所以在Map中,我从片段中取出每一行,搜索每个表达式,出现的次数,并保存行数。在处理完所有片段之后,在同一个PartialSolution中,我将信息保存在一个散列映射中,并返回新的PartialSolution。在下一步中,我将组合具有相同fileName的PartialSolutions,并在一个可调用的类Reduce中引入它们,Reduce与map相同,不同之处在于它进行其他操作,但也返回一个PartialSolution。
以下是运行Map任务的代码:
List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
tasks.add(new Map(ps, keyWords));
}
list = executor.invokeAll(tasks);在task中,我创建类型为Map的任务,并在列表中获取它们。我不知道如何读取JVM线程转储。我希望我给你的信息足够好。如果有帮助的话,我在NetBeans 7.0.1中工作。
谢谢你,亚历克斯
发布于 2011-11-29 17:46:50
我想知道的是,如果我创建了一个有10个线程的ExcutorService,那么invokeAll方法将同时解决10个任务,还是一次解决一个任务?
如果您将10个任务提交给一个具有10个线程的ExecutorService,它将同时运行所有这些任务。它们是否能够完全并行并相互独立地进行,取决于它们正在做什么。但它们各自都有自己的线程。
和另一个问题,如果我说list.get(i).get(),它将在求解后返回PartialSolution?
是的,它将阻塞,直到计算完成(如果还没有完成),并返回结果。
我真的不明白为什么当我使用2个线程而不是1个线程的时候,时间没有改善。
我们需要看到更多的代码。它们会在某些共享数据上进行同步吗?这些任务需要多长时间?如果它们非常短,您可能不会注意到任何差异。如果它们花费的时间更长,请查看JVM线程转储,以验证所有这些线程是否都在运行。
发布于 2011-11-30 06:07:15
如果创建具有两个线程的线程池,则会同时运行两个任务。
我发现有两件事可能会导致两个线程占用与一个线程相同的时间。
如果只有一个Map任务占用了您的大部分时间,那么额外的线程不会使该任务运行得更快。它不能比最慢的作业完成得更快。
另一种可能性是您的map任务经常从共享向量读取。这可能会导致足够的争用,从而抵消拥有两个线程所带来的收益。
您应该在jvisualvm中启动此命令,以查看每个线程都在做什么。
发布于 2016-01-30 11:36:32
Java8在Executors - newWorkStealingPool中又引入了一个API来创建工作窃取池。您不必创建RecursiveTask和RecursiveAction,但仍然可以使用ForkJoinPool。
public static ExecutorService newWorkStealingPool()使用所有可用的处理器作为其目标并行级别来创建工作窃取线程池。
默认情况下,它将接受CPU核心数作为并行度的参数。如果您有核心CPU,则可以有8个线程来处理工作任务队列。
Work stealing of idle worker threads from busy worker threads improves overall performance. Since task queue is unbounded in nature, this ForkJoinPool is recommended for the tasks executing in short time intervals.
如果您没有共享数据、共享锁定(同步)和线程间通信,那么ExecutorService、ForkJoinPool或ThreadPoolExecutor的性能都会很好。如果任务队列中所有任务都是相互独立的,则性能将得到提高。
用于自定义和控制任务工作流的ThreadPoolExecutor构造函数:
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)查看相关SE问题:
https://stackoverflow.com/questions/8307423
复制相似问题