Spark File Operations, Spark Data Access, Spark Advanced Programming, Spark Runtime Architecture
2018/12/6

Version | Author       | Change record   | Date
V1.0    | Wang Shoukui | Initial version | 2018/5/23

Contents
Spark File Operations
  CSV file operations
  Working with SequenceFiles in Scala
  ObjectFiles
  Saving files with a compression codec
Spark Data Access
  Accessing an RDBMS (MySQL) with JdbcRDD
  Accessing HBase from Spark
Spark Advanced Programming
  Accumulators
  Broadcast variables
  Numeric RDD operations
Spark Runtime Architecture
  Distributed mode: master/slave
  Driver responsibilities
  Executors
  Cluster manager
  spark-submit

Spark File Operations

CSV file operations
CSV files use "," as the delimiter; CSVReader parses each record.

Data preparation [csv.txt]
1,tom,12
2,tomas,10
3,tomasLee,11

Reading the file
scala> var rdd = sc.textFile("file:///home/hadoop/spark/csv.txt")
scala> import au.com.bytecode.opencsv.CSVReader
scala> import java.io.StringReader
// parse each line with opencsv's CSVReader
scala> rdd.map(x => {
     |   val reader = new CSVReader(new StringReader(x))
     |   val arr = reader.readNext()
     |   println(arr(0) + ":" + arr(1) + ":" + arr(2))
     |   (arr(0), arr(1), arr(2))
     | }).collect
// or simply split on the delimiter
scala> rdd.map(x => {
     |   val arr = x.split(",")
     |   println(arr(0) + ":" + arr(1) + ":" + arr(2))
     | }).collect

Writing a CSV file with CSVWriter
scala> import java.io.StringWriter
scala> import au.com.bytecode.opencsv.CSVWriter
scala> import java.io.FileWriter
scala> val arr = Array("name", "tom", "age", "12")
scala> val list = new java.util.ArrayList[Array[String]]()
scala> list.add(arr)
scala> val fw = new FileWriter("/home/hadoop/spark/aa.txt")
scala> val w = new CSVWriter(fw)
scala> w.writeAll(list)   // write to disk
scala> w.close

Working with SequenceFiles in Scala

Saving a SequenceFile
scala> val rdd = sc.parallelize(Array((100, "tom"), (200, "tomas"), (300, "tomasLee"), (400, "tomson"), (500, "jerry"), (600, "bob")))
scala> rdd.saveAsSequenceFile("file:///home/hadoop/spark/seq.seq")
Inspect the result with the hdfs dfs -text command (it also reads the local file system):
> hdfs dfs -text file:///home/hadoop/spark/seq.seq/part-00000

Loading a SequenceFile
scala> import org.apache.hadoop.io._
scala> val rdd = sc.sequenceFile("file:///home/hadoop/spark/seq.seq/part-00000", classOf[IntWritable], classOf[Text])
scala> rdd.map(x => println(x._1)).collect

ObjectFiles

Saving an ObjectFile
scala> val rdd = sc.parallelize(Array("tom", "tomas", "tomasLee"))
scala> rdd.saveAsObjectFile("file:///home/hadoop/spark/obj")

Loading an ObjectFile
scala> val rdd = sc.objectFile[String]("file:///home/hadoop/spark/obj/part-00000")
scala> rdd.count

Saving files with a compression codec

Saving
scala> import org.apache.hadoop.io.compress._
scala> val rdd = sc.parallelize(Array("tom", "tomas", "tomasLee"))
scala> rdd.saveAsTextFile("file:///home/hadoop/spark/codec", classOf[Lz4Codec])   // also SnappyCodec, GzipCodec, ...

Loading

Definitions of newAPIHadoopFile / newAPIHadoopRDD:
newAPIHadoopFile[K, V, F <: InputFormat[K, V]](path: String, fClass: Class[F], kClass: Class[K], vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)]
newAPIHadoopFile[K, V, F <: InputFormat[K, V]](path: String)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)]
newAPIHadoopRDD[K, V, F <: InputFormat[K, V]](conf: Configuration = hadoopConfiguration, fClass: Class[F], kClass: Class[K], vClass: Class[V]): RDD[(K, V)]

Notes:
1. Data stored in Hadoop in compressed form can be read without specifying how to decompress it: Hadoop picks a decompressor by inferring the codec from the file suffix.
2. If you do not know how to read a certain kind of data from Hadoop with Spark, look up how a MapReduce job reads that data, then adapt its input format to the hadoopRDD or newAPIHadoopRDD calls above.

scala> import org.apache.hadoop.io.{LongWritable, Text}
scala> import org.apache.hadoop.mapreduce.lib.input._
scala> val data = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("file:///home/hadoop/spark/codec/part-00000.lz4").map(_._2.toString)
scala> data.collect
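Note 1 above (the codec is inferred from the file suffix) can be sketched in plain Scala. The suffix table below is a simplified stand-in for Hadoop's CompressionCodecFactory, not its actual implementation:

```scala
object CodecBySuffix {
  // Simplified suffix -> codec-class mapping, mirroring how Hadoop's
  // CompressionCodecFactory chooses a decompressor from the file name.
  val codecs = Map(
    ".lz4"    -> "org.apache.hadoop.io.compress.Lz4Codec",
    ".snappy" -> "org.apache.hadoop.io.compress.SnappyCodec",
    ".gz"     -> "org.apache.hadoop.io.compress.GzipCodec"
  )

  // Returns the codec for a path, or None when no suffix matches.
  def codecFor(path: String): Option[String] =
    codecs.collectFirst { case (suffix, codec) if path.endsWith(suffix) => codec }
}
```

A part file with no recognized suffix (e.g. plain part-00000) maps to no codec and is read as-is, which is why the uncompressed examples earlier need no extra configuration.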
Spark Data Access

Accessing an RDBMS (MySQL) with JdbcRDD

1. Copy the MySQL driver into Spark's classpath directory (/soft/spark/jars):
> cp mysql-connector-java-6.0.6.jar /soft/spark/jars/

2. Code
scala> import org.apache.spark._
scala> import org.apache.spark.rdd.JdbcRDD
scala> import java.sql.{DriverManager, ResultSet}
// connection factory
scala> def conn() = {
     |   Class.forName("com.mysql.cj.jdbc.Driver").newInstance()
     |   DriverManager.getConnection("jdbc:mysql://localhost:3306/sqoop", "root", "p@ssw0rd")
     | }
// map each row of the result set
scala> def extractValues(r: ResultSet) = (r.getInt(1), r.getString(2))
// rows with 1 <= id <= 3
scala> val data = new JdbcRDD(sc, conn, "select * from customer where ? <= id and id <= ?", lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
scala> data.collect

Accessing HBase from Spark

1. Start HBase
> zkServer.sh start
> start-hbase.sh
> hbase shell
hbase> list                                   # list all tables
hbase> scan 't1'
hbase> desc 't1'
hbase> put 't1', 'row-1', 'f1:name', 'tom'

2. Set the environment variable
vi /soft/spark/conf/spark-env.sh
export SPARK_DIST_CLASSPATH=$(hadoop classpath):$(hbase classpath)

3. Code
scala> import org.apache.hadoop.hbase.HBaseConfiguration
scala> import org.apache.hadoop.hbase.client.Result
scala> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
scala> val conf = HBaseConfiguration.create()
scala> conf.set(TableInputFormat.INPUT_TABLE, "t1")   // which table to scan
// key = HBase row key (ImmutableBytesWritable), value = result set (Result)
scala> val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
scala> rdd.map(x => println(x.getClass)).collect
scala> rdd.map(x => println("===========" + x._1)).collect
scala> rdd.map(x => println("===========" + org.apache.hadoop.hbase.util.Bytes.toString(x._1.get()))).collect
scala> rdd.map(x => println("===========" + x._2)).collect
scala> rdd.foreachPartition(fp => {
     |   fp.foreach({ case (k, v) =>
     |     val kd = new String(k.get(), k.getOffset, k.getLength)
     |     for (cell <- v.rawCells()) {
     |       val rowid = new String(cell.getRowArray, cell.getRowOffset, cell.getRowLength)
     |       val family = new String(cell.getFamilyArray, cell.getFamilyOffset, cell.getFamilyLength)
     |       val qualifier = new String(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
     |       val value = new String(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
     |       println(kd, rowid, family, qualifier, value)
     |     }
     |   })
     | })

Spark Advanced Programming

Accumulators

An Accumulator is much like a MapReduce counter: both give a view of how data moves through tasks at run time. Every task can add to an Accumulator, but the final value can only be read on the driver. Int and Double accumulators are supported natively, and custom, named accumulator types are also supported, which helps when tuning a program.

What problems can an Accumulator solve?
1. Precise statistics over the data: for example, the number of records matching a given userID, or how many purchases happened in a time window. Accumulators are commonly used in ETL jobs to gather such statistics.
2. Lightweight debugging: per-task information becomes observable. For example, the number of records each task processed can be seen in the Spark UI (as in the figure below).
3. Precise measurement of the application's resource usage: how much data came from HDFS, how much data the shuffle processed, and how many times RDDs were recomputed. All of this is useful tuning information.

Caveats when using Accumulators
Update an Accumulator inside an action: Spark guarantees each task adds to it exactly once, even if the task is restarted. If the Accumulator is updated inside a transformation instead, a failed or restarted task may add to it more than once.

An accumulator carries each node's partial count back to the driver:
scala> val a = sc.accumulator(0)
scala> val rdd = sc.parallelize(Array("1", "2", "3", "4", "5"), 3)
scala> rdd.map(x => { a += 1 }).collect
scala> a
(Note that this demo increments inside map, a transformation; per the caveat above, the count is only reliable if no task is retried.)

Broadcast variables (broadcast)

A broadcast variable lets the program send a large, read-only value to all worker nodes.

import scala.collection.mutable.ListBuffer
val factor = List[Int](1, 2, 3)
val factorBroadcast = sc.broadcast(factor)
val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
val numsRdd = sc.parallelize(nums, 3)
val resRdd = numsRdd.mapPartitions(ite => {
  val list = new ListBuffer[List[Int]]()   // keep the buffer inside the closure, per partition
  while (ite.hasNext) {
    list += ite.next() :: factorBroadcast.value
  }
  list.iterator
})
resRdd.foreach(res => println(res))

First a collection is created, then it is broadcast through SparkContext's broadcast function; finally, the broadcast value is read while iterating over each partition of the RDD.

Numeric RDD operations
scala> val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
scala> rdd.mean()             // mean
scala> rdd.variance()         // population variance
scala> rdd.sampleVariance()   // sample variance (divides by n - 1)
scala> rdd.stdev()            // standard deviation
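To keep variance() and sampleVariance() straight, the arithmetic can be checked in plain Scala without a cluster. This is a sketch of the textbook definitions these methods follow (population variance divides the summed squared deviations by n, sample variance by n - 1), not Spark's internal implementation:

```scala
object Stats {
  def mean(xs: Seq[Double]): Double = xs.sum / xs.length

  // population variance: divide by n (what rdd.variance() returns)
  def variance(xs: Seq[Double]): Double = {
    val m = mean(xs)
    xs.map(x => (x - m) * (x - m)).sum / xs.length
  }

  // sample variance: divide by n - 1 (what rdd.sampleVariance() returns)
  def sampleVariance(xs: Seq[Double]): Double =
    variance(xs) * xs.length / (xs.length - 1)

  // standard deviation: square root of the population variance
  def stdev(xs: Seq[Double]): Double = math.sqrt(variance(xs))
}
```

For Array(1, 2, 3, 4, 5) this gives mean 3.0, variance 2.0, and sampleVariance 2.5, which is what the rdd calls above should report.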
Spark Runtime Architecture

1. In distributed mode, Spark uses a master/slave architecture: a central coordinator (the driver) plus distributed worker nodes. A Spark application is the counterpart of a Hadoop job or a Storm topology. The driver is a JVM process, and each executor is a JVM process.

2. Driver responsibilities
a. Converting the user program into tasks. Spark implicitly builds a DAG of operations and turns it into an execution plan at run time.
b. Scheduling tasks on executors.

3. Executors run individual tasks on worker nodes. They are started once, when the Spark application launches; they run tasks and return results to the driver, and they provide in-memory storage so the program can cache RDD data. Each executor has a block manager.

4. Cluster manager. Spark uses the cluster manager to launch executors, and in some deploy modes also the driver. It is pluggable: Spark can run on several cluster managers, namely YARN, Mesos, or standalone.

5. spark-submit. Spark provides the spark-submit script for submitting applications to a cluster.

Values of the spark-submit --master option:

Value               Explanation
spark://host:port   Connect to a Spark standalone cluster; the default master port is 7077.
mesos://host:port   Connect to a Mesos cluster master at the specified port. By default Mesos masters listen on port 5050.
yarn                Connect to a YARN cluster; requires the HADOOP_CONF_DIR environment variable to be set.
local               Run in local mode with a single core.
local[N]            Run in local mode with N cores.
local[*]            Run in local mode with as many cores as the machine has.

spark-submit flags:

Flag                Explanation
--master            Any of the values in the table above.
--deploy-mode       Where the driver runs: client runs it on the submitting machine, cluster runs it on a worker node. The default is client.
--class             The "main" class of your application, if you're running a Java or Scala program.
--name              A human-readable name for your application; displayed in Spark's web UI.
--jars              A list of JAR files to upload and place on the classpath of your application. Useful if the application depends on a small number of third-party JARs.
--files             A list of files to be placed in the working directory of your application. Can be used for data files you want to distribute to each node.
--py-files          A list of files to be added to the PYTHONPATH of your application. May contain .py, .egg, or .zip files.
--executor-memory   The amount of memory to use per executor, in bytes. Suffixes specify larger quantities, e.g. "512m" (512 megabytes) or "15g" (15 gigabytes).
--driver-memory     The amount of memory to use for the driver process, in bytes; the same suffixes apply.

Example:
> spark-submit --master spark://localhost:7077 --deploy-mode cluster

Kuikui's motto: keep it up!
Copyright Jianghu Yige (江湖一哥)
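Combining several of the flags above, a full submission might look like the command-line sketch below. The class name, application name, and jar files are placeholders for illustration, not files from this course:

```shell
# Hypothetical submission of a Scala application (all names are placeholders).
spark-submit \
  --master spark://localhost:7077 \
  --deploy-mode cluster \
  --class com.example.MyApp \
  --name my-app \
  --jars mysql-connector-java-6.0.6.jar \
  --executor-memory 512m \
  --driver-memory 512m \
  myapp.jar
```

Keep in mind that with --deploy-mode cluster the driver itself runs on a worker node, so the application jar and anything listed in --jars must be reachable from the cluster, not just from the submitting machine.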