我正在使用Alpakka和Slick模块创建一个akka-stream,但我遇到了类型不匹配的问题。
其中一个分支是获取其表中的发票总数:
def getTotal(implicit session: SlickSession) = {
import session.profile.api._
val query = TableQuery[Tables.Invoice].length.result
Slick.source(query)
}但是结束行不能编译,因为Alpakka需要一个StreamingDBIO,但我提供了一个FixedSqlAction[Int,slick.dbio.NoStream,slick.dbio.Effect.Read]。
如何从非流式结果转移到流式结果?
发布于 2021-10-22 21:22:42
获取一个表的长度会得到一个值,而不是一个流。因此,让Source提供流的最简单方法是
def getTotal(implicit session: SlickSession): Source[Int, NotUsed] =
Source.lazyFuture { () =>
// Don't actually run the query until the stream has materialized and
// demand has reached the source
val query = TableQuery[Tables.Invoice].length.result
session.db.run(query)
}Alpakka的灵活连接器更多地面向流媒体(包括管理分页等)。具有大量结果的查询的结果。对于单个结果,将vanilla Slick提供的结果的Future转换为流就足够了。
如果您希望在调用getTotal后立即开始执行查询(请注意,无论下游是否运行或从源请求数据),您都可以
def getTotal(implicit session: SlickSession): Source[Int, NotUsed] = {
val query = TableQuery[Tables.Invoice].length.result
Source.future(session.db.run(query))
}发布于 2021-10-22 19:27:25
像这样的东西对你有用吗?
def getTotal() = {
// Doc Expressions (Scalar values)
// https://scala-slick.org/doc/3.2.0/queries.html
val query = TableQuery[Tables.Invoice].length.result
val res = Await.result(session.db.run(query), 60.seconds)
println(s"Result: $res")
res
}https://stackoverflow.com/questions/69674054
复制相似问题