前提 最近的新项目和数据同步相关,有定时调度的需求。之前一直有使用过Quartz
、XXL-Job
、Easy Scheduler
等调度框架,后来越发觉得这些框架太重量级了,于是想到了Spring
内置的Scheduling
模块。而原生的Scheduling
模块只是内存态的调度模块,不支持任务的持久化或者配置(配置任务通过@Scheduled
注解进行硬编码,不能抽离到类之外),因此考虑理解Scheduling
模块的底层原理,并且基于此造一个简单的轮子,使之支持调度任务配置:通过配置文件或者JDBC
数据源。
Scheduling模块 Scheduling
模块是spring-context
依赖下的一个包org.springframework.scheduling
:
这个模块的类并不多,有四个子包:
顶层包的定义了一些通用接口和异常。
org.springframework.scheduling.annotation
:定义了调度、异步任务相关的注解和解析类,常用的注解如@Async
、@EnableAsync
、@EnableScheduling
和@Scheduled
。
org.springframework.scheduling.concurrent
:定义了调度任务执行器和相对应的FactoryBean
。
org.springframework.scheduling.config
:定义了配置解析、任务具体实现类、调度任务XML
配置文件解析相关的解析类。
org.springframework.scheduling.support
:定义了反射支持类、Cron
表达式解析器等工具类。
如果想单独使用Scheduling
,只需要引入spring-context
这个依赖。但是现在流行使用SpringBoot
,引入spring-boot-starter-web
已经集成了spring-context
,可以直接使用Scheduling
模块,笔者编写本文的时候(2020-03-14
)SpringBoot
的最新版本为2.2.5.RELEASE
,可以选用此版本进行源码分析或者生产应用:
<properties > <project.build.sourceEncoding > UTF-8</project.build.sourceEncoding > <maven.compiler.source > 1.8</maven.compiler.source > <maven.compiler.target > 1.8</maven.compiler.target > <spring.boot.version > 2.2.5.RELEASE</spring.boot.version > </properties > <dependencyManagement > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-dependencies</artifactId > <version > ${spring.boot.version}</version > <type > pom</type > <scope > import</scope > </dependency > </dependencies > </dependencyManagement > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > </dependencies >
开启Scheduling
模块支持只需要在某一个配置类中添加@EnableScheduling
注解即可,一般为了明确模块的引入,建议在启动类中使用此注解,如:
@EnableScheduling @SpringBootApplication public class App { public static void main (String[] args) { SpringApplication.run(App.class, args); } }
Scheduling模块的工作流程
这个图描述了Scheduling
模块的工作流程,这里分析一下非XML
配置下的流程(右边的分支):
通过注解@EnableScheduling
中的@Import
引入了SchedulingConfiguration
,而SchedulingConfiguration
中配置了一个类型为ScheduledAnnotationBeanPostProcessor
名称为org.springframework.context.annotation.internalScheduledAnnotationProcessor
的Bean
,这里有个常见的技巧,Spring
内部加载的Bean
一般会定义名称为internalXXX
,Bean
的role
会定义为ROLE_INFRASTRUCTURE = 2
。
Bean
后置处理器ScheduledAnnotationBeanPostProcessor
会解析和处理每一个符合特定类型的Bean
中的@Scheduled
注解(注意@Scheduled
只能使用在方法或者注解上),并且把解析完成的方法封装为不同类型的Task
实例,缓存在ScheduledTaskRegistrar
中的。
ScheduledAnnotationBeanPostProcessor
中的钩子接口方法afterSingletonsInstantiated()
在所有单例初始化完成之后回调触发,在此方法中设置了ScheduledTaskRegistrar
中的任务调度器(TaskScheduler
或者ScheduledExecutorService
类型)实例,并且调用ScheduledTaskRegistrar#afterPropertiesSet()
方法添加所有缓存的Task
实例到任务调度器中执行。
任务调度器 Scheduling
模块支持TaskScheduler
或者ScheduledExecutorService
类型的任务调度器,而ScheduledExecutorService
其实是JDK
并发包java.util.concurrent
的接口,一般实现类就是调度线程池ScheduledThreadPoolExecutor
。实际上,ScheduledExecutorService
类型的实例最终会通过适配器模式 转变为ConcurrentTaskScheduler
,所以这里只需要分析TaskScheduler
类型的执行器。
ThreadPoolTaskScheduler
:基于线程池实现的任务执行器,这个是最常用的实现,底层依赖于ScheduledThreadPoolExecutor
实现。
ConcurrentTaskScheduler
:TaskScheduler
接口和ScheduledExecutorService
接口的适配器,如果自定义一个ScheduledThreadPoolExecutor
类型的Bean
,那么任务执行器就会适配为ConcurrentTaskScheduler
。
DefaultManagedTaskScheduler
:JDK7
引入的JSR-236
的支持,可以通过JNDI
配置此调度执行器,一般很少用到,底层也是依赖于ScheduledThreadPoolExecutor
实现。
也就是说,内置的三个调度器类型底层都依赖于JUC
调度线程池ScheduledThreadPoolExecutor
。这里分析一下顶层接口org.springframework.scheduling.TaskScheduler
提供的功能(笔者已经把功能一致的default
方法暂时移除):
public interface TaskScheduler { ScheduledFuture<?> schedule(Runnable task, Trigger trigger); ScheduledFuture<?> schedule(Runnable task, Date startTime); ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period); ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period); ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay); ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay); }
Task的分类 Scheduling
模块中支持不同类型的任务,主要包括下面的3种(解析的优先顺序也是如下):
Cron
表达式任务,支持通过Cron
表达式配置执行的周期,对应的任务类型为org.springframework.scheduling.config.CronTask
。
固定延迟间隔任务,也就是上一轮执行完毕后间隔固定周期再执行本轮,依次类推,对应的的任务类型为org.springframework.scheduling.config.FixedDelayTask
。
固定频率任务,基于固定的间隔时间执行,不会理会上一轮是否执行完毕本轮会照样执行 ,对应的的任务类型为org.springframework.scheduling.config.FixedRateTask
。
关于这几类Task
,举几个简单的例子。CronTask
是通过cron
表达式指定执行周期的,并且不支持延迟执行 ,可以使用特殊字符-
禁用任务执行:
@Scheduled(cron = "*/5 * * * * ?") public void processTask () {} @Scheduled(cron = "-") public void processTask () {} public class Tasks { static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss" ); public static void main (String[] args) throws Exception { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(10 ); taskScheduler.initialize(); CronTask cronTask = new CronTask(() -> { System.out.println(String.format("[%s] - CronTask触发..." , F.format(LocalDateTime.now()))); }, "*/5 * * * * ?" ); taskScheduler.schedule(cronTask.getRunnable(),cronTask.getTrigger()); Thread.sleep(Integer.MAX_VALUE); } } [2020 -03 -16 01 :07 :00 ] - CronTask触发... [2020 -03 -16 01 :07 :05 ] - CronTask触发... ......
FixedDelayTask
需要配置延迟间隔值(fixedDelay
或者fixedDelayString
)和可选的起始延迟执行时间(initialDelay
或者initialDelayString
),这里注意一点是fixedDelayString
和initialDelayString
都支持从EmbeddedValueResolver
(简单理解为配置文件的属性处理器)读取和Duration
(例如P2D
就是parses as 2 days
,表示86400秒)支持格式的解析:
@Scheduled(fixedDelay = 5000, initialDelay = 1000) public void process () { } @Scheduled(fixedDelayString = "${process.task.fixedDelay}", initialDelayString = "${process.task.initialDelay}") public void process () { } public class Tasks { static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss" ); public static void main (String[] args) throws Exception { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(10 ); taskScheduler.initialize(); FixedDelayTask fixedDelayTask = new FixedDelayTask(() -> { System.out.println(String.format("[%s] - FixedDelayTask触发..." , F.format(LocalDateTime.now()))); }, 5000 , 1000 ); Date startTime = new Date(System.currentTimeMillis() + fixedDelayTask.getInitialDelay()); taskScheduler.scheduleWithFixedDelay(fixedDelayTask.getRunnable(), startTime, fixedDelayTask.getInterval()); Thread.sleep(Integer.MAX_VALUE); } } [2020 -03 -16 01 :06 :12 ] - FixedDelayTask触发... [2020 -03 -16 01 :06 :17 ] - FixedDelayTask触发... ......
FixedRateTask
需要配置固定间隔值(fixedRate
或者fixedRateString
)和可选的起始延迟执行时间(initialDelay
或者initialDelayString
),这里注意一点是fixedRateString
和initialDelayString
都支持从EmbeddedValueResolver
(简单理解为配置文件的属性处理器)读取和Duration
(例如P2D
就是parses as 2 days
,表示86400秒)支持格式的解析:
@Scheduled(fixedRate = 5000, initialDelay = 1000) public void processTask () {} @Scheduled(fixedRateString = "${process.task.fixedRate}", initialDelayString = "${process.task.initialDelay}") public void process () { } public class Tasks { static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss" ); public static void main (String[] args) throws Exception { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(10 ); taskScheduler.initialize(); FixedRateTask fixedRateTask = new FixedRateTask(() -> { System.out.println(String.format("[%s] - FixedRateTask触发..." , F.format(LocalDateTime.now()))); }, 5000 , 1000 ); Date startTime = new Date(System.currentTimeMillis() + fixedRateTask.getInitialDelay()); taskScheduler.scheduleAtFixedRate(fixedRateTask.getRunnable(), startTime, fixedRateTask.getInterval()); Thread.sleep(Integer.MAX_VALUE); } } [2020 -03 -16 23 :58 :25 ] - FixedRateTask触发... [2020 -03 -16 23 :58 :30 ] - FixedRateTask触发... ......
简单分析核心流程的源代码 在SpringBoot
注解体系下,Scheduling
模块的所有逻辑基本在ScheduledAnnotationBeanPostProcessor
和ScheduledTaskRegistrar
中。一般来说,一个类实现的接口代表了它能提供的功能,先看ScheduledAnnotationBeanPostProcessor
实现的接口:
ScheduledTaskHolder
接口:返回Set<ScheduledTask>
,表示持有的所有任务实例。
MergedBeanDefinitionPostProcessor
接口:Bean
定义合并时回调,预留空实现,暂时不做任何处理。
BeanPostProcessor
接口:也就是MergedBeanDefinitionPostProcessor
的父接口,Bean
实例初始化前后分别回调,其中,后回调的postProcessAfterInitialization()
方法就是用于解析@Scheduled
和装载ScheduledTask
,需要重点关注此方法的逻辑。
DestructionAwareBeanPostProcessor
接口:具体的Bean
实例销毁的时候回调,用于Bean
实例销毁的时候移除和取消对应的任务实例。
Ordered
接口:用于Bean
加载时候的排序,主要是改变ScheduledAnnotationBeanPostProcessor
在BeanPostProcessor
执行链中的顺序。
EmbeddedValueResolverAware
接口:回调StringValueResolver
实例,用于解析带占位符的环境变量属性值。
BeanNameAware
接口:回调BeanName
。
BeanFactoryAware
接口:回调BeanFactory
实例,具体是DefaultListableBeanFactory
,也就是熟知的IOC
容器。
ApplicationContextAware
接口:回调ApplicationContext
实例,也就是熟知的Spring
上下文,它是IOC
容器的门面,同时是事件广播器、资源加载器的实现等等。
SmartInitializingSingleton
接口:所有单例实例化完毕之后回调,作用是在持有的applicationContext
为NULL
的时候开始调度所有加载完成的任务,这个钩子接口十分有用,笔者常用它做一些资源初始化工作。
ApplicationListener
接口:监听Spring
应用的事件,具体是ApplicationListener<ContextRefreshedEvent>
,监听上下文刷新的事件,如果事件中携带的ApplicationContext
实例和ApplicationContextAware
回调的ApplicationContext
实例一致,那么在此监听回调方法中开始调度所有加载完成的任务,也就是在ScheduledAnnotationBeanPostProcessor
这个类中,SmartInitializingSingleton
接口的实现和ApplicationListener
接口的实现逻辑是互斥 的。
DisposableBean
接口:当前Bean
实例销毁时候回调,也就是ScheduledAnnotationBeanPostProcessor
自身被销毁的时候回调,用于取消和清理所有的ScheduledTask
。
上面分析的钩子接口在SpringBoot体系中可以按需使用,了解回调不同钩子接口的回调时机,可以在特定时机完成达到理想的效果。
@Scheduled
注解的解析集中在postProcessAfterInitialization()
方法:
public Object postProcessAfterInitialization (Object bean, String beanName) { if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler || bean instanceof ScheduledExecutorService) { return bean; } Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); if (!this .nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) { Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> { Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledMethods.isEmpty() ? scheduledMethods : null ); }); if (annotatedMethods.isEmpty()) { this .nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @Scheduled annotations found on bean class: " + targetClass); } } else { annotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean))); if (logger.isTraceEnabled()) { logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; }
processScheduled(Scheduled scheduled, Method method, Object bean)
就是具体的注解解析和Task
封装的方法:
public class ScheduledMethodRunnable implements Runnable { private final Object target; private final Method method; public ScheduledMethodRunnable (Object target, Method method) { this .target = target; this .method = method; } .... @Override public void run () { try { ReflectionUtils.makeAccessible(this .method); this .method.invoke(this .target); } catch (InvocationTargetException ex) { ReflectionUtils.rethrowRuntimeException(ex.getTargetException()); } catch (IllegalAccessException ex) { throw new UndeclaredThrowableException(ex); } } } protected Runnable createRunnable (Object target, Method method) { Assert.isTrue(method.getParameterCount() == 0 , "Only no-arg methods may be annotated with @Scheduled" ); Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass()); return new ScheduledMethodRunnable(target, invocableMethod); } protected void processScheduled (Scheduled scheduled, Method method, Object bean) { try { Runnable runnable = createRunnable(bean, method); boolean processedSchedule = false ; String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required" ; Set<ScheduledTask> tasks = new LinkedHashSet<>(4 ); long initialDelay = scheduled.initialDelay(); String initialDelayString = scheduled.initialDelayString(); if (StringUtils.hasText(initialDelayString)) { Assert.isTrue(initialDelay < 0 , "Specify 'initialDelay' or 'initialDelayString', not both" ); if (this .embeddedValueResolver != null ) { initialDelayString = this .embeddedValueResolver.resolveStringValue(initialDelayString); } if (StringUtils.hasLength(initialDelayString)) { try { initialDelay = parseDelayAsLong(initialDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long" ); } } } String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { String zone = scheduled.zone(); if (this .embeddedValueResolver != null ) { cron = this .embeddedValueResolver.resolveStringValue(cron); zone = this .embeddedValueResolver.resolveStringValue(zone); } if (StringUtils.hasLength(cron)) { Assert.isTrue(initialDelay == -1 , "'initialDelay' not supported for cron triggers" ); processedSchedule = true ; if (!Scheduled.CRON_DISABLED.equals(cron)) { TimeZone timeZone; if (StringUtils.hasText(zone)) { timeZone = StringUtils.parseTimeZoneString(zone); } else { timeZone = TimeZone.getDefault(); } tasks.add(this .registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } } } if (initialDelay < 0 ) { initialDelay = 0 ; } long fixedDelay = scheduled.fixedDelay(); if (fixedDelay >= 0 ) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true ; tasks.add(this .registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } String fixedDelayString = scheduled.fixedDelayString(); if (StringUtils.hasText(fixedDelayString)) { if (this .embeddedValueResolver != null ) { fixedDelayString = this .embeddedValueResolver.resolveStringValue(fixedDelayString); } if (StringUtils.hasLength(fixedDelayString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true ; try { fixedDelay = parseDelayAsLong(fixedDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long" ); } tasks.add(this .registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } } long fixedRate = scheduled.fixedRate(); if (fixedRate >= 0 ) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true ; tasks.add(this .registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } String fixedRateString = scheduled.fixedRateString(); if (StringUtils.hasText(fixedRateString)) { if (this .embeddedValueResolver != null ) { fixedRateString = this .embeddedValueResolver.resolveStringValue(fixedRateString); } if (StringUtils.hasLength(fixedRateString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true ; try { fixedRate = parseDelayAsLong(fixedRateString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long" ); } tasks.add(this .registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } } Assert.isTrue(processedSchedule, errorMessage); synchronized (this .scheduledTasks) { Set<ScheduledTask> regTasks = this .scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4 )); regTasks.addAll(tasks); } } catch (IllegalArgumentException ex) { throw new IllegalStateException( "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage()); } }
总的来说,这个方法做了四件事:
解析@Scheduled
中的initialDelay
、initialDelayString
属性,适用于FixedDelayTask
或者FixedRateTask
的延迟执行。
优先解析@Scheduled
中的cron
属性,封装为CronTask
,通过ScheduledTaskRegistrar
进行缓存。
解析@Scheduled
中的fixedDelay
、fixedDelayString
属性,封装为FixedDelayTask
,通过ScheduledTaskRegistrar
进行缓存。
解析@Scheduled
中的fixedRate
、fixedRateString
属性,封装为FixedRateTask
,通过ScheduledTaskRegistrar
进行缓存。
@Scheduled
修饰的某个方法如果同时配置了cron
、fixedDelay|fixedDelayString
和fixedRate|fixedRateString
属性,意味着此方法同时封装为三种任务CronTask
、FixedDelayTask
和FixedRateTask
。解析xxString
值的使用,用到了EmbeddedValueResolver
解析字符串的值,支持占位符,这样可以直接获取环境配置中的占位符属性(基于SPEL
的特性,甚至可以支持嵌套占位符)。解析成功的所有任务实例存放在ScheduledAnnotationBeanPostProcessor
的一个映射scheduledTasks
中:
private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16 );
解析和缓存工作完成之后,接着分析最终激活所有调度任务的逻辑,见互斥方法afterSingletonsInstantiated()
和onApplicationEvent()
,两者中一定只有一个方法 能够调用finishRegistration()
:
public void afterSingletonsInstantiated () { this .nonAnnotatedClasses.clear(); if (this .applicationContext == null ) { finishRegistration(); } } @Override public void onApplicationEvent (ContextRefreshedEvent event) { if (event.getApplicationContext() == this .applicationContext) { finishRegistration(); } } private void finishRegistration () { if (this .scheduler != null ) { this .registrar.setScheduler(this .scheduler); } if (this .beanFactory instanceof ListableBeanFactory) { Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this .beanFactory).getBeansOfType(SchedulingConfigurer.class); List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values()); AnnotationAwareOrderComparator.sort(configurers); for (SchedulingConfigurer configurer : configurers) { configurer.configureTasks(this .registrar); } } if (this .registrar.hasTasks() && this .registrar.getScheduler() == null ) { Assert.state(this .beanFactory != null , "BeanFactory must be set to find scheduler by type" ); try { this .registrar.setTaskScheduler(resolveSchedulerBean(this .beanFactory, TaskScheduler.class, false )); } catch (NoUniqueBeanDefinitionException ex) { logger.trace("Could not find unique TaskScheduler bean" , ex); try { this .registrar.setTaskScheduler(resolveSchedulerBean(this .beanFactory, TaskScheduler.class, true )); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info("More than one TaskScheduler bean exists within the context, and " + "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { logger.trace("Could not find default TaskScheduler bean" , ex); try { this .registrar.setScheduler(resolveSchedulerBean(this .beanFactory, ScheduledExecutorService.class, false )); } catch (NoUniqueBeanDefinitionException ex2) { logger.trace("Could not find unique ScheduledExecutorService bean" , ex2); try { this .registrar.setScheduler(resolveSchedulerBean(this .beanFactory, ScheduledExecutorService.class, true )); } catch (NoSuchBeanDefinitionException ex3) { if (logger.isInfoEnabled()) { logger.info("More than one ScheduledExecutorService bean exists within the context, and " + "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + ex2.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex2) { logger.trace("Could not find default ScheduledExecutorService bean" , ex2); logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing" ); } } } this .registrar.afterPropertiesSet(); } public class ScheduledTaskRegistrar implements ScheduledTaskHolder , InitializingBean , DisposableBean { @Override public void afterPropertiesSet () { scheduleTasks(); } @SuppressWarnings("deprecation") protected void scheduleTasks () { if (this .taskScheduler == null ) { this .localExecutor = Executors.newSingleThreadScheduledExecutor(); this .taskScheduler = new ConcurrentTaskScheduler(this .localExecutor); } if (this .triggerTasks != null ) { for (TriggerTask task : this .triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } if (this .cronTasks != null ) { for (CronTask task : this .cronTasks) { addScheduledTask(scheduleCronTask(task)); } } if (this .fixedRateTasks != null ) { for (IntervalTask task : this .fixedRateTasks) { addScheduledTask(scheduleFixedRateTask(task)); } } if (this .fixedDelayTasks != null ) { for (IntervalTask task : this .fixedDelayTasks) { addScheduledTask(scheduleFixedDelayTask(task)); } } } }
注意两个个问题:
如果没有配置TaskScheduler
或者ScheduledExecutorService
类型的Bean
,那么调度模块只会创建一个线程 去调度所有装载完毕的任务,如果任务比较多,执行密度比较大,很有可能会造成大量任务饥饿,表现为存在部分任务不会触发调度的场景(这个是调度模块生产中经常遇到的故障,需要重点排查是否没有设置TaskScheduler
或者ScheduledExecutorService
)。
SchedulingConfigurer
是调度模块提供给使用的进行扩展的钩子接口,用于在激活所有调度任务之前回调ScheduledTaskRegistrar
实例,只要拿到ScheduledTaskRegistrar
实例,我们就可以使用它注册和装载新的Task
。
调度任务动态装载 Scheduling
模块本身已经支持基于NamespaceHandler
支持通过XML
文件配置调度任务,但是笔者一直认为XML
给人的感觉太”重”,使用起来显得太笨重,这里打算扩展出JSON
文件配置和基于JDBC
数据源配置(也就是持久化任务,这里选用MySQL
)。根据前文的源码分析,需要用到SchedulingConfigurer
接口的实现,用于在所有调度任务触发之前从外部添加自定义的调度任务。先定义调度任务的一些配置属性类:
@Getter @RequiredArgsConstructor public enum ScheduleTaskType { CRON("CRON" ), FIXED_DELAY("FIXED_DELAY" ), FIXED_RATE("FIXED_RATE" ), ; private final String type; } @Data public class ScheduleTaskProperties { private Long version; private Boolean enable; private List<ScheduleTasks> tasks; } @Data public class ScheduleTasks { private String taskHostKlass; private Boolean enable; private List<ScheduleTaskMethod> taskMethods; } @Data public class ScheduleTaskMethod { private Boolean enable; private String taskDescription; private String taskMethod; private String timeZone; private String cronExpression; private String intervalMilliseconds; private String initialDelayMilliseconds; }
设计的时候,考虑到多个任务执行方法可以放在同一个宿主类,这样可以方便同一种类的任务进行统一管理,如:
public class TaskHostClass { public void task1 () { } public void task2 () { } ...... public void taskN () { } }
细节方面,intervalMilliseconds
和initialDelayMilliseconds
的单位设计为毫秒,使用字符串形式,方便可以基于StringValueResolver
解析配置文件中的属性配置。添加一个抽象的SchedulingConfigurer
:
@Slf4j public abstract class AbstractSchedulingConfigurer implements SchedulingConfigurer , InitializingBean , BeanFactoryAware , EmbeddedValueResolverAware { @Getter private StringValueResolver embeddedValueResolver; private ConfigurableBeanFactory configurableBeanFactory; private final List<InternalTaskProperties> internalTasks = Lists.newLinkedList(); private final Set<String> tasksLoaded = Sets.newHashSet(); @Override public void setBeanFactory (BeanFactory beanFactory) throws BeansException { configurableBeanFactory = (ConfigurableBeanFactory) beanFactory; } @Override public void afterPropertiesSet () throws Exception { internalTasks.clear(); internalTasks.addAll(loadTaskProperties()); } @Override public void setEmbeddedValueResolver (StringValueResolver resolver) { embeddedValueResolver = resolver; } @Override public void configureTasks (ScheduledTaskRegistrar taskRegistrar) { for (InternalTaskProperties task : internalTasks) { try { synchronized (tasksLoaded) { String key = task.taskHostKlass() + "#" + task.taskMethod(); if (!tasksLoaded.contains(key)) { if (task instanceof CronTaskProperties) { loadCronTask((CronTaskProperties) task, taskRegistrar); } if (task instanceof FixedDelayTaskProperties) { loadFixedDelayTask((FixedDelayTaskProperties) task, taskRegistrar); } if (task instanceof FixedRateTaskProperties) { loadFixedRateTask((FixedRateTaskProperties) task, taskRegistrar); } tasksLoaded.add(key); } else { log.info("调度任务已经装载,任务宿主类:{},任务执行方法:{}" , task.taskHostKlass(), task.taskMethod()); } } } catch (Exception e) { throw new IllegalStateException(String.format("加载调度任务异常,任务宿主类:%s,任务执行方法:%s" , task.taskHostKlass(), task.taskMethod()), e); } } } private ScheduledMethodRunnable loadScheduledMethodRunnable (String taskHostKlass, String taskMethod) throws Exception { Class<?> klass = ClassUtils.forName(taskHostKlass, null ); Object target = configurableBeanFactory.getBean(klass); Method method = ReflectionUtils.findMethod(klass, taskMethod); if (null == method) { throw new IllegalArgumentException(String.format("找不到目标方法,任务宿主类:%s,任务执行方法:%s" , taskHostKlass, taskMethod)); } Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass()); return new ScheduledMethodRunnable(target, invocableMethod); } private void loadCronTask (CronTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception { ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod()); String cronExpression = embeddedValueResolver.resolveStringValue(pops.cronExpression()); if (null != cronExpression) { String timeZoneString = embeddedValueResolver.resolveStringValue(pops.timeZone()); TimeZone timeZone; if (null != timeZoneString) { timeZone = TimeZone.getTimeZone(timeZoneString); } else { timeZone = TimeZone.getDefault(); } CronTask cronTask = new CronTask(runnable, new CronTrigger(cronExpression, timeZone)); taskRegistrar.addCronTask(cronTask); log.info("装载CronTask[{}#{}()]成功,cron表达式:{},任务描述:{}" , cronExpression, pops.taskMethod(), pops.cronExpression(), pops.taskDescription()); } } private void loadFixedDelayTask (FixedDelayTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception { ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod()); long fixedDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds())); long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds())); FixedDelayTask fixedDelayTask = new FixedDelayTask(runnable, fixedDelayMilliseconds, initialDelayMilliseconds); taskRegistrar.addFixedDelayTask(fixedDelayTask); log.info("装载FixedDelayTask[{}#{}()]成功,固定延迟间隔:{} ms,初始延迟执行时间:{} ms,任务描述:{}" , pops.taskHostKlass(), pops.taskMethod(), fixedDelayMilliseconds, initialDelayMilliseconds, pops.taskDescription()); } private void loadFixedRateTask (FixedRateTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception { ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod()); long fixedRateMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds())); long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds())); FixedRateTask fixedRateTask = new FixedRateTask(runnable, fixedRateMilliseconds, initialDelayMilliseconds); taskRegistrar.addFixedRateTask(fixedRateTask); log.info("装载FixedRateTask[{}#{}()]成功,固定执行频率:{} ms,初始延迟执行时间:{} ms,任务描述:{}" , pops.taskHostKlass(), pops.taskMethod(), fixedRateMilliseconds, initialDelayMilliseconds, pops.taskDescription()); } private long parseDelayAsLong (String value) { if (null == value) { return 0L ; } if (value.length() > 1 && (isP(value.charAt(0 )) || isP(value.charAt(1 )))) { return Duration.parse(value).toMillis(); } return Long.parseLong(value); } private boolean isP (char ch) { return (ch == 'P' || ch == 'p' ); } protected abstract List<InternalTaskProperties> loadTaskProperties () throws Exception ; interface InternalTaskProperties { String taskHostKlass () ; String taskMethod () ; String taskDescription () ; } @Builder protected static class CronTaskProperties implements InternalTaskProperties { private String taskHostKlass; private String taskMethod; private String cronExpression; private String taskDescription; private String timeZone; @Override public String taskDescription () { return taskDescription; } public String cronExpression () { return cronExpression; } public String timeZone () { return timeZone; } @Override public String taskHostKlass () { return taskHostKlass; } @Override public String taskMethod () { return taskMethod; } } @Builder protected static class FixedDelayTaskProperties implements InternalTaskProperties { private String taskHostKlass; private String taskMethod; private String intervalMilliseconds; private String initialDelayMilliseconds; private String taskDescription; @Override public String taskDescription () { return taskDescription; } public String initialDelayMilliseconds () { return initialDelayMilliseconds; } public String intervalMilliseconds () { return intervalMilliseconds; } @Override public String taskHostKlass () { return taskHostKlass; } @Override public String taskMethod () { return taskMethod; } } @Builder protected static class FixedRateTaskProperties implements InternalTaskProperties { private String taskHostKlass; private String taskMethod; private String intervalMilliseconds; private String initialDelayMilliseconds; private String taskDescription; @Override public String taskDescription () { return taskDescription; } public String initialDelayMilliseconds () { return initialDelayMilliseconds; } public String intervalMilliseconds () { return intervalMilliseconds; } @Override public String taskHostKlass () { return taskHostKlass; } @Override public String taskMethod () { return taskMethod; } } }
loadTaskProperties()
方法用于加载任务配置,留给子类实现。
JSON配置 JSON
配置文件的格式如下(类路径下的scheduling/tasks.json
文件):
{ "version" : 1 , "tasks" : [ { "taskKlass" : "club.throwable.schedule.Tasks" , "taskMethods" : [ { "taskType" : "FIXED_DELAY" , "taskDescription" : "processTask1任务" , "taskMethod" : "processTask1" , "intervalMilliseconds" : "5000" } ] } ] }
每个层级都有一个enable
属性,默认为true
,只有强制指定为false
的时候才不会装载对应的任务调度方法。这里就是简单继承AbstractSchedulingConfigurer
,实现从类路径加载配置的逻辑,定义JsonSchedulingConfigurer
:
public class JsonSchedulingConfigurer extends AbstractSchedulingConfigurer { @Value("${scheduling.json.config.location:scheduling/tasks.json}") private String location; @Autowired private ObjectMapper objectMapper; @Override protected List<InternalTaskProperties> loadTaskProperties () throws Exception { ClassPathResource resource = new ClassPathResource(location); String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8); ScheduleTaskProperties properties = objectMapper.readValue(content, ScheduleTaskProperties.class); if (Boolean.FALSE.equals(properties.getEnable()) || null == properties.getTasks()) { return Lists.newArrayList(); } List<InternalTaskProperties> target = Lists.newArrayList(); for (ScheduleTasks tasks : properties.getTasks()) { if (null != tasks) { List<ScheduleTaskMethod> taskMethods = tasks.getTaskMethods(); if (null != taskMethods) { for (ScheduleTaskMethod taskMethod : taskMethods) { if (!Boolean.FALSE.equals(taskMethod.getEnable())) { if (ScheduleTaskType.CRON == taskMethod.getTaskType()) { target.add(CronTaskProperties.builder() .taskMethod(taskMethod.getTaskMethod()) .cronExpression(taskMethod.getCronExpression()) .timeZone(taskMethod.getTimeZone()) .taskDescription(taskMethod.getTaskDescription()) .taskHostKlass(tasks.getTaskKlass()) .build()); } if (ScheduleTaskType.FIXED_DELAY == taskMethod.getTaskType()) { target.add(FixedDelayTaskProperties.builder() .taskMethod(taskMethod.getTaskMethod()) .intervalMilliseconds(taskMethod.getIntervalMilliseconds()) .initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds()) .taskDescription(taskMethod.getTaskDescription()) .taskHostKlass(tasks.getTaskKlass()) .build()); } if (ScheduleTaskType.FIXED_RATE == taskMethod.getTaskType()) { target.add(FixedRateTaskProperties.builder() .taskMethod(taskMethod.getTaskMethod()) .intervalMilliseconds(taskMethod.getIntervalMilliseconds()) .initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds()) .taskDescription(taskMethod.getTaskDescription()) .taskHostKlass(tasks.getTaskKlass()) .build()); } } } } } } return target; } }
添加一个配置类和任务类:
@Configuration public class SchedulingAutoConfiguration { @Bean public JsonSchedulingConfigurer jsonSchedulingConfigurer () { return new JsonSchedulingConfigurer(); } } @Slf4j @Component public class Tasks { public void processTask1 () { log.info("processTask1触发.........." ); } }
启动SpringBoot
应用,某次执行的部分日志如下:
2020-03-22 16:24:17.248 INFO 22836 --- [ main] c.t.s.AbstractSchedulingConfigurer : 装载FixedDelayTask[club.throwable.schedule.Tasks#processTask1()]成功,固定延迟间隔:5000 ms,初始延迟执行时间:0 ms,任务描述:processTask1任务 2020-03-22 16:24:22.275 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发.......... 2020-03-22 16:24:27.277 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发.......... 2020-03-22 16:24:32.279 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发.......... ......
这里有些细节没有完善,例如配置文件参数的一些非空判断、配置值是否合法等等校验逻辑没有做,如果要设计成一个工业级的类库,这些方面必须要考虑。
JDBC数据源配置 JDBC
数据源这里用MySQL
举例说明,先建一个调度任务配置表schedule_task
:
CREATE TABLE `schedule_task` ( id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT '主键' , edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间' , create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' , editor VARCHAR (32 ) NOT NULL DEFAULT 'admin' COMMENT '修改者', creator VARCHAR (32 ) NOT NULL DEFAULT 'admin' COMMENT '创建者', deleted BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '软删除标识', task_host_class VARCHAR (256 ) NOT NULL COMMENT '任务宿主类全类名', task_method VARCHAR (128 ) NOT NULL COMMENT '任务执行方法名', task_type VARCHAR (16 ) NOT NULL COMMENT '任务类型', task_description VARCHAR (64 ) NOT NULL COMMENT '任务描述', cron_expression VARCHAR (128 ) COMMENT 'cron表达式', time_zone VARCHAR (32 ) COMMENT '时区', interval_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '执行间隔时间', initial_delay_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '初始延迟执行时间', UNIQUE uniq_class_method (task_host_class, task_method) ) COMMENT '调度任务配置表' ;
其实具体的做法和JSON
配置差不多,先引入spring-boot-starter-jdbc
,接着编写MysqlSchedulingConfigurer
:
@RequiredArgsConstructor public class MysqlScheduleTaskDao { private final JdbcTemplate jdbcTemplate; private static final ResultSetExtractor<List<ScheduleTask>> MULTI = r -> { List<ScheduleTask> tasks = Lists.newArrayList(); while (r.next()) { ScheduleTask task = new ScheduleTask(); tasks.add(task); task.setId(r.getLong("id" )); task.setCronExpression(r.getString("cron_expression" )); task.setInitialDelayMilliseconds(r.getLong("initial_delay_milliseconds" )); task.setIntervalMilliseconds(r.getLong("interval_milliseconds" )); task.setTimeZone(r.getString("time_zone" )); task.setTaskDescription(r.getString("task_description" )); task.setTaskHostClass(r.getString("task_host_class" )); task.setTaskMethod(r.getString("task_method" )); task.setTaskType(r.getString("task_type" )); } return tasks; }; public List<ScheduleTask> selectAllTasks () { return jdbcTemplate.query("SELECT * FROM schedule_task WHERE deleted = 0" , MULTI); } } @RequiredArgsConstructor public class MysqlSchedulingConfigurer extends AbstractSchedulingConfigurer { private final MysqlScheduleTaskDao mysqlScheduleTaskDao; @Override protected List<InternalTaskProperties> loadTaskProperties () throws Exception { List<InternalTaskProperties> target = Lists.newArrayList(); List<ScheduleTask> tasks = mysqlScheduleTaskDao.selectAllTasks(); if (!tasks.isEmpty()) { for (ScheduleTask task : tasks) { ScheduleTaskType scheduleTaskType = ScheduleTaskType.fromType(task.getTaskType()); if (ScheduleTaskType.CRON == scheduleTaskType) { target.add(CronTaskProperties.builder() .taskMethod(task.getTaskMethod()) .cronExpression(task.getCronExpression()) .timeZone(task.getTimeZone()) .taskDescription(task.getTaskDescription()) .taskHostKlass(task.getTaskHostClass()) .build()); } if (ScheduleTaskType.FIXED_DELAY == scheduleTaskType) { target.add(FixedDelayTaskProperties.builder() .taskMethod(task.getTaskMethod()) .intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds())) .initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds())) .taskDescription(task.getTaskDescription()) .taskHostKlass(task.getTaskHostClass()) .build()); } if (ScheduleTaskType.FIXED_RATE == scheduleTaskType) { target.add(FixedRateTaskProperties.builder() .taskMethod(task.getTaskMethod()) .intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds())) .initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds())) .taskDescription(task.getTaskDescription()) .taskHostKlass(task.getTaskHostClass()) .build()); } } } return target; } }
记得引入spring-boot-starter-jdbc
和mysql-connector-java
并且激活MysqlSchedulingConfigurer
配置。插入一条记录:
INSERT INTO `schedule_task`(`id`, `edit_time`, `create_time`, `editor`, `creator`, `deleted`, `task_host_class`, `task_method`, `task_type`, `task_description`, `cron_expression`, `time_zone`, `interval_milliseconds`, `initial_delay_milliseconds`) VALUES (1 , '2020-03-30 23:46:10' , '2020-03-30 23:46:10' , 'admin' , 'admin' , 0 , 'club.throwable.schedule.Tasks' , 'processTask1' , 'FIXED_DELAY' , '测试任务' , NULL , NULL , 10000 , 5000 );
然后启动服务,某次执行的输出:
2020-03-30 23:47:27.376 INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发.......... 2020-03-30 23:47:37.378 INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发.......... ....
混合配置 有些时候我们希望可以JSON
配置和JDBC
数据源配置进行混合配置,或者动态二选一以便灵活应对多环境的场景(例如要在开发环境使用JSON
配置而测试和生产环境使用JDBC
数据源配置,甚至可以将JDBC
数据源配置覆盖JSON
配置,这样能保证总是倾向于使用JDBC
数据源配置),这样需要对前面两小节的实现加多一层抽象。这里的设计可以参考SpringMVC
中的控制器参数解析器的设计,具体是HandlerMethodArgumentResolverComposite
,其实道理是相同的。
其他注意事项 在生产实践中,暂时不考虑生成任务执行日志和细粒度的监控,着重做了两件事:
并发控制,(多服务节点下)禁止任务并发执行。
跟踪任务的日志轨迹。
解决并发执行问题 一般情况下,我们需要禁止任务并发执行,考虑引入Redisson
提供的分布式锁:
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>最新版本</version> </dependency> @Configuration @AutoConfigureAfter(RedisAutoConfiguration.class) public class RedissonAutoConfiguration { @Autowired private RedisProperties redisProperties; @Bean(destroyMethod = "shutdown") public RedissonClient redissonClient () { Config config = new Config(); SingleServerConfig singleServerConfig = config.useSingleServer(); singleServerConfig.setAddress(String.format("redis://%s:%d" , redisProperties.getHost(), redisProperties.getPort())); if (redisProperties.getDatabase() > 0 ) { singleServerConfig.setDatabase(redisProperties.getDatabase()); } if (null != redisProperties.getPassword()) { singleServerConfig.setPassword(redisProperties.getPassword()); } return Redisson.create(config); } } @Component public class DistributedLockFactory { private static final String DISTRIBUTED_LOCK_PATH_PREFIX = "dl:" ; @Autowired private RedissonClient redissonClient; public DistributedLock provideDistributedLock (String lockKey) { String lockPath = DISTRIBUTED_LOCK_PATH_PREFIX + lockKey; return new RedissonDistributedLock(redissonClient, lockPath); } }
这里考虑到项目依赖了spring-boot-starter-redis
,直接复用了它的配置属性类(RedissonDistributedLock
是RLock
的轻量级封装,见附录)。使用方式如下:
@Autowired private DistributedLockFactory distributedLockFactory;public void task1 () { DistributedLock lock = distributedLockFactory.provideDistributedLock(lockKey); boolean tryLock = lock.tryLock(20L , 60 , TimeUnit.SECONDS); if (tryLock) { try { }finally { lock.unlock(); } } }
引入MDC跟踪任务的Trace MDC
其实是Mapped Diagnostic Context
的缩写,也就是映射诊断上下文,一般用于日志框架里面同一个线程执行过程的跟踪(例如一个线程跑过了多个方法,各个方法里面都打印了日志,那么通过MDC
可以对整个调用链通过一个唯一标识关联起来),例如这里选用slf4j
提供的org.slf4j.MDC
:
@Component public class MappedDiagnosticContextAssistant { public void processInMappedDiagnosticContext (Runnable runnable) { String uuid = UUID.randomUUID().toString(); MDC.put("TRACE_ID" , uuid); try { runnable.run(); } finally { MDC.remove("TRACE_ID" ); } } }
任务执行的时候需要包裹成一个Runnale
实例:
public void task1 () { mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> { StopWatch watch = new StopWatch(); watch.start(); log.info("开始执行......" ); watch.stop(); log.info("执行完毕,耗时:{} ms......" , watch.getTotalTimeMillis()); }); }
结合前面一节提到的并发控制,那么最终执行的任务方法如下:
public void task1 () { mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> { StopWatch watch = new StopWatch(); watch.start(); log.info("开始执行......" ); scheduleTaskAssistant.executeInDistributedLock("任务分布式锁KEY" , () -> { }); watch.stop(); log.info("执行完毕,耗时:{} ms......" , watch.getTotalTimeMillis()); }); }
这里的方法看起来比较别扭,其实可以直接在任务装载的时候基于分布式锁和MDC
进行封装,方式类似于ScheduledMethodRunnable
,这里不做展开,因为要详细展开篇幅可能比较大(ScheduleTaskAssistant
见附录)。
小结 其实spring-context
整个调度模块完全依赖于TaskScheduler
实现,更底层的是JUC
调度线程池ScheduledThreadPoolExecutor
。如果想要从底层原理理解整个调度模块的运行原理,那么就一定要分析ScheduledThreadPoolExecutor
的实现。整篇文章大致介绍了spring-context
调度模块的加载调度任务的流程,并且基于扩展接口SchedulingConfigurer
扩展出多种自定义配置调度任务的方式,但是考虑到需要在生产环境中运行,那么免不了需要考虑监控、并发控制、日志跟踪等等的功能,但是这样就会使得整个调度模块变重,慢慢地就会发现,这个轮子越造越大,越有主流调度框架Quartz
或者Easy Scheduler
的影子。笔者认为,软件工程,有些时候要权衡取舍,该抛弃的就应该果断抛弃,否则总是负重而行,还能走多远?
参考资料:
附录 ScheduleTaskAssistant
:
@RequiredArgsConstructor @Component public class ScheduleTaskAssistant { public static final long DEFAULT_WAIT_TIME = 5L ; public static final long DEFAULT_LEAVE_TIME = 30L ; private final DistributedLockFactory distributedLockFactory; public void executeInDistributedLock (long waitTime, long leaveTime, TimeUnit timeUnit, String lockKey, Runnable task) { DistributedLock lock = distributedLockFactory.dl(lockKey); boolean tryLock = lock.tryLock(waitTime, leaveTime, timeUnit); if (tryLock) { try { long waitTimeMillis = timeUnit.toMillis(waitTime); long start = System.currentTimeMillis(); task.run(); long end = System.currentTimeMillis(); long cost = end - start; if (cost < waitTimeMillis) { Sleeper.X.sleep(waitTimeMillis - cost); } } finally { lock.unlock(); } } } public void executeInDistributedLock (String lockKey, Runnable task) { executeInDistributedLock(DEFAULT_WAIT_TIME, DEFAULT_LEAVE_TIME, TimeUnit.SECONDS, lockKey, task); } }
RedissonDistributedLock
:
@Slf4j public class RedissonDistributedLock implements DistributedLock { private final RedissonClient redissonClient; private final String lockPath; private final RLock internalLock; RedissonDistributedLock(RedissonClient redissonClient, String lockPath) { this .redissonClient = redissonClient; this .lockPath = lockPath; this .internalLock = initInternalLock(); } private RLock initInternalLock () { return redissonClient.getLock(lockPath); } @Override public boolean isLock () { return internalLock.isLocked(); } @Override public boolean isHeldByCurrentThread () { return internalLock.isHeldByCurrentThread(); } @Override public void lock (long leaseTime, TimeUnit unit) { internalLock.lock(leaseTime, unit); } @Override public boolean tryLock (long waitTime, long leaseTime, TimeUnit unit) { try { return internalLock.tryLock(waitTime, leaseTime, unit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(String.format("Acquire lock fail by thread interrupted,path:%s" , lockPath), e); } } @Override public void unlock () { try { internalLock.unlock(); } catch (IllegalMonitorStateException ex) { log.warn("Unlock path:{} error for thread status change in concurrency" , lockPath, ex); } } }
(本文完 c-7-d e-a-20200324 真是有点滑稽,笔者发现任务持久化最好还是用现成的工业级调度器,于是基于Quartz做了轻量级封装,写了个后台管理界面,且听下回分解)