概览Kettle也叫PDI(全称是PentahoDataIntegeration),是一款开源的ETL工具,项目开始于2003年,200 6年加入了开源的BI组织Pentaho,正式命名为PDI。官方网站:http://kettle.pentaho.org/h ttp://kettle.pentaho.org/术语Transformation转换步骤,可以理解为将一个或者多个不同的数据源组 装成一条数据流水线。然后最终输出到某一个地方,文件或者数据库等。Job作业,可以调度设计好的转换,也可以执行一些文件处理(比较,删 除等),还可以ftp上传,下载文件,发送邮件,执行shell命令等,Hop连接转换步骤或者连接Job(实际上就是执行顺序)的连线 Transformationhop:主要表示数据的流向。从输入,过滤等转换操作,到输出。Jobhop:可设置执行条件:无条 件执行当上一个Job执行结果为true时执行当上一个Job执行结果为false时执行Kettle,etl设计及运行Kettle整体 结构图Kettle整体结构图转换设计样例图绿色线条为hop,流水线转换设计样例运行方式使用javawebstart方 式运行的配置方法命令行方式Windows下执行kitchen.bat,多个参数之间以“/”分隔,Key和value以”:”分隔例 如:kitchen.bat/file:F:\samples\demo-table2table.ktr/level:Basic /log:test123.log/file:指定转换文件的路径/level:执行日志执行级别/log:执行日志文件路径Lin ux下执行kitchen.sh,多个参数之间以“-”分隔,Key和value以”=”分隔kitchen.sh-file=/hom e/updateWarehouse.kjb-level=Minimal如果设计的转换,Job是保存在数据库中,则命令如下:Kit chen.bat/rep:资源库名称/user:admin/pass:admin/job:job名Xml保存转换,job流 程设计用户定义的作业可以保存在(xml格式)中或某一个特定的数据库中转换的设计文件以.ktr结尾(xml文格式),保存所有配置好 的数据库连接,文件相对路径,字段映射关系等信息。Job的设计文件以.kjb结尾,下面是一个调用已经设计好的转换的job文件的一部分 :demotestTransformationription>TRANS${Internal.Job.Filename.Dir ectory}/demo-table2table.ktrloadcusto merdatajob${Internal.Job.Filename.Direc tory}Nper_row>NN NN ………数据库保存转换,job流程列出几个重要的表r_job:保存job的id,name,status,执 行时间,创建时间,修改时间等信息建表语句:DROPTABLEIFEXISTS`r_job`;CREATETABLE`r _job`(`ID_JOB`bigint(20)NOTNULL,`ID_DIRECTORY`int(11)DEFA ULTNULL,`NAME`varchar(255)DEFAULTNULL,`DESCRIPTION`mediumt ext,`EXTENDED_DESCRIPTION`mediumtext,`JOB_VERSION`varchar(255 )DEFAULTNULL,`JOB_STATUS`int(11)DEFAULTNULL,`ID_DATABASE_L OG`int(11)DEFAULTNULL,`TABLE_NAME_LOG`varchar(255)DEFAULTN ULL,`CREATED_USER`varchar(255)DEFAULTNULL,`CREATED_DATE`dat etimeDEFAULTNULL,`MODIFIED_USER`varchar(255)DEFAULTNULL,`M ODIFIED_DATE`datetimeDEFAULTNULL,`USE_BATCH_ID`char(1)DEFAU LTNULL,`PASS_BATCH_ID`char(1)DEFAULTNULL,`USE_LOGFIELD`cha r(1)DEFAULTNULL,`SHARED_FILE`varchar(255)DEFAULTNULL,PRIMA RYKEY(`ID_JOB`))r_jobentry:设计界面上的一个实体对应一个entry,通过job的id关联DROPT ABLEIFEXISTS`r_jobentry`;CREATETABLE`r_jobentry`(`ID_JOBEN TRY`bigint(20)NOTNULL,`ID_JOB`int(11)DEFAULTNULL,`ID_JOBE NTRY_TYPE`int(11)DEFAULTNULL,`NAME`varchar(255)DEFAULTNULL ,`DESCRIPTION`mediumtext,PRIMARYKEY(`ID_JOBENTRY`))r_jobentr y_attribute:job的详细信息表,包括job执行规则,执行过程中的参数来源,日志记录等DROPTABLEIFE XISTS`r_jobentry_attribute`;CREATETABLE`r_jobentry_attribute` (`ID_JOBENTRY_ATTRIBUTE`bigint(20)NOTNULL,`ID_JOB`int(1 1)DEFAULTNULL,`ID_JOBENTRY`int(11)DEFAULTNULL,`NR`int (11)DEFAULTNULL,`CODE`varchar(255)DEFAULTNULL,`VALUE_NU M`doubleDEFAULTNULL,`VALUE_STR`mediumtext,PRIMARYKEY(` ID_JOBENTRY_ATTRIBUTE`))r_step:保存转换的步骤id,名字等r_step建表语句:DROPTABLE IFEXISTS`r_step`;CREATETABLE`r_step`(`ID_STEP`bigint(20) NOTNULL,`ID_TRANSFORMATION`int(11)DEFAULTNULL,`NAME`varcha r(255)DEFAULTNULL,`DESCRIPTION`mediumtext,`ID_STEP_TYPE`int (11)DEFAULTNULL,`DISTRIBUTE`char(1)DEFAULTNULL,`COPIES`in t(11)DEFAULTNULL,`GUI_LOCATION_X`int(11)DEFAULTNULL,`GUI_L OCATION_Y`int(11)DEFAULTNULL,`GUI_DRAW`char(1)DEFAULTNULL, PRIMARYKEY(`ID_STEP`))r_step_attribute:转换步骤的详细信息,字段重命名,字段映射等。通 过外键id_transformation与r_step的id_transformation关联DROPTABLEIFEXIS TS`r_step_attribute`;CREATETABLE`r_step_attribute`(`ID_STEP_ ATTRIBUTE`bigint(20)NOTNULL,`ID_TRANSFORMATION`int(11)DEFAU LTNULL,`ID_STEP`int(11)DEFAULTNULL,`NR`int(11)DEFAULTNUL L,`CODE`varchar(255)DEFAULTNULL,--操作编码譬如:truncate,也可以是字段映射等信 息`VALUE_NUM`bigint(20)DEFAULTNULL,--操作值`VALUE_STR`mediumte xt,--操作值,譬如truncate对应的是Y或者NPRIMARYKEY(`ID_STEP_ATTRIBUTE`),U NIQUEKEY`IDX_STEP_ATTRIBUTE_LOOKUP`(`ID_STEP`,`CODE`,`NR`))说明: 如果有一个字段firtstname映射到name则在r_step_attribute中增加两条记录。Kettle组成部分Chef: 是一个图形用户界面,使用SWT开发,用来设计一个作业,转换,SQL,FTP,邮件,检查表存在,检查文件存在,执行SHELL脚本Ki tchen:作业执行引擎,用来进行转换,校验,FTP上传。可以执行xml格式定义的任务以及保存在数据库上的。kitchen.bat /file:D:\Jobs\updateWarehouse.kjb/level:Basickitchen.sh-file=/ PRD/updateWarehouse.kjb-level=Minimalkitchen.bat/rep:"Productio nRepository"/job:"Updatedimensions"/dir:/Dimensions/user:mat t/pass:somepassword123/level:Basickitchen.bat/file:F:\java\pd i-ce-3.2.0-stable\data-integration\samples\transformations\files\ demo-table2table.ktr/level:Basic/log:test123.logSpoon:Spoon是Ke ttle的另一个图形用户界面,用来设计数据转换过程Pan:Pan是一个数据转换引擎,负责从不同的数据源读写和转换数据。pan.s h-file="/PRD/CustomerDimension.ktr"-level=MinimalTransformatio n步骤输入类型:Csvfileinput读取csv文件,设置csv文件路径,可以设置csv文件的相对路径或者绝对路径,字段分 隔符,文件读取的缓存大小等ExcelInput读取excel文件,和csv文件读取类似,增加了表单,表头,出错(是否忽略错误, 严格的类型判断等)的处理PropertyInput读取属性.properties文件Tableinput从数据库读数据, 动态绑定参数的SQL语句,参数替换可以从上一个步骤从获取。例如SELECTFROMcustomerWHEREbirth date<’${current_date}’这里的${current_date}在执行过程中会作为动态参数被替换掉。这个值是前一个 转换步骤设置的。注:但是测试过程中发现如果上一个步骤设置的变量,在tableinput里面获取不到,变量设置必须作为一个单独的转 换先执行一次,然后才能获取到这个变量。Textfileinput主要是txt文件内容等,和csv差不多。GenerateR ows生成一些固定字段的记录,主要用来模拟一些数据进行测试。GetFileNames读取给定目录或者文件全路径的文件名 GetSystemInfo包括命令行输入的参数,操作系统时间,ip地址,一些特殊属性,kettle版本等De-serial izefromfile从二进制kettlecube文件中读取记录Accessinput读取access数据库ESRI ShapefileReaderFixedfileinput读取固定大小文件GeneraterandomvalueGet FilesRowsCount获取文件内容的行数GetSubFoldernamesGetdatafromXML从x ml文件解析出数据LDAPInput从ldap库读取数据。LDIFInput读取ldap的ldif文件MondrianInp utMDX语言从Mondrian服务器上读取数据RSSInputS3CSVInputSalesForceInputXBas einput读取XBase系列文件,如Foxpro文件,主要是数据库语言输出类型Tableoutput将数据写入到数据库, 可以指定是否truncate表,编辑前一步转换字段与现在表结构的字段映射关系。以及每次commit的记录数大小等。Textfil eoutput将数据写入到文本文件,通常是csv文件Insert/Update根据关键字找对应的记录,如果找不到则执行i nsert,否则执行updateUpdate跟insert/update类似,只是没有insert操作Delete跟updat e类似,只是执行的是delete操作ExcelOutput输出到excel,格式可以采用excel模板Serializeto file将记录写到二进制文件中(cube文件)AccessOutputPropertiesOutput输出到proper ties文件RSSOutputSQLFileOutput将输出的sqlinsert语句保存到文件Synchronizea ftermergeXMLOutput输出到xml文件Transform类型Selectvalues用于选择列,重命名列, 指定列长度或精度Filterrows通过使用一个表达式从输入行中过滤数据,将结果是TURE或FALSE的行输出到不同的节点。 表达式是“”“OPERATOR”“”的形式,其中OPERATOR可以是=,<>,<,>,<=,>=,REGEXP,I SNULL,ISNOTNULL,INLIST,CONTAINS,STARTSWITH,ENDSWITH。用户 可以增加多个表达式,并用AND或OR连接。Sortrows对指定的列以升序或降序排序,当排序的行数超过5000时需要临时表。 Addsequence为数据流增加一个序列,可以使用ORACLE中某一序列的值或由用户指定值Dummy不做任何处理,一般 作为流程的终点。JoinRows(catesianproduct)对所有输入流做笛卡儿乘积。AggregateRo ws聚集行数据,提供SUM,AVERAGE,COUNT,MIN,MAX,FIRST,LAST聚集函数,该类型不提倡使用,将来会被G ROUPBY类型替代。JavaScriptvalue使用mozilla的rhino作为脚本语言,并提供了很多函数,用户 可以在脚本中使用这些函数。例如varprev_row;if(prev_row==null)prev_row=ro w;...StringpreviousName=prev_row.getString(“Name”,“-”);... prev_row=row;可以获得字段Name的前一条记录的值。RowNormaliser该步骤可以从透视表(PIV OTTABLE)中还原数据到事实表,如从表一转换成表二,需要使用该步骤。Uniquerows去掉输入流中的重复行,在使用该节点 前要先排序,否则只能删除连续的重复行。Groupby分组Calculator提供了一组函数对列值进行运算,使用该方式比用户自 定义JAVASCRIPT脚本速度更快。Addconstants增加常量值。Rowdenormaliser同正规化过程相反 。Rowflattener表扁平化处理除了上述基本节点类型外还定义了扩展节点类型,如SPLITFIELDS,EXECUTE SQLSCRIPT,CUBEINPUT,CUBEOUTPUT等。图一中创建了一个简单的数据流程示例,共包括5个节点,其中Ta bleinput节点使用了SQLSERVER数据库中的一张表(三条记录),Filterrows中定义了过滤条件,将符合条件的 发送到file2节点,不符和条件的记录发送到Selectvalues节点。Selectvalues节点中选择列,并对选择的列 进行了设置,将结果发送到file1节点。file1,file2节点分别是两个文本文件节点,最终用来保存数据。该流程运行后,可以在 LogView面板中查看运行结果,如图四所示从tableinput结点输入3条记录,经过滤后输出到file2节点2条记录(O UTPUT列中的3是指2条记录加1行列名),输出到file1节点1条记录(OUTPUT列中的2是指1条记录加1行列名)。Flow Abort忽略上一步的输入流,通常用在错误处理中,譬如不处理X条记录后的所有记录Appendstreams主要用来处理步 骤之间有优先级的问题。从两个步骤从读取数据流,指定步骤的读取顺序。BlockingStep阻塞所有的输出直到最后一条记录到达 Detectemptystream当输入流为空的时候,输出一条空的记录Dummy(donothing)空操作。是一个空 操作的插件Filterrows通过设定过滤条件来过滤记录IdentifylastrowinastreamSwit ch/Case类似Java的switch语法,通过比较某一确定的字段值来将数据转发到不同的转化步骤JoinsMergejo in合并两种不同输入流,连接方式有内连,左外连接等。需要注意的是记录需要先按关键字进行排序MergeRows(diff)用于比 较两组输入数据,一般用于更新后的数据重新导入到数据仓库中。两组数据中一组是引用流,一组是比较流,每次比较后只有最新版本的行数据被 输出到下一步。比较结果包括:idectical一致:两组流的主键一致,值一致changed有变化:两组流的主键一致,值有一个或多个 不同new新行:引用流中有而比较流中没有某一主键deleted被删除的行:比较流中有而引用流中没有某一主键比较流里面的数据除了 被标记为deleted都会进入下一个步骤里面SortedMerge对记录按某个关键字进行排序XMLJoin将一个XML文作 为节点添加到另一个XML里面合并前的XML文需要合并的XML合并后的XML文ScriptingExecuteSQLscri pt执行SQL脚本,应该避免使用这一步骤,尽可能的使用“tableinput(select)”,”tableoutput(i nsert)”,”update”,”delete”等步骤来替代。譬如动态创建表(表名是可变的,table1,table2,tabl e3):SQL脚本是:CREATETABLE?(IDINTEGER);ExecuterowSQLscript对Exe cutesqlscript的补充,增加了可以自定义sql语句的字段名Formula在数据流中执行公式ModifiedJava ScriptValueModifiedJavaScriptValue应该说是转换步骤里最强大的一步,可以获取前一步的输入 流的所有字段,调用Javaapi对数据做转换等操作,改变所有输出的值。还能通过设置转换状态常量对现有转换流程做改变,(忽略转换, 设置为错误,继续转换)。脚本是Mozilla的Rhino,Rhino是一个Java实现的Javascript解释器。现在已 经加入到JDK1.6的javax.script包中了。对数据流进行修改等操作提供了常量,函数,输入字段,输出字段的列表显 示Transformationscripts已经创建的脚本Transformationconstants已经预先定义好的静 态常量,不可更改,例如SKIP_TRANSFORMATION,ERROR_TRANSFORMATION,CONTINUE_T RANSFORMATIONTransformationfunctions类型转换,操作函数:字符串,数字,日期之间的转换, 字符串截取等逻辑判断函数:isDate(var),isNumber(var)等特殊函数:LoadScriptFile(var ),getProcessCount(var),print(var),writeToLog(var),getVariable(var ,var)文件操作函数:createFolder(var),deleteFile(var),getLastModifiedTime (var,var),moveFile(var,var,var)Inputfields获取输入流中字段的值field.getN umber()Outputfieldsset输出流中字段的值field.setValue(99)例子:过滤Null字段v ara;if(fieldname.isNull()){a=''0'';}else{a=fieldName.getS tring();}字符串截取将字符串“12345McDonalds”前面的数字部分分割出来varstr=field.getS tring();varcode="";varname="";for(i=0;ii++){c=str.charAt(i);if(!java.lang.Character.isDigit(c) ){code=str.substring(0,i);name=str.substring(i);Alert("co de="+code+",name="+name);break;}}过滤记录行,控制转换流程trans_Status=CO NTINUE_TRANSFORMATIONif(field.getString()==’123’)trans_Status= SKIP_TRANSFORMATION使用java类库varmydate="20090723";varparser= java.text.SimpleDateFormat("yyyyMMdd");//Mustusefullyqualifie djavaclassvardateObj=parser.parse(mydate);//justlikehowy ouwoulddoinjavaAlert(dateObj);RegexEvaluation通过正则表达式验证输入字段U serDefinedJavaExpression执行一些简单的java代码譬如表达式:firstname+""+name也 可以用Java代码:newStringBuffer(firstname).append("").append(name).to String()LookupCallDBProcedure执行存储过程并获得返回值,返回值只有一个,参数可以多个。Check ifacolumnexists检查数据库表是否存在某列Databasejoin改步骤允许查询等操作利用上一步的数据,譬如 参数动态绑定的查询语句,可以被上一步某个字段的值替换掉Databaselookup和databasejoin功能类似,从数据库 查询数值,作为新的字段添加到数据流中。可将前面的输出流的值作为查询比较参数DynamicSQLrow动态SQL查询记录行数。 Fileexists检查文件是否存在,文件名由上一步传来HTTPPost处理POST请求,url可以从上一步数据里获取,也 可以在该步骤指定,可以指定编码,请求参数等。HTTPclient仅仅是一个带参数的URL请求,url可以从上一步数据里获取,也 可以在该步骤指定,不支持soap例如http://?param1=value1¶m2=value2Strea mlookup从其他转换步骤产生的数据流中查询数据Tableexists判断数据库中某张表是否存在,表名由前面步骤传来Web serviceslookup处理SOAP请求,数据类型转换是在步骤内部处理,如果有日期,数字等类型需要转换,建议全部返回字符串, 然后使用“Selectvalues”步骤做转换JOB类型常用Start指定job执行规则,是否循环,循环规则等。Dummy空操作 ,主要用来多数据源汇总Abortjob终止,忽略一个JobDisplayMsgboxInfo使用图形化界面执行Job的时候显 示消息框DummypluginJob里面空操作,可以用来将执行循环操作Job执行已经定义好的Job。job可以嵌套job。Pin gahostpingSendSNMPtrap发送SNMPtrap报文SNMPTRAP就是在SNMP设备发生状态变化的 时候向管理器发出信号。不用管理器来检查。SetvariablesSet变量Success当出错后可以强制将该job置成功Tra nsformation执行定义好的转换TruncatetablesWaitforSQL当检测表的记录数是否达到一定条件Wr iteToLogjob里面的日志记录,不同于程序自带的log4j等日志邮件GetmailsfromPOP从POP服务器获 取邮件并存储到文件夹中Mail发送文本或者HTML格式邮件,可添加附件。Mailvalidator通过发送SNMPTRAP到 邮件服务器来验证EMAIL地址是否正确文件管理Addfilenamestoresult将文件夹或者多个文件加入到数据流中,以 便在下一个job步骤中使用Comparefolders比较两个文件夹下面的文件是否一致,可以选择只比较文件大小,也可以选择比较文 件内容CopyFilesCopyorMoveresultfilenames根据上一步的执行结果得到的文件名,复制或者剪切 文件CreateafolderCreatefileDeletefileDeletefilenamesfromresu lt根据上一步的执行结果得到的文件名,复制或者剪切文件DeletefilesDeletefoldersFileCompare 比较两个文件的内容HTTP通过http协议从web服务器上获得文件MoveFiles移动文件UnzipfileUnzip解压 缩文件Waitforfile循环检测文件是否存在,否则直到超时失败Zipfile采用zip压缩文件文件传输FTPDelet e删除FTP上的文件GetafilewithFTP从FTP获取文件,可以设置编码方式,连接超时时间等。文件保存路径,文件名 中是否包含日期,时间,时间日期是否需要特殊格式化,是否覆盖文件。GetafilewithSFTP与ftp类似,只是采用Se cureFTPprotocolPutafilewithFTP上传文件,本地文件路径,通过等模糊匹配要上传的文件。并执 行上传的远程目录。PutafilewithSFTPSSH2GetSSH2PutScriptingJavaScript 编辑javascript脚本并执行。SQL执行SQL脚本可以执行写好的sql脚本,指定sql路径即可。也可以插入编辑sql脚本。 Shell执行SHELL脚本,可以执行已经写好的shell脚本指定shell脚本路径即可,也可以自己插入,编辑shell脚本可 以把前一个步骤的执行结果当作参数传入与现有应用程序集成通过图形化界面设计出来的transformation?,job都是xml格式 的文件。通过加载环境变量,初始化配置文件可以在servlet或者其他应用程序中中执行转换。大致执行方式如下:publicstat icvoidrunTransformation(Stringfilename){try{StepLoader.ini t();EnvUtil.environmentInit();TransMetatransMeta=newTransMe ta(filename);Transtrans=newTrans(transMeta);trans.execute(n ull);//Youcanpassargumentsinsteadofnull.trans.waitUntilF inished();if(trans.getErrors()>0){thrownewRuntimeExcept ion("Therewereerrorsduringtransformationexecution.");}} catch(KettleExceptione){System.out.println(e);}}通过插件扩展现有功能 至少实现4个接口StepMetaInterface接口定义元数据,执行检查等StepInterface接口集成自BaseSte p,步骤StepDataInterface接口处理游标,数据库结果集,文件等StepDialogInterface接口编辑, 设置变量的图形化界面类名命名规则:四个类的前缀保持一致,譬如实现StepMetaInterface的类是MyMeta那么实现Ste pInterface的类是My,实现StepDataInterface的是MyData,实现StepDialogInterface 的是MyDialog定义plugin.xml文件uginid="DummyPlugin"iconfile="DPL.png"description="DummyPlugi n"tooltip="Thisisadummypluginteststep"category="Transform "classname="be.ibridge.kettle.dummy.DummyPluginMeta">>TransformTransformTransformationDummypluginPluginvoorbeeldExempledeplugicielThisisadummypluginteststepDitiseenvoorbeeldpluginookwel''dummyplugin''gehetenExempledeplugiciel小例子文件批量入库:获取一个目录下的文件名,使用一个正则表达式来指定文件名。使用一个javascript脚本,读取文件内容,如果文件是二进制文件,文件内容一般保存为BLOB、Binary、Image等类型如果文件是字符型文件,文件内容一般保存为CLOB、varchar、Text等类型注意:因为该方法是一次性将文件内容都读到了内存中,因此只能处理比较小的文件file=newPackages.java.io.File(filename.getString()); |
|