响应式编程是一个关注数据流和变化传播的异步编程范式。它使得通过所使用的编程语言来轻松的描述静态(e.g. array
)和动态(e.g. event emitters
)数据流成为可能。
Reactive Streams Specification定义了Java
响应式编程的接口规范。这个规范已被整合进Java 9
(Flow
类)。
在面向对象的编程语言中,响应式编程经常被做为观察者模式的扩展。
也有人拿响应式流与迭代器模式进行对比,因为在它们的类库中都有一个Iterable-Iterator
对。它们最大的不同在于,迭代器采用的是pull
模式,而响应式流采用的是push
模式。
迭代器是命令式编程模式,由开发者决定何时调用next()方法来获取下一个元素;而响应式编程中国,与Iterable-Iterator
对等价的是Publisher-Subscriber
对,由Publisher
通知Subscriber
有新的元素要处理。
这个规范定义了如下接口:
-
org.reactivestreams.Publisher
: 无界元素序列的提供者,根据Subscriber
发布这些元素。一个Publisher
可以服务于多个Subscriber
-
org.reactivestreams.Subscriber
: 无界元素序列的消费者 -
org.reactivestreams.Subscription
: 一个订阅到Publisher
的Subscriber
的一对一的生命周期 -
org.reactivestreams.Processor
:Publisher
和Subscriber
都要尊从的契约
Publisher/Subscriber
: 1/N
Subscriber/Publisher
: 1/1
Subscription/Subscriber
: 1/1
Reactor是一个在JVM平台上构建非阻塞应用程序(non-blocking application
)的Reactive Streams Specification的实现。
它是Spring WebFlux
的Reactor
默认实现,其对外提供的主要类有Flux
和Mono
:
Flux<T>
:public abstract class Flux<T> implements Publisher<T>
,0
到N
个元素的响应式流Mono<T>
:public abstract class Mono<T> implements Publisher<T>
,0
到1
个元素的响应式流
引入依赖:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.1.6.RELEASE</version>
</dependency>
最简单的构建方式:
//从数组构建
Flux.just("0", "1", "2", "3")
.subscribe(s -> log.info("Flux.just: {}", s));
Flux.fromArray(new String[]{"0", "1", "2", "3"})
.subscribe(s -> log.info("Flux.fromArray: {}", s));
List<String> strings = Arrays.asList("0", "1", "2", "3");
//从可迭代对象构建,如ArrayList
Flux.fromIterable(strings)
.subscribe(s -> log.info("Flux.fromIterable: {}", s));
//从Java 8 的Stream构建
Flux.fromStream(strings.stream())
.subscribe(s -> log.info("Flux.fromStream: {}", s));
//构建至多一个元素的流
Mono.just("0")
.subscribe(s -> log.info("Mono.just: {}", s));
String value = "0";
Mono.justOrEmpty(Optional.ofNullable(value))
.subscribe(s -> log.info("Mono.justOrEmpty: {}", s));
构建一个数字序列:
//构建一个排列
Flux.range(0, 4)
.subscribe(i -> log.info("Flux.range: {}", i));
//这个for循环实现了上面代码的功能
for (int i = 0; i < 4; i++) {
log.info("for: {}", i);
}
为序列分配线程执行,而不是在调用者线程中执行,即异步执行。
Reactor
提供了如下Scheduler
:
Schedulers.immediate()
: 在当前线程执行Schedulers.single()
: 可重用的单线程Schedulers.newSingle()
: 每次调用都创建一个新线程Schedulers.elastic()
: 在需要时创建线程池,重用空闲的线程,适用于I/O
等阻塞式工作,类似Executors.newCachedThreadPool()
Schedulers.parallel()
: 固定大小(CPU
核心数)的线程池
为每个Flux分配一个线程
Flux.range(0, 10)
.publishOn(Schedulers.parallel())
.subscribe(i -> log.info("Flux.range: {}", i));
Flux.range(10, 10)
.subscribeOn(Schedulers.parallel())
.subscribe(i -> log.info("Flux.range: {}", i));
publishOn/subscribeOn
: 都会使你的操作在单独的线程中执行
parallel(int)
和runOn(Scheduler)
方法帮助你真正的进行异步处理:
parallel(int)
方法返回一个ParallelFlux
实例,runOn(Scheduler)
方法告诉ParallelFlux
实例使用哪个Scheduler
来执行任务。
Flux.range(0, 100)
.parallel(4)
.runOn(Schedulers.parallel())
.subscribe(i -> log.info("Flux.range: {}", i));
通过ConnectableFlux
,可以向多个Subscriber
发送广播
方案一:
publish()-->connect()
:
ConnectableFlux<Integer> flux = Flux.range(0, 4)
.doOnSubscribe(subscription -> log.info("doOnSubscribe: {}", subscription))
.publish();
flux.subscribe(i -> log.info("subscribe 1: {}", i));
flux.subscribe(i -> log.info("subscribe 2: {}", i));
flux.connect();
直到调用connect()
方法,才会开始处理数据,即调用了connect()
方法才会执行subscribe()
方法中的代码
方案二:
publish()-->autoConnect()
:
Flux<Integer> flux = Flux.range(0, 4)
.doOnSubscribe(subscription -> log.info("doOnSubscribe: {}", subscription))
.publish().autoConnect(2);
flux.subscribe(i -> log.info("subscribe 1: {}", i));
flux.subscribe(i -> log.info("subscribe 2: {}", i));
使用autoConnect(int)
方法时,直到subscribe()
被调用N
(autoConnect(int)
的参数)次才会开始执行任务,且最多有N
个订阅者
RxJava是另一个Reactive Streams Specification的实现。
引入依赖:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.1.12</version>
</dependency>
Flowable<T>
:public abstract class Flowable<T> implements Publisher<T>
,0
到N
个元素的响应式流,支持Reactive-Streams
和backpressure
构建与Reactor
类似,也是rang
、just
、fromXXX
几个方法
RxJava
提供了如下Scheduler
:
Schedulers.computation()
: 使用固定数量的专门的线程异步执行计算密集型任务Schedulers.io()
: 执行I/O等阻塞式任务Schedulers.single()
: 使用单个线程以FIFO的方式执行任务Schedulers.trampoline()
: 用于测试
示例:
Flowable.range(0,10)
.subscribeOn(Schedulers.computation())
.subscribe(s -> log.info("Schedulers.computation: {}",s));
Flowable.range(10,10)
.observeOn(Schedulers.computation())
.subscribe(s -> log.info("Schedulers.computation: {}",s));
subscribeOn/observeOn
: 与Reactor
的publishOn/subscribeOn
类似
方案一:
Flowable.range(0, 10)
.flatMap(v -> Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> {
Integer r = w * w;
log.info("parallel processing: {} * {} = {}", w, w, r);
return r;
}))
.subscribe(r -> log.info("parallel processing: {}", r));
方案二:
Flowable.range(0, 10)
.parallel(4)
.runOn(Schedulers.computation())
.map(w -> {
Integer r = w * w;
log.info("parallel processing: {} * {} = {}", w, w, r);
return r;
})
.sequential()
.subscribe(r -> log.info("parallel processing: {}", r));
parallel(int)
方法返回一个ParallelFlowable<T>
。需要注意的是,parallel(int)
和ParallelFlowable<T>
都被标记为beta
(@Beta
),在未来的RxJava
版本中有可能会变动。