目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

Kafka

为什么要序列化?

  • 序列化:把对象转化为可传输的字节序列过程称为序列化。
  • 反序列化:把字节序列还原为对象的过程称为反序列化。

如果光看定义我想你很难一下子理解序列化的意义,那么我们可以从另一个角度来推导出什么是序列化, 那么究竟序列化的目的是什么?

其实序列化最终的目的是为了对象可以跨平台存储,和进行网络传输。而我们进行跨平台存储和网络传输的方式就是IO,而我们的IO支持的数据格式就是字节数组。

因为我们单方面的只把对象转成字节数组还不行,因为没有规则的字节数组我们是没办法把对象的本来面目还原回来的,所以我们必须在把对象转成字节数组的时候就制定一种规则(序列化),那么我们从IO流里面读出数据的时候再以这种规则把对象还原回来(反序列化)。

如果我们要把一栋房子从一个地方运输到另一个地方去,序列化就是我把房子拆成一个个的砖块放到车子里,然后留下一张房子原来结构的图纸,反序列化就是我们把房子运输到了目的地以后,根据图纸把一块块砖头还原成房子原来面目的过程。

MQ使用场景

跨平台 、多语言、分布式事务、最终一致性
RPC调用上下游对接,数据源变动->通知下属

  • 解耦:订单系统-》物流系统
  • 异步:用户注册-》发送邮件,初始化信息
  • 削峰:秒杀、日志处理

JMS

Java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口。

  • JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API
  • 是由Sun公司早期提出的消息标准,旨在为java应用提供统一的消息操作,包括create、send、receive
  • JMS是针对java的,那微软开发了NMS(.NET消息传递服务)

特性

  • 面向Java平台的标准消息传递API
  • 在Java或JVM语言比如Scala、Groovy中具有互用性
  • 无需担心底层协议
  • 有queues和topics两种消息传递模型
  • 支持事务、能够定义消息格式(消息头、属性和内容

常见概念

  • JMS提供者:连接面向消息中间件的,JMS接口的一个实现,RocketMQ,ActiveMQ,Kafka等等
  • JMS生产者(Message Producer):生产消息的服务
  • JMS消费者(Message Consumer):消费消息的服务
  • JMS消息:数据对象
  • JMS队列:存储待消费消息的区域
  • JMS主题:一种支持发送消息给多个订阅者的机制
  • JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)

基础编程模型

  • MQ中需要用的一些类
  • ConnectionFactory :连接工厂,JMS 用它创建连接
  • Connection :JMS 客户端到JMS Provider 的连接
  • Session: 一个发送或接收消息的线程
  • Destination :消息的目的地;消息发送给谁.
  • MessageConsumer / MessageProducer: 消息消费者,消息生产者

AMQP

JMS或者NMS都没有标准的底层协议,API是与编程语言绑定的,每个消息队列厂商就存在多种不同格式规范的产品,对使用者就产生了很多问题, AMQP解决了这个问题,它使用了一套标准的底层协议
AMQP(advanced message queuing protocol)在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题,就是是一种协议,兼容JMS
更准确说的链接协议 binary- wire-level-protocol 直接定义网络交换的数据格式,类似http

AMQP和JMS的主要区别

  • AMQP不从API层进行限定,直接定义网络交换的数据格式,这使得实现了AMQP的provider天然性就是跨平台
  • 比如Java语言产生的消息,可以用其他语言比如python的进行消费
  • AQMP可以用http来进行类比,不关心实现接口的语言,只要都按照相应的数据格式去发送报文请求,不同语言的client可以和不同语言的server进行通讯
  • JMS消息类型:TextMessage/ObjectMessage/StreamMessage等
  • AMQP消息类型:Byte[]

MQTT: 消息队列遥测传输(Message Queueing Telemetry Transport )

我们有面向基于Java的企业应用的JMS和面向所有其他应用需求的AMQP,那这个MQTT是做啥的?

  • 计算性能不高的设备不能适应AMQP上的复杂操作,MQTT它是专门为小设备设计的
  • MQTT主要是是物联网(IOT)中大量的使用

特性

  • 内存占用低,为小型无声设备之间通过低带宽发送短消息而设计
  • 不支持长周期存储和转发,不允许分段消息(很难发送长消息)
  • 支持主题发布-订阅、不支持事务(仅基本确认)
  • 消息实际上是短暂的(短周期)
  • 简单用户名和密码、不支持安全连接、消息不透明

主流消息队列和技术选型

ActiveMQ

  • Apache出品,历史悠久,支持多种语言的客户端和协议,支持多种语言Java, .NET, C++ 等
  • 基于JMS Provider的实现
  • 缺点:吞吐量不高,多队列的时候性能下降,存在消息丢失的情况,比较少大规模使用

Kafka
它提供了类似于JMS的特性,但是在设计实现上完全不同,它并不是JMS规范的实现,缺点:运维难度大,文档比较少, 需要掌握Scala。

  • 是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统(严格意义上是不属于队列产品,是一个流处理平台),它可以处理大规模的网站中的所有动作流数据(网页浏览,搜索和其他用户的行动),副本集机制,实现数据冗余,保障数据尽量不丢失;支持多个生产者和消费者
  • 类似MQ,功能较为简单,主要支持常规的MQ功能。

RocketMQ

阿里开源的一款的消息中间件,纯lava开发,具有高吞吐量高可用性、适合大规模分布式系统应用的特点。

  • 性能强劲(零拷贝技术),支持海量堆积,支持指定次数和时间间隔的失败消息重发,支持consumer端tag过滤、延迟消息等,在阿里内部进行大规模使用,适合在电商,互联网金融等领域基于IMS Provider的实现

RabbitMQ

  • 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、C、用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错
  • 缺点:使用Erlang开发,阅读和修改源码难度大

Kafka核心概念

Kafka是最初由Linkedin公司开发,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目,也是一个开源【分布式流处理平台】,由Scala和Java编写,(也当做MQ系统,但不是纯粹的消息系统),一种高吞吐量的分布式流处理平台,它可以处理消费者在网站中的所有动作流数据。 比如:网页浏览,搜索和其他用户的行为等,应用于大数据实时处理领域

  • Broker

    • Kafka的服务端程序,可以认为一个mq节点就是一个broker
    • broker存储topic的数据
  • Producer生产者

    • 创建消息Message,然后发布到MQ中
    • 该角色将消息发布到Kafka的topic中
  • Consumer消费者:

    • 消费队列里面的消息
  • ConsumerGroup消费者组:同个topic, 广播发送给不同的group,一个group中只有一个consumer可以消费此消息

  • Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,主题的意思

  • Partition分区:kafka数据存储的基本单元,topic中的数据分割为一个或多个partition,每个topic至少有一个partition,是有序的。

    • 一个Topic的多个partitions, 被分布在kafka集群中的多个server上
    • 消费者数量 <=小于或者等于Partition数量
  • Replication 副本(备胎)

    • 同个Partition会有多个副本replication ,多个副本的数据是一样的,当其他broker挂掉后,系统可以主动用副本提供服务
    • 默认每个topic的副本都是1(默认是没有副本,节省资源),也可以在创建topic的时候指定
    • 如果当前kafka集群只有3个broker节点,则replication-factor最大就是3了,如果创建副本为4,则会报错
  • ReplicationLeader、ReplicationFollower

    • Partition有多个副本,但只有一个replicationLeader负责该Partition和生产者消费者交互
    • ReplicationFollower只是做一个备份,从replicationLeader进行同步
  • ReplicationManager

    • 负责Broker所有分区副本信息,Replication 副本状态切换
  • offset

    • 每个consumer实例需要为他消费的partition维护一个记录自己消费到哪里的偏移offset
    • kafka把offset保存在消费端的消费者组里

架构图

特点总结

  • 多订阅者
    • 一个topic可以有一个或者多个订阅者
    • 每个订阅者都要有一个partition,所以订阅者数量要少于等于partition数量
  • 高吞吐量、低延迟: 每秒可以处理几十万条消息
  • 高并发:几千个客户端同时读写
  • 容错性:多副本、多分区,允许集群中节点失败,如果副本数据量为n,则可以n-1个节点失败
  • 扩展性强:支持热扩展

基于消费者组可以实现

  • 基于队列的模型:所有消费者都在同一消费者组里,每条消息只会被一个消费者处理
  • 基于发布订阅模型:消费者属于不同的消费者组,假如每个消费者都有自己的消费者组,这样kafka消息就能广播到所有消费者实例上

部署JDK8

### 解压
[root@localhost tmp]# mkdir -pv /usr/local/software
[root@localhost tmp]# tar -zxvf jdk-8u181-linux-x64.tar.gz
[root@localhost tmp]# mv jdk1.8.0_181 /usr/local/software/jdk1.8
[root@localhost tmp]# vim /etc/profile
JAVA_HOME=/usr/local/software/jdk1.8
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH
[root@localhost tmp]# source /etc/profile
[root@localhost tmp]# java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)

部署Zookeeper

### 解压Zookeeper
[root@localhost tmp]# tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz
[root@localhost tmp]# mv apache-zookeeper-3.7.0-bin /usr/local/software/zookeeper
[root@localhost tmp]# vim /usr/local/software/zookeeper/conf/zoo.cfg

### 启动Zookeeper (默认2181端口)
[root@localhost tmp]# bash /usr/local/software/zookeeper/bin/zkServer.sh start
[root@localhost ~]# tail -f /usr/local/software/zookeeper/logs/zookeeper_audit.log
[root@localhost tmp]# yum install -y lsof
[root@localhost tmp]# lsof -i:2181
COMMAND   PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    25786 root   57u  IPv6  88166      0t0  TCP *:eforward (LISTEN)
[root@localhost ~]# tail -f

部署Kafka

### 解压
[root@localhost tmp]# tar -zxvf kafka_2.13-2.8.0.tgz
[root@localhost tmp]# mv kafka_2.13-2.8.0 /usr/local/software/kafka

### 修改下面两个配置 ( listeners 配置的ip和advertised.listeners相同时启动kafka会报错)、listeners(内网Ip)、advertised.listeners(公网ip)
[root@localhost tmp]# vim /usr/local/software/kafka/config/server.properties
listeners=PLAINTEXT://192.168.10.61:9092
zookeeper.connect=192.168.10.61:2181

### 启动kafka
[root@localhost tmp]# bash /usr/local/software/kafka/bin/kafka-server-start.sh -daemon /usr/local/software/kafka/config/server.properties &
[root@localhost ~]# lsof -i:9092

### 停止kafka
[root@localhost tmp]# bash /usr/local/software/kafka/bin/kafka-server-stop.sh

### 创建topic
[root@localhost tmp]# cd /usr/local/software/kafka/bin/
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.10.61:2181 --replication-factor 1 --partitions 1 --topic xdclass-topic

### topic存放目录
[root@localhost ~]# ls /tmp/kafka-logs/xdclass-topic-0/
00000000000000000000.index  00000000000000000000.timeindex
00000000000000000000.log    leader-epoch-checkpoint

### 查看topic
[root@localhost bin]# ./kafka-topics.sh --list --zookeeper 192.168.10.61:2181
xdclass-topic

Kafka命令生产送消息、消费消息

### 创建topic
[root@localhost ~]# cd /usr/local/software/kafka/bin/
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.10.61:2181 --replication-factor 1 --partitions 1 --topic soulboy-topic

### 查看topic
[root@localhost bin]# ./kafka-topics.sh --list --zookeeper 192.168.10.61:2181
soulboy-topic

### 生产者发送消息
[root@localhost bin]#  ./kafka-console-producer.sh --broker-list 192.168.10.61:9092 --topic soulboy-topic
>111
>222

### 消费者消费消息 ( --from-beginning:会把主题中以往所有的数据都读取出来, 重启后会有这个重复消费,忽略偏移量)
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.10.61:9092 --from-beginning -topic soulboy-topic
111
222

### 删除topic
[root@localhost bin]# ./kafka-topics.sh --zookeeper 192.168.10.61:2181 --delete --topic soulboy-topic
Topic soulboy-topic is marked for deletion.

### 查看broker节点的指定topic状态信息
[root@localhost bin]# ./kafka-topics.sh --describe --zookeeper 192.168.10.61:2181 --topic xdclass-topic
Topic: xdclass-topic    TopicId: qZse3pJeRL6oYikgJ--V7w PartitionCount: 1   ReplicationFactor: 1    Configs:
        Topic: xdc

点对点模型

对一个消息而言,只会有一个消费者可以消费
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息
消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。 Queue支持存在多个消费者。

消费者组配置实现点对点消费模型

### 生产者
# 配置文件中默认的consumer-group已配置
[root@localhost bin]# grep "test-consumer" ../config/consumer.properties
group.id=test-consumer-group

# 创建topic,2个分区
[root@localhost bin]# ./kafka-topics.sh --create --bootstrap-server 192.168.10.61:9092 --replication-factor 1 --partitions 2 --topic t1

# 生产消息
[root@localhost bin]#  ./kafka-console-producer.sh --broker-list 192.168.10.61:9092 --topic t1
>1
>2
>3
>4
>5
>6

### 指定配置文件启动两个消费者
# 消费者1
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.10.61:9092 --from-beginning -topic t1 --consumer.config ../config/consumer.properties
2
5
6

# 消费者2
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.10.61:9092 --from-beginning -topic t1 --consumer.config ../config/consumer.properties
1
3
4

发布订阅模型

对指定topic进行广播
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。
和点对点方式不同,发布到topic的消息会被所有订阅者消费。

消费者组配置实现发布订阅消费模型
编辑消费者配置(确保group.id 不一样)

  • 编辑 config/consumer-1.properties
  • 编辑 config/consumer-2.properties
### 生产者
# 创建topic,t2,3个分区
[root@localhost bin]# ./kafka-topics.sh --create --bootstrap-server 192.168.10.61:9092 --replication-factor 1 --partitions 3 --topic t2

# 生产消息
[root@localhost bin]# ./kafka-console-producer.sh --broker-list 192.168.10.61:9092 --topic t2
>too young
>too simple
>sometimes naive
>

### 消费者
# 消费者1
[root@localhost bin]# cp ../config/consumer.properties ../config/consumer-1.properties
[root@localhost bin]# vim ../config/consumer.properties
[root@localhost bin]# vim ../config/consumer-1.properties
group.id=test-consumer-group-1
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.10.61:9092 --from-beginning -topic t2 --consumer.config ../config/consumer-1.properties
too young
too simple
sometimes naive

# 消费者2
[root@localhost bin]# cp ../config/consumer.properties ../config/consumer-2.properties
[root@localhost bin]# vim ../config/consumer-2.properties
group.id=test-consumer-group-2
[root@localhost bin]#  ./kafka-console-consumer.sh --bootstrap-server 192.168.10.61:9092 --from-beginning -topic t2 --consumer.config ../config/consumer-2.properties
too young
too simple
sometimes naive

# 消费者3
[root@localhost bin]# cp ../config/consumer.properties ../config/consumer-3.properties
[root@localhost bin]# vim ../config/consumer-3.properties
group.id=test-consumer-group-3
[root@localhost bin]#  ./kafka-console-consumer.sh --bootstrap-server 192.168.10.61:9092 --from-beginning -topic t2 --consumer.config ../config/consumer-3.properties
too young
too simple
sometimes naive

数据存储流程

Partition

topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列,是以文件夹的形式存储在具体Broker本机上。

### partition
[root@localhost bin]# ll /tmp/kafka-logs/
t1-0
t1-1
t2-0
t2-1
t2-2

###
[root@localhost bin]# ls /tmp/kafka-logs/t1-0
00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
leader-epoch-checkpoint

LEO(LogEndOffset)
表示每个partition的log最后一条Message的位置。

LEO

HW(HighWatermark)

  • 表示partition各个replicas数据间同步且一致的offset位置,即表示allreplicas已经commit的位置
  • HW之前的数据才是Commit后的,对消费者才可见
  • ISR集合里面最小leo

HW

offset

  • 每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中
  • partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息
  • 可以认为offset是partition中Message的id

Segment

  • segment file 由2部分组成,分别为index file和data file(log file),
  • 两个文件是一一对应的,后缀”.index”和”.log”分别表示索引文件和数据文件
  • 命名规则:partition的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset+1
[root@localhost bin]# ls /tmp/kafka-logs/t1-0
00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
leader-epoch-checkpoint

Kafka高效文件存储设计特点

  • Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
  • 通过索引信息可以快速定位message
  • producer生产数据,要写入到log文件中,写的过程中一直追加到文件末尾,为顺序写,官网数据表明。同样的磁盘,顺序写能到600M/S,而随机写只有100K/S

存储原理

核心API-Admin

SpringBoot整合kafka

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.4.0</version>
		</dependency>

创建topic

创建topic

package net.xdclass.xdclasskafka;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Properties;

public class KafkaAdminTest {

    private static final String TOPIC_NAME = "soulboy-topic";

    /**
     * 设置admin 客户端
     * @return
     */
    public static AdminClient initAdminClient(){
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.61:9092");
        AdminClient adminClient = AdminClient.create(properties);
        return adminClient;
    }

    /**
     * 创建 topic
     */
    @Test
    public void createTopic(){
        AdminClient adminClient = initAdminClient();
        //指定partitions、replication-factor数量:2分区、1副本
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 2, (short) 1);
        CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));

        //future等待创建,成功不会有任何报错,如果创建失败和超时会报错。
        try {
            createTopicsResult.all().get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(TOPIC_NAME + "创建成功!");
    }
}

查看topic

[root@localhost bin]# ./kafka-topics.sh --list --zookeeper 192.168.10.61:2181
soulboy-topic

查看broker节点topic信息状态

[root@localhost bin]# ./kafka-topics.sh --describe --zookeeper 192.168.10.61:2181 --topic soulboy-topic
Topic: soulboy-topic    TopicId: _5oMYXpmTIuUK1CRZbAhug PartitionCount: 2       ReplicationFactor: 1    Configs:
        Topic: soulboy-topic    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: soulboy-topic    Partition: 1    Leader: 0       Replicas: 0     Isr: 0

查看topic

/**
     * 查看topic
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void listTopic() throws ExecutionException, InterruptedException {
        AdminClient adminClient = initAdminClient();
        //获取所有topic(包含内部的topic)
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(true);
        //不传入参数只会查看用户创建的topic
        ListTopicsResult listTopicsResult = adminClient.listTopics(options);
        Set<String> topics = listTopicsResult.names().get();
        for (String topic : topics) {
            System.out.println(topic);
        }
    }

删除topic

/**
     * 删除 topic
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void delTopicTest() throws ExecutionException, InterruptedException {
        AdminClient adminClient = initAdminClient();
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("xdclass-topic", "t1", "t2"));
        deleteTopicsResult.all().get();
    }

查看topic的详细信息

/**
     * 查看指定 topic 详情
     */
    @Test
    public void getTopicDetial() throws ExecutionException, InterruptedException {
        AdminClient adminClient = initAdminClient();
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
        //<key,description>
        Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
        Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
        entries.stream().forEach((entry)-> System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue()));
        //name :soulboy-topic , desc: (name=soulboy-topic, internal=false, partitions=(partition=0, leader=192.168.10.61:9092 (id: 0 rack: null), replicas=192.168.10.61:9092 (id: 0 rack: null), isr=192.168.10.61:9092 (id: 0 rack: null)),(partition=1, leader=192.168.10.61:9092 (id: 0 rack: null), replicas=192.168.10.61:9092 (id: 0 rack: null), isr=192.168.10.61:9092 (id: 0 rack: null)), authorizedOperations=null)
    }

增加partition数量

注意:Kafka中的分区数只能增加不能减少,减少的话数据不知怎么处理

/**
     * 增加分区数量
     *
     * 如果当主题中的消息包含有key时(即key不为null),根据key来计算分区的行为就会有所影响消息顺序性
     *
     * 注意:Kafka中的分区数只能增加不能减少,减少的话数据不知怎么处理
     *
     * @throws Exception
     */
    @Test
    public  void increatePartitionsTest() throws Exception{
        //封装newPartitions
        Map<String, NewPartitions> infoMap = new HashMap<>();
        NewPartitions newPartitions = NewPartitions.increaseTo(3);
        infoMap.put(TOPIC_NAME, newPartitions);
        //增加partition数量
        AdminClient adminClient = initAdminClient();
        CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap);
        createPartitionsResult.all().get();
    }

生产者API详解

producer投递Broker策略

Kafka的客户端发送数据到服务器,不是来一条就发一条,会经过内存缓冲区(默认是16KB),通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集到的Batch缓冲区,再一次性发送到Broker上去的,这样性能才可能题高。

生产者发送到broker里面的流程是怎样的呢,一个 topic 有多个 partition分区,每个分区又有多个副本。

  • 如果指定Partition ID,则PR(ProducerRecord)被发送至指定Partition
  • 如果未指定Partition ID,但指定了Key, PR会按照hash(key)发送至对应Partition
  • 如果未指定Partition ID也没指定Key,PR会按照默认 round-robin轮训模式发送到每个Partition,消费者消费partition分区默认是range模式
  • 如果同时指定了Partition ID和Key, PR只会发送到指定的Partition (Key不起作用,代码逻辑决定)

注意:Partition有多个副本,但只有一个replicationLeader负责该Partition和生产者消费者交互 。

生产者常见配置

官方文档

# kafka地址,即broker地址
bootstrap.servers  

# 当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别,分别是0, 1,all。
acks

# 请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性
retries

# 每个分区未发送消息总字节大小,单位:字节,超过设置的值就会提交数据到服务端,默认值是16KB
batch.size

# 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满,如果想减少请求的数量,可以设置大于0,即消息在缓冲区保留的时间,超过设置的值就会被提交到服务端。通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送减少请求,如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送
linger.ms 


# 用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到Kafka服务器。会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了。buffer.memory要大于batch.size,否则会报申请内存不足的错误,不要超过物理内存,根据实际情况调整
buffer.memory

# key的序列化器,将用户提供的key和value对象ProducerRecord进行序列化处理,key.serializer必须被设置,即使消息中没有指定key,序列化器必须是一个实现org.apache.kafka.common.serialization.Serializer接口的类,将#key序列化成字节数组。
key.serializer
value.serializer

producerAPI之发送信息

package net.xdclass.xdclasskafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.jupiter.api.Test;

import java.time.LocalDateTime;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaProducerTest {
    private static final String TOPIC_NAME = "soulboy-topic";

    /**
     * 封装配置属性
     * @return
     */
    public static Properties getProperties(){

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.10.61:9092");
        //props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.74.55.160:9092");

        // 当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别,分别是0, 1,all。
        props.put("acks", "all");
        //props.put(ProducerConfig.ACKS_CONFIG, "all");

        // 请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性
        props.put("retries", 0);
        //props.put(ProducerConfig.RETRIES_CONFIG, 0);

        // 生产者缓存每个分区未发送的消息,缓存的大小是通过 batch.size 配置指定的,默认值是16KB
        props.put("batch.size", 16384);

        /**
         * 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满
         * 如果想减少请求的数量,可以设置 linger.ms 大于0,即消息在缓冲区保留的时间,超过设置的值就会被提交到 服务端
         * 通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送 减少请求
         * 如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送
         */
        props.put("linger.ms", 5);

        /**
         * buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。
         * 如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到             Kafka服务器
         * 会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了
         * buffer.memory要大于batch.size,否则会报申请内存不#足的错误,不要超过物理内存,根据实际情况调整
         * 需要结合实际业务情况压测进行配置
         */
        props.put("buffer.memory", 33554432);

        /**
         * key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被          设置,
         * 即使消息中没有指定key,序列化器必须是一个实
         org.apache.kafka.common.serialization.Serializer接口的类,
         * 将key序列化成字节数组。
         */
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        return props;
    }


    /**
     * send()方法是异步的,添加消息到缓冲区等待发送,并立即返回
     * 生产者将单个的消息批量在一起发送来提高效率,即 batch.size和linger.ms结合
     *
     * 实现同步发送:一条消息发送之后,会阻塞当前线程,直至返回 ack
     * 发送消息后返回的一个 Future 对象,调用get即可
     *
     * 消息发送主要是两个线程:一个是Main用户主线程,一个是Sender线程
     *  1)main线程发送消息到RecordAccumulator即返回
     *  2)sender线程从RecordAccumulator拉取信息发送到broker
     *  3) batch.size和linger.ms两个参数可以影响 sender 线程发送次数
     */
    @Test
    public void testSend(){
        Properties props = getProperties();
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 1; i < 3; i++){
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME, "soulboy-key"+i, "soulboy-value"+i));
            try {
                //不关心是否发送成功,则不需要这行
                RecordMetadata recordMetadata = future.get();
                //格式:topic-分区编号@offset    soulboy-topic-0@0
                System.out.println("发送状态:"+recordMetadata.toString());

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println(i+"发送:"+ LocalDateTime.now().toString());
        }
        producer.close();
    }

}

ProducerRecord详解

ProducerRecord(简称PR),发送给Kafka Broker的key/value 值对, 封装基础数据信息。

-- Topic (名字)
-- PartitionID (可选)
-- Key(可选)
-- Value

ProducerRecord

key默认是null,大多数应用程序会用到key

  • 如果key为空,kafka使用默认的partitioner,使用RoundRobin算法将消息均衡地分布在各个partition上
  • 如果key不为空,kafka使用自己实现的hash方法对key进行散列,决定消息该被写到Topic的哪个partition,拥有相同key的消息会被写到同一个partition,实现顺序消息

producerAPI回调函数

生产者发送消息是异步调用,怎么知道是否有异常?

  • 发送消息配置回调函数即可, 该回调方法会在 Producer 收到 ack 时被调用,为异步调用
  • 回调函数有两个参数 RecordMetadata 和 Exception,如果 Exception 是 null,则消息发送成功,否则失败

异步发送配置回调函数

/**
     * send()方法是异步的,添加消息到缓冲区等待发送,并立即返回
     * 生产者将单个的消息批量在一起发送来提高效率,即 batch.size和linger.ms结合
     *
     * 实现同步发送:一条消息发送之后,会阻塞当前线程,直至返回 ack
     * 发送消息后返回的一个 Future 对象,调用get即可
     *
     * 消息发送主要是两个线程:一个是Main用户主线程,一个是Sender线程
     *  1)main线程发送消息到RecordAccumulator即返回
     *  2)sender线程从RecordAccumulator拉取信息发送到broker
     *  3) batch.size和linger.ms两个参数可以影响 sender 线程发送次数
     */
    @Test
    public void testSendWithCallback(){
        Properties props = getProperties();
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 1; i < 3; i++) {
            producer.send(new ProducerRecord<>(TOPIC_NAME, "soulboy-key" + i, "soulboy-value" + i), new Callback(){
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("发送状态:" + metadata.toString());
                    } else {
                        exception.printStackTrace();//记录日常
                    }
                }
            });
            System.out.println(i+"发送:"+LocalDateTime.now().toString());
            //发送状态:soulboy-topic-0@2
        }
        producer.close();
    }

producer生产者发送指定分区

创建topic,配置5个分区,1个副本

private static final String TOPIC_NAME = "soulboy-v2-topic-test";

/**
     * 创建 topic
     */
    @Test
    public void createTopic(){
        AdminClient adminClient = initAdminClient();
        //指定partitions、replication-factor数量:2分区、1副本
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 5, (short) 1);
        CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));

        //future等待创建,成功不会有任何报错,如果创建失败和超时会报错。
        try {
            createTopicsResult.all().get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(TOPIC_NAME + "创建成功!");
    }

发送消息到指定分区

@Test
    public void testSendWithCallbackAndPartition(){
        Properties props = getProperties();
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++){
            //发送指定分区3 (分区从0开始)
            producer.send(new ProducerRecord<>("soulboy-v2-topic-test",3, "soulboy-key" + i, "soulboy-value" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("发送状态:"+metadata.toString());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
            System.out.println(i+"发送:"+LocalDateTime.now().toString());
        }
        producer.close();
    }

输出打印

发送状态:soulboy-v2-topic-test-3@0
发送状态:soulboy-v2-topic-test-3@1
发送状态:soulboy-v2-topic-test-3@2
发送状态:soulboy-v2-topic-test-3@3
发送状态:soulboy-v2-topic-test-3@4
发送状态:soulboy-v2-topic-test-3@5
发送状态:soulboy-v2-topic-test-3@6
发送状态:soulboy-v2-topic-test-3@7
发送状态:soulboy-v2-topic-test-3@8
发送状态:soulboy-v2-topic-test-3@9

自定义partition分区规则

创建类,实现Partitioner接口,重写方法

package net.xdclass.xdclasskafka.config;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class SoulboyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        //key 为空则抛出异常
        if (keyBytes == null) {
            throw new IllegalArgumentException("key 参数不能为空");
        }

        // 如果key是soulboy-v2-topic-test则分配至0分区
        if ("soulboy".equals(key)) {
            return 0;
        }

        //默认分区规则
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        //使用hash值取模,确定分区(默认的也是这个方式)
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

配置 partitioner.class 指定类即可

/**
     * 自定义分区规则
     */
    @Test
    public void testSendWithPartitionStrategy(){
        Properties props = getProperties();
        //配置 partitioner.class 指定类即可
        props.put("partitioner.class", "net.xdclass.xdclasskafka.config.SoulboyPartitioner");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++){
            //没有指定partition,指定了key
            producer.send(new ProducerRecord<>("soulboy-v2-topic-test","soulboy", "soulboy-value" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("发送状态:"+metadata.toString());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
            System.out.println(i+"发送:"+LocalDateTime.now().toString());
        }
        producer.close();
    }

打印输出

发送状态:soulboy-v2-topic-test-0@3
发送状态:soulboy-v2-topic-test-0@4
发送状态:soulboy-v2-topic-test-0@5
发送状态:soulboy-v2-topic-test-0@6
发送状态:soulboy-v2-topic-test-0@7
发送状态:soulboy-v2-topic-test-0@8
发送状态:soulboy-v2-topic-test-0@9
发送状态:soulboy-v2-topic-test-0@10
发送状态:soulboy-v2-topic-test-0@11
发送状态:soulboy-v2-topic-test-0@12

消费者API 详解

Consumer消费者机制

消费者根据什么模式从broker获取数据的?为什么是pull模式,而不是broker主动push?

  • pull 模式则可以根据 consumer 的消费能力进行自己调整,不同的消费者性能不一样。如果broker没有数据,consumer可以配置 timeout 时间,阻塞等待一段时间之后再返回
  • 如果是broker主动push,优点是可以快速处理消息,但是容易造成消费者处理不过来,消息堆积和延迟。

分区策略

消费者从哪个分区进行消费?一个 topic 有多个 partition,一个消费者组里面有多个消费者,那是怎么分配?

  • 一个主题topic可以有多个消费者,因为里面有多个partition分区 ( leader分区)
  • 一个partition leader可以由一个消费者组中的一个消费者进行消费
  • 一个 topic 有多个 partition,所以有多个partition leader,给多个消费者消费,那分配策略如何?

消费者从哪个分区进行消费?两个策略

顶层接口

org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor

round-robin (RoundRobinAssignor非默认策略)轮训

【按照消费者组】进行轮训分配,同个消费者组监听不同主题也一样,是把所有的 partition 和所有的 consumer 都列出来, 所以消费者组里面订阅的主题是一样的才行,主题不一样则会出现分配不均问题,例如7个分区,同组内2个消费者

  • topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5/topic-p6
  • c-1: topic-p0/topic-p2/topic-p4/topic-p6
  • c-2:topic-p1/topic-p3/topic-p5

弊端
如果同一消费者组内,所订阅的消息是不相同的,在执行分区分配的时候不是轮询分配,可能会导致分区分配的不均匀

  • 有3个消费者C0、C1和C2,他们共订阅了 3 个主题:t0、t1 和 t2
  • t0有1个分区(p0),t1有2个分区(p0、p1),t2有3个分区(p0、p1、p2))
  • 消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2

消费不均

range (RangeAssignor 默认策略)范围

【按照主题】进行分配,如果不平均分配,则第一个消费者会分配比较多分区, 一个消费者监听不同主题也不影响,例如7个分区,同组内2个消费者。

  • topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5//topic-p6
  • c-1: topic-p0/topic-p1/topic-p2/topic-p3
  • c-2:topic-p4/topic-p5/topic-p6

弊端

  • 只是针对 1 个 topic 而言,c-1多消费一个分区影响不大
  • 如果有 N 多个 topic,那么针对每个 topic,消费者 C-1 都将多消费 1 个分区,topic越多则消费的分区也越多,则性能有所下降

范围分区

Consumer重新分配策略

什么是Rebalance操作

  • kafka 怎么均匀地分配某个 topic 下的所有 partition 到各个消费者,从而使得消息的消费速度达到最快,这就是平衡(balance),前面讲了 Range 范围分区 和 RoundRobin 轮询分区,也支持自定义分区策略。
  • rebalance(重平衡)其实就是重新进行 partition 的分配,从而使得 partition 的分配重新达到平衡状态

例如70个分区,10个消费者,但是先启动一个消费者,后续再启动一个消费者,这个会怎么分配? Kafka 会进行一次分区分配操作,即 Kafka 消费者端的 Rebalance 操作 ,下面都会发生rebalance操作

  • 当消费者组内的消费者数量发生变化(增加或者减少),就会产生重新分配patition
  • 分区数量发生变化时(即 topic 的分区数量发生变化时)

当消费者在消费过程突然宕机了,重新恢复后是从哪里消费,会有什么问题?

  • 消费者会记录offset,故障恢复后从这里继续消费

这个offset记录在哪里?

  • 记录在zk里面和本地,新版默认将offset保证在kafka的内置topic中,名称是 __consumer_offsets
  • 该Topic默认有50个Partition,每个Partition有3个副本,分区数量由参数offset.topic.num.partition配置
  • 通过groupId的哈希值和该参数取模的方式来确定某个消费者组已消费的offset保存到__consumer_offsets主题的哪个分区中
  • 由消费者组名+主题+分区,确定唯一的offset的key,从而获取对应的值
  • 三元组:group.id+topic+分区号 ,而 value 就是 offset 的值
[root@localhost bin]# ./kafka-topics.sh --describe --zookeeper 192.168.10.61:2181 --topic __consumer_offsets
Topic: __consumer_offsets       TopicId: vu-VvDjrSJGLs5AM-ERgrg PartitionCount: 50     ReplicationFactor: 1    Configs: compression.type=producer,cleanup.policy=compact,segment.bytes=104857600
        Topic: __consumer_offsets       Partition: 47   Leader: 0       Replicas: 0    Isr: 0
        Topic: __consumer_offsets       Partition: 48   Leader: 0       Replicas: 0    Isr: 0
        Topic: __consumer_offsets       Partition: 49   Leader: 0       Replicas: 0    Isr: 0

网络抖动的情况下会存在重复消费,应该怎么解决?

  • 同一个消息,在消费者端可以进行幂等性处理。可以使用数据库,表中的某个字段messageid,并设置成唯一索引,消费消息时,插入消息的唯一id到表中,每次进行消费都插入,如果插入失败则抛出异常,避免重复消费。
  • 也可以使用redis的key/value避免重复消费。

springboot关闭kafka调试日志

application.yml

#yml配置文件修改
logging:
  config: classpath:logback.xml

src/main/resources/logback.xml

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!-- 格式化输出: %d表示日期, %thread表示线程名, %-5level: 级别从左显示5个字符宽度 %msg:日志消息, %n是换行符 -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="info">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>

Consumer配置

消费者组可以实现广播(发布订阅消费模式)、点对点消费模式。

消费者配置

# 消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
group.id

# 为true则自动提交偏移量(消费者主动存到)
enable.auto.commit

# 自动提交offset周期
auto.commit.interval.ms

# 重置消费偏移量策略,消费者在读取一个没有偏移量的分区或者偏移量无效情况下(因消费者长时间失效、包含偏移量的记录已经过时并被删除)该如何处理。默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更 才可以
auto.offset.reset

# 序列化器
key.deserializer

Consumer消费消息配置

Consumer消费消息

package net.xdclass.xdclasskafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerTest {
    /**
     * 获取kafka消费者配置信息
     * @return
     */
    public static Properties getProperties() {
        Properties props = new Properties();

        //broker地址
        props.put("bootstrap.servers", "192.168.10.61:9092");

        //消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
        props.put("group.id", "soulboy-g1");

        //开启自动提交offset
        props.put("enable.auto.commit", "true");

        //自动提交offset延迟时间  1000毫秒 1秒
        props.put("auto.commit.interval.ms", "1000");

        //反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        return props;
    }

    /**
     *  简单消费者测试
     */
    @Test
    public void simpleConsumerTest() {
        //获取kafka配置
        Properties properties = getProperties();

        //创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        //订阅主题
        kafkaConsumer.subscribe(Arrays.asList("soulboy-v2-topic-test"));
        while (true) {
            //拉取消息,阻塞超时100毫秒(队列中没有消息就阻塞100毫秒)
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.err.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s %n",record.topic(),record.partition(),record.offset(),record.key(),record.value() );
            }
        }
    }

}

控制台输出

topic=soulboy-v2-topic-test, partition=4, offset=42, key=soulboy1, value=soulboy-value0 
topic=soulboy-v2-topic-test, partition=4, offset=43, key=soulboy1, value=soulboy-value1 
topic=soulboy-v2-topic-test, partition=4, offset=44, key=soulboy1, value=soulboy-value2 
topic=soulboy-v2-topic-test, partition=4, offset=45, key=soulboy1, value=soulboy-value3 
topic=soulboy-v2-topic-test, partition=4, offset=46, key=soulboy1, value=soulboy-value4 
topic=soulboy-v2-topic-test, partition=4, offset=47, key=soulboy1, value=soulboy-value5 
topic=soulboy-v2-topic-test, partition=4, offset=48, key=soulboy1, value=soulboy-value6 
topic=soulboy-v2-topic-test, partition=4, offset=49, key=soulboy1, value=soulboy-value7 
topic=soulboy-v2-topic-test, partition=4, offset=50, key=soulboy1, value=soulboy-value8 
topic=soulboy-v2-topic-test, partition=4, offset=51, key=soulboy1, value=soulboy-value9

Consumer从头消费配置

  • auto.offset.reset 配置策略即可
  • 默认是latest,需要改为 earliest 且消费者组名变更 ,即可实现从头消费
/**
     *  简单消费者测试
     */
    @Test
    public void simpleConsumerTest() {
        //获取kafka配置
        Properties properties = getProperties();

        //默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效 props.put("group.id", "soulboy-g2");
        properties.put("auto.offset.reset","earliest");

        //创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        //订阅主题
        kafkaConsumer.subscribe(Arrays.asList("soulboy-v2-topic-test"));
        while (true) {
            //拉取消息,阻塞超时100毫秒(队列中没有消息就阻塞100毫秒)
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.err.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s %n",record.topic(),record.partition(),record.offset(),record.key(),record.value() );
            }
        }
    }

输出结果 (旧的消息被重新消费)

topic=soulboy-v2-topic-test, partition=2, offset=0, key=soulboy-key1, value=soulboy-value1 
topic=soulboy-v2-topic-test, partition=2, offset=1, key=soulboy-key7, value=soulboy-value7 
topic=soulboy-v2-topic-test, partition=2, offset=2, key=soulboy-key9, value=soulboy-value9 
topic=soulboy-v2-topic-test, partition=2, offset=3, key=soulboy1, value=soulboy-value0 
topic=soulboy-v2-topic-test, partition=2, offset=4, key=soulboy1, value=soulboy-value1 
topic=soulboy-v2-topic-test, partition=2, offset=5, key=soulboy1, value=soulboy-value2 
topic=soulboy-v2-topic-test, partition=2, offset=6, key=soulboy1, value=soulboy-value3 
topic=soulboy-v2-topic-test, partition=2, offset=7, key=soulboy1, value=soulboy-value4 
topic=soulboy-v2-topic-test, partition=2, offset=8, key=soulboy1, value=soulboy-value5 
topic=soulboy-v2-topic-test, partition=2, offset=9, key=soulboy1, value=soulboy-value6 
topic=soulboy-v2-topic-test, partition=2, offset=10, key=soulboy1, value=soulboy-value7 
topic=soulboy-v2-topic-test, partition=2, offset=11, key=soulboy1, value=soulboy-value8 
topic=soulboy-v2-topic-test, partition=2, offset=12, key=soulboy1, value=soulboy-value9 
topic=soulboy-v2-topic-test, partition=1, offset=0, key=soulboy-key2, value=soulboy-value2 
topic=soulboy-v2-topic-test, partition=1, offset=1, key=soulboy-key3, value=soulboy-value3 
topic=soulboy-v2-topic-test, partition=0, offset=0, key=soulboy-key4, value=soulboy-value4 
topic=soulboy-v2-topic-test, partition=0, offset=1, key=soulboy-key5, value=soulboy-value5 
topic=soulboy-v2-topic-test, partition=0, offset=2, key=soulboy-key8, value=soulboy-value8 
topic=soulboy-v2-topic-test, partition=0, offset=3, key=soulboy, value=soulboy-value0 
topic=soulboy-v2-topic-test, partition=0, offset=4, key=soulboy, value=soulboy-value1 
topic=soulboy-v2-topic-test, partition=0, offset=5, key=soulboy, value=soulboy-value2 
topic=soulboy-v2-topic-test, partition=0, offset=6, key=soulboy, value=soulboy-value3 
topic=soulboy-v2-topic-test, partition=0, offset=7, key=soulboy, value=soulboy-value4 
topic=soulboy-v2-topic-test, partition=0, offset=8, key=soulboy, value=soulboy-value5 
topic=soulboy-v2-topic-test, partition=0, offset=9, key=soulboy, value=soulboy-value6 
topic=soulboy-v2-topic-test, partition=0, offset=10, key=soulboy, value=soulboy-value7 
topic=soulboy-v2-topic-test, partition=0, offset=11, key=soulboy, value=soulboy-value8 
topic=soulboy-v2-topic-test, partition=0, offset=12, key=soulboy, value=soulboy-value9 
topic=soulboy-v2-topic-test, partition=4, offset=0, key=soulboy-key0, value=soulboy-value0 
topic=soulboy-v2-topic-test, partition=4, offset=1, key=soulboy-key6, value=soulboy-value6 
topic=soulboy-v2-topic-test, partition=4, offset=2, key=soulboy1, value=soulboy-value0 
topic=soulboy-v2-topic-test, partition=4, offset=3, key=soulboy1, value=soulboy-value1 
topic=soulboy-v2-topic-test, partition=4, offset=4, key=soulboy1, value=soulboy-value2 
topic=soulboy-v2-topic-test, partition=4, offset=5, key=soulboy1, value=soulboy-value3 
topic=soulboy-v2-topic-test, partition=4, offset=6, key=soulboy1, value=soulboy-value4 
topic=soulboy-v2-topic-test, partition=4, offset=7, key=soulboy1, value=soulboy-value5 
topic=soulboy-v2-topic-test, partition=4, offset=8, key=soulboy1, value=soulboy-value6 
topic=soulboy-v2-topic-test, partition=4, offset=9, key=soulboy1, value=soulboy-value7 
topic=soulboy-v2-topic-test, partition=4, offset=10, key=soulboy1, value=soulboy-value8 
topic=soulboy-v2-topic-test, partition=4, offset=11, key=soulboy1, value=soulboy-value9 
topic=soulboy-v2-topic-test, partition=4, offset=12, key=soulboy1, value=soulboy-value0 
topic=soulboy-v2-topic-test, partition=4, offset=13, key=soulboy1, value=soulboy-value1 
topic=soulboy-v2-topic-test, partition=4, offset=14, key=soulboy1, value=soulboy-value2 
topic=soulboy-v2-topic-test, partition=4, offset=15, key=soulboy1, value=soulboy-value3 
topic=soulboy-v2-topic-test, partition=4, offset=16, key=soulboy1, value=soulboy-value4 
topic=soulboy-v2-topic-test, partition=4, offset=17, key=soulboy1, value=soulboy-value5 
topic=soulboy-v2-topic-test, partition=4, offset=18, key=soulboy1, value=soulboy-value6 
topic=soulboy-v2-topic-test, partition=4, offset=19, key=soulboy1, value=soulboy-value7 
topic=soulboy-v2-topic-test, partition=4, offset=20, key=soulboy1, value=soulboy-value8 
topic=soulboy-v2-topic-test, partition=4, offset=21, key=soulboy1, value=soulboy-value9 
topic=soulboy-v2-topic-test, partition=4, offset=22, key=soulboy1, value=soulboy-value0 
topic=soulboy-v2-topic-test, partition=4, offset=23, key=soulboy1, value=soulboy-value1 
topic=soulboy-v2-topic-test, partition=4, offset=24, key=soulboy1, value=soulboy-value2 
topic=soulboy-v2-topic-test, partition=4, offset=25, key=soulboy1, value=soulboy-value3 
topic=soulboy-v2-topic-test, partition=4, offset=26, key=soulboy1, value=soulboy-value4 
topic=soulboy-v2-topic-test, partition=4, offset=27, key=soulboy1, value=soulboy-value5 
topic=soulboy-v2-topic-test, partition=4, offset=28, key=soulboy1, value=soulboy-value6 
topic=soulboy-v2-topic-test, partition=4, offset=29, key=soulboy1, value=soulboy-value7 
topic=soulboy-v2-topic-test, partition=4, offset=30, key=soulboy1, value=soulboy-value8 
topic=soulboy-v2-topic-test, partition=4, offset=31, key=soulboy1, value=soulboy-value9 
topic=soulboy-v2-topic-test, partition=4, offset=32, key=soulboy1, value=soulboy-value0 
topic=soulboy-v2-topic-test, partition=4, offset=33, key=soulboy1, value=soulboy-value1 
topic=soulboy-v2-topic-test, partition=4, offset=34, key=soulboy1, value=soulboy-value2 
topic=soulboy-v2-topic-test, partition=4, offset=35, key=soulboy1, value=soulboy-value3 
topic=soulboy-v2-topic-test, partition=4, offset=36, key=soulboy1, value=soulboy-value4 
topic=soulboy-v2-topic-test, partition=4, offset=37, key=soulboy1, value=soulboy-value5 
topic=soulboy-v2-topic-test, partition=4, offset=38, key=soulboy1, value=soulboy-value6 
topic=soulboy-v2-topic-test, partition=4, offset=39, key=soulboy1, value=soulboy-value7 
topic=soulboy-v2-topic-test, partition=4, offset=40, key=soulboy1, value=soulboy-value8 
topic=soulboy-v2-topic-test, partition=4, offset=41, key=soulboy1, value=soulboy-value9 
topic=soulboy-v2-topic-test, partition=4, offset=42, key=soulboy1, value=soulboy-value0 
topic=soulboy-v2-topic-test, partition=4, offset=43, key=soulboy1, value=soulboy-value1 
topic=soulboy-v2-topic-test, partition=4, offset=44, key=soulboy1, value=soulboy-value2 
topic=soulboy-v2-topic-test, partition=4, offset=45, key=soulboy1, value=soulboy-value3 
topic=soulboy-v2-topic-test, partition=4, offset=46, key=soulboy1, value=soulboy-value4 
topic=soulboy-v2-topic-test, partition=4, offset=47, key=soulboy1, value=soulboy-value5 
topic=soulboy-v2-topic-test, partition=4, offset=48, key=soulboy1, value=soulboy-value6 
topic=soulboy-v2-topic-test, partition=4, offset=49, key=soulboy1, value=soulboy-value7 
topic=soulboy-v2-topic-test, partition=4, offset=50, key=soulboy1, value=soulboy-value8 
topic=soulboy-v2-topic-test, partition=4, offset=51, key=soulboy1, value=soulboy-value9 
topic=soulboy-v2-topic-test, partition=3, offset=0, key=soulboy-key0, value=soulboy-value0 
topic=soulboy-v2-topic-test, partition=3, offset=1, key=soulboy-key1, value=soulboy-value1 
topic=soulboy-v2-topic-test, partition=3, offset=2, key=soulboy-key2, value=soulboy-value2 
topic=soulboy-v2-topic-test, partition=3, offset=3, key=soulboy-key3, value=soulboy-value3 
topic=soulboy-v2-topic-test, partition=3, offset=4, key=soulboy-key4, value=soulboy-value4 
topic=soulboy-v2-topic-test, partition=3, offset=5, key=soulboy-key5, value=soulboy-value5 
topic=soulboy-v2-topic-test, partition=3, offset=6, key=soulboy-key6, value=soulboy-value6 
topic=soulboy-v2-topic-test, partition=3, offset=7, key=soulboy-key7, value=soulboy-value7 
topic=soulboy-v2-topic-test, partition=3, offset=8, key=soulboy-key8, value=soulboy-value8 
topic=soulboy-v2-topic-test, partition=3, offset=9, key=soulboy-key9, value=soulboy-value9

Consumer手工提交offset配置

自动提交offset问题

  • 没法控制消息是否正常被消费(下单失败,当然不能提交offset)
  • 适合不严谨的场景,比如日志收集发送

手工提交offset配置和测试

手工提交offset

  • 初次启动消费者会请求broker获取当前消费的offset值

手工提交offset

  • 同步 commitSync 阻塞当前线程 (自动失败重试)
  • 异步 commitAsync 不会阻塞当前线程 (没有失败重试,回调callback函数获取提交信息,记录日志)

关闭自动提交offset、改为手动异步提交offset

package net.xdclass.xdclasskafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

public class KafkaConsumerTest {
    /**
     * 获取kafka消费者配置信息
     * @return
     */
    public static Properties getProperties() {
        Properties props = new Properties();

        //broker地址
        props.put("bootstrap.servers", "192.168.10.61:9092");

        //消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
        props.put("group.id", "soulboy-g5");

        //开启自动提交offset
        //props.put("enable.auto.commit", "true");
        props.put("enable.auto.commit", "false");

        //自动提交offset延迟时间  1000毫秒 1秒
        //props.put("auto.commit.interval.ms", "1000");

        //反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        return props;
    }

    /**
     *  简单消费者测试
     */
    @Test
    public void simpleConsumerTest() {
        //获取kafka配置
        Properties properties = getProperties();
        //默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效 props.put("group.id", "soulboy-g2");
        //properties.put("auto.offset.reset","earliest");

        //创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        //订阅主题
        kafkaConsumer.subscribe(Arrays.asList("soulboy-v2-topic-test"));
        while (true) {
            //拉取消息,阻塞超时100毫秒(队列中没有消息就阻塞100毫秒)
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.err.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s %n",record.topic(),record.partition(),record.offset(),record.key(),record.value() );
            }
            //非空才做异步提交
            if (!records.isEmpty()) {
                //手动提交offset
                //同步阻塞提交
                //kafkaConsumer.commitSync();
                //异步提交
                kafkaConsumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        //如果没有异常,代表提交成功
                        if (exception == null) {
                            System.err.println("手动提交offset成功:" + offsets.toString());
                        } else {
                            System.err.println("手动提交offset失败:" + offsets.toString());
                        }
                    }
                });
            }
        }
    }

}

控制台输出

topic=soulboy-v2-topic-test, partition=4, offset=102, key=soulboy1, value=soulboy-value0 
topic=soulboy-v2-topic-test, partition=4, offset=103, key=soulboy1, value=soulboy-value1 
topic=soulboy-v2-topic-test, partition=4, offset=104, key=soulboy1, value=soulboy-value2 
topic=soulboy-v2-topic-test, partition=4, offset=105, key=soulboy1, value=soulboy-value3 
topic=soulboy-v2-topic-test, partition=4, offset=106, key=soulboy1, value=soulboy-value4 
topic=soulboy-v2-topic-test, partition=4, offset=107, key=soulboy1, value=soulboy-value5 
topic=soulboy-v2-topic-test, partition=4, offset=108, key=soulboy1, value=soulboy-value6 
topic=soulboy-v2-topic-test, partition=4, offset=109, key=soulboy1, value=soulboy-value7 
topic=soulboy-v2-topic-test, partition=4, offset=110, key=soulboy1, value=soulboy-value8 
topic=soulboy-v2-topic-test, partition=4, offset=111, key=soulboy1, value=soulboy-value9 
手动提交offset成功:{soulboy-v2-topic-test-2=OffsetAndMetadata{offset=13, leaderEpoch=null, metadata=''}, soulboy-v2-topic-test-1=OffsetAndMetadata{offset=2, leaderEpoch=null, metadata=''}, soulboy-v2-topic-test-0=OffsetAndMetadata{offset=13, leaderEpoch=null, metadata=''}, soulboy-v2-topic-test-4=OffsetAndMetadata{offset=112, leaderEpoch=0, metadata=''}, soulboy-v2-topic-test-3=OffsetAndMetadata{offset=10, leaderEpoch=null, metadata=''}}

CAP

    CAP定理: 指的是在一个分布式系统中,Consistency(一致性)Availability(可用性)Partition tolerance(分区容错性),三者不可同时获得。

  • 一致性(C):所有节点都可以访问到最新的数据;锁定其他节点,不一致之前不可读
  • 可用性(A):每个请求都是可以得到响应的,不管请求是成功还是失败;被节点锁定后 无法响应
  • 分区容错性(P):除了全部整体网络故障,其他故障都不能导致整个系统不可用,;节点间通信可能失败,无法避免

CAP

CAP理论就是说在分布式存储系统中,最多只能实现上面的两点。而由于当前的网络硬件肯定会出现延迟丢包等问题,所以分区容忍性是我们必须需要实现的。所以我们只能在一致性和可用性之间进行权衡。

CA
如果不要求P(不允许分区),则C(强一致性)和A(可用性)是可以保证的。但放弃P的同时也就意味着放弃了系统的扩展性,也就是分布式节点受限,没办法部署子节点,这是违背分布式系统设计的初衷的。

CP(偏金融)
如果不要求A(可用),每个请求都需要在服务器之间保持强一致,而P(分区)会导致同步时间无限延长(也就是等待数据同步完才能正常访问服务),一旦发生网络故障或者消息丢失等情况,就要牺牲用户的体验,等待所有数据全部一致了之后再让用户访问系统。

AP(偏互联网)
要高可用并允许分区,则需放弃一致性。一旦分区发生,节点之间可能会失去联系,为了高可用,每个节点只能用本地数据提供服务,而这样会导致全局数据的不一致性。

数据文件存储(可靠性保证ISR)

Kafka数据存储流程

 Kafka 采取了分片索引 机制,将每个partition分为多个segment,每个segment对应2个文件 log 和 index

  • 优点:index文件中并没有为每一条message建立索引,采用了稀疏存储的方式每隔一定字节的数据建立一条索引,避免了索引文件占用过多的空间和资源,从而可以将索引文件保留到内存中。
  • 缺点:没有建立索引的数据在查询的过程中需要小范围内的顺序扫描操作。

# 分段一
00000000000000000000.index  00000000000000000000.log

# 分段二 数字 1234指的是当前文件的最小偏移量offset,即上个文件的最后一个消息的offset+1
00000000000000001234.index  00000000000000001234.log

# 分段三
00000000000000088888.index  00000000000000088888.log

配置文件 server.properties
The maximum size of a log segment file. When this size is reached a new log segment will be created. 默认是1G,当log数据文件大于1g后,会创建一个新的log文件(即segment,包括index和log)

log.segment.bytes=1073741824

数据可靠性保证原理:副本Replica+ACK

replica

Kafka之间副本数据同步是怎样的?一致性怎么保证,数据怎样保证不丢失?

  • topic可以设置有N个副本, 副本数最好要小于broker的数量
  • 每个分区有1个leader和0到多个follower,我们把多个replica分为Learder replica和follower replica

生产者发送数据流程

  1. 保证producer 发送到指定的 topic
  2. topic 的每个 partition 收到producer发送的数据后,将数据写入磁盘,并向 producer 发送 ack 确认收到
  3. 如果producer 收到 ack, 就会进行下一轮的发送
  4. 否则重新发送数据

副本数据同步机制

  1. 当producer在向partition中写数据时,根据ack机制,默认ack=1,只会向leader中写入数据
  2. 然后leader中的数据会复制到其他的replica中,follower会周期性的从leader中pull数据。
  3. 对于数据的读写操作都在leader replica中,follower副本只是当leader副本挂了后才重新选取leader,,,,follower并不向外提供服务,假如还没同步完成,leader副本就宕机了,怎么办?

假如还没同步完成,leader副本就宕机了,怎么办?
Partition什么时间发送ack确认机制(要追求高吞吐量,那么就要放弃可靠性)
当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别。副本数据同步策略 , ack有3个可选值,分别是0, 1,all。

  • ack=0
    producer发送一次就不再发送了,不管是否发送成功。发送出去的消息还在半路,或者还没写入磁盘, Partition Leader所在Broker就直接挂了,客户端认为消息发送成功了,此时就会导致这条消息就丢失。
  • ack=1(默认)
    只要Partition Leader接收到消息而且写入【本地磁盘】,就认为成功了,不管他其他的Follower有没有同步过去这条消息了。 问题:万一Partition Leader刚刚接收到消息,Follower还没来得及同步过去,结果Leader所在的broker宕机了
  • ack= all(即-1)
    producer只有收到分区内所有副本的成功写入全部落盘的通知才认为推送消息成功,leader会维持一个与其保持同步的replica集合,该集合就是ISR,leader副本也在isr里面。

问题一:如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复
数据发送到leader后 ,部分ISR的副本同步,leader此时挂掉。比如follower1和follower2都有可能变成新的leader, producer端会得到返回异常,producer端会重新发送数据,数据可能会重复。

问题二:acks=all 就可以代表数据一定不会丢失了吗
Partition只有一个副本,也就是一个Leader,任何Follower都没有 接收完消息后宕机,也会导致数据丢失,acks=all,必须跟ISR列表里至少有2个以上的副本配合使用。
在设置request.required.acks=-1的同时,也要min.insync.replicas这个参数设定 ISR中的最小副本数是多少,默认值为1,改为 >=2,如果ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常。

ISR (in-sync replica set)

什么是ISR (in-sync replica set )

  • leader会维持一个与其保持同步的replica集合,该集合就是ISR,每一个leader partition都有一个ISR,leader动态维护, 要保证kafka不丢失message,就要保证ISR这组集合存活(至少有一个存活),并且消息commit成功
  • Partition leader 保持同步的 Partition Follower 集合, 当 ISR 中的Partition Follower 完成数据的同步之后,就会给 leader 发送 ack
  • 如果Partition follower长时间(replica.lag.time.max.ms) 未向leader同步数据,则该Partition Follower将被踢出ISR
  • Partition Leader 发生故障之后,就会从 ISR 中选举新的 Partition Lea

OSR (out-of-sync-replica set)
与leader副本分区 同步滞后过多的副本集合

AR(Assign Replicas)
分区中所有副本统称为AR

HighWatermark的作用

broker故障后

  • ACK保障了【生产者】的投递可靠性
  • partition的多副本保障了【消息存储】的可靠性
  • 重复消费问题需要消费者自己处理

HW作用
保证消费数据的一致性和副本数据的一致性

假设没有HW,消费者消费leader到15,下面消费者应该消费16。

此时leader挂掉,选下面某个follower为leader,此时消费者找新leader消费数据,发现新Leader没有16数据,报错。

HW(High Watermark)是所有副本中最小的LEO。
  • Follower故障
    Follower发生故障后会被临时踢出ISR(动态变化),待该follower恢复后,follower会读取本地的磁盘记录的上次的HW,并将该log文件高于HW的部分截取掉,从HW开始向leader进行同步,等该follower的LEO大于等于该Partition的hw,即follower追上leader后,就可以重新加入ISR。
  • Leader故障
    Leader发生故障后,会从ISR中选出一个新的leader,为了保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于hw的部分截掉(新leader自己不会截掉),然后从新的leader同步数据。

HW

高可用集群

需求规划

服务名称节点数量端口
zookeeper32182、2183、2184
kafka39092、9093、9094

zookeeper集群搭建

### 构建目录
[root@localhost software]# pwd
/usr/local/software
[root@localhost software]# cp -r zookeeper  zk1
[root@localhost software]# cp -r zookeeper  zk2
[root@localhost software]# cp -r zookeeper  zk3

### 修改zk1配置(myid)
dataDir=/tmp/zookeeper/1
clientPort=2181
admin.serverPort=8881

### 修改zk2配置
dataDir=/tmp/zookeeper/2
clientPort=2182
admin.serverPort=8882

### 修改zk3配置
dataDir=/tmp/zookeeper/3
clientPort=2183
admin.serverPort=8883

###  配置集群文件
[root@localhost software]# mkdir /tmp/zookeeper/1
[root@localhost software]# mkdir /tmp/zookeeper/2
[root@localhost software]# mkdir /tmp/zookeeper/3
[root@localhost software]# echo 1 > /tmp/zookeeper/1/myid
[root@localhost software]# echo 2 > /tmp/zookeeper/2/myid
[root@localhost software]# echo 3 > /tmp/zookeeper/3/myid

### 配置zookeeper集群
[root@localhost software]# vim /usr/local/software/zk1/conf/zoo.cfg
server.1=192.168.10.61:2881:3881
server.2=192.168.10.61:2882:3882
server.3=192.168.10.61:2883:3883

[root@localhost software]# vim /usr/local/software/zk2/conf/zoo.cfg
server.1=192.168.10.61:2881:3881
server.2=192.168.10.61:2882:3882
server.3=192.168.10.61:2883:3883


[root@localhost software]# vim /usr/local/software/zk3/conf/zoo.cfg
server.1=192.168.10.61:2881:3881
server.2=192.168.10.61:2882:3882
server.3=192.168.10.61:2883:3883

### 启动zookeeper
[root@localhost software]# bash zk1/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/software/zk1/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@localhost software]# bash zk2/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/software/zk2/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@localhost software]# bash zk3/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/software/zk3/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

### 查看节点状态(leader节点、follower节点)
# zk1
[root@localhost software]# bash zk1/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/software/zk1/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

# zk2
[root@localhost software]# bash zk2/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/software/zk2/bin/../conf/zoo.cfg
Client port found: 2182. Client address: localhost. Client SSL: false.
Mode: leader

# zk3
[root@localhost software]# bash zk3/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/software/zk3/bin/../conf/zoo.cfg
Client port found: 2183. Client address: localhost. Client SSL: false.
Mode: follower

### 查看日志观察各节点的变化
[root@localhost software]# cat zk3/logs/zookeeper-root-server-localhost.localdomain.out

Kafka集群搭建

### 目录
[root@localhost software]# pwd
/usr/local/software
[root@localhost software]# cp -r kafka kafka1
[root@localhost software]# cp -r kafka kafka2
[root@localhost software]# cp -r kafka kafka3

### 修改配置文件
# kafka1
[root@localhost software]# vim kafka1/config/server.properties
broker.id=1
# 内网中使用,内网部署 kafka 集群只需要用到 listeners
listeners=PLAINTEXT://192.168.10.61:9092
# 内外网需要作区分时 才需要用到advertised.listeners,不能和内网IP一样一样会报错
# advertised.listeners=PLAINTEXT://公网
port=9092
ip:9092log.dirs=/tmp/kafka-logs/1
zookeeper.connect=192.168.10.61:2181,192.168.10.61:2182,192.168.10.61:2183

# kafka2
[root@localhost software]# vim kafka2/config/server.properties
broker.id=2
# 内网中使用,内网部署 kafka 集群只需要用到 listeners
listeners=PLAINTEXT://192.168.10.61:9093
port=9093
ip:9092log.dirs=/tmp/kafka-logs/2
zookeeper.connect=192.168.10.61:2181,192.168.10.61:2182,192.168.10.61:2183

# kafka3
[root@localhost software]# vim kafka2/config/server.properties
broker.id=3
# 内网中使用,内网部署 kafka 集群只需要用到 listeners
listeners=PLAINTEXT://192.168.10.61:9094
port=9094
ip:9092log.dirs=/tmp/kafka-logs/3
zookeeper.connect=192.168.10.61:2181,192.168.10.61:2182,192.168.10.61:2183

### 启动Kafka集群
# kafka1
[root@localhost software]# bash kafka1/bin/kafka-server-start.sh kafka1/config/server.properties


# kafka2
[root@localhost software]# bash kafka2/bin/kafka-server-start.sh kafka2/config/server.properties

# kafka3
[root@localhost software]# bash kafka3/bin/kafka-server-start.sh kafka3/config/server.properties

# kafka1(守护进程的方式启动)
[root@localhost software]# bash kafka1/bin/kafka-server-start.sh -daemon kafka1/config/server.properties &

# kafka2(守护进程的方式启动)
[root@localhost software]# bash kafka2/bin/kafka-server-start.sh -daemon kafka2/config/server.properties &

# kafka3(守护进程的方式启动)
[root@localhost software]# bash kafka3/bin/kafka-server-start.sh -daemon kafka3/config/server.properties &

### 创建topic对kafka集群进行测试(springboot项目观察各节点变化)
[root@localhost software]# bash kafka1/bin/kafka-topics.sh --create --zookeeper 192.168.10.61:2181,192.168.10.61:2182,192.168.10.61:2183 --replication-factor 3 --partitions 6 --topic soulboy-cluster-topic
Created topic soulboy-cluster-topic.

### springboot项目观察各节点变化
# getTopicDetial()  查看指定 topic 详情
name :soulboy-v2-topic-test-cluster , desc: (name=soulboy-v2-topic-test-cluster, internal=false, 
partitions=(partition=0, leader=192.168.10.61:9092 (id: 1 rack: null), replicas=192.168.10.61:9092 (id: 1 rack: null), 192.168.10.61:9093 (id: 2 rack: null), 192.168.10.61:9094 (id: 3 rack: null), isr=192.168.10.61:9092 (id: 1 rack: null), 192.168.10.61:9093 (id: 2 rack: null), 192.168.10.61:9094 (id: 3 rack: null)),
(partition=1, leader=192.168.10.61:9093 (id: 2 rack: null), replicas=192.168.10.61:9093 (id: 2 rack: null), 192.168.10.61:9094 (id: 3 rack: null), 192.168.10.61:9092 (id: 1 rack: null), isr=192.168.10.61:9093 (id: 2 rack: null), 192.168.10.61:9094 (id: 3 rack: null), 192.168.10.61:9092 (id: 1 rack: null)),
(partition=2, leader=192.168.10.61:9094 (id: 3 rack: null), replicas=192.168.10.61:9094 (id: 3 rack: null), 192.168.10.61:9092 (id: 1 rack: null), 192.168.10.61:9093 (id: 2 rack: null), isr=192.168.10.61:9094 (id: 3 rack: null), 192.168.10.61:9092 (id: 1 rack: null), 192.168.10.61:9093 (id: 2 rack: null)),
(partition=3, leader=192.168.10.61:9092 (id: 1 rack: null), replicas=192.168.10.61:9092 (id: 1 rack: null), 192.168.10.61:9094 (id: 3 rack: null), 192.168.10.61:9093 (id: 2 rack: null), isr=192.168.10.61:9092 (id: 1 rack: null), 192.168.10.61:9094 (id: 3 rack: null), 192.168.10.61:9093 (id: 2 rack: null)),
(partition=4, leader=192.168.10.61:9093 (id: 2 rack: null), replicas=192.168.10.61:9093 (id: 2 rack: null), 192.168.10.61:9092 (id: 1 rack: null), 192.168.10.61:9094 (id: 3 rack: null), isr=192.168.10.61:9093 (id: 2 rack: null), 192.168.10.61:9092 (id: 1 rack: null), 192.168.10.61:9094 (id: 3 rack: null)),
(partition=5, leader=192.168.10.61:9094 (id: 3 rack: null), replicas=192.168.10.61:9094 (id: 3 rack: null), 192.168.10.61:9093 (id: 2 rack: null), 192.168.10.61:9092 (id: 1 rack: null), isr=192.168.10.61:9094 (id: 3 rack: null), 192.168.10.61:9093 (id: 2 rack: null), 192.168.10.61:9092 (id: 1 rack: null)), authorizedOperations=null)

集群topic操作

package net.xdclass.xdclasskafka;

import org.apache.kafka.clients.admin.*;
import org.junit.jupiter.api.Test;

import java.util.*;
import java.util.concurrent.ExecutionException;

public class KafkaAdminTest {

    private static final String TOPIC_NAME = "soulboy-v2-topic-test-cluster";

    /**
     * 设置admin 客户端
     * @return
     */
    public static AdminClient initAdminClient(){
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.61:9092,192.168.10.61:9093,192.168.10.61:9094");
        AdminClient adminClient = AdminClient.create(properties);
        return adminClient;
    }

    /**
     * 创建 topic
     */
    @Test
    public void createTopic(){
        AdminClient adminClient = initAdminClient();
        //指定partitions、replication-factor数量:6分区、3副本
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 6, (short) 3);
        CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));

        //future等待创建,成功不会有任何报错,如果创建失败和超时会报错。
        try {
            createTopicsResult.all().get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(TOPIC_NAME + "创建成功!");
    }

    /**
     * 列举topic列表
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void listTopic() throws ExecutionException, InterruptedException {
        AdminClient adminClient = initAdminClient();
        //获取所有topic(包含内部的topic)
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(true);
        //不传入参数只会查看用户创建的topic
        ListTopicsResult listTopicsResult = adminClient.listTopics(options);
        Set<String> topics = listTopicsResult.names().get();
        for (String topic : topics) {
            System.out.println(topic);
        }
    }

    /**
     * 删除 topic
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void delTopicTest() throws ExecutionException, InterruptedException {
        AdminClient adminClient = initAdminClient();
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("xdclass-topic", "t1", "t2"));
        deleteTopicsResult.all().get();
    }

    /**
     * 查看指定 topic 详情
     */
    @Test
    public void getTopicDetial() throws ExecutionException, InterruptedException {
        AdminClient adminClient = initAdminClient();
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
        //<key,description>
        Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
        Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
        entries.stream().forEach((entry)-> System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue()));
        //name :soulboy-topic , desc: (name=soulboy-topic, internal=false, partitions=(partition=0, leader=192.168.10.61:9092 (id: 0 rack: null), replicas=192.168.10.61:9092 (id: 0 rack: null), isr=192.168.10.61:9092 (id: 0 rack: null)),(partition=1, leader=192.168.10.61:9092 (id: 0 rack: null), replicas=192.168.10.61:9092 (id: 0 rack: null), isr=192.168.10.61:9092 (id: 0 rack: null)), authorizedOperations=null)
    }

    /**
     * 增加分区数量
     *
     * 如果当主题中的消息包含有key时(即key不为null),根据key来计算分区的行为就会有所影响消息顺序性
     *
     * 注意:Kafka中的分区数只能增加不能减少,减少的话数据不知怎么处理
     *
     * @throws Exception
     */
    @Test
    public  void increatePartitionsTest() throws Exception{
        //封装newPartitions
        Map<String, NewPartitions> infoMap = new HashMap<>();
        NewPartitions newPartitions = NewPartitions.increaseTo(3);
        infoMap.put(TOPIC_NAME, newPartitions);
        //增加partition数量
        AdminClient adminClient = initAdminClient();
        CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap);
        createPartitionsResult.all().get();
    }

}

集群producer操作

package net.xdclass.xdclasskafka;

import org.apache.kafka.clients.producer.*;
import org.junit.jupiter.api.Test;

import java.time.LocalDateTime;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaProducerTest {
    private static final String TOPIC_NAME = "soulboy-v2-topic-test-cluster";

    /**
     * 封装配置属性
     * @return
     */
    public static Properties getProperties(){

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.10.61:9092,192.168.10.61:9093,192.168.10.61:9094");
        //props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.74.55.160:9092");

        // 当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别,分别是0, 1,all。
        props.put("acks", "all");
        //props.put(ProducerConfig.ACKS_CONFIG, "all");

        // 请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性
        props.put("retries", 0);
        //props.put(ProducerConfig.RETRIES_CONFIG, 0);

        // 生产者缓存每个分区未发送的消息,缓存的大小是通过 batch.size 配置指定的,默认值是16KB
        props.put("batch.size", 16384);

        /**
         * 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满
         * 如果想减少请求的数量,可以设置 linger.ms 大于0,即消息在缓冲区保留的时间,超过设置的值就会被提交到 服务端
         * 通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送 减少请求
         * 如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送
         */
        props.put("linger.ms", 5);

        /**
         * buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。
         * 如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到             Kafka服务器
         * 会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了
         * buffer.memory要大于batch.size,否则会报申请内存不#足的错误,不要超过物理内存,根据实际情况调整
         * 需要结合实际业务情况压测进行配置
         */
        props.put("buffer.memory", 33554432);

        /**
         * key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被          设置,
         * 即使消息中没有指定key,序列化器必须是一个实
         org.apache.kafka.common.serialization.Serializer接口的类,
         * 将key序列化成字节数组。
         */
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        return props;
    }


    /**
     * send()方法是异步的,添加消息到缓冲区等待发送,并立即返回
     * 生产者将单个的消息批量在一起发送来提高效率,即 batch.size和linger.ms结合
     *
     * 实现同步发送:一条消息发送之后,会阻塞当前线程,直至返回 ack
     * 发送消息后返回的一个 Future 对象,调用get即可
     *
     * 消息发送主要是两个线程:一个是Main用户主线程,一个是Sender线程
     *  1)main线程发送消息到RecordAccumulator即返回
     *  2)sender线程从RecordAccumulator拉取信息发送到broker
     *  3) batch.size和linger.ms两个参数可以影响 sender 线程发送次数
     */
    @Test
    public void testSend(){
        Properties props = getProperties();
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 1; i < 3; i++){
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME, "soulboy-key"+i, "soulboy-value"+i));
            try {
                //不关心是否发送成功,则不需要这行
                RecordMetadata recordMetadata = future.get();
                //格式:topic-分区编号@offset    soulboy-topic-0@0
                System.out.println("发送状态:"+recordMetadata.toString());

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println(i+"发送:"+ LocalDateTime.now().toString());
        }
        producer.close();
    }

    /**
     * send()方法是异步的,添加消息到缓冲区等待发送,并立即返回
     * 生产者将单个的消息批量在一起发送来提高效率,即 batch.size和linger.ms结合
     *
     * 实现同步发送:一条消息发送之后,会阻塞当前线程,直至返回 ack
     * 发送消息后返回的一个 Future 对象,调用get即可
     *
     * 消息发送主要是两个线程:一个是Main用户主线程,一个是Sender线程
     *  1)main线程发送消息到RecordAccumulator即返回
     *  2)sender线程从RecordAccumulator拉取信息发送到broker
     *  3) batch.size和linger.ms两个参数可以影响 sender 线程发送次数
     */
    @Test
    public void testSendWithCallback(){
        Properties props = getProperties();
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 9; i++) {
            producer.send(new ProducerRecord<>(TOPIC_NAME, "soulboy-key" + i, "soulboy-value" + i), new Callback(){
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("发送状态:" + metadata.toString());
                    } else {
                        exception.printStackTrace();//记录日常
                    }
                }
            });
            System.out.println(i+"发送:"+LocalDateTime.now().toString());
            //发送状态:soulboy-topic-0@2
        }
        producer.close();
    }

    @Test
    public void testSendWithCallbackAndPartition(){
        Properties props = getProperties();
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++){
            //发送指定分区3 (分区从0开始)
            producer.send(new ProducerRecord<>("soulboy-v2-topic-test",3, "soulboy-key" + i, "soulboy-value" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("发送状态:"+metadata.toString());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
            System.out.println(i+"发送:"+LocalDateTime.now().toString());
        }
        producer.close();
    }

    /**
     * 自定义分区规则
     */
    @Test
    public void testSendWithPartitionStrategy(){
        Properties props = getProperties();
        //配置 partitioner.class 指定类即可
        props.put("partitioner.class", "net.xdclass.xdclasskafka.config.SoulboyPartitioner");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++){
            //没有指定partition,指定了key
            producer.send(new ProducerRecord<>("soulboy-v2-topic-test",4,"soulboy1", "soulboy-value" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("发送状态:"+metadata.toString());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
            System.out.println(i+"发送:"+LocalDateTime.now().toString());
        }
        producer.close();
    }
}

testSendWithCallback() 控制台输出

0发送:2023-12-30T21:05:12.637948700
1发送:2023-12-30T21:05:12.640519600
2发送:2023-12-30T21:05:12.641032900
3发送:2023-12-30T21:05:12.642566600
4发送:2023-12-30T21:05:12.643076100
5发送:2023-12-30T21:05:12.643076100
6发送:2023-12-30T21:05:12.643076100
7发送:2023-12-30T21:05:12.643076100
8发送:2023-12-30T21:05:12.643076100
2023-12-30 21:05:12.643[main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
发送状态:soulboy-v2-topic-test-cluster-5@0
发送状态:soulboy-v2-topic-test-cluster-2@0
发送状态:soulboy-v2-topic-test-cluster-1@0
发送状态:soulboy-v2-topic-test-cluster-0@0
发送状态:soulboy-v2-topic-test-cluster-3@0
发送状态:soulboy-v2-topic-test-cluster-4@0
发送状态:soulboy-v2-topic-test-cluster-1@1
发送状态:soulboy-v2-topic-test-cluster-1@2
发送状态:soulboy-v2-topic-test-cluster-1@3

日志清理

Kafka将数据持久化到了硬盘上,为了控制磁盘容量,需要对过去的消息进行清理

  • 内部有个定时任务检测删除日志,默认是5分钟log.retention.check.interval.ms
  • 根据segment单位进行定期清理
  • 启用cleaner
    • log.cleaner.enable=true
    • log.cleaner.threads = 2 (清理线程数配置)
  • 支持配置策略对数据清理
    • 日志删除
    • 日志压缩

日志删除

log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除。

基于时间戳删除
清理超过指定时间的消息,默认是168小时,7天,还有log.retention.ms, log.retention.minutes,log.retention.hours,优先级高到低。

配置了7天后删除,那7天如何确定呢?

每个日志段文件都维护一个最大时间戳字段,每次日志段写入新的消息时,都会更新该字段,一个日志段segment写满了被切分之后,就不再接收任何新的消息,最大时间戳字段的值也将保持不变,kafka通过将当前时间与该最大时间戳字段进行比较,从而来判定是否过期。

log.retention.hours=168

基于大小删除
注意:超过阈值的部分必须要大于一个日志段的大小!!!
假设日志段大小是500MB,当前分区共有4个日志段文件,大小分别是500MB,500MB,500MB和10MB。

10MB那个文件就是active日志段,此时该分区总的日志大小是3*500MB+10MB=1500MB+10MB。如果阈值设置为1500MB,那么超出阈值的部分就是10MB,小于日志段大小500MB,故Kafka不会执行任何删除操作,即使总大小已经超过了阈值。

如果阈值设置为1000MB,那么超过阈值的部分就是500MB+10MB > 500MB,此时Kafka会删除最老的那个日志段文件

# 超过指定大小后,删除旧的消息,下面是1G的字节数,-1就是没限制
log.retention.bytes=1073741824

日志压缩

按照消息key进行整理,有相同key不同value值,只保留最后一个,使用频率不高,主要还是使用基于时间和大小的日志删除。

# 启用压缩策略
log.cleanup.policy=compact

高性能-ZeroCopy

将一个File读取并发送出去:File文件的经历了4次copy

  • 调用read,将文件拷贝到了kernel内核态
  • CPU控制 kernel态的数据copy到用户态
  • 调用write时,user态下的内容会copy到内核态的socket的buffer中
  • 最后将内核态socket buffer的数据copy到网卡设备中传送

4copy

缺点:增加了上下文切换、浪费了2次无效拷贝(即步骤2和3)

零拷贝ZeroCopy(SendFile)
请求kernel直接把disk的data传输给socket,而不是通过应用程序传输。Zero copy大大提高了应用程序的性能,减少不必要的内核缓冲区跟用户缓冲区间的拷贝,从而减少CPU的开销和减少了kernel和user模式的上下文切换,达到性能的提升

对应零拷贝技术有mmap及sendfile
应用:Kafka、Netty、RocketMQ等都采用了零拷贝技术

  • mmap:小文件传输快
  • sendfile:大文件传输比mmap快

kafka高性能

  • 存储模型,topic多分区,每个分区多segment段
  • index索引文件查找,利用分段和稀疏索引
  • 磁盘顺序写入
  • 异步操作少阻塞sender和main线程,批量操作(batch)
  • 页缓存Page cache,没利用JVM内存,因为容易GC影响性能
  • 零拷贝ZeroCopy(SendFile)

SpringBoot整合Spring-kafka

添加pom文件

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>

配置文件修改增加生产者信息

server:
  port: 8080

logging:
  config: classpath:logback.xml

spring:
  kafka:
    bootstrap-servers: 192.168.10.61:9092,192.168.10.61:9093,192.168.10.61:9094

    producer:
      # 消息重发的次数。
      retries: 0
      #一个批次可以使用的内存大小
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all

    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S

      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      auto-offset-reset: earliest

      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false

      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    listener:
      #手工ack,调用ack后立刻提交offset
      ack-mode: manual_immediate
      #容器运行的线程数
      concurrency: 4

发送消息

package net.xdclass.xdclasskafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UserController {
    private static final String TOPIC = "user.register.topic";

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * http://192.168.10.88:8080/api/v1/111
     * 发送消息成功:user.register.topic-0-0
     * @param num
     */
    @GetMapping("/api/v1/{num}")
    public void sendMessage(@PathVariable("num") String num) {
        kafkaTemplate.send(TOPIC, "这是一个消息,num=" + num).addCallback(
                success->{
                    String topic = success.getRecordMetadata().topic();
                    int partition = success.getRecordMetadata().partition();
                    long offset = success.getRecordMetadata().offset();
                    System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
                },
                fail->{
                    System.out.println("发送消息失败:" + fail.getMessage());
                });
    }
}

控制台输出

发送消息成功:user.register.topic-0-0
发送消息成功:user.register.topic-0-1
发送消息成功:user.register.topic-0-2
发送消息成功:user.register.topic-0-3

消费消息

package net.xdclass.xdclasskafka.mq;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class MQListener {

    /**
     * 消费消息
     * @param record
     * @param ack
     * @param topic
     */
    @KafkaListener(topics = {"user.register.topic"},groupId = "soulboy-gp1")
    public void onMessage(ConsumerRecord<?, ?> record,
                          Acknowledgment ack,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
        // 打印出消息内容
        System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
        //确认消费成功
        ack.acknowledge();
    }
}

控制台输出

消费:user.register.topic-0-这是一个消息,num=777
消费:user.register.topic-0-这是一个消息,num=888
消费:user.register.topic-0-这是一个消息,num=999
消费:user.register.topic-0-这是一个消息,num=555

Kafka事务消息

Kafka 从 0.11 版本开始引入了事务支持

  • 事务可以保证对多个分区写入操作的原子性
  • 操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能

application.yaml

server:
  port: 8080

logging:
  config: classpath:logback.xml

spring:
  kafka:
    bootstrap-servers: 192.168.10.61:9092,192.168.10.61:9093,192.168.10.61:9094

    producer:
      # 消息重发的次数。
      retries: 1
      #一个批次可以使用的内存大小
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      #事务id
      transaction-id-prefix: soulboy-tran-

    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S

      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      auto-offset-reset: earliest

      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false

      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    listener:
      #手工ack,调用ack后立刻提交offset
      ack-mode: manual_immediate
      #容器运行的线程数
      concurrency: 4

事务发送消息:注解方式的事务、声明式的事务

package net.xdclass.xdclasskafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UserController {
    private static final String TOPIC = "user.register.topic";

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * http://192.168.10.88:8080/api/v1/111
     * 发送消息成功:user.register.topic-0-0
     * @param num
     */
    @GetMapping("/api/v1/{num}")
    public void sendMessage(@PathVariable("num") String num) {
        kafkaTemplate.send(TOPIC, "这是一个消息,num=" + num).addCallback(
                success->{
                    String topic = success.getRecordMetadata().topic();
                    int partition = success.getRecordMetadata().partition();
                    long offset = success.getRecordMetadata().offset();
                    System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
                },
                fail->{
                    System.out.println("发送消息失败:" + fail.getMessage());
                });
    }

    /**
     * 注解方式的事务
     *
     * http://192.168.10.88:8080/api/v1/tranAnnotate/555
     * 消费:user.register.topic-3-这个是事务里面的消息:1  i=555
     * 消费:user.register.topic-3-这个是事务里面的消息:2  i=555
     * @param num
     */
    @GetMapping("/api/v1/tranAnnotate/{num}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendMessageWithAnnotate(@PathVariable("num") int num) {
        kafkaTemplate.send(TOPIC, "这个是事务里面的消息:1  i="+num);
        if (num == 0) {
            throw new RuntimeException("fail");
        }
        kafkaTemplate.send(TOPIC, "这个是事务里面的消息:2  i="+num);
    }

    /**
     * 声明式的事务
     *
     * http://192.168.10.88:8080/api/v1/declarative/66
     * 消费:user.register.topic-3-这个是事务里面的消息:1  i=66
     * 消费:user.register.topic-3-这个是事务里面的消息:2  i=66
     * @param num
     */
    @GetMapping("/api/v1/declarative/{num}")
    public void sendMessageWithDeclarative(@PathVariable("num") int num) {
        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, Object, Object>() {
            @Override
            public Object doInOperations(KafkaOperations<String, Object> kafkaOperations) {

                kafkaOperations.send(TOPIC, "这个是事务里面的消息:1  i="+num);
                if (num == 0) {
                    throw new RuntimeException("fail");
                }
                kafkaOperations.send(TOPIC, "这个是事务里面的消息:2  i="+num);
                return true;
            }
        });
    }
}

作者:Soulboy