一、Spark安装及服务启动Apache Spark是一种快速的集群计算技术,专为快速计算而设计。它基于Hadoop MapReduce,它扩展了MapReduce模型,以有效地将其用于更多类型的计算,包括交互式查询和流处理。 Spark的主要特性是它的内存中集群计算,提高了应用程序的处理速度(Spark 因为 RDD 是基于内存的,可以比较容易切成较小的块来处理。如果能对这些小块处理得足够快,就能达到低延时的效果)。 ![]() 比起 Hadoop MapReduce, Spark 本质上就是基于内存的更快的批处理,然后用足够快的批处理来实现各种场景
1、安装Scala下载并解压Scala cd /opt/scala
wget https://downloads.lightbend.com/scala/2.11.7/scala-2.11.7.tgz
tar -zxf scala-2.11.7.tgz
将Scala添加到环境变量
在最后面添加 export SCALA_HOME=/opt/scala/scala-2.11.7
export PATH=$PATH:$SCALA_HOME/bin
激活配置
2、Spark下载从官网下载和自己hadoop版本相匹配的spark安装包 3、解压安装文件并配置环境变量(1)解压安装文件 tar -zxvf spark-2.2.0-bin-hadoop2.7.tgz -C opt/spark
修改文件夹名字
(2)配置环境变量 export SPARK_HOME=/opt/spark/spark-2.2.0
export PATH=$SPARK_HOME/bin:$PATH
4、配置Spark需要修改的配置文件有两个
配置如下内容 # 配置JAVA_HOME
export JAVA_HOME=/opt/java/jdk1.8.0_144
# 配置SCALA_HOME
export SCALA_HOME=/opt/scala/scala-2.11.7
# 配置HADOOP
export HADOOP_HOME=/opt/hadoop/hadoop-2.7.6/
export HADOOP_CONF_DIR=/opt/hadoop/hadoop-2.7.6/etc/hadoop
#定义管理端口
export SPARK_MASTER_WEBUI_PORT=8088
#定义master域名和端口
export SPARK_MASTER_HOST=spark-master
export SPARK_MASTER_PORT=7077 # 提交Application的端口
export SPARK_MASTER_IP=10.141.211.80
# 定义work节点的管理端口
export SPARK_WORKER_WEBUI_PORT=8088
# 每一个Worker最多可以使用的cpu core的个数,真实服务器如果有32个,可以设置为32个
export SPARK_WORKER_CORES=10
# 每一个Worker最多可以使用的内存,真实服务器如果有128G,你可以设置为100G
export SPARK_WORKER_MEMORY=4g
(2)配置spark-defaults.conf
配置如下内容 spark.eventLog.enabled=true
spark.eventLog.compress=true
# 保存在本地
# spark.eventLog.dir=file://opt/hadoop/hadoop-2.7.6/logs/userlogs
# spark.history.fs.logDirectory=file://opt/hadoop/hadoop-2.7.6/logs/userlogs
# 保存在hdfs上
spark.eventLog.dir=hdfs://spark-master:9000/tmp/logs/root/logs
spark.history.fs.logDirectory=hdfs://spark-master:9000/tmp/logs/root/logs
spark.yarn.historyServer.address=spark-master:18080
5、启动Spark
二、PySpark安装pyspark是用来对接 spark的 Python 库 pip install pyspark
三、使用pyspark1、SparkContext声明SparkContext是任何spark功能的入口点。
2、一些基本操作(1)count from pyspark import SparkContext
sc = SparkContext('local', 'count app')
words = sc.parallelize(
['scala',
'java',
'hadoop',
'spark',
'akka',
'spark vs hadoop',
'pyspark',
'pyspark and spark'
])
counts = words.count()
print('Number of elements in RDD -> %i' % counts)
(2)collect
(3)foreach from pyspark import SparkContext
sc = SparkContext('local', 'ForEach app')
words = sc.parallelize (
['scala',
'java',
'hadoop',
'spark',
'akka',
'spark vs hadoop',
'pyspark',
'pyspark and spark']
)
def f(x): print(x)
fore = words.foreach(f)
(4)filter
(5)map from pyspark import SparkContext
sc = SparkContext('local', 'Map app')
words = sc.parallelize(
['scala',
'java',
'hadoop',
'spark',
'akka',
'spark vs hadoop',
'pyspark',
'pyspark and spark']
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print('Key value pair -> %s' % (mapping))
(6)reduce
(7)join from pyspark import SparkContext
sc = SparkContext('local', 'Join app')
x = sc.parallelize([('spark', 1), ('hadoop', 4)])
y = sc.parallelize([('spark', 2), ('hadoop', 5)])
joined = x.join(y)
final = joined.collect()
print( 'Join RDD -> %s' % (final))
|
|
来自: 新用户61024634 > 《待分类》