




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
SpringBoot使用Elastic-Job-lite实现动态创建定时任务和任务持久化Elastic-Job是当当开源的一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务;Elastic-Job-Cloud采用自研MesosFramework的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。这里以Elastic-Job-lite为例,跟SpringBoot进行整合,当当的官方文档中并没有对SpringBoot集成作说明,所有的配置都是基于文档中的xml的配置修改出来的。起步准备好一个SpringBoot的项目,pom.xml中引入Elastic-job,mysql,jpa等依赖<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.5</version></dependency><dependency><groupId>jectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>com.zaxxer</groupId><artifactId>HikariCP</artifactId></dependency></dependencies>http://www.f-1.cc配置使用yaml进行相关属性的配置,主要配置的是数据库连接池,jpaelasticjob:
serverlists:
8:2181
namespace:
boot-job
spring:
datasource:
url:
jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
driver-class-name:
com.mysql.jdbc.Driver
username:
root
password:
root
type:
com.zaxxer.hikari.HikariDataSource
#
自动创建更新验证数据库结构
jpa:
hibernate:
ddl-auto:
update
show-sql:
true
database:
mysqlelastic-job相关的配置使用java配置实现,代替官方文档的xml配置@Configuration@Data@ConfigurationProperties(prefix
=
"elasticjob")public
class
ElasticJobConfig
{private
String
serverlists;private
String
namespace;@Resourceprivate
HikariDataSource
dataSource;@Beanpublic
ZookeeperConfiguration
zkConfig()
{return
new
ZookeeperConfiguration(serverlists,
namespace);
}@Bean(initMethod
=
"init")public
ZookeeperRegistryCenter
regCenter(ZookeeperConfiguration
config)
{return
new
ZookeeperRegistryCenter(config);
}/**
*
将作业运行的痕迹进行持久化到DB
*
*
@return
*/@Beanpublic
JobEventConfiguration
jobEventConfiguration()
{return
new
JobEventRdbConfiguration(dataSource);
}@Beanpublic
ElasticJobListener
elasticJobListener()
{return
new
ElasticJobListener(100,
100);
}}所有相关的配置到这里就已经OK了,接下来开始具体的编码实现定时任务实现先实现一个自己的任务类,需要实现elastic-job提供的SimpleJob接口,实现它的execute(ShardingContextshardingContext)方法@Slf4jpublic
class
MyElasticJob
implements
SimpleJob
{@Overridepublic
void
execute(ShardingContext
shardingContext)
{//打印出任务相关信息,JobParameter用于传递任务的ID("任务名:{},
片数:{},
id={}",
shardingContext.getJobName(),
shardingContext.getShardingTotalCount(),
shardingContext.getJobParameter());
}}接下来实现一个分布式的任务监听器,如果任务有分片,分布式监听器会在总的任务开始前执行一次,结束时执行一次。监听器在之前的ElasticJobConfig已经注册到了Spring容器之中。public
class
ElasticJobListener
extends
AbstractDistributeOnceElasticJobListener
{@Resourceprivate
TaskRepository
taskRepository;public
ElasticJobListener(long
startedTimeoutMilliseconds,
long
completedTimeoutMilliseconds)
{super(startedTimeoutMilliseconds,
completedTimeoutMilliseconds);
}@Overridepublic
void
doBeforeJobExecutedAtLastStarted(ShardingContexts
shardingContexts)
{
}@Overridepublic
void
doAfterJobExecutedAtLastCompleted(ShardingContexts
shardingContexts)
{//任务执行完成后更新状态为已执行JobTask
jobTask
=
taskRepository.findOne(Long.valueOf(shardingContexts.getJobParameter()));
jobTask.setStatus(1);
taskRepository.save(jobTask);
}}实现一个ElasticJobHandler,用于向Elastic-job中添加指定的作业配置,作业配置分为3级,分别是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfiguration。LiteJobConfiguration使用JobTypeConfiguration,JobTypeConfiguration使用JobCoreConfiguration,层层嵌套。@Componentpublic
class
ElasticJobHandler
{@Resourceprivate
ZookeeperRegistryCenter
registryCenter;@Resourceprivate
JobEventConfiguration
jobEventConfiguration;@Resourceprivate
ElasticJobListener
elasticJobListener;/**
*
@param
jobName
*
@param
jobClass
*
@param
shardingTotalCount
*
@param
cron
*
@param
id
数据ID
*
@return
*/private
static
LiteJobConfiguration.Builder
simpleJobConfigBuilder(String
jobName,
Class<?
extends
SimpleJob>
jobClass,
int
shardingTotalCount,
String
cron,
String
id)
{return
LiteJobConfiguration.newBuilder(new
SimpleJobConfiguration(
JobCoreConfiguration.newBuilder(jobName,
cron,
shardingTotalCount).jobParameter(id).build(),
jobClass.getCanonicalName()));
}/**
*
添加一个定时任务
*
*
@param
jobName
任务名
*
@param
cron
表达式
*
@param
shardingTotalCount
分片数
*/public
void
addJob(String
jobName,
String
cron,
Integer
shardingTotalCount,
String
id)
{
LiteJobConfiguration
jobConfig
=
simpleJobConfigBuilder(jobName,
MyElasticJob.class,
shardingTotalCount,
cron,
id)
.overwrite(true).build();new
SpringJobScheduler(new
MyElasticJob(),
registryCenter,
jobConfig,
jobEventConfiguration,
elasticJobListener).init();
}}到这里,elastic-job的注册中心,数据源相关配置,以及动态添加的逻辑已经做完了,接下来在service中调用上面写好的方法,验证功能是否正常。编写一个ElasticJobService类,扫描数据库中状态为0的任务,并且把这些任务添加到Elastic-job中,这里的相关数据库操作使用了spring-data-jpa,dao层相关代码就不贴了,可以在源码中查看。@Servicepublic
class
ElasticJobService
{@Resourceprivate
ElasticJobHandler
jobHandler;@Resourceprivate
TaskRepository
taskRepository;/**
*
扫描db,并添加任务
*/public
void
scanAddJob()
{
Specification
query
=
(Specification<JobTask>)
(root,
criteriaQuery,
criteriaBuilder)
->
criteriaBuilder
.and(criteriaBuilder.equal(root.get("status"),
0));
List<JobTask>
jobTasks
=
taskRepository.findAll(query);
jobTasks.forEach(jobTask
->
{
Long
current
=
System.currentTimeMillis();
String
jobName
=
"job"
+
jobTask.getSendTime();
String
cron;//说明消费未发送,但是已经过了消息的发送时间,调整时间继续执行任务if
(jobTask.getSendTime()
<
current)
{//设置为一分钟之后执行,把Date转换为cron表达式cron
=
CronUtils.getCron(new
Date(current
+
60000));
}
else
{
cron
=
CronUtils.getCron(new
Date(jobTask.getSendTime()));
}
jobHandler.addJob(jobName,
cron,
1,
String.valueOf(jobTask.getId()));
});
}}在Junit中添加几条测试数据@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTestpublic
class
JobTaskTest
{@Resourceprivate
TaskRepository
taskRepository;@Testpublic
void
add()
{//生成几个任务,第一任务在三分钟之后Long
unixTime
=
System.currentTimeMillis()
+
60000;
JobTask
task
=
new
JobTask("test-msg-1",
0,
unixTime);
taskRepository.save(task);
unixTime
+=
60000;
task
=
new
JobTask("test-msg-2",
0,
unixTime);
taskRepository.save(task);
unixTime
+=
60000;
task
=
new
JobTask("test-msg-3",
0,
unixTime);
taskRepository.save(task);
unixTime
+=
60000;
task
=
new
JobTask("test-msg-4",
0,
unix
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 加强车站旅客安全保障计划
- 学期建设班级公约计划
- 部编版二年级下册语文教师培训计划
- 班主任的学业辅导计划
- 2025公司及项目部安全培训考试试题(典型题)
- 酒店委托经营管理简单合同
- 25年新版车间安全培训考试试题及答案培优A卷
- 2024-2025工厂员工安全培训考试试题附答案(能力提升)
- 2025年三级安全培训考试试题(完整版)
- 小学学校劳动教育课程设计计划
- 《法律职业伦理》课件-第二讲 法官职业伦理
- 《专业咖啡制作技术》课件
- 印刷行业售后服务质量保障措施
- 2025年扎赉诺尔煤业有限责任公司招聘笔试参考题库含答案解析
- 《急性阑尾炎幻灯》课件
- 员工黄赌毒法制培训
- 舞蹈工作室前台接待聘用合同
- 酒店物业租赁合同样本3篇
- 《编制说明-变电站监控系统防止电气误操作技术规范》
- 《论教育》主要篇目课件
- 河南省劳动关系协调员职业技能大赛技术工作文件
评论
0/150
提交评论