分享

搭建Spark计算平台 python操作Spark

 新用户61024634 2021-09-12

一、Spark安装及服务启动

Apache Spark是一种快速的集群计算技术,专为快速计算而设计。它基于Hadoop MapReduce,它扩展了MapReduce模型,以有效地将其用于更多类型的计算,包括交互式查询和流处理。 Spark的主要特性是它的内存中集群计算,提高了应用程序的处理速度(Spark 因为 RDD 是基于内存的,可以比较容易切成较小的块来处理。如果能对这些小块处理得足够快,就能达到低延时的效果)。


比起 Hadoop MapReduce, Spark 本质上就是基于内存的更快的批处理,然后用足够快的批处理来实现各种场景

1、安装Scala

下载并解压Scala

cd /opt/scala wget https://downloads.lightbend.com/scala/2.11.7/scala-2.11.7.tgz tar -zxf scala-2.11.7.tgz

将Scala添加到环境变量

vi /etc/profile

在最后面添加

export SCALA_HOME=/opt/scala/scala-2.11.7 export PATH=$PATH:$SCALA_HOME/bin

激活配置

source /etc/profile

2、Spark下载

官网下载和自己hadoop版本相匹配的spark安装包

3、解压安装文件并配置环境变量

(1)解压安装文件
解压安装文件到指定的的文件夹 /opt/spark

tar -zxvf spark-2.2.0-bin-hadoop2.7.tgz -C opt/spark

修改文件夹名字

cd /opt/spark/
mv spark-2.2.0-bin-hadoop2.7 spark-2.2.0

(2)配置环境变量

export SPARK_HOME=/opt/spark/spark-2.2.0 export PATH=$SPARK_HOME/bin:$PATH

4、配置Spark

需要修改的配置文件有两个
spark-env.sh,spark-defaults.conf
(1)配置spark-env.sh

cd /opt/spark/spark-2.2.0/conf
mv spark-env.sh.template spark-env.sh
vim spark-env.sh

配置如下内容

# 配置JAVA_HOME export JAVA_HOME=/opt/java/jdk1.8.0_144 # 配置SCALA_HOME export SCALA_HOME=/opt/scala/scala-2.11.7 # 配置HADOOP export HADOOP_HOME=/opt/hadoop/hadoop-2.7.6/ export HADOOP_CONF_DIR=/opt/hadoop/hadoop-2.7.6/etc/hadoop #定义管理端口 export SPARK_MASTER_WEBUI_PORT=8088 #定义master域名和端口 export SPARK_MASTER_HOST=spark-master export SPARK_MASTER_PORT=7077 # 提交Application的端口 export SPARK_MASTER_IP=10.141.211.80 # 定义work节点的管理端口 export SPARK_WORKER_WEBUI_PORT=8088 # 每一个Worker最多可以使用的cpu core的个数,真实服务器如果有32个,可以设置为32个 export SPARK_WORKER_CORES=10 # 每一个Worker最多可以使用的内存,真实服务器如果有128G,你可以设置为100G export SPARK_WORKER_MEMORY=4g

(2)配置spark-defaults.conf

cd /opt/spark/spark-2.2.0/conf
vim spark-defaults.conf

配置如下内容

spark.eventLog.enabled=true spark.eventLog.compress=true # 保存在本地 # spark.eventLog.dir=file://opt/hadoop/hadoop-2.7.6/logs/userlogs # spark.history.fs.logDirectory=file://opt/hadoop/hadoop-2.7.6/logs/userlogs # 保存在hdfs上 spark.eventLog.dir=hdfs://spark-master:9000/tmp/logs/root/logs spark.history.fs.logDirectory=hdfs://spark-master:9000/tmp/logs/root/logs spark.yarn.historyServer.address=spark-master:18080

5、启动Spark

sbin/start-all.sh 

二、PySpark安装

pyspark是用来对接 spark的 Python 库

pip install pyspark

三、使用pyspark

1、SparkContext声明

SparkContext是任何spark功能的入口点。

from pyspark import SparkContext
sc = SparkContext('local', 'First App')

2、一些基本操作

(1)count
返回RDD中的元素个数

from pyspark import SparkContext sc = SparkContext('local', 'count app') words = sc.parallelize( ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark' ]) counts = words.count() print('Number of elements in RDD -> %i' % counts)

(2)collect
返回RDD中的所有元素

from pyspark import SparkContext
sc = SparkContext('local', 'collect app')
words = sc.parallelize(
    ['scala',
     'java',
     'hadoop',
     'spark',
     'akka',
     'spark vs hadoop',
     'pyspark',
     'pyspark and spark'
     ])
coll = words.collect()
print('Elements in RDD -> %s' % coll)

(3)foreach
仅返回满足foreach内函数条件的元素。在下面的示例中,我们在foreach中调用print函数,该函数打印RDD中的所有元素。

from pyspark import SparkContext sc = SparkContext('local', 'ForEach app') words = sc.parallelize ( ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark'] ) def f(x): print(x) fore = words.foreach(f)

(4)filter
返回一个包含元素的新RDD,它满足过滤器内部的功能。在下面的示例中,我们过滤掉包含''spark'的字符串。

from pyspark import SparkContext
sc = SparkContext('local', 'Filter app')
words = sc.parallelize(
    ['scala',
     'java',
     'hadoop',
     'spark',
     'akka',
     'spark vs hadoop',
     'pyspark',
     'pyspark and spark']
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print('Fitered RDD -> %s' % (filtered))

(5)map
通过将该函数应用于RDD中的每个元素来返回新的RDD。在下面的示例中,我们形成一个键值对,并将每个字符串映射为值1

from pyspark import SparkContext sc = SparkContext('local', 'Map app') words = sc.parallelize( ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark'] ) words_map = words.map(lambda x: (x, 1)) mapping = words_map.collect() print('Key value pair -> %s' % (mapping))

(6)reduce
执行指定的可交换和关联二元操作后,将返回RDD中的元素。在下面的示例中,我们从运算符导入add包并将其应用于'num'以执行简单的加法运算。说白了和Python的reduce一样:假如有一组整数[x1,x2,x3],利用reduce执行加法操作add,对第一个元素执行add后,结果为sum=x1,然后再将sum和x2执行add,sum=x1 x2,最后再将x2和sum执行add,此时sum=x1 x2 x3。

from pyspark import SparkContext
from operator import add
sc = SparkContext('local', 'Reduce app')
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print('Adding all the elements -> %i' % (adding))

(7)join
它返回RDD,其中包含一对带有匹配键的元素以及该特定键的所有值。

from pyspark import SparkContext sc = SparkContext('local', 'Join app') x = sc.parallelize([('spark', 1), ('hadoop', 4)]) y = sc.parallelize([('spark', 2), ('hadoop', 5)]) joined = x.join(y) final = joined.collect() print( 'Join RDD -> %s' % (final))

官方文档

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多