我有个大学
Uni<List<String>> list = Uni.createFrom().item(List.of("a", "b", "c"));我想把这个列表中的每个项目映射到一个新的Uni中,然后异步地处理它们。
我试过以下几点:
list.map(strings -> strings.stream().map(this::processItem).toList())
.flatMap(unis -> Uni.join().all(unis).andCollectFailures())
.subscribe()
.with(System.out::println);
private Future<String> process(String s) {
return executor.submit(() -> {
System.out.println("start " + s);
Thread.sleep(5000);
System.out.println("end " + s);
return s;
});
}
private Uni<Void> processItem(String item) {
return Uni.createFrom().future(process(item))
.replaceWithVoid();
}但是,这只处理列表的第一个元素。如何将Uni映射到列表并异步处理每个Uni?
发布于 2022-11-14 09:27:50
.subscribe().with(System.out::println);是异步的,如果不添加一些检查以确保流已经结束,程序将在第一个结果可用之前退出。
我通常和Vert.x单位一起工作
@Test
public void test(TestContext context) {
Random random = new Random();
// This is only used to signal the test
// when the stream is terminated
Async async = context.async();
Uni.createFrom()
.item( List.of( "a", "b", "c" ) )
// Convert the Uni<List<String>> into a Multi<String>
.onItem().transformToMulti( Multi.createFrom()::iterable )
.onItem().transformToUniAndMerge( s ->
// You can create a uni the way you prefer
Uni.createFrom().item( s )
// Add some random duration
.onItem().delayIt().by( Duration.ofMillis( (random.nextInt(5) + 1) * 1000 ) )
)
// End the test only when the stream has terminated
.onTermination().invoke( (throwable, aBoolean) -> {
if (throwable != null ) {
context.fail(throwable);
}
else {
async.complete();
}
} )
.subscribe()
.with( s -> System.out.println("Printing: " + s) );
}时间循环也应该可以工作。
或者,你可以收集结果:
Random random = new Random();
Uni.createFrom()
.item( List.of( "a", "b", "c" ) )
// Convert the Uni<List<String>> into a Multi<String>
.onItem().transformToMulti( Multi.createFrom()::iterable )
.onItem().transformToUniAndMerge( s -> {
final int duration = (random.nextInt( 5 ) + 1) * 1000;
// You can create a uni the way you prefer
return Uni.createFrom().item( s )
// Add some random duration
.onItem().delayIt().by( Duration.ofMillis( duration ) )
.invoke( () -> System.out.println("Letter: " + s + ", duration in ms: " + duration) );
} )
.onItem().invoke( s -> System.out.println("Printing: " + s) )
.collect().asList().await().indefinitely();https://stackoverflow.com/questions/74429274
复制相似问题