package com.kingbase.kmeansDemo

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeansModel

/**
 * Trains a KMeans model over "term,weight" document vectors read from HDFS, then writes:
 *   - the WSSSE clustering cost,
 *   - per-document cluster assignments,
 *   - per-cluster document counts,
 *   - the 10 heaviest terms of each cluster.
 *
 * args(0): HDFS base URI (e.g. "hdfs://node42:8020")
 * args(1): input path, appended to args(0)
 * args(2): number of clusters k
 */
object Kmeans_result {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    // Input lines look like: "<filename>\t<term0,w0> <term1,w1> ..."
    val data = sc.textFile(args(0) + args(1))
    val parsedData1 = data.map { line =>
      val cols = line.split("\t")
      (cols(0), cols(1)) // (filename, content)
    }.cache() // reused below for prediction

    // Keep only the weight of each "term,weight" pair, space-separated.
    val parsedData2 = parsedData1.map { case (_, content) =>
      content.split(" ").map(_.split(",")(1)).mkString(" ")
    }

    // Dense feature vectors for KMeans.
    val parsedData4 = parsedData2
      .map(s => Vectors.dense(s.split(' ').map(_.trim.toDouble)))
      .cache()

    val numClusters = args(2).toInt
    val numIterations = 20
    val runs = 10 // run 10 times, keep the best model
    val clusters = KMeans.train(parsedData4, numClusters, numIterations, runs)

    // Within-set sum of squared errors: lower means tighter clusters.
    val WSSSE = clusters.computeCost(parsedData4)
    val pathWC = args(0) + "/user/data/kmeans/output/pathWC"
    sc.makeRDD(List(numClusters.toString + " --> " + WSSSE.toString))
      .saveAsTextFile(pathWC)

    val pathRes = args(0) + "/user/data/kmeans/output/test"
    val pathCount = args(0) + "/user/data/kmeans/output/count"
    val pathKey = args(0) + "/user/data/kmeans/output/key"

    // Predict a cluster for every document.
    // BUG FIX: fields must be TAB-separated so the re-read below (which splits
    // on "\t") can parse them; the original joined with spaces, making
    // split("\t")(1) throw ArrayIndexOutOfBoundsException. The original inline
    // comment ("filename + \t lines._2 \t + res") confirms tabs were intended.
    parsedData1.map { case (filename, content) =>
      val weights = content.split(" ").map(_.split(",")(1).trim.toDouble)
      val res = clusters.predict(Vectors.dense(weights))
      filename + "\t" + content + "\t" + res
    }.saveAsTextFile(pathRes)

    // Re-read "<filename>\t<content>\t<cluster>" and count documents per cluster.
    val count1 = sc.textFile(pathRes).map { line =>
      val cols = line.split("\t")
      (cols(0), cols(1), cols(2))
    }
    count1.map(x => (x._3, 1)).reduceByKey(_ + _).saveAsTextFile(pathCount)

    // For each cluster: sum the weights of every term across its documents,
    // sort descending by weight, and keep the 10 heaviest terms.
    count1
      .map(x => (x._3, x._2)) // (cluster, content)
      .reduceByKey(_ + " " + _) // concatenate all "term,weight" pairs of a cluster
      .map { case (cluster, terms) =>
        var weightByTerm: Map[String, Double] = Map()
        terms.split(" ").foreach { kvStr =>
          val kv = kvStr.split(",")
          if (kv.size == 2) { // skip malformed fragments
            val k = kv(0)
            val v = kv(1).trim.toDouble
            weightByTerm += (k -> (weightByTerm.getOrElse(k, 0.0) + v))
          }
        }
        // BUG FIX: the original stopped after 10 entries with `return` inside a
        // foreach closure — a nonlocal return that throws NonLocalReturnControl
        // and fails the task on an executor. take(10) expresses the intent
        // safely and also removes the fragile "@" sentinel / split("@") trick.
        val top10 = weightByTerm.toSeq.sortWith(_._2 > _._2).take(10)
        (cluster, top10.map { case (k, v) => k + "," + v }.mkString(" "))
      }
      .saveAsTextFile(pathKey)
  }
}