目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

XXL-Job

定时任务

通过时间表达式这一方式来进行任务调度的被称为定时任务。

在平常的业务场景当中,经常有一些场景需要使用到定时任务,比如:在某个时间点会发送优惠券、发送短信等等的一些业务操作。又比如:比如一些支付系统,需要在每天的凌晨1点来进行对前一天的清算。

分类

  • 单机定时任务
    • 单机的容易实现,但应用于集群环境做分布式部署,就会带来重复执行
    • 解决重复执行的方案有很多,比如加锁、数据库等,但是增加了很多非业务逻辑
  • 分布式调度(分布式定时任务)
    • 把需要处理的计划任务放入到统一的平台,实现集群管理调度与分布式部署的定时任务 叫做分布式定时任务
    • 支持集群部署、高可用、并行调度、分片处理等

常见定时任务

  • 单机:Java 自带的 java.util.Timer 类配置比较麻烦,时间延后问题
  • 单机:ScheduledExecutorService
    • 是基于线程池来进行设计的定时任务类,在这里每个调度的任务都会分配到线程池里的一个线程去执行该任务,并发执行,互不影响
  • 单机:SpringBoot 框架自带
    • SpringBoot 使用注解方式开启定时任务
    • 启动类里面 @EnableScheduling 开启定时任务,自动扫描
    • 定时任务业务类 加注解 @Component 被容器扫描
    • 定时执行的方法加上注解 @Scheduled(fixedRate=2000) 定期执行一次

为什么需要任务调度平台
因为在传统的Java当中,传统的定时任务实现方案,像上述的Timer、Quartz等,它们都有不少缺点。

  • 不支持集群、不支持统计、没有管理的平台、也没有报警、监控等等
  • 在分布式架构当中,有一些场景是需要用到分布式的任务调度的,在同一个服务器中的多个实例任务之间存在着互斥,需要进行统一的调度,所以任务调度需要支持高可用、监控、故障告警等一系列措施。
  • 需要统一管理和追踪各个的服务节点之间任务调度的结果,并保存记录任务信息等等

常见分布式调度平台

Quartz

  • Quartz 关注点在于定时任务而并非是数据,并没有一套根据数据化处理而定的流程
  • 虽然可以实现数据库作业的高可用,但是缺少了分布式的并行调度功能,相对弱点
  • 不支持任务分片、没 UI 界面管理,并行调度、失败策略等也缺少

TBSchedule

  • 这个是阿里早期开源的分布式任务调度系统,使用的是 timer 而不是线程池执行任务调度,使用 timer 在处理异常的时候是有缺陷的,但 TBSchedule 的作业类型比较单一,文档也缺失得比较严重

ScheduleX(目前阿里内部使用,阿里云商业化产品)

Elastic-job
当开发的分布式任务调度系统,功能强大,采用的是zookeeper实现分布式协调,具有高可用与分片。2020年6月,ElasticJob的四个子项目已经正式迁入Apache仓库。由 2 个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成:

  • ElasticJob-Lite 定位为轻量级无中心化解决方案,使用 jar 的形式提供分布式任务的协调服务;
  • ElasticJob-Cloud 使用 Mesos 的解决方案,额外提供资源治理、应用分发以及进程隔离等服务

XXL-JOB
大众点评的员工徐雪里在15年发布的分布式任务调度平台,是轻量级的分布式任务调度框架,目标是开发迅速、简单、清理、易扩展; 老版本是依赖quartz的定时任务触发,在v2.1.0版本开始 移除quartz依赖

对比项 XXL-JOB elastic-job
并行调度 调度系统多线程并行 任务分片的方式并行
弹性扩容 使用 Quartz 基于数据库分布式功能 通过 zookeeper 保证
高可用 通过 DB 锁保证 通过 zookeeper 保证
阻塞策略 单机串行/丢弃后续的调度/覆盖之前的调度 执行超过 zookeeper 的 session timeout 时间的话,会被清除,重新进行分片
动态分片策略 以执行器为维度进行分片、支持动态的扩容 平均分配/作业名 hash 分配/自定义策略
失败处理策略 失败告警/失败重试 执行完毕后主动获取未分配分片任务 服务器下线后主动寻找可以用的服务器执行任务
监控 支持 支持
日志 支持 支持

技术选型

  • XXL-Job 和 Elastic-Job 都具有广泛的用户基础和完善的技术文档,都可以满足定时任务的基本功能需求
  • xxl-job 侧重在业务实现简单和管理方便,容易学习,失败与路由策略丰富, 推荐使用在用户基数相对较少,服务器的数量在一定的范围内的场景下使用
  • elastic-job 关注的点在数据,添加了弹性扩容和数据分片的思路,更方便利用分布式服务器的资源, 但是学习难度较大,推荐在数据量庞大,服务器数量多的时候使用

核心设计思想

xxl-job 的设计思想
“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性。

  • 将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。
  • 将任务抽象成分散的 JobHandler,交由“执行器”统一管理。“执行器”负责接收调度请求并执行对应的 JobHandler 中业务逻辑。

架构图

  • 调度中心
    • 负责管理调度的信息,按照调度的配置来发出调度请求
    • 支持可视化、简单的动态管理调度信息,包括新建、删除、更新等,这些操作都会实时生效,同时也支持监控调度结果以及执行日志。
  • 执行器
    • 负责接收请求并且执行任务的逻辑。任务模块专注于任务的执行操作等等,使得开发和维护更加的简单与高效

xxl-job

  • 调度中心 HA(中心式):调度采用了中心式进行设计,“调度中心”支持集群部署,可保证调度中心 HA
  • 执行器 HA(分布式):任务分布式的执行,任务执行器支持集群部署,可保证任务执行 HA
  • 触发策略:有 Cron 触发、固定间隔触发、固定延时触发、API 事件触发、人工触发、父子任务触发
  • 路由策略:执行器在集群部署的时候提供了丰富的路由策略,如:第一个、最后一个、轮询、随机、一致性 HASH、最不经常使用 LFU、最久未使用 LRU、故障转移等等
  • 故障转移:如果执行器集群的一台机器发生故障,会自动切换到一台正常的执行器发送任务调度
  • Rolling 实时日志的监控:支持 rolling 方式查看输入的完整执行日志
  • 脚本任务:支持 GLUE 模式开发和运行脚本任务,包括 Shell、python、node.js、PHP 等等类型脚本

部署

XXL-Job-Server

目录概览

目录名 功能
doc xxl-job 的文档资料,包括了数据库的脚本
xxl-job-core 公共 jar 包依赖
xxl-job-admin 调度中心,项目源码,是 SpringBoot 项目,可以直接启动
xxl-job-executor-samples 执行器,是 Sample 实例项目,里面的 SpringBoot 工程可以直接启动,也可以在该项目的基础上进行开发,也可以将现有的项目改造成为执行器项目

数据库概览

库名 功能
xxl_job_group 执行器信息表,用于维护任务执行器的信息
xxl_job_info 调度扩展信息表,主要是用于保存 xxl-job 的调度任务的扩展信息,比如说像任务分组、任务名、机器的地址等等
xxl_job_lock 任务调度锁表
xxl_job_log 日志表,主要是用在保存 xxl-job 任务调度历史信息,像调度结果、执行结果、调度入参等等
xxl_job_log_report 日志报表,会存储 xxl-job 任务调度的日志报表,会在调度中心里的报表功能里使用到
xxl_job_logglue 任务的 GLUE 日志,用于保存 GLUE 日志的更新历史变化,支持 GLUE 版本的回溯功能
xxl_job_registry 执行器的注册表,用在维护在线的执行器与调度中心的地址信息
xxl_job_user 系统的用户表

初始化数据

  1### 运行Mysql
  2docker run \
  3    -p 3306:3306 \
  4    -e MYSQL_ROOT_PASSWORD=123456 \
  5    --name mysql \
  6    --restart=always \
  7    -d mysql:5.7
  8
  9### 运行SQL
 10#
 11# XXL-JOB v2.3.0
 12# Copyright (c) 2015-present, xuxueli.
 13
 14CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci;
 15use `xxl_job`;
 16
 17SET NAMES utf8mb4;
 18
 19CREATE TABLE `xxl_job_info` (
 20  `id` int(11) NOT NULL AUTO_INCREMENT,
 21  `job_group` int(11) NOT NULL COMMENT '执行器主键ID',
 22  `job_desc` varchar(255) NOT NULL,
 23  `add_time` datetime DEFAULT NULL,
 24  `update_time` datetime DEFAULT NULL,
 25  `author` varchar(64) DEFAULT NULL COMMENT '作者',
 26  `alarm_email` varchar(255) DEFAULT NULL COMMENT '报警邮件',
 27  `schedule_type` varchar(50) NOT NULL DEFAULT 'NONE' COMMENT '调度类型',
 28  `schedule_conf` varchar(128) DEFAULT NULL COMMENT '调度配置,值含义取决于调度类型',
 29  `misfire_strategy` varchar(50) NOT NULL DEFAULT 'DO_NOTHING' COMMENT '调度过期策略',
 30  `executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '执行器路由策略',
 31  `executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
 32  `executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
 33  `executor_block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞处理策略',
 34  `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
 35  `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
 36  `glue_type` varchar(50) NOT NULL COMMENT 'GLUE类型',
 37  `glue_source` mediumtext COMMENT 'GLUE源代码',
 38  `glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE备注',
 39  `glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间',
 40  `child_jobid` varchar(255) DEFAULT NULL COMMENT '子任务ID,多个逗号分隔',
 41  `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行',
 42  `trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间',
 43  `trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间',
 44  PRIMARY KEY (`id`)
 45) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 46
 47CREATE TABLE `xxl_job_log` (
 48  `id` bigint(20) NOT NULL AUTO_INCREMENT,
 49  `job_group` int(11) NOT NULL COMMENT '执行器主键ID',
 50  `job_id` int(11) NOT NULL COMMENT '任务,主键ID',
 51  `executor_address` varchar(255) DEFAULT NULL COMMENT '执行器地址,本次执行的地址',
 52  `executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
 53  `executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
 54  `executor_sharding_param` varchar(20) DEFAULT NULL COMMENT '执行器任务分片参数,格式如 1/2',
 55  `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
 56  `trigger_time` datetime DEFAULT NULL COMMENT '调度-时间',
 57  `trigger_code` int(11) NOT NULL COMMENT '调度-结果',
 58  `trigger_msg` text COMMENT '调度-日志',
 59  `handle_time` datetime DEFAULT NULL COMMENT '执行-时间',
 60  `handle_code` int(11) NOT NULL COMMENT '执行-状态',
 61  `handle_msg` text COMMENT '执行-日志',
 62  `alarm_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '告警状态:0-默认、1-无需告警、2-告警成功、3-告警失败',
 63  PRIMARY KEY (`id`),
 64  KEY `I_trigger_time` (`trigger_time`),
 65  KEY `I_handle_code` (`handle_code`)
 66) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 67
 68CREATE TABLE `xxl_job_log_report` (
 69  `id` int(11) NOT NULL AUTO_INCREMENT,
 70  `trigger_day` datetime DEFAULT NULL COMMENT '调度-时间',
 71  `running_count` int(11) NOT NULL DEFAULT '0' COMMENT '运行中-日志数量',
 72  `suc_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行成功-日志数量',
 73  `fail_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行失败-日志数量',
 74  `update_time` datetime DEFAULT NULL,
 75  PRIMARY KEY (`id`),
 76  UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
 77) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 78
 79CREATE TABLE `xxl_job_logglue` (
 80  `id` int(11) NOT NULL AUTO_INCREMENT,
 81  `job_id` int(11) NOT NULL COMMENT '任务,主键ID',
 82  `glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE类型',
 83  `glue_source` mediumtext COMMENT 'GLUE源代码',
 84  `glue_remark` varchar(128) NOT NULL COMMENT 'GLUE备注',
 85  `add_time` datetime DEFAULT NULL,
 86  `update_time` datetime DEFAULT NULL,
 87  PRIMARY KEY (`id`)
 88) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 89
 90CREATE TABLE `xxl_job_registry` (
 91  `id` int(11) NOT NULL AUTO_INCREMENT,
 92  `registry_group` varchar(50) NOT NULL,
 93  `registry_key` varchar(255) NOT NULL,
 94  `registry_value` varchar(255) NOT NULL,
 95  `update_time` datetime DEFAULT NULL,
 96  PRIMARY KEY (`id`),
 97  KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
 98) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 99
100CREATE TABLE `xxl_job_group` (
101  `id` int(11) NOT NULL AUTO_INCREMENT,
102  `app_name` varchar(64) NOT NULL COMMENT '执行器AppName',
103  `title` varchar(12) NOT NULL COMMENT '执行器名称',
104  `address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '执行器地址类型:0=自动注册、1=手动录入',
105  `address_list` text COMMENT '执行器地址列表,多地址逗号分隔',
106  `update_time` datetime DEFAULT NULL,
107  PRIMARY KEY (`id`)
108) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
109
110CREATE TABLE `xxl_job_user` (
111  `id` int(11) NOT NULL AUTO_INCREMENT,
112  `username` varchar(50) NOT NULL COMMENT '账号',
113  `password` varchar(50) NOT NULL COMMENT '密码',
114  `role` tinyint(4) NOT NULL COMMENT '角色:0-普通用户、1-管理员',
115  `permission` varchar(255) DEFAULT NULL COMMENT '权限:执行器ID列表,多个逗号分割',
116  PRIMARY KEY (`id`),
117  UNIQUE KEY `i_username` (`username`) USING BTREE
118) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
119
120CREATE TABLE `xxl_job_lock` (
121  `lock_name` varchar(50) NOT NULL COMMENT '锁名称',
122  PRIMARY KEY (`lock_name`)
123) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
124
125INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL, '2018-11-03 22:21:31' );
126INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `schedule_type`, `schedule_conf`, `misfire_strategy`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'CRON', '0 0 0 * * ? *', 'DO_NOTHING', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', '');
127INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL);
128INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock');
129
130commit;

xxl-job-admin/src/main/resources/application.properties

1### xxl-job, datasource
2spring.datasource.url=jdbc:mysql://192.168.10.57:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
3spring.datasource.username=root
4spring.datasource.password=123456
5spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
6
7### xxl-job, access token
8xxl.job.accessToken=abc1024.pub

com.xxl.job.admin.XxlJobAdminApplication

115:24:23.703 logback [main] INFO  o.s.b.w.e.tomcat.TomcatWebServer - Tomcat started on port(s): 8080 (http) with context path '/xxl-job-admin'
215:24:23.714 logback [main] INFO  c.x.job.admin.XxlJobAdminApplication - Started XxlJobAdminApplication in 1.29 seconds (JVM running for 1.73)
315:24:28.001 logback [xxl-job, admin JobScheduleHelper#scheduleThread] INFO  c.x.j.a.c.thread.JobScheduleHelper - >>>>>>>>> init xxl-job admin scheduler success.

src/main/resources/logback.xml (路径加一个.)

1<property name="log.path" value="./data/applogs/xxl-job/xxl-job-admin.log"/>

访问地址

1http://192.168.10.88:8080/xxl-job-admin
2账户: admin
3密码: 123456

job-admin

SpringBoot 整合 XXL-Job

Maven 整合配置

依赖

1<dependency>
2			<groupId>com.xuxueli</groupId>
3			<artifactId>xxl-job-core</artifactId>
4			<version>2.3.0</version>
5		</dependency>

src/main/resources/logback.xml

 1<?xml version="1.0" encoding="UTF-8"?>
 2<configuration debug="false" scan="true" scanPeriod="1 seconds">
 3
 4    <contextName>logback</contextName>
 5    <property name="log.path" value="./data/logs/xxl-job/app.log"/>
 6
 7    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
 8        <encoder>
 9            <pattern>%d{HH🇲🇲ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
10        </encoder>
11    </appender>
12
13    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
14        <file>${log.path}</file>
15        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
16            <fileNamePattern>${log.path}.%d{yyyy-MM-dd}.zip</fileNamePattern>
17        </rollingPolicy>
18        <encoder>
19            <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n
20            </pattern>
21        </encoder>
22    </appender>
23
24    <root level="info">
25        <appender-ref ref="console"/>
26        <appender-ref ref="file"/>
27    </root>
28
29</configuration>

application.properties

 1server.port=8081
 2
 3#----------xxl-job配置--------------
 4logging.config=classpath:logback.xml
 5
 6# 调度中心部署地址,多个配置逗号分隔 "http://address01,http://address02"
 7xxl.job.admin.addresses=http://192.168.10.88:8080/xxl-job-admin
 8
 9# 执行器token,非空时启用 xxl-job, access token
10xxl.job.accessToken=abc1024.pub
11
12# 执行器app名称,和控制台那边配置一样的名称,不然注册不上去
13xxl.job.executor.appname=abc1024-shop
14
15# [选填]执行器注册:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。
16# 从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
17xxl.job.executor.address=
18
19# [选填]执行器IP :默认为空表示自动获取IP(即springboot容器的ip和端口,可以自动获取,也可以指定),多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务",
20xxl.job.executor.ip=192.168.10.88
21
22# [选填]执行器端口号:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
23xxl.job.executor.port=9999
24
25# 执行器日志文件存储路径,需要对该路径拥有读写权限;为空则使用默认路径
26xxl.job.executor.logpath=./data/logs/xxl-job/executor
27
28# 执行器日志保存天数
29xxl.job.executor.logretentiondays=30

配置类 net/xdclass/xdclassjob/config/XxlJobConfig.java

 1package net.xdclass.xdclassjob.config;
 2
 3import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
 4import org.slf4j.Logger;
 5import org.slf4j.LoggerFactory;
 6import org.springframework.beans.factory.annotation.Value;
 7import org.springframework.context.annotation.Bean;
 8import org.springframework.context.annotation.Configuration;
 9
10/**
11 * 小滴课堂,愿景:让技术不再难学
12 *
13 * @Description
14 * @Author 二当家小D,微信:xdclass6
15 * @Remark 有问题直接联系我,源码-笔记-技术交流群
16 * @Version 1.0
17 **/
18@Configuration
19public class XxlJobConfig {
20    private Logger log = LoggerFactory.getLogger(XxlJobConfig.class);
21
22    @Value("${xxl.job.admin.addresses}")
23    private String adminAddresses;
24
25    @Value("${xxl.job.executor.appname}")
26    private String appName;
27
28    @Value("${xxl.job.executor.ip}")
29    private String ip;
30
31    @Value("${xxl.job.executor.port}")
32    private int port;
33
34    @Value("${xxl.job.accessToken}")
35    private String accessToken;
36
37    @Value("${xxl.job.executor.logpath}")
38    private String logPath;
39
40    @Value("${xxl.job.executor.logretentiondays}")
41    private int logRetentionDays;
42
43    //旧版的有bug
44    //@Bean(initMethod = "start", destroyMethod = "destroy")
45    @Bean
46    public XxlJobSpringExecutor xxlJobExecutor() {
47        log.info(">>>>>>>>>>> xxl-job config init.");
48        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
49        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
50        xxlJobSpringExecutor.setAppname(appName);
51        xxlJobSpringExecutor.setIp(ip);
52        xxlJobSpringExecutor.setPort(port);
53        xxlJobSpringExecutor.setAccessToken(accessToken);
54        xxlJobSpringExecutor.setLogPath(logPath);
55        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
56
57        return xxlJobSpringExecutor;
58    }
59}

上手 XXL-Job 分布式调度任务

注解介绍
在 Spring Bean 实例中,开发 Job 方法方式格式要求为:

1public ReturnT<String> execute(String param)

为 Job 方法添加注解 (注解value值是调度中心新建任务的 JobHandler 属性的值)

1@XxlJob(value="自定义jobHandler名称", init = "handler初始化方法", destroy = "handler 销毁方法")

net/xdclass/xdclassjob/job/MyJobHandler.java

 1package net.xdclass.xdclassjob.job;
 2
 3import com.xxl.job.core.biz.model.ReturnT;
 4import com.xxl.job.core.handler.annotation.XxlJob;
 5import org.slf4j.Logger;
 6import org.slf4j.LoggerFactory;
 7import org.springframework.stereotype.Component;
 8
 9import java.time.LocalDateTime;
10
11@Component
12public class MyJobHandler {
13    private Logger log = LoggerFactory.getLogger(MyJobHandler.class);
14
15    /**
16     * XXJ-Job核心方法
17     * @param param
18     * @return
19     */
20    @XxlJob(value = "demoJobHandler", init = "init", destroy = "destroy")
21    public ReturnT<String> execute(String param) {
22        log.info("abc1024-shop execute 任务方法触发成功" + LocalDateTime.now());
23        return ReturnT.SUCCESS;
24    }
25
26    /**
27     * 初始化方法
28     */
29    private void init() {
30        log.info("init 方法调用成功 >>>>>>");
31    }
32
33    /**
34     * 销毁方法
35     */
36    private void destroy() {
37        log.info("destroy 方法调用成功 >>>>>>");
38    }
39}

任务管理器新增

abc1024.oss

控制台输出

112:40:16.534 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - init 方法调用成功 >>>>>>
212:40:16.537 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - abc1024-shop execute 任务方法触发成功2024-01-02T12:40:16.537122900
312:45:58.008 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - destroy 方法调用成功 >>>>>>

调度报表

调度日志

多执行器

多种路由策略

参数传递

调度中心配置

参数传递

执行器代码

 1package net.xdclass.xdclassjob.job;
 2
 3import com.xxl.job.core.biz.model.ReturnT;
 4import com.xxl.job.core.context.XxlJobHelper;
 5import com.xxl.job.core.handler.annotation.XxlJob;
 6import org.slf4j.Logger;
 7import org.slf4j.LoggerFactory;
 8import org.springframework.stereotype.Component;
 9
10import java.time.LocalDateTime;
11
12@Component
13public class MyJobHandler {
14    private Logger log = LoggerFactory.getLogger(MyJobHandler.class);
15
16    /**
17     * XXJ-Job核心方法
18     * @return
19     */
20    @XxlJob(value = "demoJobHandler", init = "init", destroy = "destroy")
21    public ReturnT<String> execute() {
22        //获取执行参数
23        String param = XxlJobHelper.getJobParam();
24        System.out.println("param=" + param);
25
26        log.info("abc1024-shop execute 任务方法触发成功" + LocalDateTime.now());
27        return ReturnT.SUCCESS;
28    }
29
30    /**
31     * 初始化方法
32     */
33    private void init() {
34        log.info("init 方法调用成功 >>>>>>");
35    }
36
37    /**
38     * 销毁方法
39     */
40    private void destroy() {
41        log.info("destroy 方法调用成功 >>>>>>");
42    }
43}

控制台输出

113:32:40.157 logback [xxl-rpc, EmbedServer bizThreadPool-1452437736] INFO  c.x.job.core.executor.XxlJobExecutor - >>>>>>>>>>> xxl-job regist JobThread success, jobId:2, handler:com.xxl.job.core.handler.impl.MethodJobHandler@6b649efa[class net.xdclass.xdclassjob.job.MyJobHandler#execute]
213:32:40.157 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - init 方法调用成功 >>>>>>
3param=video=springboot&price=9.9
413:32:40.161 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - abc1024-shop execute 任务方法触发成功2024-01-02T13:32:40.161531600

调度日志配置

执行器代码中可以使用打印日志追加到调度中心的调度日志中

 1/**
 2     * XXJ-Job核心方法
 3     * @return
 4     */
 5    @XxlJob(value = "demoJobHandler", init = "init", destroy = "destroy")
 6    public ReturnT<String> execute() {
 7        //获取执行参数
 8        String param = XxlJobHelper.getJobParam();
 9        System.out.println("param=" + param);
10
11        log.info("abc1024-shop execute 任务方法触发成功" + LocalDateTime.now());
12
13        //打印日志!!!
14        XxlJobHelper.log("打印执行日志:param=" + param);
15
16        return ReturnT.SUCCESS;
17    }

调度结果、执行结果
默认任务结果为 "成功" 状态,不需要主动设置

  • 如想设置任务结果为失败,可以通过 "XxlJobHelper.handleFail
  • 如想设置任务结果为成功,handleSuccess"

执行器代码:自定义执行错误

 1/**
 2     * XXJ-Job核心方法
 3     * @return
 4     */
 5    @XxlJob(value = "demoJobHandler", init = "init", destroy = "destroy")
 6    public void execute() {
 7        //获取执行参数
 8        String param = XxlJobHelper.getJobParam();
 9        System.out.println("param=" + param);
10
11        log.info("abc1024-shop execute 任务方法触发成功" + LocalDateTime.now());
12
13        //打印日志!!!
14        XxlJobHelper.log("打印执行日志:param=" + param);
15
16        //自定义执行错误
17        XxlJobHelper.handleFail("自定义错误,任务执行失败");
18
19        //自定义执行成功
20        //XxlJobHelper.handleSuccess("任务执行成功");
21
22        //return ReturnT.SUCCESS;
23    }

自定义执行错误

集群 HA

为了避免单点故障,任务调度系统通常需要通过集群实现系统高可用,由于任务调度系统的特殊性,“调度”和“任务”两个模块需要均支持集群部署,由于职责不同,因此各自集群侧重点也有有所不同。

  • 调度中心集群:目标为避免调度模块单点故障,集群节点需要通过锁或命名服务保证单个任务的单次触发,只在其中一个节点上生效,以防止任务的重复触发。
  • 执行器集群:目标为避免任务模块单点故障,进一步可以通过自定义路由策略实现 Failover 等高级功能,从而在执行器某台机器节点故障时自动转移不会影响到任务的正常触发执行

分布式调度 HA

启动两个调度中心
分别是8080、8081

执行器配置

1# 调度中心部署地址,多个配置逗号分隔 "http://address01,http://address02"
2xxl.job.admin.addresses=http://192.168.10.88:8080/xxl-job-admin,http://192.168.10.88:8081/xxl-job-admin

海量数据处理:分片任务

背景需求
案例:双十一大促,给100万用户发营销短信

  • 有一个任务需要处理 100W 条数据,每条数据的业务逻辑处理要 0.1s
  • 对于普通任务来说,只有一个线程来处理 可能需要 10 万秒才能处理完,业务则严重受影响

什么是分片任务

  • 执行器集群部署,如果任务的路由策略选择【分片广播】,一次任务调度将会【广播触发】对应集群中所有执行器执行一次任务,同时系统自动传递分片参数,执行器可根据分片参数开发分片任务
  • 需要处理的海量数据,以执行器为划分,每个执行器分配一定的任务数,并行执行
  • XXL-Job 支持动态扩容执行器集群,从而动态增加分片数量,到达更快处理任务
  • 分片的值是调度中心分配的
1// 当前分片数,从0开始,即执行器的序号
2int shardIndex = XxlJobHelper.getShardIndex();
3// 总分片数,执行器集群总机器数量
4int shardTotal = XxlJobHelper.getShardTotal();

解决思路

  • 如果将 100W 数据均匀分给集群里的 10 台机器同时处理
  • 每台机器耗时,1 万秒即可,耗时会大大缩短,也能充分利用集群资源
  • 在 xxl-job 里,可以配置执行器集群有 10 个机器,那么分片总数是 10,分片序号 0~9 分别对应那 10 台机器。
  • 分片方式
    • id % 分片总数 余数是 0 的,在第 1 个执行器上执行
    • id % 分片总数 余数是 1 的,在第 2 个执行器上执行
    • id % 分片总数 余数是 2 的,在第 3 个执行器上执行
    • ...
    • id % 分片总数 余数是 9 的,在第 10 个执行器上执行

编码实战
需求:100个用户,分片处理

执行器编码

 1package net.xdclass.xdclassjob.job;
 2
 3import com.xxl.job.core.biz.model.ReturnT;
 4import com.xxl.job.core.context.XxlJobHelper;
 5import com.xxl.job.core.handler.annotation.XxlJob;
 6import org.slf4j.Logger;
 7import org.slf4j.LoggerFactory;
 8import org.springframework.stereotype.Component;
 9
10import java.time.LocalDateTime;
11import java.util.ArrayList;
12import java.util.List;
13
14@Component
15public class MyJobHandler {
16
17    /**
18     * 模拟获取数据库中的所有用户
19     * @return
20     */
21    private List<Integer> getAllUserIds() {
22        ArrayList<Integer> ids = new ArrayList<>();
23        for (int i = 0; i < 100; i++) {
24            ids.add(i);
25        }
26        return ids;
27    }
28
29    /**
30     * 分片任务
31     */
32    @XxlJob(value = "shardingJobHandler")
33    public void shardingJobHandler() {
34        //拿到当前执行器编号
35        int shardIndex = XxlJobHelper.getShardIndex();
36        //总的分片数(执行器的数量)
37        int shardTotal = XxlJobHelper.getShardTotal();
38        log.info("分片总数:{},当前分片数{}", shardTotal, shardIndex);
39        //只处理自身相关的任务
40        getAllUserIds().forEach(obj -> {
41            if (obj % shardTotal == shardIndex) {
42                log.info("第{}片,命中分片开始处理用户id={}", shardIndex, obj);
43            }
44        });
45    }
46}

调度中心配置

11. **http://192.168.10.88:9997/**
22. **http://192.168.10.88:9998/**
33. **http://192.168.10.88:9999/**

控制台输出

 1### 执行器0
 215:19:26.925 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - 分片总数:3,当前分片数0
 315:19:26.925 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - 第0片,命中分片开始处理用户id=0
 415:19:26.925 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - 第0片,命中分片开始处理用户id=3
 515:19:26.926 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - 第0片,命中分片开始处理用户id=6
 6……
 7
 8### 执行器1
 915:19:26.933 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - 分片总数:3,当前分片数1
1015:19:26.933 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - 第1片,命中分片开始处理用户id=1
1115:19:26.934 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - 第1片,命中分片开始处理用户id=4
1215:19:26.934 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - 第1片,命中分片开始处理用户id=7
13……
14
15### 执行器2
1615:19:26.941 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - 分片总数:3,当前分片数2
1715:19:26.942 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - 第2片,命中分片开始处理用户id=2
1815:19:26.942 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - 第2片,命中分片开始处理用户id=5
1915:19:26.942 logback [Thread-6] INFO  n.x.xdclassjob.job.MyJobHandler - 第2片,命中分片开始处理用户id=8
20……

作者:Soulboy