首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >FS2:是否可以优雅地完成队列?

FS2:是否可以优雅地完成队列?
EN

Stack Overflow用户
提问于 2018-05-07 11:13:25
回答 2查看 1.4K关注 0票数 4

假设我想将一些遗留的异步API转换为FS2流。API提供了一个带有3个回调的接口:下一个元素,成功,错误。我希望Stream发出所有元素,然后在接收到成功或错误回调后完成。

FS2 guide (https://functional-streams-for-scala.github.io/fs2/guide.html)建议在这样的情况下使用fs2.Queue,它对于排队非常有用,但是到目前为止,我看到的所有示例都期望queue.dequeue返回的流永远不会完成--在我的情况下,没有明显的方法来处理成功/错误回调。我尝试过使用queue.dequeue.interruptWhen(...here goes the signal...),但是如果成功/错误回调在客户端从流中读取数据之前到达,流就会过早终止--仍然有未读取的元素。我希望消费者在完成流之前读完它们。

用FS2可以做到这一点吗?对于Akka,它是微不足道的-- SourceQueueWithCompletecompletefail方法。

更新:通过在选项中包装元素,并将任何元素视为停止读取流的信号,以及通过使用允诺来传播错误,我能够获得足够好的结果:

代码语言:javascript
复制
queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)

不过,我是否忽略了更自然的做法呢?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-06-19 19:54:27

一种惯用的方法是创建一个Queue[Option[A]]而不是Queue[A]。在排队时,使用Some包装,您可以显式地将None排队以表示完成。在退队列方面,请执行q.dequeue.unNoneTerminate,这将为您提供一个Stream[F, A],该Stream[F, A]一旦队列发出None就会终止。

票数 8
EN

Stack Overflow用户

发布于 2018-08-08 15:52:52

对您的更新的回答:将unNoneTerminaterethrow组合起来,后者接受Stream[F, Either[Throwable, A]],并返回一个Stream[F, A],该Stream[F, A]在封装可抛出时与Stream.raiseError一起出错。

然后,完整的堆栈将是一个Stream[F, Either[Throwable, Option[A]]],然后通过调用.rethrow.unNoneTerminate将其解压缩到Stream[F,A]中。

票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50212871

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档