我想通过db.stream(yourquery)通过scalaz-stream流式传输从一个巧妙的3.0.0查询返回的数据。
看起来reactive-streams.org使用的是不同库实现的API和数据流模型。
如何在从scalaz-stream流程回流到slick publisher的反压力下做到这一点?
发布于 2016-01-15 03:59:35
看一看https://github.com/krasserm/streamz
Streamz是scalaz-stream的资源组合器库。它允许流程实例从以下位置消费和生产:
发布于 2015-03-21 21:36:23
我终于回答了我自己的问题。如果您愿意使用scalaz-streams队列来对流结果进行排队。
def getData[T](publisher: slick.backend.DatabasePublisher[T],
queue: scalaz.stream.async.mutable.Queue[T], batchRequest: Int = 1): Task[scala.concurrent.Future[Long]] =
Task {
val p = scala.concurrent.Promise[Unit]()
var counter: Long = 0
val s = new org.reactivestreams.Subscriber[T] {
var sub: Subscription = _
def onSubscribe(s: Subscription): Unit = {
sub = s
sub.request(batchRequest)
}
def onComplete(): Unit = {
sub.cancel()
p.success(counter)
}
def onError(t: Throwable): Unit = p.failure(t)
def onNext(e: T): Unit = {
counter += 1
queue.enqueueOne(e).run
sub.request(batchRequest)
}
}
publisher.subscribe(s)
p.future
}当您使用run运行此命令时,您将获得一个完成后的将来,这意味着查询已完成流式处理。如果你想让你的计算等待所有的数据到达,你可以在这个未来进行组合。如果需要在继续之前运行所有数据,还可以在getData的Task中添加、使用Await,然后在返回的Task对象上组合计算。对于我所做的事情,我在将来的完成时编写并关闭队列,这样我的scalaz-stream就可以清楚地终止。
发布于 2016-04-27 22:26:22
下面是一个稍微不同的实现(与user1763729发布的实现不同),它返回一个进程:
def getData[T](publisher: DatabasePublisher[T], batchSize: Long = 1L): Process[Task, T] = {
val q = async.boundedQueue[T](10)
val subscribe = Task.delay {
publisher.subscribe(new Subscriber[T] {
@volatile var subscription: Subscription = _
override def onSubscribe(s: Subscription) {
subscription = s
subscription.request(batchSize)
}
override def onNext(next: T) = {
q.enqueueOne(next).attemptRun
subscription.request(batchSize)
}
override def onError(t: Throwable) = q.fail(t).attemptRun
override def onComplete() = q.close.attemptRun
})
}
Process.eval(subscribe).flatMap(_ => q.dequeue)
}https://stackoverflow.com/questions/28736255
复制相似问题