在这篇博客中,我们将详细介绍 Reactive-Streams 规范的核心概念和它在实际编程中的重要性。 1. 什么是 Reactive-Streams 规范? Reactive-Streams 的核心组件 Reactive-Streams 规范定义了四个核心接口,分别为 Publisher、Subscriber、Subscription 和 Processor Reactive-Streams 与 Reactor Reactor 是 Spring 的响应式编程库,完全基于 Reactive-Streams 规范。 为什么选择 Reactive-Streams? Reactive-Streams 是构建响应式应用的基础,它提供了以下优势: 兼容性:由于 Reactive-Streams 是一个标准,不同的响应式库(如 Reactor 和 RxJava)可以无缝互操作
Slick3.x已经增加了支持Reactive-Streams功能,可以通过Reactive-Streams API来实现有限内存空间内的无限规模数据读取,这正符合了FunDA的设计理念:高效、便捷、安全的后台数据处理工具库 我们在前面几篇讨论里介绍了Iteratee模式,play-iteratees支持Reactive-Streams并且提供与Slick3.x的接口API,我们就在这篇讨论里介绍如何把Slick-Reactive-Streams fetchSize是缓存数据页长度(每批次读取数据字数),然后用db.stream来构成一个Reactive-Streams标准的数据源publisher。 play-iteratee支持Reactive-Streams,所以这个Enumerator应该具备协调后台数据和内存缓冲之间关系(back-pressure)的功能。
换句话讲Reactive-Streams是通过push-pull-model来实现上下游Enumerator和Iteratee之间互动的。 这样就违背了使用Reactive-Streams的意愿。那我们应该怎么办? 现在我们可以把这个Reactive-Streams到fs2-pull-streams转换过程这样来定义: implicit val strat = Strategy.fromFixedDaemonPool
主要变化 主要特点 单一依赖:Reactive-Streams 继续支持Java 6+和Android 2.3+ 修复了API错误和RxJava 2的许多限制 旨在替代RxJava 2,具有相对较少的二进制不兼容更改 System.out::println); 2.5 基类 在 RxJava 3 可以发现有以下几个基类(跟RxJava 2是一致的吧): io.reactivex.Flowable:发送0个N个的数据,支持Reactive-Streams
RxJava 2.0 已经按照Reactive-Streams specification规范完全的重写了。RxJava2.0 已经独立于RxJava 1.x而存在。 现在也可以完成和以前类似的代码: 注意,由于Reactive-Streams的兼容性,方法onCompleted被重命名为onComplete。 Reactive-Streams规范用这个名称指定source和consumer之间的关系: org.reactivestreams.Subscription 允许从上游请求一个正数,并支持取消。 因为Reactive-Streams的基础接口org.reactivestreams.Publisher 定义subscribe()为无返回值,Flowable.subscribe(Subscriber
Reactive-Streams规范中,一个终端事件到达Subscriber,上游的Subscription会取消,因此调用 cancel()是一个空操作。
此外,FunDA的数据库读取方式支持reactive-streams标准,能对大量数据进行后台缓存,然后逐块输出,保证了资源的安全使用。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { } Reference: https://github.com/reactive-streams
reactive-streams是把数据当做数据流来用的,因此spring data reactive并不支持返回Page,但是调用参数可以传Pageable参数 public interface StocDao
; 6.2 更改Single Single的作用类似于 Observable = 发送数据,但区别在于订阅后只能接受到1次 改动如下 <-- 源码分析 --> // 变动1:Single被重新设计为 Reactive-Streams Completable的作用类似于 Observable = 发送数据,但区别在于订阅后只能接受 Complete 和 onError事件 改动如下 // 变动1:Completable被重新设计为 Reactive-Streams
; 6.2 更改Single Single的作用类似于 Observable = 发送数据,但区别在于订阅后只能接受到1次 改动如下 <-- 源码分析 --> // 变动1:Single被重新设计为 Reactive-Streams Completable的作用类似于 Observable = 发送数据,但区别在于订阅后只能接受 Complete 和 onError事件 改动如下 // 变动1:Completable被重新设计为 Reactive-Streams
现在我们对Reactive-Streams有了个大概的印象:这个模式由两方组成,分别是:数据源(在push-model中就是数据发送方)以及数据消耗方,分别对应了Iteratee模式的Enumerator 用Iteratee实现Reactive-Streams时必须实现Enumerator和Iteratee之间的双向通告机制。
)); } @Override public void onComplete() { //由于Reactive-Streams
类似事件模式Reactive-Streams:onSubscribe(onError | onComplete)?
鉴于它还处在incubator,如果不是着急使用HTTP/2,建议还是使用spring5的webclient,它是遵循reactive-streams规范的,使用起来更加方便。
io.projectreactor:reactor-core:jar:3.1.8.RELEASE:compile [INFO] | \- org.reactivestreams:reactive-streams
filepath=org/reactivestreams/reactive-streams/1.0.2/reactive-streams-1.0.2.jar https://search.maven.org
org.mongodb mongodb-driver-reactivestreams 1.11.0 > 引入mongodb-driver-reactivestreams 将会自动添加 reactive-streams
Gradle 本地环境 完成导入 IDEA,等待项目构建初始化完毕,可以看到项目依赖树如下图: 图13-5 项目依赖树 可以看到,在 webflux的 starter 中依赖了 reactor、reactive-streams
akka-stream_2.11/2.4.20/akka-stream_2.11-2.4.20.jar:/Users/huangqingshi/.m2/repository/org/reactivestreams/reactive-streams akka-stream_2.11/2.4.20/akka-stream_2.11-2.4.20.jar:/Users/huangqingshi/.m2/repository/org/reactivestreams/reactive-streams akka-stream_2.11/2.4.20/akka-stream_2.11-2.4.20.jar:/Users/huangqingshi/.m2/repository/org/reactivestreams/reactive-streams akka-stream_2.11/2.4.20/akka-stream_2.11-2.4.20.jar:/Users/huangqingshi/.m2/repository/org/reactivestreams/reactive-streams akka-stream_2.11/2.4.20/akka-stream_2.11-2.4.20.jar:/Users/huangqingshi/.m2/repository/org/reactivestreams/reactive-streams