栏目分类:
子分类:
返回
文库吧用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
文库吧 > IT > 软件开发 > 后端开发 > Java

SpingBatch入门及使用

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpingBatch入门及使用

一、SpringBatch概述

Spring Batch 是一个轻量级的,完善的批处理框架,旨在帮企业建立健壮,高效的批处理应用。Spring Batch是Spring的子项目,使用java语言并基于Sping框架为基础开发,使得已经使用Spring框架的开发者或者企业更容易访问和利用企业服务.

Spring Batch提供了大量可重用的组件,包括了日志,追踪,事物,任务作业统计,任务重启,跳过,重复,资源管理,对于大数量和高效的批处理任务,Spring Batch同样提供了高级功能和特性来支持,比如分区功能,远超功能,总之通过Spring Batch能够支持简单的,复杂的和大数据量的批处理作业。

Spring Batch是一个批处理应用,不是调度框架,但需要和调度框架合作来构建完整的批处理任务,它只关注批处理相关的问题,比如:事物,并发,监控,执行等,并不提供相关的调度功能,需要配合调度框架进行使用。(Quartz,Tivili,Cron......)

框架主要有以下功能

  • Transcation Management(事物管理)
  • Chunk based processing(基于块的处理)
  • Declarative I/O(声明式的输入输出)
  • Start/Stop/Restart(启动/停止/再启动)
  • Retry/Skip(重试/跳过)

  

框架一共有4个主要角色:

  1. JobLauncher是任务启动器,通过它来启动任务,可以看做是程序的入口。
  2. Job代表一个具体的任务。
  3. Step代表一个具体的步骤,一个Job可以包含多个Step(比如:把一个大象放进冰箱需要多少步?)。存在2种方式(chunk/tasklet)
  4. JobRepository是存储数据的地方,可以看做是一个数据库的接口,在执行任务的时候需要通过它来记录任务的状态等信息(持久化到数据库当中)。
 二、搭建项目

    org.springframework.batch
    spring-batch-core
    4.1.3.RELEASE

 数据库



    
    
    
    
    
    
    
    
    
    
    
    
    
    
    




    

配合的调度框架



 三、Spring Batch入门程序
@Configuration
@EnableBatchProcessing
public class SpringBatchJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;


    @Bean
    @Qualifier("batchJob")//这个可以不写
    public Job batchJob() {
        return jobBuilderFactory.get(System.currentTimeMillis() + "")
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("########################## I am SpringBatch! ##########################");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
四、核心API 

 

  1. JobInstance:Job的任务的每一次执行都会对应一个instance,属于作业执行过程当中的概念。
  2. JobExecution:但是每次的job执行不会保证百分百成功,那么就会对应一个Execution。只有当执行成功完成时,给定的与执行相对应的JobInstance才也被视为完成。
  3. JobParameters:如果一个job每天运行一次,那么每天都会有一个JobInstance。但是每天运行的job的定义都是一样的,如何来区分同一个job的不同JobInstance。spring batch中提供的用来标识一个jobinstance的东西是:JobParameters。 JobParameters对象包含一组用于启动批处理作业的参数,它可以在运行期间用于识别或甚至用作参考数据。
  4. StepExecution:表示一次执行Step, 每次运行一个Step时都会创建一个新的StepExecution,类似于JobExecution。 但是,某个步骤可能由于其之前的步骤失败而无法执行。 且仅当Step实际启动时才会创建StepExecution。一次step执行的实例由StepExecution类的对象表示。 每个StepExecution都包含对其相应步骤的引用以及JobExecution和事务相关的数据,例如提交和回滚计数以及开始和结束时间。 此外,每个步骤执行都包含一个ExecutionContext,其中包含开发人员需要在批处理运行中保留的任何数据,例如重新启动所需的统计信息或状态信息。
  5. ExecutionContext:每一个StepExecution 的执行环境也就是一个容器的概念。它包含一系列的键值对。
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
 五、Flow
  1. Flow是多个Step的集合
  2. 可以被多个Job复用
  3. 使用FlowBuilder来创建
@Configuration
@EnableBatchProcessing
public class FlowDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job flowDemoJob() {
        return jobBuilderFactory.get("flowDemoJob")
                .start(flowDemo())
                .next(flowDemoStep3())
                .end()
                .build();
    }

    //指明Flow对象包含哪些Step
    @Bean
    public Flow flowDemo(){
        return new FlowBuilder("flowDemo")
                .start(flowDemoStep1())
                .next(flowDemoStep2())
                .build();
    }

    @Bean
    public Step flowDemoStep1(){
        return stepBuilderFactory.get("flowDemoStep1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("flowDemoStep1");
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }

    @Bean
    public Step flowDemoStep2(){
        return stepBuilderFactory.get("flowDemoStep2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("flowDemoStep2");
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }

    @Bean
    public Step flowDemoStep3(){
        return stepBuilderFactory.get("flowDemoStep3")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("flowDemoStep3");
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }


}
 六、split实现并发执行

实现任务中的多个step和多个flow并发执行

  1. 创建若干个strp
  2. 创建2个flow
  3. 创建一个任务包含以上2个flow,并让这两个flow并发执行
@Bean
public Job splitDemoJob() {
    return jobBuilderFactory.get("splitDemoJob")
            .start(splitDemoFlow1())
            .split(new SimpleAsyncTaskExecutor())
            .add(splitDemoFlow2())
            .end()
            .build();
}
七、决策器

不使用决策器的写法

@Bean
public Job demoJob() {
    return jobBuilderFactory.get("demoJob")
            .start(step1())
            .next(step2())
            .next(step3())
            .on("COMPLETED").to(step2())
            .from(step2())
            .on("COMPLETED").to(step3())
            .from(step3())
            .end()
            .build();
}

 使用决策器的写法

接口:JobExecutionDecider

public class MyDecider implements JobExecutionDecider {

    private int count;

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        count++;
        if(count % 2 == 0){
            return new FlowExecutionStatus("even");
        }else {
            return new FlowExecutionStatus("old");
        }
    }
}
//创建决策器
@Bean
public JobExecutionDecider myDecider(){
    return new MyDecider();
}

//通过决策的返回结果决定执行step或者flow
@Bean
public Job demoDeciderJob() {
    return jobBuilderFactory.get("demoDeciderJob")
            .start(step1())
            .next(myDecider())
            .from(myDecider()).on("even").to(step2())
            .from(myDecider()).on("old").to(step3())
            .from(step3()).on("*").to(myDecider())
            .end()
            .build();
}
八、Job的嵌套

一个job可以嵌套在另一个job当中,被嵌套的job称之为子job,子job不能单独执行,需要由父job来启动。

@Autowired
private Job childJobOne;

@Autowired
private Job childJobTwo;

@Autowired
private JobLauncher launcher;


//嵌套job需要启动父job
@Bean
public Job parentJob(JobRepostory repostory, PlatformTransactionManager transactionManager) {
    return jobBuilderFactory.get("parentJob")
            .start(childJob1(repostory, transactionManager))
            .next(childJob2(repostory, transactionManager))
            .build();
}
//返回Job类型的step,特殊的step
private Step childJob1(JobRepostory repostory, PlatformTransactionManager transactionManager){
    return new JobStepBuilder(new StepBuilder("childJob1"))
            .job(childJobOne)
            //使用启动父job的启动对象
            .launcher(launcher)
            //指定持久化对象
            .repostory(repostory)
            //事物
            .transactionManager(transactionManager)
            .build();
}

private Step childJob2(JobRepostory repostory, PlatformTransactionManager transactionManager){
    return new JobStepBuilder(new StepBuilder("childJob2"))
            .job(childJobTwo)
            //使用启动父job的启动对象
            .launcher(launcher)
            //指定持久化对象
            .repostory(repostory)
            //事物
            .transactionManager(transactionManager)
            .build();
}
九、监听器

用来监听批处理作业的执行情况

创建监听器可以通过接口或者使用注解来实现

  1. JobExecutionListener(before,after)
  2. StepExecutionListener(before,after)
  3. ChunkListener(before,after,error)
  4. ItemReadListener,ItemProcessListener,ItemWriteListener(before,after,error)

接口方式

public class MyJobListener implements JobExecutionListener {
    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println(jobExecution.getJobInstance().getJobName() + "before...");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println(jobExecution.getJobInstance().getJobName() + "after...");
    }
}

注解方式

public class MyChunkListener {

    @BeforeChunk
    public void before(ChunkContext chunkContext){
        System.out.println(chunkContext.getStepContext().getStepName() + "before...");
    }

    @AfterChunk
    public void after(ChunkContext chunkContext){
        System.out.println(chunkContext.getStepContext().getStepName() + "after...");
    }
}

使用

//在执行step1之前和之后分别执行监听器的内容
@Bean
public Job demoListenerJob() {
    return jobBuilderFactory.get("demoListenerJob")
            .start(step1())
            .listener(new MyJobListener())
            .build();
}

@Bean
public Job demoListenerJob() {
    return jobBuilderFactory.get("demoListenerJob")
            .start(step1())
            .listener(new MyChunkListener())
            .build();
}

@Bean
public Step1(){
    return stepBuilderFactory.get("Step1")
            //读完2的数据才执行内容
            .chunk(2)//reade,process,write
            .faultToLerant()
            .listener(new MyChunkListener())
            .reader(read())
            .write(write())
            .build();
}

@Bean
private ItemReder read(){
    return new ListItemRead<>(Arrays.asList("sss","ddd","ssaa"));
}

@Bean
private ItemWrite write(){
    return new ItemWriter(List  dataList) throws Exception{
        public void write(){
            //todo
            
        }
    };
}
十、Job参数

在运行的job当中可以用key=value的形式进行参数传递

job在运行时肯定执行的是step,job使用的数据肯定在step中使用(只需要给step传递参数)

使用step级别的监听来传递数据

Map parameters = jobExecution.getJobParameters().getParameters();
十一、ItemReader

自定义类来实现ItemReader接口

public class MyReader implements ItemReader {


    private Iterator iterator;

    public void MyReader(List dataList){
        this.iterator = dataList.iterator();
    }
    
    
    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        if(iterator.hasNext()){
            return iterator.next();
        }else {
            return null;
        }
    }
}
十二、ItemReader的异常处理

实现ItemStreamReader接口

public interface ItemStream {
    void open(ExecutionContext var1) throws ItemStreamException;

    void update(ExecutionContext var1) throws ItemStreamException;

    void close() throws ItemStreamException;
}
  1. open是在执行step之前执行
  2. update是chunk处理完一批数据执行(ExecutionContext 将上下文持久化)
  3. close整个step执行完之后执行

在任务重启后,之前已经处理过的数据不会在进行处理。(可以通过人为判断!)

十三、ItemWrite

不同于ItemReader的一条条读,ItemWrite是一批一批写数据

十四、ItemProcess

ItemProcess用于处理业务逻辑,验证,过滤等等功能

这2个泛型代表读的数据类型和输出的数据类型

CompositeItemProcessor(用于多种处理方式)

@Bean
public CompositeItemProcessor process(){
    CompositeItemProcessor cip = new CompositeItemProcessor();
    List< CompositeItemProcessor> processList = new ArrayList<>();
    processList.add('处理方式1');
    processList.add('处理方式2');
    cip.setDelegates(processList);
    return cip;
}
十五、错误处理

默认情况下,当任务出现异常时,Spring Batch会结束任务,当使用相同的参数重启任务后,Spring Batch会去执行未执行的剩余任务。

十六、错误重试(retry)
@Bean
public Step1(){
    return stepBuilderFactory.get("Step1")
            //读完2的数据才执行内容
            .chunk(2)//reade,process,write
            .reader(read())
            .processor(pro())
            .write(write())
            .faultToLerant()//进行容错(事先假设会出现异常)
            .retry(RetryException.class)//需要指明发生什么异常需要进行容错(RetryException事先自定义的异常)
            .retryLimit(3)//尝试的次数,超过该次数,任务会停止
            .build();
}
 十七、错误跳过(skip)
@Bean
public Step1(){
    return stepBuilderFactory.get("Step1")
            //读完2的数据才执行内容
            .chunk(2)//reade,process,write
            .reader(read())
            .processor(pro())
            .write(write())
            .faultToLerant()//进行容错(事先假设会出现异常)
            .skip(SkipException.class)//需要指明发生什么异常需要进行容错(SkipException事先自定义的异常)
            .skipLimit(3)//跳过的次数,SkipException这种异常最多跳过的次数
            .build();
}
十八、错误跳过监听器(Skip Listener)

需要实现SkipListener接口

public interface SkipListener extends StepListener {
    void onSkipInRead(Throwable var1);

    void onSkipInWrite(S var1, Throwable var2);

    void onSkipInProcess(T var1, Throwable var2);
}
  1. 读取数据的泛型和输出数据的泛型
  2. onSkipInRead:在读取过程中进行跳过需要进行的处理
  3. onSkipInWrite:在写过程中进行跳过需要进行的处理
  4. onSkipInProcess:在执行processor过程中进行跳过需要进行的处理
@Autowired
private MySkipLisener mySkipLisener;

@Bean
public Step1(){
    return stepBuilderFactory.get("Step1")
            //读完2的数据才执行内容
            .chunk(2)//reade,process,write
            .reader(read())
            .processor(pro())
            .write(write())
            .faultToLerant()//进行容错(事先假设会出现异常)
            .skip(SkipException.class)//需要指明发生什么异常需要进行容错(SkipException事先自定义的异常)
            .skipLimit(3)//跳过的次数,SkipException这种异常最多跳过的次数
            .listener(mySkipLisener)
            .build();
}
十九、配合调度进行使用

SpringBatch需要配合调度框架进行使用。

这里采用spring自带的定时器(定时器的配置在搭建项目中提到过)

@Component
public class QuartzTask {


    final static Logger logger = LoggerFactory.getLogger(QuartzTask.class);

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job batchJob;

    
    @Scheduled(cron = "0/5 * * * * ?")
    public void run() {
        try {
            //由于注入bean后,每次调用都是同一个springbatch job实例,且springbatch默认任务是不重复执行的,所以需要每次传递不同参数来每次都执行Springbatch
            //如果有中文参数,需要将springbatch相关表,字符编码改为utf-8,且表中字段字符编码也改为utf-8,不然记录springbatch参数到数据库会报错,导致任务一直无法执行
            JobParameters params = new JobParametersBuilder()
                    .addDate("timestamp", new Date())
                    .toJobParameters();
            jobLauncher.run(batchJob, params);
            logger.info("job run ... ");
        } catch (Exception e) {
            logger.error(e.getLocalizedMessage());
        }
    }
}
转载请注明:文章转载自 www.wk8.com.cn
本文地址:https://www.wk8.com.cn/it/1036506.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 wk8.com.cn

ICP备案号:晋ICP备2021003244-6号