首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将scalaz-stream连接到反应式流(如reactive streams.org中)

如何将scalaz-stream连接到反应式流(如reactive streams.org中)
EN

Stack Overflow用户
提问于 2015-02-26 14:57:57
回答 3查看 365关注 0票数 4

我想通过db.stream(yourquery)通过scalaz-stream流式传输从一个巧妙的3.0.0查询返回的数据。

看起来reactive-streams.org使用的是不同库实现的API和数据流模型。

如何在从scalaz-stream流程回流到slick publisher的反压力下做到这一点?

EN

回答 3

Stack Overflow用户

发布于 2016-01-15 03:59:35

看一看https://github.com/krasserm/streamz

Streamz是scalaz-stream的资源组合器库。它允许流程实例从以下位置消费和生产:

  • Apache Camel端点
  • Akka持久性日志和快照存储以及
  • Akka流(反应流),具有完全背压支持
票数 3
EN

Stack Overflow用户

发布于 2015-03-21 21:36:23

我终于回答了我自己的问题。如果您愿意使用scalaz-streams队列来对流结果进行排队。

代码语言:javascript
复制
def getData[T](publisher: slick.backend.DatabasePublisher[T],
  queue: scalaz.stream.async.mutable.Queue[T], batchRequest: Int = 1): Task[scala.concurrent.Future[Long]] =
  Task {
    val p = scala.concurrent.Promise[Unit]()
    var counter: Long = 0
    val s = new org.reactivestreams.Subscriber[T] {
      var sub: Subscription = _

      def onSubscribe(s: Subscription): Unit = {
        sub = s
        sub.request(batchRequest)
      }

      def onComplete(): Unit = {
        sub.cancel()
        p.success(counter)
      }

      def onError(t: Throwable): Unit = p.failure(t)

      def onNext(e: T): Unit = {
        counter += 1
        queue.enqueueOne(e).run
        sub.request(batchRequest)
      }
    }
    publisher.subscribe(s)
    p.future
  }

当您使用run运行此命令时,您将获得一个完成后的将来,这意味着查询已完成流式处理。如果你想让你的计算等待所有的数据到达,你可以在这个未来进行组合。如果需要在继续之前运行所有数据,还可以在getData的Task中添加、使用Await,然后在返回的Task对象上组合计算。对于我所做的事情,我在将来的完成时编写并关闭队列,这样我的scalaz-stream就可以清楚地终止。

票数 2
EN

Stack Overflow用户

发布于 2016-04-27 22:26:22

下面是一个稍微不同的实现(与user1763729发布的实现不同),它返回一个进程:

代码语言:javascript
复制
def getData[T](publisher: DatabasePublisher[T], batchSize: Long = 1L): Process[Task, T] = {
 val q = async.boundedQueue[T](10)

 val subscribe = Task.delay {
  publisher.subscribe(new Subscriber[T] {

    @volatile var subscription: Subscription = _

    override def onSubscribe(s: Subscription) {
      subscription = s
      subscription.request(batchSize)
    }

    override def onNext(next: T) = {
        q.enqueueOne(next).attemptRun
        subscription.request(batchSize)
    }

    override def onError(t: Throwable) = q.fail(t).attemptRun

    override def onComplete() = q.close.attemptRun
  })
 }

 Process.eval(subscribe).flatMap(_ => q.dequeue)
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/28736255

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档