首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何异步中断fs2流?

如何异步中断fs2流?
EN

Stack Overflow用户
提问于 2020-11-29 05:06:00
回答 2查看 541关注 0票数 2

我正在尝试用SignalRef中断fs2流。我用下面的代码设置并运行了流。流应在switch包含false时运行,并应在switch包含true时中断

代码语言:javascript
复制
  import cats.effect.IO
  import fs2.Stream
  import fs2.concurrent.SignallingRef

  import scala.concurrent.ExecutionContext
  import scala.concurrent.duration.DurationInt

  implicit val contextShift = IO.contextShift(ExecutionContext.global)
  implicit val timer = IO.timer(ExecutionContext.global)

  val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)

  val program: Stream[IO, Unit] = {

      val program: Stream[IO, Unit] =
        Stream
          .repeatEval(IO{
            println(java.time.LocalTime.now)
            println(switch.map(_.get).unsafeRunSync.unsafeRunSync)
          })
          .metered(1.second)

      program
        .interruptWhen(Stream.repeatEval(switch.map(_.get).unsafeRunSync))
    }
  
  program.compile.drain.unsafeRunAsync(() => _)

然后,我尝试用以下命令中断流

代码语言:javascript
复制
switch.map(_.set(true).unsafeRunSync)

然而,这条流还在继续。在stdout中我看到

代码语言:javascript
复制
15:58:33.048504
false
15:58:34.048760
false
15:58:35.049063
false
15:58:36.049356
false
15:58:37.049615
false

那么很明显,它没有接上到true的开关?

EN

回答 2

Stack Overflow用户

发布于 2020-11-29 11:03:38

就我个人而言,我使用我自己的终止开关来做这样的事情:

代码语言:javascript
复制
final case class KillSwitch[F[_]](stream: Stream[F, Unit], switch: F[Unit])
object KillSwitch {

  def apply[F[_]: Sync]: F[KillSwitch[F]] =
    Ref.of(true).map(switch => KillSwitch(Stream.repeatEval(switch.get).takeWhile(identity).void, switch.set(false)))
}

我使用它或多或少像这样:

代码语言:javascript
复制
for {
  KillSwitch(kfStream, switch) <- KillSwitch[F]
  streamFiber <- yourStream.zip(kfStream).map(_._1).compile.drain.start
  // after a while
  _ <- switch // closes kfStream
  result <- streamFiber.join
} yield result

(假设这是一个伪代码来展示这个想法)。

票数 3
EN

Stack Overflow用户

发布于 2020-11-30 06:43:06

你的代码有几个问题。

首先,请检查switch的签名

代码语言:javascript
复制
val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)

SignallingRef类型封装到IO中。这意味着新SignallingRef的创建被挂起,直到IO monad被评估(通过IO程序流隐式地或通过调用unsafeRunXXX显式地)。因此,这个值更合适的名称可能是createSwitch

当你每次使用switch.map(_.get).unsafeRunSync创建一个新的SignallingRef实例时,它的默认值是false,所以它永远不会被求值为true。

经验法则是,在完成IO/流程序的汇编之前,您应该(几乎)永远不要调用unsafeRunXXX方法,然后您应该运行一次这种方法。

正确的方法是在for- create中创建一次switch,然后可以将其作为参数传递给program

我对您的代码进行了一点重构,以完成我认为您想要做的事情,并添加了一些澄清注释。

代码语言:javascript
复制
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt

implicit val contextShift = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)

//I changed name to createSwitch, which I think reflect reality more
val createSwitch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)

val program: Stream[IO, Unit] = {

  //I pass here switch as method's param
  def program(switch: SignallingRef[IO, Boolean]): Stream[IO, Unit] =
    Stream
      .repeatEval {
        for { //I used for-comprehension to split IO into 3 statements
          switchValue <- switch.get //here I get value of switch
          _ <- IO(println(java.time.LocalTime.now)) //I split println into 2 separate statements
          _ <- IO(println(switchValue)) //because it's not a good practive to run 2 effect in single IO
        } yield ()
      }
      .metered(1.second)

  for {
    switch <- Stream.eval(createSwitch)
    //here I create effect to set switch to true after 10 seconds and then use start to run it
    //separate fiber in background. If I didn't do that it would just wait 10 sec and only then run program
    _ <- Stream.eval(switch.set(true).delayBy(10.seconds).start)
    _ <- program(switch).interruptWhen(switch)
  } yield ()

}

program.compile.drain.unsafeRunSync()
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65054435

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档