分享

Spark MLlib算法调用展示平台及其实现过程

 陈永正的图书馆 2017-05-19

1. 软件版本:

IDE:Intellij IDEA 14,Java:1.7,Scala:2.10.6;Tomcat:7,CDH:5.8.0; Spark:1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0 ; Hadoop:hadoop2.6.0-cdh5.8.0;(使用的是CDH提供的虚拟机)

2. 工程下载及部署:

Scala封装Spark算法工程:https://github.com/fansy1990/Spark_MLlib_Algorithm_1.6.0.git ;
调用Spark算法工程:https://github.com/fansy1990/Spark_MLlib_1.6.0_.git ;
部署(主要针对Spark_MLlib_1.6.0工程):
1)配置好db.properties中相应用户名密码/数据库等参数;
2)第一次启动tomcat,修改hibernate.cfg.xml文件中的hibernate.hbm2ddl.auto值为create,第二次启动修改为update;
3) 打开集群参数页面,点击初始化,初始化集群参数,如果集群参数和当前集群不匹配,那么需要做相应修改;
   暂时考虑使用配置文件的方式来配置集群参数,如果要调整为数据库配置,那么修改Utisl.dbOrFile参数即可;即,暂时只需修改utisl.properties文件;
4)拷贝Spark_MLlib_Algorithm_1.6.0工程生成的算法到到3)中spark.jar所在路径;
5)拷贝集群中的yarn-site.xml到3)中spark.files所在路径;
6)拷贝spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar到3)中spark.yarn.jar所在路径;

3. 工程实现原理:

3.1 Scala封装Spark算法工程:

3.1.1 工程目录

1. 工程目录如下所示:

其中,data目录为所有的测试数据所在目录,这里针对不同的算法建立了不同的目录,主要有5类:分类与回归/聚类/协同过滤/降维/频繁项集挖掘;
main/scala里面就是所有封装Spark源码中的代码;
test/scala里面对应每个封装代码的测试;

2. 工程采用Maven构建,直接根据pom文件加载对应依赖;

3. 该工程需要经过maven打包,把打包好的jar包放到CDH的虚拟机中的HDFS上某一固定目录,方便Spark算法调用工程调用(具体目录下文有说);

3.1.2 单个算法实现(封装/测试),比如针对逻辑回归

1. 针对逻辑回归,其封装代码如下所示:
代码清单3-1 逻辑回归算法封装(Scala)
[plain] view plain copy
  1. package com.fz.classification  
  2.   
  3. import com.fz.util.Utils  
  4. import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, LogisticRegressionWithLBFGS}  
  5. import org.apache.spark.mllib.linalg.Vectors  
  6. import org.apache.spark.mllib.regression.LabeledPoint  
  7. import org.apache.spark.{SparkConf, SparkContext}  
  8.   
  9. /**  
  10.  * 逻辑回归封装算法  
  11.  * Labels used in Logistic Regression should be {0, 1, ..., k - 1} for k classes multi-label classification problem  
  12.  * 输入参数:  
  13.  * testOrNot : 是否是测试,正常情况设置为false  
  14.  * input:输出数据;  
  15.  * minPartitions : 输入数据最小partition个数  
  16.  * output:输出路径  
  17.  * targetIndex:目标列所在下标,从1开始  
  18.  * splitter:数据分隔符;  
  19.  * method:使用逻辑回归算法:"SGD" or "LBFGS"  
  20.  * hasIntercept : 是否具有截距  
  21.  * numClasses: 目标列类别个数;  
  22.  * Created by fanzhe on 2016/12/19.  
  23.  */  
  24. object LogisticRegression {  
  25.   
  26.    def main (args: Array[String]) {  
  27.     if(args.length != 9){  
  28.       println("Usage: com.fz.classification.LogisticRegression testOrNot input minPartitions output targetIndex " +  
  29.         "splitter method hasIntercept numClasses")  
  30.       System.exit(-1)  
  31.     }  
  32.      val testOrNot = args(0).toBoolean // 是否是测试,sparkContext获取方式不一样, true 为test  
  33.      val input = args(1)  
  34.      val minPartitions = args(2).toInt  
  35.      val output = args(3)  
  36.      val targetIndex = args(4).toInt // 从1开始,不是从0开始要注意  
  37.      val splitter = args(5)  
  38.      val method = args(6) //should be "SGD" or "LBFGS"  
  39.      val hasIntercept = args(7).toBoolean  
  40.      val numClasses = args(8).toInt  
  41.   
  42.      val sc =  Utils.getSparkContext(testOrNot,"Logistic Create Model")  
  43.   
  44.      // construct data  
  45.      // Load and parse the data  
  46.      val training = Utils.getLabeledPointData(sc,input,minPartitions,splitter,targetIndex).cache()  
  47.   
  48.      // Run training algorithm to build the model  
  49.      val model = method match {  
  50.        case "SGD" => new LogisticRegressionWithSGD()  
  51.          .setIntercept(hasIntercept)  
  52.          .run(training)  
  53.        case "LBFGS" => new LogisticRegressionWithLBFGS().setNumClasses(numClasses)  
  54.          .setIntercept(hasIntercept)  
  55.          .run(training)  
  56.        case _ => throw new RuntimeException("no method")  
  57.      }  
  58.      // save model  
  59.   
  60.      model.save(sc,output)  
  61.   
  62.      sc.stop()  
  63.   }  
  64. }  
在上面的代码中,有对每个参数的解释,包括参数的含义,参数有哪些参数等;
在Main函数中,首先对各个参数进行获取并赋值变量,接着就是获取SparkContext;
其中,最重要的部分就是调用Spark自己封装的LogisticRegressionWithSGD 或 LogisticRegressionWithBFGS类进行逻辑回归建模;
最后,调用模型的save方法,把模型固化到HDFS上;
基本,所有的算法封装都采取这种模式,及对Spark MLlib中原生的算法再加一层封装。

2. 测试
测试主要使用JUnit进行测试,其逻辑回归示例代码如下:
代码清单3-2 逻辑回归算法封装测试(Scala)
[plain] view plain copy
  1. package com.fz.classification  
  2.   
  3. import java.io.File  
  4.   
  5. import com.fz.util.Utils  
  6. import org.junit.{Assert, Test}  
  7. import Assert._  
  8. /**  
  9.  * 测试Logistics Regression算法  
  10.  * Created by fanzhe on 2016/12/19.  
  11.  */  
  12. @Test  
  13. class LogisticRegressionTest {  
  14.   
  15.   @Test  
  16.   def testMain1()={  
  17. //    testOrNot input output targetIndex splitter method hasIntercept numClasses  
  18.     val args = Array(  
  19.       "true",  
  20.       "./src/data/classification_regression/logistic.dat",  
  21.       "2",  
  22.       "./target/logistic/tmp1",  
  23.       "1",  
  24.       " ",  
  25.       "SGD",  
  26.       "true",  
  27.       "2" // this parameter is useless  
  28.     )  
  29.     // 删除输出目录  
  30.     Utils.deleteOutput(args(3))  
  31.     LogisticRegression.main(args)  
  32.     assertTrue(Utils.fileContainsClassName(args(3)+"/metadata/part-00000",  
  33.       "org.apache.spark.mllib.classification.LogisticRegressionModel"))  
  34.   }  
  35.   
  36.   @Test  
  37.   def testMain2()={  
  38.     //    testOrNot input minPartitions output targetIndex splitter method hasIntercept numClasses  
  39.     val args = Array(  
  40.       "true",  
  41.       "./src/data/classification_regression/logistic.dat",  
  42.       "2",  
  43.       "./target/logistic/tmp2",  
  44.       "1",  
  45.       " ",  
  46.       "LBFGS",  
  47.       "true",  
  48.       "2"  
  49.     )  
  50.     // 删除输出目录  
  51.     Utils.deleteOutput(args(3))  
  52.     LogisticRegression.main(args)  
  53.     assertTrue(Utils.fileContainsClassName(args(3)+"/metadata/part-00000",  
  54.       "org.apache.spark.mllib.classification.LogisticRegressionModel"))  
  55.   }  
  56. }  
这里面的方法都是第一步先构建算法参数;接着调用main方法;第三步,查看输出中是否具有模型的相关信息;
当然,这里面还可以添加多个测试方法,使用不同的算法参数或数据进行测试;(读者可自行添加)

3.2 Spark算法调用工程:

3.2.1 界面介绍

1. 首页


在系统首页有对该系统实现算法的介绍,系统主要功能有:
1)集群参数维护:主要是底层使用的Hadoop集群的参数配置,每次配置完成后,不仅仅会更新数据库对应记录,而且会更新Hadoop Configuration的获取;
2)监控:主要指Spark任务运行在YARN资源管理器下的任务状态监控;
3)文件上传及预览:文件上传主要是上传本地测试数据到HDFS上,方便页面进行测试;而预览则是查看HDFS上面的数据;
4)分类与回归/协同过滤/聚类/降维/关联规则:各个种类算法下面的每个算法的调用建模页面;
2. 集群参数页面:

点击初始化,会把各个参数固定写入到后台数据库中,用户可以根据自己集群的配置不同,而进行参数修改,而每次修改也会刷新Hadoop 中Configuration的获取;
3. 监控:

监控页面,会监控用户提交的SPark任务的运行状态,如果任务失败,则会显示异常信息(代码中只是截取了部分信息,需要进行调整,看如何可以得出重要的信息,直接显示);后面会有具体实现过程分析。
4. 文件上传:

文件上传有两个功能:1)可指定一个本地目录和一个HDFS目录,然后把数据从本地上传到HDFS中;2)直接选择对应算法的数据,然后进行初始化,这个是把本地工程路径src/main/data中的对应数据上传到HDFS中的固定目录中;这两个上传的数据都可以在后面的算法建模中进行使用。
还有一点需要注意:被写入的HDFS路径是需要具有写权限的,而用户则是启动Tomcat的用户;
5. 文件查看

文件查看功能只能查看Text编码的文件,即文本文件,同时可以输入行号,即可进行文件内容的读取;
6. 逻辑回归算法:

在逻辑回归算法界面,输入算法参数,点击提交,如果任务提交成功,即可在下面看到任务提交的ID,如果提交失败(即任务ID获取不到),同样有对应的提示信息;
同时,在任务提交后,在监控界面同样可以观察到该任务的状态,通过刷新即可获得最新的任务状态;

7. 其他算法与逻辑回归算法类似

3.2.2 架构

系统架构图如下所示(算法调用及监控):

流程描述如下:
1. 前台界面设置参数,包括算法数据、算法参数等,然后提交任务;
2. 任务提交后,CloudAction接收后,会发起一个线程,该线程会启动Hadoop上的一个Job,该Job有一个返回值,为任务ID,如果任务提交失败,则返回null;
3. 初级监控状态:CloudAction发起线程后,主线程阻塞,等待hadoop任务线程返回值,根据返回值状态,前台返回任务提交成功或失败;
4. 在3的同时,即可通过DBService来更新数据库相应表JobInfo的状态;
5. 在monitor.html界面,通过刷新按钮即可及时获取Hadoop任务状态(有相应的服务,见下文介绍),并更新数据库相关数据,返回前台所有任务信息;

3.2.3 部分实现细节

1. Spark提交任务

2. monitor实时查询任务状态列表
monitor实时查询任务状态列表其流程描述如下:
[plain] view plain copy
  1. 1) 获取JobInfo中最新的records条记录;  
  2. 2) 查找其中isFinished字段为false的数据;  
  3. 3) 根据2)中查找的数据,去YARN获取其实时状态,并更新1)中的数据,然后存入数据库中;  
  4. 4) 根据row和page字段分页返回JSON数据;  
其代码如下所示:
代码清单3-3 更新监控任务列表
  1. public void getJobInfo(){  
  2.         Map<String ,Object> jsonMap = new HashMap<String,Object>();  
  3.         // 1.  
  4.         List<Object> jobInfos = dBService.getLastNRows("JobInfo","jobId",true,records);  
  5.         // 2,3  
  6.         List<Object> list = null;  
  7.         try {  
  8.             list = HUtils.updateJobInfo(jobInfos);  
  9.             if(list != null || list.size()>0) {  
  10.                 dBService.updateTableData(list);  
  11.             }  
  12.         }catch (Exception e){  
  13.             e.printStackTrace();  
  14.             log.warn("更新任务状态异常!");  
  15.             jsonMap.put("total", 0);  
  16.             jsonMap.put("rows", null);  
  17.             Utils.write2PrintWriter(JSON.toJSONString(jsonMap));  
  18.             return ;  
  19.         }  
  20.         // 4.  
  21.         jsonMap.put("total",list.size());  
  22.         jsonMap.put("rows",Utils.getSubList(list,page,rows));  
  23.         Utils.write2PrintWriter(JSON.toJSONString(jsonMap));  
  24.     }  
第一步通过dBService获取给定records个记录;第二步则更新这些记录;看下HUtils.updateJobInfo的实现:
代码清单3-4 获取任务最新状态
  1. public static List<Object> updateJobInfo(List<Object> jobInfos)throws YarnException,IOException{  
  2.         List<Object> list = new ArrayList<>();  
  3.         JobInfo jobInfo;  
  4.         for(Object o :jobInfos){  
  5.             jobInfo = (JobInfo) o;  
  6.             if(!jobInfo.isFinished()){ // 如果没有完成,则检查其最新状态  
  7.                 ApplicationReport appReport=null;  
  8.                 try {  
  9.                    appReport = getClient().getApplicationReport(SparkUtils.getAppId(jobInfo.getJobId()));  
  10.                 } catch (YarnException  | IOException e) {  
  11.                     e.printStackTrace();  
  12.                     throw e;  
  13.                 }  
  14.                 /** 
  15.                  * NEW, 0 
  16.                  NEW_SAVING, 1 
  17.                  SUBMITTED, 2 
  18.                  ACCEPTED, 3 
  19.                  RUNNING, 4 
  20.                  FINISHED, 5 
  21.                  FAILED, 6 
  22.                  KILLED; 7 
  23.                  */  
  24.                 switch (appReport.getYarnApplicationState().ordinal()){  
  25.                     case 0 | 1 | 2 |3 : // 都更新为Accepted状态  
  26.                         jobInfo.setRunState(JobState.ACCETPED);  
  27.                         break;  
  28.                     case 4 :  
  29.                         jobInfo.setRunState(JobState.RUNNING);break;  
  30.                     case 5:  
  31. //                        UNDEFINED,  
  32. //                                SUCCEEDED,  
  33. //                                FAILED,  
  34. //                                KILLED;  
  35.                         switch (appReport.getFinalApplicationStatus().ordinal()){  
  36.                             case 1: jobInfo.setRunState(JobState.SUCCESSED);  
  37.                             SparkUtils.cleanupStagingDir(jobInfo.getJobId());  
  38.                             jobInfo.setFinished(true);break;  
  39.                             case 2:  
  40.                                 jobInfo.setRunState(JobState.FAILED);  
  41.                                 SparkUtils.cleanupStagingDir(jobInfo.getJobId());  
  42.                                 jobInfo.setErrorInfo(appReport.getDiagnostics().substring(0,Utils.EXCEPTIONMESSAGELENGTH));  
  43.                                 jobInfo.setFinished(true);break;  
  44.                             case 3:  
  45.                                 jobInfo.setRunState(JobState.KILLED);  
  46.                                 SparkUtils.cleanupStagingDir(jobInfo.getJobId());  
  47.                                 jobInfo.setFinished(true);break;  
  48.                             default: log.warn("App:" + jobInfo.getJobId() + "获取任务状态异常! " +  
  49.                                     "appReport.getFinalApplicationStatus():"+appReport.getFinalApplicationStatus().name()  
  50.                             +",ordinal:"+ appReport.getFinalApplicationStatus().ordinal());  
  51.                         }  
  52.                         break;  
  53.                     case 6:  
  54.                         jobInfo.setRunState(JobState.FAILED);  
  55.                         SparkUtils.cleanupStagingDir(jobInfo.getJobId());  
  56.                         jobInfo.setErrorInfo(appReport.getDiagnostics().substring(0,Utils.EXCEPTIONMESSAGELENGTH));  
  57.                         jobInfo.setFinished(true);break;  
  58.                     case 7:  
  59.                         jobInfo.setRunState(JobState.KILLED);  
  60.                         SparkUtils.cleanupStagingDir(jobInfo.getJobId());  
  61.                         jobInfo.setFinished(true);break;  
  62.                     default: log.warn("App:" + jobInfo.getJobId() + "获取任务状态异常!"+  
  63.                             "appReport.getYarnApplicationState():"+appReport.getYarnApplicationState().name()  
  64.                             +",ordinal:"+ appReport.getYarnApplicationState().ordinal());  
  65.                 }  
  66.                 jobInfo.setModifiedTime(new Date());  
  67.             }  
  68.             list.add(jobInfo);// 把更新后的或原始的JobInfo添加到list中  
  69.         }  
  70.   
  71.         return list;  
  72.     }  
这里的工作就是根据数据库中任务的状态,只查询任务没有完成的任务的最新状态,并更新原始任务状态,最后把更新后的或者原始任务添加到list中,并返回;
在代码清单3-3中,返回更新后的list后,接着调用了DBService.updateTableData,对数据进行固化;最后,使用subList对list进行截取,返回给前台某个分页的数据。

4. Spark算法调用工程后续开发:

不得不说,这个版本的工程还是没有开发完成的,那如果你想接着来开发,一般流程是怎样的呢?
。。。
[plain] view plain copy
  1. 1)编写src/main/java/下算法对应的Thread;  
  2.    2)编写webapp下的对应页面;  
  3.    3)编写webapp/js下对应的js;  
  4.    4)修改webapp/preprocess/upload.jsp,添加一条数据上传记录,并在main/data下添加对应的数据;  
  5.    5)启动工程,在页面上传数据,然后选择算法,设置参数,即可提交任务,提交任务后在监控界面即可看到算法运行状态;  

工程状态(假设Scala工程为工程1,调用Spark算法工程为工程2):
工程1
基本封装了Spark Mllib中的数据挖掘相关算法,包括聚类、分类、回归、协同过滤、降维、频繁集挖掘(这个还有点问题);
工程2
目前只做了分类和回归算法的相关页面以及调用;

所以,如果你要在这个版本上开发,那么可以参考上面的流程先试着编写ALS算法的调用即可。

5. 总结

1. Spark算法调用工程还有很多页面没有完成,这个是类似重复性工作,并没有难点需要克服;
2. Spark算法调用工程中针对每个算法,本来是想在其算法调用界面加上其数据描述、算法描述、参数描述的,不过暂时还没有添加,but这些信息在Scala算法封装工程里面都有;
3. 关于使用SPARK ON YARN的方式调用Spark算法,并使用YARN来管理任务的流程基本在Spark算法调用工程中体现淋漓尽致了,再多也玩不出花儿了,所以如果有想学习研究这块内容的,则工程是一个很好的参考;
4. 之前对于分类算法这块是想加算法对比分析的,然后再加上些图表之类的展示,这样就显得更加高大上了,不过目前只进行了一步,就是写了个分类算法评估的Scala封装算法;
5. 可以考虑一些流程性的定时任务之类的加入到工程中,这样其实有点像Oozie了,不过为什么Oozie里面没有直接拖拽界面或流程任务监控管理的东西,如果有的话其实就更加像一个商业的软件了(Kettle);
6. 关于SSH框架其实我是比较弱的,所以里面应用ssh的地方只是简单的应用(比如说在返回分页的时候,我直接用的是subList,这个应该是不妥的);
7. 关于前台页面展现,我也是比较弱的,所以界面风格或单页的相关信息显示之类的,看着还不能做到赏心悦目;
8. The Code is free ,just enjoy!



分享,成长,快乐


脚踏实地,专注


转载请注明blog地址:http://blog.csdn.net/fansy1990


    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多