配色: 字号:
part3-数据的读取与保存
2018-06-18 | 阅:  转:  |  分享 
  
数据读取与保存代码参考:https://www.cnblogs.com/tonglin0325/p/6682367.html1文件格式1.
1读取文本文件注:必须注意RDD中元素的概念。常用有两个方式:textFile()读取一个文件,生成的RDD中的每个元素就是该文件
中的每一行(textFile()以回车来划分存储元素)wholeTextFiles(“D:/test/.txt”)读取多个文件
,并生成键值类型的RDD,其中键就是文件名,而value就是文件内容。一个文件的全部内容就是RDD中的一个元素,多个文件的内容就相
当于一个RDD中的多个元素。(不以回车作为元素划分),不是将所有文件的内容合并了,而是每个文件中的内容独立,并以键值的形式存储在R
DD中,做为元素。wholeTextFiles还支持通配符,这一点在实际应用中十分重要(求取不同阶段销售数据的平均值)。注意区分上
述两种方法,这个在对RDD中元素处理如求取平均值时是十分重要的。案例:求不同阶段销售的平均值首先实际应用中,我们不会将同一时段的
数据写入同一个文件中,而是将该阶段的数据分别写入不同文件中。但这些不同文件之间是有关系的,他们属于同一时段。现在要求出该时段内销售
的平均值,也就是求出该时段所包含的文件的总内容的平均值。这时我们利用wholeTextFile()先求出该时段内,每个文件内容的平
均值。然后再将该时段内各个文件内容的平均值取和在平均。最终得出该时段内全部内容的平均值。在利用wholeTextFile方法时,
要注意他是将一个文件中的全部内容作为RDD中的一个元素。那么在我们对该文件中的内容进行split(“,”)的时候,若该文件中的内
容是多行,那么每行的末尾就会存在“回车”,textFile()处理该文件时,会根据“\n”来识别一个元素的结束,但wholeTex
tFile则不会,它会将“回车”视为一个元素,由于我们是按“,”进行分割,那么就会存在一些分割出来的元素,他们是包含/n的,如2/
n3,这样的元素在控制台上上下显示,而在之后的求平均值运算中就会存现数据格式异常因为这样的元素没法转换为double类型,因此我们
需要将这样的元素进行split(“\n”)处理,并利用成员集合,将其存储在其中。每个文件平均值的结果整体的平均值整体平均
值结果1.2文件的保存saveAsTextField(“路径”)Json2.1读取json(字符串------>对象)的过程
。我们是借助对象来对json数据进行解析的,将json数据存储到对象中。读取json格式的文件,一般我们就是先读取文本文件,然后再
解析该文本文件中的json格式的数据。在scala中提供了很多用来解析json的函数库。如:fasterxmlJson4s无论使
用哪种方式去解析json格式数据,都必须提供一个类,通过反射的方式用这个类中相应的属性去解析json中字段的含义。这种方法假设文件
中的每一行都是一条JSON记录。如果你有跨行的JSON数据,你就只能读入整个文件,这是因为对于textFile()读取文件
内容时,会将文件中每行数据作为rdd中的每个元素。必须注意:要求自定义类中的属性名与源文件中json相应字段名相同。使用fast
erxml库来解析jsonimportcom.fasterxml.jackson.databind.ObjectMapperi
mportorg.apache.spark.rdd.RDDimportorg.apache.spark.{SparkConf,
SparkContext}objectMyPartition{defmain(args:Array[String])
:Unit={valconf=newSparkConf().setMaster("local").setAppNa
me("JSON")valsc=newSparkContext(conf)valrdd:RDD[String]=
sc.textFile("E:/test/json.txt")//定义解析库对象varmapper=newObjec
tMapper()valvalue:RDD[Person]=rdd.flatMap(record=>{Some(
mapper.readValue(record,classOf[Person]))})valresult=value
result.collect().foreach(c=>println(c.name+","+c.age))}}Pers
on类需要进行序列化,同时还需要具有无参构造结果:源文件注:一般结果显示无法解析文件,多数都是我们的json文件格式有问题Jso
n4s解析Json2.2Json存储对象------>字符串的过程。我们任然用json的解析库的对象mapper,将对象
中存储的json格式的数据,在此解析成字符串。读与写是两个相反的过程:读:利用json解析器将字符串解析成json格式数据,并存放
于自定义对象中。写:利用json解析器将对象中存储的json格式数据转化为字符串,并通过saveAsTextFile(“D:/”)
写入到指定路径中Json的读写:注:(1)我们在使用saveAsTextFile的时候,由于saveAsTextFile是继承了s
aveAsHadoopFile因此他跟hadoop有关,我们需要在c:/window/system32中添加一个文件hadoop.
hll这与我们当初在用eclipse开发hdfs的时候是一样的。数据源:结果:如果我们对读取后的json不转成字符串,则由于RDD
1中包含的是对象,若直接执行Rdd1.saveAsTextFile(),则相当于将对象直接输出。结果:CSV逗号分隔符CSV于j
son类似,都代表的是一种数据格式,并存储在文本文件中。Csv是逗号分隔,其中的每个元素都是用逗号分隔的。于json不同的是,cs
v没有字段的定义。sj_mino1001.jpg,715282,4FB55FE8,sj_mino1002.jpg,471289,9
3203C5C,sj_mino1003.jpg,451929,C4E80467,3.1csv的读取同样,先从文本文件读取字符串,然
后利用opencsv库将该csv解析出来并存于RDD中的Array中第一种方式textFile结果:方式二:wholeTextF
ile()关键是要注意我们所导入的包3.2、csv存储4文件的压缩与序列化序列化就是将对象转为字节序列,数据只有通过序列化后才能进
行网络通信和数据压缩。通过数据压缩能够减小内存占用以及IO和网络数据传输开销,提示Spark整体的应用性能。4.1序列化Sp
ark序列化类图:(spark提供了两种序列化类,JavaSerializer,KryoSerializer)spark支持两
种序列化:spark默认使用Java序列化,但效率低spark支持Kyro的序列化当使用SparkContext的saveAsOb
jectFile方法将对象序列化到文件,以及通过objectFile方法将对象从文件反序列出来的时候,Spark默认使用Java的
序列化以及反序列化机制,通常情况下,这种序列化机制是很低效的,Spark支持使用Kyro作为对象的序列化和反序列化机制,序列化的速
度比java更快,但是使用Kyro时要注意,Kyro目前还是有些bug。Spark默认是使用Java的ObjectOutputS
tream框架,它支持所有的继承于java.io.Serializable序列化,如果想要进行调优的话,可以通过继承java.io
.Externalizable。这种格式比较大,而且速度慢。Spark还支持这种方式Kryoserialization,它的速度
快,而且压缩比高于Java的序列化,但是它不支持所有的Serializable格式,并且需要在程序里面注册。它需要在实例化Spar
kContext之前进行注册序列化conf.set("spark.serializer","org.apache.spark
.serializer.KryoSerializer")//如果是自定义数据类型conf.set("spark.seriali
zer","org.apache.spark.serializer.KryoSerializer")conf.registerK
ryoClasses(Array(classOf[ApacheLog]))4.2压缩Spark内置提供了三种压缩方式,这三种压缩
方式需要借助各自的第三方库来实现压缩和解压功能。Snappy:提供了更高的压缩速度LZF:提供了更高的压缩比LZ4:提供了更高的压
缩速度和压缩比5文件系统5.1本地文件系统valrdd=sc.textFile("file:///home/holden/
happypandas.gz")5.2AmazonS3AmazonSimpleStorageService(Amazo
nS3)是一种对象存储,它具有简单的Web服务接口,可用于在Web上的任何位置存储和检索任意数量的数据。它能够提供9
9.999999999%的持久性,并且可以在全球大规模传递数万亿对象。https://www.cnblogs.com/web42
4/p/6840207.html5.3HDFS6sparkSQL中的结构化数据(1)Hive(2)JsonSparkSQL
也可以自动推断出它们的结构信息,并将这些数据读取为记录,这样就可以使得提取字段的操作变得很简单。数据库7.1jdbc连接数据库这
里只需要了解一下JdbcRDD的工作原理即可,事实上JdbcRDD的应用由于语法的设计导致他十分受限1、getConnection
返回一个已经打开的结构化数据库连接,JdbcRDD会自动维护关闭。2、sql是查询语句,此查询语句必须包含两处占位符?来作为
分割数据库ResulSet的参数,例如:"selecttitle,authorfrombookswhere?<=
idandid<=?"3、lowerBound,upperBound,numPartitions分别为第一、第二占位符,partition的个数。例如,给出lowebound1,upperbound20,numpartitions2,则查询分别为(1,10)与(11,20)4、mapRow是转换函数,将返回的ResultSet转成RDD需用的单行数据,此处可以选择Array或其他,也可以是自定义的caseclass。默认的是将ResultSet转换成一个Object数组。参考:https://www.iteblog.com/archives/1113.html
献花(0)
+1
(本文系实习生101首藏)