因此,我有一个函数,它在下面所示的一个大的未排序的数字数组中找到一个大于N的数字。
import java.util.*;
public class program {
// Linear-search function to find the index of an element
public static int findIndex(int arr[], int t)
{
// if array is Null
if (arr == null) {
return -1;
}
// find length of array
int len = arr.length;
int i = 0;
// traverse in the array
while (i < len) {
// if the i-th element is t
// then return the index
if (arr[i] > t) {
return i;
}
else {
i = i + 1;
}
}
return -1;
}
// Driver Code
public static void main(String[] args)
{
int[] my_array = { 5, 4, 6, 1, 3, 2, 7, 8, 9 };
int i = findIndex(my_array, 7);
// find the index of 5
System.out.println("Index position of 5 is: "
+ my_array[i]);
}
}但我必须找到一种方法来并行地实现这一点。我不知道如何开始,也不知道该怎么做,就像我在并行编程领域刚刚起步一样。
任何帮助都将不胜感激。
发布于 2021-01-23 11:39:30
最直接的方法是使用平行流,@Govinda很好地说明了这一点。
但是,如果您想使用此示例学习如何使用线程,那么可以按照以下步骤对代码进行并行化:
要创建线程,我们可以执行以下操作:
Thread[] threads = new Thread[total_threads];
for(int t = 0; t < threads.length; t++) {
threads[t] = new Thread(/** the parallel work **/);
threads[t].start();
}要将工作分配给线程,我们需要在线程之间拆分数组。最简单的方法实际上是拆分迭代,而不是数组本身。线程接收整个数组,但只处理它的某些位置,例如:
private final static int NO_FOUND = -1;
// Linear-search function to find the index of an element
public static int findIndex(int[] arr, int t, int threadId, int total_threads){
for (int i = threadId; i < arr.length; i += total_threads)
if ( arr[i] > t)
return i;
return NO_FOUND;
}对于每个线程,我们从0 to N-1分配一个0 to N-1范围,其中N是线程的总数。
要通知线程,我们可以在线程之间使用共享原子整数,用于更新找到的值的索引。因此,最后的代码如下所示:
public class program {
private final static int NO_FOUND = -1;
// Linear-search function to find the index of an element
public static int findIndex(int[] arr, int t, int threadId, int total_threads, AtomicInteger shared_index){
for (int i = threadId; i < arr.length && shared_index.get() == -1; i += total_threads)
if ( arr[i] > t)
return i;
return NO_FOUND;
}
public static void main(String[] args) throws InterruptedException {
final int N = 8;
int[] my_array = { 5, 4, 6, 1, 3, 2, 7, 8, 9 };
int total_threads = 4;
AtomicInteger shared_index = new AtomicInteger(-1);
Thread[] threads = new Thread[total_threads];
for(int t = 0; t < threads.length; t++) {
final int thread_id = t;
threads[t] = new Thread(() ->parallel_work(N, my_array, total_threads, shared_index, thread_id));
threads[t].start();
}
for (Thread thread : threads)
thread.join();
System.out.println("Index of value bigger than " + N + " : " + shared_index.get());
}
private static void parallel_work(int n, int[] my_array, int total_threads, AtomicInteger shared_index, int thread_id) {
int index_found = findIndex(my_array, n, thread_id, total_threads, shared_index);
shared_index.compareAndExchange(NO_FOUND, index_found);
}
}输出:
Index of value bigger than 8 : 8发布于 2021-01-23 11:41:57
您可以使用并行流并行运行操作。
public static int findIndex(int arr[], int t)
{
// if array is Null
if (arr == null) {
return -1;
}
return IntStream.rangeClosed(0, arr.length)
.parallel()
.filter(index -> arr[index] > t)
.findFirst()
.orElse(-1);
}如果您这样做是为了学习,您可以将数组划分为子数组,然后将每个部分提交到一个单独的线程中,然后评估并返回结果。
首先,实现一个可调用的函数,它可以接受子数组并返回索引。
class RangeFinder implements Callable<Integer> {
private final int[] arr;
private final int startIndex;
private final int t;
RangeFinder(int[] arr, int startIndex, int t) {
this.arr = arr;
this.startIndex = startIndex;
this.t = t;
}
@Override
public Integer call() throws Exception {
for (int i = 0; i < this.arr.length; i++) {
if (arr[i] > t) {
return startIndex + i;
}
}
return -1;
}
}现在,您可以将数组划分为块,并将其提交并行处理。
public static int findIndex(int arr[], int t) throws ExecutionException, InterruptedException {
if (arr == null) {
return -1;
}
int numberOfThreads = 4;
return findIndexInParallel(arr, t, numberOfThreads);
}
private static int findIndexInParallel(int[] arr, int t, int threadCount) throws ExecutionException, InterruptedException {
if(threadCount > arr.length) {
threadCount = arr.length;
}
int startIndex = 0;
int range = (int) Math.ceil(arr.length / (double) threadCount);
int endIndex = range;
// step 1: create threads using Executor FrameWork
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
List<Future> futures = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
// step 2: create object of callable
RangeFinder rangeFinder = new RangeFinder(Arrays.copyOfRange(arr, startIndex, endIndex), startIndex, t);
// Step 3: submit the task to thread pool
Future<Integer> submit = executorService.submit(rangeFinder);
// Step 4: recalculate array indexes for the next iteration
startIndex = startIndex + range;
int newEndIndex = endIndex + range;
endIndex = newEndIndex < arr.length ? newEndIndex : arr.length;
futures.add(submit);
}
// step 5: evaluate and return the results
for (Future future : futures) {
int index = (int) future.get();
if(index != -1) {
executorService.shutdownNow();
return index;
}
}
return -1;
}上面的代码可以进一步重构,当然,您也需要处理异常。
https://stackoverflow.com/questions/65858672
复制相似问题