首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将fs2流输出拆分为两个文件

将fs2流输出拆分为两个文件
EN

Stack Overflow用户
提问于 2020-10-03 19:10:27
回答 3查看 1.4K关注 0票数 5

我刚刚从fs2流开始我的冒险。我想要实现的是读取一个文件(这是我使用fs2的原因),转换它并将结果写入两个不同的文件(基于某个谓词)。一些代码(来自https://github.com/typelevel/fs2),还有我的评论:

代码语言:javascript
复制
  val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0/9.0)

    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble).toString)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
      /* instead of the last line I want something like this:
      .through(<write temperatures higher than 10 to one file, the rest to the other one>)
      */
  }

这样做最有效的方法是什么?最明显的解决方案是有两个具有不同过滤器的流,但是效率很低(会有两条通道)。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2020-10-04 22:12:11

不幸的是,据我所知,没有一种简单的方法可以将fs2流分成两部分。

您可以做的是,通过将值推送到两个队列中的一个(值小于10的第一个,大于或等于10的值第二个)来分割流。如果我们使用NoneTerminatedQueue,那么在将None放入队列之前,队列不会终止。然后,我们可以使用dequeue来创建单独的流,直到队列不关闭为止。

下面的示例解决方案。我将写入拆分为文件,并将其分为不同的方法:

代码语言:javascript
复制
import java.nio.file.Paths
import cats.effect.{Blocker, ExitCode, IO, IOApp}
import fs2.concurrent.{NoneTerminatedQueue, Queue}
import fs2.{Stream, io, text}

object FahrenheitToCelsius extends IOApp {

  def fahrenheitToCelsius(f: Double): Double =
    (f - 32.0) * (5.0 / 9.0)

  //I split reading into separate method
  def read(blocker: Blocker, over: NoneTerminatedQueue[IO, Double], under: NoneTerminatedQueue[IO, Double]) = io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
    .through(text.utf8Decode)
    .through(text.lines)
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble))
    .evalMap { value =>
      if (value > 10) { //here we put values to one of queues
        over.enqueue1(Some(value)) //until we put some queues are not close
      } else {
        under.enqueue1(Some(value))
      }
    }
    .onFinalize(
      over.enqueue1(None) *> under.enqueue1(None) //by putting None we terminate queues
    )

  //function write takes as argument source queue and target file
  def write(s: Stream[IO, Double], blocker: Blocker, fileName: String): Stream[IO, Unit] = {
    s.map(_.toString)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll(Paths.get(fileName), blocker))
  }

  val converter: Stream[IO, Unit] = for {
    over <- Stream.eval(Queue.noneTerminated[IO, Double]) //here we create 2 queues
    under <- Stream.eval(Queue.noneTerminated[IO, Double])
    blocker <- Stream.resource(Blocker[IO])
    _ <- write(over.dequeue, blocker, "testdata/celsius-over.txt") //we run reading and writing to both
      .concurrently(write(under.dequeue, blocker, "testdata/celsius-under.txt")) //files concurrently
      .concurrently(read(blocker, over, under)) //stream runs until queue over is not terminated
  } yield ()

  override def run(args: List[String]): IO[ExitCode] =
    converter
      .compile
      .drain
      .as(ExitCode.Success)

}
票数 3
EN

Stack Overflow用户

发布于 2022-03-24 13:27:28

还可以使用broadcastThrough,它允许将流的所有元素广播到多个管道。

您的问题的完整解决方案可能如下所示(使用cats效果为3.3.8和fs2 3.2.5 )。这就是为什么它看起来有点不同的原因,但无论版本如何,其主要思想都是一样的):

代码语言:javascript
复制
import cats.effect.{IO, IOApp}
import fs2.io.file.{Files, Path}
import fs2.{Pipe, Stream, text}

object Converter extends IOApp.Simple {

  val converter: Stream[IO, Unit] = {
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0 / 9.0)

    def saveFiltered(filename: Path, predicate: Double => Boolean): Pipe[IO, Double, Unit] =
      _.filter(predicate)
        .map(_.toString)
        .through(text.utf8.encode)
        .through(Files[IO].writeAll(filename))

    Files[IO].readAll(Path("testdata/fahrenheit.txt"))
      .through(text.utf8.decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble))
      .broadcastThrough(
        saveFiltered(Path("testdata/celsius_over.txt"), { n => n >= 0 }),
        saveFiltered(Path("testdata/celsius_below.txt"), { n => n < 0 })
      )
  }

  def run: IO[Unit] =
    converter.compile.drain
}

saveFiltered现在是一个函数,返回使用文件名和谓词构建的Pipe。此函数用于为broadcastThrough构建两个参数。我测试了它的一个小的例子和FWIW,它的工作,如预期。

broadcastThrough保证流中的所有元素都被发送到所有管道。Scaladoc中提到了一个小小的警告:最慢的管道会导致整个流慢下来。我不认为这是一个特殊的情况下的问题,因为我想,这两个管道是同样快。

你甚至可以更进一步,把这个想法概括一下:

代码语言:javascript
复制
def partition[F[_] : Concurrent, A, B](predicate: A => Boolean, in: Pipe[F, A, B], out: Pipe[F, A, B]): Pipe[F, A, B] =
  _.broadcastThrough[F, B](
    _.filter(predicate).through(in),
    _.filter(a => !predicate(a)).through(out)
  )

这样,您就不必确保这两个谓词产生相互排斥的结果。

有一个稍微适应的saveFiltered

代码语言:javascript
复制
def saveFiltered2(filename: Path): Pipe[IO, Double, Unit] =
  _.map(_.toString)
    .through(text.utf8.encode)
    .through(Files[IO].writeAll(filename))

流的最后一部分稍微短了一点:

代码语言:javascript
复制
...
.through(
  partition(n => n >= 0,
    saveFiltered2(Path("testdata/celsius_over.txt")),
    saveFiltered2(Path("testdata/celsius_below.txt"))))```
票数 2
EN

Stack Overflow用户

发布于 2020-10-06 17:03:20

我找到了另一个解决办法。下面是:

代码语言:javascript
复制
import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
import fs2.{io, text, Stream}
import fs2.io.file.WriteCursor
import java.nio.file.Paths

object Converter extends IOApp {

  val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0/9.0)

    def saveFiltered(in: Stream[IO,Double], blocker: cats.effect.Blocker, filename: String, filter: Double => Boolean) = {
      val processed = in.filter(filter).intersperse("\n").map(_.toString).through(text.utf8Encode)

      Stream.resource(WriteCursor.fromPath[IO](Paths.get(filename), blocker)).flatMap(_.writeAll(processed).void.stream)
    }

    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble))
      .observe( in => saveFiltered(in, blocker, "testdata/celsius_over.txt", {n => n >= 0}) )
      .through( in => saveFiltered(in, blocker, "testdata/celsius_below.txt", {n => n < 0}) )
  }

  def run(args: List[String]): IO[ExitCode] =
    converter.compile.drain.as(ExitCode.Success)
}

我认为它比涉及队列的答案更容易理解(不过,队列似乎是类似情况下常见的解决方案)。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64188023

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档