monix.Task是一套新出现的解决方案,借鉴了许多scalaz.Task的概念和方法同时又加入了很多优化、附加的功能,并且github更新也很近期。使用monix.Task应该是一个正确的选择。 首先我们必须解决scala.Future与monix.Task之间的转换: import monix.eval.Task import monix.execution.Scheduler.Implicits.global import monix.execution._ object MonixTask extends App { import monix.execution.Scheduler.Implicits.global } //=> Effect // task: monix.eval.Task[String] = Delay(Now(Hello!)) import monix.execution._ object MonixTask extends App { import monix.execution.Scheduler.Implicits.global
grpc-netty" % grpcJavaVersion, "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion, "io.monix " %% "monix" % "2.3.0" ) PB.targets in Compile := Seq( scalapb.gen() -> (sourceManaged in Compile) learn.grpc.sum.one2many.server import io.grpc.stub.StreamObserver import learn.grpc.services.sum._ import monix.execution.atomic io.grpc.stub.StreamObserver import learn.grpc.services.sum._ import learn.grpc.server.gRPCServer import monix.execution.atomic io.grpc.stub.StreamObserver import learn.grpc.services.sum._ import learn.grpc.server.gRPCServer import monix.execution.atomic
在上一次3月份的scala-meetup里我曾分享了关于Future在函数组合中的问题及如何用Monix.Task来替代。具体分析可以查阅这篇博文。 meals <- cookPasta(10)(takeFood(_,_),addFood(_,_)) } yield meals import scala.util._ import monix.execution.Scheduler.Implicits.global cooker.cookPasta(10) } yield pasta import scala.concurrent.duration._ import scala.util._ import monix.execution.Scheduler.Implicits.global
现在我们可以用隐式转换implicit conversion方式进行代码简化重用: import monix.execution.Scheduler.Implicits.global implicit .setCommand(Replace(filter,row)) mgoUpdate[UpdateResult](ctxUpdate) } } import monix.execution.Scheduler.Implicits.global _ import org.mongodb.scala.model.Filters._ import com.datatech.sdp.mongo.engine.MGOClasses._ import monix.execution.CancelableFuture MongoClient, m: Manifest[M], mat: ActorMaterializer) extends Directives with JsonConverter { import monix.execution.Scheduler.Implicits.global
Monix这种backend的返回值则是 Task[Response[T]]。
数据库操作是标准的MongoEngine方式: import monix.execution.Scheduler.Implicits.global implicit val mongoClient ch.qos.logback" % "logback-classic" % "1.2.3", "org.typelevel" %% "cats-core" % "0.9.0", "io.monix " %% "monix-execution" % "3.0.0-RC1", "io.monix" %% "monix-eval" % "3.0.0-RC1" ) } PB.targets Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = { Await.result(fut, timeOut) } import monix.eval.Task import monix.execution.Scheduler.Implicits.global final class FutureToTask[A](x: => Future[A])
grpc-netty" % grpcJavaVersion, "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion, "io.monix " %% "monix" % "2.3.0", //for mongodb 4.0 "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.0", ch.qos.logback" % "logback-classic" % "1.2.3", "org.typelevel" %% "cats-core" % "0.9.0", "io.monix " %% "monix-execution" % "3.0.0-RC1", "io.monix" %% "monix-eval" % "3.0.0-RC1" ) PB.targets in Compile import monix.execution.Scheduler.Implicits.global final class FutureToTask[A](x: => Future[A])
akka-stream-alpakka-mongodb" % "1.1.0", "ch.qos.logback" % "logback-classic" % "1.2.3", "io.monix " %% "monix" % "3.0.0-RC3", "org.typelevel" %% "cats-core" % "2.0.0-M4" ) converters/DBOResultType.scala package com.datatech.sdp.result import cats._ import cats.data.EitherT import cats.data.OptionT import monix.eval.Task Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = { Await.result(fut, timeOut) } import monix.eval.Task import monix.execution.Scheduler.Implicits.global final class FutureToTask[A](x: => Future[A])
._ class MongoAdder extends Actor with ActorLogging { import monix.execution.Scheduler.Implicits.global " %% "monix" % "2.3.0", //for mongodb 4.0 "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.0", ch.qos.logback" % "logback-classic" % "1.2.3", "org.typelevel" %% "cats-core" % "0.9.0", "io.monix " %% "monix-execution" % "3.0.0-RC1", "io.monix" %% "monix-eval" % "3.0.0-RC1" ) PB.targets in Compile import monix.execution.Scheduler.Implicits.global final class FutureToTask[A](x: => Future[A])
% "cassandra-driver-extras" % "3.6.0", "ch.qos.logback" % "logback-classic" % "1.2.3", "io.monix " %% "monix" % "3.0.0-RC2", "org.typelevel" %% "cats-core" % "2.0.0-M1", "io.grpc" % "grpc-netty"
taskToDBOResult(add(b,c)) } yield d val sum: Task[Either[String,Option[Int]]] = calc.value.value import monix.execution.Scheduler.Implicits.global
grpc-netty" % grpcJavaVersion, "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion, "io.monix " %% "monix" % "2.3.0", // for GRPC Akkastream "beyondthelines" %% "grpcakkastreamruntime
akka-stream-alpakka-mongodb" % "1.1.0", "ch.qos.logback" % "logback-classic" % "1.2.3", "io.monix " %% "monix" % "3.0.0-RC3", "org.typelevel" %% "cats-core" % "2.0.0-M4", "com.github.tasubo" % "jurl-tools _ import org.mongodb.scala.model.Filters._ import com.datatech.sdp.mongo.engine.MGOClasses._ import monix.execution.CancelableFuture MongoClient, m: Manifest[M], mat: ActorMaterializer) extends Directives with JsonConverter { import monix.execution.Scheduler.Implicits.global
% "cassandra-driver-extras" % "3.6.0", "ch.qos.logback" % "logback-classic" % "1.2.3", "io.monix " %% "monix" % "3.0.0-RC2", "org.typelevel" %% "cats-core" % "2.0.0-M1", "io.grpc" % "grpc-netty"
% "cassandra-driver-extras" % "3.6.0", "ch.qos.logback" % "logback-classic" % "1.2.3", "io.monix " %% "monix" % "3.0.0-RC2", "org.typelevel" %% "cats-core" % "2.0.0-M1", "io.grpc" % "grpc-netty"
._ import monix.execution.Scheduler.Implicits.global import scala.util._ object CQLCreatTables extends % "cassandra-driver-extras" % "3.6.0", "ch.qos.logback" % "logback-classic" % "1.2.3", "io.monix " %% "monix" % "3.0.0-RC2", "org.typelevel" %% "cats-core" % "2.0.0-M1", "com.thesamet.scalapb" % import monix.execution.Scheduler.Implicits.global session.execute(boundStmt) Task.now import monix.execution.Scheduler.Implicits.global final class FutureToTask[A](x: => Future[A])
._ import monix.execution.CancelableFuture import akka.util._ import akka.http.scaladsl.model._ import MongoClient, m: Manifest[M], mat: ActorMaterializer) extends Directives with JsonConverter { import monix.execution.Scheduler.Implicits.global
grpc-netty" % grpcJavaVersion, "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion, "io.monix " %% "monix" % "2.3.0", // for GRPC Akkastream "beyondthelines" %% "grpcakkastreamruntime
parameters = getSeqParams(jsonParams,sqlText) ) jdbcBatchUpdate[Seq](ctx) } } import monix.execution.Scheduler.Implicits.global
Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = { Await.result(fut, timeOut) } import monix.eval.Task import monix.execution.Scheduler.Implicits.global final class FutureToTask[A](x: => Future[A])