云计算实践教程课程主要内容第7章 Hadoop应用与实践47.1 HDFS基本操作块(Block)是文件存储的逻辑单元,默认块大小是64MB 每个块建立多个副本(备份),这些副本都尽量分布在不同的DataNode节点上 NameNode管理文件目录结构,执行文件系统的命名 空间操作决定Block到具体DataNode节点的映射通过两个核心文件fsimage和edits来维护和管理文件系统fsimage 是元数据镜像文件,存储某时段NameNode内存元数据信息,edits是操作日志文件57.1 HDFS基本操作DataNodeDa taNode在NameNode的指挥下进行Block的创建、删除和复制HDFS使用副本机制保证数据的安全性,这些副本分别存储在不同 的DataNode节点中,这样当一个节点停止服务后数据也不会丢失Secondary NameNode保存文件系统元数据的备份,以备 NameNode发生故障时进行数据恢复Secondary NameNode按照一定时间间隔从NameNode下载元数据信息(fsi mage,edits),把二者合并生成新的fsimage6HDFS Shell命令7HDFS Shell命令8HDFS Shell 命令9HDFS的Web接口Hadoop提供了可用的Web访问接口常用的几个端口50070端口,查看NameNode状态50075端 口,查看DataNode状态50030端口,查看JobTracker状态50060端口,查看TaskTracker状态块(Bloc k)Web页面只能浏览文件系统,不能创建或修改目录结构10 HDFS的Java访问接口-Hadoop URL使用Hadoop UR L读取HDFS文件通过java.net.URL对象打开一个数据流调用IOUtils类的静态方法copyBytes()将HDFS数据 流复制到标准输出流System.out中copyBytes(InputStream in,OutputStream out,int buffSize,boolean close)参数in表示输入流,out表示输出流,buffsize表示缓冲区大小,close是 布尔变量,表示复制完毕后是否关闭流。使用URL方式只能读取数据,不能写入数据11 HDFS的Java接口-FileSystem A PI使用FileSystem API读写数据FileSystem类封装了几乎所有的文件操作,例如创建目录,显示目录列表,读写HDF S文件,对HDFS文件的上传、下载以及删除等。调用IOUtils类的静态方法copyBytes()将HDFS数据流复制到标准输出流 System.out中使用FileSystem API操作文件的程序框架 oper ator() { 设置Configuration对象; 获取FileSystem对象; 进行文件操作; }12使用FileSystem API读写数据HDFS操作的常用方法创建目录public boolean mkdirs( Path f) throws显示目录文件列表public FileStatus[] listStatus (Path f) thr ows IOExcertion创建HDFS文件public FSDataOutputStream create(Path f) t hrows IOExcertion上传本地文件到HDFS文件系统public void?copyFromLocalFile(Pat h?src,?Path?dst)?throws?IOException把HDFS文件复制到本地文件系统public void co pyToLocalFile(Path src, Path dst) ?throws?IOException删除一个文件或目录 p ublic?boolean?delete(Path?f,?boolean?recursive)?throws?IOExceptio n 13 7.2MapReduce编程MapReduce是一种分布式计算模型,主要用于解决海量数据的计算问题。MapRedu ce的架构是一种主从结构主节点JobTracker,负责接收计算任务,把计算任务分配给TaskTracker执行,并监视TaskT racker的执行情况从节点TaskTracker,负责执行JobTracker分配的计算任务一个MapReduce任务的执行过程 可以表示为: Map:→[] Reduce: →[ 3,v3>] 这里<…>表示键值对,[...]表示列表。14 MapReduce处理流程MapReduce处理模块接 收输入的原始数据列表,将列表拆分成单独的键值对,发给Map对应的函数进行处理。Map函数按照定义的处理方法对 v1>进行处理,生成列表。列表中键值相同的数据对被排序合并成一个新的键值对,这个过程 称为Shuffle。Reduce对应的函数对新的键值对进行处理,生成最终结果,表示为列表。15 M apReduce程序结构典型MapReduce程序包括三个部分:Mapper、Reducer、作业执行Mapper:负责数据处理, 将输入的一个对映射到0个或多个中间格式的形式map()方法的默认实现 protected void map(KEYIN key, VALUEIN value,Context contex t) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); }16 MapReduce程序结构Reducer:接受来自各个Mapper的输出,根据对中的 key对输入数据排序,把具有相同key的值进行归并,通过迭代处理与指定key相关联的值,生成列表reduce()函 数的默认实现 protected void reduce(KEYIN key, Iterable IN> values, Context context ) throws IOException, InterruptedExc eption { for(VALUEIN value: values) { context.write((KEYOUT) key, (V ALUEOUT) value); } }17 MapReduce程序结构 作业执行:写驱动代码让程序运行起来配置并传递一个名为JobConf对象的作业给JobClient.runJob()以启动MapRe duce作业。JobConf对象将保持作业运行所需的全部配置参数。每个作业定制的基本参数包括输入路径、输出路径、Mapper类和R educer类,也可以重置默认的作业属性。18 MapReduce程序结构MapReduce程序的基本结构如下:public ?c lass ?MyJob extends ?Configured implements Tool {? ? ? ?/ ?自定义Ma pper/? public static class MapClass extends Mapper< KEYIN, V ALUEIN, KEYOUT, VALUEOUT > ?{ public void map(KEYIN k ey, VALUEIN value,Context context) throws IOException { ?//添加M apper内处理代码? ? ? ? ? ? ? ? }? ? ? }? ? ? /自定义Reducer/? ? ? publi c ?static class ?ReducerClass ?extends ?Reducer< KEYIN, VALUEIN, KEYOUT, VALUEOUT > ? {? ? ? ? ? ? ??public void reduce< KEYIN key , Iterable values, Context context) ?throws IOException ?{? ? ? ? ? ? ? ? ?//添加Reducer内处理代码? ? ? ? ? ? }? ? ? }? ? ? / MapReduce程序中的作业执行/public static void main(String[] args) throws IOException {? ? ?? ? ? ? //添加作业执行的驱动代码? ? ? } }19 MapReduce基本算法实验实验目的:理解分布式数据处理和存储的基本原理,掌握MapReduce工作机制和程序架构,基于MapR educe分布式框架实现Hadoop的基本算法实验要求:编写MapReduce程序实现Hadoop的基本算法,包括:单词计数、数据 去重、数据排序、单表关联、多表关联以及大矩阵乘法 20实验1 单词计数问题描述:单词计数的目标任务是统计给定文件中所有单词的出 现次数。例如:输入文件中的内容是: Hello world Hello hadoo p 输出结果为: Hello 2 hadoop 1 world 121实验1 单词计数设计思路:Hadoop对文本文件默认的解析规则是,一行一个 lue>,其中key是每一行的起始位置在文件中的偏移量,value是本行的文本内容。map1: 输入 <0,Hello world > 输出 map2: 输入: <11,Hello hadoop> 输出: o, 1>shuffle:对Map输出的按照key把value进行排序合并,即把 ke y相同的value并到一个集合中,形成,合并后的结果可以表示为: 22实验1 单词计数设计思路:在Reduce阶段,键相等的键值对调用一次 reduce()方法,对同一个单词的所有1值相加,形成新的键值对输出,输出的结果为: rld ,1> 23实验1 单词计数代码实现----Mapper类:static class MyMapper extends M apper{ final Text k2 = new Text(); //k2 存放一行中的单词 //v2 表示单词在该行中的出现次数 final IntWritable v2 = new IntWritable(1); /定义map方法,分割文本行中的单词,将 单词及其在该行中的出现次数1写入context。形参value表示一行文本/ protected void map( LongWritable key, Text value, Context context) throws IOException ,InterruptedException { //以空格分割文本 final Str ing[] splited = value.toString().split(" "); for (Stri ng word : splited) { k2.set(word); context .write(k2,v2); //把k2、v2写入到context中 } } } 24实验1 单词计数代码实现---- Reducer类:static class MyReducer extend s Reducer{ //v3表示单词出现的总 次数 final IntWritable v3 = new IntWritable(0); /定义reduce 方法,遍历map()方法输出的“值”的集合,将所有的“值”相加,得到单词的总出现次数。/ protected voi d reduce(Text key, Iterable values,Context context) throws IOException ,InterruptedException { int sum = 0; //sum存放该单词出现的总次数 for (IntWritable count : values) { sum += count.get(); } final Text k3 = key; //k3表示 单词,是最后输出的“键” v3.set(sum); //v3表示单词的总次数,是最后输出的“值” //将单词及其总次数作为写入context context.write(k3, v3); } } 25实验2 数据去重问题描述:所谓数据去重,就是对输入文件中出现次数超过一次的数据进行筛选,使其在输出文件中只出现一次。 例如,输入文件中的内容如下所示,其中每一行是一个数据: hello world hello you hello hadoop 数据去重后的输出结果是: hadoop hello world you26实验2 数据去重设计思路: 在Map阶段把生成的 e>中的key设置为数据,value任意。在Reduce阶段不管每个key所对应的value列表是什么,在reduce()方法中直 接将输入的key复制为输出的key,将输出的value设置为空并输出,就可以实现每个数据只出现一次的目标 。27实验2 数据去重代 码实现----Mapper类:private static class DuplicateMapper extends Mappe r { private NullWrit able nullWritable = NullWritable.get(); private Text word = new Text(); protected void map(LongWritable key,Text value, Context context) throws IOException, InterruptedException { // 获取每行数据的值 String lineValue = value.toString(); // 设置每一行的值 word.set(lineValue); // 将key设置为每行数据,将v alue置为空,写回到context中 context.write(word, nullWritable); } }28实验2 数据去重代码实现----Reducer类:private static class Dupl icateReducer extends Reducer le> { private NullWritable nullWritable = NullWritable.get() ; protected void reduce(Text key, Iterable va lues, Context context) throws IOException, InterruptedException { //将输入的key作为输出的key,将输出的value设置为空,写回到context中 context .write(key, nullWritable); } }29实验3 数据排序问题描述:将原始数据按照一定规则排序后 输出。数值型数据按照数字从小到大排序,文本类型数据按照字典顺序对排序。设计思路:采用Hadoop默认的文本文件输入方式后,在Map 阶段把数据作为key输出,value任意(可以置为空)。在Reduce阶段获得Map输出的后,将相同key值 的value合并排序,形成格式。然后将value列表中的元素依次作为输出健值对中的value ,另外设置一个计数器用来统计当前输出值(即数据)的位次,这个位次就是输出健值对中的key。 30实验3 数据排序代码实现- ---Mapper类:private static class SortMapper extends Mapper table, Text, LongWritable, NullWritable> { private NullWrita ble mNullWritable = NullWritable.get(); private LongWritable mWord = new LongWritable(); protected void map(LongWritabl e key,Text value,Context context) throws IOException, Interrupted Exception { // 获取每行数据的值,并转换成LongWritable类型 Stri ng lineValue = value.toString(); long longValue = Long.v alueOf(lineValue); // 设置每一行的值 mWord.set(longVal ue); //将key设置为每行数据,将value置为空,写回到context中 contex t.write(mWord, mNullWritable); } }31实验3 数据排序代码实现----R educer类:private static class SortReducer extends Reducer able, NullWritable, LongWritable, LongWritable> { //mCounte r是全局变量,表示输出的位次 private LongWritable mCounter = new LongWritab le(1); protected void reduce(LongWritable key, Iterable llWritable> values,Context context) throws IOException, Interrupt edException { //以value列表中元素的个数为循环次数 for(NullW ritable nullWritable : values){ //以位次作为输出的key,以输入的key作 为输出的value context.write(mCounter, key); mCounter.set(mCounter.get() + 1); //位次增加1 } } }32实验4 单表关联问题描述:单表关联就是对输入文件中的原始数据进行挖掘,找出用 户所关心的数据。由于原始数据在一张表里,很多情况下需要对同一个数据表进行连接操作,因此称为单表关联。例如:输入文件中的内容描述的是 child-parent关系,文件中每一行数据包括两列,第一列表示child,第二列表示parent。样例输入为: Tom Lucy Lucy Mary Lucy Ben要求从原始数据中找出g randchild-grandparent关系,输出文件中每一行数据也包括两列,第一列表示grandchild,第二列表示gran dparent 。样例输出为: Tom Mary Tom Ben 33实验4 单表关联 设计思路: 把左表的parent列和右表的child列连接起来(这里的左表和右表是同一张表)。在连接得到的新表中把连接的两列(左 表的parent列和右表的child列)删除就可以得到需要的结果,即grandchild-grandparent关系。由于在Map Reduce的工作过程中会将相同key值的value合并,因此,如果把Map阶段输出结果中的key设置成需要连接的列,那么列相等的 value就会合并连接在一起。34实验5 多表关联问题描述:和单表关联类似,也是对原始数据进行挖掘,找出用户所关心的数据,不同的是 原始数据存放在多张表中。例如:输入两个文件,一个文件表示工厂信息,文件中每行一个数据,内容包括工厂名称和工厂所在城市的编号;另一个 文件表示地址信息,每行一个数据,内容包括城市编号和城市名称。要求从输入数据中找出工厂和其所在城市名称之间的对应关系,输出文件中每一 行数据包括两列,第一列是工厂名称,第二列是工厂所在城市的名称。 35实验5 多表关联设计思路: Map阶段对读入的每行数据进行 分割,如果这行数据属于工厂信息表,那么把工厂所在城市的编号作为key,把工厂名称和标志参数1作为value,形成左表并输出;如果这 行数据属于地址信息表,那么把工厂所在城市的编号作为key,把城市名称和标志参数2作为value,形成右表并输出。在Shuffle阶 段,把相同key值的value进行合并排序,每个key所对应的value列表中就包含了同一个城市编号所对应的工厂名和城市名。在Re duce阶段,对每个key所对应的value列表进行解析,将左表中的工厂名和右表中的城市名分别放在两个数组中,并对两个数组计算笛卡 尔积,输出结果。36实验6大矩阵乘法问题描述:大矩阵乘法的目标任务是,给定 MN矩阵A和NL矩阵B,将A和B两个矩阵相乘得到M L新矩阵C。设计思路:采用稀疏矩阵的存储方式,只存储矩阵中那些非零的数值,存储的数据包括矩阵元素的行号、列号和值。在Map阶段, 对于矩阵A的元素输出l个对,其中key表示的是该元素参与计算得到的C矩阵元素的行、列号,value表示的是 该元素所在的矩阵A、在矩阵中的列号和值。对于矩阵B的元素,则输出m个对,其中key表示的是该元素参与计算得 到的C矩阵元素的行、列号,value表示的是该元素所在的矩阵B、在矩阵中的行号和值。把key相同的value合并后,形成 list of value>对,由key可以确定对哪一个C矩阵元素进行计算,由 value列表中的value可知参与计算的元素有 哪些。在Reduce阶段,对value列表分析得到参与计算的A、B矩阵的元素,对A矩阵中列号与B矩阵中行号相同的元素相乘求和,得到 C矩阵元素。37 7.3 Hbase的基本操作38 Hbase安装部署-伪分布模式把hbase-0.94.7-security.t ar.gz复制到/usr/local。使用下面的命令解压hbase-0.94.7-security.tar.gz,并重命名为hba se。cd /usr/localtar -zxvf hbase-0.94.7-security.tar.gzmv hbase-0. 94.7-security hbase使用“vi /etc/profile”命令修改/etc/profile文件,在文件中增加以下 环境变量配置:export HBASE_HOME=/usr/local/hbaseexport PATH=.:$HADOOP_HO ME/bin:$JAVA_HOME/bin:$PATH:$HBASE_HOME/bin保存退出,使用命令“source /etc/ profile”使配置生效。 39 Hbase安装部署-伪分布模式修改$HBASE_HOME/conf/hbase-env.sh文 件,在文件中设置JAVA_HOME为JDK安装目录,并指定采用HBase内嵌的ZooKeeper管理集群,配置内容如下:expor t JAVA_HOME=/usr/local/jdkexport HBASE_MANAGES_ZK=true修改$HBASE_HO ME/conf/hbase-site.xml文件,配置以下内容:hbase.rootdir是文件目录(它的主机和端口号与fs.de fault.name的主机和端口号要一致。)HBase的运行模式,false表示单机模式,true表示分布式模式ZooKeeper 集群的地址列表,在伪分布模式下只有一台机器指定Hlog和Hfile的副本数(由于伪分布模式下DataNode只有一台,所以参数值设 为1)40 Hbase安装部署-伪分布模式在$HBASE_HOME/conf/regionservers文件中写入regionse rver主机名:master先使用命令“start-all.sh”启动Hadoop,然后启动HBase。启动HBase使用如下命令 :cd /usr/local/hbase/bin./start-hbase.sh使用jps命令查看系统的Java进程HBase启动 成功后,通过命令“hadoop fs -ls /”查看HDFS目录,显示在HDFS的根目录下多了一个hbase的目录。通过http ://masterIP:60010/master-status查看HBase当前状态及RegionServer的信息。停止HBas e,使用以下命令:cd /usr/local/hbase/bin./stop-hbase.sh41 Hbase的SHELL操作HB ase SHELL提供了对数据库的操作命令,包括创建、删除表,添加、删除、查看数据等。使用下面的命令可以启动HBase SHELL :/usr/local/hbase/bin/hbase shell42常用HBase SHELL命令创建表-create命令,命 令中要指明表名和列族信息>create ''user'',''user_id'',''user_addr'',''user_info‘列出HBa se中的全部表-list命令>list得到表的描述-describe命令>describe ''user''添加记录-put命令put ''user'',''m001'',''user_info:birthday'',''1996-06-17''put ''user'',''m001'' ,''user_info:name'',''zhao''put ''user'',''m001'',''user_addr: province '', ''jiangsu''put ''user'',''m001'',''user_addr:city'',''changzhou''put ''user'' ,''f001'',''user_info:birthday'',''1997-4-17''put ''user'',''f001'',''user_i nfo:favorite'',''sports''put ''user'',''f001'',''user_addr: province '',''j iangsu''put ''user'',''f001'',''user_addr: city '',''nanjing''put ''user'','' f001'',''user_addr:town'',''jianye''43常用HBase SHELL命令查看记录-get命令,命令中需要指 定行名,也可以通过指定列名来获取某列的信息。> get ''user'',''f001‘ //取得''user''表中''f001''行的所有数 据> get ''user'',''f001'',''user_info‘//获取''user''表中''f001''行''user_info''列族的 所有数据> get ''user'',''f001'',''user_info:favorite‘//获取''user''表中''f001''行'' user_info:favorite ''列的数据扫描全表-scan命令,可以通过指定列族名来查看某列的信息> scan ''user ‘//获取''user''表的所有信息> scan ''user'',{COLUMNS=>''user_info''}//获取''user''表 中''user_info''列族的所有信息> scan ''user'',{COLUMNS=>''user_info'',LIMIT=>1 }//获取''user''表中''user_info''列族的前一行的信息44常用HBase SHELL命令删除记录-delete命令,用 于删除表中指定列的相关数据。>delete ''user'',''m001'',''user_info:name''//删除''user''表''m 001''行的''user_info:name''列删除整行-deleteall命令。>deleteall ''user,''m001''// 删除''user''表''m001''行统计表的行数-count命令>count ''user‘//统计''user''表的行数清空表-trun cate命令>truncate ''user‘//清空''user''表删除表-drop命令,在删除表之前先执行disable ''表名'' 命令使表失效>disable ''user‘//使表失效>drop ‘user‘//删除表45HBase的Java API-配置HB ase通过HBaseConfiguration类对HBase进行配置,create()方法通过默认的HBase配置文件来创建Con figuration。在程序中配置HBase主要是指定HBase的数据存放位置和ZooKeeper集群位置。配置HBase的程序代 码如下:private static Configuration getConfiguration() { Configurat ion conf = HBaseConfiguration.create(); conf.set("hbase.rootdir" , "hdfs://master:9000/hbase"); conf.set("hbase.zookeeper.quorum" , "master"); return conf;}46HBase的Java API-创建表表的创建由HBaseAdmin类实现 ,HBaseAdmin类提供了createTable方法来创建一个新表,创建时要指定表的信息和表内列族的信息。表信息是通过HTab leDescriptor类描述,在实例化HTableDescriptor时,由name参数指明要创建的表的名称设置表名称后,使用H TableDescriptor实例的addFamily方法添加表中的列族,该方法需要传递一个列族描述实例HColumnDescri ptor。通常在创建或删除一张表之前,先要确定该表是否存在,HBaseAdmin类提供了tableExists方法来检查指定的表是 否存在。47HBase的Java API-创建表创建表的代码如下:public static void create(String tableName, String columnFamily) throws IOException{ HBaseAdmin a dmin = new HBaseAdmin(getConfiguration()); if (admin.tableExists( tableName)) { System.out.println("table exists!"); } else{ HTab leDescriptor tableDesc = new HTableDescriptor(tableName); tableD esc.addFamily(new HColumnDescriptor(columnFamily)); admin.create Table(tableDesc); System.out.println("create table success!"); } }48HBase的Java API-添加记录对表的读写、删除等操作定义在HTable类中。HTable类提供了put(Put pu t)方法写入数据。在实例化HTable后,首先创建一个Put类实例,指明待插入行的行关键字。然后通过Put实例的add方法将要写入 的数据传入Put实例,即插入指定行的“列族名:标签”及单元格的值。最后调用HTable的put方法添加一条记录,在put方法中传递 Put实例作为参数。49HBase的Java API-添加记录添加一条记录的代码如下:public static void put (String tableName, String row, String columnFamily, String column , String data) throws IOException{ HTable table = new HTable(getC onfiguration(), tableName); Put p1 = new Put(Bytes.toBytes(row)); p1.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes .toBytes(data)); table.put(p1); System.out.println("put''"+row+"'', "+columnFamily+":"+column+"'',''"+data+"''");}50HBase的Java API-读取记录H Table类提供了get(Get get)方法来获取表中特定行的数据。在调用get(Get get)方法时需要一个Get类的实例作 为参数,Get实例中指明了要获取的数据位置。在创建Get实例时,要指明待获取的数据的行关键字,如果要获取某一列族或某一列的数据,需 要使用Get实例的addFamily或addColumn方法设置具体的位置。调用get方法查询后的返回结果是一个Result类的实 例。51HBase的Java API-读取记录读取一行数据的代码如下:public static void get(String tableName, String row) throws IOException{ HTable table = new HTa ble(getConfiguration(), tableName); Get get = new Get(Bytes.toByt es(row)); Result result = table.get(get); System.out.println("Get : "+result);}52HBase的Java API-显示所有数据HTable类提供了getScanner(Scan sca n)方法用以扫描全表,该方法需要传递一个Scan类的实例作为参数。调用HTable实例的getScanner(Scan scan) 方法检索数据表,得到ResultScanner数据集,对ResultScanner数据集解析获取数据。public static void scan(String tableName) throws IOException{ HTable table = ne w HTable(getConfiguration(), tableName); Scan scan = new Scan(); ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { System.out.println("Scan: "+result); }}53HBase的Java API-删除数据HTable类提供了delete(Delete delete)方法用以删除表中特定行的数据,可以删除单行数据,也可以删除多行数据。在调用delete方法时需要一个Delete类的实例作为参数,Delete实例中指明了要删除哪些行。如果要删除某一列族的数据,需要通过Delete实例的deleteFamily方法来指定列族。如果要删除某一列的数据,需要通过deleteColumn方法来指定列族及其某一列,该方法还可以通过指定时间戳来删除该列某一特定时间戳版本的值。54HBase的Java API-删除数据删除某一行数据的代码为:public static void delete(String tableName, String row) throws IOException{ HTable table = new HTable(getConfiguration(), tableName); Delete del = new Delete(Bytes.toBytes(row)); table.delete(del); System.out.println("delete: "+row);}55HBase的Java API-删除表HBaseAdmin类提供了deleteTable(String tableName)方法删除一张表。在删除之前需要通过disableTable(String tableName)方法先使该表失效。删除表的代码为:public static void delete(String tableName) throws IOException{ HBaseAdmin admin = new HBaseAdmin(getConfiguration()); if(admin.tableExists(tableName)){ try { admin.disableTable(tableName); admin.deleteTable(tableName); } catch (IOException e) { e.printStackTrace(); System.out.println("Delete "+tableName+" 失败"); } } System.out.println("Delete "+tableName+" 成功");} |
|