目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

Reactor

Reactive Programming

 响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

# 传统模式下编程:a 的值在执行后被确定。
a = b + c 

# 响应式编程
a 的值随着 b、c 值的变化而变化。

Project Reactor

 Reactor是由 Pivotal 团队开发的,第四代Reactive库,用于根据Reactive Streams规范在JVM上构建非阻塞应用程序。

核心的概念

Operators - Publisher / Subscriber

  • Nothing Happens Util You subscribe()
  • Flux [ 0..N ] - onNext()、onComplete()、onError()
  • Mono [ 0..1 ] - onNext()、onComplete()、onError()

Backpressure

  • Subscription
  • onRequest()、onCancel()、onDispose()

线程调度 Schedulers

  • immediate() / single() / newSingle()
  • elastic() / parallel() / newParallel() :线程池相关。

错误处理

  • onError / onErrorReturn / onErrorResume
  • doOnError / doFinally

示例

引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.2.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>geektime.spring.reactor</groupId>
	<artifactId>simple-reactor-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>simple-reactor-demo</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-core</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

启动类

package geektime.spring.reactor.simple;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@SpringBootApplication
@Slf4j
public class SimpleReactorDemoApplication implements ApplicationRunner {

	public static void main(String[] args) {
		SpringApplication.run(SimpleReactorDemoApplication.class, args);
	}

	@Override
	public void run(ApplicationArguments args) throws Exception {
		//创建1~6的序列
		Flux.range(1, 6)
				//在每次执行request的时候打印请求多少个数的日志
				.publishOn(Schedulers.elastic())
				.doOnRequest(n -> log.info("Request {} number", n)) // 注意顺序造成的区别
				.doOnComplete(() -> log.info("Publisher COMPLETE 1"))//执行完序列1~6之后打印 Publisher COMPLETE 1
				.map(i -> {
					log.info("Publish {}, {}", Thread.currentThread(), i);//map执行是在哪个线程上,打印线程名字
					return 10 / (i - 3);
//					return i;
				})
				.doOnComplete(() -> log.info("Publisher COMPLETE 2"))//执行完序列1~6之后打印 Publisher COMPLETE 2
				.subscribeOn(Schedulers.single())
				.onErrorResume(e -> {
					log.error("Exception {}", e.toString());
					return Mono.just(-1);
				})
//				.onErrorReturn(-1)
				.subscribe(i -> log.info("Subscribe {}: {}", Thread.currentThread(), i),
						e -> log.error("error {}", e.toString()),
						() -> log.info("Subscriber COMPLETE"),
						s -> s.request(4)
				);
		Thread.sleep(2000);
	}
}


控制台输出

2020-01-15 16:26:04.248  INFO 17748 --- [       single-1] g.s.r.s.SimpleReactorDemoApplication     : Request 4 number
2020-01-15 16:26:04.250  INFO 17748 --- [      elastic-2] g.s.r.s.SimpleReactorDemoApplication     : Publish Thread[elastic-2,5,main], 1
2020-01-15 16:26:04.250  INFO 17748 --- [      elastic-2] g.s.r.s.SimpleReactorDemoApplication     : Subscribe Thread[elastic-2,5,main]: -5
2020-01-15 16:26:04.250  INFO 17748 --- [      elastic-2] g.s.r.s.SimpleReactorDemoApplication     : Publish Thread[elastic-2,5,main], 2
2020-01-15 16:26:04.250  INFO 17748 --- [      elastic-2] g.s.r.s.SimpleReactorDemoApplication     : Subscribe Thread[elastic-2,5,main]: -10
2020-01-15 16:26:04.250  INFO 17748 --- [      elastic-2] g.s.r.s.SimpleReactorDemoApplication     : Publish Thread[elastic-2,5,main], 3
2020-01-15 16:26:04.252 ERROR 17748 --- [      elastic-2] g.s.r.s.SimpleReactorDemoApplication     : Exception java.lang.ArithmeticException: / by zero
2020-01-15 16:26:04.253  INFO 17748 --- [      elastic-2] g.s.r.s.SimpleReactorDemoApplication     : Subscribe Thread[elastic-2,5,main]: -1
2020-01-15 16:26:04.253  INFO 17748 --- [      elastic-2] g.s.r.s.SimpleReactorDemoApplication     : Subscriber COMPLETE

作者:Soulboy