首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >并发流不会打印任何控制台

并发流不会打印任何控制台
EN

Stack Overflow用户
提问于 2022-01-23 20:49:29
回答 1查看 147关注 0票数 0

我正在尝试构建一个关于在Stream.concurrently中使用fs2方法的示例。我正在开发生产者/消费者模式,使用Queue作为共享状态:

代码语言:javascript
复制
import cats.effect.std.{Queue, Random}

object Fs2Tutorial extends IOApp {
  val random: IO[Random[IO]] = Random.scalaUtilRandom[IO]
  val queue: IO[Queue[IO, Int]] = Queue.bounded[IO, Int](10)

  val producer: IO[Nothing] = for {
    r <- random
    q <- queue
    p <-
      r.betweenInt(1, 11)
      .flatMap(q.offer)
      .flatTap(_ => IO.sleep(1.second))
      .foreverM
  } yield p

  val consumer: IO[Nothing] = for {
    q <- queue
    c <- q.take.flatMap { n =>
      IO.println(s"Consumed $n")
    }.foreverM
  } yield c

  val concurrently: Stream[IO, Nothing] = Stream.eval(producer).concurrently(Stream.eval(consumer))

  override def run(args: List[String]): IO[ExitCode] = {
    concurrently.compile.drain.as(ExitCode.Success)
  }
}

我希望这个程序能打印一些"Consumed n",一些n。但是,程序不向控制台输出任何内容。

上面的代码有什么问题?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-01-23 21:14:36

上面的代码有什么问题?

您不是在消费者和生产者中使用相同的Queue,而是每个Queue都在创建自己的新的独立Queue( Random BTW也是如此)。

这是新手犯的一个常见错误,他们还没有掌握像IO这样的数据类型背后的主要原则。

当您执行val queue: IO[Queue[IO, Int]] = Queue.bounded[IO, Int](10)时,您是说queue是一个程序,在进行计算时,它将生成一个Queue[IO, Unit]类型的值,这就是所有这些的要点。

程序成为一个值,而作为任何值,您可以以任何方式操作它以产生新的值,例如使用flatMap,因此当consumerproducer都将flatMapping queue的新程序装箱时,它们都会创建新的独立程序/值。

您可以这样修正代码:

代码语言:javascript
复制
import cats.effect.{IO, IOApp}
import cats.effect.std.{Queue, Random}
import cats.syntax.all._
import fs2.Stream

import scala.concurrent.duration._

object Fs2Tutorial extends IOApp.Simple {  
  override final val run: IO[Unit] = {
    val resources =
      (
        Random.scalaUtilRandom[IO],
        Queue.bounded[IO, Int](10)
      ).tupled
    
    val concurrently =
      Stream.eval(resources).flatMap {
        case (random, queue) =>
          val producer = 
            Stream
              .fixedDelay[IO](1.second)
              .evalMap(_ => random.betweenInt(1, 11))
              .evalMap(queue.offer)

        val consumer =
          Stream.fromQueueUnterminated(queue).evalMap(n => IO.println(s"Consumed $n"))
        
        producer.concurrently(consumer)
      }
    
    concurrently.interruptAfter(10.seconds).compile.drain >> IO.println("Finished!")
  }
}

(您可以看到它运行https://scastie.scala-lang.org/BalmungSan/6yv5W21mRcWS9KOL3d4T2g/9)。

PS:我建议你看看Fabio:https://systemfw.org/archive.html的“程序作为价值观”系列。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70826450

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档