首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >拆分Monix可观察对象

拆分Monix可观察对象
EN

Stack Overflow用户
提问于 2019-09-12 22:58:35
回答 2查看 429关注 0票数 4

我想为monix.reactive.Observable写一个拆分函数。它应该根据谓词的值将源Observable[A]拆分成新的一对(Observable[A], Observable[A]),并根据源中的每个元素进行计算。我希望分裂能独立于观测到的源是热的还是冷的。在源是冷的情况下,新的一对观察量也应该是冷的,而在源是热的情况下,新的一对观察量将是热的。我想知道这样的实现是否可能,如果可能,如何实现(我在下面粘贴了一个失败的测试用例)。

作为隐式类上的方法,签名将类似于或类似于

代码语言:javascript
复制
    /**
      * Split an observable by a predicate, placing values for which the predicate returns true
      * to the right (and values for which the predicate returns false to the left).
      * This is consistent with the convention adopted by Either.cond.
      */
    def split(p: T => Boolean)(implicit scheduler: Scheduler, taskLike: TaskLike[Future]): (Observable[T], Observable[T]) = {
      splitEither[T, T](elem => Either.cond(p(elem), elem, elem))
    }

目前,我有一个简单的实现,它使用源元素并将它们推送到PublishSubject。因此,这对新的可观测性是很热门的。我对一个冷可观察物的测试失败了。

代码语言:javascript
复制
import monix.eval.TaskLike
import monix.execution.{Ack, Scheduler}
import monix.reactive.{Observable, Observer}
import monix.reactive.subjects.PublishSubject

import scala.concurrent.Future

object ObservableOps {

  implicit class ObservableExtensions[T](o: Observable[T]) {

    /**
      * Split an observable by a predicate, placing values for which the predicate returns true
      * to the right (and values for which the predicate returns false to the left).
      * This is consistent with the convention adopted by Either.cond.
      */
    def split(p: T => Boolean)(implicit scheduler: Scheduler, taskLike: TaskLike[Future]): (Observable[T], Observable[T]) = {
      splitEither[T, T](elem => Either.cond(p(elem), elem, elem))
    }

    /**
      * Split an observable into a pair of Observables, one left, one right, according
      * to a determinant function.
      */
    def splitEither[U, V](f: T => Either[U, V])(implicit scheduler: Scheduler, taskLike: TaskLike[Future]): (Observable[U], Observable[V]) = {
      val l = PublishSubject[U]()
      val r = PublishSubject[V]()

      o.subscribe(new Observer[T] {
        override def onNext(elem: T): Future[Ack] = {
          f(elem) match {
            case Left(u) => l.onNext(u)
            case Right(v) => r.onNext(v)
          }
        }

        override def onError(ex: Throwable): Unit = {
          l.onError(ex)
          r.onError(ex)
        }

        override def onComplete(): Unit = {
          l.onComplete()
          r.onComplete()
        }
      })

      (l, r)
    }
  }
}

//////////


import ObservableOps._

import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import monix.reactive.subjects.PublishSubject

import org.scalatest.FlatSpec
import org.scalatest.Matchers._
import org.scalatest.concurrent.ScalaFutures._

class ObservableOpsSpec extends FlatSpec {

  val isEven: Int => Boolean = _ % 2 == 0

  "Observable Ops" should "split a cold observable" in {
    val o = Observable(1, 2, 3, 4, 5)

    val (l, r) = o.split(isEven)

    l.toListL.runToFuture.futureValue shouldBe List(1, 3, 5)
    r.toListL.runToFuture.futureValue shouldBe List(2, 4)
  }

  "Observable Ops" should "split a hot observable" in {
    val o = PublishSubject[Int]()

    val (l, r) = o.split(isEven)
    val lbuf = l.toListL.runToFuture
    val rbuf = r.toListL.runToFuture

    Observable.fromIterable(1 to 5).mapEvalF(i => o.onNext(i)).subscribe()
    o.onComplete()

    lbuf.futureValue shouldBe List(1, 3, 5)
    rbuf.futureValue shouldBe List(2, 4)
  }
}

我希望上面的两个测试用例都能通过,但是"Observable Ops" should "split a cold observable"失败了。

编辑:工作代码

通过这两个测试用例的实现如下所示:

代码语言:javascript
复制
import monix.execution.Scheduler
import monix.reactive.Observable

object ObservableOps {

  implicit class ObservableExtension[T](o: Observable[T]) {

    /**
      * Split an observable by a predicate, placing values for which the predicate returns true
      * to the right (and values for which the predicate returns false to the left).
      * This is consistent with the convention adopted by Either.cond.
      */
    def split(
        p: T => Boolean
    )(implicit scheduler: Scheduler): (Observable[T], Observable[T]) = {
      splitEither[T, T](elem => Either.cond(p(elem), elem, elem))
    }

    /**
      * Split an observable into a pair of Observables, one left, one right, according
      * to a determinant function.
      */
    def splitEither[U, V](
        f: T => Either[U, V]
    )(implicit scheduler: Scheduler): (Observable[U], Observable[V]) = {

      val oo = o.map(f)

      val l = oo.collect {
        case Left(u) => u
      }

      val r = oo.collect {
        case Right(v) => v
      }

      (l, r)
    }
  }
}
EN

回答 2

Stack Overflow用户

发布于 2019-09-12 23:51:39

代码语言:javascript
复制
class ObservableOpsSpec extends FlatSpec {

  val isEven: Int => Boolean = _ % 2 == 0

  "Observable Ops" should "split a cold observable" in {
    val o = Observable(1, 2, 3, 4, 5)
   val o2 =  o.publish
    val (l, r) = o2.split(isEven)

  val x=   l.toListL.runToFuture
  val y =   r.toListL.runToFuture
    o2.connect()
    x.futureValue shouldBe List(1, 3, 5)
    y.futureValue shouldBe List(2, 4)
  }

  "Observable Ops" should "split a hot observable" in {
    val o = PublishSubject[Int]()

    val (l, r) = o.split(isEven)
    val lbuf = l.toListL.runToFuture
    val rbuf = r.toListL.runToFuture

    Observable.fromIterable(1 to 5).mapEvalF(i => o.onNext(i)).subscribe()
    o.onComplete()

    lbuf.futureValue shouldBe List(1, 3, 5)
    rbuf.futureValue shouldBe List(2, 4)
  }
}
票数 2
EN

Stack Overflow用户

发布于 2019-09-13 04:03:08

根据定义,可以对每个订阅者进行惰性评估。如果不对所有内容进行两次求值或将其转换为热对象,则无法拆分它。

如果您不介意对所有内容进行两次评估,那么只需使用.filter两次。如果您不介意转换为热,可以使用.publish (或.publish.refCount,这样您就不需要手动转换为connect )。如果您想保留冷/热属性并并行处理两个部分,有一个publishSelector方法可以让您在有限的范围内将任何可观察对象视为热对象:

代码语言:javascript
复制
coldOrHot.publishSelector { totallyHot =>
  val s1 = totallyHot.filter(...).flatMap(...) // any processing
  val s2 = totallyHot.filter(...).mapEval(...) // any processing 2
  Observable(s1, s2).merge
}

除了作用域之外,它限制是内部lambda的结果必须是另一个可观察的(将从publishSelector返回),因此您不能拥有具有所需签名的helper。但如果原件是冷的,结果仍然是冷的。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57909519

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档