假设我们有一个这样的工人列表:
List<Worker> workers = new ArrayList<>();
workers.add(new Worker(1));
workers.add(new Worker(2));
workers.add(new Worker(3));
workers.add(new Worker(4));
workers.add(new Worker(5));我想找到第一个完成工作的工人,所以:
Worker first = workers.parallelStream().filter(Worker::finish).findFirst().orElse(null);但有一个问题,我不想等待所有的工人完成他们的工作,然后找到第一个,但第一个工人,他一完成他的工作,!
public class Test {
public static void main(String[] args) {
List<Worker> workers = new ArrayList<>();
workers.add(new Worker(1));
workers.add(new Worker(2));
workers.add(new Worker(3));
workers.add(new Worker(4));
workers.add(new Worker(5));
Worker first = workers.parallelStream().filter(Worker::finish).findFirst().orElse(null);
if (first != null) {
System.out.println("id : " + first.id);
}
}
static class Worker {
int id;
Worker(int id) {
this.id = id;
}
boolean finish() {
int t = id * 1000;
System.out.println(id + " -> " + t);
try {
Thread.sleep(t);
} catch (InterruptedException ignored) {
}
return true;
}
}
}有什么方法可以用java.util.Stream来实现吗?
谢谢。
发布于 2014-12-20 06:09:08
当您使用finish方法作为流的筛选器时,这意味着为了计算特定工作人员的筛选器谓词,工人必须完成其工作。
但是,当您以并行流的形式运行此代码时,可能同时将筛选器应用于多个Worker,在这种情况下,第一个完成的筛选器将为您提供输出。但是,您无法控制并行流将使用多少线程。它可能决定在同一个线程上处理某些Worker,在这种情况下,其中一些根本不会被处理(因为您的终端操作要求只有一个工作人员完成它的处理)。
因此,如果您的目标是同时为所有员工执行finish,则不能使用流(甚至不是并行流)。
发布于 2014-12-20 13:26:33
您似乎对Stream有严重的误解。Stream不是用来启动工作人员的。事实上,如果您使用findFirst,可能会发生这样的情况:除了第一个工作程序之外,它不会启动任何工作人员。因此,它也不会等待“所有工作人员完成”,而只等待当前挂起的线程。但是,由于您有一个相当小的流,所以可能已经启动了所有的工作人员,因为您的环境中可用的线程数量是一样多。但这不是一种有保障的行为。
请注意,如果使用顺序流而不是并行流,则肯定只处理第一项(因为它返回true),而不处理其他项。但是由于流实现不能预测结果,它将尊重您通过并行执行“加速”操作的请求,并且可能会提前使用更多线程来处理更多的项。
发布于 2019-06-10 12:40:16
我知道这是个老问题,但在这里找到了一些很好的解决方案:https://winterbe.com/posts/2015/04/07/java8-concurrency-tutorial-thread-executor-examples/
在InvokeAny下
批处理提交可调用的另一种方法是invokeAny()方法,它的工作方式与invokeAll()略有不同。该方法不返回未来的对象,而是阻塞,直到第一个可调用终止并返回该可调用的结果。
将流更改为可调用的集合。看起来真的很干净。
https://stackoverflow.com/questions/27577337
复制相似问题