我正在处理大型File,其中有一个列表:
val originalFiles: List<File>我需要读取每个文件的InputStream,对其进行处理,并将其写入另一个processedFile。为了简单起见,让我们假设我只是读取原始InputStream并将其写入目标文件输出流。
我想并行地处理originalFiles。
第一种前向方式是使用parallelStream()
override suspend fun processFiles(workdir: File): Either<Throwable, Unit> = either {
val originalFiles = ...
originalFiles.parallelStream().forEach {
val destination = File("${workdir.absolutePath}/${it.name}-processed.txt")
logger.info("Starting merge {}", destination)
FileOutputStream(destination).use { faos ->
IOUtils.copy(it.inputStream(), faos)
}
logger.info("Finished processing {}", destination)
}
}但是,考虑到我正在使用coroutines和Arrow,我会得到一个编译警告Possibly blocking call in non-blocking context could lead to thread starvation。
coroutines/Arrow?是否有一种正确的(非阻塞的)方法来处理具有协同/挂起函数的输入/输出流?
发布于 2022-10-06 12:13:51
最好使用parTraverse (将在2.x.x中重命名为parMap )。这个函数来自Arrow,您也可以使用Flow#parMap和Flow#parMapUnordered。
您还需要确保FileOutputStream是正确关闭的,并且在面临取消时,我建议使用Resource。
Possibly blocking call in non-blocking context could lead to thread starvation警告将通过在正确的Dispatchers.IO上调用它而消失。
suspend fun processFiles(workdir: File): Unit {
val originalFiles: List<File> = emptyList<File>()
originalFiles.parTraverse(Dispatchers.IO) {
val destination = File("${workdir.absolutePath}/${it.name}-processed.txt")
logger.info("Starting merge {}", destination)
FileOutputStream(destination).use { faos ->
IOUtils.copy(it.inputStream(), faos)
}
logger.info("Finished processing {}", destination)
}
}以下是你问题的答案摘要:
是否有一种正确的(非阻塞)方法来处理具有协同/挂起函数的输入/输出流?
使用suspend + Dispatchers.IO执行它们。
,是否有更好的方法可以用coroutines/Arrow对列表处理进行并行处理?
利用parTraverse并行Kotlin中的List转换。如果还想限制并行进程的数量,可以选择parTraverseN。
https://stackoverflow.com/questions/73973017
复制相似问题