
Jumping straight to the code: a Spark MLlib KMeans job that clusters documents represented as term,weight vectors, then writes out the WSSSE, the per-document cluster assignments, the per-cluster document counts, and the top-10 keywords of each cluster.

昵称23016082 2017-04-10
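
Judging from the parsing below (split on tab, then on spaces, then on commas), each input line appears to be a file name, a tab, then space-separated term,weight pairs; the file names and terms in these sample lines are made up for illustration:

    doc_001.txt	spark,0.8231 cluster,0.5127 vector,0.3342
    doc_002.txt	hadoop,0.7710 hdfs,0.6023 yarn,0.2218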

package com.kingbase.kmeansDemo

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkConf, SparkContext}

object Kmeans_result {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    //val resPath = "hdfs://node42:8020/user/data/vsm/output/result"
    //val data = sc.textFile(resPath)
    // args(0): HDFS URI prefix, args(1): input path, args(2): number of clusters
    val data = sc.textFile(args(0) + args(1))
    // each line: filename \t content
    val parsedData1 = data.map(x => (x.split("\t")(0), x.split("\t")(1)))
   
    // content: keep only the weight from each "term,weight" token
    //val parsedData2 = parsedData1.map( s =>(s._2.split(" "))).map(x=>x.mkString(" "))
    val parsedData2 = parsedData1.map { s =>
      s._2.split(" ").map(_.split(",")(1)).mkString(" ")
    }
    // filename
    //val parsedData3 = parsedData1.map( s =>(s._1))
    // vectors for KMeans (every document must yield the same dimension)
    val parsedData4 = parsedData2.map(s => Vectors.dense(s.split(' ').map(_.trim.toDouble))).cache()
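    // Optional sketch (not in the original post): MLlib KMeans assumes all
    // vectors share one dimension, so a quick sanity check could be:
    //   require(parsedData4.map(_.size).distinct().count() == 1,
    //           "input vectors have mixed dimensions")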
   
    // number of clusters, taken from args(2)
    val numClusters = args(2).toInt
    // run at most 20 iterations
    val numIterations = 20
    // train 10 models in parallel and keep the best one
    val runs = 10
    val clusters = KMeans.train(parsedData4, numClusters, numIterations, runs)
    // Within Set Sum of Squared Errors: lower means tighter clusters
    val WSSSE = clusters.computeCost(parsedData4)
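    // Optional sketch (not in the original post): since "numClusters --> WSSSE"
    // is saved below, k could be tuned with a simple elbow scan, e.g.:
    //   Seq(2, 3, 5, 8).foreach { k =>
    //     val m = KMeans.train(parsedData4, k, numIterations, runs)
    //     println(s"k = $k, WSSSE = ${m.computeCost(parsedData4)}")
    //   }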

   
    //val pathWC = "hdfs://node42:8020/user/data/kmeans/output/pathWC"
    val pathWC = args(0) + "/user/data/kmeans/output/pathWC"
    // record "k --> WSSSE", e.g. "3 --> 1234.56"
    val rdd = sc.makeRDD(List(numClusters.toString + " --> " + WSSSE.toString))
    rdd.saveAsTextFile(pathWC)
     
    val pathRes   = args(0) + "/user/data/kmeans/output/test"
    val pathCount = args(0) + "/user/data/kmeans/output/count"
    val pathKey   = args(0) + "/user/data/kmeans/output/key"
   
    // predict a cluster for every document; the separator must be \t so that
    // the split("\t") below can parse the lines back
    parsedData1.map { lines =>
      val filename = lines._1
      val weights = lines._2.split(" ").map(_.split(",")(1).trim.toDouble)
      val line = Vectors.dense(weights)
      val res = clusters.predict(line)
      filename + "\t" + lines._2 + "\t" + res
    }.saveAsTextFile(pathRes)
   
    // each line of pathRes is: filename \t content \t cluster ==> count documents per cluster
    val data2 = sc.textFile(pathRes)
    val count1 = data2.map(x => (x.split("\t")(0), x.split("\t")(1), x.split("\t")(2)))
    count1.map(x => (x._3, 1)).reduceByKey(_ + _).saveAsTextFile(pathCount)
   
    //==============================================
    // for each cluster: sum the weight of every term across its documents,
    // sort terms by total weight descending, keep the top 10 as keywords
    count1.map(x => (x._3, x._2)).reduceByKey { (x, y) =>
      x + " " + y
    }.map { x =>
      var map: Map[String, Double] = Map()
      x._2.split(" ").foreach { allKV =>
        val kv = allKV.split(",")
        if (kv.size == 2) {
          val k = kv(0)
          val v = kv(1).trim.toDouble
          map += (k -> (map.getOrElse(k, 0.0) + v))
        }
      }
      val key = map.toSeq.sortWith(_._2 > _._2) // sort by value, descending
      (x._1, key)
    }.map { x =>
      // take the 10 heaviest terms; the original marked the 10th entry with
      // "@" and used `return` inside foreach, but `return` in a Spark closure
      // throws NonLocalReturnControl and fails the task
      val newString = x._2.take(10).map(kv => kv._1 + "," + kv._2).mkString(" ")
      (x._1, newString)
    }.saveAsTextFile(pathKey)

  }
 
}
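
To run it, an invocation along these lines should work (the jar name and master are placeholders; args(0) is the HDFS URI prefix, args(1) the input path, args(2) the number of clusters, matching the commented-out paths above):

    spark-submit \
      --class com.kingbase.kmeansDemo.Kmeans_result \
      --master yarn \
      kmeans-demo.jar \
      hdfs://node42:8020 /user/data/vsm/output/result 3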
