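I am using the code snippet below to process a Java collection concurrently. Basically, I use TaskExecutors to process the collection in multiple threads, which check the collection for duplicate transactions based on the transaction id. Apart from the duplicate check, the transactions are unrelated to each other.
Does the code below have any concurrency issues?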
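public class Txn {
    private long id;
    private String status;

    public long getId() { return id; }
    public void setId(long id) { this.id = id; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }

    // Two transactions are equal when their ids match.
    @Override
    public boolean equals(Object obj) {
        return obj instanceof Txn && this.getId() == ((Txn) obj).getId();
    }
}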
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws Exception {
        List<Txn> list = new ArrayList<Txn>();
        List<Txn> acceptedList = new ArrayList<Txn>();
        List<Txn> rejectedList = new ArrayList<Txn>();
        // 10000 transactions with only 1000 distinct ids.
        for (long i = 0; i < 10000L; i++) {
            Txn txn = new Txn();
            txn.setId(i % 1000);
            list.add(txn);
        }
        final ConcurrentHashMap<Long, Integer> map = new ConcurrentHashMap<>();
        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        for (int i = 0; i < list.size(); i++) {
            final Txn txn = list.get(i);
            Callable<Void> callable = new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    // putIfAbsent is atomic: only the first task per id sees null.
                    if (map.putIfAbsent(txn.getId(), 1) != null) {
                        txn.setStatus("duplicate");
                    }
                    return null;
                }
            };
            executorService.submit(callable);
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        for (Txn txn : list) {
            if (txn.getStatus() != null && txn.getStatus().equalsIgnoreCase("duplicate")) {
                rejectedList.add(txn);
            } else {
                acceptedList.add(txn);
            }
        }
        Set<Txn> set = new HashSet<>(acceptedList);
        if (set.size() != acceptedList.size()) {
            throw new Exception("11111111");
        }
        System.out.println(acceptedList.size());
        System.out.println(rejectedList.size());
    }
}
Thanks for your comments.
Posted on 2014-10-23 07:56:02
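The code below uses a split step to divide the transaction list into several smaller lists, processes each list in a separate thread, and then merges the partitioned lists back into one. As sturcotte06 suggested, the hashCode method needs to be overridden so that duplicate transactions stay in the same list. The main advantage of this approach is that there is no race condition when using a HashMap, because each map is only touched by a single thread.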
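The listing relies on Txn overriding hashCode(), which is not shown. A minimal sketch of what that could look like, assuming Txn is reduced to its id field: Long.hashCode is my choice here rather than the author's, and PartitionDemo and partitionOf are hypothetical names used only for illustration.

```java
import java.util.HashSet;
import java.util.Set;

class PartitionDemo {
    // Hypothetical minimal Txn: only the id field from the question is kept.
    static final class Txn {
        private final long id;

        Txn(long id) { this.id = id; }

        @Override
        public boolean equals(Object obj) {
            return obj instanceof Txn && ((Txn) obj).id == id;
        }

        // Consistent with equals: equal ids always give equal hash codes.
        @Override
        public int hashCode() {
            return Long.hashCode(id);
        }
    }

    // Index of the partition a transaction lands in when split n ways.
    static int partitionOf(Txn txn, int n) {
        return Math.floorMod(txn.hashCode(), n);
    }

    public static void main(String[] args) {
        Txn a = new Txn(42);
        Txn b = new Txn(42); // duplicate of a
        // Duplicates share a partition, so one thread sees both of them.
        System.out.println(partitionOf(a, 4) == partitionOf(b, 4)); // true
        // Hash-based collections also recognise the duplicate.
        Set<Txn> seen = new HashSet<>();
        seen.add(a);
        System.out.println(seen.contains(b)); // true
    }
}
```

Without the override, the two objects would get unrelated identity hash codes and could land in different partitions, where neither thread would notice the duplicate.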
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MainDivideAndConquer {
    public static void main(String[] args) throws Exception {
        List<Txn> list = new ArrayList<Txn>();
        List<Txn> acceptedList = new ArrayList<Txn>();
        List<Txn> rejectedList = new ArrayList<Txn>();
        for (long i = 0; i < 10000000L; i++) {
            Txn txn = new Txn();
            txn.setId(i % 1000);
            txn.setStatus("sadden");
            list.add(txn);
        }
        long t1 = System.nanoTime();
        int cpuCount = Runtime.getRuntime().availableProcessors();
        final List<Txn>[] splittedArray = split(list, cpuCount);
        ExecutorService executorService = Executors.newFixedThreadPool(cpuCount);
        List<Future<List<Txn>>> futures = new ArrayList<>();
        for (int i = 0; i < cpuCount; i++) {
            final List<Txn> splittedList = splittedArray[i];
            System.out.println("list size:" + splittedList.size());
            Callable<List<Txn>> callable = new Callable<List<Txn>>() {
                // Private to this task, so a plain HashMap is safe here.
                Map<Long, Integer> map = new HashMap<Long, Integer>();

                @Override
                public List<Txn> call() throws Exception {
                    for (Txn txn : splittedList) {
                        if (map.containsKey(txn.getId())) {
                            txn.setStatus("duplicate");
                        } else {
                            map.put(txn.getId(), 1);
                        }
                    }
                    return splittedList;
                }
            };
            futures.add(executorService.submit(callable));
        }
        // Merge the partitions; future.get() blocks until each task is done.
        for (int i = 0; i < futures.size(); i++) {
            Future<List<Txn>> future = futures.get(i);
            for (Txn txn : future.get()) {
                if (txn.getStatus() != null && txn.getStatus().equalsIgnoreCase("duplicate")) {
                    rejectedList.add(txn);
                } else {
                    acceptedList.add(txn);
                }
            }
        }
        executorService.shutdown();
        long t2 = System.nanoTime();
        // Elapsed time in whole seconds.
        System.out.println("Time taken:" + (t2 - t1) / 1000000000);
        System.out.println(acceptedList.size());
        System.out.println(rejectedList.size());
    }

    // Requires Txn.hashCode() to be consistent with equals(), otherwise
    // duplicates may be spread over different partitions.
    public static List<Txn>[] split(List<Txn> transactions, int n) {
        @SuppressWarnings("unchecked")
        List<Txn>[] splitResult = new List[n];
        for (int i = 0; i < n; i++) {
            splitResult[i] = new ArrayList<>();
        }
        for (Txn txn : transactions) {
            // floorMod keeps the index non-negative even for negative hash codes.
            splitResult[Math.floorMod(txn.hashCode(), n)].add(txn);
        }
        return splitResult;
    }
}
Posted on 2014-10-23 02:46:05
You should take a divide-and-conquer approach to make full use of parallelism. Have your transaction class override hashCode():
public class Transaction {
    ...
    @Override
    public int hashCode() {
        // Not a very good hash function, but I don't know your object.
        // The cast is needed because id is a long in the question.
        return (int) (this.id * 53 * 47 * 13);
    }
}
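One caveat if the hash code is later used as an array index: hashCode() may return a negative value (a multiplicative hash like the one above can overflow), and Java's % operator keeps the sign of the dividend. A small sketch of the difference, where BucketIndexDemo and bucketIndex are hypothetical names chosen for illustration:

```java
class BucketIndexDemo {
    // Hypothetical helper: maps any int hash code to an index in [0, n).
    static int bucketIndex(int hash, int n) {
        return Math.floorMod(hash, n);
    }

    public static void main(String[] args) {
        int hash = -7; // hashCode() is allowed to return negative values
        System.out.println(hash % 4);             // -3, not a valid array index
        System.out.println(bucketIndex(hash, 4)); // 1
    }
}
```

Math.floorMod is available from Java 8; on older versions, `(hash % n + n) % n` gives the same result for positive n.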
Then create a method that splits the transactions according to each transaction's hash code. Since hashCode() should return the same value for equal objects, duplicates will end up in the same smaller collection:
public Collection<Transaction>[] split(Collection<Transaction> transactions, int n) {
    // Generic arrays cannot be created directly, hence the unchecked raw array.
    @SuppressWarnings("unchecked")
    Collection<Transaction>[] splitResult = new Collection[n];
    for (int i = 0; i < n; i++) {
        splitResult[i] = new ArrayList<>();
    }
    for (Transaction transaction : transactions) {
        // floorMod keeps the index non-negative even for negative hash codes.
        splitResult[Math.floorMod(transaction.hashCode(), n)].add(transaction);
    }
    return splitResult;
}
https://stackoverflow.com/questions/26520262