分享

大数据IMF传奇行动绝密课程第19课:Spark高级排序彻底解密

 看风景D人 2019-02-24

基础排序算法实战
二次排序算法实战
更高级排序算法
排序算法内幕解密

// Set the SparkContext log level to WARN.
sc.setLogLevel("WARN")

基础排序算法:

// Word count followed by a sort on the count, in descending order:
//   1. split each line on spaces and sum a 1 per word with reduceByKey (second argument 1 = one partition),
//   2. swap each (word, count) pair to (count, word) so that the count becomes the key,
//   3. sortByKey(false) orders the pairs by count, descending,
//   4. swap back to (word, count) and collect the result.
// NOTE(review): textFile() is shown without its path argument; supply the input path before running.
sc.textFile().flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_, 1).map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1)).collect

所谓二次排序,是指排序时考虑两个维度:先按第一个字段排序,第一个字段相同时再按第二个字段排序。示例数据如下:

2 3
4 1
3 2
4 3
9 7
2 1

构造器参数要加 val,这样它们才会成为类的成员,compare 方法中才能访问 other.first 和 other.second
Scala实现

package com.tom.spark

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Sort key for secondary sorting: keys are ordered by `first`, and keys with
 * equal `first` are ordered by `second`.
 *
 * Both constructor parameters are declared `val` so that they are members and
 * can be read from another instance inside `compare`.
 *
 * @param first  primary sort field
 * @param second secondary sort field, used only when `first` values are equal
 */
class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable{
  /**
   * Compares this key with `other`.
   *
   * @param other the key to compare against
   * @return a negative value, zero, or a positive value when this key is
   *         less than, equal to, or greater than `other`
   */
  override def compare(other: SecondarySortKey): Int = {
    // Integer.compare instead of subtraction: `a - b` overflows (and flips sign)
    // when the operands are far apart, e.g. Int.MinValue - 1.
    val byFirst = Integer.compare(this.first, other.first)
    if (byFirst != 0) byFirst else Integer.compare(this.second, other.second)
  }
}

object SecondarySortKey {
  /**
   * Reads "F:/helloSpark2.txt", keys every line by a SecondarySortKey built
   * from its first two space-separated integers, sorts by that key in
   * ascending order and prints the original lines in sorted order.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SecondarySortKey").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.textFile("F:/helloSpark2.txt")
      val pairWithSortKey = lines.map { line =>
        // Split once and reuse the fields for both parts of the key.
        val fields = line.split(" ")
        (new SecondarySortKey(fields(0).toInt, fields(1).toInt), line)
      }
      val sorted = pairWithSortKey.sortByKey()

      // Drop the sort key; only the original line is kept.
      val sortedResult = sorted.map(pair => pair._2)

      sortedResult.collect().foreach(println)
    } finally {
      // Stop the context even when the job fails.
      sc.stop()
    }
  }
}

Java实现

/**
 * SecondarySortKey.java
 */
package com.tom.spark.SparkApps.cores;

import java.io.Serializable;

import scala.math.Ordered;

/**
 * Custom key for secondary sorting.
 *
 * <p>Keys are ordered by {@code first}; keys with equal {@code first} are
 * ordered by {@code second}. All ordering operators delegate to
 * {@link #compare(SecondarySortKey)} so that they cannot disagree with each other.
 */
public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable{

    // The two fields that make up the sort key.
    private int first;
    private int second;

    /**
     * Creates a key.
     *
     * @param first  primary sort field
     * @param second secondary sort field, used only when {@code first} values are equal
     */
    public SecondarySortKey(int first, int second) {
        this.first = first;
        this.second = second;
    }


    /** @return the primary sort field */
    public int getFirst() {
        return first;
    }


    /** @param first the new primary sort field */
    public void setFirst(int first) {
        this.first = first;
    }


    /** @return the secondary sort field */
    public int getSecond() {
        return second;
    }


    /** @param second the new secondary sort field */
    public void setSecond(int second) {
        this.second = second;
    }


    /**
     * Scala's {@code >} operator.
     *
     * @param other the key to compare against
     * @return {@code true} when this key sorts strictly after {@code other}
     */
    public boolean $greater(SecondarySortKey other) {
        return compare(other) > 0;
    }

    /**
     * Scala's {@code >=} operator.
     *
     * @param other the key to compare against
     * @return {@code true} when this key sorts after, or is equal to, {@code other}
     */
    public boolean $greater$eq(SecondarySortKey other) {
        return compare(other) >= 0;
    }

    /**
     * Scala's {@code <} operator.
     *
     * @param other the key to compare against
     * @return {@code true} when this key sorts strictly before {@code other}
     */
    public boolean $less(SecondarySortKey other) {
        return compare(other) < 0;
    }

    /**
     * Scala's {@code <=} operator.
     *
     * @param other the key to compare against
     * @return {@code true} when this key sorts before, or is equal to, {@code other}
     */
    public boolean $less$eq(SecondarySortKey other) {
        return compare(other) <= 0;
    }

    /**
     * Compares by {@code first}, then by {@code second}.
     *
     * @param other the key to compare against
     * @return a negative value, zero, or a positive value when this key is
     *         less than, equal to, or greater than {@code other}
     */
    public int compare(SecondarySortKey other) {
        // Integer.compare rather than subtraction: a - b overflows (and flips
        // sign) when the operands are far apart.
        int byFirst = Integer.compare(this.first, other.first);
        if (byFirst != 0) {
            return byFirst;
        }
        return Integer.compare(this.second, other.second);
    }

    /**
     * Same ordering as {@link #compare(SecondarySortKey)}.
     *
     * @param other the key to compare against
     * @return the result of {@code compare(other)}
     */
    public int compareTo(SecondarySortKey other) {
        return compare(other);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + second;
        return result;
    }

    /**
     * Two keys are equal when they are of the same class and both fields match.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        SecondarySortKey other = (SecondarySortKey) obj;
        return first == other.first && second == other.second;
    }


    /**
     * Empty entry point; does nothing.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
    }
}
/**
 * SecondarySortKeyApp.java
 */
package com.tom.spark.SparkApps;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

import com.tom.spark.SparkApps.cores.SecondarySortKey;

/**
 * Secondary sort driver. Steps:
 * <ol>
 *   <li>Implement a custom sort key based on the Ordered and Serializable interfaces.</li>
 *   <li>Load the file to be sorted into a &lt;Key, Value&gt; RDD.</li>
 *   <li>Use sortByKey with the custom key to perform the secondary sort.</li>
 *   <li>Drop the sort key and keep only the sorted lines.</li>
 * </ol>
 */
public class SecondarySortKeyApp {

    /**
     * Reads "F:/helloSpark2.txt", sorts its lines in descending order by the
     * first two space-separated integers of each line and prints them.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SecondarySortKeyApp").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> lines = sc.textFile("F:/helloSpark2.txt",1);
            JavaPairRDD<SecondarySortKey, String> pairs = lines.mapToPair(new PairFunction<String, SecondarySortKey, String>() {

                public Tuple2<SecondarySortKey, String> call(String line)
                        throws Exception {
                    // Split once and reuse the fields for both parts of the key.
                    String[] fields = line.split(" ");
                    SecondarySortKey key = new SecondarySortKey(
                            Integer.parseInt(fields[0]), Integer.parseInt(fields[1]));
                    return new Tuple2<SecondarySortKey, String>(key, line);
                }
            });
            // Performs the secondary sort; false = descending order.
            JavaPairRDD<SecondarySortKey, String> sortedPairs = pairs.sortByKey(false);

            // Drop the custom key after sorting and keep only the original lines.
            JavaRDD<String> values = sortedPairs.map(new Function<Tuple2<SecondarySortKey,String>, String>() {

                public String call(Tuple2<SecondarySortKey, String> pair)
                        throws Exception {
                    return pair._2;
                }
            });
            values.foreach(new VoidFunction<String>() {

                public void call(String line) throws Exception {
                    System.out.println(line);
                }
            });
        } finally {
            // Close the context even when the job fails.
            sc.close();
        }
    }
}

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多