分享

kettle插件——实现资源库中JOB和其关联的脚本的下载

 richsky 2012-08-23

1、 设计需求:

 现项目基于Kettle采集各个节点前置机数据到数据中心并且表结构是一致的。每台前置机上安装部署kettle运行JOB,定时采集数据到数据中心。开发的脚本除了前置数据库连接不一致外,其他的都是一样的。若是为每个节点开发JOB和其关联的Trans,开发和维护都有很大的工作量。所以设计一个下载JOB和其关联的脚本组件实现,下载采集的JOB,并能够实现脚本之间关联。

2、 实现过程:

 在Kettle Spoon 设计器中已经实现了该功能。如下图:

Export all Linked resources to XML该菜单会将在spoon中当前选择的Tab的脚本,以zip压缩文件的形式导出到指定的文件夹里。

所以参考Export all Linked resources to XML菜单实现过程,来设计自己的脚本下载。

(1)首先在kettle源码中查找该功能的实现类和方法。

   在org.pentaho.di.ui.spoon.Spoon.java类中找到exportAllXMLFile()方法。该方法实现了这个菜单功能。

 

  1. /** 
  2.      * Export this job or transformation including all depending resources to a single zip file. 
  3.      */  
  4.     public void exportAllXMLFile() {  
  5.   
  6.         <span style="color:#ff6666;">ResourceExportInterface</span> resourceExportInterface = getActiveTransformation();  
  7.         if (resourceExportInterface==null) resourceExportInterface=getActiveJob();  
  8.         if (resourceExportInterface==nullreturn// nothing to do here, prevent an NPE  
  9.           
  10.         // ((VariableSpace)resourceExportInterface).getVariable("Internal.Transformation.Filename.Directory");  
  11.           
  12.         // Ask the user for a zip file to export to:  
  13.         //  
  14.         try {  
  15.             String zipFilename = null;  
  16.             while (Const.isEmpty(zipFilename)) {  
  17.                 FileDialog dialog = new FileDialog(shell, SWT.SAVE);  
  18.                 dialog.setText(Messages.getString("Spoon.ExportResourceSelectZipFile"));  
  19.                 dialog.setFilterExtensions(new String[] {"*.zip;*.ZIP""*"});  
  20.                 dialog.setFilterNames(new String[] { Messages.getString("System.FileType.ZIPFiles"), Messages.getString("System.FileType.AllFiles"), });  
  21.                 setFilterPath(dialog);  
  22.                 if (dialog.open()!=null)  
  23.                 {  
  24.                     lastDirOpened = dialog.getFilterPath();  
  25.                     zipFilename = dialog.getFilterPath()+Const.FILE_SEPARATOR+dialog.getFileName();  
  26.                     FileObject zipFileObject = KettleVFS.getFileObject(zipFilename);  
  27.                     if (zipFileObject.exists()) {  
  28.                         MessageBox box = new MessageBox(shell, SWT.YES | SWT.NO | SWT.CANCEL);  
  29.                         box.setMessage(Messages.getString("Spoon.ExportResourceZipFileExists.Message", zipFilename));  
  30.                         box.setText(Messages.getString("Spoon.ExportResourceZipFileExists.Title"));  
  31.                         int answer = box.open();  
  32.                         if (answer==SWT.CANCEL) return;  
  33.                         if (answer==SWT.NO) zipFilename = null;  
  34.                     }  
  35.                 } else {  
  36.                     return;  
  37.                 }  
  38.             }  
  39.               
  40.             // Export the resources linked to the currently loaded file...  
  41.             //  
  42.             <span style="color:#000000;">TopLevelResource</span> topLevelResource = <span style="color:#ff0000;">ResourceUtil</span>.serializeResourceExportInterface(zipFilename, resourceExportInterface, (VariableSpace)resourceExportInterface, rep);  
  43.             String message = ResourceUtil.getExplanation(zipFilename, topLevelResource.getResourceName(), resourceExportInterface);  
  44.                               
  45.             EnterTextDialog enterTextDialog = new EnterTextDialog(shell, "Resource serialized""This resource was serialized succesfully!", message);  
  46.             enterTextDialog.setReadOnly();  
  47.             enterTextDialog.open();  
  48.         } catch(Exception e) {  
  49.             new ErrorDialog(shell, "Error""Error exporting current file", e);  
  50.         }  
  51.     }  

通过这个方法,可以发现其调用了ResourceExportInterface、ResourceUtil这两个重要的类。

ResourceExportInterface——这是一个资源导出的接口,JobMeta、TransMeta都实现了该接口,所以在查看JobMeta、TransMeta实现的的方法。

ResourceUtil——将脚本导出到zip压缩文件的实现类,其实现方法如下:

  1. /** 
  2.      * Serializes the referenced resource export interface (Job, Transformation, Mapping, Step, Job Entry, etc) to a ZIP file. 
  3.      *  
  4.      * @param zipFilename The ZIP file to put the content in 
  5.      * @param resourceExportInterface the interface to serialize 
  6.      * @param space the space to use for variable replacement 
  7.      * @param repository the repository to load objects from (or null if not used) 
  8.      * @param injectXML The XML to inject into the resulting ZIP archive (optional, can be null) 
  9.      * @param injectFilename The name of the file for the XML to inject in the ZIP archive (optional, can be null) 
  10.      * @return The full VFS filename reference to the serialized export interface XML file in the ZIP archive. 
  11.      * @throws KettleException in case anything goes wrong during serialization 
  12.      */  
  13.     public static final TopLevelResource serializeResourceExportInterface(String zipFilename, ResourceExportInterface resourceExportInterface, VariableSpace space, Repository repository, String injectXML, String injectFilename) throws KettleException {  
  14.         ZipOutputStream out = null;  
  15.           
  16.         try {  
  17.             Map<String, ResourceDefinition> definitions = new HashMap<String, ResourceDefinition>();  
  18.               
  19.             // In case we want to add an extra pay-load to the exported ZIP file...  
  20.             //  
  21.             if (injectXML!=null) {  
  22.                 ResourceDefinition resourceDefinition = new ResourceDefinition(injectFilename, injectXML);  
  23.                 definitions.put(injectFilename, resourceDefinition);  
  24.             }  
  25.               
  26.             ResourceNamingInterface namingInterface = new SequenceResourceNaming();  
  27.           
  28.             String topLevelResource = resourceExportInterface.exportResources(space, definitions, namingInterface, repository);  
  29.               
  30.             if (topLevelResource!=null && !definitions.isEmpty()) {  
  31.                 // Create the ZIP file...  
  32.                 //  
  33.                 FileObject fileObject = KettleVFS.getFileObject(zipFilename);  
  34.   
  35.                 // Store the XML in the definitions in a ZIP file...  
  36.                 //  
  37.                 out = new ZipOutputStream(KettleVFS.getOutputStream(fileObject, false));  
  38.                 out.setEncoding("utf-8");  
  39.                 for(String filename : definitions.keySet()) {  
  40.                       
  41.                     ResourceDefinition resourceDefinition = definitions.get(filename);  
  42.                       
  43.                     ZipEntry zipEntry = new ZipEntry(resourceDefinition.getFilename());  
  44.                     String comment = Messages.getString("ResourceUtil.SerializeResourceExportInterface.ZipEntryComment.OriginatingFile", filename, Const.NVL(resourceDefinition.getOrigin(), "-"));  
  45.                     zipEntry.setComment(comment);  
  46.                     out.putNextEntry(zipEntry);  
  47.                     out.write(resourceDefinition.getContent().getBytes("utf-8"));  
  48.                     out.closeEntry();  
  49.                 }  
  50.                 String zipURL = fileObject.getName().toString();  
  51.                 return new TopLevelResource(topLevelResource, zipURL, "zip:"+zipURL+"!"+topLevelResource);  
  52.             } else {  
  53.                 throw new KettleException(Messages.getString("ResourceUtil.Exception.NoResourcesFoundToExport"));  
  54.             }  
  55.         }  
  56.         catch(Exception e) {  
  57.             throw new KettleException(Messages.getString("ResourceUtil.Exception.ErrorSerializingExportInterface",resourceExportInterface.toString()), e);  
  58.         }   
  59.         finally {  
  60.             if (out!=null) {  
  61.                 try {  
  62.                     out.close();  
  63.                 } catch (IOException e) {  
  64.                     throw new KettleException(Messages.getString("ResourceUtil.Exception.ErrorClosingZipStream", zipFilename));  
  65.                 }  
  66.             }  
  67.         }  
  68.     }  

  (2)通过读取源码之后,发现了其实现的过程,可以参考源码来实现自己的组件。Job plugin开发不写了。这里只是简单将方法贴出来:

  1. public Result execute(Result previousResult, int nr, Repository rep, Job parentJob)  
  2.     {  
  3.         Result result = previousResult;  
  4.         result.setNrErrors(1);  
  5.         result.setResult( false );  
  6.         String realrepName=environmentSubstitute(repositoryname);  
  7.         String realusername=environmentSubstitute(username);  
  8.         String realpassword=environmentSubstitute(password);  
  9.         String realJobRepositoryPath=environmentSubstitute(directoryPath);  
  10.         if(realJobRepositoryPath == nullreturn result;  
  11.         String realJobDir = realJobRepositoryPath.substring(0, realJobRepositoryPath.lastIndexOf("/"));  
  12.         String realJobName =realJobRepositoryPath.substring(realJobRepositoryPath.lastIndexOf("/")+1,realJobRepositoryPath.length());  
  13.         String realoutfilename=environmentSubstitute(targetDirectoryPath);  
  14.         String realfindDatabaseName =environmentSubstitute(findDatabaseName);  
  15.         String realoutconnectname=environmentSubstitute(connectname);  
  16.         String realoutdbtype=environmentSubstitute(dbtype);  
  17.         String realoutdbaccess=environmentSubstitute(dbaccess);  
  18.         String realoutdbHost=environmentSubstitute(dbHost);  
  19.         String realoutdbname=environmentSubstitute(dbname);  
  20.         String realoutdbport=environmentSubstitute(dbport);  
  21.         String realoutdbuser=environmentSubstitute(dbuser);  
  22.         String realoutdbpassword=environmentSubstitute(dbpassword);  
  23.         try  
  24.         {  
  25.             // 连接资源库  
  26.             connectRep(log,realrepName, realJobDir, realusername, realpassword);  
  27.             // 创建新的数据对象用于替换Trans脚本中前置机的数据库对象。实现脚本数据连接的变动。  
  28.             DatabaseMeta clientDb = new DatabaseMeta(realoutconnectname,realoutdbtype,realoutdbaccess,realoutdbHost,realoutdbname,realoutdbport,realoutdbuser,realoutdbpassword);  
  29.             //导出Job和其关联的脚本到指定的目录  
  30.             exportAllXMLFile(realJobName,realoutfilename,realfindDatabaseName,clientDb);  
  31.               
  32.             result.setResult(true);  
  33.         }catch(Exception e)  
  34.         {  
  35.             log.logError(toString(), Messages.getString("JobExportRepository.UnExpectedError",e.toString()));  
  36.             log.logError(toString(), "Stack trace: "+Const.CR+Const.getStackTracker(e));  
  37.         }finally  
  38.         {  
  39.             if(this.rep!=null)   
  40.             {  
  41.                 this.rep.disconnect();  
  42.                 this.rep=null;  
  43.             }  
  44.             if(this.repinfo!=nullthis.repinfo=null;  
  45.             if(this.userinfo!=nullthis.userinfo=null;  
  46.             if(this.repsinfo!=null)   
  47.             {  
  48.                 this.repsinfo.clear();  
  49.                 this.repsinfo=null;  
  50.             }  
  51.         }  
  52.   
  53.         return result;  
  54.     }  

  1. public void exportAllXMLFile(String realJobName,String DirectoryPath,String findDatabaseName,DatabaseMeta databaseMeta) throws Exception {  
  2.         ResourceExportInterface resourceExportInterface = null;  
  3.         try {  
  4.             resourceExportInterface = new JobMeta(log,rep,realJobName,dir);  
  5.         } catch (Exception e) {  
  6.             log.logError(toString(), Messages.getString("JobExportRepository.UnExpectedError", e.toString()));  
  7.             log.logError(toString(), "Stack trace: " + Const.CR+ Const.getStackTracker(e));  
  8.         }   
  9.         if (resourceExportInterface == null)  
  10.             return// nothing to do here, prevent an NPE  
  11.             if(!Const.isEmpty(DirectoryPath)) {  
  12.                 ResourceUtil.serializeResourceExportInterface(DirectoryPath,findDatabaseName,databaseMeta,resourceExportInterface,(VariableSpace) resourceExportInterface, rep);  
  13.             }  
  14.   
  15.     }  


 

3、 效果预览:

 

4、插件说明

   (1)选择指定Job,填写前置机数据库信息,运行Job会,会从资源库中下载指定的job和其关联的脚本。如job 中transformation步骤中的trans,并自动更改transformation步骤中选定的tran脚本文件为下载目录中ktr文件。

  (2)下载过的脚本不要在连接资源库的情况下保存,否则会把资源库中的脚本覆盖掉。在进行下载脚本的时候,就会报错。这是因为转换文件名不再为空,kettle优先使用这个设置,在进行下载时找不到ktr文件而报错。

5、插件下载

 http://download.csdn.net/detail/jdk2006/4514371

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多