I designed a RecursiveTask. Here is my task code:
public class SearchTask extends RecursiveTask<Map<Short, Long>> {

    private static final long serialVersionUID = 1L;

    private int majorDataThreshold = 16001;
    private ConcurrentNavigableMap<Short, Long> dataMap;
    private long fromRange;
    private long toRange;
    private boolean fromInclusive;
    private boolean toInclusive;

    public SearchTask(final Map<Short, Long> dataSource, final long fromRange, final long toRange,
            final boolean fromInclusive, final boolean toInclusive) {
        this.dataMap = new ConcurrentSkipListMap<>(dataSource);
        this.fromRange = fromRange;
        this.toRange = toRange;
        this.fromInclusive = fromInclusive;
        this.toInclusive = toInclusive;
    }
    @Override
    protected Map<Short, Long> compute() {
        final int size = dataMap.size();
        // This is not a perfect RecursiveTask: the if condition is tuned to avoid a
        // StackOverflowError when the map is filled with 32k entries
        if (size > majorDataThreshold + 1000) {
            // List<SearchTask> tasks = createSubtasks();
            // tasks.get(0).fork();
            // tasks.get(1).fork();
            // Map<Short, Long> map = new ConcurrentHashMap<>(tasks.get(0).join());
            // map.putAll(tasks.get(1).join());
            // return map;
            return ForkJoinTask.invokeAll(createSubtasks()).stream().map(ForkJoinTask::join)
                    .flatMap(map -> map.entrySet().stream())
                    .collect(Collectors.toConcurrentMap(Entry::getKey, Entry::getValue));
        }
        return search();
    }
    private List<SearchTask> createSubtasks() {
        final short lastKey = dataMap.lastKey();
        final short midkey = (short) (lastKey / 2);
        final short firstKey = dataMap.firstKey();
        final List<SearchTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new SearchTask(
                new ConcurrentSkipListMap<Short, Long>(dataMap.subMap(firstKey, true, midkey, false)),
                fromRange, toRange, fromInclusive, toInclusive));
        dividedTasks.add(new SearchTask(
                new ConcurrentSkipListMap<Short, Long>(dataMap.subMap(midkey, true, lastKey, true)),
                fromRange, toRange, fromInclusive, toInclusive));
        return dividedTasks;
    }
    private Map<Short, Long> search() {
        return dataMap.entrySet().stream()
                .filter(searchPredicate(fromRange, toRange, fromInclusive, toInclusive))
                .collect(Collectors.toConcurrentMap(Entry::getKey, Entry::getValue));
    }

    private static Predicate<? super Entry<Short, Long>> searchPredicate(final long fromValue,
            final long toValue, final boolean fromInclusive, final boolean toInclusive) {
        if (fromInclusive && !toInclusive)
            return p -> p.getValue() >= fromValue && p.getValue() < toValue;
        else if (!fromInclusive && toInclusive)
            return p -> p.getValue() > fromValue && p.getValue() <= toValue;
        else if (fromInclusive && toInclusive)
            return p -> p.getValue() >= fromValue && p.getValue() <= toValue;
        else
            return p -> p.getValue() > fromValue && p.getValue() < toValue;
    }
}

This task handles at most 32000 (32k) entries.
In the code, I split the task when it exceeds a threshold: if (size > majorDataThreshold). When I try to reduce majorDataThreshold below 16001, I get an error.
Stack trace:
at java.util.concurrent.RecursiveTask.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinPool.helpStealer(Unknown Source)
at java.util.concurrent.ForkJoinPool.awaitJoin(Unknown Source)
at java.util.concurrent.ForkJoinTask.doJoin(Unknown Source)
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)
...........................Same trace
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
Caused by: java.lang.StackOverflowError
... 1024 more
Caused by: java.lang.StackOverflowError
... 1024 more
.................Same trace
Caused by: java.lang.StackOverflowError
at java.util.Collection.stream(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.search(SearchTask.java:74)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:56)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)
at java.util.concurrent.RecursiveTask.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)

To solve this, I tried:
- Collectors.toMap()
- ConcurrentHashMap
- joining manually

The problem is still not solved.
Can anyone help me find what is wrong in my RecursiveTask?

Unit test code:
public class Container32kUniqueDataTest {

    private ForkJoinRangeContainer forkJoinContainer;

    @Before
    public void setUp() {
        long[] data = generateTestData();
        forkJoinContainer = new ForkJoinRangeContainer(data);
    }

    private long[] generateTestData() {
        long[] data = new long[32000];
        for (int i = 0; i < 32000; i++) {
            data[i] = i + 1;
        }
        return data;
    }

    @Test
    public void runARangeQuery_forkJoin() {
        Set<Short> ids = forkJoinContainer.findIdsInRange(14, 17, true, true);
        assertTrue(ids.size() > 0);
    }
}

An abridged version of the container code:
public class ForkJoinRangeContainer {

    private Map<Short, Long> dataSource = new HashMap<Short, Long>();

    public ForkJoinRangeContainer(long[] data) {
        populateData(data);
    }

    private void populateData(final long[] data) {
        for (short i = 0; i < data.length; i++) {
            dataSource.put(i, data[i]);
        }
    }

    public Set<Short> findIdsInRange(final long fromValue, long toValue, boolean fromInclusive, boolean toInclusive) {
        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        SearchTask task = new SearchTask(dataSource, fromValue, toValue, fromInclusive, toInclusive);
        Map<Short, Long> map = forkJoinPool.invoke(task);
        forkJoinPool.shutdown();
        return map.keySet();
    }

    public static void main(String[] args) {
        long[] data = new long[32000];
        for (int i = 0; i < 32000; i++) {
            data[i] = i + 1;
        }
        ForkJoinRangeContainer rf2 = new ForkJoinRangeContainer(data);
        Set<Short> ids = rf2.findIdsInRange(14, 17, true, true);
        if (ids.size() > 0) {
            System.out.println("Found Ids");
        }
    }
}

Posted on 2018-04-07 01:00:23
In SearchTask, return ForkJoinTask.invokeAll(createSubtasks()) puts you in an endless loop.

createSubtasks() keeps creating subtasks with the same values over and over, because you never shrink dataMap: midkey is computed as lastKey / 2, and the upper subtask subMap(midkey, true, lastKey, true) keeps that same lastKey, so its bounds never change at any level of the recursion.

F/J works by splitting the work into a Left and a Right. Each Left and Right creates a new Left and Right, each holding half of its parent's values. This halving continues until you hit the threshold at which you "do the work" directly.
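The halving described above can be sketched with a minimal, self-contained RecursiveTask (a hypothetical SumTask, not code from the question): each task splits at the midpoint of its own range, so the range shrinks at every level and the recursion is guaranteed to bottom out.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Minimal left/right halving: each task splits at the midpoint of ITS OWN
// range, so every level works on half the data of its parent.
public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // base-case size (illustrative)

    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    public SumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {          // small enough: do the work directly
            long sum = 0;
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        int mid = from + (to - from) / 2;      // midpoint of the CURRENT range
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                           // run the left half asynchronously
        long rightResult = right.compute();    // compute the right half in this thread
        return rightResult + left.join();
    }

    public static void main(String[] args) {
        long[] data = new long[32_000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        long sum = ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
        System.out.println(sum); // 32000 * 32001 / 2 = 512016000
    }
}
```

Because the range halves on every split, the recursion depth is only about log2(32000) ≈ 15, nowhere near a stack overflow.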
The first lesson I learned in programming is: keep it simple.

You are mixing Map, ArrayMap, ConcurrentSkipListMap, ConcurrentNavigableMap, List, stream Collectors, HashMap, Set, and the F/J classes. At best this is confusing and hard to follow, and it usually leads to failure. Simpler is better.

When you build the list for ForkJoinTask.invokeAll(), create the whole list up front, before the call. The list should contain every subtask needed to finish the work, each subtask holding half the values of its predecessor. Don't use streams; you don't have a stream, just a few subtasks in a list.

Alternatively, split into Left and Right and call left.fork() and right.fork(). Each forked task then splits again into halves of its values, and so on.

How you shrink the "dataMap size" when splitting the object is entirely up to you.
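Applied to the task in the question, one possible fix is to bisect the current submap's own key range instead of reusing lastKey / 2 of the whole map. The sketch below is hypothetical (FixedSearchTask, its threshold, and the inclusive-only search are simplifications, not the asker's full API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Sketch of a corrected split: the split point is the midpoint of the CURRENT
// submap's key range, so every level really halves the map and recursion ends.
public class FixedSearchTask extends RecursiveTask<Map<Short, Long>> {
    private static final int THRESHOLD = 2_000; // can now be far below 16001

    private final ConcurrentNavigableMap<Short, Long> dataMap;
    private final long fromValue; // inclusive lower bound on values
    private final long toValue;   // inclusive upper bound on values

    public FixedSearchTask(ConcurrentNavigableMap<Short, Long> dataMap,
                           long fromValue, long toValue) {
        this.dataMap = dataMap;
        this.fromValue = fromValue;
        this.toValue = toValue;
    }

    @Override
    protected Map<Short, Long> compute() {
        if (dataMap.size() <= THRESHOLD) {
            return search();
        }
        Map<Short, Long> result = new ConcurrentSkipListMap<>();
        for (FixedSearchTask t : ForkJoinTask.invokeAll(createSubtasks())) {
            result.putAll(t.join());
        }
        return result;
    }

    private List<FixedSearchTask> createSubtasks() {
        short firstKey = dataMap.firstKey();
        short lastKey = dataMap.lastKey();
        // Midpoint of the current key range -- NOT lastKey / 2 of the whole map.
        short midKey = (short) (firstKey + (lastKey - firstKey) / 2 + 1);
        List<FixedSearchTask> tasks = new ArrayList<>();
        tasks.add(new FixedSearchTask(dataMap.headMap(midKey, false), fromValue, toValue));
        tasks.add(new FixedSearchTask(dataMap.tailMap(midKey, true), fromValue, toValue));
        return tasks;
    }

    private Map<Short, Long> search() { // plain loop, no streams
        Map<Short, Long> result = new ConcurrentSkipListMap<>();
        for (Map.Entry<Short, Long> e : dataMap.entrySet()) {
            if (e.getValue() >= fromValue && e.getValue() <= toValue) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ConcurrentNavigableMap<Short, Long> data = new ConcurrentSkipListMap<>();
        for (short i = 0; i < 32_000; i++) data.put(i, (long) (i + 1));
        Map<Short, Long> found = ForkJoinPool.commonPool()
                .invoke(new FixedSearchTask(data, 14, 17));
        System.out.println(found.keySet()); // [13, 14, 15, 16] (value = id + 1)
    }
}
```

Note that headMap/tailMap return live views of the skip list, so no copying is needed, and both halves are strictly smaller than their parent, which is what guarantees termination.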
https://stackoverflow.com/questions/49667177