Monix看起来像一个很好的框架,但是文档非常稀少。
在monix中,akka流的alsoTo类似物是什么?
基本上,我希望流被两个消费者消费。
发布于 2018-02-14 08:04:14
Monix遵循Rx模型,因为订阅是动态的。任何Observable都支持无限数量的订阅者:
val obs = Observable.interval(1.second)
val s1 = obs.dump("O1").subscribe()
val s2 = obs.dump("O2").subscribe()但是,有一个问题--默认情况下,Observable是所谓的“冷数据源”,这意味着每个订阅者都有自己的数据源。
因此,例如,如果您有一个从Observable读取的File,那么每个订阅服务器都会得到自己的文件句柄。
为了在多个订阅者之间“共享”这样的Observable,您必须将它转换为一个热数据源,以便共享它。您可以在multicast操作符及其版本中这样做,publish是最常用的。这会给您返回一个ConnectableObservable,它需要一个connect()调用来启动流:
val shared = obs.publish
// Nothing happens here:
val s1 = shared.dump("O1").subscribe()
val s2 = shared.dump("O2").subscribe()
// Starts actual streaming
val cancelable = shared.connect()
// You can subscribe after connect(), but you might lose events:
val s3 = shared.dump("O3").subscribe()
// You can unsubscribe one of your subscribers, but the
// data source keeps the stream active for the others
s1.cancel()
// To cancel the connection for all subscribers:
cancelable.cancel()PS:monix.io是一项正在进行的工作,欢迎PRs
https://stackoverflow.com/questions/48766913
复制相似问题