上篇对批处理的概念、技术选型、各种实现、以及spring batch的基础组件做了介绍,本篇将继续介绍spring batch的其他部分。 点击阅读原文,查看《构建企业级批处理应用(一)》 健壮性 主要包含重启、跳过、重试等。 重启(restart) 在某个job执行发生异常时,可能执行完成了某些步骤,期望重启相同参数的该次job,就可以用restart参数来进行设置。
配置如下: <job id='restartJob' xmlns='http://www./schema/batch'> <step id='restartStep' next='readWriteStep'> <tasklet allow-start-if-complete='true' start-limit='3'> <ref bean='decompressTasklet' xmlns='http://www./schema/beans'/> </tasklet> </step> </job> 跳过(skip) 再发生非致命异常时候,比如某些值不符合格式要求,程序检查发生异常时。这样的情况一般选择设置跳过即可。 配置如下: <tasklet> <chunk reader='reader' writer='writer' commit-interval='100' skip-limit='10'> <skippable-exception-classes> <include class='org.springframework.batch.item.file.FlatFileParseException' /> </skippable-exception-classes> </chunk> </tasklet> <!-- ?自定义忽略策略 --> <chunk reader='reader' writer='writer' commit-interval='100' skip-policy='skipPolicy' /> 重试(retry) 发?瞬态异常,当发?瞬态失败的时候进行重试(例如遇到记录锁的情况),一般在Chunk的Step和应用程序中进行配置或处理。 配置如下: <batch:tasklet> <batch:chunk reader='reader' processor='processor' writer='writer' commit-interval='5' retry-limit='3' skip-limit='3' > <batch:retryable-exception-classes> <batch:include class='org.springframework.dao.OptimisticLockingFailureException' /> <batch:include class='org.springframework.dao.DeadlockLoserDataAccessException' /> </batch:retryable-exception-classes> <batch:retry-listeners> <batch:listener ref='mockRetryListener' /> <batch:listener ref='retryListener' /> </batch:retry-listeners> </batch:chunk> </batch:tasklet> 伸缩性 Spring Batch支持Multi-threaded、Partitioning、Parallel 、Remote四种伸缩方式。 Multi-threaded方式 单线程执行情况 前文已有部分介绍,当批量提交次数为1的情况。批量提交次数为多个, commit-interval='3'见下图。 多线程方式 多个线程并行执行chunk。 配置如下: <batch:job id='spring-batch-job'> <batch:step id='clear-dest' next='step1'> <batch:tasklet ref='clearDestData'/> </batch:step> <batch:step id='step1'> <batch:tasklet task-executor='taskExecutor'> <batch:chunk reader='itemReader' processor='senderEmailProcessor' writer='mysqlItemWriter' commit-interval='1000'> </batch:chunk> </batch:tasklet> </batch:step> </batch:job> 线程池的配置 <bean id='taskExecutor' class='org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor'> <property name='corePoolSize' value='5'/> <property name='maxPoolSize' value='10'/> <property name='queueCapacity' value='30'/> </bean> 需要注意的是,在多线程Step中,需要确保Reader、Processor和Writer是线程安全的,否则容易出现并发问题。Spring Batch提供的大部分组件都是非线程安全的,他们都保存有部分状态信息,主要是为了支持任务重启。 因此,使用多线程Step的核心任务是实现无状态化,例如不保存当前读取的item的cursor,而是同item的flag字段来区分item是否被处理过,已经被处理过的下次重启的时候,直接被过滤掉。多线程Step实现的是单个Step的多线程化。 Partitioning方式 分区主要包含:
方式基本上包含:
Spring Batch提供了一个同一台机器上的Handler实现,在同一机器上创建多个Step Execution。 Local Partitioning 配置如下: <!-- partitioner job --> <job id='partitionJob' xmlns='http://www./schema/batch'> <!-- master step, 10 threads (grid-size) --> <step id='masterStep'> <partition step='slave' partitioner='rangePartitioner'> <handler grid-size='10' task-executor='taskExecutor' /> </partition> </step> </job> <!-- Jobs to run --> <step id='slave' xmlns='http://www./schema/batch'> <tasklet> <chunk reader='pagingItemReader' writer='flatFileItemWriter' processor='itemProcessor' commit-interval='1' /> </tasklet> </step> Remote Partitioning 处理远程分块来执行的方式外,在远程分块的同时还可以加上分区来实现远程分块分区的实现。 详细的实现以及配置就不做说明了,主要是用到spring integration来做消息集成。 Remote 使用远程分块的Step被拆分成多个进程进行处理,多个进程间通过中间件实现通信。 下面是一幅模型示意图: (Remote Chunk) Master组件是单个进程,从属组件(Slaves)一般是多个远程进程。如果Master进程不是瓶颈的话,那么这种模式的效果几乎是最好的,因此应该在处理数据比读取数据消耗更多时间的情况下使用(实际应用中常常是这种情形)。 Parallel 需要并行的程序逻辑可以划分为不同的职责,并分配给各个独立的step,那么就可以在单个进程中并行执行。并行Step执行很容易配置和使用,如下图所示: 配置如下: <batch:job id='spring-batch-job'> <batch:step id='clear-dest' next='step1'> <batch:tasklet ref='clearDestData'/> </batch:step> <batch:split id='step1' task-executor='taskExecutor'> <batch:flow> <batch:step id='step2'> <batch:tasklet> <batch:chunk reader='itemReader' processor='senderEmailProcessor' writer='mysqlItemWriter' commit-interval='1000'> </batch:chunk> </batch:tasklet> </batch:step> </batch:flow> <batch:flow> <batch:step id='step3'> <batch:tasklet> <batch:chunk reader='itemReader2' processor='senderEmailProcessor' writer='mysqlItemWriter' commit-interval='1000'> </batch:chunk> </batch:tasklet> </batch:step> </batch:flow> </batch:split> </batch:job> 四种方式比较 前面对四种方式的实现方式和图示已做了简单介绍,下面我们再次看看四种方式的应用上的差异,做一个比较。 写在最后 以上为笔者在使用过程中的心得体会。目前spring batch 3.x 已涵盖了批处理的多方面的内容,包含对jsr-352也有较好的支持、也提供了丰富的接口,易于扩展,且上手相对容易,可是对annotation的配置方式支持不够全,期望后续的版本能够有改进。 参考:
本文作者:杨涛(点融黑帮),目前在点融网架构组从事应用架构相关工作。对微服务和docker技术有较多的实战经验。 |
|