我正在努力理解猫是如何影响Cancelable的。我有下面的基于文档的最小应用程序
import java.util.concurrent.{Executors, ScheduledExecutorService}
import cats.effect._
import cats.implicits._
import scala.concurrent.duration._
object Main extends IOApp {
def delayedTick(d: FiniteDuration)
(implicit sc: ScheduledExecutorService): IO[Unit] = {
IO.cancelable { cb =>
val r = new Runnable {
def run() =
cb(Right(()))
}
val f = sc.schedule(r, d.length, d.unit)
// Returning the cancellation token needed to cancel
// the scheduling and release resources early
val mayInterruptIfRunning = false
IO(f.cancel(mayInterruptIfRunning)).void
}
}
override def run(args: List[String]): IO[ExitCode] = {
val scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor()
for {
x <- delayedTick(1.second)(scheduledExecutorService)
_ <- IO(println(s"$x"))
} yield ExitCode.Success
}
}当我运行这个:
❯ sbt run
[info] Loading global plugins from /Users/ethan/.sbt/1.0/plugins
[info] Loading settings for project stackoverflow-build from plugins.sbt ...
[info] Loading project definition from /Users/ethan/IdeaProjects/stackoverflow/project
[info] Loading settings for project stackoverflow from build.sbt ...
[info] Set current project to cats-effect-tutorial (in build file:/Users/ethan/IdeaProjects/stackoverflow/)
[info] Compiling 1 Scala source to /Users/ethan/IdeaProjects/stackoverflow/target/scala-2.12/classes ...
[info] running (fork) Main
[info] ()程序就挂在这一点上。我有很多问题:
mayInterruptIfRunning = false?中断正在运行的任务不是整个取消的目的吗?ScheduledExecutorService的推荐方法吗?我没有看到文档中的例子。() (然后意外挂起)。如果我想还别的东西呢?例如,假设我想返回一个字符串,这是一些长期运行的计算的结果。如何从IO.cancelable中提取该值?困难似乎是IO.cancelable返回取消操作,而不是要取消的进程的返回值。请原谅,这是我的build.sbt
name := "cats-effect-tutorial"
version := "1.0"
fork := true
scalaVersion := "2.12.8"
libraryDependencies += "org.typelevel" %% "cats-effect" % "1.3.0" withSources() withJavadoc()
scalacOptions ++= Seq(
"-feature",
"-deprecation",
"-unchecked",
"-language:postfixOps",
"-language:higherKinds",
"-Ypartial-unification")发布于 2020-07-15 18:04:28
您需要关闭ScheduledExecutorService,尝试以下操作
Resource.make(IO(Executors.newSingleThreadScheduledExecutor))(se => IO(se.shutdown())).use {
se =>
for {
x <- delayedTick(5.second)(se)
_ <- IO(println(s"$x"))
} yield ExitCode.Success
}发布于 2020-05-02 18:45:42
我找到了这些问题的答案,尽管有些事情我还是不明白。
为什么程序在1秒后挂起而不是终止?
由于某种原因,Executors.newSingleThreadScheduledExecutor()会导致一些东西挂起。为了解决这个问题,我不得不使用Executors.newSingleThreadScheduledExecutor(new Thread(_))。看起来唯一的区别是,第一个版本等同于Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),尽管文档中没有说明为什么会这样。
我们为什么要设置
mayInterruptIfRunning = false?中断正在运行的任务不是整个取消的目的吗?
我必须承认,我并不完全理解这一点。同样,医生们并没有特别澄清这一点。将标志切换到true似乎根本不改变行为,至少在Ctrl-c中断的情况下是这样。
这是定义ScheduledExecutorService的推荐方法吗?我没有看到文档中的例子。
显然不是。我想出的方式是受猫效应源代码中的这个片段启发的。
此程序等待1秒,然后返回
()(然后意外挂起)。如果我想还别的东西呢?例如,假设我想返回一个字符串,这是一些长期运行的计算的结果。如何从IO.cancelable中提取该值?困难似乎是IO.cancelable返回取消操作,而不是要取消的进程的返回值。
IO.cancellable { ... }块返回IO[A],回调cb函数具有Either[Throwable, A] => Unit类型。从逻辑上讲,这意味着输入到cb函数中的都是IO.cancellable表达式将返回的内容(包装在IO中)。因此,为了返回字符串"hello"而不是(),我们重写了delayedTick
def delayedTick(d: FiniteDuration)
(implicit sc: ScheduledExecutorService): IO[String] = { // Note IO[String] instead of IO[Unit]
implicit val processRunner: JVMProcessRunner[IO] = new JVMProcessRunner
IO.cancelable[String] { cb => // Note IO.cancelable[String] instead of IO[Unit]
val r = new Runnable {
def run() =
cb(Right("hello")) // Note "hello" instead of ()
}
val f: ScheduledFuture[_] = sc.schedule(r, d.length, d.unit)
IO(f.cancel(true))
}
}发布于 2020-09-05 03:48:59
terminate执行器,因为它不是由Scala或Cats运行时管理的,所以它不会自行退出,这就是为什么您的应用程序不是立即退出而是举手。mayInterruptIfRunning = false将优雅地终止它。您可以将其设置为true,以强制杀死它,但它不会被重新命令。ScheduledExecutorService的方法很多,这取决于需要。对于这种情况,这并不重要,但问题1。cb(Right("put your stuff here"))从可取消的IO返回任何内容,唯一要检索返回A的阻塞是在取消生效时。如果你在它到达重点之前阻止它,你就什么也得不到。尝试返回IO(f.cancel(mayInterruptIfRunning)).delayBy(FiniteDuration(2, TimeUnit.SECONDS)).void,您将得到您所期望的。因为2 seconds > 1 second,您的代码在被取消之前有足够的时间运行。https://stackoverflow.com/questions/61545696
复制相似问题