Spring Batch监听器 Spring Batch提供了多种监听器Listener,用于在任务处理过程中触发我们的逻辑代码。常用的监听器根据粒度从粗到细分别有:Job级别的监听器JobExecutionListener、Step级别的监听器StepExecutionListener、Chunk监听器ChunkListener、ItemReader监听器ItemReadListener、ItemWriter监听器ItemWriteListener和ItemProcessor监听器ItemProcessListener等。具体可以参考下表:
监听器
具体说明
JobExecutionListener
在Job开始之前(beforeJob)和之后(aflerJob)触发
StepExecutionListener
在Step开始之前(beforeStep)和之后(afterStep)触发
ChunkListener
在 Chunk 开始之前(beforeChunk),之后(afterChunk)和错误后(afterChunkError)触发
ItemReadListener
在 Read 开始之前(beforeRead>,之后(afterRead)和错误后(onReadError)触发
ItemProcessListener
在 Processor 开始之前(beforeProcess),之后(afterProcess)和错误后(onProcessError)触发
ItemWriterListener
在 Writer 开始之前(beforeWrite),之后(afterWrite)和错误后(onWriteError)触发
框架搭建 新建一个Spring Boot项目,版本为2.2.4.RELEASE,artifactId为spring-batch-listener,项目结构如下图所示: 剩下的数据库层的准备,项目配置,依赖引入和Spring Batch入门 文章中的框架搭建步骤一致,这里就不再赘述。
监听器演示 每种监听器都可以通过两种方式使用:
接口实现;
注解驱动。
先来看看通过实现接口的方式使用监听器。在cc.mrbird.batch包下新建listener包,然后在该包下新建MyJobExecutionListener,实现JobExecutionListener接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component public class MyJobExecutionListener implements JobExecutionListener { @Override public void beforeJob(JobExecution jobExecution) { System.out.println("before job execute: " + jobExecution.getJobInstance().getJobName()); } @Override public void afterJob(JobExecution jobExecution) { System.out.println("after job execute: " + jobExecution.getJobInstance().getJobName()); } }
上面实现的两个方法很直观了,触发时机分别为任务执行前和任务执行后。
接着看看如何使用注解驱动使用监听器。在listener包下新建MyStepExecutionListener:
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component public class MyStepExecutionListener { @BeforeStep public void breforeStep(StepExecution stepExecution) { System.out.println("before step execute: " + stepExecution.getStepName()); } @AfterStep public void afterStep(StepExecution stepExecution) { System.out.println("after step execute: " + stepExecution.getStepName()); } }
通过注解的方式不需要实现接口,而是在对应的方法上通过诸如@BeforeStep、@AfterStep等注解标注即可,不过方法的签名必须符合注解的要求,否则会反射失败。比如,查看@BeforeStep的源码:
监听器的创建大致就这两种姿势了,下面的例子不在详细说明,直接贴代码。
在listener包下继续创建MyChunkListener、MyItemReaderListener、MyItemProcessListener和MyItemWriterListener。
MyChunkListener:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Component public class MyChunkListener implements ChunkListener { @Override public void beforeChunk(ChunkContext context) { System.out.println("before chunk: " + context.getStepContext().getStepName()); } @Override public void afterChunk(ChunkContext context) { System.out.println("after chunk: " + context.getStepContext().getStepName()); } @Override public void afterChunkError(ChunkContext context) { System.out.println("before chunk error: " + context.getStepContext().getStepName()); } }
MyItemReaderListener:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Component public class MyItemReaderListener implements ItemReadListener<String> { @Override public void beforeRead() { System.out.println("before read"); } @Override public void afterRead(String item) { System.out.println("after read: " + item); } @Override public void onReadError(Exception ex) { System.out.println("on read error: " + ex.getMessage()); } }
MyItemProcessListener:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Component public class MyItemProcessListener implements ItemProcessListener<String, String> { @Override public void beforeProcess(String item) { System.out.println("before process: " + item); } @Override public void afterProcess(String item, String result) { System.out.println("after process: " + item + " result: " + result); } @Override public void onProcessError(String item, Exception e) { System.out.println("on process error: " + item + " , error message: " + e.getMessage()); } }
MyItemWriterListener:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Component public class MyItemWriterListener implements ItemWriteListener<String> { @Override public void beforeWrite(List<? extends String> items) { System.out.println("before write: " + items); } @Override public void afterWrite(List<? extends String> items) { System.out.println("after write: " + items); } @Override public void onWriteError(Exception exception, List<? extends String> items) { System.out.println("on write error: " + items + " , error message: " + exception.getMessage()); } }
准备好这些监听器后,我们在cc.mrbird.batch包下新建job包,然后在该包下新建ListenerTestJobDemo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 @Component public class ListenerTestJobDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired private MyJobExecutionListener myJobExecutionListener; @Autowired private MyStepExecutionListener myStepExecutionListener; @Autowired private MyChunkListener myChunkListener; @Autowired private MyItemReaderListener myItemReaderListener; @Autowired private MyItemProcessListener myItemProcessListener; @Autowired private MyItemWriterListener myItemWriterListener; @Bean public Job listenerTestJob() { return jobBuilderFactory.get("listenerTestJob") .start(step()) .listener(myJobExecutionListener) .build(); } private Step step() { return stepBuilderFactory.get("step") .listener(myStepExecutionListener) .<String, String>chunk(2) .faultTolerant() .listener(myChunkListener) .reader(reader()) .listener(myItemReaderListener) .processor(processor()) .listener(myItemProcessListener) .writer(list -> list.forEach(System.out::println)) .listener(myItemWriterListener) .build(); } private ItemReader<String> reader() { List<String> data = Arrays.asList("java", "c++", "javascript", "python"); return new simpleReader(data); } private ItemProcessor<String, String> processor() { return item -> item + " language"; } } class simpleReader implements ItemReader<String> { private Iterator<String> iterator; public simpleReader(List<String> data) { this.iterator = data.iterator(); } @Override public String read() { return iterator.hasNext() ? iterator.next() : null; } }
上面代码我们在相应的位置配置了监听器(配置chunk监听器的时候,必须配置faultTolerant())。
启动项目,控制台日志打印如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 2020-03-09 17:08:34.439 INFO 20165 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=listenerTestJob]] launched with the following parameters: [{}] before job execute: listenerTestJob3 2020-03-09 17:08:34.495 INFO 20165 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step] before step execute: step before chunk: step before read after read: java before read after read: c++ before process: java after process: java result: java language before process: c++ after process: c++ result: c++ language before write: [java language, c++ language] java language c++ language after write: [java language, c++ language] after chunk: step before chunk: step before read after read: javascript before read after read: python before process: javascript after process: javascript result: javascript language before process: python after process: python result: python language before write: [javascript language, python language] javascript language python language after write: [javascript language, python language] after chunk: step before chunk: step before read after chunk: step after step execute: step 2020-03-09 17:08:34.546 INFO 20165 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step] executed in 51ms after job execute: listenerTestJob3 2020-03-09 17:08:34.566 INFO 20165 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=listenerTestJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 105ms
从上面的运行结果我们可以看出:
证实了chunk(2)表示每一批处理2个数据块;
Step里的执行顺序是read -> process -> writer。
聚合监听器 每种监听器可以通过对应的聚合类组合在一起,比如有多个JobExecutionListener,则可以使用CompositeJobExecutionListener聚合它们。上面介绍的这几种监听器都有与之对应的CompositeXXXListener聚合类,这里只演示CompositeJobExecutionListener,剩下的以此类推。
在job包下新建CompositeJobExecutionListenerJobDemo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 @Component public class CompositeJobExecutionListenerJobDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Job compositeJobExecutionListenerJob() { return jobBuilderFactory.get("compositeJobExecutionListenerJob") .start(step()) .listener(compositeJobExecutionListener()) .build(); } private Step step() { return stepBuilderFactory.get("step") .tasklet((contribution, chunkContext) -> { System.out.println("执行步骤...."); return RepeatStatus.FINISHED; }).build(); } private CompositeJobExecutionListener compositeJobExecutionListener() { CompositeJobExecutionListener listener = new CompositeJobExecutionListener(); // 任务监听器1 JobExecutionListener jobExecutionListenerOne = new JobExecutionListener() { @Override public void beforeJob(JobExecution jobExecution) { System.out.println("任务监听器One,before job execute: " + jobExecution.getJobInstance().getJobName()); } @Override public void afterJob(JobExecution jobExecution) { System.out.println("任务监听器One,after job execute: " + jobExecution.getJobInstance().getJobName()); } }; // 任务监听器2 JobExecutionListener jobExecutionListenerTwo = new JobExecutionListener() { @Override public void beforeJob(JobExecution jobExecution) { System.out.println("任务监听器Two,before job execute: " + jobExecution.getJobInstance().getJobName()); } @Override public void afterJob(JobExecution jobExecution) { System.out.println("任务监听器Two,after job execute: " + jobExecution.getJobInstance().getJobName()); } }; // 聚合 listener.setListeners(Arrays.asList(jobExecutionListenerOne, jobExecutionListenerTwo)); return listener; } }
启动项目,控制台日志打印如下:
1 2 3 4 5 6 7 8 9 2020-03-09 17:26:47.533 INFO 20310 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=compositeJobExecutionListenerJob]] launched with the following parameters: [{}] 任务监听器One,before job execute: compositeJobExecutionListenerJob 任务监听器Two,before job execute: compositeJobExecutionListenerJob 2020-03-09 17:26:47.603 INFO 20310 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step] 执行步骤.... 2020-03-09 17:26:47.660 INFO 20310 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step] executed in 57ms 任务监听器Two,after job execute: compositeJobExecutionListenerJob 任务监听器One,after job execute: compositeJobExecutionListenerJob 2020-03-09 17:26:47.693 INFO 20310 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=compositeJobExecutionListenerJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 129ms
除了本文介绍的这几个监听器外,还有一些和异常处理相关的监听器,会在后续的文章中提到。
原文来源: https://github.com/wuyouzhuguli/SpringAll . 本节源码链接:https://github.com/wuyouzhuguli/SpringAll/tree/master/71.spring-batch-listener 。