基础排序算法实战 二次排序算法实战 更高级排序算法 排序算法内幕解密 sc.setLogLevel("WARN")
基础排序算法: sc.textFile().flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_, 1).map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1)).collect
所谓二次排序,就是指排序的时候同时考虑两个维度。示例数据: 2 3 4 1 3 2 4 3 9 7 2 1 构造器要有val,因为要做个成员 Scala实现 package com.tom.spark
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Composite key for secondary sort: orders by `first` and breaks ties
 * with `second`. Must be Serializable so Spark can ship it to executors.
 *
 * @param first  primary sort dimension
 * @param second secondary sort dimension (tiebreaker)
 */
class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
  /**
   * Returns a negative, zero, or positive Int per the Ordered contract.
   * Uses `compareTo` instead of subtraction: `this.first - other.first`
   * overflows for large-magnitude values (e.g. Int.MinValue - 1 wraps
   * positive), which would corrupt the sort order.
   */
  override def compare(other: SecondarySortKey): Int = {
    val byFirst = this.first.compareTo(other.first)
    if (byFirst != 0) byFirst
    else this.second.compareTo(other.second)
  }
}
/**
 * Driver for the secondary-sort demo: loads lines of the form
 * "&lt;first&gt; &lt;second&gt;", sorts them by the composite key, and prints the
 * original lines in sorted order.
 */
object SecondarySortKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SecondarySortKey").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.textFile("F:/helloSpark2.txt")
      // Split each line once (the original split twice per line) and pair
      // the composite key with the raw line so the line survives the sort.
      val pairWithSortKey = lines.map { line =>
        val fields = line.split(" ")
        (new SecondarySortKey(fields(0).toInt, fields(1).toInt), line)
      }
      // sortByKey delegates to SecondarySortKey.compare (ascending by default).
      val sorted = pairWithSortKey.sortByKey()
      // Drop the key: only the original, now-ordered lines are wanted.
      val sortedResult = sorted.map(_._2)
      sortedResult.collect().foreach(println)
    } finally {
      // The original leaked the SparkContext; always release it.
      sc.stop()
    }
  }
}
java实现 /**
* SecondarySortKey.java
*/
package com.tom.spark.SparkApps.cores;
import java.io.Serializable;
import scala.math.Ordered;
/**
* 自定义二次排序的Key
*/
/**
 * Composite key for Spark secondary sort (Java side).
 * Orders by {@code first}, breaking ties with {@code second}.
 * Implements scala.math.Ordered so sortByKey can use it, and
 * Serializable so Spark can ship instances to executors.
 */
public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable{
	//需要二次排序的Key: primary dimension, then tiebreaker
	private int first;
	private int second;
	/**
	 * Public constructor for the two sort dimensions.
	 * @param first  primary sort dimension
	 * @param second secondary sort dimension (tiebreaker)
	 */
	public SecondarySortKey(int first, int second) {
		this.first = first;
		this.second = second;
	}
	public int getFirst() {
		return first;
	}
	public void setFirst(int first) {
		this.first = first;
	}
	public int getSecond() {
		return second;
	}
	public void setSecond(int second) {
		this.second = second;
	}
	/** Scala's {@code >} operator. */
	public boolean $greater(SecondarySortKey other) {
		// BUGFIX: the original compared this.first against other.getSecond(),
		// mixing the two dimensions. All relational operators now delegate to
		// compareTo so they are mutually consistent by construction.
		return compareTo(other) > 0;
	}
	/** Scala's {@code >=} operator. */
	public boolean $greater$eq(SecondarySortKey other) {
		return compareTo(other) >= 0;
	}
	/** Scala's {@code <} operator. */
	public boolean $less(SecondarySortKey other) {
		return compareTo(other) < 0;
	}
	/** Scala's {@code <=} operator. */
	public boolean $less$eq(SecondarySortKey other) {
		return compareTo(other) <= 0;
	}
	/** Ordered.compare — same contract as compareTo. */
	public int compare(SecondarySortKey other) {
		return compareTo(other);
	}
	/**
	 * Negative/zero/positive per the Comparable contract.
	 * Integer.compare avoids the overflow that the original's
	 * {@code a - b} subtraction could produce (e.g. Integer.MIN_VALUE - 1).
	 */
	public int compareTo(SecondarySortKey other) {
		int byFirst = Integer.compare(this.first, other.getFirst());
		if (byFirst != 0)
			return byFirst;
		return Integer.compare(this.second, other.getSecond());
	}
	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + first;
		result = prime * result + second;
		return result;
	}
	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		SecondarySortKey other = (SecondarySortKey) obj;
		// Equal iff both dimensions match — consistent with hashCode and compareTo.
		return first == other.first && second == other.second;
	}
	/**
	 * Intentionally empty: this class is a sort key, not an entry point.
	 * @param args unused
	 */
	public static void main(String[] args) {
	}
}
/**
* SecondarySortKeyApp.java
*/
package com.tom.spark.SparkApps;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import com.tom.spark.SparkApps.cores.SecondarySortKey;
/**
* 二次排序,具体实现步骤:
* 第一步:按照Ordered和Serializable接口实现自定义排序的Key
* 第二步:将要排序的二次排序的文件加载进<Key, Value>类型的RDD
* 第三步:使用sortByKey基于自定义的Key进行二次排序
* 第四步:去除掉排序的Key,只保留排序后的结果
*
*/
/**
 * Driver for the Java secondary-sort demo. Loads lines of the form
 * "&lt;first&gt; &lt;second&gt;", sorts them descending by the composite
 * SecondarySortKey, drops the key, and prints the ordered lines.
 */
public class SecondarySortKeyApp {
	/**
	 * @param args unused
	 */
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("SecondarySortKeyApp").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		// try/finally so the context is released even if a stage fails
		// (the original only closed it on the happy path).
		try {
			JavaRDD<String> line = sc.textFile("F:/helloSpark2.txt",1);
			JavaPairRDD<SecondarySortKey, String> pairs = line.mapToPair(new PairFunction<String, SecondarySortKey, String>() {
				public Tuple2<SecondarySortKey, String> call(String line)
						throws Exception {
					// Split once per record (the original split the same line twice).
					String[] fields = line.split(" ");
					return new Tuple2<SecondarySortKey, String>(
							new SecondarySortKey(Integer.valueOf(fields[0]), Integer.valueOf(fields[1])),
							line);
				}
			});
			// false => descending order; comparison logic lives in SecondarySortKey.
			JavaPairRDD<SecondarySortKey, String> sortedPairs = pairs.sortByKey(false); //完成二次排序
			//过滤掉排序后自定的Key,保留排序的结果
			JavaRDD<String> values = sortedPairs.map(new Function<Tuple2<SecondarySortKey,String>, String>() {
				public String call(Tuple2<SecondarySortKey, String> pair)
						throws Exception {
					return pair._2;
				}
			});
			values.foreach(new VoidFunction<String>() {
				public void call(String line) throws Exception {
					System.out.println(line);
				}
			});
		} finally {
			sc.close();
		}
	}
}
|