Flink
大数据
大数据(big data),或称巨量资料,指的是所涉及的资料量规模巨大到无法透过主流软件工具,在合理时间内达到撷取、管理、处理、并整理成为帮助企业经营决策更积极目的的资讯。
在维克托·迈尔-舍恩伯格及肯尼斯·库克耶编写的《大数据时代》中大数据指不用随机分析法(抽样调查)这样捷径,而采用所有数据进行分析处理。大数据的5V特点(IBM提出):Volume(大量)、Velocity(高速)、Variety(多样)、Value(低价值密度)、Veracity(真实性)。
人工智能
人工智能(Artificial Intelligence),英文缩写为AI。是新一轮科技革命和产业变革的重要驱动力量,是研究、开发用于模拟、延伸和扩展人的智能的理论、方法、技术及应用系统的一门新的技术科学。
人工智能是智能学科重要的组成部分,它企图了解智能的实质,并生产出一种新的能以与人类智能相似的方式做出反应的智能机器。人工智能是十分广泛的科学,包括机器人、语言识别、图像识别、自然语言处理、专家系统、机器学习,计算机视觉等。
**人工智能大模型带来的治理挑战也不容忽视。**马斯克指出,在人工智能机器学习面具之下的本质仍然是统计。^ [35]^营造良好创新生态,需做好前瞻研究,建立健全保障人工智能健康发展的法律法规、制度体系、伦理道德。着眼未来,在重视防范风险的同时,也应同步建立容错、纠错机制,努力实现规范与发展的动态平衡。
批量计算(batch computing)
对一定规模量的数据进行处理,类似搬砖,10个10个的搬。
- 场景:离线数据统计、报表分析等(过去1年 10000亿条日志,分析日、周、月,接口响应延迟、状态码)
- 特点:批量计算非实时、高延迟,计算完成后才可以得到结果
- 框架:Hadoop的MapReduce
流式计算(stream computing)
对源源不断的数据流进行处理,类似水龙头出水。
- 场景:实时监控、实时风控等
- 特点:流式计算实时、低延迟,实时取最新的结果
- 框架:Spark(宏观上)、Flink
区分( 离线计算和实时计算 、流式计算和批量计算)
- 离线计算和实时计算 :是对数据处理的【延迟】不一样(一个实时和非实时)
- 流式计算和批量计算: 是对数据处理的【方式】不一样(一个流式和一个批量)
大数据技术栈用途
阶段 | 技术栈 | 场景用途 |
---|---|---|
初级阶段 | Javase+Javaweb、Idea+maven+git、Linux+Mysql+Shell、Springboot+Redis、Kakfa+Docker | |
中级阶段 | ELK体系、Hadoop生态:Hive(简化MapReduce编写代码)-Hbase(列式数据库)-MapReduce、Zookeeper、Flink+Flume(采集日志)、Scala+Spark(计算统计) | |
高级阶段 | Zabbix监控+Azkaban任务调度、Canal+ClickHouse(列式数据库)、Python、阿里云ODPS+DataV、SpringCloud微服务、大数据项目实战、企业数据中台+DaaS+PaaS |
Stream(JDK8)
lombok依赖
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
<scope>provided</scope>
</dependency>
订单类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
/**
* 订单号
*/
private String tradeNo;
/**
* 订单标题
*/
private String title;
/**
* 订单金额
*/
private int money;
}
JDK8流式处理stream范例
需求:电商订单数据处理,根据下⾯的list1和list2 各10个订单
- 统计两个⼈的分别购买订单的平均价格
- 统计两个人的订单总价
import net.xdclass.model.VideoOrder;
import net.xdclass.model.VideoOrderOld;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class JdkStreamApp {
public static void main(String [] args){
/**
* 数据源
*/
//总价 35
List<VideoOrderOld> videoOrders1 = Arrays.asList(
new VideoOrderOld("20190242812", "springboot", 3),
new VideoOrderOld("20194350812", "微服务SpringCloud", 5),
new VideoOrderOld("20190814232", "Redis", 9),
new VideoOrderOld("20190523812", "⽹⻚开发", 9),
new VideoOrderOld("201932324", "百万并发实战Netty", 9));
//总价 54
List<VideoOrderOld> videoOrders2 = Arrays.asList(
new VideoOrderOld("2019024285312", "springboot", 3),
new VideoOrderOld("2019081453232", "Redis", 9),
new VideoOrderOld("20190522338312", "⽹⻚开发", 9),
new VideoOrderOld("2019435230812", "Jmeter压⼒测试", 5),
new VideoOrderOld("2019323542411", "Git+Jenkins持续集成", 7),
new VideoOrderOld("2019323542424", "Idea", 21));
/**
* 平均价格(一定配置idea的 jdk8编译)
*/
double videoOrder1Avg1 = videoOrders1.stream().collect(
Collectors.averagingInt(VideoOrderOld::getMoney)
).doubleValue();
double videoOrder1Avg2 = videoOrders2.stream().collect(
Collectors.averagingInt(VideoOrderOld::getMoney)
).doubleValue();
System.out.println("videoOrder1Avg1="+videoOrder1Avg1); //videoOrder1Avg1=7.0
System.out.println("videoOrder1Avg2="+videoOrder1Avg2); //videoOrder1Avg2=9.0
/**
* 订单总价
*/
int total1 = videoOrders1.stream().collect(Collectors.summingInt(VideoOrderOld::getMoney)).intValue(); //7.0
int total2 = videoOrders2.stream().collect(Collectors.summingInt(VideoOrderOld::getMoney)).intValue(); //9.0
System.out.println("total1="+total1); // 35
System.out.println("total2="+total2); // 54
}
}
Stream(JDK8) 对比 Flink
数据来源和输出有多样化怎么处理?
- jdk stream:代码。
- flink:自带很多组件。
海量数据需要进行实时处理
- jdk stream:内部jvm单节点处理,单机内部并行处理。
- flink:节点可以分布在不同机器的JVM上,多机器并行处理。
统计时间段内数据,但数据达到是无序的
- jdk stream:写代码
- flink:自带窗口函数和watermark处理迟到数据
为了实现一个天猫双十一实时交易大盘各个品类数据展示功能
- jdk stream:一个功能耗时1个月完成,需求不敢轻易改动
- flink:1周搞定,需求可以灵活变动
Flink
Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。
实时数仓建设、实时数据监控、实时反作弊风控、画像系统等。
数据流
任何类型的数据都可以形成一种事件流,信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。
什么是有界流(批处理)
有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。
什么是无界流(流处理)
有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
Apache Flink 擅长处理无界和有界数据集,有出色的性能
代码使用例子
- source、transformation、sink 都是 operator算子
MAVEN依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.xdclass</groupId>
<artifactId>xdclass-flink</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.13.1</flink.version>
</properties>
<dependencies>
<!--lombok依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
<scope>provided</scope>
</dependency>
<!--flink客户端-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--scala版本-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--java版本-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!--streaming的scala版本-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--streaming的java版本-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--日志输出-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<!--log4j-->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<!--json依赖包-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<!--Flink web ui-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
</project>
日志配置 src/main/resources/log4j.properties
### 配置appender名称
log4j.rootLogger = debugFile, errorFile
### debug日志级别以上的日志到:src/logs/debug.log
log4j.appender.debugFile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.debugFile.File = src/logs/flink.log
log4j.appender.debugFile.Append = true
#### Threshold
log4j.appender.debugFile.Threshold = info
log4j.appender.debugFile.layout = org.apache.log4j.PatternLayout
log4j.appender.debugFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %n%m%n
### error日志级别以上的日志到:src/logs/error.log
log4j.appender.errorFile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.errorFile.File = src/logs/error.log
log4j.appender.errorFile.Append = true
log4j.appender.errorFile.Threshold = error
log4j.appender.errorFile.layout = org.apache.log4j.PatternLayout
log4j.appender.errorFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %n%m%n
Blink、Flink
-
2019年Flink的母公司被阿里全资收购
-
阿里进行高度定制并取名为Blink (加了很多特性 )
-
阿里巴巴官方说明:Blink不会单独作为一个开源项目运作,而是Flink的一部分
-
都在不断演进中,对比其他流式计算框架(老到新)
- Storm 只支持流处理
- Spark Streaming (流式处理,其实是micro-batch微批处理,本质还是批处理)
- Flink 支持流批一体
-
算子Operator
- 将一个或多个DataStream转换为新的DataStream,可以将多个转换组合成复杂的数据流拓扑
- Source 和 Sink 是数据输入和数据输出的特殊算子,重点是transformation类的算子
编码与部署
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
在IDEA里面运行这样就行?(实际项目也是这样用?)
- flink可以本地idea执行模拟多线程执行,但不能读取配置文件,适合本地调试
- 可以提交到远程搭建的flink集群
- getExecutionEnvironment() 是flink封装好的方式可以自动判断运行模式,更方便开发,
- 如果程序是独立调用的,此方法返回本地执行环境;
- 如果从命令行客户端调用程序以提交到集群,则返回此集群的执行环境,是最常用的一种创建执行环境的方式
最终线上部署会把main函数打成jar包,提交到Flink进群进行运行, 会有UI可视化界面
- 服务端部署示例
- Flink 部署方式是灵活,主要是对Flink计算时所需资源的管理方式不同
部署方式 | 描述 |
---|---|
Local | 本地部署,直接启动进程,适合调试使用 |
Standalone Cluster | 集群部署,flink自带集群模式 |
On Yarn | 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率,生产环境 |
Tuple(元组类型)
元组类型, 多个语言都有的特性, flink的java版 tuple最多支持25个。
函数返回(return)多个值,多个不同类型的对象,列表只能存储相同的数据类型,而元组Tuple可以存储不同的数据类型。
/**
* tuple元组使用
*/
private static void tupleTest() {
Tuple3<Integer, String, Double> tuple3 = Tuple3.of(1, "soulboy", 3.1);
System.out.println(tuple3.f0); // 1
System.out.println(tuple3.f1); // soulboy
System.out.println(tuple3.f2); // 3.1
}
java里面的Map操作
一对一 转换对象,比如DO转DTO
/**
* Map 一对一 转换对象
*/
private static void mapTest() {
List<String> list1 = new ArrayList<>();
list1.add("springboot,springcloud");
list1.add("redis6,docker");
list1.add("kafka,rabbitmq");
// 一对一转换
List<String> list2 = list1.stream().map(obj -> {
obj = "soulboy-" + obj;
return obj;
}).collect(Collectors.toList());
System.out.println(list2); // [soulboy-springboot,springcloud, soulboy-redis6,docker, soulboy-kafka,rabbitmq]
}
什么是java里面的FlatMap操作
一对多转换对象
/**
* FlatMap 一对多 转换对象
*/
private static void mapTest() {
List<String> list1 = new ArrayList<>();
list1.add("springboot,springcloud");
list1.add("redis6,docker");
list1.add("kafka,rabbitmq");
//一对多转换
List<String> list3 = list1.stream().flatMap(
obj -> {
Stream<String> stream = Arrays.stream(obj.split(","));
return stream;
}
).collect(Collectors.toList());
System.out.println(list3); // [springboot, springcloud, redis6, docker, kafka, rabbitmq]
}
Flink流批示例
流处理
需求:根据字符串的逗号进行分割
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class Flink01App {
/**
* source
* transformation
* sink
* @param args
*/
public static void main(String[] args) throws Exception {
// 1. 获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行数量
env.setParallelism(1);
// 2. 定义数据源 (相同类型元素的数据集)
DataStream<String> stringDS = env.fromElements("java,springboot","java,springcloud");
stringDS.print("处理前");
// 3. 定义数据转换操作(FlatMapFunction<String, String>, key是输入类型,value是Collector响应的收集的类型,看源码注释,也是 DataStream<String>里面泛型类型)
DataStream<String> flatMapDS = stringDS.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> collector) throws Exception {
String [] arr = value.split(",");
for(String str : arr){
collector.collect(str);
}
}
});
flatMapDS.print("处理后");
// 4. 执行任务,可以取个名字
env.execute("flat map job");
}
}
控制台输出
# 读一个,处理一个
处理前> java,springboot
处理后> java
处理后> springboot
处理前> java,springcloud
处理后> java
处理后> springcloud
批处理
需求:根据字符串的逗号进行分割
Flink1.12时支持流批一体,DataSetAPI已经不推荐使用了,都会优先使用DataStream流式API。
package net.xdclass.app;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;
public class Flink02App {
/**
* source
* transformation
* sink
* @param args
*/
public static void main(String[] args) throws Exception {
// 1. 获取流的执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//设置并行数量
//env.setParallelism(1);
// 2. 定义数据源 (相同类型元素的数据集)
DataSet<String> stringDS = env.fromElements("java,springboot","java,springcloud");
stringDS.print("处理前");
// 3. 定义数据转换操作(FlatMapFunction<String, String>, key是输入类型,value是Collector响应的收集的类型,看源码注释,也是 DataStream<String>里面泛型类型)
DataSet<String> flatMapDS = stringDS.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> collector) throws Exception {
String [] arr = value.split(",");
for(String str : arr){
collector.collect(str);
}
}
});
flatMapDS.print("处理后");
// 4. 执行任务
env.execute("flat map job");
}
}
控制台输出
# 一次性全部读入
处理前> java,springboot
处理前> java,springcloud
处理后> java
处理后> springboot
处理后> java
处理后> springcloud
Flink可视化控制台
WebUI可视化界面
- 访问:ip:8081
- 方式一:服务端部署Flink集群(生产环境)
- 方式二:本地依赖添加(测试开发)
依赖坐标
<!--Flink web ui-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
编码
package net.xdclass.app;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WebUIApp {
public static void main(String[] args) throws Exception {
//1.拿到执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//env.setParallelism(1);
//2.从端口读取数据
DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888);
//3.对数据进行处理
DataStream<String> flatMapDataStream = stringDataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] arr = value.split(",");
for (String word : arr) {
out.collect(word);
}
}
});
//4.输出结果
flatMapDataStream.print("结果");
//DataStream需要调用execute,可以取个名称
env.execute("data stream job");
}
}
netcat
nc命令安装,https://blog.csdn.net/lck_csdn/article/details/125320540
webUI
地址:http://192.168.10.88:8081/#/overview
部署模式、运行流程
Flink 部署方式是灵活,主要是对Flink计算时所需资源的管理方式不同
部署方式 | 描述 |
---|---|
Local | 本地部署,直接启动进程,适合调试使用 |
Standalone Cluster | 集群部署,flink自带集群模式 |
On Yarn | 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率,生产环境 |
运行流程
- 用户提交Flink程序到JobClient
- JobClient的 解析、优化提交到JobManager
- TaskManager运行task, 并上报信息给JobManager
- 通俗解释
- JobManager 包工头
- TaskManager 任务组长
- Task solt 工人 (并行去做事情)
Flink整体架构和组件角色
snapshot用户保存快照,在 TaskManager 出现故障中断时候保存已经完成的状态,待 TaskManager 正常时恢复任务现场。
Flink 是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。运行时由两种类型的进程组成:
- 一个 JobManager
- 一个或者多个 TaskManager 。
JobManager
协调 Flink 应用程序的分布式执行的功能。
- 它决定何时调度下一个 task(或一组 task)
- 对完成的 task 或执行失败做出反应
- 协调 checkpoint、并且协调从失败中恢复等等
JobManager进程由三个不同的组件组成。
- ResourceManager
负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots
- Dispatcher
提供了一个 REST 接口,用来提交 Flink 应用程序执行
为每个提交的作业启动一个新的 JobMaster。
运行 Flink WebUI 用来提供作业执行信息
- JobMaster
负责管理单个JobGraph(多个算子构成)的执行,Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster
至少有一个 JobManager,高可用(HA)设置中可能有多个 JobManager,其中一个始终是 leader,其他的则是 standby
TaskManager
任务组长,搬砖的人。
- 负责计算的worker,还有上报内存、任务运行情况给JobManager等
- 至少有一个 TaskManager,也称为 worker执行作业流的 task,并且缓存和交换数据流
- 在 TaskManager 中资源调度的最小单位是 task slot,
task slot独占内存空间
- TaskManager中 task slot 的数量表示并发处理 task 的数量
- 一个 task slot 中可以执行多个算子,里面多个线程。
- 算子 opetator的执行流程
- source
- transformation
- sink
- 算子 opetator的执行流程
- 对于分布式执行,Flink 将算子的 subtasks 链接 成 tasks ,每个 task 由一个线程执行
- 图中source和map算子组成一个算子链,作为一个task运行在一个线程上
- 将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量
文档
- https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/concepts/flink-architecture/#tasks-and-operator-chains
- https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/concepts/glossary/
Task Slots
任务槽。
- Task Slot是Flink中的任务执行器,每个Task Slot可以运行多个subtask ,每个subtask会以单独的线程来运行
- 每个 worker(TaskManager)是一个 JVM 进程,可以在单独的线程中执行一个(1个solt)或多个 subtask
- 为了控制一个 TaskManager 中接受多少个 task,就有了所谓的 task slots(至少一个)
- 每个 task slot 代表 TaskManager 中资源的固定子集
注意
所有Task Slot平均分配TaskManger的内存, TaskSolt 没有 CPU 隔离
当前 TaskSolt 独占内存空间,作业间互不影响
一个TaskManager进程里有多少个taskSolt就意味着多少个task并发
task solt数量建议是cpu的核数,独占内存,共享CPU
- 5 个 subtask 执行,因此有 5 个并行线程
Task 正好封装了一个 Operator 或者 Operator Chain 的 parallel instance。
Sub-Task 强调的是同一个 Operator 或者 Operator Chain 具有多个并行的 Task
图中source和map算子组成一个算子链,作为一个task运行在一个线程上
算子链接成 一个 task 它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量
- Task Slot是Flink中的任务执行器,每个Task Slot可以运行多个subtask ,每个sub task会以单独的线程来运行
- Flink 算子之间可以通过【一对一】模式或【重新分发】模式传输数据
文档
- https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/concepts/flink-architecture/#tasks-and-operator-chains
- https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/concepts/glossary/
并行度
Flink 是分布式流式计算框架,程序在多节点并行执行,所以就有并行度 Parallelism。DataStream 就像是有向无环图(DAG),每一个 数据流(DataStream) 以一个或多个 source 开始,以一个或多个 sink 结束。
- 一个数据流( stream) 包含一个或多个分区,在不同的线程/物理机里并行执行
- 每一个算子( operator) 包含一个或多个子任务( subtask),子任务在不同的线程/物理机里并行执行
- 一个算子的子任务subtask 的个数就是并行度( parallelism)
Flink流程序中不同的算子可能具有不同的并行度,可以在多个地方配置,有不同的优先级。Flink并行度配置级别 (高到低):
优先级(高到低) | 说明 |
---|---|
算子 | map( xxx ).setParallelism(2) |
全局env | env.setParallelism(2) |
客户端cli | ./bin/flink run -p 2 xxx.jar |
Flink配置文件 | `/conf/flink-conf.yaml 的 |
某些算子无法设置并行度,本地IDEA运行 并行度默认为cpu核数。
TaskSolt和parallelism的区别
概念 | 说明 |
---|---|
task slot | 是静态的概念,是指taskmanager具有的并发执行能力 |
parallelism | 是动态的概念,是指 程序运行时实际使用的并发能力 |
ask slot | 是具有的能力比如可以100个,parallelism 是实际使用的并发,比如只要20个并发就行 |
Flink有3中运行模式
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
模式 | 说明 |
---|---|
STREAMING | 流处理 |
BATCH | 批处理 |
AUTOMATIC | 根据source类型自动选择运行模式,基本就是使用这个 |
Operator:Source
Flink的API层级 为流式/批式处理应用程序的开发提供了不同级别的抽象
层级 | 描述 | 备注 |
---|---|---|
第一层 | 最底层的抽象为有状态实时流处理 | 抽象实现是 Process Function,用于底层处理 |
第二层 | Core APIs,许多应用程序不需要使用到上述最底层抽象的 API,而是使用 Core APIs 进行开发 | 例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等,此层 API 中处理的数据类型在每种编程语言中都有其对应的类。 |
第三层 | 抽象是 Table API, 是以表Table为中心的声明式编程API,Table API 使用起来很简洁但是表达能力差 | 类似数据库中关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用 |
第四层 | 最顶层抽象是 SQL | 第四层最顶层抽象是 SQL,这层程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式SQL 抽象与 Table API 抽象之间的关联是非常紧密的 |
注意:Table和SQL层变动多,还在持续发展中,大致知道即可,核心是第一和第二层
Flink编程模型
Source来源
来源类型 | API |
---|---|
元素集合 | env.fromElements、env.fromColletion、env.fromSequence(start,end) |
文件/文件系统 | env.readTextFile(本地文件)、env.readTextFile(HDFS文件) |
基于Socket | env.socketTextStream("ip", 8888) |
自定义Source,实现接口自定义数据源,rich相关的api更丰富 | 并行度为1:SourceFunction、RichSourceFunction ;并行度大于1:ParallelSourceFunction、RichParallelSourceFunction |
Connectors与第三方系统进行对接(用于source或者sink都可以) | Flink本身提供Connector例如kafka、RabbitMQ、ES等; 注意:Flink程序打包一定要将相应的connetor相关类打包进去,不然就会失败 |
Apache Bahir连接器 | 里面也有kafka、RabbitMQ、ES的连接器更多 |
Flink和外部系统进行读取/写入方式
- 第一种:
Flink里面预定义的 source 和 sink。
- 第二种:
Flink 内部也提供部分 Boundled connectors。
- 第三种:
是第三方 Apache Bahir 项目中的连接器。
- 第四种:
是通过异步 IO 方式,异步I/O是Flink提供的非常底层的与外部系统交互
预定义Source
元素集合
- env.fromElements
- env.fromColletion
- env.fromSequence(start,end);
package net.xdclass.app;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
public class Flink03Source1App {
/**
* source
* transformation
* sink
* @param args
*/
public static void main(String[] args) throws Exception {
// 1. 获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//设置并行数量
//env.setParallelism(1);
// 2. 定义数据源 (相同类型元素的数据集)
//env.fromElements
DataStream<String> ds1 = env.fromElements("java,springboot","java,springcloud");
ds1.print("ds1:");
//env.fromColletion
DataStream<String> ds2 = env.fromCollection(Arrays.asList("java,springboot","java,springcloud"));
ds2.print("ds2:");
//env.fromSequence(start,end);
DataStream<Long> ds3 = env.fromSequence(1,5);
ds3.print("ds3:");
// 4. 执行任务
env.execute("flat map job");
}
}
文件/文件系统
package net.xdclass.app;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
public class Flink03Source2App {
/**
* source
* transformation
* sink
* @param args
*/
public static void main(String[] args) throws Exception {
// 1. 获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//设置并行数量
env.setParallelism(1);
// 2. 定义数据源 (相同类型元素的数据集)
//本地文件
DataStream<String> ds = env.readTextFile("C:\\Users\\chao1\\Desktop\\text.txt");
ds.print("ds:");
//HDFS
//DataStream<String> ds2 = env.readTextFile("hdfs://xdclass_node:8010/file/log/words.txt");
//ds2.print("ds2:");
// 4. 执行任务
env.execute("flat map job");
}
}
基于Socket
package net.xdclass.app;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
public class Flink03Source2App {
/**
* source
* transformation
* sink
* @param args
*/
public static void main(String[] args) throws Exception {
// 1. 获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//设置并行数量
//env.setParallelism(1);
// 2. 定义数据源 (相同类型元素的数据集)
// 从socket读取数据
DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888);
stringDataStream.print();
// 3. 执行任务
env.execute("flat map job");
}
}
自定义Source
Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等。
并行度 | 需实现接口 |
---|---|
1 | SourceFunction、RichSourceFunction |
大于1 | ParallelSourceFunction、RichParallelSourceFunction |
Model
package net.xdclass.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
/**
* 订单号
*/
private String tradeNo;
/**
* 订单标题
*/
private String title;
/**
* 订单金额
*/
private int money;
/**
* 用户id
*/
private int userId;
/**
* 注册时间
*/
private Date createTime;
}
自定义Source
package net.xdclass.source;
import net.xdclass.model.VideoOrder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.*;
public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {
private volatile Boolean flag = true;
private Random random = new Random();
private static List<String> list = new ArrayList<>();
//订单初始化(title信息)
static {
list.add("spring boot2.x");
list.add("微服务SpringCloud");
list.add("RabbitMQ消息队列");
list.add("Kafka");
list.add("第一季");
list.add("Flink流式技术");
list.add("工业级微服务项目");
list.add("Linux");
}
/**
* 自定义 Source(产生数据的逻辑)
* 源源不断产生订单
* @param sourceContext
* @throws Exception
*/
@Override
public void run(SourceContext<VideoOrder> sourceContext) throws Exception {
while (flag) {
Thread.sleep(1000);
String id = UUID.randomUUID().toString();
int userId = random.nextInt(10);
int money = random.nextInt(100);
int videoNum = random.nextInt(list.size());
String title = list.get(videoNum);
sourceContext.collect(new VideoOrder(id, title, money, userId, new Date()));
}
}
/**
* 控制任务取消
*/
@Override
public void cancel() {
}
/**
* run 方法调用前:用于初始化连接
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("-----open-----");
}
/**
* 用于清理之前
* @throws Exception
*/
@Override
public void close() throws Exception {
System.out.println("-----close-----");
}
}
运行
package net.xdclass.app;
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Flink04CustomSourceApp {
/**
* source
* transformation
* sink
* @param args
*/
public static void main(String[] args) throws Exception {
// 1. 获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
// 2. 定义数据源 (相同类型元素的数据集)
DataStreamSource<VideoOrder> videoOrderDS = env.addSource(new VideoOrderSource());
videoOrderDS.print();
// 4. 执行任务
env.execute("flat map job");
}
}
控制台输出
-----open-----
VideoOrder(tradeNo=16b7bbbc-b082-431a-9336-96909b29e315, title=工业级微服务项目, money=13, userId=6, createTime=Tue Jan 09 12:38:52 CST 2024)
VideoOrder(tradeNo=0e0745ce-728b-4577-90a1-7ba93a2eb4d4, title=spring boot2.x, money=95, userId=8, createTime=Tue Jan 09 12:38:53 CST 2024)
VideoOrder(tradeNo=d1ead911-b66e-4eb5-994d-2eff4db1fa30, title=RabbitMQ消息队列, money=9, userId=3, createTime=Tue Jan 09 12:38:54 CST 2024)
VideoOrder(tradeNo=32aec72d-f96b-4209-bb3b-b3e0177d7956, title=spring boot2.x, money=62, userId=1, createTime=Tue Jan 09 12:38:55 CST 2024)
VideoOrder(tradeNo=cac431ae-18cf-4237-a0de-75c84d9c0164, title=RabbitMQ消息队列, money=78, userId=6, createTime=Tue Jan 09 12:38:56 CST 2024)
对比并行度
PC处理器12核
package net.xdclass.app;
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Flink04CustomSourceApp {
/**
* source
* transformation
* sink
* @param args
*/
public static void main(String[] args) throws Exception {
// 1. 获取流的执行环境
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(2);
// 2. 定义数据源 (相同类型元素的数据集)
DataStream<VideoOrder> videoOrderDS = env.addSource(new VideoOrderSource());
// 3. 过滤打印
DataStream<VideoOrder> filterDS = videoOrderDS.filter(new FilterFunction<VideoOrder>() {
@Override
public boolean filter(VideoOrder videoOrder) throws Exception {
return videoOrder.getMoney() > 5;
}
}).setParallelism(3);
filterDS.print().setParallelism(4);
// 4. 执行任务
env.execute("flat map job");
}
}
http://192.168.10.88:8081/#/overview
Operator:Sink
Sink输出源
类型 | 描述 |
---|---|
预定义 | print、writeAsText |
自定义 | SinkFunction、RichSinkFunction:Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等 |
flink官方提供 Bundle Connector | kafka、ES 等 |
Apache Bahir | kafka、ES、Redis等 |
预定义:print
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Flink05Sink1App {
/**
* source
* transformation
* sink
* @param args
*/
public static void main(String[] args) throws Exception {
// 1. 获取流的执行环境
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(2);
// 2. 定义数据源 (相同类型元素的数据集)
DataStream<VideoOrder> videoOrderDS = env.addSource(new VideoOrderSource());
// 3. 过滤打印
DataStream<VideoOrder> filterDS = videoOrderDS.filter(new FilterFunction<VideoOrder>() {
@Override
public boolean filter(VideoOrder videoOrder) throws Exception {
return videoOrder.getMoney() > 5;
}
});
//Sink之print
filterDS.print();
//红色打印
//filterDS.printToErr();
// 4. 执行任务
env.execute("flat map job");
}
}
自定义Sink:连接Mysql存储商品订单
Sink输出源
类型 | 描述 |
---|---|
预定义 | print、writeAsText |
自定义 | SinkFunction:如果选择继承SinkFunction,会在每次写入一条数据时都会创建一个JDBC连接 、RichSinkFunction:Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等 |
flink官方提供 Bundle Connector | kafka、ES 等 |
Apache Bahir | kafka、ES、Redis等 |
Flink连接mysql的2种方式(都需要加jdbc驱动)
- 方式一:
自带flink-connector-jdbc 需要加依赖包
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_2.12</artifactId> <version>1.12.0</version> </dependency>
- 方式二:
自定义sink
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.25</version> </dependency>
需求:保存视频订单到Mysql
- 建表
CREATE TABLE `video_order` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`user_id` int(11) DEFAULT NULL,
`money` int(11) DEFAULT NULL,
`title` varchar(32) DEFAULT NULL,
`trade_no` varchar(64) DEFAULT NULL,
`create_time` date DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
- 添加jdbc依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
MysqlSink
import net.xdclass.model.VideoOrder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class MysqlSink extends RichSinkFunction<VideoOrder> {
private Connection conn = null;
private PreparedStatement ps = null;
/**
* 建立Mysql连接
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
conn = DriverManager.getConnection("jdbc:mysql://192.168.10.62:3306/shop?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai", "root", "123456");
//sql预编译
String sql = "INSERT INTO `video_order` (`user_id`, `money`, `title`, `trade_no`, `create_time`) VALUES(?,?,?,?,?);";
ps = conn.prepareStatement(sql);
}
/**
* 关闭Mysql连接
* @throws Exception
*/
@Override
public void close() throws Exception {
if (conn != null) {
conn.close();
}
if (ps != null) {
ps.close();
}
}
/**
* 输出逻辑
* @param videoOrder
* @param context
* @throws Exception
*/
@Override
public void invoke(VideoOrder videoOrder, Context context) throws Exception {
//给ps中的?设置具体值
ps.setInt(1,videoOrder.getUserId());
ps.setInt(2,videoOrder.getMoney());
ps.setString(3,videoOrder.getTitle());
ps.setString(4,videoOrder.getTradeNo());
ps.setDate(5,new Date(videoOrder.getCreateTime().getTime()));
ps.executeUpdate();
}
}
Flink06CustomSinkApp
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Flink06CustomSinkApp {
/**
* source
* transformation
* sink
* @param args
*/
public static void main(String[] args) throws Exception {
// 1. 获取流的执行环境
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//Total Task Slots1
env.setParallelism(1);
// 2. 定义数据源 (相同类型元素的数据集)
DataStream<VideoOrder> videoOrderDS = env.addSource(new VideoOrderSource());
// 3. 过滤打印
DataStream<VideoOrder> filterDS = videoOrderDS.filter(new FilterFunction<VideoOrder>() {
@Override
public boolean filter(VideoOrder videoOrder) throws Exception {
return videoOrder.getMoney() > 5;
}
});
//预定义Sink:print输出控制台
filterDS.print();
//自定义Sink:连接Mysql存储商品订单
filterDS.addSink(new MysqlSink());
// 4. 执行任务
env.execute("custom mysql flat map job");
}
}
自定义Sink:Bahir Connetor存储数据到Redis6
Flink操作redis的方式
- 方式一:
自定义sink
- 方式二:
使用connector
Redis Sink 核心是RedisMapper 是一个接口,使用时要编写自己的redis操作类实现这个接口中的三个方法
方法名 | 功能 |
---|---|
getCommandDescription | 选择对应的数据结构和key名称配置 |
getKeyFromData | 获取key |
getValueFromData | 获取value |
引入依赖
<!--Bahir Connetor-->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
Docker部署redis
docker run -itd --name redis -p 6379:6379 -v /mydata/redis/data:/data redis:6.2.13 --requirepass 123456
需求:统计各商品的销售数量
VideoOrderCounterSink
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
* 定义存储到Redis的数据格式:Tuple2 = (java,3)、(flink,1)、(redis,1)、(springboot,2)、(springcloud,1)
* (课程名字,销量)
*/
public class VideoOrderCounterSink implements RedisMapper<Tuple2<String,Integer>> {
/**
* 指定redis数据结构,指定key的名称
* @return
*/
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET,"VIDEO_ORDER_COUNTER");
}
/**
* 获取对应的key(filed值)
* @param data
* @return
*/
@Override
public String getKeyFromData(Tuple2<String, Integer> data) {
return data.f0;
}
/**
* 获取对应的value值
* @param data
* @return
*/
@Override
public String getValueFromData(Tuple2<String, Integer> data) {
return data.f1.toString();
}
}
Flink07RedisSinkApp
package net.xdclass.app;
import net.xdclass.model.VideoOrder;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import java.util.Date;
public class Flink07RedisSinkApp {
/**
* source
* transformation
* sink
* @param args
*/
public static void main(String[] args) throws Exception {
// 1. 获取流的执行环境
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//Total Task Slots1
env.setParallelism(1);
// 2. 定义数据源 (相同类型元素的数据集)
DataStream<VideoOrder> ds = env.fromElements(
new VideoOrder("21312","java",32,5,new Date()),
new VideoOrder("314","java",32,5,new Date()),
new VideoOrder("542","springboot",32,5,new Date()),
new VideoOrder("42","springcloud",32,5,new Date()),
new VideoOrder("52","flink",32,5,new Date()),
new VideoOrder("523","redis",32,5,new Date()),
new VideoOrder("3143","java",32,5,new Date()),
new VideoOrder("5422","springboot",32,5,new Date())
);
// 3. 定义数据转换操作
DataStream<Tuple2<String, Integer>> mapDS = ds.map(new MapFunction<VideoOrder, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
return new Tuple2<>(value.getTitle(), 1);
}
});
//分组(根据指定的key分组:title --> value.f0)
KeyedStream<Tuple2<String, Integer>,String> keyByDS = mapDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
//统计每个title多少个
DataStream<Tuple2<String, Integer>> sumResult = keyByDS.sum(1);
//打印输出 (java,3)、(flink,1)、(redis,1)、(springboot,2)、(springcloud,1)
//sumResult.print();
// 4. 写入redis
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.10.62").setPort(6379).setPassword("123456").build();
//VideoOrderCounterSink定义了存入redis的数据结构
sumResult.addSink(new RedisSink<>(conf, new VideoOrderCounterSink()));
// 5. 执行任务
env.execute("custom redis sink job");
}
}
Source Sink对接 Kaf Connetor
环境准备
部署JDK8
[root@localhost tmp]# mkdir -pv /usr/local/software
[root@localhost tmp]# tar -zxvf jdk-8u181-linux-x64.tar.gz
[root@localhost tmp]# mv jdk1.8.0_181 /usr/local/software/jdk1.8
[root@localhost tmp]# vim /etc/profile
JAVA_HOME=/usr/local/software/jdk1.8
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH
[root@localhost tmp]# source /etc/profile
[root@localhost tmp]# java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
部署zookeepr
### 解压Zookeeper
[root@localhost tmp]# tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz
[root@localhost tmp]# mv apache-zookeeper-3.7.0-bin /usr/local/software/zookeeper
[root@localhost tmp]# cp /usr/local/software/zookeeper/conf/zoo_sample.cfg /usr/local/software/zookeeper/conf/zoo.cfg
[root@localhost tmp]# vim /usr/local/software/zookeeper/conf/zoo.cfg
### 启动Zookeeper (默认2181端口)
[root@localhost tmp]# bash /usr/local/software/zookeeper/bin/zkServer.sh start
[root@localhost ~]# tail -f /usr/local/software/zookeeper/logs/zookeeper_audit.log
[root@localhost tmp]# yum install -y lsof
[root@localhost tmp]# lsof -i:2181
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 25786 root 57u IPv6 88166 0t0 TCP *:eforward (LISTEN)
[root@localhost ~]# tail -f
部署kafka
### 解压
[root@localhost tmp]# tar -zxvf kafka_2.13-2.8.0.tgz
[root@localhost tmp]# mv kafka_2.13-2.8.0 /usr/local/software/kafka
### 修改下面两个配置 ( listeners 配置的ip和advertised.listeners相同时启动kafka会报错)、listeners(内网Ip)、advertised.listeners(公网ip)
[root@localhost tmp]# vim /usr/local/software/kafka/config/server.properties
listeners=PLAINTEXT://192.168.10.61:9092
zookeeper.connect=192.168.10.61:2181
### 启动kafka
[root@localhost tmp]# bash /usr/local/software/kafka/bin/kafka-server-start.sh -daemon /usr/local/software/kafka/config/server.properties &
[root@localhost ~]# lsof -i:9092
### 停止kafka
[root@localhost tmp]# bash /usr/local/software/kafka/bin/kafka-server-stop.sh
### 创建topic
[root@localhost tmp]# cd /usr/local/software/kafka/bin/
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.10.62:2181 --replication-factor 1 --partitions 1 --topic soulboy-topic
### topic存放目录
[root@localhost ~]# ls /tmp/kafka-logs/soulboy-topic-0/
00000000000000000000.index 00000000000000000000.timeindex
00000000000000000000.log leader-epoch-checkpoint
### 切换目录
[root@localhost bin]# cd /usr/local/software/kafka/bin/
### 查看topic
[root@localhost bin]# ./kafka-topics.sh --list --zookeeper 192.168.10.62:2181
soulboy-topic
### 生产者发送消息
[root@localhost bin]# ./kafka-console-producer.sh --broker-list 192.168.10.62:9092 --topic soulboy-topic
>111
>222
### 消费者消费消息 ( --from-beginning:会把主题中以往所有的数据都读取出来, 重启后会有这个重复消费,忽略偏移量)
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.10.62:9092 --from-beginning -topic soulboy-topic
111
222
### 删除topic
[root@localhost bin]# ./kafka-topics.sh --zookeeper 192.168.10.62:2181 --delete --topic soulboy-topic
Topic soulboy-topic is marked for deletion.
### 查看broker节点的指定topic状态信息
[root@localhost bin]# ./kafka-topics.sh --describe --zookeeper 192.168.10.62:2181 --topic soulboy-topic
Topic: xdclass-topic TopicId: qZse3pJeRL6oYikgJ--V7w PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: xdc
Source:从消息队列Kafka中读取数据
之前自定义SourceFunction,Flink官方也有提供对接外部系统的,比如读取Kafka
flink官方提供的连接器
-
添加依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.version}</artifactId> <version>${flink.version}</version> </dependency>
-
编写代码:
FlinkKafkaConsumer继承了FlinkKafkaConsumerBase和RichParallelSourceFunction(富函数-并行读取kafka多分区)
Flink08KafkaSourceApp
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import java.util.Properties;
public class Flink08KafkaSourceApp {
/**
* source
* transformation
* sink
* @param args
*/
public static void main(String[] args) throws Exception {
//获取流的执行环境
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//Total Task Slots1
env.setParallelism(1);
//创建属性对象
Properties props = new Properties();
//kafka地址
props.setProperty("bootstrap.servers", "192.168.10.62:9092");
//组名
props.setProperty("group.id", "video-order-group");
//字符串序列化和反序列化规则
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//offset重置规则
props.setProperty("auto.offset.reset", "latest");
//自动提交(消费消息时向kafka进行反馈),2秒提交一次,避免频繁提交,提升性能
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "2000");
//有后台线程每隔10s检测一下Kafka的分区变化情况(Flink可以从多个kafka分区进行消费--读取数据,如果分区数量动态增加,需要让flink能感知到新的kakfa分区)
props.setProperty("flink.partition-discovery.interval-millis","10000");
//配置从kafka中读取数据(topic、序列化规则、kafka配置属性对象)
FlinkKafkaConsumer<String> consumer =new FlinkKafkaConsumer<>("soulboy-topic", new SimpleStringSchema(), props);
//设置从记录的消费者组内的offset开始消费(消费者下面有多个group,一个消息只能被group内消费一次)
consumer.setStartFromGroupOffsets();
//从kafka中读取数据
DataStream<String> ds = env.addSource(consumer);
ds.print();
//执行任务
env.execute("kafka source job");
}
}
测试
# 消费者发送消息
[root@Flink bin]# ./kafka-console-producer.sh --broker-list 192.168.10.62:9092 --topic soulboy-topic
>123
>321
>123
# 查看idea控制台
kafka> 123
kafka> 321
kafka> 123
Sink:将数据发送到消息队列Kafka中
Flink08KafkaSourceApp
package net.xdclass.app;
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import java.util.Properties;
public class Flink08KafkaSourceApp {
/**
* source
* transformation
* sink
* @param args
*/
public static void main(String[] args) throws Exception {
//获取流的执行环境
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//Total Task Slots1
env.setParallelism(1);
//创建属性对象
Properties props = new Properties();
//kafka地址
props.setProperty("bootstrap.servers", "192.168.10.62:9092");
//组名
props.setProperty("group.id", "video-order-group");
//字符串序列化和反序列化规则
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//offset重置规则
props.setProperty("auto.offset.reset", "latest");
//自动提交(消费消息时向kafka进行反馈),2秒提交一次,避免频繁提交,提升性能
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "2000");
//有后台线程每隔10s检测一下Kafka的分区变化情况(Flink可以从多个kafka分区进行消费--读取数据,如果分区数量动态增加,需要让flink能感知到新的kakfa分区)
props.setProperty("flink.partition-discovery.interval-millis","10000");
//配置FlinkKafkaConsumer(消费者)
FlinkKafkaConsumer<String> kafkaConsumer =new FlinkKafkaConsumer<>("soulboy-topic", new SimpleStringSchema(), props);
//设置从记录的消费者组内的offset开始消费(消费者下面有多个group,一个消息只能被group内消费一次)
kafkaConsumer.setStartFromGroupOffsets();
/**
* Source:从消息队列Kafka中读取数据
*/
DataStream<String> kafkaDS = env.addSource(kafkaConsumer);
kafkaDS.print("kafka");
/**
* Sink:将数据发送到消息队列Kafka中
*/
//二次处理:在每个字符串前面添加soulboy
DataStream<String> mapDS = kafkaDS.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return "soulboy:" + value;
}
});
//配置FlinkKafkaProducer(生产者)
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>("leon-topic", new SimpleStringSchema(), props);
//Sink:将数据发送到消息队列Kafka中
mapDS.addSink(kafkaProducer);
//执行任务
env.execute("kafka source job");
}
}
测试
### soulboy-topic 生产者(linux命令行)
[root@Flink bin]# ./kafka-console-producer.sh --broker-list 192.168.10.62:9092 --topic soulboy-topic
>123
### soulboy-topic 消费者(idea控制台 代码)
kafka> 123
### leon-topic 生产者(idea控制台 代码)
详见代码……
### leon-topic 消费者(linux命令行)
# 切换目录
[root@localhost bin]# cd /usr/local/software/kafka/bin/
# 消费者消费消息 ( --from-beginning:会把主题中以往所有的数据都读取出来, 重启后会有这个重复消费,忽略偏移量) [root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.10.62:9092 --from-beginning -topic leon-topic
soulboy:123
Core API 常用Transformation算子
示例:Map、FlatMap算子
需求:多数算子,我们会用订单得转换-过滤-分组-统计 来实现
Map:一对一转换对象
//源源不断产生新的订单
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());
// 定义数据转换操作: map转换,来一个记录一个,方便后续统计
DataStream<Tuple2<String, Integer>> mapDS = ds.map(new MapFunction<VideoOrder, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
return new Tuple2<>(value.getTitle(), 1);
}
});
FlatMap:一对多转换对象
一对多转换包含一对一转换
一对一(订单转换)
//源源不断产生新的订单
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());
//只是一对一记录而已,没必要使用flatMap(如果使用flatMap参考如下使用)
//FlatMapFunction<String, String>, key是输入类型,value是Collector响应的收集的类型,看源码注释,也是 DataStream<String>里面泛型类型
DataStream<Tuple2<String,Integer>> mapDS = ds.flatMap(new FlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {
@Override
public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(new Tuple2<>(value.getTitle(),1));
}
});
一对多(切割)
DataStreamSource<String> stringDS = env.fromElements("spring,java,soulboy",
"haha,xixi,lala",
"hiahia,tutu,xiuxiu");
SingleOutputStreamOperator<String> flatMapDS = stringDS.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String sourceData, Collector<String> outCollector) throws Exception {
String[] arr = sourceData.split(",");
for (String s : arr) {
outCollector.collect(s);
}
}
});
flatMapDS.print();
RichMap、RichFlatMap算子实战
Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
RichXXX相关Open、Close、setRuntimeContext等 API方法会根据并行度进行
常规的数据流转
注意: KeyBy后的聚合函数,只处理当前分组后组内的数据,不同组内数据互不影响
DataStream
-> keyBy操作
-> KeyStream
-> window操作
-> windowStream
-> 聚合操作
-> DataStream
比如并行度是4,那就有4次触发对应的open/close方法等,是4个不同subtask
- 比如
RichMapFunction、RichFlatMapFunction、RichSourceFunction
等
RichMap
//并行度如果高了,控制台就会在同一时间显示多个连续的open close
DataStream<Tuple2<String, Integer>> mapDS = ds.map(new RichMapFunction<VideoOrder, Tuple2<String, Integer>>() {
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("==========open");
}
@Override
public void close() throws Exception {
System.out.println("==========close");
}
@Override
public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
return new Tuple2<>(value.getTitle(), 1);
}
});
RichFlatMap
//并行度如果高了,控制台就会在同一时间显示多个连续的open
SingleOutputStreamOperator<String> flatMapDS = stringDS.flatMap(new RichFlatMapFunction<String, String>() {
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("==========open");
}
@Override
public void close() throws Exception {
System.out.println("==========close");
}
@Override
public void flatMap(String sourceData, Collector<String> outCollector) throws Exception {
String[] arr = sourceData.split(",");
for (String s : arr) {
outCollector.collect(s);
}
}
});
KeyBy分组
keyBy是把数据流按照某个字段分区
keyBy后是相同的数据放到同个组里面,再进行组内统计
//源源不断产生新的订单
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());
//分组
KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder videoOrder) throws Exception {
return videoOrder.getTitle();
}
});
//聚合(各商品总销售金额)
SingleOutputStreamOperator<VideoOrder> money = videoOrderStringKeyedStream.sum("money");
//控制台打印输出
money.print();
/*
VideoOrder(tradeNo=f7838e95-a102-4565-9907-d2c6fa43d3ef, title=葵花宝典, money=53, userId=0, createTime=Thu Sep 05 11:43:42 CST 2024)
VideoOrder(tradeNo=f7838e95-a102-4565-9907-d2c6fa43d3ef, title=葵花宝典, money=95, userId=0, createTime=Thu Sep 05 11:43:42 CST 2024)
VideoOrder(tradeNo=82f6dd02-30ea-4e14-8bd7-2f4d6e0e4ed0, title=九阴真经, money=10, userId=3, createTime=Thu Sep 05 11:43:44 CST 2024)
VideoOrder(tradeNo=90f14227-0765-4997-92a3-069be5f8712a, title=辟邪剑谱, money=67, userId=7, createTime=Thu Sep 05 11:43:45 CST 2024)
VideoOrder(tradeNo=4691b5e0-8d90-4b32-8b40-3609dfa58ccf, title=一阳指, money=99, userId=7, createTime=Thu Sep 05 11:43:46 CST 2024)
VideoOrder(tradeNo=4691b5e0-8d90-4b32-8b40-3609dfa58ccf, title=一阳指, money=150, userId=7, createTime=Thu Sep 05 11:43:46 CST 2024)
VideoOrder(tradeNo=90f14227-0765-4997-92a3-069be5f8712a, title=辟邪剑谱, money=148, userId=7, createTime=Thu Sep 05 11:43:45 CST 2024)
VideoOrder(tradeNo=82f6dd02-30ea-4e14-8bd7-2f4d6e0e4ed0, title=九阴真经, money=27, userId=3, createTime=Thu Sep 05 11:43:44 CST 2024)
*/
Filter过滤
需求:对商品价格大于20元的商品进行分组,统计出商品的销售总金额
链式调用
//源源不断产生新的订单
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());
//过滤(分组前过滤性能高)、分组、聚合
DataStream<VideoOrder> sumDS = ds.filter(new FilterFunction<VideoOrder>() {
@Override
public boolean filter(VideoOrder videoOrder) throws Exception {
return videoOrder.getMoney() > 20;
}
}).keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder videoOrder) throws Exception {
return videoOrder.getTitle();
}
}).sum("money");
sumDS.print();
Reduce
API | 功能描述 |
---|---|
sum | sum是简单聚合,sum("xxx")使用的时候,如果是tuple元组则用序号,POJO则用属性名称 |
reduce | reduce是可以自定义聚合,aggregate支持复杂的自定义聚合 |
reduce示例
//源源不断产生新的订单
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());
//分组
KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder videoOrder) throws Exception {
return videoOrder.getTitle();
}
});
//聚合
SingleOutputStreamOperator<VideoOrder> reduce = videoOrderStringKeyedStream.reduce(new ReduceFunction<VideoOrder>() {
@Override
public VideoOrder reduce(VideoOrder value1, VideoOrder value2) throws Exception {
//value1是历史对象,value2是加入统计的对象
//所以value1.f1是历史值,value2.f1是新值,不断累加
//设置title是因为分组需要
VideoOrder videoOrder = new VideoOrder();
videoOrder.setTitle(value1.getTitle());
videoOrder.setMoney(value1.getMoney() + value2.getMoney());
return videoOrder;
}
});
reduce.print();
maxBy、minBy
如果是用了keyby,在后续算子要用maxby,minby类型,才可以再分组里面找对应的数据
KeyBy后可以用KeyedProcessFunction,更底层的API,有processElement和onTimer函数
maxBy 对比 max
并发度不为1才能看得出来效果
类型 | 描述 |
---|---|
maxBy、minBy | 如果是keyBy的是对象的某个属性,则分组用max/min聚合统计,只有聚合的字段会更新,其他字段还是旧的,导致对象不准确 |
max、min | 需要用maxby/minBy才对,让整个对象的属性都是最新的 |
//并行度不为1才看得出效果
//env.setParallelism(1);
DataStream<VideoOrder> ds = env.fromElements(
new VideoOrder("1", "java", 31, 15, new Date()),
new VideoOrder("2", "java", 32, 45, new Date()),
new VideoOrder("3", "java", 33, 52, new Date()),
new VideoOrder("4", "springboot", 21, 5, new Date()),
new VideoOrder("5", "redis", 41, 52, new Date()),
new VideoOrder("6", "redis", 40, 15, new Date()),
new VideoOrder("7", "kafka", 1, 55, new Date())
);
//maxBy
SingleOutputStreamOperator<VideoOrder> maxByDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
}).maxBy("money");
//max
SingleOutputStreamOperator<VideoOrder> maxDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
}).max("money");
//maxByDS.print("maxByDS:");
maxDS.print("maxDS:");
Slide Window
Windows are at the heart of processing infinite streams(Window是处理无限数据流的核心
)
数据流是一直源源不断产生,业务需要聚合统计使用,比如每10秒统计过去5分钟的点击量、成交额等
Windows 就可以将无限的数据流拆分为有限大小的“桶 buckets”,然后程序可以对其窗口内的数据进行计算
窗口认为是一个Bucket桶,一个窗口段就是一个桶,比如8到9点是一个桶,9到10点是一个桶
应用场景
业务类型 | 说明 |
---|---|
监控平台 | 过去一小时:站点的访问量、接口调用次数;每5秒统计过去一分钟接口的响应耗时和成功率(及时发现业务问题,及时处理) |
窗口分类
窗口类型 | 说明 |
---|---|
time Window | 时间窗口,即按照一定的时间规则作为窗口统计 |
time-tumbling-window 时间滚动窗口 (用的多) | |
time-sliding-window 时间滑动窗口 (用的多) | |
session WIndow 会话窗口,即一个会话内的数据进行统计,相对少用 | |
count Window | 数量窗口,即按照一定的数据量作为窗口统计,相对少用 |
size(窗口大小) 和 slide(滑动间隔)
窗口类型 | size、slide说明 |
---|---|
tumbling-window(滚动窗口) | size = slide,如:每隔10s统计最近10s的数据 |
sliding-window(滑动窗口) | size > slide,如:每隔5s统计最近10s的数据 |
开发中不推荐 | size < slide的时候,如每隔15s统计最近10s的数据,那么中间5s的数据会丢失 |
- tumbling-window:滚动窗口: size=slide,如:每隔10s统计最近10s的数据
- sliding-window:滑动窗口: size>slide,如:每隔5s统计最近10s的数据
- size<slide的时候,如每隔15s统计最近10s的数据,那么中间5s的数据会丢失,所以开发中不用
滑动窗口 Sliding Windows
窗口具有固定大小
窗口数据有重叠
例子:每10s统计一次最近1min内的订单数量
滚动窗口 Tumbling Windows
窗口具有固定大小
窗口数据不重叠
例子:每10s统计一次最近10s内的订单数量
Window 窗口API
什么情况下才可以使用WindowAPI
方括号 ([…]) 中的命令是可选的,允许用多种不同的方式自定义窗口逻辑
- 一个窗口内 的是左闭右开:
[5 10)、[10 15)
- countWindow没过期,但
timeWindow在1.12过期,统一使用window
;
分组情况 | 说明 |
---|---|
有keyBy | 用 window() api |
没keyBy | 用 windowAll() api ,并行度低 |
Keyed Windows
Non-Keyed Windows
API名称 | 功能描述 |
---|---|
Window Assigners(窗口分配器) | 定义了如何将元素分配给窗口,负责将每条数据分发到正确的 window窗口上,window() 的参数是一个 WindowAssigner,flink本身提供了Tumbling、Sliding 等Assigner |
trigger(窗口触发器) | 用来控制一个窗口是否需要被触发,每个窗口分配器WindowAssigner都有一个默认触发器,也支持自定义触发器 |
window function | 定义了要对窗口中收集的数据做的计算操作 |
增量聚合函数:aggregate(agg函数,WindowFunction(){ }) ,数据来一条就计算,窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中,常见的增量聚合函数有 reduceFunction、aggregateFunction,min、max、sum 都是简单的聚合操作,不需要自定义规则。AggregateFunction IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据 | |
全窗口函数:apply(new processWindowFunction(){ }) ,窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算。常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文更多信息)。 WindowFunction IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 | |
如果想处理每个元素更底层的API的时候用: process(new KeyedProcessFunction(){processElement、onTimer}) ,对数据进行解析 ,process对每个元素进行处理,相当于 map+flatMap+filter |
Tumbling-Window(滚动时间)示例
窗口具有固定大小
窗口数据有重叠
例子:每10s统计一次最近1min内的订单数量
需求:指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分为[0:00, 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口
TimeUtil
public class TimeUtil {
public static String formatTime(Date time) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
//指定时区:获取当前操作系统的时区
ZoneId zoneId = ZoneId.systemDefault();
//返回字符串
String timeStr = formatter.format(time.toInstant().atZone(zoneId));
return timeStr;
}
}
VideoOrder
重写toString(),更好的展示时间格式
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import net.xdclass.util.TimeUtil;
import java.util.Date;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
/**
* 订单号
*/
private String tradeNo;
/**
* 订单标题
*/
private String title;
/**
* 订单金额
*/
private int money;
/**
* 用户id
*/
private int userId;
/**
* 注册时间
*/
private Date createTime;
@Override
public String toString() {
return "VideoOrder{" +
"tradeNo='" + tradeNo + '\'' +
", title='" + title + '\'' +
", money=" + money +
", userId=" + userId +
", createTime=" + TimeUtil.formatTime(createTime) +
'}';
}
}
VideoOrderSourceV2
import net.xdclass.model.VideoOrder;
import net.xdclass.util.TimeUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.*;
public class VideoOrderSourceV2 extends RichParallelSourceFunction<VideoOrder> {
private volatile Boolean flag = true;
private Random random = new Random();
private static List<VideoOrder> list = new ArrayList<>();
//订单初始化(title信息)
static {
list.add(new VideoOrder("", "springboot", 20, 0, null));
list.add(new VideoOrder("", "redis", 10, 0, null));
}
/**
* 自定义 Source(产生数据的逻辑)
* 源源不断产生订单
* @param sourceContext
* @throws Exception
*/
@Override
public void run(SourceContext<VideoOrder> sourceContext) throws Exception {
while (flag) {
Thread.sleep(1000);
String tradeNo = UUID.randomUUID().toString().substring(30);
int userId = random.nextInt(10);
int money = random.nextInt(100);
int videoNum = random.nextInt(list.size());
VideoOrder videoOrder = list.get(videoNum);
videoOrder.setTradeNo(tradeNo);
videoOrder.setUserId(userId);
videoOrder.setCreateTime(new Date());
//VideoOrderSource
System.out.println("产生:" + videoOrder.getTitle() + ",价格:" + videoOrder.getMoney() + ", 时间:"+TimeUtil.formatTime(new Date()));
sourceContext.collect(videoOrder);
}
}
/**
* 控制任务取消
*/
@Override
public void cancel() {
}
/**
* run 方法调用前:用于初始化连接
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("-----open-----");
}
/**
* 用于清理之前
* @throws Exception
*/
@Override
public void close() throws Exception {
System.out.println("-----close-----");
}
}
Flink14TumblingWindowApp
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSourceV2;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class Flink14TumblingWindowApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//获取流的执行环境
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//并行度为1更容易观察
env.setParallelism(1);
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
//maxBy
KeyedStream<VideoOrder, String> keyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
});
//窗口滚动(TumblingWindow):5秒统计一次前5秒内各商品销售的总金额
SingleOutputStreamOperator<VideoOrder> sumDS = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("money");
sumDS.print();
//执行任务
env.execute("kafka KeyBy job");
}
}
控制台输出
-----open-----
产生:springboot,价格:20, 时间:2024-09-06 15:00:59
产生:redis,价格:10, 时间:2024-09-06 15:01:00
产生:redis,价格:10, 时间:2024-09-06 15:01:01
产生:springboot,价格:20, 时间:2024-09-06 15:01:02
产生:springboot,价格:20, 时间:2024-09-06 15:01:03
产生:redis,价格:10, 时间:2024-09-06 15:01:04
VideoOrder{tradeNo='828be2', title='springboot', money=60, userId=0, createTime=2024-09-06 15:00:59}
VideoOrder{tradeNo='12a988', title='redis', money=20, userId=1, createTime=2024-09-06 15:01:00}
产生:redis,价格:10, 时间:2024-09-06 15:01:05
产生:redis,价格:10, 时间:2024-09-06 15:01:06
Sliding-Window(滑动时间窗)示例
窗口具有固定大小
窗口数据有重叠
例子:每10s统计一次最近1min内的订单数量
Flink15SlidingWindowApp
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSourceV2;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class Flink15SlidingWindowApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//获取流的执行环境
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//并行度为1更容易观察
env.setParallelism(1);
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
//maxBy
KeyedStream<VideoOrder, String> keyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
});
//滑动窗口(SlidingWindows):窗口大小20秒,滑动大小5秒,每5秒统计一下20秒内各商品销售总金额
SingleOutputStreamOperator<VideoOrder> sumDS = keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(5))).sum("money");
sumDS.print();
//执行任务
env.execute("kafka SlidingWindows job");
}
}
控制台输出
产生:redis,价格:10, 时间:2024-09-06 15:41:12
产生:springboot,价格:20, 时间:2024-09-06 15:41:13
产生:springboot,价格:20, 时间:2024-09-06 15:41:14
VideoOrder{tradeNo='eafb4c', title='redis', money=10, userId=1, createTime=2024-09-06 15:41:12}
VideoOrder{tradeNo='e6ea1d', title='springboot', money=20, userId=1, createTime=2024-09-06 15:41:13}
产生:redis,价格:10, 时间:2024-09-06 15:41:16
产生:springboot,价格:20, 时间:2024-09-06 15:41:17
产生:springboot,价格:20, 时间:2024-09-06 15:41:18
产生:redis,价格:10, 时间:2024-09-06 15:41:19
VideoOrder{tradeNo='eafb4c', title='redis', money=30, userId=1, createTime=2024-09-06 15:41:12}
VideoOrder{tradeNo='e6ea1d', title='springboot', money=80, userId=1, createTime=2024-09-06 15:41:13}
产生:redis,价格:10, 时间:2024-09-06 15:41:20
产生:redis,价格:10, 时间:2024-09-06 15:41:21
产生:redis,价格:10, 时间:2024-09-06 15:41:22
产生:springboot,价格:20, 时间:2024-09-06 15:41:23
产生:redis,价格:10, 时间:2024-09-06 15:41:24
VideoOrder{tradeNo='eafb4c', title='redis', money=70, userId=1, createTime=2024-09-06 15:41:12}
VideoOrder{tradeNo='e6ea1d', title='springboot', money=100, userId=1, createTime=2024-09-06 15:41:13}
产生:redis,价格:10, 时间:2024-09-06 15:41:25
产生:redis,价格:10, 时间:2024-09-06 15:41:26
产生:redis,价格:10, 时间:2024-09-06 15:41:27
产生:springboot,价格:20, 时间:2024-09-06 15:41:28
产生:redis,价格:10, 时间:2024-09-06 15:41:29
VideoOrder{tradeNo='eafb4c', title='redis', money=110, userId=1, createTime=2024-09-06 15:41:12}
VideoOrder{tradeNo='e6ea1d', title='springboot', money=120, userId=1, createTime=2024-09-06 15:41:13}
产生:redis,价格:10, 时间:2024-09-06 15:41:30
产生:redis,价格:10, 时间:2024-09-06 15:41:31 --------- 这个不算,超过20秒了,所以是12个redis
产生:springboot,价格:20, 时间:2024-09-06 15:41:32
产生:springboot,价格:20, 时间:2024-09-06 15:41:33
产生:springboot,价格:20, 时间:2024-09-06 15:41:34 --------- 这个不算,超过20秒了 所以是8个springboot
VideoOrder{tradeNo='d742fa', title='springboot', money=160, userId=0, createTime=2024-09-06 15:41:14}
VideoOrder{tradeNo='27c5ac', title='redis', money=120, userId=0, createTime=2024-09-06 15:41:16}
Count-Window(数量窗口)示例
滚动窗口
统计分组后同个key内的数据超过5次(个)则进行统计 countWindow(5)
Flink16CountWindowApp
package net.xdclass.app;
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSourceV2;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class Flink16CountWindowApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//获取流的执行环境
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//并行度为1更容易观察
env.setParallelism(1);
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
//maxBy
KeyedStream<VideoOrder, String> keyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
});
//滑动窗口(CountWindow):统计分组后同个key内的数据超过5次(个)则进行统计 countWindow(5)
SingleOutputStreamOperator<VideoOrder> sumDS = keyedStream.countWindow(5).sum("money");
sumDS.print();
//执行任务
env.execute("kafka SlidingWindows job");
}
}
控制台输出
-----open-----
产生:redis,价格:10, 时间:2024-09-06 16:15:21
产生:springboot,价格:20, 时间:2024-09-06 16:15:22
产生:redis,价格:10, 时间:2024-09-06 16:15:23
产生:redis,价格:10, 时间:2024-09-06 16:15:24
产生:redis,价格:10, 时间:2024-09-06 16:15:25
产生:springboot,价格:20, 时间:2024-09-06 16:15:26
产生:redis,价格:10, 时间:2024-09-06 16:15:27
VideoOrder{tradeNo='2df358', title='redis', money=50, userId=2, createTime=2024-09-06 16:15:21}
产生:redis,价格:10, 时间:2024-09-06 16:15:28
产生:redis,价格:10, 时间:2024-09-06 16:15:29
产生:redis,价格:10, 时间:2024-09-06 16:15:30
产生:springboot,价格:20, 时间:2024-09-06 16:15:31
产生:redis,价格:10, 时间:2024-09-06 16:15:32
产生:springboot,价格:20, 时间:2024-09-06 16:15:33
产生:redis,价格:10, 时间:2024-09-06 16:15:34
VideoOrder{tradeNo='dd74bb', title='redis', money=50, userId=0, createTime=2024-09-06 16:15:28}
产生:springboot,价格:20, 时间:2024-09-06 16:15:35
VideoOrder{tradeNo='a09451', title='springboot', money=100, userId=4, createTime=2024-09-06 16:15:22}
产生:redis,价格:10, 时间:2024-09-06 16:15:36
产生:springboot,价格:20, 时间:2024-09-06 16:15:37
产生:redis,价格:10, 时间:2024-09-06 16:15:38
产生:redis,价格:10, 时间:2024-09-06 16:15:39
滑动窗口
只要有2个数据到达后就可以往后统计5个数据的值, countWindow(5, 2)
Flink16CountWindowApp
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSourceV2;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class Flink16CountWindowApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//获取流的执行环境
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//并行度为1更容易观察
env.setParallelism(1);
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
//maxBy
KeyedStream<VideoOrder, String> keyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
});
//滑动窗口(CountWindow):统计分组后同个key内的数据超过5次(个)则进行统计 countWindow(5)
//SingleOutputStreamOperator<VideoOrder> sumDS = keyedStream.countWindow(5).sum("money");
//滑动窗口(CountWindow):统计分组后同个key内的数据超过5次(个)则进行统计 countWindow(5)
SingleOutputStreamOperator<VideoOrder> sumDS = keyedStream.countWindow(5,2).sum("money");
sumDS.print();
//执行任务
env.execute("kafka SlidingWindows job");
}
}
控制台输出
-----open-----
产生:springboot,价格:20, 时间:2024-09-06 16:19:45
产生:redis,价格:10, 时间:2024-09-06 16:19:46
产生:springboot,价格:20, 时间:2024-09-06 16:19:47 -------- 满2次,进行统计springboot
VideoOrder{tradeNo='b374ef', title='springboot', money=40, userId=2, createTime=2024-09-06 16:19:45}
产生:springboot,价格:20, 时间:2024-09-06 16:19:48
产生:springboot,价格:20, 时间:2024-09-06 16:19:49 -------- 满2次,进行统计springboot
VideoOrder{tradeNo='b374ef', title='springboot', money=80, userId=2, createTime=2024-09-06 16:19:45}
产生:springboot,价格:20, 时间:2024-09-06 16:19:50
产生:redis,价格:10, 时间:2024-09-06 16:19:51 -------- 满2次,进行统计redis
VideoOrder{tradeNo='c84060', title='redis', money=20, userId=1, createTime=2024-09-06 16:19:46}
增量聚合
增量聚合函数(AggregateFunction)
aggregate(agg函数,WindowFunction(){ })
窗口保存临时数据(中间数据),每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中
- 常见的增量聚合函数有
reduceFunction、aggregateFunction
min、max、sum
都是简单的聚合操作,不需要自定义规则AggregateFunction<IN, ACC, OUT> IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
Flink17AggWindowApp
和sum、reduce效果一样
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSourceV2;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class Flink17AggWindowApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//获取流的执行环境
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//并行度为1更容易观察
env.setParallelism(1);
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
//分组
KeyedStream<VideoOrder, String> keyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
});
//滚动时间窗
SingleOutputStreamOperator<VideoOrder> aggregateDS = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
//AggregateFunction<IN, ACC, OUT> ;IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
.aggregate(new AggregateFunction<VideoOrder, VideoOrder, VideoOrder>() {
//累加器(初始化)
@Override
public VideoOrder createAccumulator() {
VideoOrder videoOrder = new VideoOrder();
return videoOrder;
}
//聚合方式
@Override
public VideoOrder add(VideoOrder videoOrder, VideoOrder accumulator) {
accumulator.setMoney(videoOrder.getMoney() + accumulator.getMoney());
accumulator.setTitle(videoOrder.getTitle());
if (accumulator.getCreateTime() == null) {
accumulator.setCreateTime(videoOrder.getCreateTime());
}
return accumulator;
}
//获取结果
@Override
public VideoOrder getResult(VideoOrder accumulator) {
return accumulator;
}
//合并内容,一般不用
@Override
public VideoOrder merge(VideoOrder a, VideoOrder b) {
VideoOrder videoOrder = new VideoOrder();
videoOrder.setMoney(a.getMoney() + b.getMoney());
return videoOrder;
}
});
aggregateDS.print();
//执行任务
env.execute("flink aggregate job");
}
}
控制台输出
-----open-----
产生:springboot,价格:20, 时间:2024-09-10 09:22:21
产生:springboot,价格:20, 时间:2024-09-10 09:22:22
产生:redis,价格:10, 时间:2024-09-10 09:22:23
产生:springboot,价格:20, 时间:2024-09-10 09:22:24
VideoOrder{tradeNo='null', title='springboot', money=60, userId=0, createTime=2024-09-10 09:22:21}
VideoOrder{tradeNo='null', title='redis', money=10, userId=0, createTime=2024-09-10 09:22:23}
产生:redis,价格:10, 时间:2024-09-10 09:22:25
产生:redis,价格:10, 时间:2024-09-10 09:22:26
产生:springboot,价格:20, 时间:2024-09-10 09:22:27
产生:redis,价格:10, 时间:2024-09-10 09:22:28
产生:springboot,价格:20, 时间:2024-09-10 09:22:29
VideoOrder{tradeNo='null', title='redis', money=30, userId=0, createTime=2024-09-10 09:22:25}
VideoOrder{tradeNo='null', title='springboot', money=40, userId=0, createTime=2024-09-10 09:22:27}
产生:springboot,价格:20, 时间:2024-09-10 09:22:30
产生:redis,价格:10, 时间:2024-09-10 09:22:31
产生:springboot,价格:20, 时间:2024-09-10 09:22:32
产生:springboot,价格:20, 时间:2024-09-10 09:22:33
产生:redis,价格:10, 时间:2024-09-10 09:22:34
VideoOrder{tradeNo='null', title='springboot', money=60, userId=0, createTime=2024-09-10 09:22:30}
VideoOrder{tradeNo='null', title='redis', money=20, userId=0, createTime=2024-09-10 09:22:31}
产生:springboot,价格:20, 时间:2024-09-10 09:22:35
产生:springboot,价格:20, 时间:2024-09-10 09:22:36
产生:redis,价格:10, 时间:2024-09-10 09:22:37
产生:redis,价格:10, 时间:2024-09-10 09:22:38
产生:springboot,价格:20, 时间:2024-09-10 09:22:39
VideoOrder{tradeNo='null', title='springboot', money=60, userId=0, createTime=2024-09-10 09:22:35}
VideoOrder{tradeNo='null', title='redis', money=20, userId=0, createTime=2024-09-10 09:22:37}
全窗口函数(全量聚合)
窗口函数 | 描述 |
---|---|
增量聚合 | aggregate(new AggregateFunction(){}); |
全窗口函数 | 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算 |
apply(new WindowFunction(){}) | |
process(new ProcessWindowFunction(){}) //比上面这个强 |
窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
- 常见的全窗口聚合函数:
windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 WindowFunction<IN, OUT, KEY, W extends Window>
WindowFunction
windowFunction(未来可能弃用)
Flink17ApplyWindowApp
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSourceV2;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.stream.Collectors;
public class Flink17ApplyWindowApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//获取流的执行环境
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//并行度为1更容易观察
env.setParallelism(1);
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
//分组
KeyedStream<VideoOrder, String> keyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
});
//滚动时间窗
SingleOutputStreamOperator<VideoOrder> apply = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).apply(new WindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() {
@Override
public void apply(String s, TimeWindow timeWindow, Iterable<VideoOrder> input, Collector<VideoOrder> output) throws Exception {
List<VideoOrder> list = IteratorUtils.toList(input.iterator());
//list ==> stream
int total = list.stream().collect(Collectors.summingInt(VideoOrder::getMoney)).intValue();
VideoOrder videoOrder = new VideoOrder();
videoOrder.setMoney(total);
videoOrder.setTitle(list.get(0).getTitle());
videoOrder.setCreateTime(list.get(0).getCreateTime());
output.collect(videoOrder);
}
});
apply.print();
//执行任务
env.execute("flink sliding job");
}
}
控制台输出
-----open-----
产生:redis,价格:10, 时间:2024-09-10 11:20:06
产生:springboot,价格:20, 时间:2024-09-10 11:20:07
产生:redis,价格:10, 时间:2024-09-10 11:20:08
产生:redis,价格:10, 时间:2024-09-10 11:20:09
VideoOrder{tradeNo='null', title='redis', money=30, userId=0, createTime=2024-09-10 11:20:06}
VideoOrder{tradeNo='null', title='springboot', money=20, userId=0, createTime=2024-09-10 11:20:07}
产生:springboot,价格:20, 时间:2024-09-10 11:20:10
产生:redis,价格:10, 时间:2024-09-10 11:20:11
产生:springboot,价格:20, 时间:2024-09-10 11:20:12
产生:redis,价格:10, 时间:2024-09-10 11:20:13
产生:springboot,价格:20, 时间:2024-09-10 11:20:14
VideoOrder{tradeNo='null', title='springboot', money=60, userId=0, createTime=2024-09-10 11:20:10}
VideoOrder{tradeNo='null', title='redis', money=20, userId=0, createTime=2024-09-10 11:20:11}
产生:springboot,价格:20, 时间:2024-09-10 11:20:15
产生:springboot,价格:20, 时间:2024-09-10 11:20:16
产生:springboot,价格:20, 时间:2024-09-10 11:20:17
processWindowFunction
processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗
WindowFunction<IN, OUT, KEY, W extends Window>
Flink17ProcessWindowApp
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSourceV2;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.stream.Collectors;
public class Flink17ProcessWindowApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//获取流的执行环境
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//并行度为1更容易观察
env.setParallelism(1);
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
//分组
KeyedStream<VideoOrder, String> keyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
});
//滚动时间窗
SingleOutputStreamOperator<VideoOrder> processDS = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<VideoOrder, VideoOrder, String, TimeWindow>.Context context, Iterable<VideoOrder> input, Collector<VideoOrder> output) throws Exception {
List<VideoOrder> list = IteratorUtils.toList(input.iterator());
//list ==> stream
int total = list.stream().collect(Collectors.summingInt(VideoOrder::getMoney)).intValue();
VideoOrder videoOrder = new VideoOrder();
videoOrder.setMoney(total);
videoOrder.setTitle(list.get(0).getTitle());
videoOrder.setCreateTime(list.get(0).getCreateTime());
output.collect(videoOrder);
}
});
processDS.print();
//执行任务
env.execute("flink sliding job");
}
}
控制台输出
-----open-----
产生:redis,价格:10, 时间:2024-09-10 11:37:11
产生:springboot,价格:20, 时间:2024-09-10 11:37:12
产生:redis,价格:10, 时间:2024-09-10 11:37:13
产生:springboot,价格:20, 时间:2024-09-10 11:37:14
VideoOrder{tradeNo='null', title='redis', money=20, userId=0, createTime=2024-09-10 11:37:11}
VideoOrder{tradeNo='null', title='springboot', money=40, userId=0, createTime=2024-09-10 11:37:12}
产生:springboot,价格:20, 时间:2024-09-10 11:37:15
产生:springboot,价格:20, 时间:2024-09-10 11:37:16
产生:springboot,价格:20, 时间:2024-09-10 11:37:17
产生:redis,价格:10, 时间:2024-09-10 11:37:18
产生:springboot,价格:20, 时间:2024-09-10 11:37:19
VideoOrder{tradeNo='null', title='springboot', money=80, userId=0, createTime=2024-09-10 11:37:15}
VideoOrder{tradeNo='null', title='redis', money=10, userId=0, createTime=2024-09-10 11:37:18}
产生:redis,价格:10, 时间:2024-09-10 11:37:20
产生:redis,价格:10, 时间:2024-09-10 11:37:21
产生:springboot,价格:20, 时间:2024-09-10 11:37:22
迟到无序数据处理
在使用Window窗口函数时候,flink怎么知道哪个是字段是对应的时间呢?
由于网络问题,数据先产生,但是乱序延迟了,那属于哪个时间窗呢?
Flink里面定义窗口,可以引用不同的时间概念
Flink时间分类 | 说明 |
---|---|
事件时间EventTime(重点关注) | 事件发生的时间,事件时间是每个单独事件在其产生进程上发生的时间,这个时间通常在记录进入 Flink 之前记录在对象中,在事件时间中,时间值取决于数据产生记录的时间,而不是任何Flink机器上的 |
进入时间(IngestionTime) | 事件到进入Flink |
处理时间(ProcessingTime) | 事件被flink处理的时间,指正在执行相应操作的机器的系统时间,是最简单的时间概念,不需要流和机器之间的协调,它提供最佳性能和最低延迟,但是在分布式和异步环境中,处理时间有不确定性,存在延迟或乱序问题 |
事件时间已经能够解决所有的问题了,那为何还要用处理时间呢????
- 处理时间由于不用考虑事件的延迟与乱序,所以处理数据的速度高效,
如果一些应用比较重视处理速度而非准确性,那么就可以使用处理时间,但结果具有不确定性
- 事件时间有延迟,
但是能够保证处理的结果具有准确性,并且可以处理延迟甚至无序的数据
做了一个电商平台卖"男装衣服",如果要统计10分钟内成交额,你认为是哪个时间比较好?
应该使用事件时间(EventTime)才合理
- (EventTime) 下单支付时间是 2022-11-11 01-01-01
- (IngestionTime ) 进入Flink时间 2022-11-11 01-03-01(网络拥堵、延迟)
- (ProcessingTime)进入窗口时间 2022-11-11 01-31-01(网络拥堵、延迟)
乱序延迟时间处理-多层保证措施
如何保证在需要的窗口内获得指定的数据?(数据有乱序延迟)
flink采用watermark 、allowedLateness() 、sideOutputLateData()
三个机制来保证获取数据
机制 | 功能描述 | 补充 | 应用场景 |
---|---|---|---|
watermark | 防止数据出现延迟乱序,允许等待一会再触发窗口计算,"提前"输出 | 可以用来及时输出数据 | |
allowedLateness() | 将窗口关闭时间再延迟一段时间.设置后就像window变大了 | 那么为什么不直接把window设置大一点呢?或者把watermark加大点? watermark先输出数据,allowLateness会局部修复数据并主动更新窗口的数据输出,这期间的迟到数据不会被丢弃,而是会触发窗口重新计算 | 做短期的更新迟到数据 |
sideOutPut | 后兜底操作,超过allowLateness后,窗口已经彻底关闭了,就会把数据放到侧输出流 | 测输出流 OutputTag tag = new OutputTag(){}, 由于泛型查除问题,需要重写方法,加花括号 | 做兜底更新保证数据准确性 |
二次的趋势线是经过sideOutPut修复后的,所以会稍微多一点点
Flink机制归纳总结
Flink 默认的处理方式直接丢弃迟到的数据
机制 | 功能描述 | 补充 |
---|---|---|
第一层(窗口window) | 从DataStream数据流里指定范围获取数据 | DataStream没有getSideOutput方法,SingleOutputStreamOperator才有 |
第二层 (watermark) | 是防止数据出现乱序延迟,允许窗口等待延迟数据达到,再触发计算,提前输出 | |
第三层(allowLateness) | 会让窗口关闭时间再延迟一段时间, 如果还有数据达到,会局部修复数据并主动更新窗口的数据输出 | |
第四层 (sideOutPut) | 侧输出流是最后兜底操作,在窗口已经彻底关闭后,所有过期延迟数据放到侧输出流,可以单独获取,存储到某个地方再批量更新之前的聚合的数据 | sideOutPut还可以进行分流功能 |
版本弃用API
新接口,`WatermarkStrategy`,`TimestampAssigner` 和 `WatermarkGenerator` 因为其对时间戳和 watermark 等重点的抽象和分离很清晰,并且还统一了周期性和标记形式的 watermark 生成方式
新接口之前是用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks ,现在可以弃用了
乱序延迟时间处理-Watermark
背景:一般我们都是用EventTime事件时间进行处理统计数据,但数据由于网络问题延迟、乱序到达会导致窗口计算数据不准确
需求:比如时间窗是 [12:01:01,12:01:10 ) ,但是有数据延迟到达,当 12:01:10 秒数据到达的时候,不立刻触发窗口计算,而是等一定的时间,等迟到的数据来后再关闭窗口进行计算
每天10点后就是迟到,需要扣工资
扣款规定 | 对应到技术 |
---|---|
HR就规定迟到5分钟后就罚款100元 | 5分钟就是 watermark |
迟到30分钟就是上午事假处理 | 5~30分就是 allowLateness |
不请假都是要来的,超过30分钟就是侧输出流 | sideOutPut 兜底 |
Watermark 水位线
Watermark由flink的某个operator操作生成后,就在整个程序中随event数据流转
With Periodic Watermarks(周期生成,可以定义一个最大允许乱序的时间,用的很多)
With Punctuated Watermarks(标点水位线,根据数据流中某些特殊标记事件来生成,相对少)
衡量数据是否乱序的时间,什么时候不用等早之前的数据
Watermark 是一个全局时间戳,不是某一个key下的值
Watermark 是一个特殊字段,单调递增的方式,主要是和数据本身的时间戳做比较
用来确定什么时候不再等待更早的数据了,可以触发窗口进行计算,忍耐是有限度的,给迟到的数据一些机
注意:Watermark 设置太小会影响数据准确性
,设置太大会影响数据的实时性
,更加会加重Flink作业的负担
;需要经过测试
,和业务相关联,得出一个较合适的值
即可
窗口触发计算的时机
- watermark之前是按照窗口的关闭时间点计算的 [12:01:01,12:01:10 )
- watermark之后,触发计算的时机
- 窗口内有数据
- Watermaker >= Window EndTime窗口结束时间
- 触发计算后,其他窗口内数据再到达也被丢弃
Watermaker = 当前计算窗口最大的事件时间 - 允许乱序延迟的时间
1:05 = 1:10 - 5
- watermark之后,触发计算的时机
数据流中的事件是有序
数据流中的事件是无序
Watermaker 计算
Watermaker = 当前计算窗口最大的事件时间 - 允许乱序延迟的时间
触发W1窗口计算的时机
Watermaker >= Window EndTime窗口结束时间
;当前计算窗口最大的事件时间 - 允许乱序延迟的时间 >= Window EndTime窗口结束时间
案例
window大小为10s,窗口是W1 [23:12:00~23:12:10)
、 W2[23:12:10~23:12:20)
数据的event time |
---|
数据A 23:12:07 |
数据B 23:12:11 |
数据C 23:12:08 |
数据D 23:12:17 |
数据E 23:12:09 |
没加入watermark,由上到下进入flink
上来就丢失了数据,准确度较低
- 数据B到达:
W1就进行了窗口计算,数据只有A
- 数据C到达:
迟到了3秒,到了之后,由于W1已经计算了,所以就丢失了数据C
加入watermark, 允许5秒延迟乱序,由上到下进入flink
丢失的数据变少,准确度提高
- 数据A到达:
watermark = 12:07 - 5 = 12:02 < 12:10 ,所以不触发W1计算, A属于W1
- 数据B到达:
watermark = max{ 12:11, 12:07} - 5 = 12:06 < 12:10 ,所以不触发W1计算, B属于W2
- 数据C到达:
watermark = max{12:08, 12:11, 12:07} - 5 = 12:06 < 12:10 ,所以不触发W1计算, C属于W1
- 数据D到达:
watermark = max{12:17, 12:08, 12:11, 12:07} - 5 = 12:12 > 23:12:10 , 触发W1计算, D属于W2
- 数据E到达:
watermark = max{12:09, 12:17, 12:08, 12:11, 12:07} - 5 = 12:12 > 23:12:10 , 之前已触发W1计算, 所以丢失了E数据
Watermark+Window编码实战
需求
分组统计不同商品的成交总价
数据有乱序延迟,允许3秒的时间
10钟统计一次
时间工具类
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
public class TimeUtil {
public static String formatTime(Date time) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
//指定时区:获取当前操作系统的时区
ZoneId zoneId = ZoneId.systemDefault();
//返回字符串
String timeStr = formatter.format(time.toInstant().atZone(zoneId));
return timeStr;
}
/**
* date 转 字符串
*
* @param timestamp
* @return
*/
public static String format(long timestamp) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
ZoneId zoneId = ZoneId.systemDefault();
String timeStr = formatter.format(new Date(timestamp).toInstant().atZone(zoneId));
return timeStr;
}
/**
* 字符串 转 date
*
* @param time
* @return
*/
public static Date strToDate(String time) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime localDateTime = LocalDateTime.parse(time, formatter);
return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
}
}
Flink18WatermarkWindowApp
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSourceV2;
import net.xdclass.util.TimeUtil;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class Flink19WatermarkWindowApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度为1更容易观察
env.setParallelism(1);
//数据来源
//java,2022-11-11 23:12:07,10
//java,2022-11-11 23:12:11,10
DataStreamSource<String> ds = env.socketTextStream("127.0.0.1", 8888);
//一对多(转成Tuple3)
SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
String[] arr = value.split(",");
out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
}
});
//EventTime指定为arr[1] == 指定POJO的事件时间列
SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermakerDS = flatMapDS.assignTimestampsAndWatermarks(WatermarkStrategy
//指定最大允许的(延迟/乱序)时间
.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(
(event, timestamp) -> {
//指定POJO的事件时间列;EventTime指定为arr[1]
return TimeUtil.strToDate(event.f1).getTime();
}
));
//分组(keyBy)
SingleOutputStreamOperator<String> sumDS = watermakerDS.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
@Override
public String getKey(Tuple3<String, String, Integer> value) throws Exception {
//指定分组的列(title列)
return value.f0;
}
}) //开窗(window)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
//全量聚合:方便调试拿到窗口全部数据,全窗口函数
.apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> input, Collector<String> output) throws Exception {
//准备list,存放窗口的事件时间
ArrayList<String> timeList = new ArrayList<>();
//金额
int total = 0;
for (Tuple3<String, String, Integer> order : input) {
timeList.add(order.f1);
total = total + order.f2;
}
String resultStr = String.format("分组key:%s,聚合值:%s,窗口开始结束:[%s~%s),窗口所有事件时间:%s", key,total, TimeUtil.format(timeWindow.getStart()),TimeUtil.format(timeWindow.getEnd()), timeList);
output.collect(resultStr);
}
});
sumDS.print();
//执行任务
env.execute("flink watermark job");
}
}
测试数据
- 窗口 [23:12:00 ~ 23:12:10) | [23:12:10 ~ 23:12:20)
- 触发窗口计算条件
- 窗口内有数据
- watermark >= 窗口endtime
- 即 当前计算窗口最大的事件时间 - 允许乱序延迟的时间 >= Window EndTime窗口结束时间
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
mysql,2022-11-11 23:12:13,10
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:17,10
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:20,10
java,2022-11-11 23:12:22,10
java,2022-11-11 23:12:23,10
netcat
C:\Users\chao1>cd C:\Tools\netcat-win32-1.12
C:\Tools\netcat-win32-1.12>nc -lp 8888
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
mysql,2022-11-11 23:12:13,10
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:17,10
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:20,10
java,2022-11-11 23:12:22,10
java,2022-11-11 23:12:23,10
控制台输出
丢失数据4条数据
分组key:java,聚合值:20,窗口开始结束:[2022-11-11 23:12:00~2022-11-11 23:12:10),窗口所有事件时间:[2022-11-11 23:12:07, 2022-11-11 23:12:08]
分组key:mysql,聚合值:10,窗口开始结束:[2022-11-11 23:12:10~2022-11-11 23:12:20),窗口所有事件时间:[2022-11-11 23:12:13]
分组key:java,聚合值:30,窗口开始结束:[2022-11-11 23:12:10~2022-11-11 23:12:20),窗口所有事件时间:[2022-11-11 23:12:11, 2022-11-11 23:12:13, 2022-11-11 23:12:17]
二次兜底延迟数据处理 - Allowed Lateness
超过了watermark的等待后,还有延迟数据到达怎么办?
watermark先输出,然后配置allowedLateness 再延长时间,然后延迟数据到达后更新之前的窗口数据
Flink19AllowLatenessWindowApp
import net.xdclass.util.TimeUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.ArrayList;
public class Flink19AllowLatenessWindowApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度为1更容易观察
env.setParallelism(1);
//数据来源
//java,2022-11-11 23:12:07,10
//java,2022-11-11 23:12:11,10
DataStreamSource<String> ds = env.socketTextStream("127.0.0.1", 8888);
//一对多(转成Tuple3)
SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
String[] arr = value.split(",");
out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
}
});
//EventTime指定为arr[1] == 指定POJO的事件时间列
SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermakerDS = flatMapDS.assignTimestampsAndWatermarks(WatermarkStrategy
//指定最大允许的(延迟/乱序)时间
.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(
(event, timestamp) -> {
//指定POJO的事件时间列;EventTime指定为arr[1]
return TimeUtil.strToDate(event.f1).getTime();
}
));
//分组(keyBy)
SingleOutputStreamOperator<String> sumDS = watermakerDS.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
@Override
public String getKey(Tuple3<String, String, Integer> value) throws Exception {
//指定分组的列(title列)
return value.f0;
}
}) //开窗(window)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 允许延迟一分钟
.allowedLateness(Time.minutes(1))
//全量聚合:方便调试拿到窗口全部数据,全窗口函数
.apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> input, Collector<String> output) throws Exception {
//准备list,存放窗口的事件时间
ArrayList<String> timeList = new ArrayList<>();
//金额
int total = 0;
for (Tuple3<String, String, Integer> order : input) {
timeList.add(order.f1);
total = total + order.f2;
}
String resultStr = String.format("分组key:%s,聚合值:%s,窗口开始结束:[%s~%s),窗口所有事件时间:%s", key, total, TimeUtil.format(timeWindow.getStart()), TimeUtil.format(timeWindow.getEnd()), timeList);
output.collect(resultStr);
}
});
sumDS.print();
//执行任务
env.execute("flink watermark job");
}
}
测试数据
- 窗口 [23:12:00 ~ 23:12:10) | [23:12:10 ~ 23:12:20)
- 触发窗口计算条件
- 窗口内有数据
- watermark >= 窗口endtime
- 即 当前计算窗口最大的事件时间 - 允许乱序延迟的时间 >= Window EndTime窗口结束时间
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:23,10
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:02,10
java,2022-11-11 23:14:30,10
java,2022-11-11 23:12:03,10
netcat
C:\Users\chao1>cd C:\Tools\netcat-win32-1.12
C:\Tools\netcat-win32-1.12>nc -lp 8888
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
mysql,2022-11-11 23:12:13,10
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:17,10
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:20,10
java,2022-11-11 23:12:22,10
java,2022-11-11 23:12:23,10
控制台输出
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:23,10
#延迟1分钟内,所以会输出
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:02,10
java,2022-11-11 23:14:30,10
#延迟超过1分钟,不会输出
java,2022-11-11 23:12:03,10
兜底延迟数据处理-SideOutput侧输出流编码
容忍度更好,还能保证实时输出
背景
- 超过了watermark的等待后,还有延迟数据到达怎么办?
- watermark先输出,然后配置allowedLateness 再延长时间,然后到了后更新之前的窗口数据
- 数据超过了allowedLateness 后,就丢失了吗?
用侧输出流 SideOutput
Flink20SideOutPutLateDataWindowApp
import net.xdclass.util.TimeUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.ArrayList;
public class Flink20SideOutPutLateDataWindowApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度为1更容易观察
env.setParallelism(1);
//数据来源
//java,2022-11-11 23:12:07,10
//java,2022-11-11 23:12:11,10
DataStreamSource<String> ds = env.socketTextStream("127.0.0.1", 8888);
//一对多(转成Tuple3)
SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
String[] arr = value.split(",");
out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
}
});
//EventTime指定为arr[1] == 指定POJO的事件时间列
SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermakerDS = flatMapDS.assignTimestampsAndWatermarks(WatermarkStrategy
//指定最大允许的(延迟/乱序)时间
.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(
(event, timestamp) -> {
//指定POJO的事件时间列;EventTime指定为arr[1]
return TimeUtil.strToDate(event.f1).getTime();
}
));
//兜底数据
OutputTag<Tuple3<String, String,Integer>> lateData = new OutputTag<Tuple3<String, String,Integer>>("lateData"){};
//分组(keyBy)
SingleOutputStreamOperator<String> sumDS = watermakerDS.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
@Override
public String getKey(Tuple3<String, String, Integer> value) throws Exception {
//指定分组的列(title列)
return value.f0;
}
}) //开窗(window)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 允许延迟一分钟
.allowedLateness(Time.minutes(1))
//兜底数据
.sideOutputLateData(lateData)
//全量聚合:方便调试拿到窗口全部数据,全窗口函数
.apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> input, Collector<String> output) throws Exception {
//准备list,存放窗口的事件时间
ArrayList<String> timeList = new ArrayList<>();
//金额
int total = 0;
for (Tuple3<String, String, Integer> order : input) {
timeList.add(order.f1);
total = total + order.f2;
}
String resultStr = String.format("分组key:%s,聚合值:%s,窗口开始结束:[%s~%s),窗口所有事件时间:%s", key, total, TimeUtil.format(timeWindow.getStart()), TimeUtil.format(timeWindow.getEnd()), timeList);
output.collect(resultStr);
}
});
sumDS.print();
//不会更新之前的窗口数据,需要代码单独写逻辑处理更新之前的数据,也可以积累后批处理
//TODO... 最后兜底处理,更新之前的数据(redis kafka……)
sumDS.getSideOutput(lateData).print("late data");
//执行任务
env.execute("flink watermark job");
}
}
测试数据
- 窗口 [23:12:00 ~ 23:12:10) | [23:12:10 ~ 23:12:20)
- 触发窗口计算条件
- 窗口内有数据
- watermark >= 窗口endtime
- 即 当前计算窗口最大的事件时间 - 允许乱序延迟的时间 >= Window EndTime窗口结束时间
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:23,10
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:02,10
java,2022-11-11 23:14:30,10
java,2022-11-11 23:12:03,10
java,2022-11-11 23:12:04,10
netcat
C:\Users\chao1>cd C:\Tools\netcat-win32-1.12
C:\Tools\netcat-win32-1.12>nc -lp 8888
控制台输出
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:23,10
#延迟1分钟内,所以会输出
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:02,10
java,2022-11-11 23:14:30,10
#延迟超过1分钟,不会输出,配置了sideOutPut,会在兜底输出
java,2022-11-11 23:12:03,10
java,2022-11-11 23:12:04,10
分组key:java,聚合值:20,窗口开始结束:[2022-11-11 23:12:00~2022-11-11 23:12:10),窗口所有事件时间:[2022-11-11 23:12:07, 2022-11-11 23:12:08]
分组key:java,聚合值:20,窗口开始结束:[2022-11-11 23:12:10~2022-11-11 23:12:20),窗口所有事件时间:[2022-11-11 23:12:11, 2022-11-11 23:12:13]
### 延迟1分钟内,所以会输出
分组key:java,聚合值:30,窗口开始结束:[2022-11-11 23:12:00~2022-11-11 23:12:10),窗口所有事件时间:[2022-11-11 23:12:07, 2022-11-11 23:12:08, 2022-11-11 23:12:09]
分组key:java,聚合值:40,窗口开始结束:[2022-11-11 23:12:00~2022-11-11 23:12:10),窗口所有事件时间:[2022-11-11 23:12:07, 2022-11-11 23:12:08, 2022-11-11 23:12:09, 2022-11-11 23:12:02]
分组key:java,聚合值:10,窗口开始结束:[2022-11-11 23:12:20~2022-11-11 23:12:30),窗口所有事件时间:[2022-11-11 23:12:23]
### 延迟超过1分钟,不会输出,配置了sideOutPut,会在兜底输出
late data> (java,2022-11-11 23:12:03,10)
late data> (java,2022-11-11 23:12:04,10)
Flink状态管理-State
什么是State状态
数据流处理离不开状态管理,比如窗口聚合统计、去重、排序等。是一个Operator的运行的状态/历史值,是维护在内存中的。
流程
一个算子的子任务接收输入流,获取对应的状态,计算新的结果,然后把结果更新到状态里面
有状态、无状态
算子类型 | 描述 |
---|---|
无状态计算 | 同个数据进到算子里面多少次,都是一样的输出,比如 filter (过/不过) |
有状态计算 | 需要考虑历史状态,同个输入会有不同的输出,比如sum、reduce聚合操作 |
状态管理分类
类型 | 细分 | 描述 |
---|---|---|
Keyed State 键控状态(用的多) | Flink管理,自动存储恢复 | |
Keyed State 键控状态(用的多) | 有KeyBy才用这个,仅限用在KeyStream中,每个key都有state ,是基于KeyedStream上的状态,一般是用richFlatFunction,或者其他richfunction里面,在open()声明周期里面进行初始化,ValueState、ListState、MapState等数据结构 | |
Operator State 算子状态(用的少,部分source会用) | ListState、UnionListState、BroadcastState等数据结构 | |
RawState(用的少) | 用户自己管理和维护 ,存储结构:二进制数组 |
State数据结构
状态值可能存在内存、磁盘、DB或者其他分布式存储中
数据结构 | 描述 | API |
---|---|---|
ValueState | 简单的存储一个值(ThreadLocal / String) | ValueState.value()、ValueState.update(T value) |
ListState | 列表 | ListState.add(T value)、ListState.get() //得到一个Iterator |
MapState | 映射类型 | MapState.get(key)、MapState.put(key, value) |
Flink的状态State后端存储
从Flink 1.13开始,社区重新设计了其公共状态后端类,以帮助用户更好地理解本地状态存储
和检查点存储
的分离
用户可以迁移现有应用程序以使用新 API,而不会丢失任何状态或一致性
State状态后端(存储在哪里)
Flink 内置了以下这些开箱即用的 state backends
如果没有其他配置,系统将使用 HashMapStateBackend。
state backend | 版本 |
---|---|
HashMapStateBackend | 新版 |
EmbeddedRocksDBStateBackend | 新版 |
MemoryStateBackend | 旧版 |
FsStateBackend | 旧版 |
RocksDBStateBackend | 旧版 |
MemoryStateBackend | 旧版 |
不同state backend的特点和适用场景
state backend | 描述 | 特点 | 适用场景 |
---|---|---|---|
HashMapStateBackend | 保存数据在内部,作为Java堆的对象 | 键/值状态和窗口操作符持有哈希表,用于存储值、触发器等。非常快,因为每个状态访问和更新都对Java 堆上的对象进行操作,但是状态大小受集群内可用内存的限制 。 | 具有大状态、长窗口、大键/值状态的作业。所有高可用性设置。 |
EmbeddedRocksDBStateBackend | 在RocksDB数据库中保存状态数据 | 该数据库(默认)存储在 TaskManager 本地数据目录中,与HashMapStateBackend在java存储 对象不同,数据存储为序列化的字节数组。RocksDB可以根据可用磁盘空间进行扩展,并且是唯一支持增量快照的状态后端。但是每个状态访问和更新都需要(反)序列化并可能从磁盘读取,这导致平均性能比内存状态后端慢一个数量级 | 具有非常大状态、长窗口、大键/值状态的作业。所有高可用性设置 |
MemoryStateBackend(旧版) | 内存,不推荐在生产场景使用 | ||
FsStateBackend(旧版) | 文件系统上,本地文件系统、HDFS, 性能更好,常用 | ||
RocksDBStateBackend(旧版) | 无需担心OOM风险,是大部分时候的选择 |
配置state backend
- 引入依赖
<!--flinkflink-statebackend-rocksdb-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
<version>1.13.1</version>
</dependency>
- 配置方式一(全局配置)
flink-conf.yaml使用配置键在 中配置默认状态后端state.backend。`
# 配置条目的可能值是hashmap (HashMapStateBackend)、rocksdb (EmbeddedRocksDBStateBackend)
或实现状态后端工厂StateBackendFactory的类的完全限定类名
# 全局配置例子一
# The backend that will be used to store operator state checkpoints
state.backend: hashmap
# Optional, Flink will automatically default to JobManagerCheckpointStorage
# when no checkpoint directory is specified.
state.checkpoint-storage: jobmanager
# 全局配置例子二
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
- 方式二代码(单独job配置例子)
//代码配置一(基于内存不推荐):HashMapStateBackend
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
//代码配置二(经常使用):EmbeddedRocksDBStateBackend
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
//或者
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
Flink的状态State管理编码-订单数据统计实现MaxBy操作
sum()、maxBy() 等函数底层源码也是有ValueState进行状态存储
需求
根据订单进行分组,统计找出每个商品最大的订单成交额
不用maxBy实现,用ValueState实现
根据订单进行分组,统计找出每个商品最大的订单成交额
Flink22StateMaxByApp
import net.xdclass.util.TimeUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.ArrayList;
public class Flink22StateMaxByApp {
/**
* 使用valueState实现maxBy功能,统计分组内订单金额最高的订单
*
* @param args
*/
public static void main(String[] args) throws Exception {
//获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度为1更容易观察
env.setParallelism(1);
//数据来源 java,2022-11-11 23:12:07,10
DataStreamSource<String> ds = env.socketTextStream("127.0.0.1", 8888);
SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
String[] arr = s.split(",");
collector.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
}
});
//分组、聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> maxVideoOrderDS = flatMapDS.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
@Override
public String getKey(Tuple3<String, String, Integer> value) throws Exception {
return value.f0;
}
}).map(new RichMapFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>>() {
//局部变量(商品最大成交金额)
private ValueState<Integer> maxVideoOrderState = null;
/**
* 初始化状态
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
//获取上下文信息
maxVideoOrderState = getRuntimeContext().getState(new ValueStateDescriptor<>("maxValue", Integer.class));
}
@Override
public Tuple2<String, Integer> map(Tuple3<String, String, Integer> value) throws Exception {
//获取历史值
Integer maxValue = maxVideoOrderState.value();
//获取当前值
Integer currentValue = value.f2;
//判断(更新状态,把当前最大的值存储到state)
if (maxValue == null || currentValue > maxValue) {
maxVideoOrderState.update(currentValue);
return Tuple2.of(value.f0, currentValue);
} else {
return Tuple2.of(value.f0, maxValue);
}
}
@Override
public void close() throws Exception {
super.close();
}
});
//打印
maxVideoOrderDS.print("商品最大交额的订单");
//执行任务
env.execute("flink watermark job");
}
}
netcat
C:\Tools\netcat-win32-1.12>nc -lp 8888
java,2021,19
java,2022,21
java,2022,12
控制台输出
商品最大交额的订单> (java,19)
商品最大交额的订单> (java,21)
商品最大交额的订单> (java,21)
Flink的Checkpoint-SavePoint和端到端(end-to-end)状态一致性
Checkpoint 检查点
- Flink中所有的Operator的
当前State的全局快照
- 默认情况下
checkpoint 是禁用的,需要开启
- Checkpoint是把State数据定时
持久化存储,防止丢失
Savepoint 保存点
- 手工调用checkpoint,叫 savepoint,
主要是用于flink集群维护升级等
- 底层使用了Chandy-Lamport 分布式快照算法,
保证数据在分布式环境下的一致性
开箱即用,Flink 捆绑了这些检查点存储类型
检查点存储类型 | 功能描述 |
---|---|
JobManagerCheckpointStorage | 作业管理器检查点存储 |
FileSystemCheckpointStorage | 文件系统检查点存储 |
配置
### 全局配置checkpoints
# 全局配置
state.checkpoints.dir: hdfs:///checkpoints/
# 作业单独配置checkpoints(代码配置)
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");
### 全局配置savepoint
state.savepoints.dir: hdfs:///flink/savepoints
Savepoint 与 Checkpoint 的不同之处
- 类似于传统数据库中的备份与恢复日志之间的差异
- Checkpoint 的主要目的是为意外失败的作业提供【重启恢复机制】,
- Checkpoint 的生命周期
由 Flink 管理,即 Flink 创建,管理和删除 Checkpoint - 无需用户交互
- Savepoint
由用户创建,拥有和删除, 主要是【升级 Flink 版本】,调整用户逻辑
- 除去概念上的差异,
Checkpoint 和 Savepoint 的当前实现基本上使用相同的代码并生成相同的格式
端到端(end-to-end)状态一致性
数据一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的。
在真实应用中,了流处理器以外还包含了数据源(例如Kafka、Mysql)和输出到持久化系统(Kafka、Mysql、Hbase、CK)
端到端的一致性保证,是意味着结果的正确性贯穿了整个流处理应用的各个环节,每一个组件都要保证自己的一致性。
处理环节 | 如何保证数据一致性 |
---|---|
Source | 需要外部数据源可以重置读取位置,当发生故障的时候重置偏移量到故障之前的位置 |
内部 | 依赖Checkpoints机制,在发生故障的时可以恢复各个环节的数据 |
Sink | 当故障恢复时,数据不会重复写入外部系统,常见的就是 幂等和事务写入(和checkpoint配合) |
Flink的Checkpoint代码配置编码
Flink23CheckpointApp
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class Flink23CheckpointApp {
/**
* 使用valueState实现maxBy功能,统计分组内订单金额最高的订单
*
* @param args
*/
public static void main(String[] args) throws Exception {
//获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度为1(更容易观察)
env.setParallelism(1);
//两个检查点之间的间隔时间 (默认是0,单位毫秒):检查点快照比较耗费资源,因此不能过于频分
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//容忍度:Checkpoint过程中出现错误,是否让整体任务都失败(默认值为0,表示不容忍任何Checkpoint失败):可以容忍5次失败
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
//当一个Flink应用程序失败终止、人为取消等时,它的Checkpoint就会被清除。可以配置不同策略进行操作
//DELETE_ON_CANCELLATION: 当作业取消时,Checkpoint 状态信息会被删除,因此取消任务后,不能从 Checkpoint 位置进行恢复任务
//RETAIN_ON_CANCELLATION(多的多): 当作业手动取消时,将会保留作业的 Checkpoint 状态信息,要手动清除该作业的 Checkpoint 状态信息
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//数据一致性保障策略:Flink默认提供Extractly-Once保证State的一致性,还提供了Extractly-Once,At-Least-Once 两种模式
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//快照耗时最大时间:设置checkpoint的超时时间, 如果规定时间没完成则放弃,默认是10分钟
env.getCheckpointConfig().setCheckpointTimeout(60000);
//设置同一时刻有多少个checkpoint可以同时执行,默认为1就行,以避免占用太多正常数据处理资源
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//设置了重启策略, 作业在失败后能自动恢复,失败后最多重启3次,每次重启间隔10s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
//执行任务
env.execute("flink Checkpoint job");
}
}
复杂事件处理 CEP
FlinkCEP
- CEP全称
Complex event processing 复杂事件处理
,和正则表达式类似 - FlinkCEP 是在 Flink 之上实现的复杂事件处理(CEP)库
- 擅长高吞吐、低延迟的处理,市场上有多种CEP的解决方案,例如Spark,但是Flink专门类库更方便使用
- 地址:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/cep/
用途
- 检测和发现无边界事件流中多个记录的关联规则,
得到满足规则的复杂事件
- 允许业务定义要
从输入流中提取的复杂模式序列
模式(Pattern)
定义处理事件的规则
模式类型 | 描述 |
---|---|
个体模式(Individual Patterns) | 组成复杂规则的每一个单独的模式定义,就是个体模式 |
组合模式(Combining Patterns) | 很多个体模式组合起来,形成组合模式 |
模式组(Groups of Patterns) | 将一个组合模式作为条件嵌套在个体模式里,就是模式组 |
近邻模式
模式类型 | 描述 |
---|---|
严格近邻 | 期望所有匹配事件严格地一个接一个出现,中间没有任何不匹配的事件, API是.next() |
宽松近邻 | 允许中间出现不匹配的事件,API是.followedBy() |
非确定性宽松近邻 | 可以忽略已经匹配的条件,API是followedByAny() |
指定时间约束 | 指定模式在多长时间内匹配有效,API是within |
模式分类
模式分类 | 描述 |
---|---|
单次模式 | 接收一次一个事件 |
循环模式 | 接收一个或多个事件 |
其他参数
参数 | 功能描述 |
---|---|
times | 指定固定的循环执行次数 |
greedy | 贪婪模式,尽可能多触发 |
oneOrMore | 指定触发一次或多次 |
timesOrMore | 指定触发固定以上的次数 |
optional | 要么不触发要么触发指定的次数 |
引入依赖
<!--flink-cep-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
使用流程
- 定义pattern
- pattern应用到数据流,得到模式流
- 从模式流 获取结果
//定义pattern
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getId() == 42;
}
}
).next("middle").subtype(SubEvent.class).where(
new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent subEvent) {
return subEvent.getVolume() >= 10.0;
}
}
).followedBy("end").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getName().equals("end");
}
}
);
//pattern应用到数据流,得到模式流
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
//从模式流 获取结果
DataStream<Alert> result = patternStream.process(
new PatternProcessFunction<Event, Alert>() {
@Override
public void processMatch(
Map<String, List<Event>> pattern,
Context ctx,
Collector<Alert> out) throws Exception {
out.collect(createAlertFrom(pattern));
}
});
CEP-账号登录风控检测
需求
同个账号,在5秒内连续登录失败2次,则认为存在而已登录问题
数据格式:soulboy,2022-11-11 12:01:01,-1
,状态码-1
代表登录失败。
引入依赖
<!--flink-cep-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
Flink24CEPLoginApp
import net.xdclass.util.TimeUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.Map;
public class Flink24CEPLoginApp {
/**
* 使用valueState实现maxBy功能,统计分组内订单金额最高的订单
*
* @param args
*/
public static void main(String[] args) throws Exception {
//获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度为1更容易观察
env.setParallelism(1);
//数据来源 soulboy,2022-11-11 12:01:01,-1 状态码`-1`代表登录失败。
DataStreamSource<String> ds = env.socketTextStream("127.0.0.1", 8888);
//转成Tuple3
SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMap = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
String[] arr = s.split(",");
collector.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
}
});
//指定event time列
SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarkDS = flatMap.assignTimestampsAndWatermarks(WatermarkStrategy
//生成一个延迟3s的watermark
//.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
//分配watermark策略=>时间是单调递增,event中的时间戳充当了水印
.<Tuple3<String, String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> TimeUtil.strToDate(event.f1).getTime()));
//分组
KeyedStream<Tuple3<String, String, Integer>, String> keyedStream = watermarkDS.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
@Override
public String getKey(Tuple3<String, String, Integer> value) throws Exception {
//获取到账户:(soulboy,2022-11-11 12:01:01,-1 )
return value.f0;
}
});
//cep:定义模式(Pattern) 匹配5秒内登录2次失败
Pattern<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>> pattern = Pattern.<Tuple3<String, String, Integer>>
begin("firstLoginTime")
.where(new SimpleCondition<Tuple3<String, String, Integer>>() {
@Override
public boolean filter(Tuple3<String, String, Integer> value) throws Exception {
//-1是登录失败的错误码
return value.f2 == -1;
}
})
.next("secondLoginTime")
.where(new SimpleCondition<Tuple3<String, String, Integer>>() {
@Override
public boolean filter(Tuple3<String, String, Integer> value) throws Exception {
//-1是登录失败的错误码
return value.f2 == -1;
}
}).within(Time.seconds(5));
//cep:pattern应用到数据流,得到模式流
PatternStream<Tuple3<String, String, Integer>> patternStream = CEP.pattern(keyedStream, pattern);
//cep:从模式流获取结果 输出到redis中Tuple3 类型<String, String, String> 账户、第一次登录失败时间、第二次登录失败时间
SingleOutputStreamOperator<Tuple3<String, String, String>> selectedResult = patternStream.select(new PatternSelectFunction<Tuple3<String, String, Integer>, Tuple3<String, String, String>>() {
@Override
public Tuple3<String, String, String> select(Map<String, List<Tuple3<String, String, Integer>>> map) throws Exception {
Tuple3<String, String, Integer> firstLoginTimeEvent = map.get("firstLoginTime").get(0);
Tuple3<String, String, Integer> secondLoginTimeEvent = map.get("secondLoginTime").get(0);
//Tuple3.of(账户,第一次登录失败时间,第二次登录失败时间) 输出到redis 或其他sink
return Tuple3.of(firstLoginTimeEvent.f0, firstLoginTimeEvent.f1, secondLoginTimeEvent.f1);
}
});
//输出风险账户: TODO ... sink 可以输出到 redis mysql kafka
//调用微服务之前,获取登录信息时在redis中判断该账户是不是风险账户(redis中判断),如果是风险账户则触发输入验证码
selectedResult.print("风险帐号");
//执行任务
env.execute("cep job");
}
}
测试数据
soulboy,2022-11-11 12:01:01,-1
dero,2022-11-11 12:01:10,-1
dero,2022-11-11 12:01:11,-1
soulboy,2022-11-11 12:01:13,-1
dero,2022-11-11 12:01:14,-1
dero,2022-11-11 12:01:15,1
soulboy,2022-11-11 12:01:16,-1
dero,2022-11-11 12:01:17,-1
soulboy,2022-11-11 12:01:20,1
cmd命令行
C:\Users\chao1>c:
C:\Users\chao1>cd C:\Tools\netcat-win32-1.12
C:\Tools\netcat-win32-1.12>nc -lp 8888
soulboy,2022-11-11 12:01:01,-1
dero,2022-11-11 12:01:10,-1
dero,2022-11-11 12:01:11,-1
soulboy,2022-11-11 12:01:13,-1
dero,2022-11-11 12:01:14,-1
dero,2022-11-11 12:01:15,1
soulboy,2022-11-11 12:01:16,-1
dero,2022-11-11 12:01:17,-1
soulboy,2022-11-11 12:01:20,1
idea控制台输出
风险帐号> (dero,2022-11-11 12:01:10,2022-11-11 12:01:11)
风险帐号> (dero,2022-11-11 12:01:11,2022-11-11 12:01:14)
风险帐号> (soulboy,2022-11-11 12:01:13,2022-11-11 12:01:16)
Flink项目打包(插件)阿里云部署
Flink 部署方式是灵活,主要是对Flink计算时所需资源的管理方式不同
,详细可参考文档
部署模式 | 描述 |
---|---|
Local | 本地部署,直接启动进程,适合调试使用(直接部署启动服务) |
Standalone Cluster | 集群部署,flink自带集群模式 |
Hadoop YARN | 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率 |
Kubernetes | 部署 |
Docker | 部署 |
Flink1.13.1下载和Local本地模式部署
目录介绍
conf目录
flink-conf.yaml
# web ui 端口
rest.port=8081
# 调整内存大小
jobmanager.memory.process.size: 1000m
taskmanager.memory.process.size: 1000m
taskmanager.numberOfTaskSlots: 4
bin目录
脚本名 |
---|
start-cluster.sh |
stop-cluster.sh |
yarn-session.sh |
解压
[root@Flink software]# tar -zxvf flink-1.13.1-bin-scala_2.12.tgz
[root@Flink software]# mv flink-1.13.1 flink
常用命令
# 启动flink
[root@Flink flink]# cd /usr/local/software/flink/bin/
[root@Flink bin]# ./start-cluster.sh
# 停止flink
bin/stop-cluster.sh
# 查看进程 jps
[root@Flink bin]# jps
TaskManagerRunner
StandaloneSessionClusterEntrypoint
local模式命令行运行jar包
创建文件
[root@Flink bin]# mkdir /usr/local/software/flink/examples/source
[root@Flink bin]# cat /usr/local/software/flink/examples/source/soulboy.txt
java soulboy
springboot springcloud
html flink
springboot redis
java flink
kafka flink
java springboot
bin目录运行官方案例
统计各单词出现的次数
[root@Flink bin]# pwd
/usr/local/software/flink/bin
[root@Flink bin]# ./flink run /usr/local/software/flink/examples/batch/WordCount.jar --input /usr/local/software/flink/examples/source/soulboy.txt --output /usr/local/software/flink/examples/source/soulboy_result.txt
Job has been submitted with JobID 8079e6a6818d0f8dc20174028ff04de7
Program execution finished
Job with JobID 8079e6a6818d0f8dc20174028ff04de7 has finished.
Job Runtime: 387 ms
查看输出结果
[root@Flink bin]# cat /usr/local/software/flink/examples/source/soulboy_result.txt
flink 3
html 1
java 3
kafka 1
redis 1
soulboy 1
springboot 3
springcloud 1
Flink Web Dashboard
http://192.168.10.62:8081/#/overview
Flink项目打包
Maven打包插件
插件名 | 功能描述 |
---|---|
maven-jar-plugin | |
默认的打包插件,用来打普通的jar包,需建立lib目录里来存放需要的依赖包 | |
maven-shade-plugin (推荐) | 将依赖的jar包打包到当前jar包 ,成为fat JAR包,也可以防止类冲突(隔离) |
maven-assembly-plugin | 大数据项目用的比较多 ,支持自定义的打包结构,比如sql/shell等 |
测试插件
插件名 | 功能描述 |
---|---|
maven-surefire-plugin | 用于mvn 生命周期的测试阶段的插件,通过参数设置在junit下控制测试 |
引入maven常用插件
调整为JDK8,java -version
<build>
<finalName>soulboy-flink</finalName>
<plugins>
<!--默认编译版本比较低,所以用compiler插件,指定项目源码的jdk版本,编译后的jdk版本和编码,-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>${file.encoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
本地Flink项目打包
调整为JDK8,java -version
mvn clean
mvn install
Flink的WebUI运行jar包
Centos7安装Netcat
# 下载 Netcat RPM 包
[root@Flink tmp]# wget http://vault.centos.org/7.9.2009/os/x86_64/Packages/nmap-ncat-6.40-19.el7.x86_64.rpm
# 安装依赖项 libpcap
[root@Flink tmp]# wget http://vault.centos.org/7.9.2009/os/x86_64/Packages/libpcap-1.5.3-12.el7.x86_64.rpm
[root@Flink tmp]# sudo rpm -ivh libpcap-1.5.3-12.el7.x86_64.rpm
# 安装 Netcat
[root@Flink tmp]# sudo rpm -ivh nmap-ncat-6.40-19.el7.x86_64.rpm
# 验证安装
[root@Flink tmp]# nc -h
Flink Web Dashboard
-
上传jar包
-
选择main入口类APP
-
提交任务查看情况
# Centos7监听 [root@Flink tmp]# nc -lk 8888
Task Solt
Task Solt指taskmanager的并发执行能力
parallelism是指taskmanager实际使用的并发能力
Task Slots 是具备的并发能力,大于 Parallelism并行度(实际用的)
数据流里面算子的最大并行度就是Parallelism, 2-2-2-3-1 这样的并行度,最大的Parallelism就是3(同个任务job里面)
# 假如每一个taskmanager中的分配4个TaskSlot,
# 那有3个taskmanager一共有12个TaskSlot
taskmanager.numberOfTaskSlots:4
Docker-Compose容器化部署Flink集群实战
安装并运行Docker
yum install docker-io -y
systemctl start docker
systemctl start docker #运行Docker守护进程
systemctl stop docker #停止Docker守护进程
systemctl restart docker #重启Docker守护进程
# 修改镜像仓库vim /etc/docker/daemon.json 改为下面内容,然后重启docker
{
"debug":true,"experimental":true,
"registry-mirrors":["https://pb5bklzr.mirror.aliyuncs.com","https://hub-mirror.c.163.com","https://docker.mirrors.ustc.edu.cn"]
}
#查看信息
docker info
Docker Compose容器编排安装
官方安装地址:https://docs.docker.com/compose/install/
curl -SL https://github.com/docker/compose/releases/download/v2.24.7/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
docker-compose version
创建docker-compose.yml 文件
配置Session Cluster模式
[root@Flink tmp]# cat docker-compose.yml
version: "3.7"
services:
jobmanager:
image: flink:scala_2.12-java8
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager:
image: flink:scala_2.12-java8
depends_on:
- jobmanager
command: taskmanager
scale: 3
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
部署Flink
[root@Flink tmp]# chmod +x /usr/local/bin/docker-compose
[root@Flink tmp]# docker-compose up -d
有3个TaskManager,每个TaskManager有2个TaskSlots,所以一共有6个Task Slots
端口说明
The Web Client is on port 8081
JobManager RPC port 6123
TaskManagers RPC port 6122
TaskManagers Data port 6121
注意:docker-compose.yml 文件中 expose 和 ports 的区别
expose暴露容器给link到当前容器的容器
ports是暴露容器端口到宿主机端口进行映射
问题
- 内存不足 (其他程序不运行,最少也需要1核2g,建议是4或者8g)
- 网络安全组没开放端口 8081
实时监控告警
需求:公司开发一个监控告警平台(类似nginx访问日志)
需求 |
---|
每5秒统计过去1分钟可以统计各个接口的访问量(滑动时间窗 + 聚合) |
每5秒统计过去1分钟各个接口的各个状态码次数 (多级分组) |
日志来源:nginx访问日志
日志特点 |
---|
访问日志日志来源存在乱序、无效访问记录(爬虫) |
要实时显示数据(Watermark),可以间隔时间二次更新(Allowlateness) |
最大允许1分钟延迟,超过后兜底保存(SideOutput) |
定义POJO类
AccessLogDO
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AccessLogDO {
private String title;
private String url;
private String method;
private Integer httpCode;
private String body;
private Date createTime;
private String userId;
private String city;
}
ResultCount
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ResultCount {
private String url;
private Integer code;
private Long count;
private String startTime;
private String endTime;
private String type;
}
自定义source
AccessLogSource
import net.xdclass.util.TimeUtil;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
/**
* 模拟source,未来可以从kafka中读取数据来源
*/
public class AccessLogSource extends RichParallelSourceFunction<AccessLogDO> {
private volatile Boolean flag = true;
private Random random = new Random();
//模拟不同接口调用
private static List<AccessLogDO> urlList = new ArrayList<>();
static {
urlList.add(new AccessLogDO("首页","/pub/api/v1/web/index_card","GET",200,"",new Date(),"",""));
urlList.add(new AccessLogDO("个人信息","/pub/api/v1/web/user_info","GET",200,"",new Date(),"",""));
urlList.add(new AccessLogDO("分类列表","/pub/api/v1/web/all_category","GET",200,"",new Date(),"",""));
urlList.add(new AccessLogDO("分页视频","/pub/api/v1/web/page_video","GET",200,"",new Date(),"",""));
urlList.add(new AccessLogDO("收藏","/user/api/v1/favorite/save","POST",200,"",new Date(),"",""));
urlList.add(new AccessLogDO("下单","/user/api/v1/product/order/save","POST",200,"",new Date(),"",""));
urlList.add(new AccessLogDO("异常url","","POST",200,"",new Date(),"",""));
}
//状态码
private static List<Integer> codeList = new ArrayList<>();
static {
codeList.add(200);
codeList.add(200);
codeList.add(200);
codeList.add(502);
codeList.add(403);
}
@Override
public void run(SourceContext<AccessLogDO> ctx) throws Exception {
while (flag){
Thread.sleep(1000);
int userId = random.nextInt(50);
int httpCodeNum = random.nextInt(codeList.size());
int accessLogNum = random.nextInt(urlList.size());
AccessLogDO accessLogDO = urlList.get(accessLogNum);
accessLogDO.setHttpCode(codeList.get(httpCodeNum));
accessLogDO.setUserId(userId+"");
//模拟迟到数据,100秒波动
//long timestamp = System.currentTimeMillis() - random.nextInt(100000);
//当前时间 - 0~5秒之间得随机时间
long timestamp = System.currentTimeMillis() - random.nextInt(5000);
accessLogDO.setCreateTime(new Date(timestamp));
System.out.println("产生:" + accessLogDO.getTitle() + ",状态码:" + accessLogDO.getHttpCode() + ", 时间:" + TimeUtil.formatTime(accessLogDO.getCreateTime()));
//收集accesslog
ctx.collect(accessLogDO);
}
}
@Override
public void cancel() {
flag = false;
}
}
过滤-分组-开窗-分配Watermark-聚合
ProcessWindowFunction 方法说明
一次性迭代整个窗口里的所有元素
,通过Context,可以获取到事件、窗口和状态信息可以和ReduceFunction, AggregateFunction 来做增量计算
- 可以在Agg方法做第2个参数 ,windowFunction 会把每个key的窗口聚合后的结果(
AggregateFunction<T, ACC, V> aggFunction
)带上 上下文信息进行输出,aggFunction 会传给 windowFunction
- 之前在agg方法里面的
ProcessWindowFunction
是获取整个窗口的全部元素,现在agg方法里面的
ProcessWindowFunction是**获取聚合后的结果(一个元素)**
aggregate( AggregateFunction<T, ACC, V> aggFunction,
ProcessWindowFunction<V, R, K, W> windowFunction )
SoulboyMonitorApp
import net.xdclass.util.TimeUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
public class SoulboyMonitorApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
DataStreamSource<AccessLogDO> ds = environment.addSource(new AccessLogSource());
//过滤url: ""为不合法
SingleOutputStreamOperator<AccessLogDO> filterDS = ds.filter(new FilterFunction<AccessLogDO>() {
@Override
public boolean filter(AccessLogDO value) throws Exception {
return StringUtils.isNotBlank(value.getUrl());
}
});
//指定watermark(插线),指定eventtime的列
SingleOutputStreamOperator<AccessLogDO> watermarkDS = filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.<AccessLogDO>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((event, timestamp) -> {
//指定eventtime的列
return event.getCreateTime().getTime();
}));
//最后兜底数据
OutputTag<AccessLogDO> lateDate = new OutputTag<AccessLogDO>("lateDataLog") {
};
//分组
KeyedStream<AccessLogDO, String> keyedStream = watermarkDS.keyBy(new KeySelector<AccessLogDO, String>() {
@Override
public String getKey(AccessLogDO value) throws Exception {
return value.getUrl();
}
});
//开窗(滑动):每5秒统计过去1分钟可以统计各个接口的访问量
WindowedStream<AccessLogDO, String, TimeWindow> windowedStream = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(5)))
//允许有1分钟延迟
.allowedLateness(Time.minutes(1))
//兜底侧输出流(不会更新到时间窗口,需要人工兜底处理)
.sideOutputLateData(lateDate);
//聚合 <输入类型,中间聚合类型(接口访问量),输出类型>
SingleOutputStreamOperator<ResultCount> aggregate = windowedStream.aggregate(
new AggregateFunction<AccessLogDO, Long, Long>() {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(AccessLogDO accessLogDO, Long accumulator) {
return accumulator+1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a+b;
}
},
//<aggFunction的集合结果类型,输出类型,key类型,window>
new ProcessWindowFunction<Long, ResultCount, String, TimeWindow>() {
@Override
//process(value是分组的key(url), context(上下文信息), iterable(AggregateFunction()聚合后的结果,只有一个元素), collector(收集输出))
public void process(String value, ProcessWindowFunction<Long, ResultCount, String, TimeWindow>.Context context, Iterable<Long> iterable, Collector<ResultCount> collector) throws Exception {
ResultCount resultCount = new ResultCount();
resultCount.setUrl(value);
resultCount.setStartTime(TimeUtil.format(context.window().getStart()));
resultCount.setEndTime(TimeUtil.format(context.window().getEnd()));
//上一步AggregateFunction()聚合结果只有一个,因此这里不需要迭代,直接next()即可获取数据(接口访问量)
long total = iterable.iterator().next();
resultCount.setCount(total);
//收集数据输出
collector.collect(resultCount);
}
});
aggregate.print("实时1分钟接口访问量");
environment.execute("SoulboyMonitor");
}
}
多接口多状态码统计
多个分组字段
SoulboyApiCodeMonitorApp
import net.xdclass.util.TimeUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
public class SoulboyApiCodeMonitorApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
DataStreamSource<AccessLogDO> ds = environment.addSource(new AccessLogSource());
//过滤url: ""为不合法
SingleOutputStreamOperator<AccessLogDO> filterDS = ds.filter(new FilterFunction<AccessLogDO>() {
@Override
public boolean filter(AccessLogDO value) throws Exception {
return StringUtils.isNotBlank(value.getUrl());
}
});
//指定watermark(插线),指定eventtime的列
SingleOutputStreamOperator<AccessLogDO> watermarkDS = filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.<AccessLogDO>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((event, timestamp) -> {
//指定eventtime的列
return event.getCreateTime().getTime();
}));
//最后兜底数据
OutputTag<AccessLogDO> lateDate = new OutputTag<AccessLogDO>("lateDataLog") {
};
//多字段分组
//KeySelector<输入类型,分组key类型(Tuple2<url, state_code>)>
KeyedStream<AccessLogDO, Tuple2<String, Integer>> keyedStream = watermarkDS.keyBy(new KeySelector<AccessLogDO, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> getKey(AccessLogDO value) throws Exception {
return Tuple2.of(value.getUrl(), value.getHttpCode());
}
});
//开窗
WindowedStream<AccessLogDO, Tuple2<String, Integer>, TimeWindow> windowedStream = keyedStream.window(SlidingEventTimeWindows.of(
Time.seconds(60), Time.seconds(5)))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateDate);
//聚合 <输入类型,中间类型,输出类型>
SingleOutputStreamOperator<ResultCount> aggregateDS = windowedStream.aggregate(
new AggregateFunction<AccessLogDO, Long, Long>() {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(AccessLogDO value, Long accumulator) {
return accumulator+1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a+b;
}
},
//<aggFunction的集合结果类型,输出类型,key类型,window>
new ProcessWindowFunction<Long, ResultCount, Tuple2<String, Integer>, TimeWindow>() {
@Override
public void process(Tuple2<String, Integer> value, ProcessWindowFunction<Long, ResultCount, Tuple2<String, Integer>, TimeWindow>.Context context, Iterable<Long> iterable, Collector<ResultCount> collector) throws Exception {
ResultCount resultCount = new ResultCount();
resultCount.setUrl(value.f0);
resultCount.setCode(value.f1);
//上一步AggregateFunction()聚合结果只有一个,因此这里不需要迭代,直接next()即可获取数据(接口访问量)
long total = iterable.iterator().next();
resultCount.setCount(total);
resultCount.setStartTime(TimeUtil.format(context.window().getStart()));
resultCount.setEndTime(TimeUtil.format(context.window().getEnd()));
collector.collect(resultCount);
}
});
aggregateDS.print("接口状态码");
aggregateDS.getSideOutput(lateDate).print("late data");
environment.execute("SoulboyMonitor");
}
}
控制台输出
产生:分页视频,状态码:200, 时间:2024-09-16 10:48:12
产生:首页,状态码:403, 时间:2024-09-16 10:48:15
产生:收藏,状态码:403, 时间:2024-09-16 10:48:14
产生:首页,状态码:502, 时间:2024-09-16 10:48:15
产生:分页视频,状态码:502, 时间:2024-09-16 10:48:18
接口状态码> ResultCount(url=/pub/api/v1/web/page_video, code=200, count=1, startTime=2024-09-16 10:47:15, endTime=2024-09-16 10:48:15, type=null)
接口状态码> ResultCount(url=/user/api/v1/favorite/save, code=403, count=1, startTime=2024-09-16 10:47:15, endTime=2024-09-16 10:48:15, type=null)
产生:首页,状态码:200, 时间:2024-09-16 10:48:19
产生:收藏,状态码:403, 时间:2024-09-16 10:48:19
产生:个人信息,状态码:403, 时间:2024-09-16 10:48:20
产生:收藏,状态码:200, 时间:2024-09-16 10:48:23
接口状态码> ResultCount(url=/pub/api/v1/web/page_video, code=200, count=1, startTime=2024-09-16 10:47:20, endTime=2024-09-16 10:48:20, type=null)
接口状态码> ResultCount(url=/user/api/v1/favorite/save, code=403, count=2, startTime=2024-09-16 10:47:20, endTime=2024-09-16 10:48:20, type=null)
接口状态码> ResultCount(url=/pub/api/v1/web/index_card, code=200, count=1, startTime=2024-09-16 10:47:20, endTime=2024-09-16 10:48:20, type=null)
接口状态码> ResultCount(url=/pub/api/v1/web/index_card, code=403, count=1, startTime=2024-09-16 10:47:20, endTime=2024-09-16 10:48:20, type=null)
接口状态码> ResultCount(url=/pub/api/v1/web/index_card, code=502, count=1, startTime=2024-09-16 10:47:20, endTime=2024-09-16 10:48:20, type=null)
接口状态码> ResultCount(url=/pub/api/v1/web/page_video, code=502, count=1, startTime=2024-09-16 10:47:20, endTime=2024-09-16 10:48:20, type=null)
产生:首页,状态码:200, 时间:2024-09-16 10:48:23
产生:收藏,状态码:502, 时间:2024-09-16 10:48:23
产生:个人信息,状态码:200, 时间:2024-09-16 10:48:26
产生:下单,状态码:502, 时间:2024-09-16 10:48:25
产生:分类列表,状态码:200, 时间:2024-09-16 10:48:25
产生:首页,状态码:403, 时间:2024-09-16 10:48:26
产生:异常url,状态码:200, 时间:2024-09-16 10:48:31
产生:下单,状态码:200, 时间:2024-09-16 10:48:31
接口状态码> ResultCount(url=/user/api/v1/favorite/save, code=403, count=2, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
接口状态码> ResultCount(url=/pub/api/v1/web/index_card, code=200, count=2, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
接口状态码> ResultCount(url=/pub/api/v1/web/user_info, code=403, count=1, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
接口状态码> ResultCount(url=/pub/api/v1/web/page_video, code=200, count=1, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
接口状态码> ResultCount(url=/pub/api/v1/web/index_card, code=502, count=1, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
接口状态码> ResultCount(url=/pub/api/v1/web/index_card, code=403, count=1, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
接口状态码> ResultCount(url=/user/api/v1/favorite/save, code=200, count=1, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
接口状态码> ResultCount(url=/pub/api/v1/web/page_video, code=502, count=1, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
接口状态码> ResultCount(url=/user/api/v1/favorite/save, code=502, count=1, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
产生:异常url,状态码:502, 时间:2024-09-16 10:48:33
产生:分类列表,状态码:200, 时间:2024-09-16 10:48:31
产生:下单,状态码:200, 时间:2024-09-16 10:48:34
CEP接口监控告警
需求:1分钟内超过3次则告警
CEP 乱序延迟问题
Flink CEP会将事件进行缓存,在相应的watermark到底之后,Flink CEP将缓存中的事件按照事件时间进行升序排序,然后再进行的模式匹配。
再迟到的数据可以用SideOutput(侧输出流)进行处理
当watermark到达时,处理该缓冲区中事件时间小于watermark时间的所有数据。
时间小于上次的watermark的时间就是迟到的数据,迟到的数据需要用侧输出流处理
SoulboyApiCodeCEPMonitorApp
import net.xdclass.util.TimeUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.List;
import java.util.Map;
public class SoulboyApiCodeCEPMonitorApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
DataStreamSource<AccessLogDO> ds = environment.addSource(new AccessLogSource());
//过滤url: ""为不合法
SingleOutputStreamOperator<AccessLogDO> filterDS = ds.filter(new FilterFunction<AccessLogDO>() {
@Override
public boolean filter(AccessLogDO value) throws Exception {
return StringUtils.isNotBlank(value.getUrl());
}
});
//指定watermark(插线),指定eventtime的列
SingleOutputStreamOperator<AccessLogDO> watermarkDS = filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.<AccessLogDO>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) -> {
//指定eventtime的列
return event.getCreateTime().getTime();
}));
//最后兜底数据
OutputTag<AccessLogDO> lateDate = new OutputTag<AccessLogDO>("lateDataLog") {
};
//多字段分组
//KeySelector<输入类型,分组key类型(Tuple2<url, state_code>)>
KeyedStream<AccessLogDO, Tuple2<String, Integer>> keyedStream = watermarkDS.keyBy(new KeySelector<AccessLogDO, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> getKey(AccessLogDO value) throws Exception {
return Tuple2.of(value.getUrl(), value.getHttpCode());
}
});
//定义pattern: 60秒内3次错误则报警
Pattern<AccessLogDO, AccessLogDO> pattern = Pattern.<AccessLogDO>begin("errorCode").where(new SimpleCondition<AccessLogDO>() {
@Override
public boolean filter(AccessLogDO value) throws Exception {
return value.getHttpCode() != 200;
}
}).timesOrMore(3).within(Time.seconds(60));
//pattern匹配数据流(模式流)
PatternStream<AccessLogDO> patternStream = CEP.pattern(keyedStream, pattern);
//从模式流中获取结果
SingleOutputStreamOperator<ResultCount> CEPResult = patternStream.select(new PatternSelectFunction<AccessLogDO, ResultCount>() {
@Override
public ResultCount select(Map<String, List<AccessLogDO>> map) throws Exception {
//获取匹配数据
List<AccessLogDO> listErrorCode = map.get("errorCode");
AccessLogDO accessLogDO = listErrorCode.get(0);
ResultCount resultCount = new ResultCount();
resultCount.setUrl(accessLogDO.getUrl());
resultCount.setCount(Long.valueOf(listErrorCode.size()));
resultCount.setCode(accessLogDO.getHttpCode());
return resultCount;
}
});
//60秒内3次错误则报警
CEPResult.print("接口告警");
environment.execute("SoulboyMonitor");
}
}
控制台输出
产生:首页,状态码:200, 时间:2024-09-16 11:18:05
产生:个人信息,状态码:403, 时间:2024-09-16 11:18:08
产生:个人信息,状态码:403, 时间:2024-09-16 11:18:11
产生:首页,状态码:200, 时间:2024-09-16 11:18:11
产生:首页,状态码:200, 时间:2024-09-16 11:18:12
产生:个人信息,状态码:403, 时间:2024-09-16 11:18:13
产生:个人信息,状态码:200, 时间:2024-09-16 11:18:11
产生:个人信息,状态码:403, 时间:2024-09-16 11:18:15
接口告警> ResultCount(url=/pub/api/v1/web/user_info, code=403, count=3, startTime=null, endTime=null, type=null)
产生:首页,状态码:200, 时间:2024-09-16 11:18:13
产生:首页,状态码:502, 时间:2024-09-16 11:18:16
接口告警> ResultCount(url=/pub/api/v1/web/user_info, code=403, count=4, startTime=null, endTime=null, type=null)
接口告警> ResultCount(url=/pub/api/v1/web/user_info, code=403, count=3, startTime=null, endTime=null, type=null)
产生:首页,状态码:403, 时间:2024-09-16 11:18:14
产生:首页,状态码:200, 时间:2024-09-16 11:18:16
产生:首页,状态码:200, 时间:2024-09-16 11:18:17
产生:首页,状态码:200, 时间:2024-09-16 11:18:21
产生:个人信息,状态码:403, 时间:2024-09-16 11:18:20
产生:个人信息,状态码:200, 时间:2024-09-16 11:18:21
产生:个人信息,状态码:502, 时间:2024-09-16 11:18:21
产生:个人信息,状态码:200, 时间:2024-09-16 11:18:23