首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Monix : InputStreamObservable不支持多个订阅者

Monix : InputStreamObservable不支持多个订阅者
EN

Stack Overflow用户
提问于 2018-02-07 14:04:43
回答 2查看 314关注 0票数 4

我正在尝试将(String,Date)的一个可观测对象拆分为两个不同的可观测对象,并将它们压缩在一起,如下所示

代码语言:javascript
复制
import monix.execution.Scheduler.Implicits.global
val x = Observable.fromIterator((0 to 10).map(i => (s"a $i", s"b $i")).toIterator)

val y = Observable.toReactive(x)

val fileStream = Observable.fromReactivePublisher(y).mapAsync(5)(a => Task{println(a._1); a._1})
val dateStream = Observable.fromReactivePublisher(y).mapAsync(5)(a => Task{println(a._2); a._2})

fileStream.zip(dateStream)
  .map(println)
  .subscribe()

但是我得到了以下异常

代码语言:javascript
复制
monix.reactive.exceptions.MultipleSubscribersException: InputStreamObservable does not support multiple subscribers
    at monix.reactive.exceptions.MultipleSubscribersException$.build(MultipleSubscribersException.scala:51)
    at monix.reactive.internal.builders.IteratorAsObservable.unsafeSubscribeFn(IteratorAsObservable.scala:42)
    at monix.reactive.Observable$$anon$6.subscribe(Observable.scala:155)
    at monix.reactive.internal.builders.ReactiveObservable.unsafeSubscribeFn(ReactiveObservable.scala:38)
    at monix.reactive.internal.operators.MapAsyncParallelObservable.unsafeSubscribeFn(MapAsyncParallelObservable.scala:60)
    at monix.reactive.internal.builders.Zip2Observable.unsafeSubscribeFn(Zip2Observable.scala:158)
    at monix.reactive.Observable$$anon$5.unsafeSubscribeFn(Observable.scala:139)
    at monix.reactive.Observable$class.subscribe(Observable.scala:71)
    at monix.reactive.Observable$$anon$5.subscribe(Observable.scala:136)
    at monix.reactive.Observable$class.subscribe(Observable.scala:90)
    at monix.reactive.Observable$$anon$5.subscribe(Observable.scala:136)
    at monix.reactive.Observable$class.subscribe(Observable.scala:120)
    at monix.reactive.Observable$$anon$5.subscribe(Observable.scala:136)
    at monix.reactive.Observable$class.subscribe(Observable.scala:112)
    at monix.reactive.Observable$$anon$5.subscribe(Observable.scala:136)
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-02-07 16:17:43

从反应式转换到反应式是强制的吗?

解决这个问题的一种方法是使用val x = Observable.fromIterable((0 to 10).map(i => (s"a $i", s"b $i"))),但是对于无限大的数据流,它将使用OutOfMemoryError。

另一种方法是使用.multicast(Pipe.publish[]),然后向下obs.connect()代码:

代码语言:javascript
复制
import monix.execution.Scheduler.Implicits.global
val x = Observable.fromIterator((0 to 10).map(i => (s"a $i", s"b $i")).iterator)

val y = Observable.toReactive(x)
val obsY = Observable.fromReactivePublisher(y)
val connectY = obsY.multicast(Pipe.publish[(String, String)])

val fileStream = connectY.mapAsync(5)(a => Task{println(a._1); a._1})
val dateStream = connectY.mapAsync(5)(a => Task{println(a._2); a._2})

fileStream.zip(dateStream)
  .map(println)
  .subscribe()

connectY.connect()

Thread.sleep(5000)
票数 2
EN

Stack Overflow用户

发布于 2019-01-12 05:39:11

除了谢尔盖-舒宾的回答之外,还可以将Observable临时转换为“热”可观察物,可以使用publishSelector将其分割为多个流,而不必手动处理multicast。这看起来像这样:

代码语言:javascript
复制
val x = Observable.fromIterator((0 to 10).map(i => (s"a $i", s"b $i")).toIterator)

val zipped = x.publishSelector { o =>
  val fileStream = o.mapParallelUnordered(5)(a => Task{println(a._1); a._1})
  val dateStream = o.mapParallelUnordered(5)(a => Task{println(a._2); a._2})

  fileStream.zip(dateStream)
}

zipped
  .map(println)
  .subscribe()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48657005

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档