首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在非阻塞数据库中执行未来顺序插入的顺序

在非阻塞数据库中执行未来顺序插入的顺序
EN

Stack Overflow用户
提问于 2018-01-24 20:07:55
回答 3查看 313关注 0票数 0

这里有一个简单的场景。我使用akka streams从kafka中读取并写入外部源,在我的例子中是: cassandra。

Akka streams(reactive-kafka)库为我配备了背压和其他一些很好的东西来实现这一点。

kafka是Source,Cassandra是Sink,当我通过Kafka得到一堆事件时,比如这里的cassandra查询,这些查询应该是按顺序执行的(例如:它可以是插入、更新和删除,并且必须是顺序的)。

我不能同时使用mayAsync和执行这两个语句,Future is before,并且有可能DELETE或UPDATE可能会在INSERT之前先执行。

我不得不使用Cassandra的execute,而不是非阻塞的executeAsync

没有办法对这个问题做出一个完整的异步解决方案,但是有一种非常优雅的方法可以做到这一点吗?

对于ex:使Future变得懒惰和顺序化,并将其卸载到不同的执行上下文中。mapAsync还提供了并行性选项。

Monix Task在这里能帮上忙吗?

这是一个一般性的设计问题,人们可以采取哪些方法。

更新:

代码语言:javascript
复制
Flow[In].mapAsync(3)(input => {

 input match {
    case INSERT => //do insert - returns future
    case UPDATE => //do update - returns future
    case DELETE => //delete - returns future
}

这个场景稍微复杂一点。可能有数以千计的插入,更新和删除来为特定的键(在kafka中),我理想情况下想要按顺序执行单个键的3个未来。我相信Monix的Task能帮上忙?

EN

回答 3

Stack Overflow用户

发布于 2018-01-25 02:48:53

如果你处理的东西的并行度为1,它们将严格按顺序执行,这将解决你的问题。

但这并不有趣。如果你愿意,你可以对不同的并行运行操作-如果不同键的处理是独立的,我假设从你的描述中,这是可能的。为此,您必须缓冲传入的值,然后对其进行重组。让我们来看一些代码:

代码语言:javascript
复制
import monix.reactive.Observable
import scala.concurrent.duration._

import monix.eval.Task

// Your domain logic - I'll use these stubs
trait Event
trait Acknowledgement // whatever your DB functions return, if you need it
def toKey(e: Event): String = ???
def processOne(event: Event): Task[Acknowledgement] = Task.deferFuture {
  event match {
    case _ => ??? // insert/update/delete
  }
}

// Monix Task.traverse is strictly sequential, which is what you need
def processMany(evs: Seq[Event]): Task[Seq[Acknowledgement]] =
  Task.traverse(evs)(processOne)

def processEventStreamInParallel(source: Observable[Event]): Observable[Acknowledgement] =
  source
    // Process a bunch of events, but don't wait too long for whole 100. Fine-tune for your data source
    .bufferTimedAndCounted(2.seconds, 100)
    .concatMap { batch =>
      Observable
        .fromIterable(batch.groupBy(toKey).values) // Standard collection methods FTW
        .mapAsync(3)(processMany) // processing up to 3 different keys in parallel - tho 3 is not necessary, probably depends on your DB throughput
        .flatMap(Observable.fromIterable) // flattening it back
    }

这里的concatMap操作符将确保块也按顺序处理。因此,即使一个缓冲区具有key1 -> insert, key1 -> update,而另一个缓冲区具有key1 -> delete,也不会造成问题。在Monix中,这与flatMap相同,但在其他Rx库中,flatMap可能是mergeMap的别名,没有排序保证。

这也可以用Future来完成,尽管没有标准的“顺序遍历”,所以你必须使用你自己的,比如:

代码语言:javascript
复制
def processMany(evs: Seq[Event]): Future[Seq[Acknowledgement]] =
  evs.foldLeft(Future.successful(Vector.empty[Acknowledgement])){ (acksF, ev) =>
    for {
      acks <- acksF
      next <- processOne(ev)
    } yield acks :+ next
  }
票数 1
EN

Stack Overflow用户

发布于 2018-01-25 03:38:14

您可以使用akka-streams子流,按键分组,然后合并子流,如果您想要对从数据库操作中获得的内容执行某些操作:

代码语言:javascript
复制
def databaseOp(input: In): Future[Out] = input match {
  case INSERT => ...
  case UPDATE => ...
  case DELETE => ...
}

val databaseFlow: Flow[In, Out, NotUsed] =
  Flow[In].groupBy(Int.maxValues, _.key).mapAsync(1)(databaseOp).mergeSubstreams

请注意,输入源的顺序不会像在mapAsync中那样保存在输出中,但同一个键上的所有操作仍将按顺序进行。

票数 1
EN

Stack Overflow用户

发布于 2018-01-24 20:33:40

您正在寻找Future.flatMap

代码语言:javascript
复制
def doSomething: Future[Unit]
def doSomethingElse: Future[Unit]

val result = doSomething.flatMap { _ => doSomethingElse }

这将执行第一个函数,然后在满足其Future时启动第二个函数。result是一个新的Future,它在满足第二次执行的结果时完成。

第一个未来的结果被传递给您给.flatMap的函数,因此第二个函数可以依赖于第一个函数的结果。例如:

代码语言:javascript
复制
def getUserID: Future[Int]
def getUser(id: Int): Future[User]

val userName: Future[String] = getUserID.flatMap(getUser).map(_.name)

您也可以将以下代码编写为for-comprehension

代码语言:javascript
复制
for {
  id <- getUserID
  user <- getUser(id)
} yield user.name 
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48422131

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档