假设我想将一些遗留的异步API转换为FS2流。API提供了一个带有3个回调的接口:下一个元素,成功,错误。我希望Stream发出所有元素,然后在接收到成功或错误回调后完成。
FS2 guide (https://functional-streams-for-scala.github.io/fs2/guide.html)建议在这样的情况下使用fs2.Queue,它对于排队非常有用,但是到目前为止,我看到的所有示例都期望queue.dequeue返回的流永远不会完成--在我的情况下,没有明显的方法来处理成功/错误回调。我尝试过使用queue.dequeue.interruptWhen(...here goes the signal...),但是如果成功/错误回调在客户端从流中读取数据之前到达,流就会过早终止--仍然有未读取的元素。我希望消费者在完成流之前读完它们。
用FS2可以做到这一点吗?对于Akka,它是微不足道的-- SourceQueueWithComplete有complete和fail方法。
更新:通过在选项中包装元素,并将任何元素视为停止读取流的信号,以及通过使用允诺来传播错误,我能够获得足够好的结果:
queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)不过,我是否忽略了更自然的做法呢?
发布于 2018-06-19 19:54:27
一种惯用的方法是创建一个Queue[Option[A]]而不是Queue[A]。在排队时,使用Some包装,您可以显式地将None排队以表示完成。在退队列方面,请执行q.dequeue.unNoneTerminate,这将为您提供一个Stream[F, A],该Stream[F, A]一旦队列发出None就会终止。
发布于 2018-08-08 15:52:52
对您的更新的回答:将unNoneTerminate与rethrow组合起来,后者接受Stream[F, Either[Throwable, A]],并返回一个Stream[F, A],该Stream[F, A]在封装可抛出时与Stream.raiseError一起出错。
然后,完整的堆栈将是一个Stream[F, Either[Throwable, Option[A]]],然后通过调用.rethrow.unNoneTerminate将其解压缩到Stream[F,A]中。
https://stackoverflow.com/questions/50212871
复制相似问题