Loading...

本篇文章通过 Spring Boot + Quartz + RabbitMQ 实现了可配置的定时任务。

# 新建项目

新建一个空的 Maven 项目,再在其下新建两个子模块:quartz(定时任务实现模块)和 business(具体业务模块)。

# 项目结构

项目结构

# 引入依赖

# 总依赖

<!-- Dependencies shared by every module of the project. -->
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>2.2.6.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.2.6.RELEASE</version>
    </dependency>
    <!-- Test-only libraries: keep them off the runtime classpath. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <version>2.2.6.RELEASE</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-api</artifactId>
        <version>5.8.2</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.10</version>
    </dependency>
    <dependency>
        <groupId>javax.persistence</groupId>
        <artifactId>javax.persistence-api</artifactId>
        <version>2.2</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-jpa</artifactId>
        <version>2.2.6.RELEASE</version>
    </dependency>
    <!-- JDBC driver for the PostgreSQL datasource configured in application.yml. -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.2.14</version>
    </dependency>
    <!-- RabbitMQ client support (RabbitTemplate, @RabbitListener). -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.2.5.RELEASE</version>
    </dependency>
    <!-- Provides the @ApiModel / @ApiModelProperty annotations used on entities and DTOs. -->
    <dependency>
        <groupId>io.swagger</groupId>
        <artifactId>swagger-annotations</artifactId>
        <version>1.5.19</version>
    </dependency>
</dependencies>

# quartz 模块依赖

<!-- Dependencies of the quartz module: the scheduler itself plus Spring's Quartz integration. -->
<dependencies>
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>2.3.2</version>
    </dependency>
    <!-- Contains org.springframework.scheduling.quartz (SchedulerFactoryBean, QuartzJobBean). -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>5.2.5.RELEASE</version>
    </dependency>
</dependencies>

# business 模块依赖

<!-- The business module only depends on the quartz module of this project. -->
<dependencies>
    <dependency>
        <groupId>org.example</groupId>
        <artifactId>quartz</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>

# application.yml

# HTTP port of the application.
server:
  port: 8080
spring:
  # PostgreSQL connection; replace the xxx placeholders with real values.
  datasource:
    driver-class-name: org.postgresql.Driver
    url: jdbc:postgresql://127.0.0.1:5432/xxx
    username: xxx
    password: xxx
  # RabbitMQ broker connection; replace the xxx placeholders with real values.
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: xxx
    password: xxx
    virtual-host: /xxx
  quartz:
    # Store jobs and triggers through JDBC instead of in memory.
    jobStoreType: JDBC
    schedulerName: Scheduler
    jdbc:
      # The Quartz tables are not created automatically - presumably they must already exist; verify.
      initializeSchema: NEVER
    auto-startup: true
    # Raw Quartz properties (org.quartz.*).
    properties:
      org:
        quartz:
          threadPool:
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 12
            threadPriority: 5
          jobStore:
            # NOTE(review): the "quartz." part looks like a database schema name - confirm.
            tablePrefix: quartz.QRTZ_
            driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate

# 配置类

QuartzConfig

package com.example.demo.config;
import com.example.demo.quartz.MyJobListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.quartz.SchedulerFactoryBeanCustomizer;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
/**
 * Customizes the auto-configured {@link SchedulerFactoryBean}: enables auto-startup and
 * registers {@link MyJobListener} as a global job listener.
 */
@Configuration
public class QuartzConfig implements SchedulerFactoryBeanCustomizer {

    /** Listener that is attached to every job of the scheduler. */
    @Autowired
    private MyJobListener globalJobListener;

    /**
     * Applies the customization to the factory bean before the scheduler is built.
     *
     * @param schedulerFactoryBean the factory bean to customize
     */
    @Override
    public void customize(SchedulerFactoryBean schedulerFactoryBean) {
        schedulerFactoryBean.setAutoStartup(true);
        schedulerFactoryBean.setGlobalJobListeners(globalJobListener);
    }
}

MyJobListener

package com.example.demo.quartz;
import org.apache.commons.lang3.StringUtils;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
 * Global Quartz {@link JobListener} that logs the life cycle of every job execution.
 */
@Service("myJobListener")
public class MyJobListener implements JobListener {
    // The logger category must be this class, not the JobListener interface.
    private static final Logger LOG = LoggerFactory.getLogger(MyJobListener.class);
    public static final String LISTENER_NAME = "QuartSchedulerListener";

    /**
     * @return the name under which this listener is registered with the scheduler
     */
    @Override
    public String getName() {
        return LISTENER_NAME;
    }

    /**
     * Called before a job is executed.
     *
     * @param context execution context of the job about to run
     */
    @Override
    public void jobToBeExecuted(JobExecutionContext context) {
        LOG.info("开始执行任务调度...");
        String name = context.getJobDetail().getKey().getName();
        LOG.info("任务【{}】即将开始!", name);
    }

    /**
     * Called when the execution of a job was vetoed.
     *
     * @param context execution context of the vetoed job
     */
    @Override
    public void jobExecutionVetoed(JobExecutionContext context) {
        LOG.info("任务调度被拒!");
    }

    /**
     * Called after a job was executed.
     *
     * @param context   execution context of the finished job
     * @param exception the exception thrown by the job, or {@code null} when it succeeded
     */
    @Override
    public void jobWasExecuted(JobExecutionContext context, JobExecutionException exception) {
        String name = context.getJobDetail().getKey().getName();
        if (exception != null) {
            // Any exception is a failure, even one without a message; keep the stack trace.
            LOG.error("任务【{}】抛出异常:{}", name, exception.getMessage(), exception);
        } else {
            LOG.info("任务【{}】已经完成!", name);
        }
        LOG.info("结束执行任务调度...");
    }
}

# 需要使用到的通用类

ScheduleConstants

package com.example.demo.constant;
/**
 * Constants shared by the scheduling code.
 */
public interface ScheduleConstants {
    /** Name of the single Quartz job that all triggers are attached to. */
    String DEFAULT_JOB_NAME = "job";
    /** Group used for both the Quartz job and its triggers. */
    String DEFAULT_JOB_GROUP = "jobGroup";
    /** Queue that receives the callback (receipt) messages of executed tasks. */
    String DEFAULT_CALLBACK_QUEUE = "scheduler.receive";
}

ScheduleStatus

package com.example.demo.constant;
/**
 * Execution states recorded in the scheduler log.
 */
public enum ScheduleStatus {
    STATE_START("START", "开始"),
    STATE_RUNNING("RUNNING", "执行中"),
    STATE_TIMEOUT("TIMEOUT", "超时"),
    STATE_COMPLETE("COMPLETE", "完成"),
    STATE_ERROR("ERROR", "错误");

    /** Code stored in the log entries. */
    private final String code;
    /** Human-readable name of the state. */
    private final String name;

    ScheduleStatus(String code, String name) {
        this.code = code;
        this.name = name;
    }

    /**
     * @return the code stored in the log entries
     */
    public String getCode() {
        return code;
    }

    /**
     * @return the human-readable name of the state
     */
    public String getName() {
        return name;
    }
}

SchedulerJobInfo

package com.example.demo.quartz;
/**
 * Describes one Quartz cron trigger: its name and its cron expression.
 */
public class SchedulerJobInfo {
    // Field visibility is left unchanged so existing same-package access keeps working.
    /** Name of the trigger. */
    String triggerName;
    /** Cron expression of the trigger. */
    String triggerCron;

    public String getTriggerName() {
        return triggerName;
    }

    public void setTriggerName(String triggerName) {
        this.triggerName = triggerName;
    }

    public String getTriggerCron() {
        return triggerCron;
    }

    public void setTriggerCron(String triggerCron) {
        this.triggerCron = triggerCron;
    }
}

# 定时任务相关类

# 实体

SchedulerTypeEntity

package com.example.demo.entity;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
/**
 * Defines one kind of scheduled task.
 * The code of a task type is also the name of the queue its messages are sent to.
 */
@ApiModel(description = "定时任务类型")
public class SchedulerTypeEntity {
    @ApiModelProperty(value = "类型id")
    private String id;
    @ApiModelProperty(value = "类型编码")
    private String code;
    @ApiModelProperty(value = "类型名称")
    private String name;

    public String getId() { return id; }

    public void setId(String id) { this.id = id; }

    public String getCode() { return code; }

    public void setCode(String code) { this.code = code; }

    public String getName() { return name; }

    public void setName(String name) { this.name = name; }
}

SchedulerEntity

package com.example.demo.entity;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.Map;
/**
 * Defines when a scheduled task is executed.
 * The code of a scheduler is used as the name of its Quartz trigger.
 */
@ApiModel(description = "任务调度")
public class SchedulerEntity {
    @ApiModelProperty(value = "任务调度id")
    private String id;
    @ApiModelProperty(value = "调度编号")
    private String code;
    @ApiModelProperty(value = "调度名称")
    private String name;
    @ApiModelProperty(value = "执行cron表达式")
    private String cron;
    @ApiModelProperty(value = "任务类型id")
    private String typeId;
    @ApiModelProperty(value = "扩展值")
    private Map<String, Object> extend;
    // TaskJob adds this value to the start time in seconds; a value <= 0 disables the timeout check.
    @ApiModelProperty(value = "调度超时时间")
    private long expiry;

    public String getId() { return id; }

    public void setId(String id) { this.id = id; }

    public String getCode() { return code; }

    public void setCode(String code) { this.code = code; }

    public String getName() { return name; }

    public void setName(String name) { this.name = name; }

    public String getCron() { return cron; }

    public void setCron(String cron) { this.cron = cron; }

    public String getTypeId() { return typeId; }

    public void setTypeId(String typeId) { this.typeId = typeId; }

    public Map<String, Object> getExtend() { return extend; }

    public void setExtend(Map<String, Object> extend) { this.extend = extend; }

    public long getExpiry() { return expiry; }

    public void setExpiry(long expiry) { this.expiry = expiry; }
}

SchedulerJobEntity

package com.example.demo.entity;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
/**
 * Defines for whom a scheduled task is executed by binding a scheduler to a user.
 * (The original note says an empty user means that everybody executes the task.)
 */
@ApiModel(description = "定时任务定义")
public class SchedulerJobEntity {
    @ApiModelProperty(value = "id")
    private String id;
    @ApiModelProperty(value = "任务调度的id")
    private String schedulerId;
    @ApiModelProperty(value = "任务类型的id")
    private String typeId;
    @ApiModelProperty(value = "使用人")
    private String userId;

    public String getId() { return id; }

    public void setId(String id) { this.id = id; }

    public String getSchedulerId() { return schedulerId; }

    public void setSchedulerId(String schedulerId) { this.schedulerId = schedulerId; }

    public String getTypeId() { return typeId; }

    public void setTypeId(String typeId) { this.typeId = typeId; }

    public String getUserId() { return userId; }

    public void setUserId(String userId) { this.userId = userId; }
}

SchedulerLogEntity

package com.example.demo.entity;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.time.LocalDateTime;
/**
 * Execution log record of a scheduled task.
 */
@ApiModel(description = "定时任务执行日志")
public class SchedulerLogEntity {
    @ApiModelProperty(value = "id")
    private String id;
    @ApiModelProperty(value = "定时任务id")
    private String typeId;
    @ApiModelProperty(value = "定时任务名称")
    private String typeName;
    @ApiModelProperty(value = "任务调度id")
    private String schedulerId;
    @ApiModelProperty(value = "任务调度名称")
    private String schedulerName;
    @ApiModelProperty(value = "用户id")
    private String userId;
    @ApiModelProperty(value = "开始时间")
    private LocalDateTime start;
    @ApiModelProperty(value = "完成时间")
    private LocalDateTime finish;
    @ApiModelProperty(value = "当前执行状态")
    private String status;
    @ApiModelProperty(value = "备注")
    private String note;

    public String getId() { return id; }

    public void setId(String id) { this.id = id; }

    public String getTypeId() { return typeId; }

    public void setTypeId(String typeId) { this.typeId = typeId; }

    public String getTypeName() { return typeName; }

    public void setTypeName(String typeName) { this.typeName = typeName; }

    public String getSchedulerId() { return schedulerId; }

    public void setSchedulerId(String schedulerId) { this.schedulerId = schedulerId; }

    public String getSchedulerName() { return schedulerName; }

    public void setSchedulerName(String schedulerName) { this.schedulerName = schedulerName; }

    public String getUserId() { return userId; }

    public void setUserId(String userId) { this.userId = userId; }

    public LocalDateTime getStart() { return start; }

    public void setStart(LocalDateTime start) { this.start = start; }

    public LocalDateTime getFinish() { return finish; }

    public void setFinish(LocalDateTime finish) { this.finish = finish; }

    public String getStatus() { return status; }

    public void setStatus(String status) { this.status = status; }

    public String getNote() { return note; }

    public void setNote(String note) { this.note = note; }
}

# 数据层

SchedulerTypeDao

package com.example.demo.dao;
import com.example.demo.entity.SchedulerTypeEntity;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
 * In-memory stand-in for the task type table.
 */
// Registered as a bean so that SchedulerTypeService can autowire it, like the other DAOs.
@Component
public class SchedulerTypeDao {
    public static List<SchedulerTypeEntity> data = new ArrayList<>();
    // Mock data
    static {
        SchedulerTypeEntity schedulerType = new SchedulerTypeEntity();
        schedulerType.setId("type_001");
        schedulerType.setCode("scheduler.send.message");
        schedulerType.setName("定时发送消息");
        data.add(schedulerType);
    }

    /**
     * Looks up a task type by id.
     *
     * @param id id of the task type
     * @return the first type with that id, or {@code null} when there is none
     */
    public SchedulerTypeEntity findById(String id) {
        return data.stream().filter(o -> StringUtils.equals(o.getId(), id)).findFirst().orElse(null);
    }
}

SchedulerDao

package com.example.demo.dao;
import com.example.demo.entity.SchedulerEntity;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
 * In-memory stand-in for the scheduler table, pre-filled with one sample scheduler.
 */
@Component
public class SchedulerDao {
    public static List<SchedulerEntity> data = new ArrayList<>();

    // Mock data: one scheduler firing every ten seconds.
    static {
        SchedulerEntity sendMessage = new SchedulerEntity();
        sendMessage.setId("scheduler_001");
        sendMessage.setCode("SEND_MESSAGE");
        sendMessage.setName("发送消息");
        sendMessage.setCron("*/10 * * * * ?");
        sendMessage.setTypeId("type_001");
        data.add(sendMessage);
    }

    /**
     * Looks up a scheduler by its code.
     *
     * @param code code of the scheduler
     * @return the first scheduler with that code, or {@code null} when there is none
     */
    public SchedulerEntity findByCode(String code) {
        for (SchedulerEntity candidate : data) {
            if (StringUtils.equals(candidate.getCode(), code)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * @return the backing list holding every scheduler (not a copy)
     */
    public List<SchedulerEntity> queryAll() {
        return data;
    }
}

SchedulerJobDao

package com.example.demo.dao;
import com.example.demo.entity.SchedulerJobEntity;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
 * In-memory stand-in for the job definition table, pre-filled with one sample job.
 */
@Component
public class SchedulerJobDao {
    public static List<SchedulerJobEntity> data = new ArrayList<>();
    // Mock data
    static {
        SchedulerJobEntity schedulerJob = new SchedulerJobEntity();
        schedulerJob.setId("1");
        schedulerJob.setSchedulerId("scheduler_001");
        // TaskJob copies the job's type id into the log and the MQ message, so it must be set;
        // "type_001" is the type referenced by scheduler_001.
        schedulerJob.setTypeId("type_001");
        // TaskJob treats the user id "." as the global job.
        schedulerJob.setUserId(".");
        data.add(schedulerJob);
    }

    /**
     * Finds the job definitions bound to a scheduler.
     *
     * @param schedulerId id of the scheduler
     * @return the matching job definitions, empty when there are none
     */
    public List<SchedulerJobEntity> queryBySchedulerId(String schedulerId) {
        return data.stream().filter(o -> StringUtils.equals(o.getSchedulerId(), schedulerId)).collect(Collectors.toList());
    }
}

SchedulerLogDao

package com.example.demo.dao;
import com.example.demo.entity.SchedulerLogEntity;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
/**
 * In-memory stand-in for the scheduler log table.
 * The backing list is a plain {@link ArrayList} without any synchronization.
 */
@Component
public class SchedulerLogDao {
    public static List<SchedulerLogEntity> data = new ArrayList<>();

    /**
     * Finds the log entries of a scheduler that are in the given status.
     *
     * @param schedulerId id of the scheduler
     * @param status      status code to match
     * @return the matching entries, empty when there are none
     */
    public List<SchedulerLogEntity> queryBySchedulerIdAndStatus(String schedulerId, String status) {
        return data.stream().filter(o -> StringUtils.equals(o.getSchedulerId(), schedulerId) && StringUtils.equals(o.getStatus(), status)).collect(Collectors.toList());
    }

    /**
     * Replaces the stored entries that have the same ids as the given ones.
     *
     * @param runJob entries to store
     * @return the given entries
     */
    public List<SchedulerLogEntity> updateAll(List<SchedulerLogEntity> runJob) {
        List<String> newIds = runJob.stream().map(SchedulerLogEntity::getId).collect(Collectors.toList());
        List<SchedulerLogEntity> oldData = data.stream().filter(o -> !newIds.contains(o.getId())).collect(Collectors.toList());
        oldData.addAll(runJob);
        data = oldData;
        return runJob;
    }

    /**
     * Assigns a fresh random id to every entry and stores it.
     *
     * @param runJob entries to store
     * @return the given entries, now carrying ids
     */
    public List<SchedulerLogEntity> saveAll(List<SchedulerLogEntity> runJob) {
        runJob.forEach(o -> o.setId(UUID.randomUUID().toString()));
        data.addAll(runJob);
        return runJob;
    }

    /**
     * @return the backing list holding every log entry (not a copy)
     */
    public List<SchedulerLogEntity> queryAll() {
        return data;
    }

    /**
     * Writes the execution result into the log entry with the given id.
     * Unknown ids are ignored, so no id-less phantom entry is added to the log.
     *
     * @param start  start time of the execution
     * @param end    finish time of the execution
     * @param status resulting status code
     * @param note   remark
     * @param id     id of the log entry to update
     */
    public void updateDataById(LocalDateTime start, LocalDateTime end, String status, String note, String id) {
        SchedulerLogEntity log = data.stream().filter(o -> StringUtils.equals(o.getId(), id)).findFirst().orElse(null);
        if (log == null) {
            return;
        }
        // The entry is updated in place; it keeps its position in the list.
        log.setStart(start);
        log.setFinish(end);
        log.setStatus(status);
        log.setNote(note);
    }
}

# 服务层

SchedulerTypeService

package com.example.demo.service;
import com.example.demo.dao.SchedulerTypeDao;
import com.example.demo.entity.SchedulerTypeEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Service facade over {@link SchedulerTypeDao}.
 */
@Service
public class SchedulerTypeService {

    @Autowired
    private SchedulerTypeDao typeDao;

    /**
     * Looks up a task type by id.
     *
     * @param id id of the task type
     * @return whatever the DAO returns for that id
     */
    public SchedulerTypeEntity findById(String id) {
        SchedulerTypeEntity found = typeDao.findById(id);
        return found;
    }
}

SchedulerService

package com.example.demo.service;
import com.example.demo.dao.SchedulerDao;
import com.example.demo.entity.SchedulerEntity;
import com.example.demo.quartz.SchedulerJobInfo;
import com.example.demo.quartz.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
 * Service for scheduler definitions; also pushes them into Quartz as triggers.
 */
@Service
public class SchedulerService {

    @Autowired
    private SchedulerDao schedulerDao;

    @Autowired
    private TaskService taskService;

    /**
     * Looks up the scheduler whose code equals the given trigger name.
     *
     * @param triggerName name of the Quartz trigger
     * @return whatever the DAO returns for that code
     */
    public SchedulerEntity findByCode(String triggerName) {
        return schedulerDao.findByCode(triggerName);
    }

    /**
     * Converts every stored scheduler into trigger information and hands the list to the
     * task service.
     */
    public void resetAll() {
        List<SchedulerEntity> stored = schedulerDao.queryAll();
        List<SchedulerJobInfo> triggers = new ArrayList<>();
        if (stored != null) {
            for (SchedulerEntity entity : stored) {
                triggers.add(convertJobInfo(entity));
            }
        }
        taskService.saveAllTriggers(triggers);
    }

    /** Maps a scheduler onto trigger information: code becomes the name, cron is copied. */
    private SchedulerJobInfo convertJobInfo(SchedulerEntity schedulerEntity) {
        SchedulerJobInfo trigger = new SchedulerJobInfo();
        trigger.setTriggerCron(schedulerEntity.getCron());
        trigger.setTriggerName(schedulerEntity.getCode());
        return trigger;
    }
}

SchedulerJobService

package com.example.demo.service;
import com.example.demo.dao.SchedulerJobDao;
import com.example.demo.entity.SchedulerJobEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * Service facade over {@link SchedulerJobDao}.
 */
@Service
public class SchedulerJobService {

    @Autowired
    private SchedulerJobDao jobDao;

    /**
     * Finds the job definitions bound to a scheduler.
     *
     * @param schedulerId id of the scheduler
     * @return whatever the DAO returns for that scheduler
     */
    public List<SchedulerJobEntity> queryBySchedulerId(String schedulerId) {
        List<SchedulerJobEntity> bound = jobDao.queryBySchedulerId(schedulerId);
        return bound;
    }
}

SchedulerLogService

package com.example.demo.service;
import com.example.demo.dao.SchedulerLogDao;
import com.example.demo.dto.CallbackMessage;
import com.example.demo.entity.SchedulerLogEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
 * Service facade over {@link SchedulerLogDao}.
 */
@Service
public class SchedulerLogService {

    @Autowired
    private SchedulerLogDao logDao;

    /**
     * Finds the log entries of a scheduler that are in the given status.
     *
     * @param schedulerId id of the scheduler
     * @param status      status code to match
     * @return whatever the DAO returns
     */
    public List<SchedulerLogEntity> queryBySchedulerIdAndStatus(String schedulerId, String status) {
        return logDao.queryBySchedulerIdAndStatus(schedulerId, status);
    }

    /**
     * Delegates the update of the given entries to the DAO.
     *
     * @param runJob entries to update
     * @return whatever the DAO returns
     */
    public List<SchedulerLogEntity> updateAll(List<SchedulerLogEntity> runJob) {
        return logDao.updateAll(runJob);
    }

    /**
     * Delegates the insertion of the given entries to the DAO.
     *
     * @param runJob entries to store
     * @return whatever the DAO returns
     */
    public List<SchedulerLogEntity> saveAll(List<SchedulerLogEntity> runJob) {
        return logDao.saveAll(runJob);
    }

    /**
     * Stores a single entry by wrapping it into a one-element list.
     *
     * @param global entry to store
     * @return the first element of the list returned by the DAO
     */
    public SchedulerLogEntity save(SchedulerLogEntity global) {
        List<SchedulerLogEntity> single = new ArrayList<>();
        single.add(global);
        List<SchedulerLogEntity> stored = logDao.saveAll(single);
        return stored.get(0);
    }

    /**
     * Writes the result carried by a callback message into the log entry with the given id.
     *
     * @param id      id of the log entry
     * @param message callback message supplying start, end, status and note
     */
    public void updateDateById(String id, CallbackMessage message) {
        logDao.updateDataById(
                message.getStart(),
                message.getEnd(),
                message.getStatus(),
                message.getNote(),
                id);
    }
}

# Quartz 实现类

# 定时发送 MQ 消息

TaskJob

package com.example.demo.quartz;
import com.example.demo.constant.ScheduleStatus;
import com.example.demo.dto.Message;
import com.example.demo.entity.SchedulerEntity;
import com.example.demo.entity.SchedulerJobEntity;
import com.example.demo.entity.SchedulerLogEntity;
import com.example.demo.entity.SchedulerTypeEntity;
import com.example.demo.service.SchedulerJobService;
import com.example.demo.service.SchedulerLogService;
import com.example.demo.service.SchedulerService;
import com.example.demo.service.SchedulerTypeService;
import org.apache.commons.lang3.StringUtils;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
 * The Quartz job shared by all triggers. For the scheduler whose code equals the firing
 * trigger's name it writes log entries and sends one MQ message, using the task type's code
 * as the routing key.
 */
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public class TaskJob extends QuartzJobBean {
    private static final Logger LOG = LoggerFactory.getLogger(TaskJob.class);
    /** User id that marks a job definition as global. */
    private static final String GLOBAL_USER_ID = ".";
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private SchedulerService schedulerService;
    @Autowired
    private SchedulerTypeService schedulerTypeService;
    @Autowired
    private SchedulerJobService schedulerJobService;
    @Autowired
    private SchedulerLogService schedulerLogService;

    /**
     * Runs the dispatch for the trigger that fired.
     *
     * @param context execution context supplied by Quartz
     * @throws JobExecutionException when the dispatch fails, so that Quartz and the registered
     *                               job listeners are told about the failure
     */
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        String triggerName = context.getTrigger().getKey().getName();
        LOG.info("开始执行定时调度:{}", triggerName);
        try {
            JobDataMap jobDataMap = context.getMergedJobDataMap();
            handle(triggerName, jobDataMap);
        } catch (Exception e) {
            // Log with the stack trace and report the failure instead of swallowing it.
            LOG.error("执行定时调度异常:{}", triggerName, e);
            throw new JobExecutionException("执行定时调度异常:" + triggerName, e);
        } finally {
            LOG.info("结束执行定时调度!");
        }
    }

    /**
     * Resolves scheduler and task type, creates the log entries and sends the message.
     * Returns silently when any of them is missing or when there is nothing to send.
     */
    private void handle(String triggerName, JobDataMap jobDataMap) {
        // Resolve the scheduler bound to this trigger
        SchedulerEntity scheduler = schedulerService.findByCode(triggerName);
        if (scheduler == null) return;
        // Resolve the task type
        SchedulerTypeEntity schedulerType = schedulerTypeService.findById(scheduler.getTypeId());
        if (schedulerType == null) return;
        // Create the log entries
        List<SchedulerLogEntity> schedulerLog = createSchedulerLog(scheduler, schedulerType);
        if (schedulerLog == null) return;
        // Build the message
        Message message = createMessage(schedulerLog);
        // Send the MQ message; the type code is the routing key
        rabbitTemplate.convertAndSend(schedulerType.getCode(), message);
    }

    /**
     * Builds the MQ message: one {@link Message.Info} per log entry, type and scheduler id
     * taken from the first entry. The list must not be empty.
     */
    private Message createMessage(List<SchedulerLogEntity> schedulerLog) {
        LOG.info("开始创建MQ消息");
        Message message = new Message();
        message.setInfos(schedulerLog.stream().map(o -> {
            Message.Info info = new Message.Info();
            info.setLogId(o.getId());
            info.setUserId(o.getUserId());
            return info;
        }).collect(Collectors.toList()));
        message.setTypeId(schedulerLog.get(0).getTypeId());
        message.setSchedulerId(schedulerLog.get(0).getSchedulerId());
        LOG.info("结束创建MQ消息");
        return message;
    }

    /**
     * Creates and stores the log entries for this run.
     *
     * @return the stored entries, or {@code null} when nothing has to be dispatched
     */
    private List<SchedulerLogEntity> createSchedulerLog(SchedulerEntity scheduler, SchedulerTypeEntity schedulerType) {
        // Load the job definitions of the scheduler
        List<SchedulerJobEntity> schedulerJobs = schedulerJobService.queryBySchedulerId(scheduler.getId());
        // Without job definitions nothing is executed
        if (schedulerJobs == null) return null;
        // Clean up old log entries according to the strategy
        clearLogByStrategy(scheduler, schedulerJobs);
        // Update the state of earlier runs and get the users that are still running
        Set<String> runUserIds = updateTaskStatusAndReturnRunUserId(scheduler);
        // Record the start of the new run
        LocalDateTime now = LocalDateTime.now();
        List<SchedulerLogEntity> logs = schedulerJobs.stream().
                filter(o -> !runUserIds.contains(o.getUserId())).
                map(o -> {
                    SchedulerLogEntity log = new SchedulerLogEntity();
                    // Fall back to the resolved type when the job definition carries no type id.
                    log.setTypeId(StringUtils.defaultIfBlank(o.getTypeId(), schedulerType.getId()));
                    log.setTypeName(schedulerType.getName());
                    log.setSchedulerId(o.getSchedulerId());
                    log.setSchedulerName(scheduler.getName());
                    log.setUserId(o.getUserId());
                    // The start time is required by the timeout check of the following runs.
                    log.setStart(now);
                    log.setStatus(ScheduleStatus.STATE_START.getCode());
                    return log;
                }).
                collect(Collectors.toList());
        // When a global job is present only that one entry is recorded
        SchedulerLogEntity global = logs.stream().filter(o -> StringUtils.equals(o.getUserId(), GLOBAL_USER_ID)).findFirst().orElse(null);
        if (global != null) {
            List<SchedulerLogEntity> saveData = new ArrayList<>();
            saveData.add(global);
            return schedulerLogService.saveAll(saveData);
        }
        // Otherwise store the entries of the individual users
        if (!CollectionUtils.isEmpty(logs)) {
            return schedulerLogService.saveAll(logs);
        }
        return null;
    }

    /**
     * Marks unfinished runs of the scheduler as running or timed out.
     *
     * @return the non-blank user ids of the runs that are still within the timeout; empty when
     *         the scheduler has no timeout configured
     */
    private Set<String> updateTaskStatusAndReturnRunUserId(SchedulerEntity scheduler) {
        // No timeout configured: nothing to check
        if (scheduler.getExpiry() <= 0) return new HashSet<>();
        // Current time
        LocalDateTime now = LocalDateTime.now();
        // Unfinished runs: entries still in START and entries an earlier check moved to RUNNING.
        // RUNNING entries are included so they keep blocking duplicates and can still time out.
        List<SchedulerLogEntity> logs = new ArrayList<>(
                schedulerLogService.queryBySchedulerIdAndStatus(scheduler.getId(), ScheduleStatus.STATE_START.getCode()));
        logs.addAll(schedulerLogService.queryBySchedulerIdAndStatus(scheduler.getId(), ScheduleStatus.STATE_RUNNING.getCode()));
        Map<Boolean, List<SchedulerLogEntity>> allJob = logs.stream().collect(Collectors.partitioningBy(jobIsRun(scheduler, now)));
        // Runs still within the timeout
        List<SchedulerLogEntity> runJob = allJob.get(true);
        // Runs that exceeded the timeout
        List<SchedulerLogEntity> timeOutJob = allJob.get(false);
        if (!CollectionUtils.isEmpty(runJob)) {
            runJob.forEach(job -> job.setStatus(ScheduleStatus.STATE_RUNNING.getCode()));
            schedulerLogService.updateAll(runJob);
        }
        if (!CollectionUtils.isEmpty(timeOutJob)) {
            timeOutJob.forEach(job -> job.setStatus(ScheduleStatus.STATE_TIMEOUT.getCode()));
            schedulerLogService.updateAll(timeOutJob);
        }
        return runJob.stream().map(SchedulerLogEntity::getUserId).filter(StringUtils::isNotBlank).collect(Collectors.toSet());
    }

    /**
     * Predicate that is true while a run is within the timeout, i.e. {@code now} is before
     * start plus the scheduler's expiry in seconds.
     */
    private Predicate<SchedulerLogEntity> jobIsRun(SchedulerEntity scheduler, LocalDateTime now) {
        return item -> {
            // Entries without a start time cannot be evaluated; treat them as timed out
            // rather than failing the whole run with a NullPointerException.
            if (item.getStart() == null) {
                return false;
            }
            LocalDateTime completeTime = item.getStart().plus(scheduler.getExpiry(), ChronoUnit.SECONDS);
            return now.isBefore(completeTime);
        };
    }

    /** Placeholder for a log clean-up strategy; currently does nothing. */
    private void clearLogByStrategy(SchedulerEntity scheduler, List<SchedulerJobEntity> schedulerJobs) {
    }
}

# 消息体

发送消息体

package com.example.demo.dto;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.List;
/**
 * Message sent to the task queue when a scheduler fires.
 */
public class Message implements Serializable {
    @ApiModelProperty(value = "任务分类id")
    private String typeId;
    @ApiModelProperty(value = "任务调度id")
    private String schedulerId;
    @ApiModelProperty(value = "基本信息")
    private List<Info> infos;
    @ApiModelProperty(value = "传输的内容")
    private String data;

    public String getTypeId() { return typeId; }

    public void setTypeId(String typeId) { this.typeId = typeId; }

    public String getSchedulerId() { return schedulerId; }

    public void setSchedulerId(String schedulerId) { this.schedulerId = schedulerId; }

    public List<Info> getInfos() { return infos; }

    public void setInfos(List<Info> infos) { this.infos = infos; }

    public String getData() { return data; }

    public void setData(String data) { this.data = data; }

    /** Readable form, because the consumer writes the message to its log. */
    @Override
    public String toString() {
        return "Message{typeId=" + typeId + ", schedulerId=" + schedulerId
                + ", infos=" + infos + ", data=" + data + "}";
    }

    /**
     * Pairs a log entry with the user the task is executed for.
     */
    public static class Info implements Serializable{
        @ApiModelProperty(value = "日志id")
        private String logId;
        @ApiModelProperty(value = "用户id")
        private String userId;

        public String getLogId() { return logId; }

        public void setLogId(String logId) { this.logId = logId; }

        public String getUserId() { return userId; }

        public void setUserId(String userId) { this.userId = userId; }

        @Override
        public String toString() {
            return "Info{logId=" + logId + ", userId=" + userId + "}";
        }
    }
}

回执消息体

package com.example.demo.dto;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
 * Receipt sent back to the callback queue after a task was handled.
 */
public class CallbackMessage implements Serializable {
    @ApiModelProperty(value = "日志id")
    private String logId;
    @ApiModelProperty(value = "当前用户id")
    private String userId;
    @ApiModelProperty(value = "当前任务id")
    private String typeId;
    @ApiModelProperty(value = "当前任务编码")
    private String typeCode;
    @ApiModelProperty(value = "当前调度id")
    private String schedulerId;
    @ApiModelProperty(value = "开始时间")
    private LocalDateTime start;
    @ApiModelProperty(value = "完成时间")
    private LocalDateTime end;
    @ApiModelProperty(value = "任务状态")
    private String status;
    @ApiModelProperty(value = "备注")
    private String note;

    public String getLogId() { return logId; }

    public void setLogId(String logId) { this.logId = logId; }

    public String getUserId() { return userId; }

    public void setUserId(String userId) { this.userId = userId; }

    public String getTypeId() { return typeId; }

    public void setTypeId(String typeId) { this.typeId = typeId; }

    public String getTypeCode() { return typeCode; }

    public void setTypeCode(String typeCode) { this.typeCode = typeCode; }

    public String getSchedulerId() { return schedulerId; }

    public void setSchedulerId(String schedulerId) { this.schedulerId = schedulerId; }

    public LocalDateTime getStart() { return start; }

    public void setStart(LocalDateTime start) { this.start = start; }

    public LocalDateTime getEnd() { return end; }

    public void setEnd(LocalDateTime end) { this.end = end; }

    public String getStatus() { return status; }

    public void setStatus(String status) { this.status = status; }

    public String getNote() { return note; }

    public void setNote(String note) { this.note = note; }
}

# 业务层

TaskService

package com.example.demo.quartz;
import java.util.List;
/**
 * Registers scheduler triggers with Quartz.
 */
public interface TaskService {
    /**
     * Saves the given triggers.
     *
     * @param jobs trigger definitions (name and cron expression)
     * @return a human-readable result message
     */
    String saveAllTriggers(List<SchedulerJobInfo> jobs);
}

TaskServiceImpl

package com.example.demo.quartz;
import com.example.demo.constant.ScheduleConstants;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@Service
public class TaskServiceImpl implements TaskService{
    private static final Logger LOG = LoggerFactory.getLogger(TaskServiceImpl.class);
    @Autowired
    private Scheduler scheduler;
    /**
     * 保存所有的定时 job 调度
     */
    @Override
    public String saveAllTriggers(List<SchedulerJobInfo> jobs) {
        try {
            JobKey jobKey = new JobKey(ScheduleConstants.DEFAULT_JOB_NAME, ScheduleConstants.DEFAULT_JOB_GROUP);
            if (!CollectionUtils.isEmpty(jobs)) {
                scheduler.deleteJob(jobKey);
                jobs.forEach(this::saveTrigger);
            }
        }catch (Exception e){
            LOG.error("启动所有的任务调度失败:{}", e.getMessage());
            e.printStackTrace();
            return "启动所有的任务调度失败!";
        }
        return "成功!";
    }
    /**
     * 启动单个的定时调度
     */
    private void saveTrigger(SchedulerJobInfo jobInfo) {
        try {
            checkAndSaveJob();
            TriggerKey triggerKey = new TriggerKey(jobInfo.getTriggerName(), ScheduleConstants.DEFAULT_JOB_GROUP);
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(jobInfo.getTriggerCron());
            CronTrigger cronTrigger = TriggerBuilder.newTrigger().
                    withIdentity(jobInfo.getTriggerName(), ScheduleConstants.DEFAULT_JOB_GROUP).
                    forJob(ScheduleConstants.DEFAULT_JOB_NAME, ScheduleConstants.DEFAULT_JOB_GROUP).
                    withSchedule(scheduleBuilder).
                    build();
            if (scheduler.checkExists(triggerKey)) {
                scheduler.rescheduleJob(triggerKey, cronTrigger);
                return;
            }
            scheduler.scheduleJob(cronTrigger);
        }catch (Exception e){
            LOG.error("启动定时调度失败:{}", e.getMessage());
            e.printStackTrace();
        }
    }
    private void checkAndSaveJob() {
        try {
            JobKey jobKey = new JobKey(ScheduleConstants.DEFAULT_JOB_NAME, ScheduleConstants.DEFAULT_JOB_GROUP);
            JobDetail oldJobDetail = scheduler.getJobDetail(jobKey);
            // 如果数据库不存在 job,添加一条
            if (oldJobDetail == null || !ClassUtils.isAssignable(oldJobDetail.getJobClass(), TaskJob.class)){
                JobDetail jobDetail = JobBuilder.newJob(TaskJob.class).
                        withIdentity(ScheduleConstants.DEFAULT_JOB_NAME, ScheduleConstants.DEFAULT_JOB_GROUP).
                        storeDurably().
                        build();
                scheduler.addJob(jobDetail, true);
            }
        }catch (Exception e){
            LOG.error("保存定时调度失败:{}", e.getMessage());
            e.printStackTrace();
        }
    }
}

# 启动初始化

SchedulerInitApplicationListener

package com.example.demo.listener;
import com.example.demo.service.SchedulerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
/**
 * Initializes the scheduler: resets all triggers whenever a {@link ContextRefreshedEvent}
 * is published.
 */
@Component
public class SchedulerInitApplicationListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerInitApplicationListener.class);

    @Autowired
    private SchedulerService schedulers;

    /**
     * Re-registers every stored scheduler, logging before and after.
     *
     * @param event the refresh event (unused)
     */
    @EventListener(classes = {ContextRefreshedEvent.class})
    public void onApplicationEvent(ContextRefreshedEvent event){
        LOGGER.info("重置定时任务调度开始...");
        schedulers.resetAll();
        LOGGER.info("重置定时任务调度结束...");
    }
}

# 回执类

package com.example.demo.listener;
import com.example.demo.dto.CallbackMessage;
import com.example.demo.service.SchedulerLogService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
/**
 * Listens on the callback queue and writes the reported results into the scheduler log.
 */
@Component
public class SchedulerReceiveListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerReceiveListener.class);

    @Autowired
    private SchedulerLogService logService;

    /**
     * Handles one callback message; a {@code null} payload is ignored.
     *
     * @param message receipt describing the outcome of a task
     */
    @RabbitListener(queuesToDeclare = @Queue("scheduler.receive"))
    public void process(@Payload CallbackMessage message){
        if (message != null) {
            // Write the execution result back to the log entry
            String logId = message.getLogId();
            logService.updateDateById(logId, message);
        }
    }
}

# 业务实现类

# 消息处理

package com.example.demo.scheduler;
import com.example.demo.constant.ScheduleConstants;
import com.example.demo.constant.ScheduleStatus;
import com.example.demo.dto.CallbackMessage;
import com.example.demo.dto.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
/**
 * Sample consumer of the "scheduler.send.message" queue: handles the task for every user in
 * the message and reports each outcome on the callback queue.
 */
@Component
public class DemoScheduler {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    private static final Logger LOG = LoggerFactory.getLogger(DemoScheduler.class);

    /**
     * Processes one task message. For each entry a receipt is sent, carrying the error status
     * and a note when the handling threw an exception.
     *
     * @param message task message; ignored when {@code null} or without entries
     */
    @RabbitListener(queuesToDeclare = @Queue(name = "scheduler.send.message"))
    public void process(@Payload Message message){
        LOG.info("处理消息定时任务开始,消息体为{}", message);
        if (message == null || CollectionUtils.isEmpty(message.getInfos())){
            LOG.info("未有用户需要处理此消息!");
            return;
        }
        // Handle the message for every user
        for (Message.Info info : message.getInfos()){
            // Prepare the receipt
            CallbackMessage callbackMessage = handlerBefore(message, info);
            try {
                handler(info.getUserId());
            }catch (Exception e){
                // Exceptions without a message still get a meaningful note.
                String reason = e.getMessage() != null ? e.getMessage() : e.toString();
                callbackMessage.setNote(reason);
                callbackMessage.setStatus(ScheduleStatus.STATE_ERROR.getCode());
                LOG.error("处理消息的定时任务异常,异常信息为{}", reason, e);
            }finally {
                handlerAfter(callbackMessage);
            }
        }
    }

    /** Stamps the finish time and sends the receipt to the callback queue. */
    private void handlerAfter(CallbackMessage callbackMessage) {
        callbackMessage.setEnd(LocalDateTime.now());
        rabbitTemplate.convertAndSend(ScheduleConstants.DEFAULT_CALLBACK_QUEUE, callbackMessage);
    }

    /** Business handling for one user; currently only logs. */
    private void handler(String userId) {
        LOG.info("用户{}处理消息", userId);
        //TODO implement the actual handling
    }

    /** Creates the receipt, pre-filled as a successful run that starts now. */
    private CallbackMessage handlerBefore(Message message, Message.Info info) {
        CallbackMessage callbackMessage = new CallbackMessage();
        callbackMessage.setLogId(info.getLogId());
        callbackMessage.setUserId(info.getUserId());
        callbackMessage.setTypeId(message.getTypeId());
        callbackMessage.setSchedulerId(message.getSchedulerId());
        callbackMessage.setStatus(ScheduleStatus.STATE_COMPLETE.getCode());
        callbackMessage.setStart(LocalDateTime.now());
        callbackMessage.setNote("处理成功!");
        return callbackMessage;
    }
}
更新于

请我喝[茶]~( ̄▽ ̄)~*

七音 微信支付

微信支付

七音 支付宝

支付宝