首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Scala + Monix异步获取Cassandra查询结果

使用Scala + Monix异步获取Cassandra查询结果
EN

Stack Overflow用户
提问于 2017-08-26 10:03:48
回答 1查看 537关注 0票数 0

我正在使用AKKA Http莫尼克斯的Datastax驱动程序构建REST,在尝试从cassandra获取一些项时遇到了一些问题,等待查询完成并返回结果。

我能够轻松地打印所有结果,但无法等待查询完成并返回所有项。我的rest点只是返回一个空的条目数组,因为它不等待查询的完成。

我有一个executeQuery方法,它需要:

  • 表示cassandra查询的queryString: String
  • 用于分页的page: Int
  • 表示参数的parameters: Any* (如果查询需要的话)

并返回一个Observable[Row]

然后,为了执行这样的查询,检索其结果,解析它们并将它们发送回来,我使用Monix可观测和订阅

让我们假设我想通过一个名为pid的公共字段检索一些项

代码语言:javascript
复制
import monix.execution.Ack
import monix.execution.Scheduler.Implicits.global
import com.datastax.driver.core.Row
import monix.reactive.Observable

import cassandra.src.CassandraHelper
import item.src.entity.{Item, Items}

. . .
val keyspace = "my_keyspace"
val table = "items"
. . .

def getItems() : Items = {
  var itemList: Items = List()
  val observable: Observable[Row] = CassandraHelper.executeQuery(
    "SELECT * FROM " + keyspace + "." + table,
    1
  )
  observable.subscribe { row =>
    itemList ::= ItemMapper.rowToItem()(row)
    Ack.Continue
  }
  Items(itemList)
}

其中,rowToItem简单地将一行解析为ItemItems: List[Item]。我在看任务,但我不太确定我要找的是什么。

编辑

使用@Alexandru解决方案,一旦将所有的items插入到itemList中,我就能够将它们打印出来,但仍然可以得到调用的空响应:{ "items" : [] }

下面是编辑的代码:

代码语言:javascript
复制
def getItems() : Items = {
  var itemList: List[Item] = List()
  val observable: Observable[Row] = CassandraHelper.executeQuery(
    "SELECT * FROM " + keyspace + "." + table,
    1
  )
  observable.subscribe { row =>
    println(itemList)
    itemList ::= ItemMapper.rowToItem()(row)
    Ack.Continue
  }
  Items(itemList)
}

,我如何等待结果全部被解析并插入到项目中,然后将它们发送回来?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-08-26 11:19:05

据我所知,您有一个Observable[Row],您希望用它构建一个Items,它从源流聚合每个Row元素,对吗?

如果是这样的话,foldLeftL就是您想要的,它将把每个元素聚合成一个状态,并在源流完成后返回最终结果:

代码语言:javascript
复制
// We need to suspend the Task, because your Items is probably a
// mutable object and it's best to suspend side effects ;-)
val items: Task[Items] = Task.suspend {
  val initial: Items = _
  val observable: Observable[Row] = ???

  // This returns a Task[Items] when the source completes
  observable.foldLeftL(initial) { (items, elem) =>
    items ::= ItemMapper.rowToItem()(row)
    // I don't understand if your `Items` is mutable or not
    // but returning the same reference is fine
    items
  }
}

Task是懒惰的Future。你可以用runAsync把它转换成一个runAsync。这里有更多详细信息:https://monix.io/docs/2x/eval/task.html

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45894205

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档