Spark 3: Spark File Operations, Spark Data Operations, Spark Advanced Programming, Spark Runtime Architecture
Spark File Operations, Spark Data Operations, Spark Advanced Programming, Spark Runtime Architecture (2018/12/6)

Version  Modified by  Change record  Date
V1.0     王守奎        Authored       2018/5/23

Table of Contents
Spark File Operations
  CSV file operations
    Data preparation
    Reading
    Writing a CSV file with CSVWriter
  Working with SequenceFiles in Scala
    Saving a SequenceFile
    Loading a SequenceFile
  ObjectFile
    Saving an ObjectFile
    Loading an ObjectFile
  Saving files with a compression codec
    Saving
    Loading
Spark Data Operations
  Accessing RDBMS data (MySQL) with JdbcRDD
  Working with HBase
    1. Start HBase
    2. Set environment variables
    3. Programming
Spark Advanced Programming
  Accumulators
    What problems can an Accumulator solve?
    Notes on using Accumulators
  Broadcast variables (broadcast)
  Numeric RDD operations
Spark Runtime Architecture
  1. Distributed mode uses a master/slave architecture
  2. Driver responsibilities
    a. Turning the user program into tasks
    b. Scheduling tasks on executors
  3. Executor
  4. Cluster manager
  5. spark-submit
    The --master option of spark-submit
    spark-submit options

Spark File Operations

CSV file operations
操作","分割文件CSVReader读取文件数据准备[csv.txt]1,tom,122,tomas,103,tomasLee,1
1读取程序scala>varrdd=sc.textFile("file:///home/hadoop/spark/csv.t
xt")scala>importau.com.bytecode.opencsv.CSVReaderscala>importja
va.io.StringReaderscala>rdd.map(x=>{valreader=newCSVReader(ne
wStringReader(x));valarr=reader.readNext();println(arr(0)+"
:"+arr(1)+":"+arr(2))(arr(0),arr(1),arr(2))}).collectscala>rd
d.map(x=>{valarr=x.split(",");println(arr(0)+":"+arr(1)+"
:"+arr(2))}).collect使用CSVWriter写入csv文件//写入scala>importjava.io.
StringWriterscala>importau.com.bytecode.opencsv.CSVWriterscala>i
mportjava.io.FileWriterscala>valarr=Array("name","tom","age",
"12")scala>vallist=newjava.util.ArrayList[Array[String]]()sca
la>list.add(arr)scala>valfw=newFileWriter("/home/hadoop/spark
/aa.txt")scala>valw=newCSVWriter(fw)//写入磁盘scala>w.writeAll(
list)scala>w.closescala中操作sequenceFile存储seqfilescala>valrdd=sc
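The read example above builds a new CSVReader for every record. As a variation that is not in the original text, only a sketch using the same opencsv API, the reader can be created once per partition with mapPartitions:

import scala.collection.JavaConverters._
import au.com.bytecode.opencsv.CSVReader
import java.io.StringReader

val rdd = sc.textFile("file:///home/hadoop/spark/csv.txt")
val parsed = rdd.mapPartitions(lines => {
  // one CSVReader per partition instead of one per record
  val reader = new CSVReader(new StringReader(lines.mkString("\n")))
  reader.readAll().asScala.map(arr => (arr(0), arr(1), arr(2))).iterator
})
parsed.collect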
Working with SequenceFiles in Scala

Saving a SequenceFile

scala> val rdd = sc.parallelize(Array((100, "tom"), (200, "tomas"), (300, "tomasLee"), (400, "tomson"), (500, "jerry"), (600, "bob")))
scala> rdd.saveAsSequenceFile("file:///home/hadoop/spark/seq.seq")

// inspect the result: use the hdfs dfs -text command to view the file on the local file system
hdfs dfs -text file:///home/hadoop/spark/seq.seq/part-00000

Loading a SequenceFile

scala> import org.apache.hadoop.io._
scala> val rdd = sc.sequenceFile("file:///home/hadoop/spark/seq.seq/part-00000", classOf[IntWritable], classOf[Text])
scala> rdd.map(x => println(x._1)).collect

ObjectFile

Saving an ObjectFile

scala> val rdd = sc.parallelize(Array("tom", "tomas", "tomasLee"))
scala> rdd.saveAsObjectFile("file:///home/hadoop/spark/obj")

Loading an ObjectFile

scala> val rdd = sc.objectFile("file:///home/hadoop/spark/obj/part-00000")
scala> rdd.count

Saving files with a compression codec

Saving

scala> import org.apache.hadoop.io.compress._
scala> val rdd = sc.parallelize(Array("tom", "tomas", "tomasLee"))
scala> rdd.saveAsTextFile("file:///home/hadoop/spark/codec", classOf[Lz4Codec])
// other codecs: SnappyCodec, Lz4Codec, ...
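SequenceFiles can be compressed in the same way: saveAsSequenceFile takes an optional codec class as its second argument. A small sketch, with data and paths made up to follow the examples above:

import org.apache.hadoop.io.compress._
val pairs = sc.parallelize(Array((1, "tom"), (2, "tomas"), (3, "tomasLee")))
// the codec is passed as an Option wrapping the codec class
pairs.saveAsSequenceFile("file:///home/hadoop/spark/seq-lz4", Some(classOf[Lz4Codec]))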
Loading

Definitions of newAPIHadoopFile / newAPIHadoopRDD:

newAPIHadoopFile[K, V, F <: InputFormat[K, V]](path: String, fClass: Class[F], kClass: Class[K], vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)]

newAPIHadoopFile[K, V, F <: InputFormat[K, V]](path: String)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)]

newAPIHadoopRDD[K, V, F <: InputFormat[K, V]](conf: Configuration = hadoopConfiguration, fClass: Class[F], kClass: Class[K], vClass: Class[V]): RDD[(K, V)]

Notes:
1. Data stored in Hadoop in compressed form can be read without specifying how to decompress it, because Hadoop's decompressor infers the compression algorithm from the file suffix.
2. If you do not know how to read some kind of data from Hadoop with Spark, look up how the same data is read with MapReduce, and then plug that input format into the hadoopRDD or newAPIHadoopRDD methods above.

scala> import org.apache.hadoop.io.{LongWritable, Text}
scala> import org.apache.hadoop.mapreduce.lib.input._
scala> val data = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("file:///home/hadoop/spark/codec/part-00000.lz4").map(_._2.toString)
scala> data.collect
Spark Data Operations

Accessing RDBMS data (MySQL) with JdbcRDD

1. Copy the MySQL driver into Spark's classpath directory /soft/spark/jars

cp mysql-connector-java-6.0.6.jar /soft/spark/jars/

Programming

scala> import org.apache.spark._
scala> import org.apache.spark.rdd.JdbcRDD
scala> import java.sql.{DriverManager, ResultSet}
// define the connection
scala> def conn() = {
  Class.forName("com.mysql.cj.jdbc.Driver").newInstance()
  DriverManager.getConnection("jdbc:mysql://localhost:3306/sqoop", "root", "p@ssw0rd")
}
// define the result mapping
scala> def extractValues(r: ResultSet) = {
  (r.getInt(1), r.getString(2))
}
// 1 <= id and id <= 3
scala> val data = new JdbcRDD(sc, conn, "select * from customer where ? <= id and id <= ?", lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
scala> data.collect
Working with HBase

1. Start HBase

> zkServer.sh start
> start-hbase.sh
> hbase shell
hbase> list                                  // list all tables
hbase> scan 't1'
hbase> desc 't1'
hbase> put 't1', 'row-1', 'f1:name', 'tom'

2. Set environment variables

vi /soft/spark/conf/spark-env.sh
export SPARK_DIST_CLASSPATH=$(hadoop classpath):$(hbase classpath)

3. Programming

scala> import org.apache.hadoop.hbase.HBaseConfiguration
scala> import org.apache.hadoop.hbase.client.Result
scala> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
// which table to scan
scala> val conf = HBaseConfiguration.create()
scala> conf.set(TableInputFormat.INPUT_TABLE, "t1")
// conf + input format class + HBase row key class + result class
scala> val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
scala> rdd.map(x => println(x.getClass)).collect
scala> rdd.map(x => println("===========" + x._1)).collect
scala> rdd.map(x => println("===========" + org.apache.hadoop.hbase.util.Bytes.toString(x._1.get()))).collect
scala> rdd.map(x => println("===========" + x._2)).collect
scala> rdd.foreachPartition(fp => {
  fp.foreach({ case (k, v) => {
    val kd = new String(k.get(), k.getOffset, k.getLength)
    val vd = v.rawCells()
    for (cell <- vd) {
      val rowid = new String(cell.getRowArray, cell.getRowOffset, cell.getRowLength)
      val family = new String(cell.getFamilyArray, cell.getFamilyOffset, cell.getFamilyLength)
      val qualifier = new String(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
      val value = new String(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
      println(kd, rowid, family, qualifier, value)
    }
  }})
})
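The example above scans the whole table. TableInputFormat also reads scan-related properties from the job configuration, so the scan can be narrowed before the RDD is created. The following is only a sketch; the property names are constants defined on org.apache.hadoop.hbase.mapreduce.TableInputFormat, and the values are made up to match the put command above:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "t1")
conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "f1")   // restrict the scan to one column family
conf.set(TableInputFormat.SCAN_ROW_START, "row-1")    // start row key (inclusive)
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
rdd.count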
Spark Advanced Programming

Accumulators

An Accumulator serves much the same purpose as a MapReduce counter: both are a good way to observe how data changes while tasks are running. In Spark, every task can add to an Accumulator, but the final value can only be read on the driver. Int and Double accumulators are supported natively, and accumulators can also be given custom types and names, which helps when tuning a program.

What problems can an Accumulator solve?
1. Counting properties of the data precisely. For example, counting how many records match a given userID, or how many purchases happened within a time window; in ETL jobs accumulators are commonly used to count such properties.
2. Acting as a lightweight debugging tool that exposes per-task information. For instance, the number of records processed by each task can be observed in the Spark UI through accumulators.
3. Measuring a Spark application's resource usage precisely. For example, accumulators can show how much data came from HDFS, how much data was processed during shuffles, and how many times RDDs were recomputed; all of this is useful information for tuning a Spark application.

Notes on using Accumulators

When an Accumulator is updated inside an action, Spark guarantees that each task adds to it exactly once, even if the task is restarted. If, however, the Accumulator is updated inside a transformation, a failed or restarted task may cause it to be added to more than once.

An accumulator lets values computed on the worker nodes be sent back to the driver:

scala> var a = sc.accumulator(0)
scala> val rdd = sc.parallelize(Array("1", "2", "3", "4", "5"), 3)
scala> rdd.map(x => { a += 1 }).collect
scala> a
Broadcast variables (broadcast)

Broadcast variables let a program send a large, read-only value to all worker nodes.

import scala.collection.mutable.ListBuffer
val factor = List[Int](1, 2, 3)
val factorBroadcast = sc.broadcast(factor)
val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
val numsRdd = sc.parallelize(nums, 3)
val list = new ListBuffer[List[Int]]()
val resRdd = numsRdd.mapPartitions(ite => {
  while (ite.hasNext) {
    list += ite.next() :: factorBroadcast.value
  }
  list.iterator
})
resRdd.foreach(res => println(res))

First a collection is created, then it is broadcast with SparkContext's broadcast function, and finally the broadcast value is used while iterating over each partition of the RDD.
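A common use of broadcast variables, shown here only as a sketch that is not part of the original example, is to ship a small lookup table once to every executor and reference it inside a transformation:

// broadcast a small id-to-name table and use it on the executors
val idToName = Map(1 -> "tom", 2 -> "tomas", 3 -> "tomasLee")
val bcNames = sc.broadcast(idToName)
val ids = sc.parallelize(Array(1, 2, 3, 4))
val named = ids.map(id => (id, bcNames.value.getOrElse(id, "unknown")))
named.collect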
Numeric RDD operations

scala> val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
scala> rdd.mean()             // mean
scala> rdd.sampleVariance()   // sample variance
scala> rdd.variance()         // variance
scala> rdd.stdev()            // standard deviation
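All of these statistics can also be obtained in a single pass with stats(), which returns a StatCounter. A small sketch using the same data as above:

val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
val st = rdd.stats()   // one pass: count, mean, sum, max, min, variance, stdev
println(st.mean, st.stdev, st.variance)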
Spark Runtime Architecture

1. In distributed mode Spark uses a master/slave architecture: a central coordinator (the driver) plus distributed worker nodes.
   A Spark application corresponds to a Hadoop job or a Storm topology.
   The Spark driver runs in its own JVM; each executor also runs in its own JVM.

2. Driver responsibilities
   a. Turning the user program into tasks: Spark implicitly builds a DAG of operations and converts it into an execution plan at runtime.
   b. Scheduling tasks on executors.

3. Executor: runs individual tasks on a worker node and is launched once, when the Spark application starts. It runs tasks and returns results to the driver, and it provides in-memory storage for RDDs so the program can cache data; every executor has a block manager.

4. Cluster manager: the component Spark uses to launch executors (and, in some deployment modes, the driver). It is pluggable; Spark can run on several cluster managers: YARN, Mesos, or the standalone manager.

5. spark-submit: Spark provides the spark-submit script for submitting applications to the cluster.

The --master option of spark-submit

Value                Explanation
spark://host:port    Connect to a Spark standalone cluster; the standalone master uses port 7077 by default.
mesos://host:port    Connect to a Mesos cluster master at the specified port. By default Mesos masters listen on port 5050.
yarn                 Connect to a YARN cluster; the HADOOP_CONF_DIR environment variable must be set.
local                Run locally with a single core.
local[N]             Run in local mode with N cores.
local[*]             Run in local mode with as many cores as the machine has.

spark-submit options

Flag                 Explanation
--master             Any of the values in the table above.
--deploy-mode        Whether to launch the driver on the submitting machine ("client") or on a worker node ("cluster"). The default is client.
--class              The "main" class of your application if you're running a Java or Scala program.
--name               A human-readable name for your application. This will be displayed in Spark's web UI.
--jars               A list of JAR files to upload and place on the classpath of your application. If your application depends on a small number of third-party JARs, you can add them here.
--files              A list of files to be placed in the working directory of your application. This can be used for data files that you want to distribute to each node.
--py-files           A list of files to be added to the PYTHONPATH of your application. This can contain .py, .egg, or .zip files.
--executor-memory    The amount of memory to use for executors, in bytes. Suffixes can be used to specify larger quantities such as "512m" (512 megabytes) or "15g" (15 gigabytes).
--driver-memory      The amount of memory to use for the driver process, in bytes. Suffixes can be used to specify larger quantities such as "512m" (512 megabytes) or "15g" (15 gigabytes).

For example:

> spark-submit --master spark://localhost:7077 --deploy-mode cluster