分享

构建企业级批处理应用(二)

 easynet8 2016-07-23
引言


上篇对批处理的概念、技术选型、各种实现、以及spring batch的基础组件做了介绍,本篇将继续介绍spring batch的其他部分。


点击阅读原文,查看《构建企业级批处理应用(一)》


健壮性

主要包含重启、跳过、重试等。


重启(restart)

在某个job执行发生异常时,可能执行完成了某些步骤,期望重启相同参数的该次job,就可以用restart参数来进行设置。

  • 可配置参数allow-start-if-complete来设置开启重启,start-limit设置重启限制次数

  • 重启相同job参数的job launch


配置如下:

<job id='restartJob' xmlns='http://www./schema/batch'>

     <step id='restartStep' next='readWriteStep'>          

        <tasklet allow-start-if-complete='true' start-limit='3'>

            <ref bean='decompressTasklet' xmlns='http://www./schema/beans'/>

        </tasklet>

    </step>

</job>


跳过(skip)

再发生非致命异常时候,比如某些值不符合格式要求,程序检查发生异常时。这样的情况一般选择设置跳过即可。

配置如下:

<tasklet>

         <chunk reader='reader' writer='writer' commit-interval='100' skip-limit='10'>

            <skippable-exception-classes>

               <include

               class='org.springframework.batch.item.file.FlatFileParseException' />

            </skippable-exception-classes>

         </chunk>

</tasklet>


<!-- ?自定义忽略策略 -->

<chunk reader='reader' writer='writer' commit-interval='100' skip-policy='skipPolicy' /> 


重试(retry)

发?瞬态异常,当发?瞬态失败的时候进行重试(例如遇到记录锁的情况),一般在Chunk的Step和应用程序中进行配置或处理。


配置如下:

<batch:tasklet>

          <batch:chunk reader='reader' processor='processor' writer='writer'

             commit-interval='5' retry-limit='3' skip-limit='3' >

             <batch:retryable-exception-classes>

                 <batch:include class='org.springframework.dao.OptimisticLockingFailureException' />

                 <batch:include class='org.springframework.dao.DeadlockLoserDataAccessException' />

             </batch:retryable-exception-classes>

             <batch:retry-listeners>

                 <batch:listener ref='mockRetryListener' />

                 <batch:listener ref='retryListener' />

             </batch:retry-listeners>

          </batch:chunk>

</batch:tasklet>


伸缩性

Spring Batch支持Multi-threaded、Partitioning、Parallel 、Remote四种伸缩方式。


Multi-threaded方式

单线程执行情况

前文已有部分介绍,当批量提交次数为1的情况。批量提交次数为多个, commit-interval='3'见下图。


多线程方式

多个线程并行执行chunk。



配置如下:

<batch:job id='spring-batch-job'>

        <batch:step id='clear-dest' next='step1'>

            <batch:tasklet ref='clearDestData'/>

        </batch:step>

        <batch:step id='step1'>

            <batch:tasklet task-executor='taskExecutor'>

                <batch:chunk reader='itemReader' processor='senderEmailProcessor' writer='mysqlItemWriter'

                             commit-interval='1000'>

                </batch:chunk>

            </batch:tasklet>

        </batch:step>

    </batch:job>


线程池的配置

 <bean id='taskExecutor' class='org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor'>

        <property name='corePoolSize' value='5'/>

        <property name='maxPoolSize' value='10'/>

        <property name='queueCapacity' value='30'/>

    </bean>


需要注意的是,在多线程Step中,需要确保Reader、Processor和Writer是线程安全的,否则容易出现并发问题。Spring Batch提供的大部分组件都是非线程安全的,他们都保存有部分状态信息,主要是为了支持任务重启。


因此,使用多线程Step的核心任务是实现无状态化,例如不保存当前读取的item的cursor,而是同item的flag字段来区分item是否被处理过,已经被处理过的下次重启的时候,直接被过滤掉。多线程Step实现的是单个Step的多线程化。


Partitioning方式

分区主要包含:

  • 数据分区

  • 分区处理


方式基本上包含:

  • Local Partitioning

  • Remote Partitioning


Spring Batch提供了一个同一台机器上的Handler实现,在同一机器上创建多个Step Execution。


Local Partitioning


配置如下:

<!-- partitioner job -->

    <job id='partitionJob' xmlns='http://www./schema/batch'>

        <!-- master step, 10 threads (grid-size)  -->

        <step id='masterStep'>

            <partition step='slave' partitioner='rangePartitioner'>

                <handler grid-size='10' task-executor='taskExecutor' />

            </partition>

        </step>

    </job>


    <!-- Jobs to run -->

    <step id='slave' xmlns='http://www./schema/batch'>

        <tasklet>

            <chunk reader='pagingItemReader' writer='flatFileItemWriter'

                processor='itemProcessor' commit-interval='1' />

        </tasklet>

    </step>


Remote Partitioning

处理远程分块来执行的方式外,在远程分块的同时还可以加上分区来实现远程分块分区的实现。


详细的实现以及配置就不做说明了,主要是用到spring integration来做消息集成。


Remote

使用远程分块的Step被拆分成多个进程进行处理,多个进程间通过中间件实现通信。

下面是一幅模型示意图:

(Remote Chunk)


Master组件是单个进程,从属组件(Slaves)一般是多个远程进程。如果Master进程不是瓶颈的话,那么这种模式的效果几乎是最好的,因此应该在处理数据比读取数据消耗更多时间的情况下使用(实际应用中常常是这种情形)。


Parallel

需要并行的程序逻辑可以划分为不同的职责,并分配给各个独立的step,那么就可以在单个进程中并行执行。并行Step执行很容易配置和使用,如下图所示:


配置如下:

<batch:job id='spring-batch-job'>

        <batch:step id='clear-dest' next='step1'>

            <batch:tasklet ref='clearDestData'/>

        </batch:step>

        <batch:split id='step1' task-executor='taskExecutor'>

            <batch:flow>

                <batch:step id='step2'>

                    <batch:tasklet>

                        <batch:chunk reader='itemReader' processor='senderEmailProcessor' writer='mysqlItemWriter'

                                     commit-interval='1000'>

                        </batch:chunk>

                    </batch:tasklet>

                </batch:step>

            </batch:flow>

            <batch:flow>

                <batch:step id='step3'>

                    <batch:tasklet>

                        <batch:chunk reader='itemReader2' processor='senderEmailProcessor' writer='mysqlItemWriter'

                                     commit-interval='1000'>

                        </batch:chunk>

                    </batch:tasklet>

                </batch:step>

            </batch:flow>

        </batch:split>

    </batch:job>


四种方式比较

前面对四种方式的实现方式和图示已做了简单介绍,下面我们再次看看四种方式的应用上的差异,做一个比较。


写在最后

以上为笔者在使用过程中的心得体会。目前spring batch 3.x 已涵盖了批处理的多方面的内容,包含对jsr-352也有较好的支持、也提供了丰富的接口,易于扩展,且上手相对容易,可是对annotation的配置方式支持不够全,期望后续的版本能够有改进。


参考:

  1. 选择正确的批处理实现

  2. Horizontal and Vertical Scaling Strategies for Batch Applications

  3. Spring Batch Behind the Scenes



本文作者:杨涛(点融黑帮),目前在点融网架构组从事应用架构相关工作。对微服务和docker技术有较多的实战经验。



    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多