那么如果能把gRPC中ListenableFuture和StreamObserver这两种类型转成akka-stream的基本类型应该就能够实现所谓的reactive-gRPC了。 如果我们能用akka-stream编程方式实现gRPC服务调用的话,可能会遭遇下面的场景:在服务端我们只需要实现一种akka-stream的Flow把进来的request转化成出去的response,如下 )(computeResponse) // Bidirectional streaming Flow[Request].flatMapConcat(computeResponses) 当然,这是个akka-stream Flow,我们可以在这个Flow里调用任何akka-stream提供的功能,如: Flow[Request] .throttle(1, 10.millis, 1, ThrottleMode.Shaping learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] } 我们看到服务函数sumPair是一个akka-stream
akka-stream是一套功能更加完整和强大的streaming工具库,那么如果以akka-stream为基础,设计一套能在集群环境里进行分布式多线程并行数据处理的开源编程工具应该可以是2018的首要任务
akka-stream原则上是一种推式(push-model)的数据流。 对于akka-stream这种push模式的数据流,因为超速推送数据会造成数据丢失,所以必须想办法控制publisher产生数据的速度。 因为akka-stream已经在上下游环节全部实现了Reactive-Streams-Specification,所以上下游之间可以进行互动,这样就可以在akka-stream里由下游通知上游自身可接收数据的状态来控制上游数据流速 akka-stream的backpressure使用了缓冲区buffer来成批预存及补充数据,这样可以提高数据传输效率。 对此akka-stream提供了具体的解决方法:如果外界系统是在上游过快产生数据可以用conflate函数用Seq这样的集合把数据传到下游。
刚好,在这篇讨论里我们希望能介绍一些Akka-Stream和外部系统集成对接的实际用例,把Slick数据库数据载入连接到Akka-Stream形成streaming-dataset应该是一个挺好的想法。 Slick和Akka-Stream可以说是自然匹配的一对,它们都是同一个公司产品,都支持Reactive-Specification。 现在我们有了Reactive stream source,它是个akka-stream,该如何对接处于下游的scalaz-stream-fs2呢? 我们知道:akka-stream是Reactive stream,而scalaz-stream-fs2是纯“拖式”pull-model stream,也就是说上面这个Reactive stream source enqueue代表akka-stream向scalaz-stream-fs2发送数据,可以用akka-stream的Sink构件来实现: class FS2Gate[T](q: fs2.async.mutable.Queue
我们在前面介绍过scalaz-stream,它与akka-stream的主要区别在于: 1、scalaz-stream是pull模式的,而akka-stream是push模式的。 akka-stream的运算器是materializer。 在akka-stream里数据流组件一般被称为数据流图(graph)。我们可以用许多数据流图组成更大的stream-graph。 akka-stream最简单的完整(或者闭合)线性数据流(linear-stream)就是直接把一个Source和一个Sink相接。 我们上面提过akka-stream是在actor系统里处理数据流元素的。在这个过程中同时可以用actor内部状态来产生运算结果。
Akka-http是基于Akka-stream开发的:不但它的工作流程可以用Akka-stream来表达,它还支持stream化的数据传输。 我们知道:Akka-stream提供了功能强大的FileIO和Data-Streaming,可以用Stream-Source代表文件或数据库数据源。 首先,Akka-stream通过FileIO对象提供了足够多的file-io操作函数,其中有个fromPath函数可以用某个文件内容数据构建一个Source类型: /** * Creates a
akka-stream的Graph是一种运算方案,它可能代表某种简单的线性数据流图如:Source/Flow/Sink,也可能是由更基础的流图组合而成相对复杂点的某种复合流图,而这个复合流图本身又可以被当作组件来组合更大的 下面是akka-stream预设的一些基础数据流图: ? 上面Source,Sink,Flow代表具备线性步骤linear-stage的流图,属于最基础的组件,可以用来构建数据处理链条。 注意这个~>符合的使用:akka-stream只提供了对预设定Shape作为连接对象的支持如: def ~>[Out](junction: UniformFanInShape[T, Out] } // Putting all together val closed = source.via(flow.filter(_ > 1)).to(sink) 和scalaz-stream不同的还有akka-stream 的运算是在actor上进行的,除了大家都能对数据流元素进行处理之外,akka-stream还可以通过actor的内部状态来维护和返回运算结果。
从总体上看:akka-stream是由数据源头Source,流通节点Flow和数据流终点Sink三个框架性的流构件(stream components)组成的。 所以:akka-stream必须有一个Graph描述功能和流程。每个Graph又可以由一些代表更细小功能的子Graph组成。 这容易理解,因为akka-stream是Reactive-Stream,是push,pull结合模式上下游相互沟通的。但如此则很不方便某些应用场景,比如数据流动控制。 akka-stream还提供了一套更简单的API使用户可以更灵活的对端口进行操作。 对于一对多扩散型和多对一合并型形状的数据流构件akka-stream提供了UniformFanIn和UniformFanOut两种GraphStage。
akka-stream是多线程non-blocking模式的,一般来说,运算任务提交到另外线程后这个线程就会在当前程序控制之外自由运行了。 akka-stream提供了KillSwitch trait来支持这项功能: /** * A [[KillSwitch]] allows completion of [[Graph]]s from the UniqueBidiKillSwitchStage.asInstanceOf[Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch]] ...} akka-stream
akka-stream的数据流可以由一些组件组合而成。这些组件统称数据流图Graph,它描述了数据流向和处理环节。Source,Flow,Sink是最基础的Graph。 我们可以用akka-stream提供的GraphDSL来构建Graph。 我们知道:akka-stream的Graph可以用更简单的Partial-Graph来组合,而所有Graph最终都是用基础流图Core-Graph如Source,Flow,Sink组合而成的。 = { GraphInterpreter .currentInterpreter .activeStage .completeStage() } } akka-stream 所以我们最好使用akka-stream提供现成的pull,push来重写抽象函数onPull,onPush。
akka-stream是基于Actor模式的,所以也继承了Actor模式的“坚韧性(resilient)”特点,在任何异常情况下都有某种整体统一的异常处理策略和具体实施方式。 在akka-stream的官方文件中都有详细的说明和示范例子。我们在这篇讨论里也没有什么更好的想法和范例,也只能略做一些字面翻译和分析理解的事了。 下面列出了akka-stream处理异常的一些实用方法: 1、recover:这是一个函数,发出数据流最后一个元素然后根据上游发生的异常终止当前数据流 2、recoverWithRetries:也是个函数 stream,Supervisor-Strategy提供了三种处理方法: Stop:终结stream,返回异常 Resume:越过当前元素,继续运行 Restart:重新启动、越过当前元素、清除任何内部状态 akka-stream
在现实应用中akka-stream往往需要集成其它的外部系统形成完整的应用。这些外部系统可能是akka系列系统或者其它类型的系统。 所以,akka-stream必须提供一些函数和方法来实现与各种不同类型系统的信息交换。在这篇讨论里我们就介绍几种通用的信息交换方法和函数。 akka-stream提供了mapAsync+ask模式可以从一个运算中的数据流向外连接某个Actor来进行数据交换。这是一种akka-stream与Actor集成的应用。 akka-stream提供了个Source.queque函数来构建一种Source,外部系统可以利用这个Source来向Stream发送数据。 Add(1,1) scala.io.StdIn.readLine sys.terminate() } 在本次讨论里我们了解了akka-stream与外界系统对接集成的一些情况。
Akka-http是基于Akka-stream编写的,所以我们需要从Akka-stream运算模式来理解Akka-http的类型表现形式。
我们知道HttpResponse里的Entity.dataBytes就是一个Source[ByteString,_],我们可以把它Unmarshall成Source[County,_],然后用Akka-stream localhost:8011/rows")) scala.io.StdIn.readLine() sys.terminate() } 以上我们已经实现了客户端从服务端下载一段数据库表行,然后以Akka-stream def asJson: JsValue = parseJson def parseJson: JsValue = JsonParser(string) } } 假设服务端收到数据后以Akka-stream
akka-http是以akka-stream为核心的,使用了大量的akka-stream功能。 akka-stream中有一个FileIO组件库,里面提供了一系列有关文件读写功能,以数据流Source,Sink方式呈现: ... libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-http" % "10.1.8", "com.typesafe.akka" %% "akka-stream
我们知道:Akka-http是搭建在Akka-stream之上的。所以,Akka-http在客户端构建与服务器的连接通道也可以用Akka-stream的Flow来表示。
我们在前面几个章节里已经实现了gRPC与akka-stream的集成:这样我们就可以通过akka-stream实现一种互动的数据交换。所以:streaming就是标题上面提到的编程模式了。 value.asInstanceOf[A] } } 在gRPC-JDBC服务端按照客户端传过来的JDBCQuery重构JDBCQueryContext,然后用这个Context来产生一个akka-stream , parameters = marshal(Seq("Arizona", 5)) ) 然后via Flow传给服务端获取一个akka-stream Source[JBCDataRow,NotUsed logback-classic" % "1.2.3", "com.typesafe.akka" %% "akka-actor" % "2.5.4", "com.typesafe.akka" %% "akka-stream
{idxCreate.error.reason}") } system.terminate() client.close() } 以上代码有几个地方值得提一下: 1、这上面使用了一个基于akka-stream
akka-stream是多线程异步模式的程序,所以这个函数只能是一个异步运行的回调callback。 akka-stream提供了一个函数getAsyncCallback函数,能够把一个函数放到另一个线程里并返回它的callback: /** * Obtain a callback object
AddressRepo.deleteById(id)) { addr => complete(s"address deleted: $addr") } } } 这样做可以灵活的使用akka-stream libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-http" % "10.1.8", "com.typesafe.akka" %% "akka-stream