我正在使用AKKA Http、莫尼克斯和的Datastax驱动程序构建REST,在尝试从cassandra获取一些项时遇到了一些问题,等待查询完成并返回结果。
我能够轻松地打印所有结果,但无法等待查询完成并返回所有项。我的rest点只是返回一个空的条目数组,因为它不等待查询的完成。
我有一个executeQuery方法,它需要:
queryString: Stringpage: Intparameters: Any* (如果查询需要的话)并返回一个Observable[Row]。
然后,为了执行这样的查询,检索其结果,解析它们并将它们发送回来,我使用Monix可观测和订阅。
让我们假设我想通过一个名为pid的公共字段检索一些项
import monix.execution.Ack
import monix.execution.Scheduler.Implicits.global
import com.datastax.driver.core.Row
import monix.reactive.Observable
import cassandra.src.CassandraHelper
import item.src.entity.{Item, Items}
. . .
val keyspace = "my_keyspace"
val table = "items"
. . .
def getItems() : Items = {
var itemList: Items = List()
val observable: Observable[Row] = CassandraHelper.executeQuery(
"SELECT * FROM " + keyspace + "." + table,
1
)
observable.subscribe { row =>
itemList ::= ItemMapper.rowToItem()(row)
Ack.Continue
}
Items(itemList)
}其中,rowToItem简单地将一行解析为Item和Items: List[Item]。我在看任务,但我不太确定我要找的是什么。
编辑
使用@Alexandru解决方案,一旦将所有的items插入到itemList中,我就能够将它们打印出来,但仍然可以得到调用的空响应:{ "items" : [] }。
下面是编辑的代码:
def getItems() : Items = {
var itemList: List[Item] = List()
val observable: Observable[Row] = CassandraHelper.executeQuery(
"SELECT * FROM " + keyspace + "." + table,
1
)
observable.subscribe { row =>
println(itemList)
itemList ::= ItemMapper.rowToItem()(row)
Ack.Continue
}
Items(itemList)
},我如何等待结果全部被解析并插入到项目中,然后将它们发送回来?
发布于 2017-08-26 11:19:05
据我所知,您有一个Observable[Row],您希望用它构建一个Items,它从源流聚合每个Row元素,对吗?
如果是这样的话,foldLeftL就是您想要的,它将把每个元素聚合成一个状态,并在源流完成后返回最终结果:
// We need to suspend the Task, because your Items is probably a
// mutable object and it's best to suspend side effects ;-)
val items: Task[Items] = Task.suspend {
val initial: Items = _
val observable: Observable[Row] = ???
// This returns a Task[Items] when the source completes
observable.foldLeftL(initial) { (items, elem) =>
items ::= ItemMapper.rowToItem()(row)
// I don't understand if your `Items` is mutable or not
// but returning the same reference is fine
items
}
}Task是懒惰的Future。你可以用runAsync把它转换成一个runAsync。这里有更多详细信息:https://monix.io/docs/2x/eval/task.html
https://stackoverflow.com/questions/45894205
复制相似问题