首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >fs2 -与2个流共享引用

fs2 -与2个流共享引用
EN

Stack Overflow用户
提问于 2022-04-18 20:08:53
回答 1查看 281关注 0票数 1

我试图在两个并发流之间共享一个Ref[F, A]。下面是实际场景的简化示例。

代码语言:javascript
复制
  class Container[F[_]](implicit F: Sync[F]) {
    private val counter = Ref[F].of(0)

    def incrementBy2 = counter.flatMap(c => c.update(i => i + 2))

    def printCounter = counter.flatMap(c => c.get.flatMap(i => F.delay(println(i))))
  }

在主要功能上,

代码语言:javascript
复制
object MyApp extends IOApp {

  def run(args: List[String]): IO[ExitCode] = {
    val s = for {
      container <- Ref[IO].of(new Container[IO]())
    } yield {
      val incrementBy2 = Stream.repeatEval(
          container.get
            .flatTap(c => c.incrementBy2)
            .flatMap(c => container.update(_ => c))
        )
        .metered(2.second)
        .interruptScope

      val printStream = Stream.repeatEval(
          container.get
            .flatMap(_.printCounter)
        )
        .metered(1.seconds)

      incrementBy2.concurrently(printStream)
    }
    Stream.eval(s)
      .flatten
      .compile
      .drain
      .as(ExitCode.Success)
  }
}

incrementBy2所做的更新在printStream中不可见。我怎么才能解决这个问题?如果能帮助我理解这个代码中的错误,我将不胜感激。

谢谢

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-04-18 20:46:20

您的代码由于类定义而中断,甚至没有更新相同的Ref

请记住,IO的重点是对计算的描述,因此Ref[F].of(0)返回一个程序,在计算该程序时将创建一个新的Ref,在它上调用多个flatMaps将导致创建多个Refs

而且,您没有以正确的方式完成无标记的决赛(有些人可能会认为,即使是正确的方式也不值得:https://alexn.org/blog/2022/04/18/scala-oop-design-sample/)

我就是这样写你的代码的:

代码语言:javascript
复制
trait Counter {
  def incrementBy2: IO[Unit]
  def printCounter: IO[Unit]
}
object Counter {
  val inMemory: IO[Counter] =
    IO.ref(0).map { ref =>
      new Counter {
        override final val incrementBy2: IO[Unit] =
          ref.update(c => c + 2)
        
        override final val printCounter: IO[Unit] =
          ref.get.flatMap(IO.println)
      }
    }
}

object Program {
  def run(counter: Counter): Stream[IO, Unit] =
    Stream
      .repeatEval(counter.printCounter)
      .metered(1.second)
      .concurrently(
        Stream.repeatEval(counter.incrementBy2).metered(2.seconds)
      ).interruptAfter(10.seconds)
}

object Main extends IOApp.Simple {
  override final val run: IO[Unit] =
    Stream
      .eval(Counter.inMemory)
      .flatMap(Program.run)
      .compile
      .drain
}

PS:我实际上没有printCounter,而是getCounter,并在Program中打印

您可以看到运行https://scastie.scala-lang.org/BalmungSan/Wd8TioiZSQ6RyRsCugkviQ/1的代码。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71916552

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档