分享

大数据学习(spark的三个样例编程)

 春和秋荣 2019-12-30

首先准备好hadoop和spark以及scala的环境

主节点如下

分节点如下

然后完成以及idea的安装以及idea上scala的插件安装,我们就可以开始编程了。

有以下两点需要注意的:

1.scala和spark的版本,最好按照推荐安装,我是用的spark-2.20,之前用scala-2.12.* 出现巨多问题,例如运行任务时报错:

java.lang.NoSuchMethodError:scala.Predef$.ArrowAssoc(....)

按照官网上的推荐spark-2.20就最好是用scala-2.11.*。

2.如果虚拟机上编程太慢的话,其实更建议直接在本地用idea编程打包后,把jar包传输到虚拟机


一、WordCount

(着重以WordCount编程为重点进行练习,后面的例子若有重复的步骤就简单略过)

1.打开idea,创建scala工程

其中,JDK和Scala SDK就是java和scala的路径

2.在src文件夹下创建两个子目录,一个cluster用于跑spark,另外一个local用于idea上调试。(其中out目录和META-INF创建jar包后自动生成的,开始并没有)然后在两个个文件夹下分别创建scala.class

3.然后要想进行spark编程,我们就得导入spark的相关包

File → Projecte Structure → Libraries → “+”→ Java → *选择spark目录下的jars文件夹*

ps:其实我们的编程暂时用不到这个目录下的所有包,可以只导入需要的,但就需要花时间去找;也可以全部导入,但是整个工程就会变得臃肿然后点OK再点OK,回到界面,我们的相关包就导入完成了

4.接下来就是正式的编程,我们先上WordCount的代码

//指在cluster这个目录下

package cluster

//导入了spark的SparkConf, SparkContext两个类

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

def main(args: Array[String]) {

if (args.length < 1) {

System.err.println("Usage: ")

System.exit(1)

}

//实例化configuration,用于编辑我们任务的相关信息,后面的setAppName可以在控制台运行的时候再设置

val conf = new SparkConf().setAppName("MySparkApp")

// sc是Spark Context,指的是“上下文”,也就是我们运行的环境,需要把conf当参数传进去;

val sc = new SparkContext(conf)

//通过sc获取一个(hdfs上的)文本文件,args(0)就是我们控制台上传入的参数,local运行的话就是传入本地一个文本的path

val line = sc.textFile(args(0))

//下面就是wordcount具体的执行代码

line.flatMap(_.split("")).map((_, 1)).reduceByKey(_+_).collect().foreach(println)

sc.stop()

}

}

这就是WordCount的代码,用于在Spark平台上跑。如果需要在idea上测试的话,就可以把args(0)具体改为一个文本文件的path。比如在工程的目录下创建data文件夹,把test.txt扔上去,args(0)就可以修改为"data/test.txt";然后把sc设置为

val spark=new SparkContext("local","SparkPI")

这样的本地模式,就可以直接在idea上run。

5.打包成jar文件

File → Projecte Structure → Artifacts → “+” → JAR → From modules with dependencies... ...(这个选项的意思是把我们引入的所有外部包都封装起来,Empty就是不算上我们引入的spark包)

然后Main Class就选择我们的cluster,local是我们用于本地测试的,并且Main Class的名字一定要记住,后面spark上运行是要使用的。然后点ok就可以创建相关文件。如果之前就创建了的话,需要把之前的相关信息,也就是工程下的META-INF文件夹删除才可以成功创建。

回到主界面,然后Build → BuildArtifacts 就可以自行创建jar包了。

6.idea会把创建的jar包放进工程下的out文件夹,我们把它找到,为了方便,把jar包放进spark目录下,然后打开,进入META-INF文件夹,把后缀名为.DSA .SF . RSA的文件删除,

因为查资料说某些包的签名有问题,会导致我们在运行时找不到主类,事实上也确实是....

7.我们进入spark目录,通过bin文件夹下的spark-submit来提交任务,执行命令./bin/spark-submit -h来获得帮助文档,我就拿挑几个常用的:

8.

基本格式:

Usage: spark-submit [options] [app arguments]

Usage: spark-submit --kill [submission ID] --master [spark://...]

Usage: spark-submit --status [submission ID] --master [spark://...]

Usage: spark-submit run-example [options] example-class [example args]

选项:

--master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.

--class CLASS_NAME          Your application's main class (for Java / Scala apps).

--deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client).

--driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).

--executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).

我们就按上面所说执行如下命令

./bin/spark-submit --master yarn-clien --class cluster.WordCount WordCount.jar /tmp/text.txt

--master 就是我们master的url

--class 就是我们打包jar的主类名称

WordCount.jar 就是jar包名

/tmp/text.txt 是我事先放在hdfs上的测试文本文件,也就是我们编程中的args(0)参数

测试文本text.txt 内容如下

执行之后我们就可以等他完成,看到如下的结果:

对比下没有问题,中间缺失字母的地方应该是空格和换行符。

WordCount执行完毕。


二、Pi

重复步骤就不再多余赘述了

1.先上cluster上的代码

package // avoid overflow

import scala.math.random

import org.apache.spark._

object Pi {

def main(args: Array[String]) {

val conf = new SparkConf().setAppName("Spark Pi")

val spark = new SparkContext(conf)

val slices = if (args.length > 0) args(0).toInt else 2

val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid    overflow

val count = spark.parallelize(1 until n, slices).map { i =>

val x = random * 2 - 1

val y = random * 2 - 1

if (x*x + y*y < 1) 1 else 0

}.reduce(_ + _)

println("Pi is roughly " + 4.0 * count / n)

spark.stop()

}

}

args(0)也是我们之后运行时要添加的参数。如果是在本地模式下,依旧是把sc设置成本地模式。

val spark=new SparkContext("local","SparkPI")

如果要设置几个核跑的话就这样"local[*]",*为你人为设定的个数。其实也可以在控制台上设置。

2.打成jar包,删除包下相关文件,再mv到spark目录下

3.启动hadoop,spark后,照例用spark-submit提交任务

这个地方我没设置参数是因为代码中自带了判断,没有参数这一情况下,他就自己设置为了2。

if (args.length > 0) args(0).toInt else 2

然后等待结果

在一堆info中查找结果还是挺费眼睛的,所以我们可以在代码中对打印结果这个步骤稍加修改,增添两行*号:

再打包运行一遍,找起来就应该方便多了。


三、K-Means

这个例子我们就尝试用本地(local)模式来运行

1.首先我们得在idea上准备好测试的文本和输出的路径

我们在工程下创建data目录,把测试文本扔进去,内容如下

0.0 0.0 0.0

0.1 0.1 0.1

0.2 0.2 0.2

9.0 9.0 9.0

9.1 9.1 9.1

9.2 9.2 9.2

然后再创建一个result文件夹用于存放结果,准备工作做好

2.准备好代码,如下:

package local

import org.apache.spark.SparkContext

import org.apache.spark.mllib.clustering.KMeans

import org.apache.spark.mllib.linalg.Vectors

object K_Means {

def main(args: Array[String]) {

//初始化sc

val sc = new SparkContext("local", "Kmeans")

//读入数据

val rdd = sc.textFile("data/Kmeans_data.txt")

//rdd转化,转化成对应的RDD

val data = rdd.map(s => Vectors.dense(s.split(" ").map(_.toDouble)))

//最大迭代次数

val numIteration = 20

//聚类个数

val numClass = 5

//构建模型

val model = KMeans.train(data, numClass, numIteration)

//输出聚类中心

println("Cluster centers:")

for (c <- model.clusterCenters) {

println("  " + c.toString)

}

//使用误差平方之和来评估数据模型

val cost = model.computeCost(data)

println("Within Set Sum of Squared Errors = " + cost)

//使用模型测试单点数据

println("Vectors 0.2 0.2 0.2 is belongs to clusters:" + model.predict(Vectors.dense("0.2 0.2 0.2".split(' ').map(_.toDouble))))

println("Vectors 0.25 0.25 0.25 is belongs to clusters:" + model.predict(Vectors.dense("0.25 0.25 0.25".split(' ').map(_.toDouble))))

println("Vectors 8 8 8 is belongs to clusters:" + model.predict(Vectors.dense("8 8 8".split(' ').map(_.toDouble))))

//交叉评估1,只返回结果

val testdata = rdd.map(s => Vectors.dense(s.split(" ").map(_.toDouble)))

val result1 = model.predict(testdata)

result1.saveAsTextFile("result/kmeanout1/")

//交叉评估2,返回数据集和结果

val result2 = rdd.map {

line =>

val linevectore = Vectors.dense(line.split(" ").map(_.toDouble))

val prediction = model.predict(linevectore)

line + " " + prediction

}.saveAsTextFile("result/kmeanout2/")

sc.stop()

}

}

我们可以看到代码中运用到了mllib库,专门用于机器学习的算法。然而我们本地运行的重点是如下几个地方:

一是我们的sc不再由conf做参数,而是直接运用local的本地模式

二是我们读取文件的地方不再由控制台提供,而是直接由代码提供。路径为我们创建工程时就准备好的测试数据,把它转化为rdd然后才能后续操作。

三是我们输出的结果由saveAsTextFile方法存入本地指定的目标文件夹result,而没有像之前打印在控制台上(其实也可以打印的)

3.然后我们就开始运行Run → Run... →K_Means(主类名称)

我们可以通过idea的控制台看到各种info信息,和我们在spark上跑的信息一样。甚至如图还给了你端口号,说明虽然是本地运行,但还是启用了spark平台,也说明我们运行是成功的。

4.查看我们运行的结果,也就是我们是之前设定的输出路径

可以看到我们之前什么都没有的result文件夹下多了两个子文件夹(说明在输出时不存在的文件夹它会自行创建),里面就包含了我们Kmeans的结果,分别打开两个文件夹的part-00000

我们得到了我们想要的结果,Kmeans的本地模式运行也就成功结束了。

如果要转入到spark上运行,也就像之前的,更改sc,然后修改数据来源和输出路径为控制台输入的参数就行了。


至此结束,有任何问题欢迎指出(#^.^#)

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多