假设我们有两个fs2流:
val stream1 = fs2.Stream.bracket(IO { println("Acquire 1"); 2})(_ => IO { println("Release 1") })
.flatMap(p => fs2.Stream.range(1,p))
val stream2 = fs2.Stream.bracket(IO { println("Acquire 2"); 4})(_ => IO { println("Release 2") })
.flatMap(p => fs2.Stream.range(1,p))我想互相联系:
def connect[F[_]]: (fs2.Stream[F, Int], fs2.Stream[F, Int]) => fs2.Stream[F, Int] = {
def go(stream1: fs2.Stream[F, Int], stream2: fs2.Stream[F, Int]): Pull[F, Int, Unit] =
stream1.pull.uncons1.flatMap { stream1Element =>
stream2.pull.uncons1.flatMap { stream2Element =>
(stream1Element, stream2Element) match {
case (Some((stream1Head, stream1Tail)), Some((stream2Head, stream2Tail))) =>
println("Some, Some")
Pull.output1(stream1Head + stream2Head) >> go(stream1Tail, stream2Tail)
case (Some((stream1Head, stream1Tail)), None) =>
println("1 Stream still available")
Pull.output1(stream1Head) >> go(fs2.Stream.empty, stream1Tail)
case (None, Some((stream2Head, stream2Tail))) =>
println("2 Stream still available")
Pull.output1(stream2Head) >> go(fs2.Stream.empty, stream2Tail)
case _ => Pull.output1(-1)
}
}
}
(one, two) => go(one, two).stream}
现在查看日志,我看到:
Acquire 1
Acquire 2
Some, Some
Release 2
Release 1
2 Stream still available
2 Stream still available这对我来说有点令人惊讶,因为一旦第一个流完成,第二个流的资源也关闭了。现在假设资源是到数据库的连接,那么就不能再获取第二个流中的元素了。
行为正确吗?是否有办法避免关闭第二个流的资源?令人惊讶的是,如果第一个流比第二个流有更多的元素,那么一切都按预期的方式工作(当第二个流完成时,流1的资源不会关闭)
发布于 2021-12-25 17:18:14
通过检查zipAllWith函数的实现,我发现在这种情况下确实应该避免uncons1。最后的解决方案是使用stepLeg函数而不是uncons1。因此,上面的函数应该如下所示:
def connect[F[_]]: (fs2.Stream[F, Int], fs2.Stream[F, Int]) => fs2.Stream[F, Int] = {
def go(stream1: fs2.Stream[F, Int], stream2: fs2.Stream[F, Int]): Pull[F, Int, Unit] =
stream1.pull.stepLeg.flatMap { stream1Element =>
stream2.pull.stepLeg.flatMap { stream2Element =>
(stream1Element, stream2Element) match {
case (Some(sl1), Some(sl2)) =>
println("Some, Some")
val one = sl1.head(0)
val two = sl2.head(0)
Pull.output1(one + two) >> go(sl1.stream, sl2.stream)
case (Some(sl1), None) =>
val one = sl1.head(0)
println("1 Stream still available")
Pull.output1(one) >> go(sl1.stream, fs2.Stream.empty)
case (None, Some(sl2)) =>
val two = sl2.head(0)
println("2 Stream still available")
Pull.output1(two) >> go(fs2.Stream.empty, sl2.stream)
case _ => Pull.output1(-1)
}
}
}
(one, two) => {
go(one.flatMap(fs2.Stream.emit), two.flatMap(fs2.Stream.emit)).stream
}
}和原木:
Acquire 1
Acquire 2
Some, Some
Release 1
2 Stream still available
2 Stream still available
Release 2这个问题的另一个例子可以在这里找到:取消对stepLeg的声明
https://stackoverflow.com/questions/70473139
复制相似问题