Requirements analysis and technical architecture: real-time analysis of ad clicks

Ads are served on web pages or in mobile apps, each placed in a specific ad slot. When a user clicks an ad, the client normally sends a log record to the backend via Ajax or a socket. Here we build real-time online statistics on top of Spark Streaming: the click data is written into a message system (Kafka), our Spark Streaming application pulls the data from Kafka to compute and consume it, and the computed results are persisted to storage (MySQL).

Why real-time ad click analysis matters: seeing the effect of ad placement as it happens provides a solid basis for scaling up and adjusting the ad spend, and thus for maximizing the company's economic return.

Core requirements:
1. Filter valid ad click events in real time against a dynamic blacklist: blacklisted users may appear at any moment, so the blacklist must be updated dynamically.
2. Compute ad click traffic online.
3. Top 3 most popular ads.
4. The click trend of each ad.
5. The regional distribution of the users clicking each ad.
6. The ad click volume of the last minute.
7. The whole ad click Spark Streaming job runs 7x24.

Data format: timestamp, user, ad, city, and so on.

Technical details:
- Analyze per-user click counts online and block abusive IPs.
- Use updateStateByKey or mapWithState to compute ad click rankings per region.
- Combine Spark Streaming, Spark SQL, and Spark Core to analyze the data.
- Use window operations.
- High availability and performance tuning.
- For traffic trends, results are usually combined with a database.
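Before the code: every component below assumes the same record layout, one tab-separated line per click. A minimal parsing sketch (the sample values are invented):

public class AdClickRecordDemo {
    //One click record: timestamp \t ip \t userID \t adID \t province \t city
    public static void main(String[] args) {
        String sample = "1459766400000\t192.168.112.240\t5421\t42\tGuangdong\tGuangzhou";
        String[] fields = sample.split("\t");
        System.out.println("userID=" + fields[2] + ", adID=" + fields[3] + ", city=" + fields[5]);
    }
}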
/**
*
*/
package com.tom.spark.SparkApps.sparkstreaming;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Data generator: a Kafka producer that emits mock ad click records.
 */
public class MockAdClickedStat {
/**
* @param args
*/
public static void main(String[] args) {
final Random random = new Random();
final String[] provinces = new String[]{"Guangdong", "Zhejiang", "Jiangsu", "Fujian"};
final Map<String, String[]> cities = new HashMap<String, String[]>();
cities.put("Guangdong", new String[]{"Guangzhou", "Shenzhen", "Dongguan"});
cities.put("Zhejiang", new String[]{"Hangzhou", "Wenzhou", "Ningbo"});
cities.put("Jiangsu", new String[]{"Nanjing", "Suzhou", "Wuxi"});
cities.put("Fujian", new String[]{"Fuzhou", "Xiamen", "Sanming"});
final String[] ips = new String[] {
"192.168.112.240",
"192.168.112.239",
"192.168.112.245",
"192.168.112.246",
"192.168.112.247",
"192.168.112.248",
"192.168.112.249",
"192.168.112.250",
"192.168.112.251",
"192.168.112.252",
"192.168.112.253",
"192.168.112.254",
};
/**
 * Basic Kafka producer configuration.
 */
Properties kafkaConf = new Properties();
kafkaConf.put("serializer.class", "kafka.serializer.StringEncoder");
kafkaConf.put("metadeta.broker.list", "Master:9092,Worker1:9092,Worker2:9092");
ProducerConfig producerConfig = new ProducerConfig(kafkaConf);
final Producer<Integer, String> producer = new Producer<Integer, String>(producerConfig);
new Thread(new Runnable() {
public void run() {
while(true) {
//Basic format of an online ad click record: timestamp, ip, userID, adID, province, city
Long timestamp = new Date().getTime();
String ip = ips[random.nextInt(12)]; //a free public IP library could be used instead
int userID = random.nextInt(10000);
int adID = random.nextInt(100);
String province = provinces[random.nextInt(4)];
String city = cities.get(province)[random.nextInt(3)];
String clickedAd = timestamp + "\t" + ip + "\t" + userID + "\t" + adID + "\t" + province + "\t" + city;
producer.send(new KeyedMessage<Integer, String>("AdClicked", clickedAd));
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}).start();
}
}
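The generator above uses the old kafka.javaapi.producer API, which matches the Spark 1.x / Kafka 0.8 stack of this example. As a hedged alternative, the same mock record sent with the newer Java producer client (same broker and topic assumptions as above) would look roughly like this:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MockAdClickedStatNewApi {
    public static void main(String[] args) {
        Properties props = new Properties();
        //Same cluster assumption as above: brokers on Master/Worker1/Worker2, port 9092.
        props.put("bootstrap.servers", "Master:9092,Worker1:9092,Worker2:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        //One made-up record in the same tab-separated format.
        String record = System.currentTimeMillis() + "\t192.168.112.240\t5421\t42\tGuangdong\tGuangzhou";
        producer.send(new ProducerRecord<String, String>("AdClicked", record));
        producer.close();
    }
}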
package com.tom.spark.SparkApps.sparkstreaming;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.KafkaUtils;
import com.google.common.base.Optional;
import scala.Tuple2;
/**
 * Data processing: the Kafka consumer (the Spark Streaming application).
 */
public class AdClickedStreamingStats {
/**
* @param args
*/
public static void main(String[] args) {
//Benefits of the factory approach: 1. checkpoint-based recovery 2. getOrCreate can rebuild the context
final SparkConf conf = new SparkConf().setAppName("SparkStreamingOnKafkaDirect").setMaster("spark://Master:7077");
final String checkpointDirectory = "hdfs://Master:9000/library/SparkStreaming/CheckPoint_Data";
JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
public JavaStreamingContext create() {
// TODO Auto-generated method stub
return createContext(checkpointDirectory, conf);
}
};
/**
 * The Driver can recover from failure, provided it runs in the cluster and the application
 * is submitted with --supervise;
 */
JavaStreamingContext javassc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
/**
 * Step 3: create the Spark Streaming input DStream:
 * 1. The input source can be File, HDFS, Flume, Kafka, a socket, and so on.
 * 2. Here the data comes from Kafka. Spark Streaming connects to the brokers and keeps
 *    consuming from the topic while running (the brokers must of course exist first), with new
 *    data produced continuously as the business requires (though for the Spark Streaming
 *    application itself, the processing flow is the same whether or not data arrives).
 * 3. If a batch interval frequently carries no data, constantly launching empty jobs wastes
 *    scheduling resources, since there is nothing to compute; real enterprise production code
 *    therefore usually checks whether there is data before submitting a job.
 */
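/* Note (hedged): createDirectStream below is the receiver-less "direct" approach:
 * each batch reads its own Kafka offset ranges, so the number of topic partitions
 * determines the parallelism of the resulting RDDs, and together with checkpointing
 * it avoids the data-loss risk of receiver-based streams on driver restart. */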
//Create the Kafka metadata for this Spark Streaming Kafka consumer to use
Map<String, String> kafkaParameters = new HashMap<String, String>();
kafkaParameters.put("metadata.broker.list", "Master:9092,Worker1:9092,Worker2:9092");
Set<String> topics = new HashSet<String>();
topics.add("AdClicked"); //must match the topic the mock producer writes to
JavaPairInputDStream<String, String> adClickedStreaming = KafkaUtils.createDirectStream(javassc,
String.class, String.class,
StringDecoder.class, StringDecoder.class,
kafkaParameters,
topics);
/**Because the blacklist filtering works on the RDDs inside the DStream, transform is the natural
 * choice; here we must use transformToPair, because the data read from Kafka is of type
 * Pair<String, String>, and because the filtered data must keep the original Kafka record type
 * for further processing.
 *
 * To restate: within each batch duration the input data is encapsulated by exactly one RDD. You
 * can have multiple InputDStreams, but when jobs are generated, the different InputDStreams within
 * a batch duration are just like different file sources in a Spark job over HDFS.
 */
JavaPairDStream<String, String> filteredadClickedStreaming = adClickedStreaming.transformToPair(new Function<JavaPairRDD<String,String>, JavaPairRDD<String,String>>() {
public JavaPairRDD<String, String> call(
JavaPairRDD<String, String> rdd) throws Exception {
/**
 * Online blacklist filtering, step by step:
 * 1. Load the blacklist from the database and turn it into an RDD, i.e. a new RDD instance
 *    wrapping the blacklist data;
 * 2. Join the RDD produced by the batch duration with the blacklist RDD; more precisely,
 *    leftOuterJoin the batch RDD against the blacklist RDD, so that a record whose user appears
 *    in the blacklist carries a present Optional (true), and one whose user does not carries an
 *    absent one.
 *
 * We keep the records for which the blacklist side of the leftOuterJoin is absent (or false).
 */
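/* Worked example (illustrative values): if this batch contains ("u1", logLine) and
 * the blacklist RDD contains ("u1", true), leftOuterJoin yields
 * ("u1", (logLine, Optional.of(true))) and the record is dropped; a user absent from
 * the blacklist yields Optional.absent() and the record is kept. */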
final List<String> blackListNames = new ArrayList<String>();
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
jdbcWrapper.doQuery("SELECT * FROM blacklisttable", null, new ExecuteCallBack() {
public void resultCallBack(ResultSet result) throws Exception {
while(result.next()){
blackListNames.add(result.getString(1));
}
}
});
List<Tuple2<String, Boolean>> blackListTuple = new ArrayList<Tuple2<String,Boolean>>();
for(String name : blackListNames) {
blackListTuple.add(new Tuple2<String, Boolean>(name, true));
}
List<Tuple2<String, Boolean>> blacklistFromListDB = blackListTuple; //from the blacklist table, mapped to <String, Boolean>
JavaSparkContext jsc = new JavaSparkContext(rdd.context());
/**
 * The blacklist table only holds userIDs, but a join needs Key-Value pairs, so here we
 * build a Key-Value collection from the table data.
 */
JavaPairRDD<String, Boolean> blackListRDD = jsc.parallelizePairs(blacklistFromListDB);
/**
 * The join must be keyed by userID, so the incoming rdd is first converted with mapToPair
 * into that shape.
 */
JavaPairRDD<String, Tuple2<String, String>> rdd2Pair = rdd.mapToPair(new PairFunction<Tuple2<String,String>, String, Tuple2<String, String>>() {
public Tuple2<String, Tuple2<String, String>> call(
Tuple2<String, String> t) throws Exception {
// TODO Auto-generated method stub
String userID = t._2.split("\t")[2];
return new Tuple2<String, Tuple2<String,String>>(userID, t);
}
});
JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> joined = rdd2Pair.leftOuterJoin(blackListRDD);
JavaPairRDD<String, String> result = joined.filter(new Function<Tuple2<String,Tuple2<Tuple2<String,String>,Optional<Boolean>>>, Boolean>() {
public Boolean call(
Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> tuple)
throws Exception {
// TODO Auto-generated method stub
Optional<Boolean> optional = tuple._2._2;
if(optional.isPresent() && optional.get()){
return false;
} else {
return true;
}
}
}).mapToPair(new PairFunction<Tuple2<String,Tuple2<Tuple2<String,String>,Optional<Boolean>>>, String, String>() {
public Tuple2<String, String> call(
Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> t)
throws Exception {
// TODO Auto-generated method stub
return t._2._1;
}
});
return result;
}
});
//Basic format of an ad click record: timestamp, ip, userID, adID, province, city
JavaPairDStream<String, Long> pairs = filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {
public Tuple2<String, Long> call(Tuple2<String, String> t) throws Exception {
String[] splited=t._2.split("\t");
String timestamp = splited[0]; //epoch ms from the generator; for per-day stats it should be truncated to YYYY-MM-DD
String ip = splited[1];
String userID = splited[2];
String adID = splited[3];
String province = splited[4];
String city = splited[5];
String clickedRecord = timestamp + "_" +ip + "_"+userID+"_"+adID+"_"
+province +"_"+city;
return new Tuple2<String, Long>(clickedRecord, 1L);
}
});
/**
 * Step 4.3: on top of the per-record count of 1, sum the clicks for each key within this batch.
 */
JavaPairDStream<String, Long> adClickedUsers= pairs.reduceByKey(new Function2<Long, Long, Long>() {
public Long call(Long i1, Long i2) throws Exception{
return i1 + i2;
}
});
/*Identify valid clicks. A sophisticated approach would train a machine-learning model for online
filtering; a simple one is an IP-based rule such as no more than 100 clicks per day. Click counts
within one batch duration can also be used, but a single batch is an incomplete signal; a full day
of data, or hourly judgments, are also options.*/
JavaPairDStream<String, Long> filterClickedBatch = adClickedUsers.filter(new Function<Tuple2<String,Long>, Boolean>() {
public Boolean call(Tuple2<String, Long> v1) throws Exception {
if (1 < v1._2){
//here one would also update the blacklist tables
return false;
} else {
return true;
}
}
});
//filterClickedBatch.print();
//Write to the database
filterClickedBatch.foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {
public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {
public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {
//Write to MySQL efficiently through the connection pool,
//e.g. inserting 1000 records at a time with insertBatch or updateBatch.
//The user data written here: userID, adID, clickedCount, time.
//Note: two records may share the same key; in that case the existing row must be updated by accumulation.
List<UserAdClicked> userAdClickedList = new ArrayList<UserAdClicked>();
while(partition.hasNext()) {
Tuple2<String, Long> record = partition.next();
String[] splited = record._1.split("_"); //the key was joined with "_" above
UserAdClicked userClicked = new UserAdClicked();
userClicked.setTimestamp(splited[0]);
userClicked.setIp(splited[1]);
userClicked.setUserID(splited[2]);
userClicked.setAdID(splited[3]);
userClicked.setProvince(splited[4]);
userClicked.setCity(splited[5]);
userAdClickedList.add(userClicked);
}
final List<UserAdClicked> inserting = new ArrayList<UserAdClicked>();
final List<UserAdClicked> updating = new ArrayList<UserAdClicked>();
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
//Table columns: timestamp, ip, userID, adID, province, city, clickedCount
for(final UserAdClicked clicked : userAdClickedList) {
jdbcWrapper.doQuery("SELECT clickedCount FROM adclicked WHERE"
+ " timestamp =? AND userID = ? AND adID = ?",
new Object[]{clicked.getTimestamp(), clicked.getUserID(),
clicked.getAdID()}, new ExecuteCallBack() {
public void resultCallBack(ResultSet result) throws Exception {
// TODO Auto-generated method stub
if(result.next()) {
long count = result.getLong(1);
clicked.setClickedCount(count);
updating.add(clicked);
} else {
inserting.add(clicked);
clicked.setClickedCount(1L);
}
}
});
}
//Table columns: timestamp, ip, userID, adID, province, city, clickedCount
List<Object[]> insertParametersList = new ArrayList<Object[]>();
for(UserAdClicked insertRecord : inserting) {
insertParametersList.add(new Object[] {
insertRecord.getTimestamp(),
insertRecord.getIp(),
insertRecord.getUserID(),
insertRecord.getAdID(),
insertRecord.getProvince(),
insertRecord.getCity(),
insertRecord.getClickedCount()
});
}
jdbcWrapper.doBatch("INSERT INTO adclicked VALUES(?, ?, ?, ?, ?, ?, ?)", insertParametersList);
//Table columns: timestamp, ip, userID, adID, province, city, clickedCount
List<Object[]> updateParametersList = new ArrayList<Object[]>();
for(UserAdClicked updateRecord : updating) {
//The SET parameter (clickedCount) must come first to match the placeholder order below.
updateParametersList.add(new Object[] {
updateRecord.getClickedCount() + 1,
updateRecord.getTimestamp(),
updateRecord.getIp(),
updateRecord.getUserID(),
updateRecord.getAdID(),
updateRecord.getProvince(),
updateRecord.getCity()
});
}
jdbcWrapper.doBatch("UPDATE adclicked SET clickedCount = ? WHERE"
+ " timestamp =? AND ip = ? AND userID = ? AND adID = ? "
+ "AND province = ? AND city = ?", updateParametersList);
}
});
return null;
}
});
//Filter again: use historical counts read from the database to find blacklist candidates
JavaPairDStream<String, Long> blackListBasedOnHistory = filterClickedBatch.filter(new Function<Tuple2<String,Long>, Boolean>() {
public Boolean call(Tuple2<String, Long> v1) throws Exception {
//Basic format of an ad click record: timestamp, ip, userID, adID, province, city
String[] splited = v1._1.split("_"); //the key was joined with "_" above
String date = splited[0];
String userID = splited[2];
String adID = splited[3];
//Query the click table by date, userID and adID for this user's total clicks on this ad;
//more than 50 clicks on the same ad by the same user puts the user on the blacklist.
int clickedCountTotalToday = 81; //placeholder standing in for the real query result
return clickedCountTotalToday > 50;
}
});
//map step: extract the userID
JavaDStream<String> blackListuserIDBasedInBatchOnhistroy =blackListBasedOnHistory.map(new Function<Tuple2<String,Long>, String>() {
public String call(Tuple2<String, Long> v1) throws Exception {
// TODO Auto-generated method stub
return v1._1.split("_")[2];
}
});
//One problem remains: duplicates. Within one partition duplicates are easy to handle,
//but across partitions the same user can still appear more than once, so the whole blacklist RDD
//must be deduplicated. Deduplicating the RDD deduplicates every partition in one stroke.
//With the blacklist found, the next step is to write it into the blacklist table.
JavaDStream<String> blackListUniqueuserBasedInBatchOnhistroy = blackListuserIDBasedInBatchOnhistroy.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
// TODO Auto-generated method stub
return rdd.distinct();
}
});
// Next, write it into the database table
blackListUniqueuserBasedInBatchOnhistroy.foreachRDD(new Function<JavaRDD<String>, Void>() {
public Void call(JavaRDD<String> rdd) throws Exception {
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
public void call(Iterator<String> t) throws Exception {
// TODO Auto-generated method stub
//The inserted user info only needs the userID;
//insert it directly into the blacklist table.
//Write to the database.
List<Object[]> blackList = new ArrayList<Object[]>();
while(t.hasNext()) {
blackList.add(new Object[]{t.next()});
}
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
jdbcWrapper.doBatch("INSERT INTO blacklisttable values (?)", blackList);
}
});
return null;
}
});
/**Cumulative ad click counts, updated dynamically: each updateStateByKey pass updates the click
 * counts on top of the batch duration interval. After updating, the state is usually persisted to
 * external storage; here we store it in MySQL.
 */
JavaPairDStream<String, Long> updateStateByKeyDSteam = filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {
public Tuple2<String, Long> call(Tuple2<String, String> t)
throws Exception {
String[] splited=t._2.split("\t");
String timestamp = splited[0]; //epoch ms from the generator; for per-day stats it should be truncated to YYYY-MM-DD
String adID = splited[3];
String province = splited[4];
String city = splited[5];
//Key by timestamp_adID_province_city: the cumulative state and the adclickedcount
//table below are per ad and region, not per user or ip.
String clickedRecord = timestamp + "_" + adID + "_" + province + "_" + city;
return new Tuple2<String, Long>(clickedRecord, 1L);
}
}).updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {
public Optional<Long> call(List<Long> v1, Optional<Long> v2)
throws Exception {
// v1: the occurrences of the current key within the current batch duration, e.g. {1,1,...,1}
// v2: the accumulated result of the current key over previous batch durations
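// Worked example (illustrative): if v2 = Optional.of(10L) from earlier batches and
// v1 = {1L, 1L, 1L} in this batch, the updated state becomes Optional.of(13L).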
Long clickedTotalHistory = 0L;
if(v2.isPresent()){
clickedTotalHistory = v2.get();
}
for(Long one : v1) {
clickedTotalHistory += one;
}
return Optional.of(clickedTotalHistory);
}
});
updateStateByKeyDSteam.foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {
public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {
public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {
//Write to MySQL efficiently through the connection pool,
//e.g. inserting 1000 records at a time with insertBatch or updateBatch.
//The data inserted here: timestamp, adID, province, city, plus the count.
//Note: two records may share the same key; in that case the existing row is updated instead.
List<AdClicked> AdClickedList = new ArrayList<AdClicked>();
while(partition.hasNext()) {
Tuple2<String, Long> record = partition.next();
String[] splited = record._1.split("_"); //key format: timestamp_adID_province_city
AdClicked adClicked = new AdClicked();
adClicked.setTimestamp(splited[0]);
adClicked.setAdID(splited[1]);
adClicked.setProvince(splited[2]);
adClicked.setCity(splited[3]);
adClicked.setClickedCount(record._2);
AdClickedList.add(adClicked);
}
final List<AdClicked> inserting = new ArrayList<AdClicked>();
final List<AdClicked> updating = new ArrayList<AdClicked>();
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
//Table columns: timestamp, adID, province, city, clickedCount
for(final AdClicked clicked : AdClickedList) {
jdbcWrapper.doQuery("SELECT clickedCount FROM adclickedcount WHERE"
+ " timestamp = ? AND adID = ? AND province = ? AND city = ?",
new Object[]{clicked.getTimestamp(), clicked.getAdID(),
clicked.getProvince(), clicked.getCity()}, new ExecuteCallBack() {
public void resultCallBack(ResultSet result) throws Exception {
if(result.next()) {
//The row already exists. updateStateByKey keeps the cumulative total, which was
//stored via setClickedCount(record._2) above, so just mark the record for update.
updating.add(clicked);
} else {
inserting.add(clicked);
}
}
});
}
//Table columns: timestamp, adID, province, city, clickedCount
List<Object[]> insertParametersList = new ArrayList<Object[]>();
for(AdClicked insertRecord : inserting) {
insertParametersList.add(new Object[] {
insertRecord.getTimestamp(),
insertRecord.getAdID(),
insertRecord.getProvince(),
insertRecord.getCity(),
insertRecord.getClickedCount()
});
}
jdbcWrapper.doBatch("INSERT INTO adclickedcount VALUES(?, ?, ?, ?, ?)", insertParametersList);
//Table columns: timestamp, adID, province, city, clickedCount
List<Object[]> updateParametersList = new ArrayList<Object[]>();
for(AdClicked updateRecord : updating) {
updateParametersList.add(new Object[] {
updateRecord.getClickedCount(),
updateRecord.getTimestamp(),
updateRecord.getAdID(),
updateRecord.getProvince(),
updateRecord.getCity()
});
}
jdbcWrapper.doBatch("UPDATE adclickedcount SET clickedCount = ? WHERE"
+ " timestamp =? AND adID = ? AND province = ? AND city = ?", updateParametersList);
}
});
return null;
}
});
/**
 * TopN computation over ad clicks: the Top 5 ads per province per day.
 * Because we operate directly on the RDDs, we use the transform operator.
 */
updateStateByKeyDSteam.transform(new Function<JavaPairRDD<String,Long>, JavaRDD<Row>>() {
public JavaRDD<Row> call(JavaPairRDD<String, Long> rdd) throws Exception {
JavaRDD<Row> rowRDD = rdd.mapToPair(new PairFunction<Tuple2<String,Long>, String, Long>() {
public Tuple2<String, Long> call(Tuple2<String, Long> t)
throws Exception {
// TODO Auto-generated method stub
String[] splited=t._1.split("_"); //key format: timestamp_adID_province_city
String timestamp = splited[0];
String adID = splited[1];
String province = splited[2];
String clickedRecord = timestamp + "_" + adID + "_" + province;
return new Tuple2<String, Long>(clickedRecord, t._2);
}
}).reduceByKey(new Function2<Long, Long, Long>() {
public Long call(Long v1, Long v2) throws Exception {
// TODO Auto-generated method stub
return v1 + v2;
}
}).map(new Function<Tuple2<String,Long>, Row>() {
public Row call(Tuple2<String, Long> v1) throws Exception {
// TODO Auto-generated method stub
String[] splited=v1._1.split("_"); //the reduced key is timestamp_adID_province
String timestamp = splited[0];
String adID = splited[1];
String province = splited[2];
return RowFactory.create(timestamp, adID, province, v1._2);
}
});
StructType structType = DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("timestamp", DataTypes.StringType, true),
DataTypes.createStructField("adID", DataTypes.StringType, true),
DataTypes.createStructField("province", DataTypes.StringType, true),
DataTypes.createStructField("clickedCount", DataTypes.LongType, true)
));
HiveContext hiveContext = new HiveContext(rdd.context());
DataFrame df = hiveContext.createDataFrame(rowRDD, structType);
df.registerTempTable("topNTableSource");
DataFrame result = hiveContext.sql("SELECT timestamp, adID, province, clickedCount FROM"
+ " (SELECT timestamp, adID, province, clickedCount, "
+ "ROW_NUMBER() OVER(PARTITION BY province ORDER BY clickedCount DESC) rank "
+ "FROM topNTableSource) subquery "
+ "WHERE rank <= 5");
return result.toJavaRDD();
}
}).foreachRDD(new Function<JavaRDD<Row>, Void>() {
public Void call(JavaRDD<Row> rdd) throws Exception {
// TODO Auto-generated method stub
rdd.foreachPartition(new VoidFunction<Iterator<Row>>() {
public void call(Iterator<Row> t) throws Exception {
// TODO Auto-generated method stub
List<AdProvinceTopN> adProvinceTopN = new ArrayList<AdProvinceTopN>();
while(t.hasNext()) {
Row row = t.next();
AdProvinceTopN item = new AdProvinceTopN();
item.setTimestamp(row.getString(0));
item.setAdID(row.getString(1));
item.setProvince(row.getString(2));
item.setClickedCount(row.getLong(3));
adProvinceTopN.add(item);
}
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
Set<String> set = new HashSet<String>();
for(AdProvinceTopN item: adProvinceTopN){
set.add(item.getTimestamp() + "_" + item.getProvince());
}
//Table columns: timestamp, adID, province, clickedCount
ArrayList<Object[]> deleteParametersList = new ArrayList<Object[]>();
for(String deleteRecord : set) {
String[] splited = deleteRecord.split("_");
deleteParametersList.add(new Object[]{
splited[0],
splited[1]
});
}
jdbcWrapper.doBatch("DELETE FROM adprovincetopn WHERE timestamp = ? AND province = ?", deleteParametersList);
//Table columns: timestamp, adID, province, clickedCount
List<Object[]> insertParametersList = new ArrayList<Object[]>();
for(AdProvinceTopN insertRecord : adProvinceTopN) {
insertParametersList.add(new Object[] {
insertRecord.getTimestamp(),
insertRecord.getAdID(),
insertRecord.getProvince(),
insertRecord.getClickedCount()
});
}
jdbcWrapper.doBatch("INSERT INTO adprovincetopn VALUES (?, ?, ?, ?)", insertParametersList);
}
});
return null;
}
});
/**
 * Compute the ad click trend over the past half hour.
 * Basic format of an ad click record: timestamp, ip, userID, adID, province, city
 */
filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {
public Tuple2<String, Long> call(Tuple2<String, String> t)
throws Exception {
String splited[] = t._2.split("\t");
String adID = splited[3];
String time = splited[0]; //Todo: convert the timestamp to a minute bucket here (see the helper sketch at the end of this section)
return new Tuple2<String, Long>(time + "_" + adID, 1L);
}
}).reduceByKeyAndWindow(new Function2<Long, Long, Long>() {
//Add the counts of records entering the window.
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
}, new Function2<Long, Long, Long>() {
//Inverse function: subtract the counts of records leaving the window,
//so each new window is computed incrementally rather than from scratch.
public Long call(Long v1, Long v2) throws Exception {
return v1 - v2;
}
//Window length: 30 minutes. The slide interval must be a multiple of the batch
//interval (10 seconds here); 5 minutes is assumed (the original had 5 milliseconds).
}, Durations.minutes(30), Durations.minutes(5)).foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {
public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
// TODO Auto-generated method stub
rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {
public void call(Iterator<Tuple2<String, Long>> partition)
throws Exception {
List<AdTrendStat> adTrend = new ArrayList<AdTrendStat>();
// TODO Auto-generated method stub
while(partition.hasNext()) {
Tuple2<String, Long> record = partition.next();
String[] splited = record._1.split("_");
String time = splited[0];
String adID = splited[1];
Long clickedCount = record._2;
/**
 * Which fields does the database insert need? time, adID, clickedCount.
 * Plotting trends from a J2EE front end needs year/month/day, hour and minute
 * dimensions, so we also need those time dimensions here.
 */
AdTrendStat adTrendStat = new AdTrendStat();
adTrendStat.setAdID(adID);
adTrendStat.setClickedCount(clickedCount);
adTrendStat.set_date(time); //Todo: extract year/month/day (see the helper sketch at the end of this section)
adTrendStat.set_hour(time); //Todo: extract the hour
adTrendStat.set_minute(time);//Todo: extract the minute
adTrend.add(adTrendStat);
}
final List<AdTrendStat> inserting = new ArrayList<AdTrendStat>();
final List<AdTrendStat> updating = new ArrayList<AdTrendStat>();
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
//Table columns: date, hour, minute, adID, clickedCount
for(final AdTrendStat trend : adTrend) {
final AdTrendCountHistory adTrendhistory = new AdTrendCountHistory();
jdbcWrapper.doQuery("SELECT clickedCount FROM adclickedtrend WHERE"
+ " date =? AND hour = ? AND minute = ? AND AdID = ?",
new Object[]{trend.get_date(), trend.get_hour(), trend.get_minute(),
trend.getAdID()}, new ExecuteCallBack() {
public void resultCallBack(ResultSet result) throws Exception {
// TODO Auto-generated method stub
if(result.next()) {
long count = result.getLong(1);
adTrendhistory.setClickedCountHistoryLong(count);
updating.add(trend);
} else {
inserting.add(trend);
}
}
});
}
//Table columns: date, hour, minute, adID, clickedCount
List<Object[]> insertParametersList = new ArrayList<Object[]>();
for(AdTrendStat insertRecord : inserting) {
insertParametersList.add(new Object[] {
insertRecord.get_date(),
insertRecord.get_hour(),
insertRecord.get_minute(),
insertRecord.getAdID(),
insertRecord.getClickedCount()
});
}
jdbcWrapper.doBatch("INSERT INTO adclickedtrend VALUES(?, ?, ?, ?, ?)", insertParametersList);
//Table columns: date, hour, minute, adID, clickedCount
List<Object[]> updateParametersList = new ArrayList<Object[]>();
for(AdTrendStat updateRecord : updating) {
updateParametersList.add(new Object[] {
updateRecord.getClickedCount(),
updateRecord.get_date(),
updateRecord.get_hour(),
updateRecord.get_minute(),
updateRecord.getAdID()
});
}
jdbcWrapper.doBatch("UPDATE adclickedtrend SET clickedCount = ? WHERE"
+ " date =? AND hour = ? AND minute = ? AND AdID = ?"
, updateParametersList);
}
});
return null;
}
});
/**
 * Now the Spark Streaming execution engine, i.e. the Driver, starts running. The Driver starts
 * in a new thread and has an internal message loop that receives messages from the application
 * itself or from the Executors.
 */
javassc.start();
javassc.awaitTermination();
javassc.close();
}
private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf) {
// If you do not see this printed, the StreamingContext has been loaded from the checkpoint
System.out.println("Creating new context");
// Create the context with a 10 second batch interval
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));
ssc.checkpoint(checkpointDirectory);
return ssc;
}
}
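The requirements mention updateStateByKey or mapWithState for the cumulative statistics; the code above uses updateStateByKey. For reference, here is a minimal sketch of the same cumulative count with mapWithState on Spark 1.6+, which only touches the keys present in the current batch and therefore scales better on large key spaces; the stream and key format are assumed to be the same as above:

import com.google.common.base.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

class MapWithStateSketch {
    //Cumulative click count per key: the mapWithState counterpart of updateStateByKey above.
    static JavaMapWithStateDStream<String, Long, Long, Tuple2<String, Long>> totals(
            JavaPairDStream<String, Long> pairs) {
        Function3<String, Optional<Long>, State<Long>, Tuple2<String, Long>> mapping =
                new Function3<String, Optional<Long>, State<Long>, Tuple2<String, Long>>() {
                    public Tuple2<String, Long> call(String key, Optional<Long> one, State<Long> state) {
                        long sum = one.or(0L) + (state.exists() ? state.get() : 0L);
                        state.update(sum); //persist the running total for this key
                        return new Tuple2<String, Long>(key, sum);
                    }
                };
        return pairs.mapWithState(StateSpec.function(mapping));
    }
}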
class JDBCWrapper {
private static JDBCWrapper jdbcInstance = null;
private static LinkedBlockingQueue<Connection> dbConnectionPool = new LinkedBlockingQueue<Connection>();
static {
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static JDBCWrapper getJDBCInstance() {
if(jdbcInstance == null) {
synchronized (JDBCWrapper.class) {
if(jdbcInstance == null) {
jdbcInstance = new JDBCWrapper();
}
}
}
return jdbcInstance;
}
private JDBCWrapper() {
for(int i = 0; i < 10; i++){
try {
Connection conn = DriverManager.getConnection("jdbc:mysql://Master:3306/sparkstreaming","root", "root");
dbConnectionPool.put(conn);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public synchronized Connection getConnection() {
while(0 == dbConnectionPool.size()){
try {
Thread.sleep(20);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return dbConnectionPool.poll();
}
public int[] doBatch(String sqlText, List<Object[]> paramsList){
Connection conn = getConnection();
PreparedStatement preparedStatement = null;
int[] result = null;
try {
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement(sqlText);
for(Object[] parameters: paramsList) {
for(int i = 0; i < parameters.length; i++){
preparedStatement.setObject(i + 1, parameters[i]);
}
preparedStatement.addBatch();
}
result = preparedStatement.executeBatch();
conn.commit();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if(preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(conn != null) {
try {
dbConnectionPool.put(conn);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
return result;
}
public void doQuery(String sqlText, Object[] paramsList, ExecuteCallBack callback){
Connection conn = getConnection();
PreparedStatement preparedStatement = null;
ResultSet result = null;
try {
preparedStatement = conn.prepareStatement(sqlText);
for(int i = 0; i < paramsList.length; i++){
preparedStatement.setObject(i + 1, paramsList[i]);
}
result = preparedStatement.executeQuery();
try {
callback.resultCallBack(result);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if(preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(conn != null) {
try {
dbConnectionPool.put(conn);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
interface ExecuteCallBack {
void resultCallBack(ResultSet result) throws Exception;
}
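For completeness, a standalone usage sketch of JDBCWrapper and ExecuteCallBack outside the streaming job; the inserted value is made up, and blacklisttable is the table used above:

class JDBCWrapperDemo {
    public static void main(String[] args) {
        JDBCWrapper jdbc = JDBCWrapper.getJDBCInstance();
        java.util.List<Object[]> rows = new java.util.ArrayList<Object[]>();
        rows.add(new Object[]{"u-demo"}); //made-up userID
        //Batched insert through the connection pool.
        jdbc.doBatch("INSERT INTO blacklisttable VALUES (?)", rows);
        //Query with a callback over the ResultSet.
        jdbc.doQuery("SELECT COUNT(*) FROM blacklisttable", new Object[]{}, new ExecuteCallBack() {
            public void resultCallBack(java.sql.ResultSet result) throws Exception {
                if (result.next()) System.out.println("blacklisted users: " + result.getLong(1));
            }
        });
    }
}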
class UserAdClicked {
private String timestamp;
private String ip;
private String userID;
private String adID;
private String province;
private String city;
private Long clickedCount;
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getUserID() {
return userID;
}
public void setUserID(String userID) {
this.userID = userID;
}
public String getAdID() {
return adID;
}
public void setAdID(String adID) {
this.adID = adID;
}
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public Long getClickedCount() {
return clickedCount;
}
public void setClickedCount(Long clickedCount) {
this.clickedCount = clickedCount;
}
}
class AdClicked {
private String timestamp;
private String adID;
private String province;
private String city;
private Long clickedCount;
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getAdID() {
return adID;
}
public void setAdID(String adID) {
this.adID = adID;
}
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public Long getClickedCount() {
return clickedCount;
}
public void setClickedCount(Long clickedCount) {
this.clickedCount = clickedCount;
}
}
class AdProvinceTopN {
private String timestamp;
private String adID;
private String province;
private Long clickedCount;
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getAdID() {
return adID;
}
public void setAdID(String adID) {
this.adID = adID;
}
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public Long getClickedCount() {
return clickedCount;
}
public void setClickedCount(Long clickedCount) {
this.clickedCount = clickedCount;
}
}
class AdTrendStat {
private String _date;
private String _hour;
private String _minute;
private String adID;
private Long clickedCount;
public String get_date() {
return _date;
}
public void set_date(String _date) {
this._date = _date;
}
public String get_hour() {
return _hour;
}
public void set_hour(String _hour) {
this._hour = _hour;
}
public String get_minute() {
return _minute;
}
public void set_minute(String _minute) {
this._minute = _minute;
}
public String getAdID() {
return adID;
}
public void setAdID(String adID) {
this.adID = adID;
}
public Long getClickedCount() {
return clickedCount;
}
public void setClickedCount(Long clickedCount) {
this.clickedCount = clickedCount;
}
}
class AdTrendCountHistory{
private Long clickedCountHistoryLong;
public Long getClickedCountHistoryLong() {
return clickedCountHistoryLong;
}
public void setClickedCountHistoryLong(Long clickedCountHistoryLong) {
this.clickedCountHistoryLong = clickedCountHistoryLong;
}
}
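Finally, the trend computation leaves the timestamp-to-date/hour/minute conversion as a Todo. A minimal sketch of that helper, assuming the record timestamp is the epoch-millisecond value emitted by the mock generator:

import java.text.SimpleDateFormat;
import java.util.Date;

class TimeDimension {
    //Formats an epoch-millisecond timestamp into the date/hour/minute dimensions
    //used by the adclickedtrend table. SimpleDateFormat is not thread-safe, hence
    //the synchronized access in this simple sketch.
    private static final SimpleDateFormat DATE = new SimpleDateFormat("yyyy-MM-dd");
    private static final SimpleDateFormat HOUR = new SimpleDateFormat("HH");
    private static final SimpleDateFormat MINUTE = new SimpleDateFormat("mm");

    static synchronized String[] of(long timestampMs) {
        Date d = new Date(timestampMs);
        return new String[]{DATE.format(d), HOUR.format(d), MINUTE.format(d)};
    }
}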