我是一个新手,试图理解fs2队列背后的直觉。我正在尝试做一个从Stream[IO, Int]中提取数据的基本示例。但是对我来说,文档是不够的,因为它直接深入到高级的东西中。
到目前为止,我所做的是:
import cats.effect.{ ExitCode, IO, IOApp}
import fs2._
import fs2.concurrent.Queue
class QueueInt(q: Queue[IO, Int]) {
def startPushingtoQueue: Stream[IO, Unit] = {
Stream(1, 2, 3).covary[IO].through(q.enqueue)
q.dequeue.evalMap(n => IO.delay(println(s"Pulling element $n from Queue")))
}
}
object testingQueues extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val stream = for {
q <- Queue.bounded(10)
b = new QueueInt(q)
_ <- b.startPushingtoQueue.drain
} yield ()
}
}问题1:我让No implicit argument of type Concurrent[F_],知道我没有使用任何并发的效果,我似乎不知道我遗漏了什么?
问题2:我能做什么来打印结果。
问题3:有人能告诉我一些学习fs2的资源吗?
发布于 2020-06-11 11:38:27
我在您的代码中发现了几个问题:
q <- Queue.bounded[IO, Unit](10) // it will fix your error with implicitsIO[Unit],但是为了使其运行,您必须从run方法返回它。您还需要将类型从单元更改为ExitCodestream.as(ExitCode.Success)startPushingToQueue中,您正在创建Steam,但没有在任何地方分配它。它只会创建流的描述,但不会运行。我认为您想要实现的是在方法上创建一个将元素推入队列的方法,另一个方法将从队列中获取元素并打印它们。请检查我的解决方案:
import cats.effect.{ ExitCode, IO, IOApp}
import fs2._
import fs2.concurrent.Queue
import scala.concurrent.duration._
class QueueInt(q: Queue[IO, Int])(implicit timer: Timer[IO]) { //I need implicit timer for metered
def startPushingToQueue: Stream[IO, Unit] = Stream(1, 2, 3)
.covary[IO]
.evalTap(n => IO.delay(println(s"Pushing element $n to Queue"))) //eval tap evaluates effect on an element but doesn't change stream
.metered(500.millis) //it will create 0.5 delay between enqueueing elements of stream,
// I added it to make visible that elements can be pushed and pulled from queue concurrently
.through(q.enqueue)
def pullAndPrintElements: Stream[IO, Unit] = q.dequeue.evalMap(n => IO.delay(println(s"Pulling element $n from Queue")))
}
object testingQueues extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q <- Queue.bounded[IO, Int](10)
b = new QueueInt(q)
_ <- b.startPushingToQueue.compile.drain.start //start at the end will start running stream in another Fiber
_ <- b.pullAndPrintElements.compile.drain //compile.draing compiles stream into io byt pulling all elements.
} yield ()
program.as(ExitCode.Success)
}
}在控制台上,您将看到关于交错地推拉队列的行。如果删除start,您将首先看到startPushingToQueue中的流在推送所有元素之后完成,然后pullAndPrintElements才会启动。
如果您正在寻找学习fs2的良好来源,我建议您应该从查看FS2相关讲座开始。与旧的对话相比,他们更喜欢更新的对话,因为他们可以引用旧的API。
您还应该检查指南上的fs2文档。
https://stackoverflow.com/questions/62322638
复制相似问题