目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

Flink

大数据

大数据(big data),或称巨量资料,指的是所涉及的资料量规模巨大到无法透过主流软件工具,在合理时间内达到撷取、管理、处理、并整理成为帮助企业经营决策更积极目的的资讯。

维克托·迈尔-舍恩伯格及肯尼斯·库克耶编写的《大数据时代》中大数据指不用随机分析法(抽样调查)这样捷径,而采用所有数据进行分析处理。大数据的 5V 特点(IBM 提出):Volume(大量)、Velocity(高速)、Variety(多样)、Value(低价值密度)、Veracity(真实性)。

image.png
image.png
image.png
image.png
image.png

人工智能

人工智能(Artificial Intelligence),英文缩写为 AI。是新一轮科技革命和产业变革的重要驱动力量,是研究、开发用于模拟、延伸和扩展人的智能的理论、方法、技术及应用系统的一门新的技术科学。

人工智能是智能学科重要的组成部分,它企图了解智能的实质,并生产出一种新的能以与人类智能相似的方式做出反应的智能机器。人工智能是十分广泛的科学,包括机器人、语言识别、图像识别自然语言处理专家系统、机器学习,计算机视觉等。

**人工智能大模型带来的治理挑战也不容忽视。**马斯克指出,在人工智能机器学习面具之下的本质仍然是统计。^ [35]^ 营造良好创新生态,需做好前瞻研究,建立健全保障人工智能健康发展的法律法规、制度体系、伦理道德。着眼未来,在重视防范风险的同时,也应同步建立容错、纠错机制,努力实现规范与发展的动态平衡。

批量计算(batch computing)

对一定规模量的数据进行处理,类似搬砖,10个10个的搬。

  • 场景:离线数据统计、报表分析等(过去 1 年 10000 亿条日志,分析日、周、月,接口响应延迟、状态码)
  • 特点:批量计算非实时、高延迟,计算完成后才可以得到结果
  • 框架:Hadoop 的 MapReduce
    image.png

流式计算(stream computing)

对源源不断的数据流进行处理,类似水龙头出水。

  • 场景:实时监控、实时风控等
  • 特点:流式计算实时、低延迟,实时取最新的结果
  • 框架:Spark(宏观上)、Flink
    image.png

区分( 离线计算和实时计算 、流式计算和批量计算)

  • 离线计算和实时计算 :是对数据处理的【延迟】不一样(一个实时和非实时)
  • 流式计算和批量计算: 是对数据处理的【方式】不一样(一个流式和一个批量)

大数据技术栈用途

阶段 技术栈 场景用途
初级阶段 Javase+Javaweb、Idea+maven+git、Linux+Mysql+Shell、Springboot+Redis、Kakfa+Docker
中级阶段 ELK 体系、Hadoop 生态:Hive(简化 MapReduce 编写代码)-Hbase(列式数据库)-MapReduce、Zookeeper、Flink+Flume(采集日志)、Scala+Spark(计算统计)
高级阶段 Zabbix 监控 +Azkaban 任务调度、Canal+ClickHouse(列式数据库)、Python、阿里云 ODPS+DataV、SpringCloud 微服务、大数据项目实战、企业数据中台 +DaaS+PaaS

image.png

Stream(JDK8)

lombok 依赖

1<dependency>
2            <groupId>org.projectlombok</groupId>
3            <artifactId>lombok</artifactId>
4            <version>1.18.16</version>
5            <scope>provided</scope>
6        </dependency>

订单类

 1import lombok.AllArgsConstructor;
 2import lombok.Data;
 3import lombok.NoArgsConstructor;
 4
 5@Data
 6@AllArgsConstructor
 7@NoArgsConstructor
 8public class VideoOrder {
 9    /**
10     * 订单号
11     */
12    private String tradeNo;
13
14    /**
15     * 订单标题
16     */
17    private String title;
18
19    /**
20     * 订单金额
21     */
22    private int money;
23}

JDK8 流式处理 stream 范例

需求:电商订单数据处理,根据下⾯的list1和list2 各10个订单

  • 统计两个⼈的分别购买订单的平均价格
  • 统计两个人的订单总价
 1import net.xdclass.model.VideoOrder;
 2import net.xdclass.model.VideoOrderOld;
 3
 4import java.util.Arrays;
 5import java.util.List;
 6import java.util.stream.Collectors;
 7
 8public class JdkStreamApp {
 9
10    public static void main(String [] args){
11        /**
12         * 数据源
13         */
14        //总价 35
15        List<VideoOrderOld> videoOrders1 = Arrays.asList(
16                new VideoOrderOld("20190242812", "springboot", 3),
17                new VideoOrderOld("20194350812", "微服务SpringCloud", 5),
18                new VideoOrderOld("20190814232", "Redis", 9),
19                new VideoOrderOld("20190523812", "⽹⻚开发", 9),
20                new VideoOrderOld("201932324", "百万并发实战Netty", 9));
21
22        //总价 54
23        List<VideoOrderOld> videoOrders2 = Arrays.asList(
24                new VideoOrderOld("2019024285312", "springboot", 3),
25                new VideoOrderOld("2019081453232", "Redis", 9),
26                new VideoOrderOld("20190522338312", "⽹⻚开发", 9),
27                new VideoOrderOld("2019435230812", "Jmeter压⼒测试", 5),
28                new VideoOrderOld("2019323542411", "Git+Jenkins持续集成", 7),
29                new VideoOrderOld("2019323542424", "Idea", 21));
30        
31        /**
32         * 平均价格(一定配置idea的 jdk8编译)
33         */
34        double videoOrder1Avg1 = videoOrders1.stream().collect(
35                Collectors.averagingInt(VideoOrderOld::getMoney)
36        ).doubleValue();
37
38        double videoOrder1Avg2 = videoOrders2.stream().collect(
39                Collectors.averagingInt(VideoOrderOld::getMoney)
40                ).doubleValue();
41        System.out.println("videoOrder1Avg1="+videoOrder1Avg1); //videoOrder1Avg1=7.0
42        System.out.println("videoOrder1Avg2="+videoOrder1Avg2); //videoOrder1Avg2=9.0
43
44        /**
45         * 订单总价
46         */
47        int total1 = videoOrders1.stream().collect(Collectors.summingInt(VideoOrderOld::getMoney)).intValue(); //7.0
48        int total2 = videoOrders2.stream().collect(Collectors.summingInt(VideoOrderOld::getMoney)).intValue(); //9.0
49        System.out.println("total1="+total1); // 35
50        System.out.println("total2="+total2); // 54
51    }
52}

Stream(JDK8) 对比 Flink

image.png

数据来源和输出有多样化怎么处理?

  • JDK stream:代码。
  • flink:自带很多组件。

海量数据需要进行实时处理

  • JDK stream:内部 jvm 单节点处理,单机内部并行处理。
  • flink:节点可以分布在不同机器的 JVM 上,多机器并行处理。

统计时间段内数据,但数据达到是无序的

  • JDK stream:写代码
  • flink:自带窗口函数和 watermark 处理迟到数据

Flink

为了实现一个天猫双十一实时交易大盘各个品类数据展示功能

  • JDK stream:一个功能耗时 1 个月完成,需求不敢轻易改动
  • flink:1 周搞定,需求可以灵活变动

image.png

image.png

Flink

   Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。
实时数仓建设、实时数据监控、实时反作弊风控、画像系统等。

数据流
    任何类型的数据都可以形成一种事件流,信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。

什么是有界流(批处理)
    有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。

什么是无界流(流处理)
    有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。

有界与无界

Apache Flink 擅长处理无界和有界数据集,有出色的性能

性能出色

代码使用例子

  • source、transformation、sink 都是 operator 算子

MAVEN 依赖

pom.xml

 1<?xml version="1.0" encoding="UTF-8"?>
 2<project xmlns="http://maven.apache.org/POM/4.0.0"
 3         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5    <modelVersion>4.0.0</modelVersion>
 6
 7    <groupId>net.xdclass</groupId>
 8    <artifactId>xdclass-flink</artifactId>
 9    <version>1.0-SNAPSHOT</version>
10
11    <properties>
12        <encoding>UTF-8</encoding>
13        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
14        <maven.compiler.source>1.8</maven.compiler.source>
15        <maven.compiler.target>1.8</maven.compiler.target>
16        <java.version>1.8</java.version>
17        <scala.version>2.12</scala.version>
18        <flink.version>1.13.1</flink.version>
19    </properties>
20
21
22    <dependencies>
23        <!--lombok依赖-->
24        <dependency>
25            <groupId>org.projectlombok</groupId>
26            <artifactId>lombok</artifactId>
27            <version>1.18.16</version>
28            <scope>provided</scope>
29        </dependency>
30
31        <!--flink客户端-->
32        <dependency>
33            <groupId>org.apache.flink</groupId>
34            <artifactId>flink-clients_${scala.version}</artifactId>
35            <version>${flink.version}</version>
36        </dependency>
37
38        <!--scala版本-->
39        <dependency>
40            <groupId>org.apache.flink</groupId>
41            <artifactId>flink-scala_${scala.version}</artifactId>
42            <version>${flink.version}</version>
43        </dependency>
44
45        <!--java版本-->
46        <dependency>
47            <groupId>org.apache.flink</groupId>
48            <artifactId>flink-java</artifactId>
49            <version>${flink.version}</version>
50        </dependency>
51
52        <!--streaming的scala版本-->
53        <dependency>
54            <groupId>org.apache.flink</groupId>
55            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
56            <version>${flink.version}</version>
57        </dependency>
58
59        <!--streaming的java版本-->
60        <dependency>
61            <groupId>org.apache.flink</groupId>
62            <artifactId>flink-streaming-java_${scala.version}</artifactId>
63            <version>${flink.version}</version>
64        </dependency>
65
66        <!--日志输出-->
67        <dependency>
68            <groupId>org.slf4j</groupId>
69            <artifactId>slf4j-log4j12</artifactId>
70            <version>1.7.7</version>
71            <scope>runtime</scope>
72        </dependency>
73
74        <!--log4j-->
75        <dependency>
76            <groupId>log4j</groupId>
77            <artifactId>log4j</artifactId>
78            <version>1.2.17</version>
79            <scope>runtime</scope>
80        </dependency>
81
82        <!--json依赖包-->
83        <dependency>
84            <groupId>com.alibaba</groupId>
85            <artifactId>fastjson</artifactId>
86            <version>1.2.44</version>
87        </dependency>
88
89        <!--Flink web ui-->
90        <dependency>
91            <groupId>org.apache.flink</groupId>
92            <artifactId>flink-runtime-web_${scala.version}</artifactId>
93            <version>${flink.version}</version>
94        </dependency>
95
96    </dependencies>
97</project>

日志配置 src/main/resources/log4j.properties

 1### 配置appender名称
 2log4j.rootLogger = debugFile, errorFile
 3
 4### debug日志级别以上的日志到:src/logs/debug.log
 5log4j.appender.debugFile = org.apache.log4j.DailyRollingFileAppender
 6log4j.appender.debugFile.File = src/logs/flink.log
 7log4j.appender.debugFile.Append = true
 8
 9#### Threshold
10log4j.appender.debugFile.Threshold = info
11log4j.appender.debugFile.layout = org.apache.log4j.PatternLayout
12log4j.appender.debugFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH🇲🇲ss}  [ %t:%r ] - [ %p ]  %n%m%n
13
14### error日志级别以上的日志到:src/logs/error.log
15log4j.appender.errorFile = org.apache.log4j.DailyRollingFileAppender
16log4j.appender.errorFile.File = src/logs/error.log
17log4j.appender.errorFile.Append = true
18log4j.appender.errorFile.Threshold = error
19log4j.appender.errorFile.layout = org.apache.log4j.PatternLayout
20log4j.appender.errorFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH🇲🇲ss}  [ %t:%r ] - [ %p ]  %n%m%n

Blink、Flink

  • 2019 年 Flink 的母公司被阿里全资收购

  • 阿里进行高度定制并取名为 Blink (加了很多特性 )

  • 阿里巴巴官方说明:Blink 不会单独作为一个开源项目运作,而是 Flink 的一部分

  • 都在不断演进中,对比其他流式计算框架(老到新)

    • Storm 只支持流处理
    • Spark Streaming (流式处理,其实是 micro-batch 微批处理,本质还是批处理)
    • Flink 支持流批一体
  • 算子 Operator

    • 将一个或多个 DataStream 转换为新的 DataStream,可以将多个转换组合成复杂的数据流拓扑
    • Source 和 Sink 是数据输入和数据输出的特殊算子,重点是 transformation 类的算子

编码与部署

1StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

在 IDEA 里面运行这样就行?(实际项目也是这样用?)

  • flink 可以本地 idea 执行模拟多线程执行,但不能读取配置文件,适合本地调试
  • 可以提交到远程搭建的 flink 集群
  • getExecutionEnvironment() 是 flink 封装好的方式可以自动判断运行模式,更方便开发,
  • 如果程序是独立调用的,此方法返回本地执行环境;
  • 如果从命令行客户端调用程序以提交到集群,则返回此集群的执行环境,是最常用的一种创建执行环境的方式

最终线上部署会把 main 函数打成 jar 包,提交到 Flink 进群进行运行, 会有 UI 可视化界面

部署方式 描述
Local 本地部署,直接启动进程,适合调试使用
Standalone Cluster 集群部署,flink 自带集群模式
On Yarn 计算资源统一由 Hadoop YARN 管理资源进行调度,按需使用提高集群的资源利用率,生产环境

Tuple(元组类型)

元组类型, 多个语言都有的特性, flink的java版 tuple最多支持25个。
函数返回(return)多个值,多个不同类型的对象,列表只能存储相同的数据类型,而元组Tuple可以存储不同的数据类型。

1/**
2     * tuple元组使用
3     */
4    private static void tupleTest() {
5        Tuple3<Integer, String, Double> tuple3 = Tuple3.of(1, "soulboy", 3.1);
6        System.out.println(tuple3.f0);  // 1
7        System.out.println(tuple3.f1);  // soulboy
8        System.out.println(tuple3.f2);  // 3.1
9    }

Java 里面的 Map 操作
一对一 转换对象,比如DO转DTO

 1/**
 2     * Map  一对一 转换对象
 3     */
 4    private static void mapTest() {
 5        List<String> list1 = new ArrayList<>();
 6        list1.add("springboot,springcloud");
 7        list1.add("redis6,docker");
 8        list1.add("kafka,rabbitmq");
 9
10        // 一对一转换
11        List<String> list2 = list1.stream().map(obj -> {
12            obj = "soulboy-" + obj;
13            return obj;
14        }).collect(Collectors.toList());
15        System.out.println(list2); // [soulboy-springboot,springcloud, soulboy-redis6,docker, soulboy-kafka,rabbitmq]
16    }

什么是 Java 里面的 FlatMap 操作
一对多转换对象

 1/**
 2     * FlatMap  一对多 转换对象
 3     */
 4    private static void mapTest() {
 5        List<String> list1 = new ArrayList<>();
 6        list1.add("springboot,springcloud");
 7        list1.add("redis6,docker");
 8        list1.add("kafka,rabbitmq");
 9
10        //一对多转换
11        List<String> list3 = list1.stream().flatMap(
12                obj -> {
13                    Stream<String> stream = Arrays.stream(obj.split(","));
14                    return stream;
15                }
16        ).collect(Collectors.toList());
17        System.out.println(list3); // [springboot, springcloud, redis6, docker, kafka, rabbitmq]
18    }

Flink 流批示例

流处理

需求:根据字符串的逗号进行分割

 1import org.apache.flink.api.common.functions.FlatMapFunction;
 2import org.apache.flink.streaming.api.datastream.DataStream;
 3import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 4import org.apache.flink.util.Collector;
 5
 6public class Flink01App {
 7
 8    /**
 9     * source
10     * transformation
11     * sink
12     * @param args
13     */
14    public static void main(String[] args) throws Exception {
15        // 1. 获取流的执行环境
16        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
17
18        //设置并行数量
19        env.setParallelism(1);
20        
21        // 2. 定义数据源 (相同类型元素的数据集)
22        DataStream<String> stringDS = env.fromElements("java,springboot","java,springcloud");
23
24        stringDS.print("处理前");
25
26        // 3. 定义数据转换操作(FlatMapFunction<String, String>, key是输入类型,value是Collector响应的收集的类型,看源码注释,也是 DataStream<String>里面泛型类型)
27        DataStream<String> flatMapDS = stringDS.flatMap(new FlatMapFunction<String, String>() {
28            @Override
29            public void flatMap(String value, Collector<String> collector) throws Exception {
30                String [] arr =  value.split(",");
31                for(String str : arr){
32                    collector.collect(str);
33                }
34            }
35        });
36
37        flatMapDS.print("处理后");
38
39        // 4. 执行任务,可以取个名字
40        env.execute("flat map job");
41    }
42}

控制台输出

1# 读一个,处理一个
2处理前> java,springboot
3处理后> java
4处理后> springboot
5处理前> java,springcloud
6处理后> java
7处理后> springcloud

批处理

需求:根据字符串的逗号进行分割
Flink1.12时支持流批一体,DataSetAPI已经不推荐使用了,都会优先使用DataStream流式API。

 1package net.xdclass.app;
 2
 3import org.apache.flink.api.common.functions.FlatMapFunction;
 4import org.apache.flink.api.java.DataSet;
 5import org.apache.flink.api.java.ExecutionEnvironment;
 6import org.apache.flink.streaming.api.datastream.DataStream;
 7import org.apache.flink.util.Collector;
 8
 9public class Flink02App {
10
11    /**
12     * source
13     * transformation
14     * sink
15     * @param args
16     */
17    public static void main(String[] args) throws Exception {
18        // 1. 获取流的执行环境
19        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
20
21        //设置并行数量
22        //env.setParallelism(1);
23
24        // 2. 定义数据源 (相同类型元素的数据集)
25        DataSet<String> stringDS = env.fromElements("java,springboot","java,springcloud");
26
27        stringDS.print("处理前");
28
29        // 3. 定义数据转换操作(FlatMapFunction<String, String>, key是输入类型,value是Collector响应的收集的类型,看源码注释,也是 DataStream<String>里面泛型类型)
30        DataSet<String> flatMapDS = stringDS.flatMap(new FlatMapFunction<String, String>() {
31            @Override
32            public void flatMap(String value, Collector<String> collector) throws Exception {
33                String [] arr =  value.split(",");
34                for(String str : arr){
35                    collector.collect(str);
36                }
37            }
38        });
39
40        flatMapDS.print("处理后");
41
42        // 4. 执行任务
43        env.execute("flat map job");
44    }
45}

控制台输出

1# 一次性全部读入
2处理前> java,springboot
3处理前> java,springcloud
4处理后> java
5处理后> springboot
6处理后> java
7处理后> springcloud

Flink 可视化控制台

WebUI 可视化界面

  • 访问:ip:8081
  • 方式一:服务端部署 Flink 集群(生产环境)
  • 方式二:本地依赖添加(测试开发)

image.png

依赖坐标

1<!--Flink web ui-->
2        <dependency>
3            <groupId>org.apache.flink</groupId>
4            <artifactId>flink-runtime-web_${scala.version}</artifactId>
5            <version>${flink.version}</version>
6        </dependency>

编码

 1package net.xdclass.app;
 2
 3import org.apache.flink.api.common.functions.FlatMapFunction;
 4import org.apache.flink.configuration.Configuration;
 5import org.apache.flink.streaming.api.datastream.DataStream;
 6import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 7import org.apache.flink.util.Collector;
 8
 9public class WebUIApp {
10    public static void main(String[] args) throws Exception {
11        //1.拿到执行环境
12        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
13        //env.setParallelism(1);
14
15        //2.从端口读取数据
16        DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888);
17
18        //3.对数据进行处理
19        DataStream<String> flatMapDataStream = stringDataStream.flatMap(new FlatMapFunction<String, String>() {
20            @Override
21            public void flatMap(String value, Collector<String> out) throws Exception {
22
23                String[] arr = value.split(",");
24                for (String word : arr) {
25                    out.collect(word);
26                }
27            }
28        });
29
30        //4.输出结果
31        flatMapDataStream.print("结果");
32
33        //DataStream需要调用execute,可以取个名称
34        env.execute("data stream job");
35
36    }
37}

netcat
nc命令安装,https://blog.csdn.net/lck_csdn/article/details/125320540

webUI
地址:http://192.168.10.88:8081/#/overview

部署模式、运行流程

Flink 部署方式是灵活,主要是对 Flink 计算时所需资源的管理方式不同

部署方式 描述
Local 本地部署,直接启动进程,适合调试使用
Standalone Cluster 集群部署,flink 自带集群模式
On Yarn 计算资源统一由 Hadoop YARN 管理资源进行调度,按需使用提高集群的资源利用率,生产环境

运行流程

  • 用户提交 Flink 程序到 JobClient
  • JobClient 的 解析、优化提交到 JobManager
  • TaskManager 运行 task, 并上报信息给 JobManager
  • 通俗解释
    • JobManager 包工头
    • TaskManager 任务组长
    • Task solt 工人 (并行去做事情)

Flink 整体架构和组件角色

snapshot用户保存快照,在 TaskManager 出现故障中断时候保存已经完成的状态,待 TaskManager 正常时恢复任务现场。

Flink 是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。运行时由两种类型的进程组成:

  • 一个 JobManager
  • 一个或者多个 TaskManager

JobManager

协调 Flink 应用程序的分布式执行的功能。

  • 它决定何时调度下一个 task(或一组 task)
  • 对完成的 task 或执行失败做出反应
  • 协调 checkpoint、并且协调从失败中恢复等等

JobManager进程由三个不同的组件组成。

  • ResourceManager
    负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots
  • Dispatcher
    提供了一个 REST 接口,用来提交 Flink 应用程序执行
    为每个提交的作业启动一个新的 JobMaster。
    运行 Flink WebUI 用来提供作业执行信息
  • JobMaster
    负责管理单个JobGraph(多个算子构成)的执行,Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster
    至少有一个 JobManager,高可用(HA)设置中可能有多个 JobManager,其中一个始终是 leader,其他的则是 standby

TaskManager

任务组长,搬砖的人。

  • 负责计算的 worker,还有上报内存、任务运行情况给 JobManager 等
  • 至少有一个 TaskManager,也称为 worker 执行作业流的 task,并且缓存和交换数据流
  • 在 TaskManager 中资源调度的最小单位是 task slot,task slot独占内存空间
  • TaskManager 中 task slot 的数量表示并发处理 task 的数量
  • 一个 task slot 中可以执行多个算子,里面多个线程
    • 算子 opetator 的执行流程
      • source
      • transformation
      • sink
  • 对于分布式执行,Flink 将算子的 subtasks 链接tasks ,每个 task 由一个线程执行
    • 图中 source 和 map 算子组成一个算子链,作为一个 task 运行在一个线程上
  • 将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量

文档

Task Slots

任务槽。

  • Task Slot 是 Flink 中的任务执行器,每个 Task Slot 可以运行多个 subtask ,每个 subtask 会以单独的线程来运行
  • 每个 worker(TaskManager)是一个 JVM 进程,可以在单独的线程中执行一个(1 个 solt)或多个 subtask
  • 为了控制一个 TaskManager 中接受多少个 task,就有了所谓的 task slots(至少一个)
  • 每个 task slot 代表 TaskManager 中资源的固定子集

注意

  • 所有Task Slot平均分配TaskManger的内存, TaskSolt 没有 CPU 隔离
  • 当前 TaskSolt 独占内存空间,作业间互不影响
  • 一个TaskManager进程里有多少个taskSolt就意味着多少个task并发
  • task solt数量建议是cpu的核数,独占内存,共享CPU

  • 5 个 subtask 执行,因此有 5 个并行线程
    • Task 正好封装了一个 Operator 或者 Operator Chain 的 parallel instance。
    • Sub-Task 强调的是同一个 Operator 或者 Operator Chain 具有多个并行的 Task
    • 图中source和map算子组成一个算子链,作为一个task运行在一个线程上
      • 算子链接成 一个 task 它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量

  • Task Slot 是 Flink 中的任务执行器,每个 Task Slot 可以运行多个 subtask ,每个 sub task 会以单独的线程来运行

  • Flink 算子之间可以通过【一对一】模式或【重新分发】模式传输数据

文档

并行度

Flink 是分布式流式计算框架,程序在多节点并行执行,所以就有并行度 Parallelism。DataStream 就像是有向无环图(DAG),每一个 数据流(DataStream) 以一个或多个 source 开始,以一个或多个 sink 结束。

  • 一个数据流( stream) 包含一个或多个分区,在不同的线程/物理机里并行执行
  • 每一个算子( operator) 包含一个或多个子任务( subtask),子任务在不同的线程/物理机里并行执行
  • 一个算子的子任务 subtask 的个数就是并行度( parallelism)

Flink流程序中不同的算子可能具有不同的并行度,可以在多个地方配置,有不同的优先级。Flink并行度配置级别 (高到低):

优先级(高到低) 说明
算子 map( xxx ).setParallelism(2)
全局 env env.setParallelism(2)
客户端 cli ./bin/flink run -p 2 xxx.jar
Flink 配置文件 `/conf/flink-conf.yaml 的

某些算子无法设置并行度,本地IDEA运行 并行度默认为cpu核数。

TaskSolt 和 parallelism 的区别

概念 说明
task slot 是静态的概念,是指 taskmanager 具有的并发执行能力
parallelism 是动态的概念,是指 程序运行时实际使用的并发能力
ask slot 是具有的能力比如可以 100 个,parallelism 是实际使用的并发,比如只要 20 个并发就行

Flink 有 3 中运行模式

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

模式 说明
STREAMING 流处理
BATCH 批处理
AUTOMATIC 根据 source 类型自动选择运行模式,基本就是使用这个

Operator:Source

Flink的API层级 为流式/批式处理应用程序的开发提供了不同级别的抽象

层级 描述 备注
第一层 最底层的抽象为有状态实时流处理 抽象实现是 Process Function,用于底层处理
第二层 Core APIs,许多应用程序不需要使用到上述最底层抽象的 API,而是使用 Core APIs 进行开发 例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等,此层 API 中处理的数据类型在每种编程语言中都有其对应的类。
第三层 抽象是 Table API, 是以表 Table 为中心的声明式编程 API,Table API 使用起来很简洁但是表达能力差 类似数据库中关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用
第四层 最顶层抽象是 SQL 第四层最顶层抽象是 SQL,这层程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式 SQL 抽象与 Table API 抽象之间的关联是非常紧密的

注意:Table和SQL层变动多,还在持续发展中,大致知道即可,核心是第一和第二层

Flink 编程模型

Source 来源

来源类型 API
元素集合 env.fromElements、env.fromColletion、env.fromSequence(start,end)
文件/文件系统 env.readTextFile(本地文件)、env.readTextFile(HDFS 文件)
基于 Socket env.socketTextStream("ip", 8888)
自定义 Source,实现接口自定义数据源,rich 相关的 API 更丰富 并行度为 1SourceFunction、RichSourceFunction并行度大于 1ParallelSourceFunction、RichParallelSourceFunction
Connectors 与第三方系统进行对接(用于 source 或者 sink 都可以) Flink 本身提供 Connector 例如 kafka、RabbitMQ、ES 等; 注意:Flink程序打包一定要将相应的connetor相关类打包进去,不然就会失败
Apache Bahir 连接器 里面也有 kafka、RabbitMQ、ES 的连接器更多

Flink 和外部系统进行读取/写入方式

  • 第一种:Flink里面预定义的 source 和 sink。
  • 第二种:Flink 内部也提供部分 Boundled connectors。
  • 第三种:是第三方 Apache Bahir 项目中的连接器。
  • 第四种:是通过异步 IO 方式,异步I/O是Flink提供的非常底层的与外部系统交互

预定义 Source

元素集合

  • env.fromElements
  • env.fromColletion
  • env.fromSequence(start,end);
 1package net.xdclass.app;
 2
 3import org.apache.flink.api.common.RuntimeExecutionMode;
 4import org.apache.flink.streaming.api.datastream.DataStream;
 5import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 6import java.util.Arrays;
 7
 8public class Flink03Source1App {
 9
10    /**
11     * source
12     * transformation
13     * sink
14     * @param args
15     */
16    public static void main(String[] args) throws Exception {
17        // 1. 获取流的执行环境
18        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
19        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
20
21        //设置并行数量
22        //env.setParallelism(1);
23
24        // 2. 定义数据源 (相同类型元素的数据集)
25        //env.fromElements
26        DataStream<String> ds1 = env.fromElements("java,springboot","java,springcloud");
27        ds1.print("ds1:");
28
29        //env.fromColletion
30        DataStream<String> ds2 = env.fromCollection(Arrays.asList("java,springboot","java,springcloud"));
31        ds2.print("ds2:");
32
33        //env.fromSequence(start,end);
34        DataStream<Long> ds3 = env.fromSequence(1,5);
35        ds3.print("ds3:");
36
37        // 4. 执行任务
38        env.execute("flat map job");
39    }
40}

文件/文件系统

 1package net.xdclass.app;
 2
 3import org.apache.flink.api.common.RuntimeExecutionMode;
 4import org.apache.flink.streaming.api.datastream.DataStream;
 5import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 6
 7import java.util.Arrays;
 8
 9public class Flink03Source2App {
10
11    /**
12     * source
13     * transformation
14     * sink
15     * @param args
16     */
17    public static void main(String[] args) throws Exception {
18        // 1. 获取流的执行环境
19        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
20        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
21
22        //设置并行数量
23        env.setParallelism(1);
24
25        // 2. 定义数据源 (相同类型元素的数据集)
26        
27        //本地文件
28        DataStream<String> ds = env.readTextFile("C:\\Users\\chao1\\Desktop\\text.txt");
29        ds.print("ds:");
30       
31        //HDFS
32        //DataStream<String> ds2 = env.readTextFile("hdfs://xdclass_node:8010/file/log/words.txt");
33        //ds2.print("ds2:");
34        
35        // 4. 执行任务
36        env.execute("flat map job");
37    }
38}

基于 Socket

 1package net.xdclass.app;
 2
 3import org.apache.flink.api.common.RuntimeExecutionMode;
 4import org.apache.flink.streaming.api.datastream.DataStream;
 5import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 6
 7import java.util.Arrays;
 8
 9public class Flink03Source2App {
10
11    /**
12     * source
13     * transformation
14     * sink
15     * @param args
16     */
17    public static void main(String[] args) throws Exception {
18        // 1. 获取流的执行环境
19        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
20        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
21
22        //设置并行数量
23        //env.setParallelism(1);
24
25        // 2. 定义数据源 (相同类型元素的数据集)
26        // 从socket读取数据
27        DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888);
28        stringDataStream.print();
29
30        // 3. 执行任务
31        env.execute("flat map job");
32    }
33}

自定义 Source

Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等。

并行度 需实现接口
1 SourceFunction、RichSourceFunction
大于 1 ParallelSourceFunction、RichParallelSourceFunction

Model

 1package net.xdclass.model;
 2
 3import lombok.AllArgsConstructor;
 4import lombok.Data;
 5import lombok.NoArgsConstructor;
 6
 7import java.util.Date;
 8
 9@Data
10@AllArgsConstructor
11@NoArgsConstructor
12public class VideoOrder {
13    /**
14     * 订单号
15     */
16    private String tradeNo;
17
18    /**
19     * 订单标题
20     */
21    private String title;
22
23    /**
24     * 订单金额
25     */
26    private int money;
27
28    /**
29     * 用户id
30     */  
31    private int userId;
32
33    /**
34     * 注册时间
35     */
36    private Date createTime;
37}

自定义 Source

 1package net.xdclass.source;
 2
 3import net.xdclass.model.VideoOrder;
 4import org.apache.flink.configuration.Configuration;
 5import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 6
 7import java.util.*;
 8
 9public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {
10
11    private volatile Boolean flag = true;
12    private  Random random = new Random();
13    private static List<String> list = new ArrayList<>();
14
15    //订单初始化(title信息)
16    static {
17        list.add("spring boot2.x");
18        list.add("微服务SpringCloud");
19        list.add("RabbitMQ消息队列");
20        list.add("Kafka");
21        list.add("第一季");
22        list.add("Flink流式技术");
23        list.add("工业级微服务项目");
24        list.add("Linux");
25    }
26
27    /**
28     * 自定义 Source(产生数据的逻辑)
29     * 源源不断产生订单
30     * @param sourceContext
31     * @throws Exception
32     */
33    @Override
34    public void run(SourceContext<VideoOrder> sourceContext) throws Exception {
35        while (flag) {
36            Thread.sleep(1000);
37            String id = UUID.randomUUID().toString();
38            int userId = random.nextInt(10);
39            int money = random.nextInt(100);
40            int videoNum = random.nextInt(list.size());
41            String title = list.get(videoNum);
42            sourceContext.collect(new VideoOrder(id, title, money, userId, new Date()));
43        }
44    }
45
46    /**
47     * 控制任务取消
48     */
49    @Override
50    public void cancel() {
51
52    }
53
54    /**
55     * run 方法调用前:用于初始化连接
56     * @param parameters
57     * @throws Exception
58     */
59    @Override
60    public void open(Configuration parameters) throws Exception {
61        System.out.println("-----open-----");
62    }
63
64    /**
65     * 用于清理之前
66     * @throws Exception
67     */
68    @Override
69    public void close() throws Exception {
70        System.out.println("-----close-----");
71    }
72}

运行

 1package net.xdclass.app;
 2
 3import net.xdclass.model.VideoOrder;
 4import net.xdclass.source.VideoOrderSource;
 5import org.apache.flink.api.common.RuntimeExecutionMode;
 6import org.apache.flink.streaming.api.datastream.DataStream;
 7import org.apache.flink.streaming.api.datastream.DataStreamSource;
 8import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 9
10public class Flink04CustomSourceApp {
11
12    /**
13     * source
14     * transformation
15     * sink
16     * @param args
17     */
18    public static void main(String[] args) throws Exception {
19        // 1. 获取流的执行环境
20        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
21        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
22        env.setParallelism(1);
23
24        // 2. 定义数据源 (相同类型元素的数据集)
25        DataStreamSource<VideoOrder> videoOrderDS = env.addSource(new VideoOrderSource());
26        videoOrderDS.print();
27
28        // 4. 执行任务
29        env.execute("flat map job");
30    }
31}

控制台输出

1-----open-----
2VideoOrder(tradeNo=16b7bbbc-b082-431a-9336-96909b29e315, title=工业级微服务项目, money=13, userId=6, createTime=Tue Jan 09 12:38:52 CST 2024)
3VideoOrder(tradeNo=0e0745ce-728b-4577-90a1-7ba93a2eb4d4, title=spring boot2.x, money=95, userId=8, createTime=Tue Jan 09 12:38:53 CST 2024)
4VideoOrder(tradeNo=d1ead911-b66e-4eb5-994d-2eff4db1fa30, title=RabbitMQ消息队列, money=9, userId=3, createTime=Tue Jan 09 12:38:54 CST 2024)
5VideoOrder(tradeNo=32aec72d-f96b-4209-bb3b-b3e0177d7956, title=spring boot2.x, money=62, userId=1, createTime=Tue Jan 09 12:38:55 CST 2024)
6VideoOrder(tradeNo=cac431ae-18cf-4237-a0de-75c84d9c0164, title=RabbitMQ消息队列, money=78, userId=6, createTime=Tue Jan 09 12:38:56 CST 2024)

对比并行度
PC处理器12核

 1package net.xdclass.app;
 2
 3import net.xdclass.model.VideoOrder;
 4import net.xdclass.source.VideoOrderSource;
 5import org.apache.flink.api.common.RuntimeExecutionMode;
 6import org.apache.flink.api.common.functions.FilterFunction;
 7import org.apache.flink.configuration.Configuration;
 8import org.apache.flink.streaming.api.datastream.DataStream;
 9import org.apache.flink.streaming.api.datastream.DataStreamSource;
10import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
11
12public class Flink04CustomSourceApp {
13
14    /**
15     * source
16     * transformation
17     * sink
18     * @param args
19     */
20    public static void main(String[] args) throws Exception {
21        // 1. 获取流的执行环境
22        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
23        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
24        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
25        env.setParallelism(2);
26
27        // 2. 定义数据源 (相同类型元素的数据集)
28        DataStream<VideoOrder> videoOrderDS = env.addSource(new VideoOrderSource());
29
30        // 3. 过滤打印
31        DataStream<VideoOrder> filterDS = videoOrderDS.filter(new FilterFunction<VideoOrder>() {
32            @Override
33            public boolean filter(VideoOrder videoOrder) throws Exception {
34                return videoOrder.getMoney() > 5;
35            }
36        }).setParallelism(3);
37
38        filterDS.print().setParallelism(4);
39
40        // 4. 执行任务
41        env.execute("flat map job");
42    }
43}

http://192.168.10.88:8081/#/overview

image.png

Operator:Sink

1725349165351.jpg

Sink 输出源

类型 描述
预定义 print、writeAsText
自定义 SinkFunction、RichSinkFunction:Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
flink 官方提供 Bundle Connector kafka、ES 等
Apache Bahir kafka、ES、Redis 等

预定义:print

 1import net.xdclass.model.VideoOrder;
 2import net.xdclass.source.VideoOrderSource;
 3import org.apache.flink.api.common.RuntimeExecutionMode;
 4import org.apache.flink.api.common.functions.FilterFunction;
 5import org.apache.flink.configuration.Configuration;
 6import org.apache.flink.streaming.api.datastream.DataStream;
 7import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 8
 9public class Flink05Sink1App {
10
11    /**
12     * source
13     * transformation
14     * sink
15     * @param args
16     */
17    public static void main(String[] args) throws Exception {
18        // 1. 获取流的执行环境
19        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
20        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
21        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
22        env.setParallelism(2);
23
24        // 2. 定义数据源 (相同类型元素的数据集)
25        DataStream<VideoOrder> videoOrderDS = env.addSource(new VideoOrderSource());
26
27        // 3. 过滤打印
28        DataStream<VideoOrder> filterDS = videoOrderDS.filter(new FilterFunction<VideoOrder>() {
29            @Override
30            public boolean filter(VideoOrder videoOrder) throws Exception {
31                return videoOrder.getMoney() > 5;
32            }
33        });
34
35        //Sink之print
36        filterDS.print();
37
38        //红色打印
39        //filterDS.printToErr();
40
41        // 4. 执行任务
42        env.execute("flat map job");
43    }
44}

自定义 Sink:连接 MySQL 存储商品订单

Sink 输出源

类型 描述
预定义 print、writeAsText
自定义 SinkFunction:如果选择继承SinkFunction,会在每次写入一条数据时都会创建一个JDBC连接、RichSinkFunction:Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
flink 官方提供 Bundle Connector kafka、ES 等
Apache Bahir kafka、ES、Redis 等

Flink 连接 MySQL 的 2 种方式(都需要加 JDBC 驱动)

  • 方式一自带flink-connector-jdbc 需要加依赖包
    1<dependency>
    2    <groupId>org.apache.flink</groupId>
    3    <artifactId>flink-connector-jdbc_2.12</artifactId>
    4    <version>1.12.0</version>
    5</dependency>
    
  • 方式二自定义sink
    1<dependency>
    2            <groupId>mysql</groupId>
    3            <artifactId>mysql-connector-java</artifactId>
    4            <version>8.0.25</version>
    5</dependency>
    

需求保存视频订单到Mysql

  • 建表
1CREATE TABLE `video_order` (
2  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
3  `user_id` int(11) DEFAULT NULL,
4  `money` int(11) DEFAULT NULL,
5  `title` varchar(32) DEFAULT NULL,
6  `trade_no` varchar(64) DEFAULT NULL,
7  `create_time` date DEFAULT NULL,
8  PRIMARY KEY (`id`)
9) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  • 添加 JDBC 依赖
1<dependency>
2            <groupId>mysql</groupId>
3            <artifactId>mysql-connector-java</artifactId>
4            <version>8.0.25</version>
5</dependency>

MysqlSink

 1import net.xdclass.model.VideoOrder;
 2import org.apache.flink.configuration.Configuration;
 3import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 4
 5import java.sql.Connection;
 6import java.sql.Date;
 7import java.sql.DriverManager;
 8import java.sql.PreparedStatement;
 9
10public class MysqlSink extends RichSinkFunction<VideoOrder> {
11
12    private Connection conn = null;
13    private PreparedStatement ps = null;
14
15    /**
16     * 建立Mysql连接
17     * @param parameters
18     * @throws Exception
19     */
20    @Override
21    public void open(Configuration parameters) throws Exception {
22        conn = DriverManager.getConnection("jdbc:mysql://192.168.10.62:3306/shop?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai", "root", "123456");
23        //sql预编译
24        String sql = "INSERT INTO `video_order` (`user_id`, `money`, `title`, `trade_no`, `create_time`) VALUES(?,?,?,?,?);";
25        ps = conn.prepareStatement(sql);
26    }
27
28    /**
29     * 关闭Mysql连接
30     * @throws Exception
31     */
32    @Override
33    public void close() throws Exception {
34        if (conn != null) {
35            conn.close();
36        }
37        if (ps != null) {
38            ps.close();
39        }
40    }
41
42    /**
43     * 输出逻辑
44     * @param videoOrder
45     * @param context
46     * @throws Exception
47     */
48    @Override
49    public void invoke(VideoOrder videoOrder, Context context) throws Exception {
50        //给ps中的?设置具体值
51        ps.setInt(1,videoOrder.getUserId());
52        ps.setInt(2,videoOrder.getMoney());
53        ps.setString(3,videoOrder.getTitle());
54        ps.setString(4,videoOrder.getTradeNo());
55        ps.setDate(5,new Date(videoOrder.getCreateTime().getTime()));
56
57        ps.executeUpdate();
58    }
59}

Flink06CustomSinkApp

 1import net.xdclass.model.VideoOrder;
 2import net.xdclass.source.VideoOrderSource;
 3import org.apache.flink.api.common.RuntimeExecutionMode;
 4import org.apache.flink.api.common.functions.FilterFunction;
 5import org.apache.flink.configuration.Configuration;
 6import org.apache.flink.streaming.api.datastream.DataStream;
 7import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 8
 9public class Flink06CustomSinkApp {
10
11    /**
12     * source
13     * transformation
14     * sink
15     * @param args
16     */
17    public static void main(String[] args) throws Exception {
18        // 1. 获取流的执行环境
19        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
20        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
21        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
22        //Total Task Slots1
23        env.setParallelism(1);
24
25        // 2. 定义数据源 (相同类型元素的数据集)
26        DataStream<VideoOrder> videoOrderDS = env.addSource(new VideoOrderSource());
27
28        // 3. 过滤打印
29        DataStream<VideoOrder> filterDS = videoOrderDS.filter(new FilterFunction<VideoOrder>() {
30            @Override
31            public boolean filter(VideoOrder videoOrder) throws Exception {
32                return videoOrder.getMoney() > 5;
33            }
34        });
35
36        //预定义Sink:print输出控制台
37        filterDS.print();
38
39        //自定义Sink:连接Mysql存储商品订单
40        filterDS.addSink(new MysqlSink());
41
42        // 4. 执行任务
43        env.execute("custom mysql flat map job");
44    }
45}

自定义 Sink:Bahir Connetor 存储数据到 Redis6

Flink 操作 Redis 的方式

  • 方式一自定义sink
  • 方式二使用connector
    Redis Sink 核心是 RedisMapper 是一个接口,使用时要编写自己的 Redis 操作类实现这个接口中的三个方法
方法名 功能
getCommandDescription 选择对应的数据结构和 key 名称配置
getKeyFromData 获取 key
getValueFromData 获取 value

引入依赖

1<!--Bahir Connetor-->
2        <dependency>
3            <groupId>org.apache.bahir</groupId>
4            <artifactId>flink-connector-redis_2.11</artifactId>
5            <version>1.0</version>
6        </dependency>

Docker 部署 Redis

1docker run -itd --name redis -p 6379:6379 -v /mydata/redis/data:/data redis:6.2.13 --requirepass 123456

image.png

需求统计各商品的销售数量

VideoOrderCounterSink

 1import org.apache.flink.api.java.tuple.Tuple2;
 2import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
 3import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
 4import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
 5
 6/**
 7 * 定义存储到Redis的数据格式:Tuple2 =  (java,3)、(flink,1)、(redis,1)、(springboot,2)、(springcloud,1)
 8 * (课程名字,销量)
 9 */
10public class VideoOrderCounterSink implements RedisMapper<Tuple2<String,Integer>> {
11
12    /**
13     * 指定redis数据结构,指定key的名称
14     * @return
15     */
16    @Override
17    public RedisCommandDescription getCommandDescription() {
18        return new RedisCommandDescription(RedisCommand.HSET,"VIDEO_ORDER_COUNTER");
19    }
20
21    /**
22     * 获取对应的key(filed值)
23     * @param data
24     * @return
25     */
26    @Override
27    public String getKeyFromData(Tuple2<String, Integer> data) {
28        return data.f0;
29    }
30
31    /**
32     * 获取对应的value值
33     * @param data
34     * @return
35     */
36    @Override
37    public String getValueFromData(Tuple2<String, Integer> data) {
38        return data.f1.toString();
39    }
40}

Flink07RedisSinkApp

 1package net.xdclass.app;
 2
 3import net.xdclass.model.VideoOrder;
 4import org.apache.flink.api.common.RuntimeExecutionMode;
 5import org.apache.flink.api.common.functions.MapFunction;
 6import org.apache.flink.api.java.functions.KeySelector;
 7import org.apache.flink.api.java.tuple.Tuple2;
 8import org.apache.flink.configuration.Configuration;
 9import org.apache.flink.streaming.api.datastream.DataStream;
10import org.apache.flink.streaming.api.datastream.KeyedStream;
11import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
12import org.apache.flink.streaming.connectors.redis.RedisSink;
13import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
14
15import java.util.Date;
16
17public class Flink07RedisSinkApp {
18
19    /**
20     * source
21     * transformation
22     * sink
23     * @param args
24     */
25    public static void main(String[] args) throws Exception {
26        // 1. 获取流的执行环境
27        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
28        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
29        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
30        //Total Task Slots1
31        env.setParallelism(1);
32
33        // 2. 定义数据源 (相同类型元素的数据集)
34        DataStream<VideoOrder> ds = env.fromElements(
35                new VideoOrder("21312","java",32,5,new Date()),
36                new VideoOrder("314","java",32,5,new Date()),
37                new VideoOrder("542","springboot",32,5,new Date()),
38                new VideoOrder("42","springcloud",32,5,new Date()),
39                new VideoOrder("52","flink",32,5,new Date()),
40                new VideoOrder("523","redis",32,5,new Date()),
41                new VideoOrder("3143","java",32,5,new Date()),
42                new VideoOrder("5422","springboot",32,5,new Date())
43        );
44
45        // 3. 定义数据转换操作
46        DataStream<Tuple2<String, Integer>> mapDS = ds.map(new MapFunction<VideoOrder, Tuple2<String, Integer>>() {
47            @Override
48            public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
49                return new Tuple2<>(value.getTitle(), 1);
50            }
51        });
52
53        //分组(根据指定的key分组:title --> value.f0)
54        KeyedStream<Tuple2<String, Integer>,String> keyByDS = mapDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
55            @Override
56            public String getKey(Tuple2<String, Integer> value) throws Exception {
57                return value.f0;
58            }
59        });
60
61        //统计每个title多少个
62        DataStream<Tuple2<String, Integer>> sumResult = keyByDS.sum(1);
63        //打印输出  (java,3)、(flink,1)、(redis,1)、(springboot,2)、(springcloud,1)
64        //sumResult.print();
65
66        // 4. 写入redis
67        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.10.62").setPort(6379).setPassword("123456").build();
68        //VideoOrderCounterSink定义了存入redis的数据结构
69        sumResult.addSink(new RedisSink<>(conf, new VideoOrderCounterSink()));
70
71        // 5. 执行任务
72        env.execute("custom redis sink job");
73    }
74}

image.png

Source Sink 对接 Kaf Connetor

环境准备

部署 JDK8
 1[root@localhost tmp]# mkdir -pv /usr/local/software
 2[root@localhost tmp]# tar -zxvf jdk-8u181-linux-x64.tar.gz
 3[root@localhost tmp]# mv jdk1.8.0_181 /usr/local/software/jdk1.8
 4[root@localhost tmp]# vim /etc/profile
 5JAVA_HOME=/usr/local/software/jdk1.8
 6CLASSPATH=$JAVA_HOME/lib/
 7PATH=$PATH:$JAVA_HOME/bin
 8export PATH JAVA_HOME CLASSPATH
 9[root@localhost tmp]# source /etc/profile
10[root@localhost tmp]# java -version
11java version "1.8.0_181"
12Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
13Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
部署 zookeepr
 1### 解压Zookeeper
 2[root@localhost tmp]# tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz
 3[root@localhost tmp]# mv apache-zookeeper-3.7.0-bin /usr/local/software/zookeeper
 4[root@localhost tmp]# cp /usr/local/software/zookeeper/conf/zoo_sample.cfg /usr/local/software/zookeeper/conf/zoo.cfg
 5[root@localhost tmp]# vim /usr/local/software/zookeeper/conf/zoo.cfg
 6
 7### 启动Zookeeper (默认2181端口)
 8[root@localhost tmp]# bash /usr/local/software/zookeeper/bin/zkServer.sh start
 9[root@localhost ~]# tail -f /usr/local/software/zookeeper/logs/zookeeper_audit.log
10[root@localhost tmp]# yum install -y lsof
11[root@localhost tmp]# lsof -i:2181
12COMMAND   PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
13java    25786 root   57u  IPv6  88166      0t0  TCP *:eforward (LISTEN)
14[root@localhost ~]# tail -f
部署 kafka
 1### 解压
 2[root@localhost tmp]# tar -zxvf kafka_2.13-2.8.0.tgz
 3[root@localhost tmp]# mv kafka_2.13-2.8.0 /usr/local/software/kafka
 4
 5### 修改下面两个配置 ( listeners 配置的ip和advertised.listeners相同时启动kafka会报错)、listeners(内网Ip)、advertised.listeners(公网ip)
 6[root@localhost tmp]# vim /usr/local/software/kafka/config/server.properties
 7listeners=PLAINTEXT://192.168.10.61:9092
 8zookeeper.connect=192.168.10.61:2181
 9
10### 启动kafka
11[root@localhost tmp]# bash /usr/local/software/kafka/bin/kafka-server-start.sh -daemon /usr/local/software/kafka/config/server.properties &
12[root@localhost ~]# lsof -i:9092
13
14### 停止kafka
15[root@localhost tmp]# bash /usr/local/software/kafka/bin/kafka-server-stop.sh
16
17### 创建topic
18[root@localhost tmp]# cd /usr/local/software/kafka/bin/
19[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.10.62:2181 --replication-factor 1 --partitions 1 --topic soulboy-topic
20
21### topic存放目录
22[root@localhost ~]# ls /tmp/kafka-logs/soulboy-topic-0/
2300000000000000000000.index  00000000000000000000.timeindex
2400000000000000000000.log    leader-epoch-checkpoint
25
26### 切换目录
27 [root@localhost bin]# cd /usr/local/software/kafka/bin/
28
29### 查看topic
30[root@localhost bin]# ./kafka-topics.sh --list --zookeeper 192.168.10.62:2181
31soulboy-topic
32
33### 生产者发送消息
34[root@localhost bin]#  ./kafka-console-producer.sh --broker-list 192.168.10.62:9092 --topic soulboy-topic
35>111
36>222
37
38### 消费者消费消息 ( --from-beginning:会把主题中以往所有的数据都读取出来, 重启后会有这个重复消费,忽略偏移量)
39[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.10.62:9092 --from-beginning -topic soulboy-topic
40111
41222
42
43### 删除topic
44[root@localhost bin]# ./kafka-topics.sh --zookeeper 192.168.10.62:2181 --delete --topic soulboy-topic
45Topic soulboy-topic is marked for deletion.
46
47### 查看broker节点的指定topic状态信息
48[root@localhost bin]# ./kafka-topics.sh --describe --zookeeper 192.168.10.62:2181 --topic soulboy-topic
49Topic: xdclass-topic    TopicId: qZse3pJeRL6oYikgJ--V7w PartitionCount: 1   ReplicationFactor: 1    Configs:
50        Topic: xdc

Source:从消息队列 Kafka 中读取数据

之前自定义 SourceFunction,Flink 官方也有提供对接外部系统的,比如读取 Kafka

flink 官方提供的连接器

  • 添加依赖

    1<dependency>
    2    <groupId>org.apache.flink</groupId>
    3    <artifactId>flink-connector-kafka_${scala.version}</artifactId>
    4    <version>${flink.version}</version>
    5</dependency>
    
  • 编写代码FlinkKafkaConsumer继承了FlinkKafkaConsumerBase和RichParallelSourceFunction(富函数-并行读取kafka多分区)

Flink08KafkaSourceApp

 1import net.xdclass.model.VideoOrder;
 2import net.xdclass.source.VideoOrderSource;
 3import org.apache.flink.api.common.RuntimeExecutionMode;
 4import org.apache.flink.api.common.functions.MapFunction;
 5import org.apache.flink.api.common.serialization.SimpleStringSchema;
 6import org.apache.flink.api.java.functions.KeySelector;
 7import org.apache.flink.api.java.tuple.Tuple2;
 8import org.apache.flink.configuration.Configuration;
 9import org.apache.flink.streaming.api.datastream.DataStream;
10import org.apache.flink.streaming.api.datastream.KeyedStream;
11import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
12import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
13import org.apache.flink.streaming.connectors.redis.RedisSink;
14import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
15
16import java.util.Properties;
17
18public class Flink08KafkaSourceApp {
19
20    /**
21     * source
22     * transformation
23     * sink
24     * @param args
25     */
26    public static void main(String[] args) throws Exception {
27        //获取流的执行环境
28        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
29        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
30        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
31        //Total Task Slots1
32        env.setParallelism(1);
33
34        //创建属性对象
35        Properties props = new Properties();
36        //kafka地址
37        props.setProperty("bootstrap.servers", "192.168.10.62:9092");
38        //组名
39        props.setProperty("group.id", "video-order-group");
40
41        //字符串序列化和反序列化规则
42        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
43        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
44
45        //offset重置规则
46        props.setProperty("auto.offset.reset", "latest");
47
48        //自动提交(消费消息时向kafka进行反馈),2秒提交一次,避免频繁提交,提升性能
49        props.setProperty("enable.auto.commit", "true");
50        props.setProperty("auto.commit.interval.ms", "2000");
51
52        //有后台线程每隔10s检测一下Kafka的分区变化情况(Flink可以从多个kafka分区进行消费--读取数据,如果分区数量动态增加,需要让flink能感知到新的kakfa分区)
53        props.setProperty("flink.partition-discovery.interval-millis","10000");
54
55        //配置从kafka中读取数据(topic、序列化规则、kafka配置属性对象)
56        FlinkKafkaConsumer<String> consumer =new FlinkKafkaConsumer<>("soulboy-topic", new SimpleStringSchema(), props);
57
58        //设置从记录的消费者组内的offset开始消费(消费者下面有多个group,一个消息只能被group内消费一次)
59        consumer.setStartFromGroupOffsets();
60
61        //从kafka中读取数据
62        DataStream<String> ds = env.addSource(consumer);
63        ds.print();
64
65        //执行任务
66        env.execute("kafka source job");
67    }
68}

测试

 1# 消费者发送消息
 2[root@Flink bin]# ./kafka-console-producer.sh --broker-list 192.168.10.62:9092 --topic soulboy-topic
 3>123
 4>321
 5>123
 6
 7# 查看idea控制台
 8kafka> 123
 9kafka> 321
10kafka> 123

Sink:将数据发送到消息队列 Kafka 中

Flink08KafkaSourceApp

 1package net.xdclass.app;
 2
 3import net.xdclass.model.VideoOrder;
 4import net.xdclass.source.VideoOrderSource;
 5import org.apache.flink.api.common.RuntimeExecutionMode;
 6import org.apache.flink.api.common.functions.MapFunction;
 7import org.apache.flink.api.common.serialization.SimpleStringSchema;
 8import org.apache.flink.api.java.functions.KeySelector;
 9import org.apache.flink.api.java.tuple.Tuple2;
10import org.apache.flink.configuration.Configuration;
11import org.apache.flink.streaming.api.datastream.DataStream;
12import org.apache.flink.streaming.api.datastream.KeyedStream;
13import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
14import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
15import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
16import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
17import org.apache.flink.streaming.connectors.redis.RedisSink;
18import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
19
20import java.util.Properties;
21
22public class Flink08KafkaSourceApp {
23
24    /**
25     * source
26     * transformation
27     * sink
28     * @param args
29     */
30    public static void main(String[] args) throws Exception {
31        //获取流的执行环境
32        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
33        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
34        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
35        //Total Task Slots1
36        env.setParallelism(1);
37
38        //创建属性对象
39        Properties props = new Properties();
40        //kafka地址
41        props.setProperty("bootstrap.servers", "192.168.10.62:9092");
42        //组名
43        props.setProperty("group.id", "video-order-group");
44
45        //字符串序列化和反序列化规则
46        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
47        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
48
49        //offset重置规则
50        props.setProperty("auto.offset.reset", "latest");
51
52        //自动提交(消费消息时向kafka进行反馈),2秒提交一次,避免频繁提交,提升性能
53        props.setProperty("enable.auto.commit", "true");
54        props.setProperty("auto.commit.interval.ms", "2000");
55
56        //有后台线程每隔10s检测一下Kafka的分区变化情况(Flink可以从多个kafka分区进行消费--读取数据,如果分区数量动态增加,需要让flink能感知到新的kakfa分区)
57        props.setProperty("flink.partition-discovery.interval-millis","10000");
58
59        //配置FlinkKafkaConsumer(消费者)
60        FlinkKafkaConsumer<String> kafkaConsumer =new FlinkKafkaConsumer<>("soulboy-topic", new SimpleStringSchema(), props);
61
62        //设置从记录的消费者组内的offset开始消费(消费者下面有多个group,一个消息只能被group内消费一次)
63        kafkaConsumer.setStartFromGroupOffsets();
64
65        /**
66         * Source:从消息队列Kafka中读取数据
67         */
68        DataStream<String> kafkaDS = env.addSource(kafkaConsumer);
69        kafkaDS.print("kafka");
70
71        /**
72         * Sink:将数据发送到消息队列Kafka中
73         */
74        //二次处理:在每个字符串前面添加soulboy
75        DataStream<String> mapDS = kafkaDS.map(new MapFunction<String, String>() {
76            @Override
77            public String map(String value) throws Exception {
78                return "soulboy:" + value;
79            }
80        });
81        //配置FlinkKafkaProducer(生产者)
82        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>("leon-topic", new SimpleStringSchema(), props);
83        //Sink:将数据发送到消息队列Kafka中
84        mapDS.addSink(kafkaProducer);
85
86        //执行任务
87        env.execute("kafka source job");
88    }
89}

测试

 1### soulboy-topic 生产者(linux命令行)
 2[root@Flink bin]# ./kafka-console-producer.sh --broker-list 192.168.10.62:9092 --topic soulboy-topic
 3>123
 4
 5### soulboy-topic 消费者(idea控制台 代码)
 6kafka> 123
 7
 8### leon-topic 生产者(idea控制台  代码)
 9详见代码……
10
11### leon-topic 消费者(linux命令行)
12# 切换目录 
13[root@localhost bin]# cd /usr/local/software/kafka/bin/
14# 消费者消费消息 ( --from-beginning:会把主题中以往所有的数据都读取出来, 重启后会有这个重复消费,忽略偏移量) [root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.10.62:9092 --from-beginning -topic leon-topic
15soulboy:123

Core API 常用 Transformation 算子

示例:Map、FlatMap 算子

需求:多数算子,我们会用订单得转换-过滤-分组-统计 来实现

Map一对一转换对象

 1//源源不断产生新的订单
 2        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());
 3
 4        // 定义数据转换操作: map转换,来一个记录一个,方便后续统计
 5        DataStream<Tuple2<String, Integer>> mapDS = ds.map(new MapFunction<VideoOrder, Tuple2<String, Integer>>() {
 6            @Override
 7            public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
 8                return new Tuple2<>(value.getTitle(), 1);
 9            }
10        });

FlatMap一对多转换对象
一对多转换包含一对一转换

一对一(订单转换)

 1//源源不断产生新的订单
 2        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());
 3
 4        //只是一对一记录而已,没必要使用flatMap(如果使用flatMap参考如下使用)
 5        //FlatMapFunction<String, String>, key是输入类型,value是Collector响应的收集的类型,看源码注释,也是 DataStream<String>里面泛型类型
 6        DataStream<Tuple2<String,Integer>> mapDS = ds.flatMap(new FlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {
 7            @Override
 8            public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {
 9                out.collect(new Tuple2<>(value.getTitle(),1));
10            }
11        });

一对多(切割)

 1DataStreamSource<String> stringDS = env.fromElements("spring,java,soulboy",
 2                "haha,xixi,lala",
 3                "hiahia,tutu,xiuxiu");
 4        SingleOutputStreamOperator<String> flatMapDS = stringDS.flatMap(new FlatMapFunction<String, String>() {
 5            @Override
 6            public void flatMap(String sourceData, Collector<String> outCollector) throws Exception {
 7                String[] arr = sourceData.split(",");
 8                for (String s : arr) {
 9                    outCollector.collect(s);
10
11                }
12            }
13        });
14        flatMapDS.print();

RichMap、RichFlatMap 算子实战

    Rich 相关的 API 更丰富,多了 Open、Close 方法,用于初始化连接等

    RichXXX 相关 Open、Close、setRuntimeContext 等 API 方法会根据并行度进行

常规的数据流转
    注意: KeyBy后的聚合函数,只处理当前分组后组内的数据,不同组内数据互不影响
    DataStream -> keyBy操作 -> KeyStream -> window操作 -> windowStream -> 聚合操作 -> DataStream

  • 比如并行度是4,那就有4次触发对应的open/close方法等,是4个不同subtask
  • 比如 RichMapFunction、RichFlatMapFunction、RichSourceFunction

RichMap

 1//并行度如果高了,控制台就会在同一时间显示多个连续的open close
 2        DataStream<Tuple2<String, Integer>> mapDS = ds.map(new RichMapFunction<VideoOrder, Tuple2<String, Integer>>() {
 3            @Override
 4            public void open(Configuration parameters) throws Exception {
 5                System.out.println("==========open");
 6            }
 7
 8            @Override
 9            public void close() throws Exception {
10                System.out.println("==========close");
11            }
12
13            @Override
14            public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
15                return new Tuple2<>(value.getTitle(), 1);
16            }
17        });

RichFlatMap

 1//并行度如果高了,控制台就会在同一时间显示多个连续的open
 2        SingleOutputStreamOperator<String> flatMapDS = stringDS.flatMap(new RichFlatMapFunction<String, String>() {
 3            @Override
 4            public void open(Configuration parameters) throws Exception {
 5                System.out.println("==========open");
 6            }
 7
 8            @Override
 9            public void close() throws Exception {
10                System.out.println("==========close");
11            }
12
13            @Override
14            public void flatMap(String sourceData, Collector<String> outCollector) throws Exception {
15                String[] arr = sourceData.split(",");
16                for (String s : arr) {
17                    outCollector.collect(s);
18
19                }
20            }
21        });

KeyBy 分组

   keyBy 是把数据流按照某个字段分区

   keyBy 后是相同的数据放到同个组里面,再进行组内统计

image.png

 1//源源不断产生新的订单
 2        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());
 3
 4        //分组
 5        KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
 6            @Override
 7            public String getKey(VideoOrder videoOrder) throws Exception {
 8                return videoOrder.getTitle();
 9            }
10        });
11        //聚合(各商品总销售金额)
12        SingleOutputStreamOperator<VideoOrder> money = videoOrderStringKeyedStream.sum("money");
13
14        //控制台打印输出
15        money.print();
16/*
17        VideoOrder(tradeNo=f7838e95-a102-4565-9907-d2c6fa43d3ef, title=葵花宝典, money=53, userId=0, createTime=Thu Sep 05 11:43:42 CST 2024)
18        VideoOrder(tradeNo=f7838e95-a102-4565-9907-d2c6fa43d3ef, title=葵花宝典, money=95, userId=0, createTime=Thu Sep 05 11:43:42 CST 2024)
19        VideoOrder(tradeNo=82f6dd02-30ea-4e14-8bd7-2f4d6e0e4ed0, title=九阴真经, money=10, userId=3, createTime=Thu Sep 05 11:43:44 CST 2024)
20        VideoOrder(tradeNo=90f14227-0765-4997-92a3-069be5f8712a, title=辟邪剑谱, money=67, userId=7, createTime=Thu Sep 05 11:43:45 CST 2024)
21        VideoOrder(tradeNo=4691b5e0-8d90-4b32-8b40-3609dfa58ccf, title=一阳指, money=99, userId=7, createTime=Thu Sep 05 11:43:46 CST 2024)
22        VideoOrder(tradeNo=4691b5e0-8d90-4b32-8b40-3609dfa58ccf, title=一阳指, money=150, userId=7, createTime=Thu Sep 05 11:43:46 CST 2024)
23        VideoOrder(tradeNo=90f14227-0765-4997-92a3-069be5f8712a, title=辟邪剑谱, money=148, userId=7, createTime=Thu Sep 05 11:43:45 CST 2024)
24        VideoOrder(tradeNo=82f6dd02-30ea-4e14-8bd7-2f4d6e0e4ed0, title=九阴真经, money=27, userId=3, createTime=Thu Sep 05 11:43:44 CST 2024)
25*/

Filter 过滤

需求对商品价格大于20元的商品进行分组,统计出商品的销售总金额

链式调用

 1//源源不断产生新的订单
 2        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());
 3
 4        //过滤(分组前过滤性能高)、分组、聚合
 5        DataStream<VideoOrder> sumDS = ds.filter(new FilterFunction<VideoOrder>() {
 6            @Override
 7            public boolean filter(VideoOrder videoOrder) throws Exception {
 8                return videoOrder.getMoney() > 20;
 9            }
10        }).keyBy(new KeySelector<VideoOrder, String>() {
11
12            @Override
13            public String getKey(VideoOrder videoOrder) throws Exception {
14                return videoOrder.getTitle();
15            }
16        }).sum("money");
17        sumDS.print();

Reduce

API 功能描述
sum sum是简单聚合,sum("xxx")使用的时候,如果是tuple元组则用序号,POJO则用属性名称
reduce reduce是可以自定义聚合,aggregate支持复杂的自定义聚合

reduce 示例

 1//源源不断产生新的订单
 2        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());
 3
 4        //分组
 5        KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
 6            @Override
 7            public String getKey(VideoOrder videoOrder) throws Exception {
 8                return videoOrder.getTitle();
 9            }
10        });
11
12        //聚合
13        SingleOutputStreamOperator<VideoOrder> reduce = videoOrderStringKeyedStream.reduce(new ReduceFunction<VideoOrder>() {
14            @Override
15            public VideoOrder reduce(VideoOrder value1, VideoOrder value2) throws Exception {
16                //value1是历史对象,value2是加入统计的对象
17                //所以value1.f1是历史值,value2.f1是新值,不断累加
18                //设置title是因为分组需要
19                VideoOrder videoOrder = new VideoOrder();
20                videoOrder.setTitle(value1.getTitle());
21                videoOrder.setMoney(value1.getMoney() + value2.getMoney());
22                return videoOrder;
23            }
24        });
25
26        reduce.print();

maxBy、minBy

   如果是用了 keyby,在后续算子要用 maxby,minby 类型,才可以再分组里面找对应的数据

   KeyBy后可以用KeyedProcessFunction,更底层的API,有processElement和onTimer函数

maxBy 对比 max

   并发度不为1才能看得出来效果

类型 描述
maxBy、minBy 如果是 keyBy 的是对象的某个属性,则分组用 max/min 聚合统计,只有聚合的字段会更新,其他字段还是旧的,导致对象不准确
max、min 需要用 maxby/minBy 才对,让整个对象的属性都是最新的
 1//并行度不为1才看得出效果
 2        //env.setParallelism(1);
 3        DataStream<VideoOrder> ds = env.fromElements(
 4                new VideoOrder("1", "java", 31, 15, new Date()),
 5                new VideoOrder("2", "java", 32, 45, new Date()),
 6                new VideoOrder("3", "java", 33, 52, new Date()),
 7                new VideoOrder("4", "springboot", 21, 5, new Date()),
 8                new VideoOrder("5", "redis", 41, 52, new Date()),
 9                new VideoOrder("6", "redis", 40, 15, new Date()),
10                new VideoOrder("7", "kafka", 1, 55, new Date())
11        );
12
13        //maxBy
14        SingleOutputStreamOperator<VideoOrder> maxByDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
15            @Override
16            public String getKey(VideoOrder value) throws Exception {
17                return value.getTitle();
18            }
19        }).maxBy("money");
20
21        //max
22        SingleOutputStreamOperator<VideoOrder> maxDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
23            @Override
24            public String getKey(VideoOrder value) throws Exception {
25                return value.getTitle();
26            }
27        }).max("money");
28
29
30        //maxByDS.print("maxByDS:");
31        maxDS.print("maxDS:");

Slide Window

   Windows are at the heart of processing infinite streams(Window是处理无限数据流的核心

   数据流是一直源源不断产生,业务需要聚合统计使用,比如 每10秒统计过去5分钟的点击量、成交额等

   Windows 就可以将 无限的数据流拆分为有限大小的“桶 buckets”,然后程序可以对其窗口内的数据进行计算

   窗口认为是一个 Bucket 桶,一个窗口段就是一个桶,比如 8到9点是一个桶,9到10点是一个桶

应用场景

业务类型 说明
监控平台 过去一小时:站点的访问量、接口调用次数;每 5 秒统计过去一分钟接口的响应耗时和成功率(及时发现业务问题,及时处理)

窗口分类

窗口类型 说明
time Window 时间窗口,即按照一定的时间规则作为窗口统计
time-tumbling-window 时间滚动窗口 (用的多)
time-sliding-window 时间滑动窗口 (用的多)
session WIndow 会话窗口,即一个会话内的数据进行统计,相对少用
count Window 数量窗口,即按照一定的数据量作为窗口统计,相对少用

size(窗口大小) 和 slide(滑动间隔)

窗口类型 size、slide 说明
tumbling-window(滚动窗口) size = slide,如:每隔10s统计最近10s的数据
sliding-window(滑动窗口) size > slide,如:每隔5s统计最近10s的数据
开发中不推荐 size < slide 的时候,如每隔 15s 统计最近 10s 的数据,那么中间 5s 的数据会丢失
  • tumbling-window:滚动窗口: size=slide,如:每隔 10s 统计最近 10s 的数据
  • sliding-window:滑动窗口: size>slide,如:每隔 5s 统计最近 10s 的数据
  • size<slide 的时候,如每隔 15s 统计最近 10s 的数据,那么中间 5s 的数据会丢失,所以开发中不用

滑动窗口 Sliding Windows

   窗口具有固定大小

   窗口数据有重叠

   例子:每10s统计一次最近1min内的订单数量

1725583597703.jpg

滚动窗口 Tumbling Windows

   窗口具有固定大小

   窗口数据不重叠

   例子:每10s统计一次最近10s内的订单数量

1725583796075.jpg

Window 窗口 API

什么情况下才可以使用 WindowAPI
   方括号 ([…]) 中的命令是可选的,允许用多种不同的方式自定义窗口逻辑

  • 一个窗口内 的是左闭右开:[5 10)、[10 15)
  • countWindow 没过期,但 timeWindow在1.12过期,统一使用window
分组情况 说明
有 keyBy 用 window() API
没 keyBy 用 windowAll() API ,并行度低

Keyed Windows
image.png

Non-Keyed Windows
image.png

API 名称 功能描述
Window Assigners(窗口分配器) 定义了如何将元素分配给窗口,负责将每条数据分发到正确的 window 窗口上,window() 的参数是一个 WindowAssigner,flink本身提供了Tumbling、Sliding 等Assigner
trigger(窗口触发器) 用来控制一个窗口是否需要被触发,每个窗口分配器WindowAssigner都有一个默认触发器,也支持自定义触发器
window function 定义了要对窗口中收集的数据做的计算操作
增量聚合函数:aggregate(agg函数,WindowFunction(){ }),数据来一条就计算,窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中,常见的增量聚合函数有 reduceFunction、aggregateFunction,min、max、sum 都是简单的聚合操作,不需要自定义规则。AggregateFunction IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
全窗口函数:apply(new processWindowFunction(){ }),窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算。常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文更多信息)。 WindowFunction IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗
如果想处理每个元素更底层的 API 的时候用: process(new KeyedProcessFunction(){processElement、onTimer}) ,对数据进行解析 ,process 对每个元素进行处理,相当于 map+flatMap+filter

Tumbling-Window(滚动时间)示例

   窗口具有固定大小

   窗口数据有重叠

   例子:每10s统计一次最近1min内的订单数量

需求:指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分为[0:00, 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口

TimeUtil

 1public class TimeUtil {
 2    public static String formatTime(Date time) {
 3        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH🇲🇲ss");
 4        //指定时区:获取当前操作系统的时区
 5        ZoneId zoneId = ZoneId.systemDefault();
 6        //返回字符串
 7        String timeStr = formatter.format(time.toInstant().atZone(zoneId));
 8        return timeStr;
 9    }
10}

VideoOrder
   重写toString(),更好的展示时间格式

 1import lombok.AllArgsConstructor;
 2import lombok.Data;
 3import lombok.NoArgsConstructor;
 4import net.xdclass.util.TimeUtil;
 5
 6import java.util.Date;
 7
 8@Data
 9@AllArgsConstructor
10@NoArgsConstructor
11public class VideoOrder {
12    /**
13     * 订单号
14     */
15    private String tradeNo;
16
17    /**
18     * 订单标题
19     */
20    private String title;
21
22    /**
23     * 订单金额
24     */
25    private int money;
26
27    /**
28     * 用户id
29     */
30    private int userId;
31
32    /**
33     * 注册时间
34     */
35    private Date createTime;
36
37    @Override
38    public String toString() {
39        return "VideoOrder{" +
40                "tradeNo='" + tradeNo + '\'' +
41                ", title='" + title + '\'' +
42                ", money=" + money +
43                ", userId=" + userId +
44                ", createTime=" + TimeUtil.formatTime(createTime) +
45                '}';
46    }
47}

VideoOrderSourceV2

 1import net.xdclass.model.VideoOrder;
 2import net.xdclass.util.TimeUtil;
 3import org.apache.flink.configuration.Configuration;
 4import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 5
 6import java.util.*;
 7
 8public class VideoOrderSourceV2 extends RichParallelSourceFunction<VideoOrder> {
 9
10    private volatile Boolean flag = true;
11    private  Random random = new Random();
12    private static List<VideoOrder> list = new ArrayList<>();
13
14    //订单初始化(title信息)
15    static {
16        list.add(new VideoOrder("", "springboot", 20, 0, null));
17        list.add(new VideoOrder("", "redis", 10, 0, null));
18    }
19
20    /**
21     * 自定义 Source(产生数据的逻辑)
22     * 源源不断产生订单
23     * @param sourceContext
24     * @throws Exception
25     */
26    @Override
27    public void run(SourceContext<VideoOrder> sourceContext) throws Exception {
28        while (flag) {
29            Thread.sleep(1000);
30            String tradeNo = UUID.randomUUID().toString().substring(30);
31            int userId = random.nextInt(10);
32            int money = random.nextInt(100);
33            int videoNum = random.nextInt(list.size());
34            VideoOrder videoOrder = list.get(videoNum);
35            videoOrder.setTradeNo(tradeNo);
36            videoOrder.setUserId(userId);
37            videoOrder.setCreateTime(new Date());
38
39
40            //VideoOrderSource
41            System.out.println("产生:" + videoOrder.getTitle() + ",价格:" + videoOrder.getMoney() + ", 时间:"+TimeUtil.formatTime(new Date()));
42            sourceContext.collect(videoOrder);
43        }
44    }
45
46    /**
47     * 控制任务取消
48     */
49    @Override
50    public void cancel() {
51
52    }
53
54    /**
55     * run 方法调用前:用于初始化连接
56     * @param parameters
57     * @throws Exception
58     */
59    @Override
60    public void open(Configuration parameters) throws Exception {
61        System.out.println("-----open-----");
62    }
63
64    /**
65     * 用于清理之前
66     * @throws Exception
67     */
68    @Override
69    public void close() throws Exception {
70        System.out.println("-----close-----");
71    }
72}

Flink14TumblingWindowApp

 1import net.xdclass.model.VideoOrder;
 2import net.xdclass.source.VideoOrderSourceV2;
 3import org.apache.flink.api.common.RuntimeExecutionMode;
 4import org.apache.flink.api.java.functions.KeySelector;
 5import org.apache.flink.configuration.Configuration;
 6import org.apache.flink.streaming.api.datastream.DataStream;
 7import org.apache.flink.streaming.api.datastream.KeyedStream;
 8import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 9import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
10import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
11import org.apache.flink.streaming.api.windowing.time.Time;
12
13
14public class Flink14TumblingWindowApp {
15
16    /**
17     * source
18     * transformation
19     * sink
20     *
21     * @param args
22     */
23    public static void main(String[] args) throws Exception {
24        //获取流的执行环境
25        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
26        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
27        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
28
29        //并行度为1更容易观察
30        env.setParallelism(1);
31        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
32
33        //maxBy
34        KeyedStream<VideoOrder, String> keyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
35            @Override
36            public String getKey(VideoOrder value) throws Exception {
37                return value.getTitle();
38            }
39        });
40
41        //窗口滚动(TumblingWindow):5秒统计一次前5秒内各商品销售的总金额
42        SingleOutputStreamOperator<VideoOrder> sumDS = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("money");
43        sumDS.print();
44
45        //执行任务
46        env.execute("kafka KeyBy job");
47    }
48}

控制台输出
image.png

 1-----open-----
 2产生:springboot,价格:20, 时间:2024-09-06 15:00:59
 3产生:redis,价格:10, 时间:2024-09-06 15:01:00
 4产生:redis,价格:10, 时间:2024-09-06 15:01:01
 5产生:springboot,价格:20, 时间:2024-09-06 15:01:02
 6产生:springboot,价格:20, 时间:2024-09-06 15:01:03
 7产生:redis,价格:10, 时间:2024-09-06 15:01:04
 8VideoOrder{tradeNo='828be2', title='springboot', money=60, userId=0, createTime=2024-09-06 15:00:59}
 9VideoOrder{tradeNo='12a988', title='redis', money=20, userId=1, createTime=2024-09-06 15:01:00}
10产生:redis,价格:10, 时间:2024-09-06 15:01:05
11产生:redis,价格:10, 时间:2024-09-06 15:01:06

Sliding-Window(滑动时间窗)示例

   窗口具有固定大小

   窗口数据有重叠

   例子:每10s统计一次最近1min内的订单数量

1725583597703.jpg

Flink15SlidingWindowApp

 1import net.xdclass.model.VideoOrder;
 2import net.xdclass.source.VideoOrderSourceV2;
 3import org.apache.flink.api.common.RuntimeExecutionMode;
 4import org.apache.flink.api.java.functions.KeySelector;
 5import org.apache.flink.configuration.Configuration;
 6import org.apache.flink.streaming.api.datastream.DataStream;
 7import org.apache.flink.streaming.api.datastream.KeyedStream;
 8import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 9import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
10import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
11import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
12import org.apache.flink.streaming.api.windowing.time.Time;
13
14
15public class Flink15SlidingWindowApp {
16
17    /**
18     * source
19     * transformation
20     * sink
21     *
22     * @param args
23     */
24    public static void main(String[] args) throws Exception {
25        //获取流的执行环境
26        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
27        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
28        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
29
30        //并行度为1更容易观察
31        env.setParallelism(1);
32        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
33
34        //maxBy
35        KeyedStream<VideoOrder, String> keyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
36            @Override
37            public String getKey(VideoOrder value) throws Exception {
38                return value.getTitle();
39            }
40        });
41
42        //滑动窗口(SlidingWindows):窗口大小20秒,滑动大小5秒,每5秒统计一下20秒内各商品销售总金额
43        SingleOutputStreamOperator<VideoOrder> sumDS = keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(5))).sum("money");
44        sumDS.print();
45
46        //执行任务
47        env.execute("kafka SlidingWindows job");
48    }
49}

控制台输出

 1产生:redis,价格:10, 时间:2024-09-06 15:41:12
 2产生:springboot,价格:20, 时间:2024-09-06 15:41:13
 3产生:springboot,价格:20, 时间:2024-09-06 15:41:14
 4VideoOrder{tradeNo='eafb4c', title='redis', money=10, userId=1, createTime=2024-09-06 15:41:12}
 5VideoOrder{tradeNo='e6ea1d', title='springboot', money=20, userId=1, createTime=2024-09-06 15:41:13}
 6产生:redis,价格:10, 时间:2024-09-06 15:41:16
 7产生:springboot,价格:20, 时间:2024-09-06 15:41:17
 8产生:springboot,价格:20, 时间:2024-09-06 15:41:18
 9产生:redis,价格:10, 时间:2024-09-06 15:41:19
10VideoOrder{tradeNo='eafb4c', title='redis', money=30, userId=1, createTime=2024-09-06 15:41:12}
11VideoOrder{tradeNo='e6ea1d', title='springboot', money=80, userId=1, createTime=2024-09-06 15:41:13}
12产生:redis,价格:10, 时间:2024-09-06 15:41:20
13产生:redis,价格:10, 时间:2024-09-06 15:41:21
14产生:redis,价格:10, 时间:2024-09-06 15:41:22
15产生:springboot,价格:20, 时间:2024-09-06 15:41:23
16产生:redis,价格:10, 时间:2024-09-06 15:41:24
17VideoOrder{tradeNo='eafb4c', title='redis', money=70, userId=1, createTime=2024-09-06 15:41:12}
18VideoOrder{tradeNo='e6ea1d', title='springboot', money=100, userId=1, createTime=2024-09-06 15:41:13}
19产生:redis,价格:10, 时间:2024-09-06 15:41:25
20产生:redis,价格:10, 时间:2024-09-06 15:41:26
21产生:redis,价格:10, 时间:2024-09-06 15:41:27
22产生:springboot,价格:20, 时间:2024-09-06 15:41:28
23产生:redis,价格:10, 时间:2024-09-06 15:41:29
24VideoOrder{tradeNo='eafb4c', title='redis', money=110, userId=1, createTime=2024-09-06 15:41:12}
25VideoOrder{tradeNo='e6ea1d', title='springboot', money=120, userId=1, createTime=2024-09-06 15:41:13}
26产生:redis,价格:10, 时间:2024-09-06 15:41:30
27产生:redis,价格:10, 时间:2024-09-06 15:41:31  --------- 这个不算,超过20秒了,所以是12个redis
28产生:springboot,价格:20, 时间:2024-09-06 15:41:32
29产生:springboot,价格:20, 时间:2024-09-06 15:41:33
30产生:springboot,价格:20, 时间:2024-09-06 15:41:34  --------- 这个不算,超过20秒了 所以是8个springboot
31VideoOrder{tradeNo='d742fa', title='springboot', money=160, userId=0, createTime=2024-09-06 15:41:14}
32VideoOrder{tradeNo='27c5ac', title='redis', money=120, userId=0, createTime=2024-09-06 15:41:16}

Count-Window(数量窗口)示例

滚动窗口

   统计分组后同个key内的数据超过5次(个)则进行统计 countWindow(5)

Flink16CountWindowApp

 1package net.xdclass.app;
 2
 3import net.xdclass.model.VideoOrder;
 4import net.xdclass.source.VideoOrderSourceV2;
 5import org.apache.flink.api.common.RuntimeExecutionMode;
 6import org.apache.flink.api.java.functions.KeySelector;
 7import org.apache.flink.configuration.Configuration;
 8import org.apache.flink.streaming.api.datastream.DataStream;
 9import org.apache.flink.streaming.api.datastream.KeyedStream;
10import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
11import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
12import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
13import org.apache.flink.streaming.api.windowing.time.Time;
14
15
16public class Flink16CountWindowApp {
17
18    /**
19     * source
20     * transformation
21     * sink
22     *
23     * @param args
24     */
25    public static void main(String[] args) throws Exception {
26        //获取流的执行环境
27        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
28        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
29        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
30
31        //并行度为1更容易观察
32        env.setParallelism(1);
33        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
34
35        //maxBy
36        KeyedStream<VideoOrder, String> keyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
37            @Override
38            public String getKey(VideoOrder value) throws Exception {
39                return value.getTitle();
40            }
41        });
42
43        //滑动窗口(CountWindow):统计分组后同个key内的数据超过5次(个)则进行统计 countWindow(5)
44        SingleOutputStreamOperator<VideoOrder> sumDS = keyedStream.countWindow(5).sum("money");
45        
46        sumDS.print();
47
48        //执行任务
49        env.execute("kafka SlidingWindows job");
50    }
51}

控制台输出

 1-----open-----
 2产生:redis,价格:10, 时间:2024-09-06 16:15:21
 3产生:springboot,价格:20, 时间:2024-09-06 16:15:22
 4产生:redis,价格:10, 时间:2024-09-06 16:15:23
 5产生:redis,价格:10, 时间:2024-09-06 16:15:24
 6产生:redis,价格:10, 时间:2024-09-06 16:15:25
 7产生:springboot,价格:20, 时间:2024-09-06 16:15:26
 8产生:redis,价格:10, 时间:2024-09-06 16:15:27
 9VideoOrder{tradeNo='2df358', title='redis', money=50, userId=2, createTime=2024-09-06 16:15:21}
10产生:redis,价格:10, 时间:2024-09-06 16:15:28
11产生:redis,价格:10, 时间:2024-09-06 16:15:29
12产生:redis,价格:10, 时间:2024-09-06 16:15:30
13产生:springboot,价格:20, 时间:2024-09-06 16:15:31
14产生:redis,价格:10, 时间:2024-09-06 16:15:32
15产生:springboot,价格:20, 时间:2024-09-06 16:15:33
16产生:redis,价格:10, 时间:2024-09-06 16:15:34
17VideoOrder{tradeNo='dd74bb', title='redis', money=50, userId=0, createTime=2024-09-06 16:15:28}
18产生:springboot,价格:20, 时间:2024-09-06 16:15:35
19VideoOrder{tradeNo='a09451', title='springboot', money=100, userId=4, createTime=2024-09-06 16:15:22}
20产生:redis,价格:10, 时间:2024-09-06 16:15:36
21产生:springboot,价格:20, 时间:2024-09-06 16:15:37
22产生:redis,价格:10, 时间:2024-09-06 16:15:38
23产生:redis,价格:10, 时间:2024-09-06 16:15:39

滑动窗口
   只要有2个数据到达后就可以往后统计5个数据的值, countWindow(5, 2)

Flink16CountWindowApp

 1import net.xdclass.model.VideoOrder;
 2import net.xdclass.source.VideoOrderSourceV2;
 3import org.apache.flink.api.common.RuntimeExecutionMode;
 4import org.apache.flink.api.java.functions.KeySelector;
 5import org.apache.flink.configuration.Configuration;
 6import org.apache.flink.streaming.api.datastream.DataStream;
 7import org.apache.flink.streaming.api.datastream.KeyedStream;
 8import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 9import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
10import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
11import org.apache.flink.streaming.api.windowing.time.Time;
12
13
14public class Flink16CountWindowApp {
15
16    /**
17     * source
18     * transformation
19     * sink
20     *
21     * @param args
22     */
23    public static void main(String[] args) throws Exception {
24        //获取流的执行环境
25        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
26        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
27        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
28
29        //并行度为1更容易观察
30        env.setParallelism(1);
31        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
32
33        //maxBy
34        KeyedStream<VideoOrder, String> keyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
35            @Override
36            public String getKey(VideoOrder value) throws Exception {
37                return value.getTitle();
38            }
39        });
40
41        //滑动窗口(CountWindow):统计分组后同个key内的数据超过5次(个)则进行统计 countWindow(5)
42        //SingleOutputStreamOperator<VideoOrder> sumDS = keyedStream.countWindow(5).sum("money");
43        //滑动窗口(CountWindow):统计分组后同个key内的数据超过5次(个)则进行统计 countWindow(5)
44        SingleOutputStreamOperator<VideoOrder> sumDS = keyedStream.countWindow(5,2).sum("money");
45
46        sumDS.print();
47
48        //执行任务
49        env.execute("kafka SlidingWindows job");
50    }
51}

控制台输出

 1-----open-----
 2产生:springboot,价格:20, 时间:2024-09-06 16:19:45
 3产生:redis,价格:10, 时间:2024-09-06 16:19:46
 4产生:springboot,价格:20, 时间:2024-09-06 16:19:47 -------- 满2次,进行统计springboot
 5VideoOrder{tradeNo='b374ef', title='springboot', money=40, userId=2, createTime=2024-09-06 16:19:45}
 6产生:springboot,价格:20, 时间:2024-09-06 16:19:48
 7产生:springboot,价格:20, 时间:2024-09-06 16:19:49 -------- 满2次,进行统计springboot
 8VideoOrder{tradeNo='b374ef', title='springboot', money=80, userId=2, createTime=2024-09-06 16:19:45}
 9产生:springboot,价格:20, 时间:2024-09-06 16:19:50
10产生:redis,价格:10, 时间:2024-09-06 16:19:51  -------- 满2次,进行统计redis
11VideoOrder{tradeNo='c84060', title='redis', money=20, userId=1, createTime=2024-09-06 16:19:46}

增量聚合

增量聚合函数(AggregateFunction)

1aggregate(agg函数,WindowFunction(){  })
  • 窗口保存临时数据(中间数据),每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中
  • 常见的增量聚合函数有 reduceFunction、aggregateFunction
  • min、max、sum 都是简单的聚合操作,不需要自定义规则
    1AggregateFunction<IN, ACC, OUT>
    2IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
    

Flink17AggWindowApp
   和sum、reduce效果一样

 1import net.xdclass.model.VideoOrder;
 2import net.xdclass.source.VideoOrderSourceV2;
 3import org.apache.flink.api.common.RuntimeExecutionMode;
 4import org.apache.flink.api.common.functions.AggregateFunction;
 5import org.apache.flink.api.java.functions.KeySelector;
 6import org.apache.flink.configuration.Configuration;
 7import org.apache.flink.streaming.api.datastream.DataStream;
 8import org.apache.flink.streaming.api.datastream.KeyedStream;
 9import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
10import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
11import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
12import org.apache.flink.streaming.api.windowing.time.Time;
13
14
15public class Flink17AggWindowApp {
16
17    /**
18     * source
19     * transformation
20     * sink
21     *
22     * @param args
23     */
24    public static void main(String[] args) throws Exception {
25        //获取流的执行环境
26        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
27        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
28        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
29
30        //并行度为1更容易观察
31        env.setParallelism(1);
32        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
33
34        //分组
35        KeyedStream<VideoOrder, String> keyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
36            @Override
37            public String getKey(VideoOrder value) throws Exception {
38                return value.getTitle();
39            }
40        });
41        //滚动时间窗
42        SingleOutputStreamOperator<VideoOrder> aggregateDS = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
43                //AggregateFunction<IN, ACC, OUT> ;IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
44                .aggregate(new AggregateFunction<VideoOrder, VideoOrder, VideoOrder>() {
45                    //累加器(初始化)
46                    @Override
47                    public VideoOrder createAccumulator() {
48                        VideoOrder videoOrder = new VideoOrder();
49                        return videoOrder;
50                    }
51
52                    //聚合方式
53                    @Override
54                    public VideoOrder add(VideoOrder videoOrder, VideoOrder accumulator) {
55                        accumulator.setMoney(videoOrder.getMoney() + accumulator.getMoney());
56                        accumulator.setTitle(videoOrder.getTitle());
57                        if (accumulator.getCreateTime() == null) {
58                            accumulator.setCreateTime(videoOrder.getCreateTime());
59                        }
60                        return accumulator;
61                    }
62
63                    //获取结果
64                    @Override
65                    public VideoOrder getResult(VideoOrder accumulator) {
66                        return accumulator;
67                    }
68
69                    //合并内容,一般不用
70                    @Override
71                    public VideoOrder merge(VideoOrder a, VideoOrder b) {
72                        VideoOrder videoOrder = new VideoOrder();
73                        videoOrder.setMoney(a.getMoney() + b.getMoney());
74                        return videoOrder;
75                    }
76                });
77
78        aggregateDS.print();
79
80        //执行任务
81        env.execute("flink aggregate job");
82    }
83}

控制台输出

 1-----open-----
 2产生:springboot,价格:20, 时间:2024-09-10 09:22:21
 3产生:springboot,价格:20, 时间:2024-09-10 09:22:22
 4产生:redis,价格:10, 时间:2024-09-10 09:22:23
 5产生:springboot,价格:20, 时间:2024-09-10 09:22:24
 6VideoOrder{tradeNo='null', title='springboot', money=60, userId=0, createTime=2024-09-10 09:22:21}
 7VideoOrder{tradeNo='null', title='redis', money=10, userId=0, createTime=2024-09-10 09:22:23}
 8产生:redis,价格:10, 时间:2024-09-10 09:22:25
 9产生:redis,价格:10, 时间:2024-09-10 09:22:26
10产生:springboot,价格:20, 时间:2024-09-10 09:22:27
11产生:redis,价格:10, 时间:2024-09-10 09:22:28
12产生:springboot,价格:20, 时间:2024-09-10 09:22:29
13VideoOrder{tradeNo='null', title='redis', money=30, userId=0, createTime=2024-09-10 09:22:25}
14VideoOrder{tradeNo='null', title='springboot', money=40, userId=0, createTime=2024-09-10 09:22:27}
15产生:springboot,价格:20, 时间:2024-09-10 09:22:30
16产生:redis,价格:10, 时间:2024-09-10 09:22:31
17产生:springboot,价格:20, 时间:2024-09-10 09:22:32
18产生:springboot,价格:20, 时间:2024-09-10 09:22:33
19产生:redis,价格:10, 时间:2024-09-10 09:22:34
20VideoOrder{tradeNo='null', title='springboot', money=60, userId=0, createTime=2024-09-10 09:22:30}
21VideoOrder{tradeNo='null', title='redis', money=20, userId=0, createTime=2024-09-10 09:22:31}
22产生:springboot,价格:20, 时间:2024-09-10 09:22:35
23产生:springboot,价格:20, 时间:2024-09-10 09:22:36
24产生:redis,价格:10, 时间:2024-09-10 09:22:37
25产生:redis,价格:10, 时间:2024-09-10 09:22:38
26产生:springboot,价格:20, 时间:2024-09-10 09:22:39
27VideoOrder{tradeNo='null', title='springboot', money=60, userId=0, createTime=2024-09-10 09:22:35}
28VideoOrder{tradeNo='null', title='redis', money=20, userId=0, createTime=2024-09-10 09:22:37}

全窗口函数(全量聚合)

窗口函数 描述
增量聚合 aggregate(new AggregateFunction(){});
全窗口函数 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
apply(new WindowFunction(){})
process(new ProcessWindowFunction(){}) //比上面这个强
  • 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
  • 常见的全窗口聚合函数windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)
    1IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗
    2WindowFunction<IN, OUT, KEY, W extends Window>
    

WindowFunction

   windowFunction(未来可能弃用)

Flink17ApplyWindowApp

 1import net.xdclass.model.VideoOrder;
 2import net.xdclass.source.VideoOrderSourceV2;
 3import org.apache.commons.collections.IteratorUtils;
 4import org.apache.flink.api.common.RuntimeExecutionMode;
 5import org.apache.flink.api.common.functions.AggregateFunction;
 6import org.apache.flink.api.java.functions.KeySelector;
 7import org.apache.flink.configuration.Configuration;
 8import org.apache.flink.streaming.api.datastream.DataStream;
 9import org.apache.flink.streaming.api.datastream.KeyedStream;
10import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
11import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
12import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
13import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
14import org.apache.flink.streaming.api.windowing.time.Time;
15import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
16import org.apache.flink.util.Collector;
17
18import java.util.List;
19import java.util.stream.Collectors;
20
21
22public class Flink17ApplyWindowApp {
23
24    /**
25     * source
26     * transformation
27     * sink
28     *
29     * @param args
30     */
31    public static void main(String[] args) throws Exception {
32        //获取流的执行环境
33        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
34        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
35        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
36
37        //并行度为1更容易观察
38        env.setParallelism(1);
39        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
40
41        //分组
42        KeyedStream<VideoOrder, String> keyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
43            @Override
44            public String getKey(VideoOrder value) throws Exception {
45                return value.getTitle();
46            }
47        });
48        //滚动时间窗
49        SingleOutputStreamOperator<VideoOrder> apply = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).apply(new WindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() {
50            @Override
51            public void apply(String s, TimeWindow timeWindow, Iterable<VideoOrder> input, Collector<VideoOrder> output) throws Exception {
52                List<VideoOrder> list = IteratorUtils.toList(input.iterator());
53                //list ==> stream
54                int total = list.stream().collect(Collectors.summingInt(VideoOrder::getMoney)).intValue();
55                VideoOrder videoOrder = new VideoOrder();
56                videoOrder.setMoney(total);
57                videoOrder.setTitle(list.get(0).getTitle());
58                videoOrder.setCreateTime(list.get(0).getCreateTime());
59                output.collect(videoOrder);
60            }
61        });
62
63        apply.print();
64
65
66        //执行任务
67        env.execute("flink sliding job");
68    }
69}

控制台输出

 1-----open-----
 2产生:redis,价格:10, 时间:2024-09-10 11:20:06
 3产生:springboot,价格:20, 时间:2024-09-10 11:20:07
 4产生:redis,价格:10, 时间:2024-09-10 11:20:08
 5产生:redis,价格:10, 时间:2024-09-10 11:20:09
 6VideoOrder{tradeNo='null', title='redis', money=30, userId=0, createTime=2024-09-10 11:20:06}
 7VideoOrder{tradeNo='null', title='springboot', money=20, userId=0, createTime=2024-09-10 11:20:07}
 8产生:springboot,价格:20, 时间:2024-09-10 11:20:10
 9产生:redis,价格:10, 时间:2024-09-10 11:20:11
10产生:springboot,价格:20, 时间:2024-09-10 11:20:12
11产生:redis,价格:10, 时间:2024-09-10 11:20:13
12产生:springboot,价格:20, 时间:2024-09-10 11:20:14
13VideoOrder{tradeNo='null', title='springboot', money=60, userId=0, createTime=2024-09-10 11:20:10}
14VideoOrder{tradeNo='null', title='redis', money=20, userId=0, createTime=2024-09-10 11:20:11}
15产生:springboot,价格:20, 时间:2024-09-10 11:20:15
16产生:springboot,价格:20, 时间:2024-09-10 11:20:16
17产生:springboot,价格:20, 时间:2024-09-10 11:20:17

processWindowFunction

   processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)

1IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗
2WindowFunction<IN, OUT, KEY, W extends Window>

Flink17ProcessWindowApp

 1import net.xdclass.model.VideoOrder;
 2import net.xdclass.source.VideoOrderSourceV2;
 3import org.apache.commons.collections.IteratorUtils;
 4import org.apache.flink.api.common.RuntimeExecutionMode;
 5import org.apache.flink.api.java.functions.KeySelector;
 6import org.apache.flink.configuration.Configuration;
 7import org.apache.flink.streaming.api.datastream.DataStream;
 8import org.apache.flink.streaming.api.datastream.KeyedStream;
 9import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
10import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
11import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
12import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
13import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
14import org.apache.flink.streaming.api.windowing.time.Time;
15import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
16import org.apache.flink.util.Collector;
17
18import java.util.List;
19import java.util.stream.Collectors;
20
21
22public class Flink17ProcessWindowApp {
23
24    /**
25     * source
26     * transformation
27     * sink
28     *
29     * @param args
30     */
31    public static void main(String[] args) throws Exception {
32        //获取流的执行环境
33        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
34        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
35        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
36
37        //并行度为1更容易观察
38        env.setParallelism(1);
39        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
40
41        //分组
42        KeyedStream<VideoOrder, String> keyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
43            @Override
44            public String getKey(VideoOrder value) throws Exception {
45                return value.getTitle();
46            }
47        });
48        //滚动时间窗
49        SingleOutputStreamOperator<VideoOrder> processDS = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
50                .process(new ProcessWindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() {
51                    @Override
52                    public void process(String s, ProcessWindowFunction<VideoOrder, VideoOrder, String, TimeWindow>.Context context, Iterable<VideoOrder> input, Collector<VideoOrder> output) throws Exception {
53                        List<VideoOrder> list = IteratorUtils.toList(input.iterator());
54                        //list ==> stream
55                        int total = list.stream().collect(Collectors.summingInt(VideoOrder::getMoney)).intValue();
56                        VideoOrder videoOrder = new VideoOrder();
57                        videoOrder.setMoney(total);
58                        videoOrder.setTitle(list.get(0).getTitle());
59                        videoOrder.setCreateTime(list.get(0).getCreateTime());
60                        output.collect(videoOrder);
61                    }
62                });
63
64        processDS.print();
65
66
67        //执行任务
68        env.execute("flink sliding job");
69    }
70}

控制台输出

 1-----open-----
 2产生:redis,价格:10, 时间:2024-09-10 11:37:11
 3产生:springboot,价格:20, 时间:2024-09-10 11:37:12
 4产生:redis,价格:10, 时间:2024-09-10 11:37:13
 5产生:springboot,价格:20, 时间:2024-09-10 11:37:14
 6VideoOrder{tradeNo='null', title='redis', money=20, userId=0, createTime=2024-09-10 11:37:11}
 7VideoOrder{tradeNo='null', title='springboot', money=40, userId=0, createTime=2024-09-10 11:37:12}
 8产生:springboot,价格:20, 时间:2024-09-10 11:37:15
 9产生:springboot,价格:20, 时间:2024-09-10 11:37:16
10产生:springboot,价格:20, 时间:2024-09-10 11:37:17
11产生:redis,价格:10, 时间:2024-09-10 11:37:18
12产生:springboot,价格:20, 时间:2024-09-10 11:37:19
13VideoOrder{tradeNo='null', title='springboot', money=80, userId=0, createTime=2024-09-10 11:37:15}
14VideoOrder{tradeNo='null', title='redis', money=10, userId=0, createTime=2024-09-10 11:37:18}
15产生:redis,价格:10, 时间:2024-09-10 11:37:20
16产生:redis,价格:10, 时间:2024-09-10 11:37:21
17产生:springboot,价格:20, 时间:2024-09-10 11:37:22

迟到无序数据处理

   在使用 Window 窗口函数时候,flink 怎么知道哪个是字段是对应的时间呢?

   由于网络问题,数据先产生,但是乱序延迟了,那属于哪个时间窗呢?

Flink 里面定义窗口,可以引用不同的时间概念

Flink 时间分类 说明
事件时间 EventTime(重点关注) 事件发生的时间,事件时间是每个单独事件在其产生进程上发生的时间,这个时间通常在记录进入 Flink 之前记录在对象中,在事件时间中,时间值取决于数据产生记录的时间,而不是任何Flink机器上的
进入时间(IngestionTime) 事件到进入 Flink
处理时间(ProcessingTime) 事件被 flink 处理的时间,指正在执行相应操作的机器的系统时间,是最简单的时间概念,不需要流和机器之间的协调,它提供最佳性能和最低延迟,但是在分布式和异步环境中,处理时间有不确定性,存在延迟或乱序问题

image.png

事件时间已经能够解决所有的问题了,那为何还要用处理时间呢????

  • 处理时间由于不用考虑事件的延迟与乱序,所以处理数据的速度高效,如果一些应用比较重视处理速度而非准确性,那么就可以使用处理时间,但结果具有不确定性
  • 事件时间有延迟,但是能够保证处理的结果具有准确性,并且可以处理延迟甚至无序的数据

   做了一个电商平台卖"男装衣服",如果要统计 10 分钟内成交额,你认为是哪个时间比较好?
   应该使用事件时间(EventTime)才合理

  • (EventTime) 下单支付时间是 2022-11-11 01-01-01
  • (IngestionTime ) 进入 Flink 时间 2022-11-11 01-03-01(网络拥堵、延迟)
  • (ProcessingTime)进入窗口时间 2022-11-11 01-31-01(网络拥堵、延迟)

乱序延迟时间处理-多层保证措施

如何保证在需要的窗口内获得指定的数据?(数据有乱序延迟)

  flink 采用 watermark 、allowedLateness() 、sideOutputLateData() 三个机制来保证获取数据

机制 功能描述 补充 应用场景
watermark 防止数据出现延迟乱序,允许等待一会再触发窗口计算,"提前"输出 可以用来及时输出数据
allowedLateness() 将窗口关闭时间再延迟一段时间.设置后就像 window 变大了 那么为什么不直接把 window 设置大一点呢?或者把 watermark 加大点? watermark先输出数据,allowLateness会局部修复数据并主动更新窗口的数据输出,这期间的迟到数据不会被丢弃,而是会触发窗口重新计算 做短期的更新迟到数据
sideOutPut 后兜底操作,超过 allowLateness 后,窗口已经彻底关闭了,就会把数据放到侧输出流 测输出流 OutputTag tag = new OutputTag(){}, 由于泛型查除问题,需要重写方法,加花括号 做兜底更新保证数据准确性

image.png
   二次的趋势线是经过sideOutPut修复后的,所以会稍微多一点点

Flink 机制归纳总结
   Flink 默认的处理方式直接丢弃迟到的数据

机制 功能描述 补充
第一层(窗口 window) 从 DataStream 数据流里指定范围获取数据 DataStream没有getSideOutput方法,SingleOutputStreamOperator才有
第二层 (watermark) 是防止数据出现乱序延迟,允许窗口等待延迟数据达到,再触发计算,提前输出
第三层(allowLateness) 会让窗口关闭时间再延迟一段时间, 如果还有数据达到,会局部修复数据并主动更新窗口的数据输出
第四层 (sideOutPut) 侧输出流是最后兜底操作,在窗口已经彻底关闭后,所有过期延迟数据放到侧输出流,可以单独获取,存储到某个地方再批量更新之前的聚合的数据 sideOutPut还可以进行分流功能

版本弃用 API

1新接口,`WatermarkStrategy``TimestampAssigner`  `WatermarkGenerator` 因为其对时间戳和 watermark 等重点的抽象和分离很清晰,并且还统一了周期性和标记形式的 watermark 生成方式
2
3新接口之前是用AssignerWithPeriodicWatermarksAssignerWithPunctuatedWatermarks ,现在可以弃用了

乱序延迟时间处理-Watermark

   背景:一般我们都是用EventTime事件时间进行处理统计数据,但数据由于网络问题延迟、乱序到达会导致窗口计算数据不准确

   需求:比如时间窗是 [12:01:01,12:01:10 ) ,但是有数据延迟到达,当 12:01:10 秒数据到达的时候,不立刻触发窗口计算,而是等一定的时间,等迟到的数据来后再关闭窗口进行计算

   每天 10 点后就是迟到,需要扣工资

扣款规定 对应到技术
HR 就规定迟到 5 分钟后就 罚款100元 5 分钟就是 watermark
迟到 30 分钟就是上午 事假处理 5~30 分就是 allowLateness
不请假都是要来的,超过 30 分钟就是侧输出流 sideOutPut 兜底

Watermark 水位线
   Watermark 由 flink 的某个 operator 操作生成后,就在整个程序中随 event 数据流转
   With Periodic Watermarks(周期生成,可以定义一个最大允许乱序的时间,用的很多)
   With Punctuated Watermarks(标点水位线,根据数据流中某些特殊标记事件来生成,相对少)
衡量数据是否乱序的时间,什么时候不用等早之前的数据
   Watermark 是一个全局时间戳,不是某一个 key 下的值
   Watermark 是一个特殊字段,单调递增的方式,主要是和数据本身的时间戳做比较
   用来确定什么时候不再等待更早的数据了可以触发窗口进行计算,忍耐是有限度的,给迟到的数据一些机

   注意:Watermark 设置太小会影响数据 准确性设置太大会影响数据的 实时性,更加会 加重Flink作业的负担;需要 经过测试和业务相关联,得出一个较合适的值 即可

窗口触发计算的时机

  • watermark 之前是按照窗口的关闭时间点计算的 [12:01:01,12:01:10 )
    • watermark 之后,触发计算的时机
      • 窗口内有数据
      • Watermaker >= Window EndTime 窗口结束时间
    • 触发计算后,其他窗口内数据再到达也被丢弃
    • Watermaker = 当前计算窗口最大的事件时间 - 允许乱序延迟的时间
      11:05 = 1:10 - 5
      

数据流中的事件是有序

image-20210727143401846

数据流中的事件是无序
image-20210727143440095

Watermaker 计算
   Watermaker = 当前计算窗口最大的事件时间 - 允许乱序延迟的时间

触发 W1 窗口计算的时机

   Watermaker >= Window EndTime窗口结束时间;当前计算窗口最大的事件时间 - 允许乱序延迟的时间 >= Window EndTime 窗口结束时间

案例
   window 大小为 10s,窗口是 W1 [23:12:00~23:12:10) W2[23:12:10~23:12:20)

数据的 event time
数据 A 23:12:07
数据 B 23:12:11
数据 C 23:12:08
数据 D 23:12:17
数据 E 23:12:09

没加入 watermark,由上到下进入 flink
   上来就丢失了数据,准确度较低

  • 数据 B 到达W1就进行了窗口计算,数据只有A
  • 数据 C 到达迟到了3秒,到了之后,由于W1已经计算了,所以就丢失了数据C

加入 watermark, 允许 5 秒延迟乱序,由上到下进入 flink
   丢失的数据变少,准确度提高

  • 数据 A 到达watermark = 12:07 - 5 = 12:02 < 12:10 ,所以不触发W1计算, A属于W1
  • 数据 B 到达watermark = max{ 12:11, 12:07} - 5 = 12:06 < 12:10 ,所以不触发W1计算, B属于W2
  • 数据 C 到达watermark = max{12:08, 12:11, 12:07} - 5 = 12:06 < 12:10 ,所以不触发W1计算, C属于W1
  • 数据 D 到达watermark = max{12:17, 12:08, 12:11, 12:07} - 5 = 12:12 > 23:12:10 , 触发W1计算, D属于W2
  • 数据 E 到达watermark = max{12:09, 12:17, 12:08, 12:11, 12:07} - 5 = 12:12 > 23:12:10 , 之前已触发W1计算, 所以丢失了E数据

Watermark+Window 编码实战

需求

  • 分组统计不同商品的成交总价
  • 数据有乱序延迟,允许3秒的时间
  • 10钟统计一次

时间工具类

 1import java.time.LocalDateTime;
 2import java.time.ZoneId;
 3import java.time.format.DateTimeFormatter;
 4import java.util.Date;
 5
 6public class TimeUtil {
 7    public static String formatTime(Date time) {
 8        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH🇲🇲ss");
 9        //指定时区:获取当前操作系统的时区
10        ZoneId zoneId = ZoneId.systemDefault();
11        //返回字符串
12        String timeStr = formatter.format(time.toInstant().atZone(zoneId));
13        return timeStr;
14    }
15
16    /**
17     * date 转 字符串
18     *
19     * @param timestamp
20     * @return
21     */
22    public static String format(long timestamp) {
23        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH🇲🇲ss");
24        ZoneId zoneId = ZoneId.systemDefault();
25        String timeStr = formatter.format(new Date(timestamp).toInstant().atZone(zoneId));
26        return timeStr;
27    }
28
29    /**
30     * 字符串 转 date
31     *
32     * @param time
33     * @return
34     */
35    public static Date strToDate(String time) {
36        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH🇲🇲ss");
37        LocalDateTime localDateTime = LocalDateTime.parse(time, formatter);
38        return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
39    }
40}

Flink18WatermarkWindowApp

  1import net.xdclass.model.VideoOrder;
  2import net.xdclass.source.VideoOrderSourceV2;
  3import net.xdclass.util.TimeUtil;
  4import org.apache.commons.collections.IteratorUtils;
  5import org.apache.flink.api.common.RuntimeExecutionMode;
  6import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  7import org.apache.flink.api.common.functions.FlatMapFunction;
  8import org.apache.flink.api.java.functions.KeySelector;
  9import org.apache.flink.api.java.tuple.Tuple3;
 10import org.apache.flink.configuration.Configuration;
 11import org.apache.flink.streaming.api.datastream.DataStream;
 12import org.apache.flink.streaming.api.datastream.DataStreamSource;
 13import org.apache.flink.streaming.api.datastream.KeyedStream;
 14import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 15import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 16import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 17import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 18import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 19import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 20import org.apache.flink.streaming.api.windowing.time.Time;
 21import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 22import org.apache.flink.util.Collector;
 23
 24import java.time.Duration;
 25import java.util.ArrayList;
 26import java.util.List;
 27import java.util.stream.Collectors;
 28
 29
 30public class Flink19WatermarkWindowApp {
 31
 32    /**
 33     * source
 34     * transformation
 35     * sink
 36     *
 37     * @param args
 38     */
 39    public static void main(String[] args) throws Exception {
 40        //获取流的执行环境
 41        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 42        //并行度为1更容易观察
 43        env.setParallelism(1);
 44
 45        //数据来源
 46        //java,2022-11-11 23:12:07,10
 47        //java,2022-11-11 23:12:11,10
 48        DataStreamSource<String> ds = env.socketTextStream("127.0.0.1", 8888);
 49
 50        //一对多(转成Tuple3)
 51        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
 52            @Override
 53            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
 54                String[] arr = value.split(",");
 55                out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
 56            }
 57        });
 58
 59        //EventTime指定为arr[1] == 指定POJO的事件时间列
 60        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermakerDS = flatMapDS.assignTimestampsAndWatermarks(WatermarkStrategy
 61                //指定最大允许的(延迟/乱序)时间
 62                .<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
 63                .withTimestampAssigner(
 64                        (event, timestamp) -> {
 65                            //指定POJO的事件时间列;EventTime指定为arr[1]
 66                            return TimeUtil.strToDate(event.f1).getTime();
 67                        }
 68                ));
 69        //分组(keyBy)
 70        SingleOutputStreamOperator<String> sumDS = watermakerDS.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
 71                    @Override
 72                    public String getKey(Tuple3<String, String, Integer> value) throws Exception {
 73                        //指定分组的列(title列)
 74                        return value.f0;
 75                    }
 76                })       //开窗(window)
 77                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
 78                //全量聚合:方便调试拿到窗口全部数据,全窗口函数
 79                .apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() {
 80                    @Override
 81                    public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> input, Collector<String> output) throws Exception {
 82                        //准备list,存放窗口的事件时间
 83                        ArrayList<String> timeList = new ArrayList<>();
 84                        //金额
 85                        int total = 0;
 86                        for (Tuple3<String, String, Integer> order : input) {
 87                            timeList.add(order.f1);
 88                            total = total + order.f2;
 89                        }
 90                        String resultStr = String.format("分组key:%s,聚合值:%s,窗口开始结束:[%s~%s),窗口所有事件时间:%s", key,total, TimeUtil.format(timeWindow.getStart()),TimeUtil.format(timeWindow.getEnd()), timeList);
 91                        output.collect(resultStr);
 92                    }
 93
 94                });
 95
 96        sumDS.print();
 97
 98        //执行任务
 99        env.execute("flink watermark job");
100    }
101}

测试数据

  • 窗口 [23:12:00 ~ 23:12:10) | [23:12:10 ~ 23:12:20)
  • 触发窗口计算条件
    • 窗口内有数据
    • watermark >= 窗口 endtime
    • 即 当前计算窗口最大的事件时间 - 允许乱序延迟的时间 >= Window EndTime 窗口结束时间
 1java,2022-11-11 23:12:07,10
 2java,2022-11-11 23:12:11,10
 3java,2022-11-11 23:12:08,10
 4mysql,2022-11-11 23:12:13,10
 5java,2022-11-11 23:12:13,10
 6java,2022-11-11 23:12:17,10
 7java,2022-11-11 23:12:09,10
 8java,2022-11-11 23:12:20,10
 9java,2022-11-11 23:12:22,10
10java,2022-11-11 23:12:23,10

netcat

 1C:\Users\chao1>cd C:\Tools\netcat-win32-1.12
 2
 3C:\Tools\netcat-win32-1.12>nc -lp 8888
 4java,2022-11-11 23:12:07,10
 5java,2022-11-11 23:12:11,10
 6java,2022-11-11 23:12:08,10
 7mysql,2022-11-11 23:12:13,10
 8java,2022-11-11 23:12:13,10
 9java,2022-11-11 23:12:17,10
10java,2022-11-11 23:12:09,10
11java,2022-11-11 23:12:20,10
12java,2022-11-11 23:12:22,10
13java,2022-11-11 23:12:23,10

控制台输出
   丢失数据4条数据

1分组key:java,聚合值:20,窗口开始结束:[2022-11-11 23:12:00~2022-11-11 23:12:10),窗口所有事件时间:[2022-11-11 23:12:07, 2022-11-11 23:12:08]
2分组key:mysql,聚合值:10,窗口开始结束:[2022-11-11 23:12:10~2022-11-11 23:12:20),窗口所有事件时间:[2022-11-11 23:12:13]
3分组key:java,聚合值:30,窗口开始结束:[2022-11-11 23:12:10~2022-11-11 23:12:20),窗口所有事件时间:[2022-11-11 23:12:11, 2022-11-11 23:12:13, 2022-11-11 23:12:17]

二次兜底延迟数据处理 - Allowed Lateness

   超过了 watermark 的等待后,还有延迟数据到达怎么办?

   watermark 先输出,然后配置 allowedLateness 再延长时间,然后 延迟数据到达后更新之前的窗口数据

Flink19AllowLatenessWindowApp

 1import net.xdclass.util.TimeUtil;
 2import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 3import org.apache.flink.api.common.functions.FlatMapFunction;
 4import org.apache.flink.api.java.functions.KeySelector;
 5import org.apache.flink.api.java.tuple.Tuple3;
 6import org.apache.flink.streaming.api.datastream.DataStreamSource;
 7import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 8import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 9import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
10import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
11import org.apache.flink.streaming.api.windowing.time.Time;
12import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
13import org.apache.flink.util.Collector;
14
15import java.time.Duration;
16import java.util.ArrayList;
17
18
19public class Flink19AllowLatenessWindowApp {
20
21    /**
22     * source
23     * transformation
24     * sink
25     *
26     * @param args
27     */
28    public static void main(String[] args) throws Exception {
29        //获取流的执行环境
30        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
31        //并行度为1更容易观察
32        env.setParallelism(1);
33
34        //数据来源
35        //java,2022-11-11 23:12:07,10
36        //java,2022-11-11 23:12:11,10
37        DataStreamSource<String> ds = env.socketTextStream("127.0.0.1", 8888);
38
39        //一对多(转成Tuple3)
40        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
41            @Override
42            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
43                String[] arr = value.split(",");
44                out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
45            }
46        });
47
48        //EventTime指定为arr[1] == 指定POJO的事件时间列
49        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermakerDS = flatMapDS.assignTimestampsAndWatermarks(WatermarkStrategy
50                //指定最大允许的(延迟/乱序)时间
51                .<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
52                .withTimestampAssigner(
53                        (event, timestamp) -> {
54                            //指定POJO的事件时间列;EventTime指定为arr[1]
55                            return TimeUtil.strToDate(event.f1).getTime();
56                        }
57                ));
58        //分组(keyBy)
59        SingleOutputStreamOperator<String> sumDS = watermakerDS.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
60                    @Override
61                    public String getKey(Tuple3<String, String, Integer> value) throws Exception {
62                        //指定分组的列(title列)
63                        return value.f0;
64                    }
65                })       //开窗(window)
66                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
67                // 允许延迟一分钟
68                .allowedLateness(Time.minutes(1))
69                //全量聚合:方便调试拿到窗口全部数据,全窗口函数
70                .apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() {
71                    @Override
72                    public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> input, Collector<String> output) throws Exception {
73                        //准备list,存放窗口的事件时间
74                        ArrayList<String> timeList = new ArrayList<>();
75                        //金额
76                        int total = 0;
77                        for (Tuple3<String, String, Integer> order : input) {
78                            timeList.add(order.f1);
79                            total = total + order.f2;
80                        }
81                        String resultStr = String.format("分组key:%s,聚合值:%s,窗口开始结束:[%s~%s),窗口所有事件时间:%s", key, total, TimeUtil.format(timeWindow.getStart()), TimeUtil.format(timeWindow.getEnd()), timeList);
82                        output.collect(resultStr);
83                    }
84
85                });
86
87        sumDS.print();
88
89        //执行任务
90        env.execute("flink watermark job");
91    }
92}

测试数据

  • 窗口 [23:12:00 ~ 23:12:10) | [23:12:10 ~ 23:12:20)
  • 触发窗口计算条件
    • 窗口内有数据
    • watermark >= 窗口 endtime
    • 即 当前计算窗口最大的事件时间 - 允许乱序延迟的时间 >= Window EndTime 窗口结束时间
1java,2022-11-11 23:12:07,10
2java,2022-11-11 23:12:11,10
3java,2022-11-11 23:12:08,10
4java,2022-11-11 23:12:13,10
5java,2022-11-11 23:12:23,10
6java,2022-11-11 23:12:09,10
7java,2022-11-11 23:12:02,10
8java,2022-11-11 23:14:30,10
9java,2022-11-11 23:12:03,10

netcat

 1C:\Users\chao1>cd C:\Tools\netcat-win32-1.12
 2
 3C:\Tools\netcat-win32-1.12>nc -lp 8888
 4java,2022-11-11 23:12:07,10
 5java,2022-11-11 23:12:11,10
 6java,2022-11-11 23:12:08,10
 7mysql,2022-11-11 23:12:13,10
 8java,2022-11-11 23:12:13,10
 9java,2022-11-11 23:12:17,10
10java,2022-11-11 23:12:09,10
11java,2022-11-11 23:12:20,10
12java,2022-11-11 23:12:22,10
13java,2022-11-11 23:12:23,10

控制台输出

 1java,2022-11-11 23:12:07,10
 2java,2022-11-11 23:12:11,10
 3java,2022-11-11 23:12:08,10
 4java,2022-11-11 23:12:13,10
 5java,2022-11-11 23:12:23,10
 6#延迟1分钟内,所以会输出
 7java,2022-11-11 23:12:09,10
 8java,2022-11-11 23:12:02,10
 9java,2022-11-11 23:14:30,10
10#延迟超过1分钟,不会输出
11java,2022-11-11 23:12:03,10

兜底延迟数据处理-SideOutput 侧输出流编码

   容忍度更好,还能保证实时输出

背景

  • 超过了 watermark 的等待后,还有延迟数据到达怎么办?
  • watermark 先输出,然后配置 allowedLateness 再延长时间,然后到了后更新之前的窗口数据
  • 数据超过了 allowedLateness 后,就丢失了吗?用侧输出流 SideOutput

Flink20SideOutPutLateDataWindowApp

  1import net.xdclass.util.TimeUtil;
  2import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  3import org.apache.flink.api.common.functions.FlatMapFunction;
  4import org.apache.flink.api.java.functions.KeySelector;
  5import org.apache.flink.api.java.tuple.Tuple3;
  6import org.apache.flink.streaming.api.datastream.DataStreamSource;
  7import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  8import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 10import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 11import org.apache.flink.streaming.api.windowing.time.Time;
 12import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 13import org.apache.flink.util.Collector;
 14import org.apache.flink.util.OutputTag;
 15
 16import java.time.Duration;
 17import java.util.ArrayList;
 18
 19
 20public class Flink20SideOutPutLateDataWindowApp {
 21
 22    /**
 23     * source
 24     * transformation
 25     * sink
 26     *
 27     * @param args
 28     */
 29    public static void main(String[] args) throws Exception {
 30        //获取流的执行环境
 31        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 32        //并行度为1更容易观察
 33        env.setParallelism(1);
 34
 35        //数据来源
 36        //java,2022-11-11 23:12:07,10
 37        //java,2022-11-11 23:12:11,10
 38        DataStreamSource<String> ds = env.socketTextStream("127.0.0.1", 8888);
 39
 40        //一对多(转成Tuple3)
 41        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
 42            @Override
 43            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
 44                String[] arr = value.split(",");
 45                out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
 46            }
 47        });
 48
 49        //EventTime指定为arr[1] == 指定POJO的事件时间列
 50        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermakerDS = flatMapDS.assignTimestampsAndWatermarks(WatermarkStrategy
 51                //指定最大允许的(延迟/乱序)时间
 52                .<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
 53                .withTimestampAssigner(
 54                        (event, timestamp) -> {
 55                            //指定POJO的事件时间列;EventTime指定为arr[1]
 56                            return TimeUtil.strToDate(event.f1).getTime();
 57                        }
 58                ));
 59
 60        //兜底数据
 61        OutputTag<Tuple3<String, String,Integer>> lateData = new OutputTag<Tuple3<String, String,Integer>>("lateData"){};
 62        //分组(keyBy)
 63        SingleOutputStreamOperator<String> sumDS = watermakerDS.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
 64                    @Override
 65                    public String getKey(Tuple3<String, String, Integer> value) throws Exception {
 66                        //指定分组的列(title列)
 67                        return value.f0;
 68                    }
 69                })       //开窗(window)
 70                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
 71                // 允许延迟一分钟
 72                .allowedLateness(Time.minutes(1))
 73                //兜底数据
 74                .sideOutputLateData(lateData)
 75                //全量聚合:方便调试拿到窗口全部数据,全窗口函数
 76                .apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() {
 77                    @Override
 78                    public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> input, Collector<String> output) throws Exception {
 79                        //准备list,存放窗口的事件时间
 80                        ArrayList<String> timeList = new ArrayList<>();
 81                        //金额
 82                        int total = 0;
 83                        for (Tuple3<String, String, Integer> order : input) {
 84                            timeList.add(order.f1);
 85                            total = total + order.f2;
 86                        }
 87                        String resultStr = String.format("分组key:%s,聚合值:%s,窗口开始结束:[%s~%s),窗口所有事件时间:%s", key, total, TimeUtil.format(timeWindow.getStart()), TimeUtil.format(timeWindow.getEnd()), timeList);
 88                        output.collect(resultStr);
 89                    }
 90
 91                });
 92
 93        sumDS.print();
 94        //不会更新之前的窗口数据,需要代码单独写逻辑处理更新之前的数据,也可以积累后批处理
 95        //TODO... 最后兜底处理,更新之前的数据(redis kafka……)
 96        sumDS.getSideOutput(lateData).print("late data")
 97        //执行任务
 98        env.execute("flink watermark job");
 99    }
100}

测试数据

  • 窗口 [23:12:00 ~ 23:12:10) | [23:12:10 ~ 23:12:20)
  • 触发窗口计算条件
    • 窗口内有数据
    • watermark >= 窗口 endtime
    • 即 当前计算窗口最大的事件时间 - 允许乱序延迟的时间 >= Window EndTime 窗口结束时间
 1java,2022-11-11 23:12:07,10
 2java,2022-11-11 23:12:11,10
 3java,2022-11-11 23:12:08,10
 4java,2022-11-11 23:12:13,10
 5java,2022-11-11 23:12:23,10
 6java,2022-11-11 23:12:09,10
 7java,2022-11-11 23:12:02,10
 8java,2022-11-11 23:14:30,10
 9java,2022-11-11 23:12:03,10
10java,2022-11-11 23:12:04,10

netcat

1C:\Users\chao1>cd C:\Tools\netcat-win32-1.12
2C:\Tools\netcat-win32-1.12>nc -lp 8888

控制台输出

 1java,2022-11-11 23:12:07,10
 2java,2022-11-11 23:12:11,10
 3java,2022-11-11 23:12:08,10
 4java,2022-11-11 23:12:13,10
 5java,2022-11-11 23:12:23,10
 6#延迟1分钟内,所以会输出
 7java,2022-11-11 23:12:09,10
 8java,2022-11-11 23:12:02,10
 9java,2022-11-11 23:14:30,10
10#延迟超过1分钟,不会输出,配置了sideOutPut,会在兜底输出
11java,2022-11-11 23:12:03,10
12java,2022-11-11 23:12:04,10
13
14
15
16分组key:java,聚合值:20,窗口开始结束:[2022-11-11 23:12:00~2022-11-11 23:12:10),窗口所有事件时间:[2022-11-11 23:12:07, 2022-11-11 23:12:08]
17分组key:java,聚合值:20,窗口开始结束:[2022-11-11 23:12:10~2022-11-11 23:12:20),窗口所有事件时间:[2022-11-11 23:12:11, 2022-11-11 23:12:13]
18### 延迟1分钟内,所以会输出
19分组key:java,聚合值:30,窗口开始结束:[2022-11-11 23:12:00~2022-11-11 23:12:10),窗口所有事件时间:[2022-11-11 23:12:07, 2022-11-11 23:12:08, 2022-11-11 23:12:09]
20分组key:java,聚合值:40,窗口开始结束:[2022-11-11 23:12:00~2022-11-11 23:12:10),窗口所有事件时间:[2022-11-11 23:12:07, 2022-11-11 23:12:08, 2022-11-11 23:12:09, 2022-11-11 23:12:02]
21分组key:java,聚合值:10,窗口开始结束:[2022-11-11 23:12:20~2022-11-11 23:12:30),窗口所有事件时间:[2022-11-11 23:12:23]
22### 延迟超过1分钟,不会输出,配置了sideOutPut,会在兜底输出
23late data> (java,2022-11-11 23:12:03,10)
24late data> (java,2022-11-11 23:12:04,10)

Flink 状态管理-State

什么是 State 状态

   数据流处理离不开状态管理,比如窗口聚合统计去重排序等。是一个Operator的运行的状态/历史值,是维护在内存中的。

流程
   一个算子的子任务接收输入流,获取对应的状态,计算新的结果,然后把结果更新到状态里面

image.png

有状态、无状态

算子类型 描述
无状态计算 同个数据进到算子里面多少次,都是一样的输出,比如 filter (过/不过)
有状态计算 需要考虑历史状态,同个输入会有不同的输出,比如sum、reduce聚合操作

状态管理分类

类型 细分 描述
Keyed State 键控状态(用的多) Flink 管理,自动存储恢复
Keyed State 键控状态(用的多) 有 KeyBy 才用这个,仅限用在 KeyStream 中,每个 key 都有 state ,是基于 KeyedStream 上的状态,一般是用 richFlatFunction,或者其他 richfunction 里面,在 open()声明周期里面进行初始化,ValueState、ListState、MapState 等数据结构
Operator State 算子状态(用的少,部分 source 会用) ListState、UnionListState、BroadcastState 等数据结构
RawState(用的少) 用户自己管理和维护 ,存储结构:二进制数组

State 数据结构
   状态值可能存在内存、磁盘、DB或者其他分布式存储中

数据结构 描述 API
ValueState 简单的存储一个值(ThreadLocal / String) ValueState.value()、ValueState.update(T value)
ListState 列表 ListState.add(T value)、ListState.get() //得到一个Iterator
MapState 映射类型 MapState.get(key)、MapState.put(key, value)

Flink 的状态 State 后端存储

   从 Flink 1.13 开始,社区重新设计了其公共状态后端类,以帮助用户更好地理解 本地状态存储检查点存储 的分离

   用户可以迁移现有应用程序以使用新 API,而不会丢失任何状态或一致性

State 状态后端(存储在哪里)
   Flink 内置了以下这些开箱即用的 state backends
   如果没有其他配置,系统将使用 HashMapStateBackend。

state backend 版本
HashMapStateBackend 新版
EmbeddedRocksDBStateBackend 新版
MemoryStateBackend 旧版
FsStateBackend 旧版
RocksDBStateBackend 旧版
MemoryStateBackend 旧版

不同 state backend 的特点和适用场景

state backend 描述 特点 适用场景
HashMapStateBackend 保存数据在内部,作为 Java 堆的对象 键/值状态和窗口操作符持有哈希表,用于存储值、触发器等。非常快,因为每个状态访问和更新都对 Java 堆上的对象进行操作,但是 状态大小受集群内可用内存的限制 具有大状态、长窗口、大键/值状态的作业。所有高可用性设置。
EmbeddedRocksDBStateBackend 在 RocksDB 数据库中保存状态数据 该数据库(默认)存储在 TaskManager 本地数据目录中,与 HashMapStateBackend 在 Java 存储 对象不同,数据存储为序列化的字节数组。RocksDB 可以根据可用磁盘空间进行扩展,并且是唯一支持增量快照的状态后端但是每个状态访问和更新都需要(反)序列化并可能从磁盘读取,这导致平均性能比内存状态后端慢一个数量级 具有非常大状态、长窗口、大键/值状态的作业。所有高可用性设置
MemoryStateBackend(旧版) 内存,不推荐在生产场景使用
FsStateBackend(旧版) 文件系统上,本地文件系统、HDFS, 性能更好,常用
RocksDBStateBackend(旧版) 无需担心 OOM 风险,是大部分时候的选择

配置 state backend

  • 引入依赖
1<!--flinkflink-statebackend-rocksdb-->
2        <dependency>
3            <groupId>org.apache.flink</groupId>
4            <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
5            <version>1.13.1</version>
6        </dependency>
  • 配置方式一(全局配置)
       flink-conf.yaml 使用配置键在 中配置默认状态后端 state.backend。`
 1# 配置条目的可能值是hashmap (HashMapStateBackend)、rocksdb (EmbeddedRocksDBStateBackend) 
 2或实现状态后端工厂StateBackendFactory的类的完全限定类名
 3
 4# 全局配置例子一
 5# The backend that will be used to store operator state checkpoints
 6state.backend: hashmap
 7
 8# Optional, Flink will automatically default to JobManagerCheckpointStorage
 9# when no checkpoint directory is specified.
10state.checkpoint-storage: jobmanager
11
12# 全局配置例子二
13state.backend: rocksdb
14state.checkpoints.dir: file:///checkpoint-dir/
15
16# Optional, Flink will automatically default to FileSystemCheckpointStorage
17# when a checkpoint directory is specified.
18state.checkpoint-storage: filesystem
  • 方式二代码(单独 job 配置例子)
 1//代码配置一(基于内存不推荐):HashMapStateBackend
 2StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 3env.setStateBackend(new HashMapStateBackend());
 4env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
 5
 6//代码配置二(经常使用):EmbeddedRocksDBStateBackend
 7StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 8env.setStateBackend(new EmbeddedRocksDBStateBackend());
 9env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
10//或者
11env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));

Flink 的状态 State 管理编码-订单数据统计实现 MaxBy 操作

   sum()、maxBy() 等函数底层源码也是有ValueState进行状态存储

需求

  • 根据订单进行分组,统计找出每个商品最大的订单成交额
  • 不用maxBy实现,用ValueState实现

根据订单进行分组,统计找出每个商品最大的订单成交额

      

Flink22StateMaxByApp

 1import net.xdclass.util.TimeUtil;
 2import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 3import org.apache.flink.api.common.functions.FlatMapFunction;
 4import org.apache.flink.api.common.functions.RichMapFunction;
 5import org.apache.flink.api.common.state.ValueState;
 6import org.apache.flink.api.common.state.ValueStateDescriptor;
 7import org.apache.flink.api.java.functions.KeySelector;
 8import org.apache.flink.api.java.tuple.Tuple2;
 9import org.apache.flink.api.java.tuple.Tuple3;
10import org.apache.flink.configuration.Configuration;
11import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
12import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
13import org.apache.flink.streaming.api.datastream.DataStreamSource;
14import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
15import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
16import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
17import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
18import org.apache.flink.streaming.api.windowing.time.Time;
19import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
20import org.apache.flink.util.Collector;
21import org.apache.flink.util.OutputTag;
22
23import java.time.Duration;
24import java.util.ArrayList;
25
26
27public class Flink22StateMaxByApp {
28
29    /**
30     * 使用valueState实现maxBy功能,统计分组内订单金额最高的订单
31     *
32     * @param args
33     */
34    public static void main(String[] args) throws Exception {
35        //获取流的执行环境
36        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
37        //并行度为1更容易观察
38        env.setParallelism(1);
39
40        //数据来源  java,2022-11-11 23:12:07,10
41        DataStreamSource<String> ds = env.socketTextStream("127.0.0.1", 8888);
42        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
43            @Override
44            public void flatMap(String s, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
45                String[] arr = s.split(",");
46                collector.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
47            }
48        });
49
50        //分组、聚合
51        SingleOutputStreamOperator<Tuple2<String, Integer>> maxVideoOrderDS = flatMapDS.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
52            @Override
53            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
54                return value.f0;
55            }
56        }).map(new RichMapFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>>() {
57            //局部变量(商品最大成交金额)
58            private ValueState<Integer> maxVideoOrderState = null;
59
60            /**
61             * 初始化状态
62             *
63             * @param parameters
64             * @throws Exception
65             */
66            @Override
67            public void open(Configuration parameters) throws Exception {
68                //获取上下文信息
69                maxVideoOrderState = getRuntimeContext().getState(new ValueStateDescriptor<>("maxValue", Integer.class));
70            }
71
72            @Override
73            public Tuple2<String, Integer> map(Tuple3<String, String, Integer> value) throws Exception {
74                //获取历史值
75                Integer maxValue = maxVideoOrderState.value();
76                //获取当前值
77                Integer currentValue = value.f2;
78                //判断(更新状态,把当前最大的值存储到state)
79                if (maxValue == null || currentValue > maxValue) {
80                    maxVideoOrderState.update(currentValue);
81                    return Tuple2.of(value.f0, currentValue);
82                } else {
83                    return Tuple2.of(value.f0, maxValue);
84                }
85            }
86
87            @Override
88            public void close() throws Exception {
89                super.close();
90            }
91        });
92
93        //打印
94        maxVideoOrderDS.print("商品最大交额的订单");
95
96        //执行任务
97        env.execute("flink watermark job");
98    }
99}

netcat

1C:\Tools\netcat-win32-1.12>nc -lp 8888
2java,2021,19
3java,2022,21
4java,2022,12

控制台输出

1商品最大交额的订单> (java,19)
2商品最大交额的订单> (java,21)
3商品最大交额的订单> (java,21)

Flink 的 Checkpoint-SavePoint 和端到端(​end-to-end​)状态一致性

Checkpoint 检查点

  • Flink 中所有的 Operator当前State的全局快照
  • 默认情况下 checkpoint 是禁用的,需要开启
  • Checkpoint 是把 State 数据定时 持久化存储,防止丢失

Savepoint 保存点

  • 手工调用 checkpoint,叫 ​savepoint​,主要是用于flink集群维护升级等
  • 底层使用了 Chandy-Lamport 分布式快照算法,保证数据在分布式环境下的一致性

开箱即用,Flink 捆绑了这些检查点存储类型

检查点存储类型 功能描述
JobManagerCheckpointStorage 作业管理器检查点存储
FileSystemCheckpointStorage 文件系统检查点存储

配置

1### 全局配置checkpoints
2# 全局配置
3state.checkpoints.dir: hdfs:///checkpoints/
4
5# 作业单独配置checkpoints(代码配置)
6env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");
7
8### 全局配置savepoint
9state.savepoints.dir: hdfs:///flink/savepoints

Savepoint 与 Checkpoint 的不同之处

  • 类似于传统数据库中的备份与恢复日志之间的差异
  • Checkpoint 的主要目的是为意外失败的作业提供【重启恢复机制】,
  • Checkpoint 的生命周期 由 Flink 管理,即 Flink 创建,管理和删除 Checkpoint - 无需用户交互
  • Savepoint 由用户创建,拥有和删除, 主要是【升级 Flink 版本】,调整用户逻辑
  • 除去概念上的差异Checkpoint 和 Savepoint 的当前实现基本上使用相同的代码并生成相同的格式

端到端(end-to-end)状态一致性
    数据一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的。
    在真实应用中,了流处理器以外还包含了数据源(例如 Kafka、Mysql)和输出到持久化系统(Kafka、MySQL、Hbase、CK)
    端到端的一致性保证,是意味着结果的正确性贯穿了整个流处理应用的各个环节,每一个组件都要保证自己的一致性。

处理环节 如何保证数据一致性
Source 需要外部数据源可以重置读取位置当发生故障的时候重置偏移量到故障之前的位置
内部 依赖 Checkpoints 机制在发生故障的时可以恢复各个环节的数据
Sink 当故障恢复时,数据不会重复写入外部系统,常见的就是 幂等和事务写入(和checkpoint配合)

Flink 的 Checkpoint 代码配置编码

Flink23CheckpointApp

 1import org.apache.flink.api.common.functions.FlatMapFunction;
 2import org.apache.flink.api.common.functions.RichMapFunction;
 3import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 4import org.apache.flink.api.common.state.ValueState;
 5import org.apache.flink.api.common.state.ValueStateDescriptor;
 6import org.apache.flink.api.java.functions.KeySelector;
 7import org.apache.flink.api.java.tuple.Tuple2;
 8import org.apache.flink.api.java.tuple.Tuple3;
 9import org.apache.flink.configuration.Configuration;
10import org.apache.flink.streaming.api.CheckpointingMode;
11import org.apache.flink.streaming.api.datastream.DataStreamSource;
12import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
13import org.apache.flink.streaming.api.environment.CheckpointConfig;
14import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
15import org.apache.flink.util.Collector;
16
17
18public class Flink23CheckpointApp {
19
20    /**
21     * 使用valueState实现maxBy功能,统计分组内订单金额最高的订单
22     *
23     * @param args
24     */
25    public static void main(String[] args) throws Exception {
26        //获取流的执行环境
27        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
28        //并行度为1(更容易观察)
29        env.setParallelism(1);
30
31        //两个检查点之间的间隔时间 (默认是0,单位毫秒):检查点快照比较耗费资源,因此不能过于频分
32        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
33
34        //容忍度:Checkpoint过程中出现错误,是否让整体任务都失败(默认值为0,表示不容忍任何Checkpoint失败):可以容忍5次失败
35        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
36
37        //当一个Flink应用程序失败终止、人为取消等时,它的Checkpoint就会被清除。可以配置不同策略进行操作
38        //DELETE_ON_CANCELLATION: 当作业取消时,Checkpoint 状态信息会被删除,因此取消任务后,不能从 Checkpoint 位置进行恢复任务
39        //RETAIN_ON_CANCELLATION(多的多): 当作业手动取消时,将会保留作业的 Checkpoint 状态信息,要手动清除该作业的 Checkpoint 状态信息
40        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
41
42        //数据一致性保障策略:Flink默认提供Extractly-Once保证State的一致性,还提供了Extractly-Once,At-Least-Once 两种模式
43        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
44
45        //快照耗时最大时间:设置checkpoint的超时时间, 如果规定时间没完成则放弃,默认是10分钟
46        env.getCheckpointConfig().setCheckpointTimeout(60000);
47
48        //设置同一时刻有多少个checkpoint可以同时执行,默认为1就行,以避免占用太多正常数据处理资源
49        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
50
51        //设置了重启策略, 作业在失败后能自动恢复,失败后最多重启3次,每次重启间隔10s
52        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
53
54
55        //执行任务
56        env.execute("flink Checkpoint job");
57    }
58}

复杂事件处理 CEP

FlinkCEP

用途

  • 检测和发现无边界事件流中多个记录的关联规则,得到满足规则的复杂事件
  • 允许业务定义要 从输入流中提取的复杂模式序列

模式(Pattern)
    定义处理事件的规则

模式类型 描述
个体模式(Individual Patterns) 组成复杂规则的每一个单独的模式定义,就是个体模式
组合模式(Combining Patterns) 很多个体模式组合起来,形成组合模式
模式组(Groups of Patterns) 将一个组合模式作为条件嵌套在个体模式里,就是模式组

近邻模式

模式类型 描述
严格近邻 期望所有匹配事件严格地一个接一个出现,中间没有任何不匹配的事件, API 是.next()
宽松近邻 允许中间出现不匹配的事件,API 是.followedBy()
非确定性宽松近邻 可以忽略已经匹配的条件,API 是 followedByAny()
指定时间约束 指定模式在多长时间内匹配有效,API 是 within

模式分类

模式分类 描述
单次模式 接收一次一个事件
循环模式 接收一个或多个事件

其他参数

参数 功能描述
times 指定固定的循环执行次数
greedy 贪婪模式,尽可能多触发
oneOrMore 指定触发一次或多次
timesOrMore 指定触发固定以上的次数
optional 要么不触发要么触发指定的次数

引入依赖

1<!--flink-cep-->
2        <dependency>
3            <groupId>org.apache.flink</groupId>
4            <artifactId>flink-cep_${scala.version}</artifactId>
5            <version>${flink.version}</version>
6        </dependency>

使用流程

  • 定义 pattern
  • pattern 应用到数据流,得到模式流
  • 从模式流 获取结果
 1//定义pattern
 2Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
 3        new SimpleCondition<Event>() {
 4            @Override
 5            public boolean filter(Event event) {
 6                return event.getId() == 42;
 7            }
 8        }
 9    ).next("middle").subtype(SubEvent.class).where(
10        new SimpleCondition<SubEvent>() {
11            @Override
12            public boolean filter(SubEvent subEvent) {
13                return subEvent.getVolume() >= 10.0;
14            }
15        }
16    ).followedBy("end").where(
17         new SimpleCondition<Event>() {
18            @Override
19            public boolean filter(Event event) {
20                return event.getName().equals("end");
21            }
22         }
23    );
24
25//pattern应用到数据流,得到模式流
26PatternStream<Event> patternStream = CEP.pattern(input, pattern);
27
28//从模式流 获取结果
29DataStream<Alert> result = patternStream.process(
30    new PatternProcessFunction<Event, Alert>() {
31        @Override
32        public void processMatch(
33                Map<String, List<Event>> pattern,
34                Context ctx,
35                Collector<Alert> out) throws Exception {
36            out.collect(createAlertFrom(pattern));
37        }
38    });

CEP-账号登录风控检测

需求

    同个账号,在 5 秒内连续登录失败 2 次,则认为存在而已登录问题
    数据格式soulboy,2022-11-11 12:01:01,-1,状态码 -1 代表登录失败。

引入依赖

1<!--flink-cep-->
2        <dependency>
3            <groupId>org.apache.flink</groupId>
4            <artifactId>flink-cep_${scala.version}</artifactId>
5            <version>${flink.version}</version>
6        </dependency>

Flink24CEPLoginApp

  1import net.xdclass.util.TimeUtil;
  2import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  3import org.apache.flink.api.common.functions.FlatMapFunction;
  4import org.apache.flink.api.java.functions.KeySelector;
  5import org.apache.flink.api.java.tuple.Tuple3;
  6import org.apache.flink.cep.CEP;
  7import org.apache.flink.cep.PatternSelectFunction;
  8import org.apache.flink.cep.PatternStream;
  9import org.apache.flink.cep.pattern.Pattern;
 10import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 11import org.apache.flink.streaming.api.datastream.DataStreamSource;
 12import org.apache.flink.streaming.api.datastream.KeyedStream;
 13import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 14import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 15import org.apache.flink.streaming.api.windowing.time.Time;
 16import org.apache.flink.util.Collector;
 17
 18import java.util.List;
 19import java.util.Map;
 20
 21
 22public class Flink24CEPLoginApp {
 23
 24    /**
 25     * 使用valueState实现maxBy功能,统计分组内订单金额最高的订单
 26     *
 27     * @param args
 28     */
 29    public static void main(String[] args) throws Exception {
 30        //获取流的执行环境
 31        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 32        //并行度为1更容易观察
 33        env.setParallelism(1);
 34
 35        //数据来源 soulboy,2022-11-11 12:01:01,-1       状态码`-1`代表登录失败。
 36        DataStreamSource<String> ds = env.socketTextStream("127.0.0.1", 8888);
 37
 38        //转成Tuple3
 39        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMap = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
 40            @Override
 41            public void flatMap(String s, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
 42                String[] arr = s.split(",");
 43                collector.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
 44            }
 45        });
 46
 47        //指定event time列
 48        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarkDS = flatMap.assignTimestampsAndWatermarks(WatermarkStrategy
 49                //生成一个延迟3s的watermark
 50                //.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
 51                //分配watermark策略=>时间是单调递增,event中的时间戳充当了水印
 52                .<Tuple3<String, String, Integer>>forMonotonousTimestamps()
 53                .withTimestampAssigner((event, timestamp) -> TimeUtil.strToDate(event.f1).getTime()));
 54
 55        //分组
 56        KeyedStream<Tuple3<String, String, Integer>, String> keyedStream = watermarkDS.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
 57            @Override
 58            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
 59                //获取到账户:(soulboy,2022-11-11 12:01:01,-1 )
 60                return value.f0;
 61            }
 62        });
 63
 64        //cep:定义模式(Pattern)  匹配5秒内登录2次失败
 65        Pattern<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>> pattern = Pattern.<Tuple3<String, String, Integer>>
 66                        begin("firstLoginTime")
 67                .where(new SimpleCondition<Tuple3<String, String, Integer>>() {
 68                    @Override
 69                    public boolean filter(Tuple3<String, String, Integer> value) throws Exception {
 70                        //-1是登录失败的错误码
 71                        return value.f2 == -1;
 72                    }
 73                })
 74                .next("secondLoginTime")
 75                .where(new SimpleCondition<Tuple3<String, String, Integer>>() {
 76                    @Override
 77                    public boolean filter(Tuple3<String, String, Integer> value) throws Exception {
 78                        //-1是登录失败的错误码
 79                        return value.f2 == -1;
 80                    }
 81                }).within(Time.seconds(5));
 82
 83        //cep:pattern应用到数据流,得到模式流
 84        PatternStream<Tuple3<String, String, Integer>> patternStream = CEP.pattern(keyedStream, pattern);
 85
 86        //cep:从模式流获取结果   输出到redis中Tuple3 类型<String, String, String>  账户、第一次登录失败时间、第二次登录失败时间
 87        SingleOutputStreamOperator<Tuple3<String, String, String>> selectedResult = patternStream.select(new PatternSelectFunction<Tuple3<String, String, Integer>, Tuple3<String, String, String>>() {
 88            @Override
 89            public Tuple3<String, String, String> select(Map<String, List<Tuple3<String, String, Integer>>> map) throws Exception {
 90                Tuple3<String, String, Integer> firstLoginTimeEvent = map.get("firstLoginTime").get(0);
 91                Tuple3<String, String, Integer> secondLoginTimeEvent = map.get("secondLoginTime").get(0);
 92                //Tuple3.of(账户,第一次登录失败时间,第二次登录失败时间)  输出到redis 或其他sink
 93                return Tuple3.of(firstLoginTimeEvent.f0, firstLoginTimeEvent.f1, secondLoginTimeEvent.f1);
 94            }
 95        });
 96
 97        //输出风险账户: TODO ... sink 可以输出到 redis mysql kafka
 98        //调用微服务之前,获取登录信息时在redis中判断该账户是不是风险账户(redis中判断),如果是风险账户则触发输入验证码
 99        selectedResult.print("风险帐号");
100
101        //执行任务
102        env.execute("cep job");
103    }
104}

测试数据

1soulboy,2022-11-11 12:01:01,-1
2dero,2022-11-11 12:01:10,-1
3dero,2022-11-11 12:01:11,-1
4soulboy,2022-11-11 12:01:13,-1
5dero,2022-11-11 12:01:14,-1
6dero,2022-11-11 12:01:15,1
7soulboy,2022-11-11 12:01:16,-1
8dero,2022-11-11 12:01:17,-1
9soulboy,2022-11-11 12:01:20,1

cmd 命令行

 1C:\Users\chao1>c:
 2
 3C:\Users\chao1>cd C:\Tools\netcat-win32-1.12
 4
 5C:\Tools\netcat-win32-1.12>nc -lp 8888
 6soulboy,2022-11-11 12:01:01,-1
 7dero,2022-11-11 12:01:10,-1
 8dero,2022-11-11 12:01:11,-1
 9soulboy,2022-11-11 12:01:13,-1
10dero,2022-11-11 12:01:14,-1
11dero,2022-11-11 12:01:15,1
12soulboy,2022-11-11 12:01:16,-1
13dero,2022-11-11 12:01:17,-1
14soulboy,2022-11-11 12:01:20,1

idea 控制台输出

1风险帐号> (dero,2022-11-11 12:01:10,2022-11-11 12:01:11)
2风险帐号> (dero,2022-11-11 12:01:11,2022-11-11 12:01:14)
3风险帐号> (soulboy,2022-11-11 12:01:13,2022-11-11 12:01:16)

Flink 项目打包(插件)阿里云部署

    Flink 部署方式是灵活,主要是对 Flink计算时所需资源的管理方式不同 ,详细可参考文档

部署模式 描述
Local 本地部署,直接启动进程,适合调试使用(直接部署启动服务)
Standalone Cluster 集群部署,flink 自带集群模式
Hadoop YARN 计算资源统一由 Hadoop YARN 管理资源进行调度,按需使用提高集群的资源利用率
Kubernetes 部署
Docker 部署

Flink1.13.1 下载和 Local 本地模式部署

目录介绍

conf 目录
    flink-conf.yaml

1# web ui 端口
2rest.port=8081
3
4# 调整内存大小
5jobmanager.memory.process.size: 1000m
6taskmanager.memory.process.size: 1000m
7taskmanager.numberOfTaskSlots: 4

bin 目录

脚本名
start-cluster.sh
stop-cluster.sh
yarn-session.sh

解压

1[root@Flink software]# tar -zxvf flink-1.13.1-bin-scala_2.12.tgz
2[root@Flink software]# mv flink-1.13.1 flink

常用命令

 1# 启动flink 
 2[root@Flink flink]# cd /usr/local/software/flink/bin/
 3[root@Flink bin]# ./start-cluster.sh
 4
 5
 6# 停止flink
 7bin/stop-cluster.sh
 8
 9# 查看进程 jps
10[root@Flink bin]# jps
11TaskManagerRunner
12StandaloneSessionClusterEntrypoint

local 模式命令行运行 jar 包

创建文件

 1[root@Flink bin]# mkdir /usr/local/software/flink/examples/source
 2
 3[root@Flink bin]# cat /usr/local/software/flink/examples/source/soulboy.txt
 4java soulboy
 5springboot springcloud
 6html flink
 7springboot redis
 8java flink
 9kafka flink
10java springboot

bin 目录运行官方案例
    统计各单词出现的次数

1[root@Flink bin]# pwd
2/usr/local/software/flink/bin
3
4[root@Flink bin]# ./flink run /usr/local/software/flink/examples/batch/WordCount.jar --input /usr/local/software/flink/examples/source/soulboy.txt --output /usr/local/software/flink/examples/source/soulboy_result.txt
5Job has been submitted with JobID 8079e6a6818d0f8dc20174028ff04de7
6Program execution finished
7Job with JobID 8079e6a6818d0f8dc20174028ff04de7 has finished.
8Job Runtime: 387 ms

查看输出结果

1[root@Flink bin]# cat /usr/local/software/flink/examples/source/soulboy_result.txt
2flink 3
3html 1
4java 3
5kafka 1
6redis 1
7soulboy 1
8springboot 3
9springcloud 1

Flink Web Dashboard
    http://192.168.10.62:8081/#/overview

image.png

Flink 项目打包

Maven 打包插件

插件名 功能描述
maven-jar-plugin
默认的打包插件,用来打普通的 jar 包,需建立lib目录里来存放需要的依赖包
maven-shade-plugin (推荐) 将依赖的jar包打包到当前jar包,成为 fat JAR 包,也可以防止类冲突(隔离)
maven-assembly-plugin 大数据项目用的比较多,支持自定义的打包结构,比如 sql/shell 等

测试插件

插件名 功能描述
maven-surefire-plugin 用于 mvn 生命周期的测试阶段的插件,通过参数设置在 junit 下控制测试

引入 maven 常用插件
    调整为JDK8,java -version

 1<build>
 2        <finalName>soulboy-flink</finalName>
 3        <plugins>
 4            <!--默认编译版本比较低,所以用compiler插件,指定项目源码的jdk版本,编译后的jdk版本和编码,-->
 5            <plugin>
 6                <groupId>org.apache.maven.plugins</groupId>
 7                <artifactId>maven-compiler-plugin</artifactId>
 8                <version>3.6.1</version>
 9                <configuration>
10                    <source>${java.version}</source>
11                    <target>${java.version}</target>
12                    <encoding>${file.encoding}</encoding>
13                </configuration>
14            </plugin>
15
16            <plugin>
17                <groupId>org.apache.maven.plugins</groupId>
18                <artifactId>maven-shade-plugin</artifactId>
19                <version>2.3</version>
20                <executions>
21                    <execution>
22                        <phase>package</phase>
23                        <goals>
24                            <goal>shade</goal>
25                        </goals>
26                    </execution>
27                </executions>
28            </plugin>
29
30        </plugins>
31    </build>

本地 Flink 项目打包
    调整为JDK8,java -version

1mvn clean
2mvn install

image.png

image.png

Flink 的 WebUI 运行 jar 包

CentOS7 安装 Netcat

 1# 下载 Netcat RPM 包
 2[root@Flink tmp]# wget http://vault.centos.org/7.9.2009/os/x86_64/Packages/nmap-ncat-6.40-19.el7.x86_64.rpm
 3
 4# 安装依赖项 libpcap
 5[root@Flink tmp]# wget http://vault.centos.org/7.9.2009/os/x86_64/Packages/libpcap-1.5.3-12.el7.x86_64.rpm
 6[root@Flink tmp]# sudo rpm -ivh libpcap-1.5.3-12.el7.x86_64.rpm                                          
 7
 8# 安装 Netcat
 9[root@Flink tmp]# sudo rpm -ivh nmap-ncat-6.40-19.el7.x86_64.rpm
10
11# 验证安装
12[root@Flink tmp]# nc -h

Flink Web Dashboard

  • 上传 jar 包

    image.png

  • 选择 main 入口类 APP
    image.png
    image.png

  • 提交任务查看情况

    1# Centos7监听
    2[root@Flink tmp]# nc -lk 8888
    

image.png

image.png

image.png

image.png

image.png

image.png

Task Solt

   Task Solt指taskmanager的并发执行能力
   parallelism是指taskmanager实际使用的并发能力

   Task Slots 是具备的并发能力,大于 Parallelism 并行度(实际用的)

   数据流里面算子的最大并行度就是 Parallelism, 2-2-2-3-1 这样的并行度,最大的Parallelism就是3(同个任务job里面)

1# 假如每一个taskmanager中的分配4个TaskSlot,
2# 那有3个taskmanager一共有12个TaskSlot
3taskmanager.numberOfTaskSlots:4

Docker-Compose 容器化部署 Flink 集群实战

安装并运行 Docker

 1yum install docker-io -y
 2systemctl start docker
 3
 4systemctl start docker     #运行Docker守护进程
 5systemctl stop docker      #停止Docker守护进程
 6systemctl restart docker   #重启Docker守护进程
 7
 8# 修改镜像仓库vim /etc/docker/daemon.json 改为下面内容,然后重启docker
 9{
10"debug":true,"experimental":true,
11"registry-mirrors":["https://pb5bklzr.mirror.aliyuncs.com","https://hub-mirror.c.163.com","https://docker.mirrors.ustc.edu.cn"]
12}
13
14#查看信息
15docker info

Docker Compose 容器编排安装
   官方安装地址:https://docs.docker.com/compose/install/

1curl -SL https://github.com/docker/compose/releases/download/v2.24.7/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose
2
3chmod +x /usr/local/bin/docker-compose
4
5docker-compose version

创建 docker-compose.yml 文件
   配置Session Cluster模式

 1[root@Flink tmp]# cat docker-compose.yml
 2version: "3.7"
 3services:
 4  jobmanager:
 5    image: flink:scala_2.12-java8
 6    ports:
 7      - "8081:8081"
 8    command: jobmanager
 9    environment:
10      - |
11        FLINK_PROPERTIES=
12        jobmanager.rpc.address: jobmanager
13
14  taskmanager:
15    image: flink:scala_2.12-java8
16    depends_on:
17      - jobmanager
18    command: taskmanager
19    scale: 3
20    environment:
21      - |
22        FLINK_PROPERTIES=
23        jobmanager.rpc.address: jobmanager
24        taskmanager.numberOfTaskSlots: 2

部署 Flink

1[root@Flink tmp]# chmod +x /usr/local/bin/docker-compose
2[root@Flink tmp]# docker-compose up -d

有3个TaskManager,每个TaskManager有2个TaskSlots,所以一共有6个Task Slots

image.png

image.png

image.png

端口说明

1The Web Client is on port 8081
2JobManager RPC port 6123
3TaskManagers RPC port 6122
4TaskManagers Data port 6121
5
6注意:docker-compose.yml 文件中 expose 和 ports 的区别
7  expose暴露容器给link到当前容器的容器
8  ports是暴露容器端口到宿主机端口进行映射

问题

  • 内存不足 (其他程序不运行,最少也需要 1 核 2g,建议是 4 或者 8g)
  • 网络安全组没开放端口 8081

实时监控告警

需求公司开发一个监控告警平台(类似nginx访问日志)

需求
每 5 秒统计过去 1 分钟可以统计各个接口的访问量(滑动时间窗 + 聚合)
每 5 秒统计过去 1 分钟各个接口的各个状态码次数 (多级分组)

日志来源nginx访问日志

日志特点
访问日志日志来源存在乱序、无效访问记录(爬虫)
要实时显示数据(Watermark),可以间隔时间二次更新(Allowlateness)
最大允许 1 分钟延迟,超过后兜底保存(SideOutput)

定义 POJO 类

AccessLogDO

 1import lombok.AllArgsConstructor;
 2import lombok.Data;
 3import lombok.NoArgsConstructor;
 4import java.util.Date;
 5
 6@Data
 7@AllArgsConstructor
 8@NoArgsConstructor
 9public class AccessLogDO {
10
11    private String title;
12
13    private String url;
14
15    private String method;
16
17    private Integer httpCode;
18
19    private String body;
20
21    private Date createTime;
22
23    private String userId;
24
25    private String city;
26}

ResultCount

 1import lombok.AllArgsConstructor;
 2import lombok.Data;
 3import lombok.NoArgsConstructor;
 4
 5@Data
 6@AllArgsConstructor
 7@NoArgsConstructor
 8public class ResultCount {
 9    
10    private String url;
11    
12    private Integer code;
13    
14    private Long count;
15    
16    private String startTime;
17    
18    private String endTime;
19    
20    private String type;
21}

自定义 source

AccessLogSource

 1import net.xdclass.util.TimeUtil;
 2import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 3
 4import java.util.ArrayList;
 5import java.util.Date;
 6import java.util.List;
 7import java.util.Random;
 8
 9/**
10 * 模拟source,未来可以从kafka中读取数据来源
11 */
12public class AccessLogSource  extends RichParallelSourceFunction<AccessLogDO> {
13
14    private volatile Boolean flag = true;
15
16    private Random random = new Random();
17
18
19    //模拟不同接口调用
20    private static List<AccessLogDO> urlList = new ArrayList<>();
21    static {
22        urlList.add(new AccessLogDO("首页","/pub/api/v1/web/index_card","GET",200,"",new Date(),"",""));
23        urlList.add(new AccessLogDO("个人信息","/pub/api/v1/web/user_info","GET",200,"",new Date(),"",""));
24        urlList.add(new AccessLogDO("分类列表","/pub/api/v1/web/all_category","GET",200,"",new Date(),"",""));
25        urlList.add(new AccessLogDO("分页视频","/pub/api/v1/web/page_video","GET",200,"",new Date(),"",""));
26        urlList.add(new AccessLogDO("收藏","/user/api/v1/favorite/save","POST",200,"",new Date(),"",""));
27        urlList.add(new AccessLogDO("下单","/user/api/v1/product/order/save","POST",200,"",new Date(),"",""));
28        urlList.add(new AccessLogDO("异常url","","POST",200,"",new Date(),"",""));
29    }
30
31    //状态码
32    private static List<Integer> codeList = new ArrayList<>();
33    static {
34        codeList.add(200);
35        codeList.add(200);
36        codeList.add(200);
37        codeList.add(502);
38        codeList.add(403);
39    }
40
41    @Override
42    public void run(SourceContext<AccessLogDO> ctx) throws Exception {
43        while (flag){
44            Thread.sleep(1000);
45            int userId = random.nextInt(50);
46            int httpCodeNum = random.nextInt(codeList.size());
47            int accessLogNum = random.nextInt(urlList.size());
48            AccessLogDO accessLogDO = urlList.get(accessLogNum);
49            accessLogDO.setHttpCode(codeList.get(httpCodeNum));
50            accessLogDO.setUserId(userId+"");
51            //模拟迟到数据,100秒波动
52            //long timestamp = System.currentTimeMillis() - random.nextInt(100000);
53            //当前时间 - 0~5秒之间得随机时间
54            long timestamp = System.currentTimeMillis() - random.nextInt(5000);
55            accessLogDO.setCreateTime(new Date(timestamp));
56            System.out.println("产生:" + accessLogDO.getTitle() + ",状态码:" + accessLogDO.getHttpCode() + ", 时间:" + TimeUtil.formatTime(accessLogDO.getCreateTime()));
57            //收集accesslog
58            ctx.collect(accessLogDO);
59        }
60    }
61
62    @Override
63    public void cancel() {
64        flag = false;
65    }
66}

过滤-分组-开窗-分配 Watermark-聚合

ProcessWindowFunction 方法说明

  • 一次性迭代整个窗口里的所有元素,通过 Context,可以获取到事件、窗口和状态信息
  • 可以和ReduceFunction, AggregateFunction 来做增量计算
  • 可以在 Agg 方法做第 2 个参数 ,windowFunction 会把每个 key 的窗口聚合后的结果(AggregateFunction<T, ACC, V> aggFunction)带上 上下文信息进行输出,aggFunction 会传给 windowFunction
  • 之前在 agg 方法里面的 ProcessWindowFunction获取整个窗口的全部元素现在agg方法里面的 ProcessWindowFunction 是**获取聚合后的结果(一个元素)**
1aggregate( AggregateFunction<T, ACC, V> aggFunction, 
2ProcessWindowFunction<V, R, K, W> windowFunction )

SoulboyMonitorApp

  1import net.xdclass.util.TimeUtil;
  2import org.apache.commons.lang3.StringUtils;
  3import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  4import org.apache.flink.api.common.functions.AggregateFunction;
  5import org.apache.flink.api.common.functions.FilterFunction;
  6import org.apache.flink.api.java.functions.KeySelector;
  7import org.apache.flink.streaming.api.datastream.DataStreamSource;
  8import org.apache.flink.streaming.api.datastream.KeyedStream;
  9import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 10import org.apache.flink.streaming.api.datastream.WindowedStream;
 11import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 12import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 13import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 14import org.apache.flink.streaming.api.windowing.time.Time;
 15import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 16import org.apache.flink.util.Collector;
 17import org.apache.flink.util.OutputTag;
 18
 19import java.time.Duration;
 20
 21public class SoulboyMonitorApp {
 22    public static void main(String[] args) throws Exception {
 23        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
 24        environment.setParallelism(1);
 25        DataStreamSource<AccessLogDO> ds = environment.addSource(new AccessLogSource());
 26
 27        //过滤url: ""为不合法
 28        SingleOutputStreamOperator<AccessLogDO> filterDS = ds.filter(new FilterFunction<AccessLogDO>() {
 29            @Override
 30            public boolean filter(AccessLogDO value) throws Exception {
 31                return StringUtils.isNotBlank(value.getUrl());
 32            }
 33        });
 34
 35        //指定watermark(插线),指定eventtime的列
 36        SingleOutputStreamOperator<AccessLogDO> watermarkDS = filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.<AccessLogDO>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((event, timestamp) -> {
 37            //指定eventtime的列
 38            return event.getCreateTime().getTime();
 39        }));
 40
 41        //最后兜底数据
 42        OutputTag<AccessLogDO> lateDate = new OutputTag<AccessLogDO>("lateDataLog") {
 43        };
 44
 45        //分组
 46        KeyedStream<AccessLogDO, String> keyedStream = watermarkDS.keyBy(new KeySelector<AccessLogDO, String>() {
 47            @Override
 48            public String getKey(AccessLogDO value) throws Exception {
 49                return value.getUrl();
 50            }
 51        });
 52
 53        //开窗(滑动):每5秒统计过去1分钟可以统计各个接口的访问量
 54        WindowedStream<AccessLogDO, String, TimeWindow> windowedStream = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(5)))
 55                //允许有1分钟延迟
 56                .allowedLateness(Time.minutes(1))
 57                //兜底侧输出流(不会更新到时间窗口,需要人工兜底处理)
 58                .sideOutputLateData(lateDate);
 59
 60        //聚合 <输入类型,中间聚合类型(接口访问量),输出类型>
 61        SingleOutputStreamOperator<ResultCount> aggregate = windowedStream.aggregate(
 62                new AggregateFunction<AccessLogDO, Long, Long>() {
 63                    @Override
 64                    public Long createAccumulator() {
 65                        return 0L;
 66                    }
 67
 68                    @Override
 69                    public Long add(AccessLogDO accessLogDO, Long accumulator) {
 70                        return accumulator+1;
 71                    }
 72
 73                    @Override
 74                    public Long getResult(Long accumulator) {
 75                        return accumulator;
 76                    }
 77
 78                    @Override
 79                    public Long merge(Long a, Long b) {
 80                        return a+b;
 81                    }
 82                },
 83                //<aggFunction的集合结果类型,输出类型,key类型,window>
 84                new ProcessWindowFunction<Long, ResultCount, String, TimeWindow>() {
 85                    @Override
 86                    //process(value是分组的key(url), context(上下文信息), iterable(AggregateFunction()聚合后的结果,只有一个元素), collector(收集输出))
 87                    public void process(String value, ProcessWindowFunction<Long, ResultCount, String, TimeWindow>.Context context, Iterable<Long> iterable, Collector<ResultCount> collector) throws Exception {
 88                        ResultCount resultCount = new ResultCount();
 89                        resultCount.setUrl(value);
 90                        resultCount.setStartTime(TimeUtil.format(context.window().getStart()));
 91                        resultCount.setEndTime(TimeUtil.format(context.window().getEnd()));
 92                        //上一步AggregateFunction()聚合结果只有一个,因此这里不需要迭代,直接next()即可获取数据(接口访问量)
 93                        long total = iterable.iterator().next();
 94                        resultCount.setCount(total);
 95                        //收集数据输出
 96                        collector.collect(resultCount);
 97                    }
 98                });
 99
100        aggregate.print("实时1分钟接口访问量");
101        environment.execute("SoulboyMonitor");
102
103
104    }
105}

多接口多状态码统计

   多个分组字段

SoulboyApiCodeMonitorApp

  1import net.xdclass.util.TimeUtil;
  2import org.apache.commons.lang3.StringUtils;
  3import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  4import org.apache.flink.api.common.functions.AggregateFunction;
  5import org.apache.flink.api.common.functions.FilterFunction;
  6import org.apache.flink.api.java.functions.KeySelector;
  7import org.apache.flink.api.java.tuple.Tuple2;
  8import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9import org.apache.flink.streaming.api.datastream.KeyedStream;
 10import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 11import org.apache.flink.streaming.api.datastream.WindowedStream;
 12import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 13import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 14import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 15import org.apache.flink.streaming.api.windowing.time.Time;
 16import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 17import org.apache.flink.util.Collector;
 18import org.apache.flink.util.OutputTag;
 19
 20import java.time.Duration;
 21
 22public class SoulboyApiCodeMonitorApp {
 23    public static void main(String[] args) throws Exception {
 24        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
 25        environment.setParallelism(1);
 26        DataStreamSource<AccessLogDO> ds = environment.addSource(new AccessLogSource());
 27
 28        //过滤url: ""为不合法
 29        SingleOutputStreamOperator<AccessLogDO> filterDS = ds.filter(new FilterFunction<AccessLogDO>() {
 30            @Override
 31            public boolean filter(AccessLogDO value) throws Exception {
 32                return StringUtils.isNotBlank(value.getUrl());
 33            }
 34        });
 35
 36        //指定watermark(插线),指定eventtime的列
 37        SingleOutputStreamOperator<AccessLogDO> watermarkDS = filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.<AccessLogDO>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((event, timestamp) -> {
 38            //指定eventtime的列
 39            return event.getCreateTime().getTime();
 40        }));
 41
 42        //最后兜底数据
 43        OutputTag<AccessLogDO> lateDate = new OutputTag<AccessLogDO>("lateDataLog") {
 44        };
 45
 46        //多字段分组
 47        //KeySelector<输入类型,分组key类型(Tuple2<url, state_code>)>
 48        KeyedStream<AccessLogDO, Tuple2<String, Integer>> keyedStream = watermarkDS.keyBy(new KeySelector<AccessLogDO, Tuple2<String, Integer>>() {
 49            @Override
 50            public Tuple2<String, Integer> getKey(AccessLogDO value) throws Exception {
 51                return Tuple2.of(value.getUrl(), value.getHttpCode());
 52            }
 53        });
 54
 55        //开窗
 56        WindowedStream<AccessLogDO, Tuple2<String, Integer>, TimeWindow> windowedStream = keyedStream.window(SlidingEventTimeWindows.of(
 57                Time.seconds(60), Time.seconds(5)))
 58                .allowedLateness(Time.minutes(1))
 59                .sideOutputLateData(lateDate);
 60
 61        //聚合 <输入类型,中间类型,输出类型>
 62        SingleOutputStreamOperator<ResultCount> aggregateDS = windowedStream.aggregate(
 63                new AggregateFunction<AccessLogDO, Long, Long>() {
 64                    @Override
 65                    public Long createAccumulator() {
 66                        return 0L;
 67                    }
 68
 69                    @Override
 70                    public Long add(AccessLogDO value, Long accumulator) {
 71                        return accumulator+1;
 72                    }
 73
 74                    @Override
 75                    public Long getResult(Long accumulator) {
 76                        return accumulator;
 77                    }
 78
 79                    @Override
 80                    public Long merge(Long a, Long b) {
 81                        return a+b;
 82                    }
 83                },
 84                //<aggFunction的集合结果类型,输出类型,key类型,window>
 85                new ProcessWindowFunction<Long, ResultCount, Tuple2<String, Integer>, TimeWindow>() {
 86                    @Override
 87                    public void process(Tuple2<String, Integer> value, ProcessWindowFunction<Long, ResultCount, Tuple2<String, Integer>, TimeWindow>.Context context, Iterable<Long> iterable, Collector<ResultCount> collector) throws Exception {
 88                        ResultCount resultCount = new ResultCount();
 89                        resultCount.setUrl(value.f0);
 90                        resultCount.setCode(value.f1);
 91                        //上一步AggregateFunction()聚合结果只有一个,因此这里不需要迭代,直接next()即可获取数据(接口访问量)
 92                        long total = iterable.iterator().next();
 93                        resultCount.setCount(total);
 94                        resultCount.setStartTime(TimeUtil.format(context.window().getStart()));
 95                        resultCount.setEndTime(TimeUtil.format(context.window().getEnd()));
 96                        collector.collect(resultCount);
 97                    }
 98                });
 99        aggregateDS.print("接口状态码");
100        aggregateDS.getSideOutput(lateDate).print("late data");
101        environment.execute("SoulboyMonitor");
102
103
104    }
105}

控制台输出

 1产生:分页视频,状态码:200, 时间:2024-09-16 10:48:12
 2产生:首页,状态码:403, 时间:2024-09-16 10:48:15
 3产生:收藏,状态码:403, 时间:2024-09-16 10:48:14
 4产生:首页,状态码:502, 时间:2024-09-16 10:48:15
 5产生:分页视频,状态码:502, 时间:2024-09-16 10:48:18
 6接口状态码> ResultCount(url=/pub/api/v1/web/page_video, code=200, count=1, startTime=2024-09-16 10:47:15, endTime=2024-09-16 10:48:15, type=null)
 7接口状态码> ResultCount(url=/user/api/v1/favorite/save, code=403, count=1, startTime=2024-09-16 10:47:15, endTime=2024-09-16 10:48:15, type=null)
 8产生:首页,状态码:200, 时间:2024-09-16 10:48:19
 9产生:收藏,状态码:403, 时间:2024-09-16 10:48:19
10产生:个人信息,状态码:403, 时间:2024-09-16 10:48:20
11产生:收藏,状态码:200, 时间:2024-09-16 10:48:23
12接口状态码> ResultCount(url=/pub/api/v1/web/page_video, code=200, count=1, startTime=2024-09-16 10:47:20, endTime=2024-09-16 10:48:20, type=null)
13接口状态码> ResultCount(url=/user/api/v1/favorite/save, code=403, count=2, startTime=2024-09-16 10:47:20, endTime=2024-09-16 10:48:20, type=null)
14接口状态码> ResultCount(url=/pub/api/v1/web/index_card, code=200, count=1, startTime=2024-09-16 10:47:20, endTime=2024-09-16 10:48:20, type=null)
15接口状态码> ResultCount(url=/pub/api/v1/web/index_card, code=403, count=1, startTime=2024-09-16 10:47:20, endTime=2024-09-16 10:48:20, type=null)
16接口状态码> ResultCount(url=/pub/api/v1/web/index_card, code=502, count=1, startTime=2024-09-16 10:47:20, endTime=2024-09-16 10:48:20, type=null)
17接口状态码> ResultCount(url=/pub/api/v1/web/page_video, code=502, count=1, startTime=2024-09-16 10:47:20, endTime=2024-09-16 10:48:20, type=null)
18产生:首页,状态码:200, 时间:2024-09-16 10:48:23
19产生:收藏,状态码:502, 时间:2024-09-16 10:48:23
20产生:个人信息,状态码:200, 时间:2024-09-16 10:48:26
21产生:下单,状态码:502, 时间:2024-09-16 10:48:25
22产生:分类列表,状态码:200, 时间:2024-09-16 10:48:25
23产生:首页,状态码:403, 时间:2024-09-16 10:48:26
24产生:异常url,状态码:200, 时间:2024-09-16 10:48:31
25产生:下单,状态码:200, 时间:2024-09-16 10:48:31
26接口状态码> ResultCount(url=/user/api/v1/favorite/save, code=403, count=2, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
27接口状态码> ResultCount(url=/pub/api/v1/web/index_card, code=200, count=2, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
28接口状态码> ResultCount(url=/pub/api/v1/web/user_info, code=403, count=1, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
29接口状态码> ResultCount(url=/pub/api/v1/web/page_video, code=200, count=1, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
30接口状态码> ResultCount(url=/pub/api/v1/web/index_card, code=502, count=1, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
31接口状态码> ResultCount(url=/pub/api/v1/web/index_card, code=403, count=1, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
32接口状态码> ResultCount(url=/user/api/v1/favorite/save, code=200, count=1, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
33接口状态码> ResultCount(url=/pub/api/v1/web/page_video, code=502, count=1, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
34接口状态码> ResultCount(url=/user/api/v1/favorite/save, code=502, count=1, startTime=2024-09-16 10:47:25, endTime=2024-09-16 10:48:25, type=null)
35产生:异常url,状态码:502, 时间:2024-09-16 10:48:33
36产生:分类列表,状态码:200, 时间:2024-09-16 10:48:31
37产生:下单,状态码:200, 时间:2024-09-16 10:48:34

CEP 接口监控告警

需求1分钟内超过3次则告警

CEP 乱序延迟问题

1Flink CEP会将事件进行缓存,在相应的watermark到底之后,Flink CEP将缓存中的事件按照事件时间进行升序排序,然后再进行的模式匹配。
2
3再迟到的数据可以用SideOutput(侧输出流)进行处理
4
5当watermark到达时,处理该缓冲区中事件时间小于watermark时间的所有数据。
6
7时间小于上次的watermark的时间就是迟到的数据,迟到的数据需要用侧输出流处理

SoulboyApiCodeCEPMonitorApp

 1import net.xdclass.util.TimeUtil;
 2import org.apache.commons.lang3.StringUtils;
 3import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 4import org.apache.flink.api.common.functions.AggregateFunction;
 5import org.apache.flink.api.common.functions.FilterFunction;
 6import org.apache.flink.api.java.functions.KeySelector;
 7import org.apache.flink.api.java.tuple.Tuple2;
 8import org.apache.flink.cep.CEP;
 9import org.apache.flink.cep.PatternSelectFunction;
10import org.apache.flink.cep.PatternStream;
11import org.apache.flink.cep.pattern.Pattern;
12import org.apache.flink.cep.pattern.conditions.SimpleCondition;
13import org.apache.flink.streaming.api.datastream.DataStreamSource;
14import org.apache.flink.streaming.api.datastream.KeyedStream;
15import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
16import org.apache.flink.streaming.api.datastream.WindowedStream;
17import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
18import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
19import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
20import org.apache.flink.streaming.api.windowing.time.Time;
21import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
22import org.apache.flink.util.Collector;
23import org.apache.flink.util.OutputTag;
24
25import java.time.Duration;
26import java.util.List;
27import java.util.Map;
28
29public class SoulboyApiCodeCEPMonitorApp {
30    public static void main(String[] args) throws Exception {
31        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
32        environment.setParallelism(1);
33        DataStreamSource<AccessLogDO> ds = environment.addSource(new AccessLogSource());
34
35        //过滤url: ""为不合法
36        SingleOutputStreamOperator<AccessLogDO> filterDS = ds.filter(new FilterFunction<AccessLogDO>() {
37            @Override
38            public boolean filter(AccessLogDO value) throws Exception {
39                return StringUtils.isNotBlank(value.getUrl());
40            }
41        });
42
43        //指定watermark(插线),指定eventtime的列
44        SingleOutputStreamOperator<AccessLogDO> watermarkDS = filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.<AccessLogDO>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) -> {
45            //指定eventtime的列
46            return event.getCreateTime().getTime();
47        }));
48
49        //最后兜底数据
50        OutputTag<AccessLogDO> lateDate = new OutputTag<AccessLogDO>("lateDataLog") {
51        };
52
53        //多字段分组
54        //KeySelector<输入类型,分组key类型(Tuple2<url, state_code>)>
55        KeyedStream<AccessLogDO, Tuple2<String, Integer>> keyedStream = watermarkDS.keyBy(new KeySelector<AccessLogDO, Tuple2<String, Integer>>() {
56            @Override
57            public Tuple2<String, Integer> getKey(AccessLogDO value) throws Exception {
58                return Tuple2.of(value.getUrl(), value.getHttpCode());
59            }
60        });
61
62        //定义pattern: 60秒内3次错误则报警
63        Pattern<AccessLogDO, AccessLogDO> pattern = Pattern.<AccessLogDO>begin("errorCode").where(new SimpleCondition<AccessLogDO>() {
64            @Override
65            public boolean filter(AccessLogDO value) throws Exception {
66                return value.getHttpCode() != 200;
67            }
68        }).timesOrMore(3).within(Time.seconds(60));
69
70        //pattern匹配数据流(模式流)
71        PatternStream<AccessLogDO> patternStream = CEP.pattern(keyedStream, pattern);
72
73        //从模式流中获取结果
74        SingleOutputStreamOperator<ResultCount> CEPResult = patternStream.select(new PatternSelectFunction<AccessLogDO, ResultCount>() {
75            @Override
76            public ResultCount select(Map<String, List<AccessLogDO>> map) throws Exception {
77                //获取匹配数据
78                List<AccessLogDO> listErrorCode = map.get("errorCode");
79                AccessLogDO accessLogDO = listErrorCode.get(0);
80
81                ResultCount resultCount = new ResultCount();
82                resultCount.setUrl(accessLogDO.getUrl());
83                resultCount.setCount(Long.valueOf(listErrorCode.size()));
84                resultCount.setCode(accessLogDO.getHttpCode());
85                return resultCount;
86            }
87        });
88
89        //60秒内3次错误则报警
90        CEPResult.print("接口告警");
91        environment.execute("SoulboyMonitor");
92    }
93}

控制台输出

 1产生:首页,状态码:200, 时间:2024-09-16 11:18:05
 2产生:个人信息,状态码:403, 时间:2024-09-16 11:18:08
 3产生:个人信息,状态码:403, 时间:2024-09-16 11:18:11
 4产生:首页,状态码:200, 时间:2024-09-16 11:18:11
 5产生:首页,状态码:200, 时间:2024-09-16 11:18:12
 6产生:个人信息,状态码:403, 时间:2024-09-16 11:18:13
 7产生:个人信息,状态码:200, 时间:2024-09-16 11:18:11
 8产生:个人信息,状态码:403, 时间:2024-09-16 11:18:15
 9接口告警> ResultCount(url=/pub/api/v1/web/user_info, code=403, count=3, startTime=null, endTime=null, type=null)
10产生:首页,状态码:200, 时间:2024-09-16 11:18:13
11产生:首页,状态码:502, 时间:2024-09-16 11:18:16
12接口告警> ResultCount(url=/pub/api/v1/web/user_info, code=403, count=4, startTime=null, endTime=null, type=null)
13接口告警> ResultCount(url=/pub/api/v1/web/user_info, code=403, count=3, startTime=null, endTime=null, type=null)
14产生:首页,状态码:403, 时间:2024-09-16 11:18:14
15产生:首页,状态码:200, 时间:2024-09-16 11:18:16
16产生:首页,状态码:200, 时间:2024-09-16 11:18:17
17产生:首页,状态码:200, 时间:2024-09-16 11:18:21
18产生:个人信息,状态码:403, 时间:2024-09-16 11:18:20
19产生:个人信息,状态码:200, 时间:2024-09-16 11:18:21
20产生:个人信息,状态码:502, 时间:2024-09-16 11:18:21
21产生:个人信息,状态码:200, 时间:2024-09-16 11:18:23

      
      


作者:Soulboy