
Big Data IMF Legendary Action Secret Course, Lesson 75

 看风景D人 2019-02-24

/**
 * Data generator: SparkSQLDataManually.java
 */
package com.tom.spark.SparkApps.sql;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Random;

/**
 * Automatically generates forum data. Record format:
 * date: date, formatted as yyyy-MM-dd
 * timestamp: epoch timestamp in milliseconds
 * userID: user ID (null for unregistered users)
 * pageID: page ID
 * channelID: channel (board) ID
 * action: "register" or "view"
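 *
 * A sample record (illustrative values only, tab-separated):
 * 2019-02-23	1550912345678	42	1234	Spark	view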
 */
public class SparkSQLDataManually {

    // yesterday's date, generated once and stamped on every record
    static String yesterday = yesterday();
    // the actual forum channels
    static String[] channelNames = new String[] {
            "Spark", "Scala", "Kafka", "Flink", "Hadoop", "Storm",
            "Hive", "Impala", "HBase", "ML"
    };
    static String[] actionNames = new String[] {
        "register","view"
    };

    public static void main(String[] args) {
        /**
         * Generate a data set whose size is specified by the command-line arguments.
         */
        long numberItems = 5000;
        String path = ".";
        if (args.length > 0) {
            numberItems = Long.parseLong(args[0]);
        }
        if (args.length > 1) {
            path = args[1];
            System.out.println(path);
        }

        System.out.println("User log record count: " + numberItems);

        userlogs(numberItems, path);
    }

    private static void userlogs(long numberItems, String path) {
        Random random = new Random();
        StringBuilder userLogBuffer = new StringBuilder();
        int[] unregisteredUsers = new int[]{1,2,3,4,5,6,7,8};
        for(int i = 0; i < numberItems; i++) {
            long timestamp = new Date().getTime();
            Long userID = 0L;
            long pageID = 0;

            // randomly generated user ID; roughly 1 in 8 records simulates an
            // unregistered user (a null Long is appended as the literal "null")
            if(unregisteredUsers[random.nextInt(8)] == 1) {
                userID = null;
            }
            else {
                userID = (long) random.nextInt((int) numberItems);
            }
            // randomly generated page ID
            pageID = random.nextInt((int) numberItems);
            // randomly pick a channel
            String channel = channelNames[random.nextInt(10)];

            // randomly pick an action
            String action = actionNames[random.nextInt(2)];

            userLogBuffer.append(yesterday)
                .append("\t")
                .append(timestamp)
                .append("\t")
                .append(userID)
                .append("\t")
                .append(pageID)
                .append("\t")
                .append(channel)
                .append("\t")
                .append(action)
                .append("\n");

        }
//      System.out.print(userLogBuffer);
        PrintWriter pw = null;

        try {
            pw = new PrintWriter(new OutputStreamWriter(new FileOutputStream(path + File.separator + "userlog.log")));
            System.out.println(path + File.separator + "userlog.log");
            pw.write(userLogBuffer.toString());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (pw != null) {
                pw.close();
            }
        }
    }

    private static String yesterday() {
        SimpleDateFormat date = new SimpleDateFormat("yyyy-MM-dd");
        Calendar cal = Calendar.getInstance();
        cal.setTime(new Date());
        cal.add(Calendar.DATE, -1);

        Date yesterday = cal.getTime();

        return date.format(yesterday);
    }

}
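
Before moving on to the analysis job, it can help to sanity-check the generated file. The sketch below is not part of the original course code: a minimal illustration assuming the generator's default output directory (./userlog.log) and the six-field, tab-separated format described above. The UserlogSanityCheck class name is mine.

package com.tom.spark.SparkApps.sql;

import java.io.BufferedReader;
import java.io.FileReader;

public class UserlogSanityCheck {
    public static void main(String[] args) throws Exception {
        long total = 0;
        long malformed = 0;
        // Path is an assumption: the generator's default output directory "."
        try (BufferedReader reader = new BufferedReader(new FileReader("./userlog.log"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                total++;
                // Expected fields: date, timestamp, userID, pageID, channel, action.
                // userID is the literal string "null" for unregistered users.
                String[] fields = line.split("\t", -1);
                if (fields.length != 6) {
                    malformed++;
                }
            }
        }
        System.out.println("records: " + total + ", malformed: " + malformed);
    }
}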
/**
 * Computes PV, UV, hot channels, bounce rate, and the new-user registration ratio.
 */
package com.tom.spark.SparkApps.sql;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;

/**
 * Hive table creation:
 * sqlContext.sql("create table userlogs(date string, timestamp bigint, userID bigint, pageID bigint, channel string, action string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'")
 *
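 * Loading the generated log into that table might look like this (a sketch;
 * the local file path is an assumption):
 * sqlContext.sql("LOAD DATA LOCAL INPATH '/path/to/userlog.log' INTO TABLE userlogs")
 *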
 */
public class SparkSQLUserlogsOps {

    /**
     * @param args
     */
    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("SparkSQLUserlogsOps").setMaster("spark://Master:7077");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        HiveContext hiveContext = new HiveContext(sc);

        String yesterday = getYesterday();

        pvStat(hiveContext, yesterday); // PV (page views)

        uvStat(hiveContext, yesterday); // UV (unique visitors)

        hotChannel(hiveContext, yesterday); // hot channels

        jumpOutStat(hiveContext, yesterday); // bounce rate

        newUserRegisterPercentStat(hiveContext, yesterday); // new-user registration ratio
    }

    private static void newUserRegisterPercentStat(HiveContext hiveContext, String yesterday) {
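        // "New users" are that day's views with a NULL userID; the ratio is
        // the day's registrations divided by those new-user views.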
        hiveContext.sql("use hive");

        String newUserSQL = "select count(*) "
                + "from userlogs "
                + "where action = 'view' and date='" + yesterday + "' and userID is NULL "
//              + "limit 10"
                ;

        String registerUserSQL = "select count(*) "
                + "from userlogs "
                + "where action = 'register' and date='" + yesterday + "' "
//              + "limit 10"
                ;

        Object newUser = hiveContext.sql(newUserSQL).collect()[0].get(0);
        Object registerUser = hiveContext.sql(registerUserSQL).collect()[0].get(0);

        double total = Double.valueOf(newUser.toString());
        double register = Double.valueOf(registerUser.toString());

        System.out.println("Simulated new-user registration ratio: " + register / total);
    }

    private static void jumpOutStat(HiveContext hiveContext, String yesterday) {
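        // Bounce rate here: the number of users with exactly one page view
        // that day, divided by the day's total page views.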
        hiveContext.sql("use hive");

        String totalPvSQL = "select count(*) "
                + "from "
                + "userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
//              + "limit 10"
                ;

        String pv2OneSQL = "select count(*) "
                + "from "
                + "(select count(*) totalNumber from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by userID "
                + "having totalNumber = 1) subquery "
//              + "limit 10"
                ;

        Object totalPv = hiveContext.sql(totalPvSQL).collect()[0].get(0);
        Object pv2One = hiveContext.sql(pv2OneSQL).collect()[0].get(0);

        double total = Double.valueOf(totalPv.toString());
        double pv21 = Double.valueOf(pv2One.toString());

        System.out.println("Bounce rate: " + pv21 / total);
    }



    private static void uvStat(HiveContext hiveContext, String yesterday) {
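        // UV per page: distinct userIDs that viewed each pageID that day.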
        hiveContext.sql("use hive");

        String sqlText = "select date, pageID, uv "
                + "from "
                + "(select date, pageID, count(distinct(userID)) uv from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by date, pageID) subquery "
                + "order by uv desc "
//              + "limit 10"
                ;

        hiveContext.sql(sqlText).show();
    }

    private static void hotChannel(HiveContext hiveContext, String yesterday) {
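        // Hot channels: rank channels by their view count for the day.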
        hiveContext.sql("use hive");

        String sqlText = "select date, channel, channelpv "
                + "from "
                + "(select date, channel, count(*) channelpv from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by date, channel) subquery "
                + "order by channelpv desc "
//              + "limit 10"
                ;


        hiveContext.sql(sqlText).show();
    }

    private static void pvStat(HiveContext hiveContext, String yesterday) {
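        // PV: rank pages by their view count for the day.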
        hiveContext.sql("use hive");

        String sqlText = "select date, pageID, pv "
                + "from "
                + "(select date, pageID, count(1) pv from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by date, pageID) subquery "
                + "order by pv desc "
//              + "limit 10"
                ;

        hiveContext.sql(sqlText).show();
        // the results could also be written back to a database or Hive

        //select date, pageID, pv from (select date, pageID, count(1) pv from userlogs where action = 'View' and 
        //date='2017-03-10' group by date, pageID) subquery order by pv desc limit 10   
    }



    private static String getYesterday() {
        SimpleDateFormat date = new SimpleDateFormat("yyyy-MM-dd");
        Calendar cal = Calendar.getInstance();
        cal.setTime(new Date());
        cal.add(Calendar.DATE, -1); // match the date stamped on the generated records

        Date yesterday = cal.getTime();

        return date.format(yesterday);
    }

}
