这里有一个简单的场景。我使用akka streams从kafka中读取并写入外部源,在我的例子中是: cassandra。
Akka streams(reactive-kafka)库为我配备了背压和其他一些很好的东西来实现这一点。
kafka是Source,Cassandra是Sink,当我通过Kafka得到一堆事件时,比如这里的cassandra查询,这些查询应该是按顺序执行的(例如:它可以是插入、更新和删除,并且必须是顺序的)。
我不能同时使用mayAsync和执行这两个语句,Future is before,并且有可能DELETE或UPDATE可能会在INSERT之前先执行。
我不得不使用Cassandra的execute,而不是非阻塞的executeAsync。
没有办法对这个问题做出一个完整的异步解决方案,但是有一种非常优雅的方法可以做到这一点吗?
对于ex:使Future变得懒惰和顺序化,并将其卸载到不同的执行上下文中。mapAsync还提供了并行性选项。
Monix Task在这里能帮上忙吗?
这是一个一般性的设计问题,人们可以采取哪些方法。
更新:
Flow[In].mapAsync(3)(input => {
input match {
case INSERT => //do insert - returns future
case UPDATE => //do update - returns future
case DELETE => //delete - returns future
}这个场景稍微复杂一点。可能有数以千计的插入,更新和删除来为特定的键(在kafka中),我理想情况下想要按顺序执行单个键的3个未来。我相信Monix的Task能帮上忙?
发布于 2018-01-25 02:48:53
如果你处理的东西的并行度为1,它们将严格按顺序执行,这将解决你的问题。
但这并不有趣。如果你愿意,你可以对不同的键并行运行操作-如果不同键的处理是独立的,我假设从你的描述中,这是可能的。为此,您必须缓冲传入的值,然后对其进行重组。让我们来看一些代码:
import monix.reactive.Observable
import scala.concurrent.duration._
import monix.eval.Task
// Your domain logic - I'll use these stubs
trait Event
trait Acknowledgement // whatever your DB functions return, if you need it
def toKey(e: Event): String = ???
def processOne(event: Event): Task[Acknowledgement] = Task.deferFuture {
event match {
case _ => ??? // insert/update/delete
}
}
// Monix Task.traverse is strictly sequential, which is what you need
def processMany(evs: Seq[Event]): Task[Seq[Acknowledgement]] =
Task.traverse(evs)(processOne)
def processEventStreamInParallel(source: Observable[Event]): Observable[Acknowledgement] =
source
// Process a bunch of events, but don't wait too long for whole 100. Fine-tune for your data source
.bufferTimedAndCounted(2.seconds, 100)
.concatMap { batch =>
Observable
.fromIterable(batch.groupBy(toKey).values) // Standard collection methods FTW
.mapAsync(3)(processMany) // processing up to 3 different keys in parallel - tho 3 is not necessary, probably depends on your DB throughput
.flatMap(Observable.fromIterable) // flattening it back
}这里的concatMap操作符将确保块也按顺序处理。因此,即使一个缓冲区具有key1 -> insert, key1 -> update,而另一个缓冲区具有key1 -> delete,也不会造成问题。在Monix中,这与flatMap相同,但在其他Rx库中,flatMap可能是mergeMap的别名,没有排序保证。
这也可以用Future来完成,尽管没有标准的“顺序遍历”,所以你必须使用你自己的,比如:
def processMany(evs: Seq[Event]): Future[Seq[Acknowledgement]] =
evs.foldLeft(Future.successful(Vector.empty[Acknowledgement])){ (acksF, ev) =>
for {
acks <- acksF
next <- processOne(ev)
} yield acks :+ next
}发布于 2018-01-25 03:38:14
您可以使用akka-streams子流,按键分组,然后合并子流,如果您想要对从数据库操作中获得的内容执行某些操作:
def databaseOp(input: In): Future[Out] = input match {
case INSERT => ...
case UPDATE => ...
case DELETE => ...
}
val databaseFlow: Flow[In, Out, NotUsed] =
Flow[In].groupBy(Int.maxValues, _.key).mapAsync(1)(databaseOp).mergeSubstreams请注意,输入源的顺序不会像在mapAsync中那样保存在输出中,但同一个键上的所有操作仍将按顺序进行。
发布于 2018-01-24 20:33:40
您正在寻找Future.flatMap
def doSomething: Future[Unit]
def doSomethingElse: Future[Unit]
val result = doSomething.flatMap { _ => doSomethingElse }这将执行第一个函数,然后在满足其Future时启动第二个函数。result是一个新的Future,它在满足第二次执行的结果时完成。
第一个未来的结果被传递给您给.flatMap的函数,因此第二个函数可以依赖于第一个函数的结果。例如:
def getUserID: Future[Int]
def getUser(id: Int): Future[User]
val userName: Future[String] = getUserID.flatMap(getUser).map(_.name)您也可以将以下代码编写为for-comprehension:
for {
id <- getUserID
user <- getUser(id)
} yield user.name https://stackoverflow.com/questions/48422131
复制相似问题