我正在尝试用SignalRef中断fs2流。我用下面的代码设置并运行了流。流应在switch包含false时运行,并应在switch包含true时中断
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
implicit val contextShift = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)
val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)
val program: Stream[IO, Unit] = {
val program: Stream[IO, Unit] =
Stream
.repeatEval(IO{
println(java.time.LocalTime.now)
println(switch.map(_.get).unsafeRunSync.unsafeRunSync)
})
.metered(1.second)
program
.interruptWhen(Stream.repeatEval(switch.map(_.get).unsafeRunSync))
}
program.compile.drain.unsafeRunAsync(() => _)然后,我尝试用以下命令中断流
switch.map(_.set(true).unsafeRunSync)然而,这条流还在继续。在stdout中我看到
15:58:33.048504
false
15:58:34.048760
false
15:58:35.049063
false
15:58:36.049356
false
15:58:37.049615
false那么很明显,它没有接上到true的开关?
发布于 2020-11-29 11:03:38
就我个人而言,我使用我自己的终止开关来做这样的事情:
final case class KillSwitch[F[_]](stream: Stream[F, Unit], switch: F[Unit])
object KillSwitch {
def apply[F[_]: Sync]: F[KillSwitch[F]] =
Ref.of(true).map(switch => KillSwitch(Stream.repeatEval(switch.get).takeWhile(identity).void, switch.set(false)))
}我使用它或多或少像这样:
for {
KillSwitch(kfStream, switch) <- KillSwitch[F]
streamFiber <- yourStream.zip(kfStream).map(_._1).compile.drain.start
// after a while
_ <- switch // closes kfStream
result <- streamFiber.join
} yield result(假设这是一个伪代码来展示这个想法)。
发布于 2020-11-30 06:43:06
你的代码有几个问题。
首先,请检查switch的签名
val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)将SignallingRef类型封装到IO中。这意味着新SignallingRef的创建被挂起,直到IO monad被评估(通过IO程序流隐式地或通过调用unsafeRunXXX显式地)。因此,这个值更合适的名称可能是createSwitch。
当你每次使用switch.map(_.get).unsafeRunSync创建一个新的SignallingRef实例时,它的默认值是false,所以它永远不会被求值为true。
经验法则是,在完成IO/流程序的汇编之前,您应该(几乎)永远不要调用unsafeRunXXX方法,然后您应该运行一次这种方法。
正确的方法是在for- create中创建一次switch,然后可以将其作为参数传递给program。
我对您的代码进行了一点重构,以完成我认为您想要做的事情,并添加了一些澄清注释。
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
implicit val contextShift = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)
//I changed name to createSwitch, which I think reflect reality more
val createSwitch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)
val program: Stream[IO, Unit] = {
//I pass here switch as method's param
def program(switch: SignallingRef[IO, Boolean]): Stream[IO, Unit] =
Stream
.repeatEval {
for { //I used for-comprehension to split IO into 3 statements
switchValue <- switch.get //here I get value of switch
_ <- IO(println(java.time.LocalTime.now)) //I split println into 2 separate statements
_ <- IO(println(switchValue)) //because it's not a good practive to run 2 effect in single IO
} yield ()
}
.metered(1.second)
for {
switch <- Stream.eval(createSwitch)
//here I create effect to set switch to true after 10 seconds and then use start to run it
//separate fiber in background. If I didn't do that it would just wait 10 sec and only then run program
_ <- Stream.eval(switch.set(true).delayBy(10.seconds).start)
_ <- program(switch).interruptWhen(switch)
} yield ()
}
program.compile.drain.unsafeRunSync()https://stackoverflow.com/questions/65054435
复制相似问题