/**
 * Test-data generator — SparkSQLDataManually.java
 */
package com.tom.spark.SparkApps.sql;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Random;
/**
* 论坛数据自动生成代码,数据格式如下:
* date:日期,格式为yyyy-MM-dd
* timestamp:时间戳
* userID:用户ID
* pageID:页面ID
* channelID:板块ID
* action:点击和注册
*/
public class SparkSQLDataManually {

    /** Date string (yyyy-MM-dd) for yesterday; every generated row carries this date. */
    static String yesterday = yesterday();

    /** Forum channels a log row can reference. */
    static String[] channelNames = new String[] {
        "Spark", "Scala", "Kafka", "Flink", "Hadoop", "Storm",
        "Hive", "Impala", "HBase", "ML"
    };

    /** Possible user actions (register or page view). */
    static String[] actionNames = new String[] {
        "register","view"
    };

    /**
     * Generates a user-log file of the requested size.
     *
     * @param args optional: args[0] = number of log rows, args[1] = output directory.
     *             Defaults: 5000 rows, current directory.
     */
    public static void main(String[] args) {
        long numberItems = 5000;
        String path = ".";
        if (args.length > 0) {
            // was Integer.valueOf(args[0]) — parse as long to match numberItems' type
            numberItems = Long.parseLong(args[0]);
        }
        if (args.length > 1) {
            // previously args[1] was read whenever args.length > 0, throwing
            // ArrayIndexOutOfBoundsException when only the row count was supplied
            path = args[1];
            System.out.println(path);
        }
        System.out.println("User log number is : " + numberItems);
        userlogs(numberItems, path);
    }

    /**
     * Writes {@code numberItems} tab-separated rows to {@code path}/userlog.log.
     * Row format: date \t timestamp \t userID \t pageID \t channel \t action.
     * userID is null for roughly 1 in 8 rows, simulating unregistered visitors.
     */
    private static void userlogs(long numberItems, String path) {
        Random random = new Random();
        // StringBuilder: no synchronization needed for this single-threaded buffer
        StringBuilder userLogBuffer = new StringBuilder();
        int[] unregisteredUsers = new int[]{1,2,3,4,5,6,7,8};
        for (int i = 0; i < numberItems; i++) {
            long timestamp = new Date().getTime();
            // ~1/8 chance of an unregistered (null) user
            Long userID = null;
            if (unregisteredUsers[random.nextInt(8)] != 1) {
                userID = (long) random.nextInt((int) numberItems);
            }
            long pageID = random.nextInt((int) numberItems);
            String channel = channelNames[random.nextInt(channelNames.length)];
            String action = actionNames[random.nextInt(actionNames.length)];
            userLogBuffer.append(yesterday)
                .append("\t")
                .append(timestamp)
                .append("\t")
                .append(userID)
                .append("\t")
                .append(pageID)
                .append("\t")
                .append(channel)
                .append("\t")
                .append(action)
                .append("\n");
        }
        // File.separator replaces the hard-coded Windows "\\"; try-with-resources
        // closes the writer even on failure (the old finally called pw.close()
        // unconditionally, throwing NPE when the constructor itself had failed).
        String outFile = path + File.separator + "userlog.log";
        try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(new FileOutputStream(outFile)))) {
            System.out.println(outFile);
            pw.write(userLogBuffer.toString());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }

    /** @return yesterday's date formatted as yyyy-MM-dd */
    private static String yesterday() {
        SimpleDateFormat date = new SimpleDateFormat("yyyy-MM-dd");
        Calendar cal = Calendar.getInstance();
        cal.setTime(new Date());
        cal.add(Calendar.DATE, -1);
        return date.format(cal.getTime());
    }
}
/**
* 计算PV、UV、热门板块、跳出率、新用户注册比率
*/
package com.tom.spark.SparkApps.sql;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;
/**
* Table in hive database creation:
* sqlContext.sql("create table userlogs(date string, timestamp bigint, userID bigint, pageID bigint, channel string, action string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'")
*
*/
public class SparkSQLUserlogsOps {

    /**
     * Computes PV, UV, hot channels, bounce rate and the new-user registration
     * ratio over the Hive table {@code userlogs} for yesterday's generated data.
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SparkSQLUserlogsOps").setMaster("spark://Master:7077");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        HiveContext hiveContext = new HiveContext(sc);
        String yesterday = getYesterday();
        pvStat(hiveContext, yesterday);                     // PV per channel
        uvStat(hiveContext, yesterday);                     // UV per page
        hotChannel(hiveContext, yesterday);                 // hot pages/channels
        jumpOutStat(hiveContext, yesterday);                // bounce rate
        newUserRegisterPercentStat(hiveContext, yesterday); // new-user registration ratio
        sc.stop(); // release cluster resources
    }

    /**
     * Prints the ratio of registrations to null-userID (unregistered) views.
     * Fixed: the register query was missing a space after "userlogs", producing
     * the invalid SQL "from userlogswhere ..."; action literals were 'View'/
     * 'Register' while the generator emits lowercase 'view'/'register'.
     */
    private static void newUserRegisterPercentStat(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");
        String newUserSQL = "select count(*) "
                + "from userlogs "
                + "where action = 'view' and date='" + yesterday + "' and userID is NULL "
                ;
        String registerUserSQL = "SELECT count(*) "
                + "from userlogs "
                + "where action = 'register' and date='" + yesterday + "' "
                ;
        Object newUser = hiveContext.sql(newUserSQL).collect()[0].get(0);
        Object registerUser = hiveContext.sql(registerUserSQL).collect()[0].get(0);
        double total = Double.valueOf(newUser.toString());
        double register = Double.valueOf(registerUser.toString());
        // NOTE: yields Infinity/NaN when no unregistered views exist
        System.out.println("模拟新用户注册比例:" + register / total);
    }

    /**
     * Prints the bounce rate: users with exactly one page view divided by the
     * total number of views. Action literal lowercased to match generated data.
     */
    private static void jumpOutStat(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");
        String totalPvSQL = "select count(*) "
                + "from "
                + "userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                ;
        String pv2OneSQL = "SELECT count(*) "
                + "from "
                + "(SELECT count(*) totalNumber from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by userID "
                + "having totalNumber = 1) subquery "
                ;
        Object totalPv = hiveContext.sql(totalPvSQL).collect()[0].get(0);
        Object pv2One = hiveContext.sql(pv2OneSQL).collect()[0].get(0);
        double total = Double.valueOf(totalPv.toString());
        double pv21 = Double.valueOf(pv2One.toString());
        // NOTE: yields Infinity/NaN when there are no views for the date
        System.out.println("跳出率为" + pv21 / total);
    }

    /**
     * Shows distinct-user counts (UV) per page, highest first.
     */
    private static void uvStat(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");
        String sqlText = "select date, pageID, uv "
                + "from "
                + "(select date, pageID, count(distinct(userID)) uv from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by date, pageID) subquery "
                + "order by uv desc "
                ;
        hiveContext.sql(sqlText).show();
    }

    /**
     * Shows raw view counts (PV) per page, highest first.
     * NOTE(review): despite its name this method aggregates by pageID, while
     * pvStat aggregates by channel — the two names look swapped; confirm intent.
     */
    private static void hotChannel(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");
        String sqlText = "select date, pageID, pv "
                + "from "
                + "(select date, pageID, count(1) pv from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by date, pageID) subquery "
                + "order by pv desc "
                ;
        hiveContext.sql(sqlText).show();
    }

    /**
     * Shows view counts per channel, highest first.
     * NOTE(review): aggregates by channel although it is named pvStat — see
     * hotChannel; the results could also be persisted to a database or Hive.
     */
    private static void pvStat(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");
        String sqlText = "select date, channel, channelpv "
                + "from "
                + "(select date, channel, count(*) channelpv from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by date, channel) subquery "
                + "order by channelpv desc "
                ;
        hiveContext.sql(sqlText).show();
    }

    /**
     * @return yesterday's date formatted as yyyy-MM-dd.
     * Fixed: previously subtracted 2 days, so the queries targeted the day
     * before yesterday while the generator stamps rows with -1 day.
     */
    private static String getYesterday() {
        SimpleDateFormat date = new SimpleDateFormat("yyyy-MM-dd");
        Calendar cal = Calendar.getInstance();
        cal.setTime(new Date());
        cal.add(Calendar.DATE, -1);
        return date.format(cal.getTime());
    }
}