版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
SpringBoot使用Elastic-Job-lite实现动态创建定时任务和任务持久化Elastic-Job是当当开源的一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务;Elastic-Job-Cloud采用自研MesosFramework的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。这里以Elastic-Job-lite为例,跟SpringBoot进行整合,当当的官方文档中并没有对SpringBoot集成作说明,所有的配置都是基于文档中的xml的配置修改出来的。起步准备好一个SpringBoot的项目,pom.xml中引入Elastic-job,mysql,jpa等依赖<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.5</version></dependency><dependency><groupId>jectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>com.zaxxer</groupId><artifactId>HikariCP</artifactId></dependency></dependencies>http://www.f-1.cc配置使用yaml进行相关属性的配置,主要配置的是数据库连接池,jpaelasticjob:
serverlists:
8:2181
namespace:
boot-job
spring:
datasource:
url:
jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
driver-class-name:
com.mysql.jdbc.Driver
username:
root
password:
root
type:
com.zaxxer.hikari.HikariDataSource
#
自动创建更新验证数据库结构
jpa:
hibernate:
ddl-auto:
update
show-sql:
true
database:
mysqlelastic-job相关的配置使用java配置实现,代替官方文档的xml配置@Configuration@Data@ConfigurationProperties(prefix
=
"elasticjob")public
class
ElasticJobConfig
{private
String
serverlists;private
String
namespace;@Resourceprivate
HikariDataSource
dataSource;@Beanpublic
ZookeeperConfiguration
zkConfig()
{return
new
ZookeeperConfiguration(serverlists,
namespace);
}@Bean(initMethod
=
"init")public
ZookeeperRegistryCenter
regCenter(ZookeeperConfiguration
config)
{return
new
ZookeeperRegistryCenter(config);
}/**
*
将作业运行的痕迹进行持久化到DB
*
*
@return
*/@Beanpublic
JobEventConfiguration
jobEventConfiguration()
{return
new
JobEventRdbConfiguration(dataSource);
}@Beanpublic
ElasticJobListener
elasticJobListener()
{return
new
ElasticJobListener(100,
100);
}}所有相关的配置到这里就已经OK了,接下来开始具体的编码实现定时任务实现先实现一个自己的任务类,需要实现elastic-job提供的SimpleJob接口,实现它的execute(ShardingContextshardingContext)方法@Slf4jpublic
class
MyElasticJob
implements
SimpleJob
{@Overridepublic
void
execute(ShardingContext
shardingContext)
{//打印出任务相关信息,JobParameter用于传递任务的ID("任务名:{},
片数:{},
id={}",
shardingContext.getJobName(),
shardingContext.getShardingTotalCount(),
shardingContext.getJobParameter());
}}接下来实现一个分布式的任务监听器,如果任务有分片,分布式监听器会在总的任务开始前执行一次,结束时执行一次。监听器在之前的ElasticJobConfig已经注册到了Spring容器之中。public
class
ElasticJobListener
extends
AbstractDistributeOnceElasticJobListener
{@Resourceprivate
TaskRepository
taskRepository;public
ElasticJobListener(long
startedTimeoutMilliseconds,
long
completedTimeoutMilliseconds)
{super(startedTimeoutMilliseconds,
completedTimeoutMilliseconds);
}@Overridepublic
void
doBeforeJobExecutedAtLastStarted(ShardingContexts
shardingContexts)
{
}@Overridepublic
void
doAfterJobExecutedAtLastCompleted(ShardingContexts
shardingContexts)
{//任务执行完成后更新状态为已执行JobTask
jobTask
=
taskRepository.findOne(Long.valueOf(shardingContexts.getJobParameter()));
jobTask.setStatus(1);
taskRepository.save(jobTask);
}}实现一个ElasticJobHandler,用于向Elastic-job中添加指定的作业配置,作业配置分为3级,分别是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfiguration。LiteJobConfiguration使用JobTypeConfiguration,JobTypeConfiguration使用JobCoreConfiguration,层层嵌套。@Componentpublic
class
ElasticJobHandler
{@Resourceprivate
ZookeeperRegistryCenter
registryCenter;@Resourceprivate
JobEventConfiguration
jobEventConfiguration;@Resourceprivate
ElasticJobListener
elasticJobListener;/**
*
@param
jobName
*
@param
jobClass
*
@param
shardingTotalCount
*
@param
cron
*
@param
id
数据ID
*
@return
*/private
static
LiteJobConfiguration.Builder
simpleJobConfigBuilder(String
jobName,
Class<?
extends
SimpleJob>
jobClass,
int
shardingTotalCount,
String
cron,
String
id)
{return
LiteJobConfiguration.newBuilder(new
SimpleJobConfiguration(
JobCoreConfiguration.newBuilder(jobName,
cron,
shardingTotalCount).jobParameter(id).build(),
jobClass.getCanonicalName()));
}/**
*
添加一个定时任务
*
*
@param
jobName
任务名
*
@param
cron
表达式
*
@param
shardingTotalCount
分片数
*/public
void
addJob(String
jobName,
String
cron,
Integer
shardingTotalCount,
String
id)
{
LiteJobConfiguration
jobConfig
=
simpleJobConfigBuilder(jobName,
MyElasticJob.class,
shardingTotalCount,
cron,
id)
.overwrite(true).build();new
SpringJobScheduler(new
MyElasticJob(),
registryCenter,
jobConfig,
jobEventConfiguration,
elasticJobListener).init();
}}到这里,elastic-job的注册中心,数据源相关配置,以及动态添加的逻辑已经做完了,接下来在service中调用上面写好的方法,验证功能是否正常。编写一个ElasticJobService类,扫描数据库中状态为0的任务,并且把这些任务添加到Elastic-job中,这里的相关数据库操作使用了spring-data-jpa,dao层相关代码就不贴了,可以在源码中查看。@Servicepublic
class
ElasticJobService
{@Resourceprivate
ElasticJobHandler
jobHandler;@Resourceprivate
TaskRepository
taskRepository;/**
*
扫描db,并添加任务
*/public
void
scanAddJob()
{
Specification
query
=
(Specification<JobTask>)
(root,
criteriaQuery,
criteriaBuilder)
->
criteriaBuilder
.and(criteriaBuilder.equal(root.get("status"),
0));
List<JobTask>
jobTasks
=
taskRepository.findAll(query);
jobTasks.forEach(jobTask
->
{
Long
current
=
System.currentTimeMillis();
String
jobName
=
"job"
+
jobTask.getSendTime();
String
cron;//说明消费未发送,但是已经过了消息的发送时间,调整时间继续执行任务if
(jobTask.getSendTime()
<
current)
{//设置为一分钟之后执行,把Date转换为cron表达式cron
=
CronUtils.getCron(new
Date(current
+
60000));
}
else
{
cron
=
CronUtils.getCron(new
Date(jobTask.getSendTime()));
}
jobHandler.addJob(jobName,
cron,
1,
String.valueOf(jobTask.getId()));
});
}}在Junit中添加几条测试数据@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTestpublic
class
JobTaskTest
{@Resourceprivate
TaskRepository
taskRepository;@Testpublic
void
add()
{//生成几个任务,第一任务在三分钟之后Long
unixTime
=
System.currentTimeMillis()
+
60000;
JobTask
task
=
new
JobTask("test-msg-1",
0,
unixTime);
taskRepository.save(task);
unixTime
+=
60000;
task
=
new
JobTask("test-msg-2",
0,
unixTime);
taskRepository.save(task);
unixTime
+=
60000;
task
=
new
JobTask("test-msg-3",
0,
unixTime);
taskRepository.save(task);
unixTime
+=
60000;
task
=
new
JobTask("test-msg-4",
0,
unix
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 食品采购管理制度
- 企业环境的应急预案
- 幼儿园手工制作活动策划方案(3篇)
- 春节安全的应急预案范文(35篇)
- 老师工作计划11篇
- 高中体育述职报告5篇
- 高考地理二轮复习综合题专项训练1特征(点)描述类含答案
- 第二十三章 数据分析 综合检测
- 山西省太原市2024-2025学年七年级上学期期中地理试题(含答案)
- 河南省周口市项城市东街小学等校2024-2025学年四年级上学期11月期中数学试题
- 糖尿病肾病护理PPT课件
- 斗首奥语精解
- 海康威视视频车位诱导与反向寻车系统解决方案
- 双机热备RoseHA8.9+oracle1164位配置方法
- 物业公司小区业主满意度调查表(共5页)
- 小学生日常卫生小常识(课堂PPT)
- 施工工期计算器
- 幼儿园大班《风筝飞上天》教案
- 移动公司活动策划
- 寄宿生防火、防盗、人身防护安全知识
- 建筑工程资料管理标准(吉林省地方标准db22t4982010)
评论
0/150
提交评论