本篇文章通过 Springboot + Quartz + RabbitMQ 实现了可配置的定时任务。
# 新建项目
新建一个空的 maven 项目,再在其新建一个子模块 quartz(定时任务实现模块)和子模块 business(具体业务模块)。
# 项目结构
# 引入依赖
# 总依赖
<dependencies> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter</artifactId> | |
<version>2.2.6.RELEASE</version> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-web</artifactId> | |
<version>2.2.6.RELEASE</version> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-test</artifactId> | |
<version>2.2.6.RELEASE</version> | |
</dependency> | |
<dependency> | |
<groupId>org.junit.jupiter</groupId> | |
<artifactId>junit-jupiter-api</artifactId> | |
<version>5.8.2</version> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.commons</groupId> | |
<artifactId>commons-lang3</artifactId> | |
<version>3.10</version> | |
</dependency> | |
<dependency> | |
<groupId>javax.persistence</groupId> | |
<artifactId>javax.persistence-api</artifactId> | |
<version>2.2</version> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.data</groupId> | |
<artifactId>spring-data-jpa</artifactId> | |
<version>2.2.6.RELEASE</version> | |
</dependency> | |
<dependency> | |
<groupId>org.postgresql</groupId> | |
<artifactId>postgresql</artifactId> | |
<version>42.2.14</version> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.amqp</groupId> | |
<artifactId>spring-rabbit</artifactId> | |
<version>2.2.5.RELEASE</version> | |
</dependency> | |
<dependency> | |
<groupId>io.swagger</groupId> | |
<artifactId>swagger-annotations</artifactId> | |
<version>1.5.19</version> | |
</dependency> | |
</dependencies> |
# quartz 模块依赖
<dependencies> | |
<dependency> | |
<groupId>org.quartz-scheduler</groupId> | |
<artifactId>quartz</artifactId> | |
<version>2.3.2</version> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework</groupId> | |
<artifactId>spring-context-support</artifactId> | |
<version>5.2.5.RELEASE</version> | |
</dependency> | |
</dependencies> |
# business 模块依赖
<dependencies> | |
<dependency> | |
<groupId>org.example</groupId> | |
<artifactId>quartz</artifactId> | |
<version>1.0-SNAPSHOT</version> | |
</dependency> | |
</dependencies> |
# application.yml
server: | |
port: 8080 | |
spring: | |
datasource: | |
driver-class-name: org.postgresql.Driver | |
url: jdbc:postgresql://127.0.0.1:5432/xxx | |
username: xxx | |
password: xxx | |
rabbitmq: | |
host: 127.0.0.1 | |
port: 5672 | |
username: xxx | |
password: xxx | |
virtual-host: /xxx | |
quartz: | |
jobStoreType: JDBC | |
schedulerName: Scheduler | |
jdbc: | |
initializeSchema: NEVER | |
auto-startup: true | |
properties: | |
org: | |
quartz: | |
threadPool: | |
class: org.quartz.simpl.SimpleThreadPool | |
threadCount: 12 | |
threadPriority: 5 | |
jobStore: | |
tablePrefix: quartz.QRTZ_ | |
driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate |
# 配置类
QuartzConfig
package com.example.demo.config; | |
import com.example.demo.quartz.MyJobListener; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.autoconfigure.quartz.SchedulerFactoryBeanCustomizer; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.scheduling.quartz.SchedulerFactoryBean; | |
@Configuration | |
public class QuartzConfig implements SchedulerFactoryBeanCustomizer { | |
@Autowired | |
private MyJobListener jobListener; | |
@Override | |
public void customize(SchedulerFactoryBean schedulerFactoryBean) { | |
schedulerFactoryBean.setGlobalJobListeners(jobListener); | |
schedulerFactoryBean.setAutoStartup(true); | |
} | |
} |
MyJobListener
package com.example.demo.quartz; | |
import org.apache.commons.lang3.StringUtils; | |
import org.quartz.JobExecutionContext; | |
import org.quartz.JobExecutionException; | |
import org.quartz.JobListener; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.stereotype.Service; | |
@Service("myJobListener") | |
public class MyJobListener implements JobListener { | |
private static final Logger LOG = LoggerFactory.getLogger(JobListener.class); | |
public static final String LISTENER_NAME = "QuartSchedulerListener"; | |
@Override | |
public String getName() { | |
return LISTENER_NAME; | |
} | |
/** | |
* 任务被调度前 | |
*/ | |
@Override | |
public void jobToBeExecuted(JobExecutionContext context) { | |
LOG.info("开始执行任务调度..."); | |
String name = context.getJobDetail().getKey().getName(); | |
LOG.info("任务【" + name + "】即将开始!"); | |
} | |
/** | |
* 任务调度被拒 | |
*/ | |
@Override | |
public void jobExecutionVetoed(JobExecutionContext context) { | |
LOG.info("任务调度被拒!"); | |
} | |
/** | |
* 任务调度后 | |
*/ | |
@Override | |
public void jobWasExecuted(JobExecutionContext context, JobExecutionException exception) { | |
String name = context.getJobDetail().getKey().getName(); | |
if (exception != null && !StringUtils.equals("", exception.getMessage())){ | |
LOG.info("任务【" + name + "】抛出异常:" + exception.getMessage()); | |
}else { | |
LOG.info("任务【" + name + "】已经完成!"); | |
} | |
LOG.info("结束执行任务调度..."); | |
} | |
} |
# 需要使用到的通用类
ScheduleConstants
package com.example.demo.constant; | |
/** | |
* 定时任务相关常量 | |
*/ | |
public interface ScheduleConstants { | |
String DEFAULT_JOB_NAME = "job"; | |
String DEFAULT_JOB_GROUP = "jobGroup"; | |
String DEFAULT_CALLBACK_QUEUE = "scheduler.receive"; | |
} |
ScheduleStatus
package com.example.demo.constant; | |
public enum ScheduleStatus { | |
STATE_START("START", "开始"), | |
STATE_RUNNING("RUNNING", "执行中"), | |
STATE_TIMEOUT("TIMEOUT", "超时"), | |
STATE_COMPLETE("COMPLETE", "完成"), | |
STATE_ERROR("ERROR", "错误"); | |
private String code; | |
private String name; | |
} |
SchedulerJobInfo
package com.example.demo.quartz; | |
public class SchedulerJobInfo { | |
String triggerName; | |
String triggerCron; | |
} |
# 定时任务相关类
# 实体
SchedulerTypeEntity
package com.example.demo.entity; | |
import io.swagger.annotations.ApiModel; | |
import io.swagger.annotations.ApiModelProperty; | |
/** | |
* 这个实体是用来定义每一种定时任务的 | |
* 定时任务类型的 code 即是队列名称 | |
*/ | |
@ApiModel(description = "定时任务类型") | |
public class SchedulerTypeEntity { | |
@ApiModelProperty(value = "类型id") | |
private String id; | |
@ApiModelProperty(value = "类型编码") | |
private String code; | |
@ApiModelProperty(value = "类型名称") | |
private String name; | |
} |
SchedulerEntity
package com.example.demo.entity; | |
import io.swagger.annotations.ApiModel; | |
import io.swagger.annotations.ApiModelProperty; | |
import java.util.Map; | |
/** | |
* 这个实体是用来定义定时任务的执行时机。 | |
* 任务调度的 code 默认是 quartz 的 job 名称 | |
*/ | |
@ApiModel(value = "任务调度") | |
public class SchedulerEntity { | |
@ApiModelProperty(value = "任务调度id") | |
private String id; | |
@ApiModelProperty(value = "调度编号") | |
private String code; | |
@ApiModelProperty(value = "调度名称") | |
private String name; | |
@ApiModelProperty(value = "执行cron表达式") | |
private String cron; | |
@ApiModelProperty(value = "任务类型id") | |
private String typeId; | |
@ApiModelProperty(value = "扩展值") | |
private Map<String, Object> extend; | |
@ApiModelProperty(value = "调度超时时间") | |
private long expiry; | |
} |
SchedulerJobEntity
package com.example.demo.entity; | |
import io.swagger.annotations.ApiModel; | |
import io.swagger.annotations.ApiModelProperty; | |
/** | |
* 这个实体是用来定义定时任务给谁使用 | |
* 当使用人为空时,说明所有人都会执行该定时任务 | |
*/ | |
@ApiModel(description = "定时任务定义") | |
public class SchedulerJobEntity { | |
@ApiModelProperty(value = "id") | |
private String id; | |
@ApiModelProperty(value = "任务调度的id") | |
private String schedulerId; | |
@ApiModelProperty(value = "任务类型的id") | |
private String typeId; | |
@ApiModelProperty(value = "使用人") | |
private String userId; | |
} |
SchedulerLogEntity
package com.example.demo.entity; | |
import io.swagger.annotations.ApiModel; | |
import io.swagger.annotations.ApiModelProperty; | |
import java.time.LocalDateTime; | |
/** | |
* 定时任务执行日志记录 | |
*/ | |
@ApiModel(description = "定时任务执行日志") | |
public class SchedulerLogEntity { | |
@ApiModelProperty(value = "id") | |
private String id; | |
@ApiModelProperty(value = "定时任务id") | |
private String typeId; | |
@ApiModelProperty(value = "定时任务名称") | |
private String typeName; | |
@ApiModelProperty(value = "任务调度id") | |
private String schedulerId; | |
@ApiModelProperty(value = "任务调度名称") | |
private String schedulerName; | |
@ApiModelProperty(value = "用户id") | |
private String userId; | |
@ApiModelProperty(value = "开始时间") | |
private LocalDateTime start; | |
@ApiModelProperty(value = "完成时间") | |
private LocalDateTime finish; | |
@ApiModelProperty(value = "当前执行状态") | |
private String status; | |
@ApiModelProperty(value = "备注") | |
private String note; | |
} |
# 数据层
SchedulerTypeDao
package com.example.demo.dao; | |
import com.example.demo.entity.SchedulerTypeEntity; | |
import org.apache.commons.lang3.StringUtils; | |
import java.util.ArrayList; | |
import java.util.List; | |
public class SchedulerTypeDao { | |
public static List<SchedulerTypeEntity> data = new ArrayList<>(); | |
// 模拟数据 | |
static { | |
SchedulerTypeEntity schedulerType = new SchedulerTypeEntity(); | |
schedulerType.setId("type_001"); | |
schedulerType.setCode("scheduler.send.message"); | |
schedulerType.setName("定时发送消息"); | |
data.add(schedulerType); | |
} | |
public SchedulerTypeEntity findById(String id) { | |
return data.stream().filter(o -> StringUtils.equals(o.getId(), id)).findFirst().orElse(null); | |
} | |
} |
SchedulerDao
package com.example.demo.dao; | |
import com.example.demo.entity.SchedulerEntity; | |
import org.apache.commons.lang3.StringUtils; | |
import org.springframework.stereotype.Component; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.UUID; | |
@Component | |
public class SchedulerDao { | |
public static List<SchedulerEntity> data = new ArrayList<>(); | |
static { | |
SchedulerEntity scheduler = new SchedulerEntity(); | |
scheduler.setId("scheduler_001"); | |
scheduler.setCode("SEND_MESSAGE"); | |
scheduler.setName("发送消息"); | |
scheduler.setCron("*/10 * * * * ?"); | |
scheduler.setTypeId("type_001"); | |
data.add(scheduler); | |
} | |
public SchedulerEntity findByCode(String code) { | |
return data.stream().filter(o -> StringUtils.equals(o.getCode(), code)).findFirst().orElse(null); | |
} | |
public List<SchedulerEntity> queryAll() { | |
return data; | |
} | |
} |
SchedulerJobDao
package com.example.demo.dao; | |
import com.example.demo.entity.SchedulerJobEntity; | |
import org.apache.commons.lang3.StringUtils; | |
import org.springframework.stereotype.Component; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.stream.Collectors; | |
@Component | |
public class SchedulerJobDao { | |
public static List<SchedulerJobEntity> data = new ArrayList<>(); | |
static { | |
SchedulerJobEntity schedulerJob = new SchedulerJobEntity(); | |
schedulerJob.setId("1"); | |
schedulerJob.setSchedulerId("scheduler_001"); | |
schedulerJob.setUserId("."); | |
data.add(schedulerJob); | |
} | |
public List<SchedulerJobEntity> queryBySchedulerId(String schedulerId) { | |
return data.stream().filter(o -> StringUtils.equals(o.getSchedulerId(), schedulerId)).collect(Collectors.toList()); | |
} | |
} |
SchedulerLogDao
package com.example.demo.dao; | |
import com.example.demo.entity.SchedulerLogEntity; | |
import org.apache.commons.lang3.StringUtils; | |
import org.springframework.stereotype.Component; | |
import java.time.LocalDateTime; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.UUID; | |
import java.util.stream.Collectors; | |
@Component | |
public class SchedulerLogDao { | |
public static List<SchedulerLogEntity> data = new ArrayList<>(); | |
public List<SchedulerLogEntity> queryBySchedulerIdAndStatus(String schedulerId, String status) { | |
return data.stream().filter(o -> StringUtils.equals(o.getSchedulerId(), schedulerId) && StringUtils.equals(o.getStatus(), status)).collect(Collectors.toList()); | |
} | |
public List<SchedulerLogEntity> updateAll(List<SchedulerLogEntity> runJob) { | |
List<String> newIds = runJob.stream().map(SchedulerLogEntity::getId).collect(Collectors.toList()); | |
List<SchedulerLogEntity> oldData = data.stream().filter(o -> !newIds.contains(o.getId())).collect(Collectors.toList()); | |
oldData.addAll(runJob); | |
data = oldData; | |
return runJob; | |
} | |
public List<SchedulerLogEntity> saveAll(List<SchedulerLogEntity> runJob) { | |
runJob.forEach(o -> o.setId(UUID.randomUUID().toString())); | |
data.addAll(runJob); | |
return runJob; | |
} | |
public List<SchedulerLogEntity> queryAll() { | |
return data; | |
} | |
public void updateDataById(LocalDateTime start, LocalDateTime end, String status, String note, String id) { | |
SchedulerLogEntity log = data.stream().filter(o -> StringUtils.equals(o.getId(), id)).findFirst().orElse(new SchedulerLogEntity()); | |
data.remove(log); | |
log.setStart(start); | |
log.setFinish(end); | |
log.setStatus(status); | |
log.setNote(note); | |
data.add(log); | |
} | |
} |
# 服务层
SchedulerTypeService
package com.example.demo.service; | |
import com.example.demo.dao.SchedulerTypeDao; | |
import com.example.demo.entity.SchedulerTypeEntity; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.stereotype.Service; | |
@Service | |
public class SchedulerTypeService { | |
@Autowired | |
private SchedulerTypeDao schedulerTypeDao; | |
public SchedulerTypeEntity findById(String id) { | |
return schedulerTypeDao.findById(id); | |
} | |
} |
SchedulerService
package com.example.demo.service; | |
import com.example.demo.dao.SchedulerDao; | |
import com.example.demo.entity.SchedulerEntity; | |
import com.example.demo.quartz.SchedulerJobInfo; | |
import com.example.demo.quartz.TaskService; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.stereotype.Service; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Optional; | |
import java.util.stream.Collectors; | |
@Service | |
public class SchedulerService { | |
@Autowired | |
private SchedulerDao schedulerDao; | |
@Autowired | |
private TaskService taskService; | |
public SchedulerEntity findByCode(String triggerName) { | |
return schedulerDao.findByCode(triggerName); | |
} | |
public void resetAll() { | |
List<SchedulerEntity> schedulers = schedulerDao.queryAll(); | |
List<SchedulerJobInfo> jobInfoList = Optional.ofNullable(schedulers).orElse(new ArrayList<>()). | |
stream().map(this::convertJobInfo).collect(Collectors.toList()); | |
taskService.saveAllTriggers(jobInfoList); | |
} | |
private SchedulerJobInfo convertJobInfo(SchedulerEntity schedulerEntity) { | |
SchedulerJobInfo jobInfo = new SchedulerJobInfo(); | |
jobInfo.setTriggerName(schedulerEntity.getCode()); | |
jobInfo.setTriggerCron(schedulerEntity.getCron()); | |
return jobInfo; | |
} | |
} |
SchedulerJobService
package com.example.demo.service; | |
import com.example.demo.dao.SchedulerJobDao; | |
import com.example.demo.entity.SchedulerJobEntity; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.stereotype.Service; | |
import java.util.List; | |
@Service | |
public class SchedulerJobService { | |
@Autowired | |
private SchedulerJobDao schedulerJobDao; | |
public List<SchedulerJobEntity> queryBySchedulerId(String schedulerId) { | |
return schedulerJobDao.queryBySchedulerId(schedulerId); | |
} | |
} |
SchedulerLogService
package com.example.demo.service; | |
import com.example.demo.dao.SchedulerLogDao; | |
import com.example.demo.dto.CallbackMessage; | |
import com.example.demo.entity.SchedulerLogEntity; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.stereotype.Service; | |
import java.util.ArrayList; | |
import java.util.List; | |
@Service | |
public class SchedulerLogService { | |
@Autowired | |
private SchedulerLogDao schedulerLogDao; | |
public List<SchedulerLogEntity> queryBySchedulerIdAndStatus(String schedulerId, String status) { | |
return schedulerLogDao.queryBySchedulerIdAndStatus(schedulerId, status); | |
} | |
public List<SchedulerLogEntity> updateAll(List<SchedulerLogEntity> runJob) { | |
return schedulerLogDao.updateAll(runJob); | |
} | |
public List<SchedulerLogEntity> saveAll(List<SchedulerLogEntity> runJob) { | |
return schedulerLogDao.saveAll(runJob); | |
} | |
public SchedulerLogEntity save(SchedulerLogEntity global) { | |
List<SchedulerLogEntity> save = new ArrayList<>(); | |
save.add(global); | |
return schedulerLogDao.saveAll(save).get(0); | |
} | |
public void updateDateById(String id, CallbackMessage message) { | |
schedulerLogDao.updateDataById(message.getStart(), message.getEnd(), message.getStatus(), message.getNote(), id); | |
} | |
} |
# Quartz 实现类
# 定时发送 MQ 消息
TaskJob
package com.example.demo.quartz; | |
import com.example.demo.constant.ScheduleStatus; | |
import com.example.demo.dto.Message; | |
import com.example.demo.entity.SchedulerEntity; | |
import com.example.demo.entity.SchedulerJobEntity; | |
import com.example.demo.entity.SchedulerLogEntity; | |
import com.example.demo.entity.SchedulerTypeEntity; | |
import com.example.demo.service.SchedulerJobService; | |
import com.example.demo.service.SchedulerLogService; | |
import com.example.demo.service.SchedulerService; | |
import com.example.demo.service.SchedulerTypeService; | |
import org.apache.commons.lang3.StringUtils; | |
import org.quartz.DisallowConcurrentExecution; | |
import org.quartz.JobDataMap; | |
import org.quartz.JobExecutionContext; | |
import org.quartz.JobExecutionException; | |
import org.quartz.PersistJobDataAfterExecution; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.amqp.rabbit.core.RabbitTemplate; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.scheduling.quartz.QuartzJobBean; | |
import org.springframework.util.CollectionUtils; | |
import java.time.LocalDateTime; | |
import java.time.temporal.ChronoUnit; | |
import java.util.ArrayList; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Set; | |
import java.util.function.Predicate; | |
import java.util.stream.Collectors; | |
@DisallowConcurrentExecution | |
@PersistJobDataAfterExecution | |
public class TaskJob extends QuartzJobBean { | |
private static final Logger LOG = LoggerFactory.getLogger(TaskJob.class); | |
@Autowired | |
private RabbitTemplate rabbitTemplate; | |
@Autowired | |
private SchedulerService schedulerService; | |
@Autowired | |
private SchedulerTypeService schedulerTypeService; | |
@Autowired | |
private SchedulerJobService schedulerJobService; | |
@Autowired | |
private SchedulerLogService schedulerLogService; | |
@Override | |
protected void executeInternal(JobExecutionContext context) throws JobExecutionException { | |
String triggerName = context.getTrigger().getKey().getName(); | |
LOG.info("开始执行定时调度:" + triggerName); | |
try { | |
JobDataMap jobDataMap = context.getMergedJobDataMap(); | |
handle(triggerName, jobDataMap); | |
}catch (Exception e){ | |
LOG.error("执行定时调度异常:" + e); | |
e.printStackTrace(); | |
} | |
LOG.info("结束执行定时调度!"); | |
} | |
private void handle(String triggerName, JobDataMap jobDataMap) { | |
// 获取相关任务调度 | |
SchedulerEntity scheduler = schedulerService.findByCode(triggerName); | |
if (scheduler == null) return; | |
// 获取定时任务 | |
SchedulerTypeEntity schedulerType = schedulerTypeService.findById(scheduler.getTypeId()); | |
if (schedulerType == null) return; | |
// 创建定时任务日志 | |
List<SchedulerLogEntity> schedulerLog = createSchedulerLog(scheduler, schedulerType); | |
if (schedulerLog == null) return; | |
// 创建消息 | |
Message message = createMessage(schedulerLog); | |
// 发送 MQ 消息 | |
rabbitTemplate.convertAndSend(schedulerType.getCode(), message); | |
} | |
private Message createMessage(List<SchedulerLogEntity> schedulerLog) { | |
LOG.info("开始创建MQ消息"); | |
Message message = new Message(); | |
message.setInfos(schedulerLog.stream().map(o -> { | |
Message.Info info = new Message.Info(); | |
info.setLogId(o.getId()); | |
info.setUserId(o.getUserId()); | |
return info; | |
}).collect(Collectors.toList())); | |
message.setTypeId(schedulerLog.get(0).getTypeId()); | |
message.setSchedulerId(schedulerLog.get(0).getSchedulerId()); | |
LOG.info("结束创建MQ消息"); | |
return message; | |
} | |
private List<SchedulerLogEntity> createSchedulerLog(SchedulerEntity scheduler, SchedulerTypeEntity schedulerType) { | |
// 查询定时任务定义 | |
List<SchedulerJobEntity> schedulerJobs = schedulerJobService.queryBySchedulerId(scheduler.getId()); | |
// 如果定时任务未绑定用户,则不执行 | |
if (schedulerJobs == null) return null; | |
// 根据策略清理日志 | |
clearLogByStrategy(scheduler, schedulerJobs); | |
// 更新任务的执行状态并返回执行中的用户 id | |
Set<String> runUserIds = updateTaskStatusAndReturnRunUserId(scheduler); | |
// 记录任务的开始状态 | |
List<SchedulerLogEntity> logs = schedulerJobs.stream(). | |
filter(o -> !runUserIds.contains(o.getUserId())). | |
map(o -> { | |
SchedulerLogEntity log = new SchedulerLogEntity(); | |
log.setTypeId(o.getTypeId()); | |
log.setSchedulerId(o.getSchedulerId()); | |
log.setUserId(o.getUserId()); | |
log.setStatus(ScheduleStatus.STATE_START.getCode()); | |
return log; | |
}). | |
collect(Collectors.toList()); | |
// 包含全局的任务,只记录一个任务 | |
SchedulerLogEntity global = logs.stream().filter(o -> StringUtils.equals(o.getUserId(), ".")).findFirst().orElse(null); | |
// 如果没有全局的,则保存相关用户的任务记录 | |
List<SchedulerLogEntity> resultLogs = null; | |
if (global == null && !CollectionUtils.isEmpty(logs)){ | |
resultLogs = schedulerLogService.saveAll(logs); | |
} | |
if (global != null){ | |
List<SchedulerLogEntity> saveData = new ArrayList<>(); | |
saveData.add(global); | |
resultLogs = schedulerLogService.saveAll(saveData); | |
} | |
return resultLogs; | |
} | |
private Set<String> updateTaskStatusAndReturnRunUserId(SchedulerEntity scheduler) { | |
// 如果未设置超时时间,返回 | |
if (scheduler.getExpiry() <= 0) return new HashSet<>(); | |
// 当前时间 | |
LocalDateTime now = LocalDateTime.now(); | |
// 获取已经开始的定时任务 | |
List<SchedulerLogEntity> logs = schedulerLogService.queryBySchedulerIdAndStatus(scheduler.getId(), ScheduleStatus.STATE_START.getCode()); | |
Map<Boolean, List<SchedulerLogEntity>> allJob = logs.stream().collect(Collectors.partitioningBy(jobIsRun(scheduler, now))); | |
// 运行中的任务 | |
List<SchedulerLogEntity> runJob = allJob.get(true); | |
// 超时的任务 | |
List<SchedulerLogEntity> timeOutJob = allJob.get(false); | |
if (!CollectionUtils.isEmpty(runJob)){ | |
runJob.forEach(job -> { | |
job.setStatus(ScheduleStatus.STATE_RUNNING.getCode()); | |
}); | |
schedulerLogService.updateAll(runJob); | |
} | |
if (!CollectionUtils.isEmpty(timeOutJob)){ | |
timeOutJob.forEach(job -> { | |
job.setStatus(ScheduleStatus.STATE_TIMEOUT.getCode()); | |
}); | |
schedulerLogService.updateAll(timeOutJob); | |
} | |
return runJob.stream().map(SchedulerLogEntity::getUserId).filter(StringUtils::isNotBlank).collect(Collectors.toSet()); | |
} | |
private Predicate<SchedulerLogEntity> jobIsRun(SchedulerEntity scheduler, LocalDateTime now) { | |
return item -> { | |
LocalDateTime completeTime = item.getStart().plus(scheduler.getExpiry(), ChronoUnit.SECONDS); | |
if (now.compareTo(completeTime) < 0){ | |
return true; | |
} | |
return false; | |
}; | |
} | |
private void clearLogByStrategy(SchedulerEntity scheduler, List<SchedulerJobEntity> schedulerJobs) { | |
} | |
} |
# 消息体
发送消息体
package com.example.demo.dto; | |
import io.swagger.annotations.ApiModelProperty; | |
import java.io.Serializable; | |
import java.util.List; | |
public class Message implements Serializable { | |
@ApiModelProperty(value = "任务分类id") | |
private String typeId; | |
@ApiModelProperty(value = "任务调度id") | |
private String schedulerId; | |
@ApiModelProperty(value = "基本信息") | |
private List<Info> infos; | |
@ApiModelProperty(value = "传输的内容") | |
private String data; | |
public static class Info implements Serializable{ | |
@ApiModelProperty(value = "日志id") | |
private String logId; | |
@ApiModelProperty(value = "用户id") | |
private String userId; | |
} | |
} |
回执消息体
package com.example.demo.dto; | |
import io.swagger.annotations.ApiModelProperty; | |
import java.io.Serializable; | |
import java.time.LocalDateTime; | |
public class CallbackMessage implements Serializable { | |
@ApiModelProperty(value = "日志id") | |
private String logId; | |
@ApiModelProperty(value = "当前用户id") | |
private String userId; | |
@ApiModelProperty(value = "当前任务id") | |
private String typeId; | |
@ApiModelProperty(value = "当前任务编码") | |
private String typeCode; | |
@ApiModelProperty(value = "当前调度id") | |
private String schedulerId; | |
@ApiModelProperty(value = "开始时间") | |
private LocalDateTime start; | |
@ApiModelProperty(value = "完成时间") | |
private LocalDateTime end; | |
@ApiModelProperty(value = "任务状态") | |
private String status; | |
@ApiModelProperty(value = "备注") | |
private String note; | |
} |
# 业务层
TaskService
package com.example.demo.quartz; | |
import java.util.List; | |
public interface TaskService { | |
String saveAllTriggers(List<SchedulerJobInfo> jobs); | |
} |
TaskServiceImpl
package com.example.demo.quartz; | |
import com.example.demo.constant.ScheduleConstants; | |
import org.quartz.CronScheduleBuilder; | |
import org.quartz.CronTrigger; | |
import org.quartz.JobBuilder; | |
import org.quartz.JobDetail; | |
import org.quartz.JobKey; | |
import org.quartz.Scheduler; | |
import org.quartz.TriggerBuilder; | |
import org.quartz.TriggerKey; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.stereotype.Service; | |
import org.springframework.util.ClassUtils; | |
import org.springframework.util.CollectionUtils; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Set; | |
import java.util.stream.Collectors; | |
@Service | |
public class TaskServiceImpl implements TaskService{ | |
private static final Logger LOG = LoggerFactory.getLogger(TaskServiceImpl.class); | |
@Autowired | |
private Scheduler scheduler; | |
/** | |
* 保存所有的定时 job 调度 | |
*/ | |
@Override | |
public String saveAllTriggers(List<SchedulerJobInfo> jobs) { | |
try { | |
JobKey jobKey = new JobKey(ScheduleConstants.DEFAULT_JOB_NAME, ScheduleConstants.DEFAULT_JOB_GROUP); | |
if (!CollectionUtils.isEmpty(jobs)) { | |
scheduler.deleteJob(jobKey); | |
jobs.forEach(this::saveTrigger); | |
} | |
}catch (Exception e){ | |
LOG.error("启动所有的任务调度失败:{}", e.getMessage()); | |
e.printStackTrace(); | |
return "启动所有的任务调度失败!"; | |
} | |
return "成功!"; | |
} | |
/** | |
* 启动单个的定时调度 | |
*/ | |
private void saveTrigger(SchedulerJobInfo jobInfo) { | |
try { | |
checkAndSaveJob(); | |
TriggerKey triggerKey = new TriggerKey(jobInfo.getTriggerName(), ScheduleConstants.DEFAULT_JOB_GROUP); | |
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(jobInfo.getTriggerCron()); | |
CronTrigger cronTrigger = TriggerBuilder.newTrigger(). | |
withIdentity(jobInfo.getTriggerName(), ScheduleConstants.DEFAULT_JOB_GROUP). | |
forJob(ScheduleConstants.DEFAULT_JOB_NAME, ScheduleConstants.DEFAULT_JOB_GROUP). | |
withSchedule(scheduleBuilder). | |
build(); | |
if (scheduler.checkExists(triggerKey)) { | |
scheduler.rescheduleJob(triggerKey, cronTrigger); | |
return; | |
} | |
scheduler.scheduleJob(cronTrigger); | |
}catch (Exception e){ | |
LOG.error("启动定时调度失败:{}", e.getMessage()); | |
e.printStackTrace(); | |
} | |
} | |
private void checkAndSaveJob() { | |
try { | |
JobKey jobKey = new JobKey(ScheduleConstants.DEFAULT_JOB_NAME, ScheduleConstants.DEFAULT_JOB_GROUP); | |
JobDetail oldJobDetail = scheduler.getJobDetail(jobKey); | |
// 如果数据库不存在 job,添加一条 | |
if (oldJobDetail == null || !ClassUtils.isAssignable(oldJobDetail.getJobClass(), TaskJob.class)){ | |
JobDetail jobDetail = JobBuilder.newJob(TaskJob.class). | |
withIdentity(ScheduleConstants.DEFAULT_JOB_NAME, ScheduleConstants.DEFAULT_JOB_GROUP). | |
storeDurably(). | |
build(); | |
scheduler.addJob(jobDetail, true); | |
} | |
}catch (Exception e){ | |
LOG.error("保存定时调度失败:{}", e.getMessage()); | |
e.printStackTrace(); | |
} | |
} | |
} |
# 启动初始化
SchedulerInitApplicationListener
package com.example.demo.listener; | |
import com.example.demo.service.SchedulerService; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.context.event.ContextRefreshedEvent; | |
import org.springframework.context.event.EventListener; | |
import org.springframework.stereotype.Component; | |
/** | |
* 定时器初始化 | |
**/ | |
@Component | |
public class SchedulerInitApplicationListener { | |
private static final Logger LOG = LoggerFactory.getLogger(SchedulerInitApplicationListener.class); | |
@Autowired | |
private SchedulerService schedulerService; | |
@EventListener(classes = {ContextRefreshedEvent.class}) | |
public void onApplicationEvent(ContextRefreshedEvent event){ | |
LOG.info("重置定时任务调度开始..."); | |
schedulerService.resetAll(); | |
LOG.info("重置定时任务调度结束..."); | |
} | |
} |
# 回执类
package com.example.demo.listener; | |
import com.example.demo.dto.CallbackMessage; | |
import com.example.demo.service.SchedulerLogService; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.amqp.rabbit.annotation.Queue; | |
import org.springframework.amqp.rabbit.annotation.RabbitListener; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.messaging.handler.annotation.Payload; | |
import org.springframework.stereotype.Component; | |
import org.springframework.util.CollectionUtils; | |
/** | |
* 定时任务回执消息监听器 | |
*/ | |
@Component | |
public class SchedulerReceiveListener { | |
private static final Logger LOG = LoggerFactory.getLogger(SchedulerReceiveListener.class); | |
@Autowired | |
private SchedulerLogService schedulerLogService; | |
@RabbitListener(queuesToDeclare = @Queue("scheduler.receive")) | |
public void process(@Payload CallbackMessage message){ | |
if (message == null){ | |
return; | |
} | |
// 回写执行结果 | |
schedulerLogService.updateDateById(message.getLogId(), message); | |
} | |
} |
# 业务实现类
# 消息处理
package com.example.demo.scheduler; | |
import com.example.demo.constant.ScheduleConstants; | |
import com.example.demo.constant.ScheduleStatus; | |
import com.example.demo.dto.CallbackMessage; | |
import com.example.demo.dto.Message; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.amqp.rabbit.annotation.Queue; | |
import org.springframework.amqp.rabbit.annotation.RabbitListener; | |
import org.springframework.amqp.rabbit.core.RabbitTemplate; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.messaging.handler.annotation.Payload; | |
import org.springframework.stereotype.Component; | |
import org.springframework.util.CollectionUtils; | |
import java.time.LocalDateTime; | |
@Component | |
public class DemoScheduler { | |
@Autowired | |
private RabbitTemplate rabbitTemplate; | |
private static final Logger LOG = LoggerFactory.getLogger(DemoScheduler.class); | |
@RabbitListener(queuesToDeclare = @Queue(name = "scheduler.send.message")) | |
private void process(@Payload Message message){ | |
LOG.info("处理消息定时任务开始,消息体为{}", message); | |
if (message == null || CollectionUtils.isEmpty(message.getInfos())){ | |
LOG.info("未有用户需要处理此消息!"); | |
return; | |
} | |
// 对每个用户处理此消息 | |
for (Message.Info info : message.getInfos()){ | |
// 构建回执消息 | |
CallbackMessage callbackMessage = handlerBefore(message, info); | |
try { | |
handler(info.getUserId()); | |
}catch (Exception e){ | |
e.printStackTrace(); | |
callbackMessage.setNote(e.getMessage()); | |
callbackMessage.setStatus(ScheduleStatus.STATE_ERROR.getCode()); | |
LOG.info("处理消息的定时任务异常,异常信息为{}", e.getMessage()); | |
}finally { | |
handlerAfter(callbackMessage); | |
} | |
} | |
} | |
private void handlerAfter(CallbackMessage callbackMessage) { | |
callbackMessage.setEnd(LocalDateTime.now()); | |
rabbitTemplate.convertAndSend(ScheduleConstants.DEFAULT_CALLBACK_QUEUE, callbackMessage); | |
} | |
private void handler(String userId) { | |
LOG.info("用户{}处理消息", userId); | |
//TODO 处理实现 | |
} | |
private CallbackMessage handlerBefore(Message message, Message.Info info) { | |
CallbackMessage callbackMessage = new CallbackMessage(); | |
callbackMessage.setLogId(info.getLogId()); | |
callbackMessage.setUserId(info.getUserId()); | |
callbackMessage.setTypeId(message.getTypeId()); | |
callbackMessage.setSchedulerId(message.getSchedulerId()); | |
callbackMessage.setStatus(ScheduleStatus.STATE_COMPLETE.getCode()); | |
callbackMessage.setStart(LocalDateTime.now()); | |
callbackMessage.setNote("处理成功!"); | |
return callbackMessage; | |
} | |
} |