我试图创建一个fs2流,
stream
下面是我的方法,但我不确定这是否是最好的方法。
我使用大写的函数名来突出显示它被调用的位置。
private def CREATE_STREAM_RECURSIVE(
session: AuthSession,
myService: MyService[IO],
sessionStore: SessionStore[IO],
): Stream[IO, Unit] = {
Stream
.repeatEval(service.fetchData(session))
.flatMap { data =>
val jsonStream = Stream.eval(IO.delay(data.asJson.noSpaces))
persistToFile(jsonStream) <-- another fs2 stream that writes to a file
}
.metered(3.seconds)
.handleErrorWith { _ =>
Stream.force {
myService.login()
.flatMap(sessionStore.setSession)
.map(CREATE_STREAM_RECURSIVE(_, myService, sessionStore)) //recursive
}
}
.interruptWhen(isItOkToInterruptStream)
}问题:
谢谢
发布于 2022-08-11 21:21:37
在SystemFw的帮助下,我能够通过fs2不和谐渠道找到答案。
If an error keeps happening, is it possible for this recursion to blow the stack?
不,因为递归是一元的,它通过一个flatMap,在本例中是由force隐藏的,因此它是堆栈安全的
When the interruption happens, is it possible for this stream to stop without completing the file write?
是
If so, is there anyway to ensure that the file is written completely before the exit happens?
是的,通过将文件转换为IO并在其上调用.uncancelable,使其写入不可中断
https://stackoverflow.com/questions/73297171
复制相似问题