我刚刚从fs2流开始我的冒险。我想要实现的是读取一个文件(这是我使用fs2的原因),转换它并将结果写入两个不同的文件(基于某个谓词)。一些代码(来自https://github.com/typelevel/fs2),还有我的评论:
val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
/* instead of the last line I want something like this:
.through(<write temperatures higher than 10 to one file, the rest to the other one>)
*/
}这样做最有效的方法是什么?最明显的解决方案是有两个具有不同过滤器的流,但是效率很低(会有两条通道)。
发布于 2020-10-04 22:12:11
不幸的是,据我所知,没有一种简单的方法可以将fs2流分成两部分。
您可以做的是,通过将值推送到两个队列中的一个(值小于10的第一个,大于或等于10的值第二个)来分割流。如果我们使用NoneTerminatedQueue,那么在将None放入队列之前,队列不会终止。然后,我们可以使用dequeue来创建单独的流,直到队列不关闭为止。
下面的示例解决方案。我将写入拆分为文件,并将其分为不同的方法:
import java.nio.file.Paths
import cats.effect.{Blocker, ExitCode, IO, IOApp}
import fs2.concurrent.{NoneTerminatedQueue, Queue}
import fs2.{Stream, io, text}
object FahrenheitToCelsius extends IOApp {
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0 / 9.0)
//I split reading into separate method
def read(blocker: Blocker, over: NoneTerminatedQueue[IO, Double], under: NoneTerminatedQueue[IO, Double]) = io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble))
.evalMap { value =>
if (value > 10) { //here we put values to one of queues
over.enqueue1(Some(value)) //until we put some queues are not close
} else {
under.enqueue1(Some(value))
}
}
.onFinalize(
over.enqueue1(None) *> under.enqueue1(None) //by putting None we terminate queues
)
//function write takes as argument source queue and target file
def write(s: Stream[IO, Double], blocker: Blocker, fileName: String): Stream[IO, Unit] = {
s.map(_.toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get(fileName), blocker))
}
val converter: Stream[IO, Unit] = for {
over <- Stream.eval(Queue.noneTerminated[IO, Double]) //here we create 2 queues
under <- Stream.eval(Queue.noneTerminated[IO, Double])
blocker <- Stream.resource(Blocker[IO])
_ <- write(over.dequeue, blocker, "testdata/celsius-over.txt") //we run reading and writing to both
.concurrently(write(under.dequeue, blocker, "testdata/celsius-under.txt")) //files concurrently
.concurrently(read(blocker, over, under)) //stream runs until queue over is not terminated
} yield ()
override def run(args: List[String]): IO[ExitCode] =
converter
.compile
.drain
.as(ExitCode.Success)
}发布于 2022-03-24 13:27:28
还可以使用broadcastThrough,它允许将流的所有元素广播到多个管道。
您的问题的完整解决方案可能如下所示(使用cats效果为3.3.8和fs2 3.2.5 )。这就是为什么它看起来有点不同的原因,但无论版本如何,其主要思想都是一样的):
import cats.effect.{IO, IOApp}
import fs2.io.file.{Files, Path}
import fs2.{Pipe, Stream, text}
object Converter extends IOApp.Simple {
val converter: Stream[IO, Unit] = {
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0 / 9.0)
def saveFiltered(filename: Path, predicate: Double => Boolean): Pipe[IO, Double, Unit] =
_.filter(predicate)
.map(_.toString)
.through(text.utf8.encode)
.through(Files[IO].writeAll(filename))
Files[IO].readAll(Path("testdata/fahrenheit.txt"))
.through(text.utf8.decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble))
.broadcastThrough(
saveFiltered(Path("testdata/celsius_over.txt"), { n => n >= 0 }),
saveFiltered(Path("testdata/celsius_below.txt"), { n => n < 0 })
)
}
def run: IO[Unit] =
converter.compile.drain
}saveFiltered现在是一个函数,返回使用文件名和谓词构建的Pipe。此函数用于为broadcastThrough构建两个参数。我测试了它的一个小的例子和FWIW,它的工作,如预期。
broadcastThrough保证流中的所有元素都被发送到所有管道。Scaladoc中提到了一个小小的警告:最慢的管道会导致整个流慢下来。我不认为这是一个特殊的情况下的问题,因为我想,这两个管道是同样快。
你甚至可以更进一步,把这个想法概括一下:
def partition[F[_] : Concurrent, A, B](predicate: A => Boolean, in: Pipe[F, A, B], out: Pipe[F, A, B]): Pipe[F, A, B] =
_.broadcastThrough[F, B](
_.filter(predicate).through(in),
_.filter(a => !predicate(a)).through(out)
)这样,您就不必确保这两个谓词产生相互排斥的结果。
有一个稍微适应的saveFiltered
def saveFiltered2(filename: Path): Pipe[IO, Double, Unit] =
_.map(_.toString)
.through(text.utf8.encode)
.through(Files[IO].writeAll(filename))流的最后一部分稍微短了一点:
...
.through(
partition(n => n >= 0,
saveFiltered2(Path("testdata/celsius_over.txt")),
saveFiltered2(Path("testdata/celsius_below.txt"))))```发布于 2020-10-06 17:03:20
我找到了另一个解决办法。下面是:
import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
import fs2.{io, text, Stream}
import fs2.io.file.WriteCursor
import java.nio.file.Paths
object Converter extends IOApp {
val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
def saveFiltered(in: Stream[IO,Double], blocker: cats.effect.Blocker, filename: String, filter: Double => Boolean) = {
val processed = in.filter(filter).intersperse("\n").map(_.toString).through(text.utf8Encode)
Stream.resource(WriteCursor.fromPath[IO](Paths.get(filename), blocker)).flatMap(_.writeAll(processed).void.stream)
}
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble))
.observe( in => saveFiltered(in, blocker, "testdata/celsius_over.txt", {n => n >= 0}) )
.through( in => saveFiltered(in, blocker, "testdata/celsius_below.txt", {n => n < 0}) )
}
def run(args: List[String]): IO[ExitCode] =
converter.compile.drain.as(ExitCode.Success)
}我认为它比涉及队列的答案更容易理解(不过,队列似乎是类似情况下常见的解决方案)。
https://stackoverflow.com/questions/64188023
复制相似问题