Reactor
Reactive Programming
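Reactive programming is a programming paradigm built around data streams and the propagation of change. Static or dynamic data streams can be expressed easily in the programming language, and the underlying execution model automatically propagates changed values through those streams.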
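# Imperative programming: the value of a is fixed once the statement has executed.
a = b + c
# Reactive programming: the value of a changes whenever the value of b or c changes.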
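The contrast can be sketched in plain Java. The `Cell` class and the `sum` helper below are hypothetical names invented for this illustration; they are not part of Reactor and only show how a derived value can follow its inputs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

/** Illustration only: a tiny observable integer cell (hypothetical, not a Reactor type). */
public class ReactiveSumSketch {

    /** A mutable value that notifies its listeners whenever it changes. */
    public static final class Cell {
        private int value;
        private final List<IntConsumer> listeners = new ArrayList<>();

        public Cell(int initial) { this.value = initial; }

        public int get() { return value; }

        public void set(int newValue) {
            value = newValue;
            // Propagate the change to everything derived from this cell.
            for (IntConsumer listener : listeners) listener.accept(newValue);
        }

        public void onChange(IntConsumer listener) { listeners.add(listener); }
    }

    /** Builds a cell whose value is kept equal to left + right. */
    public static Cell sum(Cell left, Cell right) {
        Cell result = new Cell(left.get() + right.get());
        left.onChange(v -> result.set(v + right.get()));
        right.onChange(v -> result.set(left.get() + v));
        return result;
    }

    /** Returns {imperative a, reactive a} after b has been changed from 1 to 10. */
    public static int[] demo() {
        Cell b = new Cell(1);
        Cell c = new Cell(2);
        int imperativeA = b.get() + c.get(); // computed once, never updated
        Cell reactiveA = sum(b, c);          // recomputed on every change of b or c
        b.set(10);
        return new int[] { imperativeA, reactiveA.get() };
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("imperative a = " + result[0]); // 3
        System.out.println("reactive a   = " + result[1]); // 12
    }
}
```

The imperative `a` keeps the value it had when the statement ran, while the reactive `a` is updated as soon as `b` changes.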
Project Reactor
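Reactor is a fourth-generation reactive library developed by the Pivotal team. It is used to build non-blocking applications on the JVM according to the Reactive Streams specification.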
Core concepts
Operators - Publisher / Subscriber
- Nothing happens until you subscribe()
- Flux [ 0..N ] - onNext(), onComplete(), onError()
- Mono [ 0..1 ] - onNext(), onComplete(), onError()
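The signal sequence above can be observed without Reactor on the classpath. The sketch below assumes Java 9 or later and uses the JDK's `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams interfaces that Flux and Mono implement. The `range` helper is a hypothetical, deliberately simplified source (synchronous, no handling of invalid requests); it is not Reactor's `Flux.range`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class SubscribeSketch {

    /** Hypothetical cold source: emits start..start+count-1 to each subscriber, only on request. */
    public static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = start;
            private boolean terminated;

            @Override
            public void request(long n) {
                // Emit at most n items, then complete once the range is exhausted.
                for (long i = 0; i < n && !terminated && next < start + count; i++) {
                    subscriber.onNext(next++);
                }
                if (!terminated && next >= start + count) {
                    terminated = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() { terminated = true; }
        });
    }

    /** Records every signal in order, so the effect of subscribing is visible. */
    public static List<String> demo() {
        List<String> events = new ArrayList<>();
        Flow.Publisher<Integer> source = range(1, 3); // assembling the source emits nothing
        events.add("assembled");
        source.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                events.add("onSubscribe");
                s.request(Long.MAX_VALUE); // unbounded demand
            }
            @Override public void onNext(Integer item) { events.add("onNext " + item); }
            @Override public void onError(Throwable t) { events.add("onError " + t); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        // [assembled, onSubscribe, onNext 1, onNext 2, onNext 3, onComplete]
        System.out.println(demo());
    }
}
```

No `onNext` appears before `onSubscribe`: the source does no work until a subscriber arrives and requests data.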
Backpressure
- Subscription
- onRequest(), onCancel(), onDispose()
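Backpressure through a Subscription can be sketched as follows, again with the JDK's `java.util.concurrent.Flow` interfaces (Java 9 or later) standing in for the Reactive Streams interfaces. The `upTo` source and the `Recorder` subscriber are hypothetical helpers written for this illustration, and the source is simplified (synchronous, no validation of the requested amount).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class BackpressureSketch {

    /** Hypothetical source emitting 1..count, never more than the subscriber has requested. */
    public static Flow.Publisher<Integer> upTo(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private boolean terminated;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && !terminated && next <= count; i++) {
                    subscriber.onNext(next++);
                }
                if (!terminated && next > count) {
                    terminated = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() { terminated = true; }
        });
    }

    /** Subscriber that asks for 4 items up front and keeps its Subscription for later requests. */
    public static final class Recorder implements Flow.Subscriber<Integer> {
        public final List<Integer> received = new ArrayList<>();
        public boolean completed;
        public Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(4); // initial demand: 4 items
        }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        Recorder recorder = new Recorder();
        upTo(6).subscribe(recorder);
        // [1, 2, 3, 4] completed=false
        System.out.println(recorder.received + " completed=" + recorder.completed);

        recorder.subscription.request(2); // ask for the remaining items
        // [1, 2, 3, 4, 5, 6] completed=true
        System.out.println(recorder.received + " completed=" + recorder.completed);
    }
}
```

The source holds back items 5 and 6 until the subscriber signals more demand, and completion is delivered only after the last item has been requested.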
Thread scheduling: Schedulers
- immediate() / single() / newSingle()
- elastic() / parallel() / newParallel(): backed by thread pools.
Error handling
- onError / onErrorReturn / onErrorResume
- doOnError / doFinally
Example
Adding the dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>geektime.spring.reactor</groupId>
    <artifactId>simple-reactor-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>simple-reactor-demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Application class
package geektime.spring.reactor.simple;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@SpringBootApplication
@Slf4j
public class SimpleReactorDemoApplication implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(SimpleReactorDemoApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Create the sequence 1..6
        Flux.range(1, 6)
                // Operators below this point receive their signals on an elastic thread
                .publishOn(Schedulers.elastic())
                // Log how many elements each request asks for; note that operator order makes a difference
                .doOnRequest(n -> log.info("Request {} number", n))
                // Log "Publisher COMPLETE 1" when the upstream sequence completes
                .doOnComplete(() -> log.info("Publisher COMPLETE 1"))
                .map(i -> {
                    // Log the thread that map runs on
                    log.info("Publish {}, {}", Thread.currentThread(), i);
                    return 10 / (i - 3); // throws ArithmeticException when i == 3
                    // return i;
                })
                // Log "Publisher COMPLETE 2" when the mapped sequence completes
                .doOnComplete(() -> log.info("Publisher COMPLETE 2"))
                // Subscription and request signals are issued on the single thread
                .subscribeOn(Schedulers.single())
                // On error, log the exception and continue with a fallback value
                .onErrorResume(e -> {
                    log.error("Exception {}", e.toString());
                    return Mono.just(-1);
                })
                // .onErrorReturn(-1)
                .subscribe(i -> log.info("Subscribe {}: {}", Thread.currentThread(), i),
                        e -> log.error("error {}", e.toString()),
                        () -> log.info("Subscriber COMPLETE"),
                        s -> s.request(4) // backpressure: request only 4 elements
                );
        // Give the asynchronous pipeline time to finish before run() returns
        Thread.sleep(2000);
    }
}
Console output
2020-01-15 16:26:04.248 INFO 17748 --- [ single-1] g.s.r.s.SimpleReactorDemoApplication : Request 4 number
2020-01-15 16:26:04.250 INFO 17748 --- [ elastic-2] g.s.r.s.SimpleReactorDemoApplication : Publish Thread[elastic-2,5,main], 1
2020-01-15 16:26:04.250 INFO 17748 --- [ elastic-2] g.s.r.s.SimpleReactorDemoApplication : Subscribe Thread[elastic-2,5,main]: -5
2020-01-15 16:26:04.250 INFO 17748 --- [ elastic-2] g.s.r.s.SimpleReactorDemoApplication : Publish Thread[elastic-2,5,main], 2
2020-01-15 16:26:04.250 INFO 17748 --- [ elastic-2] g.s.r.s.SimpleReactorDemoApplication : Subscribe Thread[elastic-2,5,main]: -10
2020-01-15 16:26:04.250 INFO 17748 --- [ elastic-2] g.s.r.s.SimpleReactorDemoApplication : Publish Thread[elastic-2,5,main], 3
2020-01-15 16:26:04.252 ERROR 17748 --- [ elastic-2] g.s.r.s.SimpleReactorDemoApplication : Exception java.lang.ArithmeticException: / by zero
2020-01-15 16:26:04.253 INFO 17748 --- [ elastic-2] g.s.r.s.SimpleReactorDemoApplication : Subscribe Thread[elastic-2,5,main]: -1
2020-01-15 16:26:04.253 INFO 17748 --- [ elastic-2] g.s.r.s.SimpleReactorDemoApplication : Subscriber COMPLETE