SpringBoot使用Elastic-Job-lite实现动态创建定时任务和任务持久化_第1页
SpringBoot使用Elastic-Job-lite实现动态创建定时任务和任务持久化_第2页
SpringBoot使用Elastic-Job-lite实现动态创建定时任务和任务持久化_第3页
SpringBoot使用Elastic-Job-lite实现动态创建定时任务和任务持久化_第4页
SpringBoot使用Elastic-Job-lite实现动态创建定时任务和任务持久化_第5页
已阅读5页,还剩3页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

SpringBoot使用Elastic-Job-lite实现动态创建定时任务和任务持久化Elastic-Job是当当开源的一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务;Elastic-Job-Cloud采用自研MesosFramework的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。这里以Elastic-Job-lite为例,跟SpringBoot进行整合,当当的官方文档中并没有对SpringBoot集成作说明,所有的配置都是基于文档中的xml的配置修改出来的。起步准备好一个SpringBoot的项目,pom.xml中引入Elastic-job,mysql,jpa等依赖<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.5</version></dependency><dependency><groupId>jectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>com.zaxxer</groupId><artifactId>HikariCP</artifactId></dependency></dependencies>http://www.f-1.cc配置使用yaml进行相关属性的配置,主要配置的是数据库连接池,jpaelasticjob:

serverlists:

8:2181

namespace:

boot-job

spring:

datasource:

url:

jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false

driver-class-name:

com.mysql.jdbc.Driver

username:

root

password:

root

type:

com.zaxxer.hikari.HikariDataSource

#

自动创建更新验证数据库结构

jpa:

hibernate:

ddl-auto:

update

show-sql:

true

database:

mysqlelastic-job相关的配置使用java配置实现,代替官方文档的xml配置@Configuration@Data@ConfigurationProperties(prefix

=

"elasticjob")public

class

ElasticJobConfig

{private

String

serverlists;private

String

namespace;@Resourceprivate

HikariDataSource

dataSource;@Beanpublic

ZookeeperConfiguration

zkConfig()

{return

new

ZookeeperConfiguration(serverlists,

namespace);

}@Bean(initMethod

=

"init")public

ZookeeperRegistryCenter

regCenter(ZookeeperConfiguration

config)

{return

new

ZookeeperRegistryCenter(config);

}/**

*

将作业运行的痕迹进行持久化到DB

*

*

@return

*/@Beanpublic

JobEventConfiguration

jobEventConfiguration()

{return

new

JobEventRdbConfiguration(dataSource);

}@Beanpublic

ElasticJobListener

elasticJobListener()

{return

new

ElasticJobListener(100,

100);

}}所有相关的配置到这里就已经OK了,接下来开始具体的编码实现定时任务实现先实现一个自己的任务类,需要实现elastic-job提供的SimpleJob接口,实现它的execute(ShardingContextshardingContext)方法@Slf4jpublic

class

MyElasticJob

implements

SimpleJob

{@Overridepublic

void

execute(ShardingContext

shardingContext)

{//打印出任务相关信息,JobParameter用于传递任务的ID("任务名:{},

片数:{},

id={}",

shardingContext.getJobName(),

shardingContext.getShardingTotalCount(),

shardingContext.getJobParameter());

}}接下来实现一个分布式的任务监听器,如果任务有分片,分布式监听器会在总的任务开始前执行一次,结束时执行一次。监听器在之前的ElasticJobConfig已经注册到了Spring容器之中。public

class

ElasticJobListener

extends

AbstractDistributeOnceElasticJobListener

{@Resourceprivate

TaskRepository

taskRepository;public

ElasticJobListener(long

startedTimeoutMilliseconds,

long

completedTimeoutMilliseconds)

{super(startedTimeoutMilliseconds,

completedTimeoutMilliseconds);

}@Overridepublic

void

doBeforeJobExecutedAtLastStarted(ShardingContexts

shardingContexts)

{

}@Overridepublic

void

doAfterJobExecutedAtLastCompleted(ShardingContexts

shardingContexts)

{//任务执行完成后更新状态为已执行JobTask

jobTask

=

taskRepository.findOne(Long.valueOf(shardingContexts.getJobParameter()));

jobTask.setStatus(1);

taskRepository.save(jobTask);

}}实现一个ElasticJobHandler,用于向Elastic-job中添加指定的作业配置,作业配置分为3级,分别是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfiguration。LiteJobConfiguration使用JobTypeConfiguration,JobTypeConfiguration使用JobCoreConfiguration,层层嵌套。@Componentpublic

class

ElasticJobHandler

{@Resourceprivate

ZookeeperRegistryCenter

registryCenter;@Resourceprivate

JobEventConfiguration

jobEventConfiguration;@Resourceprivate

ElasticJobListener

elasticJobListener;/**

*

@param

jobName

*

@param

jobClass

*

@param

shardingTotalCount

*

@param

cron

*

@param

id

数据ID

*

@return

*/private

static

LiteJobConfiguration.Builder

simpleJobConfigBuilder(String

jobName,

Class<?

extends

SimpleJob>

jobClass,

int

shardingTotalCount,

String

cron,

String

id)

{return

LiteJobConfiguration.newBuilder(new

SimpleJobConfiguration(

JobCoreConfiguration.newBuilder(jobName,

cron,

shardingTotalCount).jobParameter(id).build(),

jobClass.getCanonicalName()));

}/**

*

添加一个定时任务

*

*

@param

jobName

任务名

*

@param

cron

表达式

*

@param

shardingTotalCount

分片数

*/public

void

addJob(String

jobName,

String

cron,

Integer

shardingTotalCount,

String

id)

{

LiteJobConfiguration

jobConfig

=

simpleJobConfigBuilder(jobName,

MyElasticJob.class,

shardingTotalCount,

cron,

id)

.overwrite(true).build();new

SpringJobScheduler(new

MyElasticJob(),

registryCenter,

jobConfig,

jobEventConfiguration,

elasticJobListener).init();

}}到这里,elastic-job的注册中心,数据源相关配置,以及动态添加的逻辑已经做完了,接下来在service中调用上面写好的方法,验证功能是否正常。编写一个ElasticJobService类,扫描数据库中状态为0的任务,并且把这些任务添加到Elastic-job中,这里的相关数据库操作使用了spring-data-jpa,dao层相关代码就不贴了,可以在源码中查看。@Servicepublic

class

ElasticJobService

{@Resourceprivate

ElasticJobHandler

jobHandler;@Resourceprivate

TaskRepository

taskRepository;/**

*

扫描db,并添加任务

*/public

void

scanAddJob()

{

Specification

query

=

(Specification<JobTask>)

(root,

criteriaQuery,

criteriaBuilder)

->

criteriaBuilder

.and(criteriaBuilder.equal(root.get("status"),

0));

List<JobTask>

jobTasks

=

taskRepository.findAll(query);

jobTasks.forEach(jobTask

->

{

Long

current

=

System.currentTimeMillis();

String

jobName

=

"job"

+

jobTask.getSendTime();

String

cron;//说明消费未发送,但是已经过了消息的发送时间,调整时间继续执行任务if

(jobTask.getSendTime()

<

current)

{//设置为一分钟之后执行,把Date转换为cron表达式cron

=

CronUtils.getCron(new

Date(current

+

60000));

}

else

{

cron

=

CronUtils.getCron(new

Date(jobTask.getSendTime()));

}

jobHandler.addJob(jobName,

cron,

1,

String.valueOf(jobTask.getId()));

});

}}在Junit中添加几条测试数据@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTestpublic

class

JobTaskTest

{@Resourceprivate

TaskRepository

taskRepository;@Testpublic

void

add()

{//生成几个任务,第一任务在三分钟之后Long

unixTime

=

System.currentTimeMillis()

+

60000;

JobTask

task

=

new

JobTask("test-msg-1",

0,

unixTime);

taskRepository.save(task);

unixTime

+=

60000;

task

=

new

JobTask("test-msg-2",

0,

unixTime);

taskRepository.save(task);

unixTime

+=

60000;

task

=

new

JobTask("test-msg-3",

0,

unixTime);

taskRepository.save(task);

unixTime

+=

60000;

task

=

new

JobTask("test-msg-4",

0,

unix

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论