分享

spring batch的批处理框架简单介绍 - Spring - Java - Java...

 lylf615 2010-09-21

有关spring batch的介绍我就不多说了,可以去下面的网址看看:

http://www./cn/news/2008/07/spring-batch-zh

 

刚接触到spring batch的时候无从下手,javaeye有关的帖子博文也非常的少,很郁闷只能看它自己提供的文档,说真的,那文档帮助不大,顶多就是让你知道spring batch靠这么几个类玩的。没办法只能自己一步步看代码调试,走了不少弯路呢。

 

这篇文章简单介绍一下spring batch是怎么处理单个文件的。

 

首先看下spring batch关键的几个类:

 

 

JobLauncher负责启动Job,Job中干事的是Step,想让Step干活,就要给它点工具,itemReader,ItemProcessor,itemWriter就是它需要的工具,这些工具可以是你自己提供,以可以用spring batch现成的。就看实际情况了。

 

接下来就具体看看这些类究竟包含了什么东西,干了哪些事情。

先看配置文件:

Xml代码 复制代码
  1. <bean id="jobLauncher"  
  2.         class="org.springframework.batch.core.launch.support.SimpleJobLauncher">  
  3.         <property name="jobRepository" ref="jobRepository" />  
  4. </bean>  
  5.        
  6.        
  7. <bean id="jobRepository"class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"   
  8.         p:dataSource-ref="dataSource" p:transactionManager-ref="transactionManager" />  

 jobLauncher的配置很简单,只是将需要的jobRepository注进来而已。复杂的是job和step的配置

Xml代码 复制代码
  1. <!-- 对账单批处理 begin -->  
  2.     <bean id="checkSheetFileImportJob" parent="simpleJob">  
  3.         <property name="steps">  
  4.             <list>  
  5.                 <bean id="checkSheetTransactionLoadStep" parent="simpleStep">  
  6.                     <property name="commitInterval" value="3"/>  
  7.                     <property name="allowStartIfComplete" value="true"/>  
  8.                     <property name="itemReader" ref="checkSheetTransactionItemReader"/>  
  9.                     <property name="itemWriter" ref="checkSheetTransactionWriter"/>  
  10.                 </bean>  
  11.             </list>  
  12.         </property>  
  13.     </bean>  
  14.   
  15.         <bean id="checkSheetTransactionWriter" class="com.longtop.netbank.checksheet.CheckSheetTransactionJDBCWriter">  
  16.         <property name="dataSource" ref="dataSource"/>  
  17.         <property name="resource" ref="checkSheetFileResource"></property>  
  18.     </bean>  
  19.        
  20.     <bean id="checkSheetTransactionItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">  
  21.         <property name="resource" ref="checkSheetFileResource"/>  
  22.         <property name="linesToSkip" value="0"/>  
  23.         <property name="lineMapper">  
  24.             <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">  
  25.                 <property name="lineTokenizer">  
  26.                     <bean  
  27.                         class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">  
  28.                         <property name="names"  
  29.                             value="accountNo,transactionDate,transactionType,oppositeAccountNo,oppositeAccountNickName,summary,lending,transactionAmount,balance"/>  
  30.                     </bean>  
  31.                 </property>  
  32.                 <property name="fieldSetMapper">  
  33.                     <bean class="com.longtop.netbank.checksheet.CheckSheetTransactionFieldSetMapper"/>  
  34.                 </property>  
  35.             </bean>  
  36.         </property>  
  37.     </bean>  

 

Xml代码 复制代码
  1.        <bean id="simpleJob" class="org.springframework.batch.core.job.SimpleJob" abstract="true">  
  2.     <property name="jobRepository" ref="jobRepository" />  
  3.     <property name="restartable" value="true" />  
  4. </bean>  
  5.   
  6. <bean id="simpleStep" class="org.springframework.batch.core.step.item.SimpleStepFactoryBean"  
  7.     abstract="true">  
  8.     <property name="transactionManager" ref="transactionManager" />  
  9.     <property name="jobRepository" ref="jobRepository" />  
  10.     <property name="commitInterval" value="1" />  
  11. </bean>  

 job是springbatch自带的,我们配置steps属性,以便执行批处理。

在示例中,step的工作就是通过itemReader读取数据文件,然后用itemProcessor进行处理,然后通过itemWriter写入到数据库中,示例中的step也是用的springbatch自带的类SimpleStepFactoryBean。

 

对于SimpleStepFactoryBean需要花功夫好好的看看它究竟做了哪些事情。

 

Simplestepbeanfactory代码 复制代码
  1. public class SimpleStepFactoryBean<T,S> implements FactoryBean, BeanNameAware {   
  2.   
  3. private int commitInterval = 0;   
  4.   
  5. /**   
  6.      * Create a {@link Step} from the configuration provided.   
  7.      *    
  8.      * @see org.springframework.beans.factory.FactoryBean#getObject()   
  9.      */   
  10.     public final Object getObject() throws Exception {   
  11.         TaskletStep step = new TaskletStep(getName());   
  12.         applyConfiguration(step);   
  13.         return step;   
  14.     }   
  15.   
  16.     public Class<Step> getObjectType() {   
  17.         return Step.class;   
  18.     }   
  19. ...   
  20. }  

 

 我们可以知道这个类返回的是TaskletStep对象,并通过applyConfiguration方法设置TaskletStep对象的属性。applyConfiguration比较长,着了就不贴出来了。再讲到后面的时候在回过头来介绍这个方法。

commitInterval 大家通过单词的意思就应该知道该变量是控制itemReader每次读取的量,和itemWriter每次写入的量,简单的说就是:

如果commitInterval=10,数据文件有30个数据,当读到写到24个数据的时候抛出了异常,那么成功写入数据库的数据以后20条,第21--第24条数据放弃。下次如果执行断点回复时,就从第21条数据开始

 

 

Java代码 复制代码
  1. /**  
  2.      * @return a {@link CompletionPolicy} consistent with the commit interval  
  3.      * and injected policy (if present).  
  4.      */  
  5.     private CompletionPolicy getChunkCompletionPolicy() {   
  6.         Assert.state(!(chunkCompletionPolicy != null && commitInterval != 0),   
  7.                 "You must specify either a chunkCompletionPolicy or a commitInterval but not both.");   
  8.         Assert.state(commitInterval >= 0"The commitInterval must be positive or zero (for default value).");   
  9.        
  10.         if (chunkCompletionPolicy != null) {   
  11.             return chunkCompletionPolicy;   
  12.         }   
  13.         if (commitInterval == 0) {   
  14.             logger.info("Setting commit interval to default value (" + DEFAULT_COMMIT_INTERVAL + ")");   
  15.             commitInterval = DEFAULT_COMMIT_INTERVAL;   
  16.         }   
  17.         return new SimpleCompletionPolicy(commitInterval);   
  18.     }  

 这个方法就是设置将commitInterval设置到SimpleCompetionPolicy中以便以后使用。 SimpleCompetionPolicy是用来控制循环什么时候结束,控制的方法就是通过判断执行的次数是否超过了commitInterval。

 

下面就大概的讲下执行步骤(只贴类名和部分代码,类名方便查找源代码):

 

1 jobLauncher调用job:

Java代码 复制代码
  1. public class SimpleJobLauncher implements JobLauncher, InitializingBean {   
  2. /**  
  3.      * Run the provided job with the given {@link JobParameters}. The  
  4.      * {@link JobParameters} will be used to determine if this is an execution  
  5.      * of an existing job instance, or if a new one should be created.  
  6.      *   
  7.      * @param job the job to be run.  
  8.      * @param jobParameters the {@link JobParameters} for this particular  
  9.      * execution.  
  10.      * @return JobExecutionAlreadyRunningException if the JobInstance already  
  11.      * exists and has an execution already running.  
  12.      * @throws JobRestartException if the execution would be a re-start, but a  
  13.      * re-start is either not allowed or not needed.  
  14.      * @throws JobInstanceAlreadyCompleteException if this instance has already  
  15.      * completed successfully  
  16.      */  
  17.     public JobExecution run(final Job job, final JobParameters jobParameters)   
  18.             throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {   
  19.   
  20.         Assert.notNull(job, "The Job must not be null.");   
  21.         Assert.notNull(jobParameters, "The JobParameters must not be null.");   
  22.   
  23.         boolean exists = jobRepository.isJobInstanceExists(job.getName(), jobParameters);   
  24.         if (exists && !job.isRestartable()) {   
  25.             throw new JobRestartException("JobInstance already exists and is not restartable");   
  26.         }   
  27.         /**  
  28.          * There is a very small probability that a non-restartable job can be  
  29.          * restarted, but only if another process or thread manages to launch  
  30.          * <i>and</i> fail a job execution for this instance between the last assertion  
  31.          * and the next method returning successfully.  
  32.          */  
  33.         final JobExecution jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);   
  34.   
  35.         taskExecutor.execute(new Runnable() {   
  36.   
  37.             public void run() {   
  38.                 try {   
  39.                        
  40.                     job.execute(jobExecution);   
  41.                        
  42.                 }   
  43.                 catch (Throwable t) {   
  44.                     logger.info("Job: [" + job + "] failed with the following parameters: [" + jobParameters + "]", t);   
  45.                     rethrow(t);   
  46.                 }   
  47.             }   
  48.   
  49.             private void rethrow(Throwable t) {   
  50.                 if (t instanceof RuntimeException) {   
  51.                     throw (RuntimeException) t;   
  52.                 }   
  53.                 throw new RuntimeException(t);   
  54.             }   
  55.         });   
  56.   
  57.         return jobExecution;   
  58.     }  

 执行job前先判断下job实例是否已经存在或能否重复执行。不满足条件就抛异常。这段代码很简单不用多讲。

 

2 job执行step

 

Java代码 复制代码
  1. public class SimpleJob extends AbstractJob {   
  2.     public void execute(JobExecution execution) {   
  3. ...   
  4.         getCompositeListener().beforeJob(execution);   
  5.        
  6.         StepExecution lastStepExecution = handleSteps(steps, execution);   
  7. ...   
  8.                 try {   
  9.                 getCompositeListener().afterJob(execution);   
  10.             }   
  11. ...   
  12.        }  

 代码很多,贴了一写关键的。在job执行step之前,会先执行JobExecutionListener的beforeJob方法。执行结束之后又会执行JobExecutionListener的afterJob方法。可以根据需要来决定这些方法干什么事情。如果这个job有多个step

那么程序会根据最后一个step的stepExecution来更新jobExecution.

 

3 step的执行

 

Java代码 复制代码
  1. public abstract class AbstractStep implements Step, InitializingBean, BeanNameAware {   
  2.   
  3. /**  
  4.      * Template method for step execution logic - calls abstract methods for  
  5.      * resource initialization ({@link #open(ExecutionContext)}), execution  
  6.      * logic ({@link #doExecute(StepExecution)}) and resource closing ({@link #close(ExecutionContext)}).  
  7.      */  
  8.     public final void execute(StepExecution stepExecution) throws JobInterruptedException,   
  9.             UnexpectedJobExecutionException 『   
  10.         ...   
  11.         }   
  12. }  

 step就是执行这个方法干活的。首先是将数据文件放入流中:

Java代码 复制代码
  1. open(stepExecution.getExecutionContext());  

也就是调用 FlatFileItemReader的open方法,因为在配置文件中itemReader的实现类型就是FlatFileItemReader,在SimpleStepBeanFacotry的applyConfiguration中讲FlatFileItemReader配置到TaskletStep中。(详情请看代码)。

然后调用TaskletStep的doExecute方法

Java代码 复制代码
  1. exitStatus = doExecute(stepExecution);  

 看下该方法的内容(代码不贴了,请看源码)

Java代码 复制代码
  1. return stepOperations.iterate(ReapCallBack callback)  

 这个callback就是从流中读取commitInterval指定个数的数据并写入到数据库中它是调用tasklet.execute方法

Java代码 复制代码
  1. exitStatus = tasklet.execute(contribution, attributes);  

 该tasklet类型是SimpleChunkOrientedTasklet,至于为什么是这个类型,就要看看SimpleStepBeanFactory的applyConfiguration方法了。

SimpleChunkOrientedTasklet的execute简单明了,不多说了。

 

这里有一点饶人的地方就是stepOperations.iterate了,看代码的时候需要明确的一点是stepOperations(RepeatTemplate)是使用CompletionPolicy接口来控制循环的,对于TaskletStep和SimpleChunkOrientedTasklet使用哪个CompletionPolicy来控制循环,那么还是要看下SimpleStepBeanFactory的

applyConfiguration方法。

 

 

上面介绍的就是一个job的执行过程。因为是第一次写。所以很多地方都写有说清楚,如果有疑问,请提出来。

附件是demo,要创建一个springbatch的数据就可以运行。当然这个demo有小陷阱(每次执行的时候会删除数据表重新创建)大家可以根据需要修改一下,当然如果每次都删除数据表的话,就看不到spring batch断点回复的功能了。

因为附件太大不能上传,各位只能自己去下了。

 

 

 

 

 

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多