分享

Spark- word Count案例

 余香阁 2021-07-03

1 新建项目

新建 idea Maven项目工程, 并创建子工程,pom.xml文件中引入spark依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven./POM/4.0.0" xmlns:xsi="http://www./2001/XMLSchema-instance" xsi:schemaLocation="http://maven./POM/4.0.0 http://maven./xsd/maven-4.0.0.xsd"> <parent> <artifactId>dintalk-classes</artifactId> <groupId>cn.dintalk.bigdata</groupId> <version>1.0.0</version> </parent> <modelVersion>4.0.0</modelVersion>
<artifactId>spark-core</artifactId>
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.1.1</version> </dependency> </dependencies>
</project>

2 准备数据文件


3 代码编写

3.1 第一种写法

package cn.dintalk.bigdata.spark.core.wc
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}
object Spark01_WordCount { def main(args: Array[String]): Unit = {
// application -> spark 框架
// 1. 建立 和 Spark框架的连接 // JDBC 有 connection , Spark 有 SparkContext val sparkConf= new SparkConf() .setMaster("local") .setAppName("wordCount")
val sc = new SparkContext(sparkConf)
// 2. 执行业务操作
// 2.1 读取文件, 获取一行一行的数据 val lines: RDD[String] = sc.textFile("datas") // 2.2 将行数据进行切分,形成一个一个的单词 val words: RDD[String] = lines.flatMap(_.split(" ")) // 2.3 将数据按照单词进行分组,便于统计 val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(word => word) // 2.4 对分组后的数据进行转换 // (hello,hello,hello), (word,word) -> (hello,3),(word,2) val wordCount: RDD[(String, Int)] = wordGroup.map { case (word, list) => { (word, list.size) } } // 2.5 将转换结果采集到控制台输出 val tuples: Array[(String, Int)] = wordCount.collect() tuples.foreach(println) // 3. 关闭连接 sc.stop() }}

3.2 第二种写法

package cn.dintalk.bigdata.spark.core.wc
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}
object Spark02_WordCount { def main(args: Array[String]): Unit = {
// application -> spark 框架
// 1. 建立 和 Spark框架的连接 // JDBC 有 connection , Spark 有 SparkContext val sparkConf= new SparkConf() .setMaster("local") .setAppName("wordCount")
val sc = new SparkContext(sparkConf)
// 2. 执行业务操作
// 2.1 读取文件, 获取一行一行的数据 val lines: RDD[String] = sc.textFile("datas") // 2.2 将行数据进行切分,形成一个一个的单词 val words: RDD[String] = lines.flatMap(_.split(" "))
val wordToOne: RDD[(String, Int)] = words.map(word => (word, 1))
val wordGroup: RDD[(String, Iterable[(String, Int)])] = wordToOne .groupBy(t => t._1)
val wordCount: RDD[(String, Int)] = wordGroup.map { case (word, list) => { list.reduce( (t1, t2) => { (t1._1, t1._2 + t2._2) } ) } } // 2.5 将转换结果采集到控制台输出 val tuples: Array[(String, Int)] = wordCount.collect() tuples.foreach(println) // 3. 关闭连接 sc.stop() }}

3.3 第三种写法

package cn.dintalk.bigdata.spark.core.wc
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}
object Spark03_WordCount { def main(args: Array[String]): Unit = {
val sparkConf= new SparkConf() .setMaster("local") .setAppName("wordCount") val sc = new SparkContext(sparkConf)
val lines: RDD[String] = sc.textFile("datas")
val words: RDD[String] = lines.flatMap(_.split(" ")) val wordToOne: RDD[(String, Int)] = words.map((_, 1)) val wordCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
val tuples: Array[(String, Int)] = wordCount.collect() tuples.foreach(println)
sc.stop() }}

3.4结果验证


4 log4j控制日志输出

4.1 resources目录下新建log4j.properties并 配置

log4j.rootLogger=ERROR, stdoutlog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppenderlog4j.appender.R.File=../log/agent.loglog4j.appender.R.MaxFileSize=10240KBlog4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayoutlog4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n

4.2 验证日志输出

无多余日志的输出

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多