Hadoop Streaming提供了一个便于进行MapReduce编程的工具包,使用它可以基于一些可执行命令、脚本语言或其他编程语言来实现Mapper和 Reducer,从而充分利用Hadoop并行计算框架的优势和能力,来处理大数据 好吧我承认以上这句是抄的以下是原创干货 首先部署hadoop环境,这点可以参考 http://www./install-hadoop-in-centos/ 好吧原创从下一行开始 部署hadoop完成后,需要下载hadoop-streaming包,这个可以到http://www./Code/JarDownload/hadoop-streaming/hadoop-streaming-0.23.6.jar.zip去下载,或者访问http://www./Code/JarDownload/hadoop-streaming/选择最新版本,千万不要选择source否则后果自负,选择编译好的jar包即可,放到/usr/local/hadoop目录下备用 接下来是选择大数据统计的样本,我在阿里的天池大数据竞赛网站下载了母婴类购买统计数据,记录了900+个萌萌哒小baby的购买用户名、出生日期和性别信息,天池的地址https://tianchi.shuju.aliyun.com/datalab/index.htm 数据是一个csv文件,结构如下: 用户名,出生日期,性别(0女,1男,2不愿意透露性别) 比如:415971,20121111,0(数据已经脱敏处理) 下面我们来试着统计每年的男女婴人数 接下来开始写mapper程序mapper.py,由于hadoop-streaming是基于Unix Pipe的,数据会从标准输入sys.stdin输入,所以输入就写sys.stdin #!/usr/bin/python # -*- coding: utf-8 -*- import sys for line in sys.stdin: line = line.strip() data = line.split(',') if len(data)<3: continue user_id = data[0] birthyear = data[1][0:4] gender = data[2] print >>sys.stdout,"%s\t%s"%(birthyear,gender) 一个很简单的程序,看不懂的请自行提高姿势水平 下面是reduce程序,这里大家需要注意一下,map到reduce的期间,hadoop会自动给map出的key排序,所以到reduce中是一个已经排序的键值对,这简化了我们的编程工作 我是有洪荒之力的reducer.py,和外面的哪些妖艳贱货不一样 #!/usr/bin/python # -*- coding: utf-8 -*- import sys gender_totle = {'0':0,'1':0,'2':0} prev_key = False for line in sys.stdin:#map的时候map中的key会被排序 line = line.strip() data = line.split('\t') birthyear = data[0] curr_key = birthyear gender = data[1] #寻找边界,输出结果 if prev_key and curr_key !=prev_key:#不是第一次,并且找到了边界 print >>sys.stdout,"%s year has female %s and male %s"%(prev_key,gender_totle['0'],gender_totle['1'])#先输出上一次统计的结果 prev_key = curr_key gender_totle['0'] = 0 gender_totle['1'] = 0 gender_totle['2'] = 0#清零 gender_totle[gender] +=1#开始计数 else: prev_key = curr_key gender_totle[gender] += 1 #输出最后一行 if prev_key: print >>sys.stdout,"%s year has female %s and male %s"%(prev_key,gender_totle['0'],gender_totle['1']) 接下来就是将样本和mapper reducer上传到hdfs中并执行了,这也是我踩坑的地方 可以先这样测试下python脚本是否正确 cat sample.csv | ./mapper.py | sort -t ' ' -k 1 | ./reducer.py
首先要在hdfs中创建相应的目录,为了方便,我将一部分hadoop命令做了别名 alias stop-dfs='/usr/local/hadoop/sbin/stop-dfs.sh' alias start-dfs='/usr/local/hadoop/sbin/start-dfs.sh' alias dfs='/usr/local/hadoop/bin/hdfs dfs' echo "alias stop-dfs='/usr/local/hadoop/sbin/stop-dfs.sh'" >> /etc/profile echo "alias start-dfs='/usr/local/hadoop/sbin/start-dfs.sh'" >> /etc/profile echo "alias dfs='/usr/local/hadoop/bin/hdfs dfs'" >> /etc/profile
启动hadoop后,先创建一个用户目录 dfs -mkdir -p /user/root 将样本上传到此目录中 dfs -put ./sample.csv /user/root 当然也可以这样处理更加规范,这两者的差别一会儿会说 dfs -mkdir -p /user/root/input dfs -put ./sample.csv /user/root/input 接下来将mapper.py和reducer.py上传到服务器上,切换到上传以上两个文件的目录 然后就可以执行了,执行命令如下 hadoop jar /usr/local/hadoop/hadoop-streaming-0.23.6.jar -D mapred.job.name="testhadoop" -D mapred.job.queue.name=testhadoopqueue -D mapred.map.tasks=50 -D mapred.min.split.size=1073741824 -D mapred.reduce.tasks=10 -D stream.num.map.output.key.fields=1 -D num.key.fields.for.partition=1 -input sample.csv -output output-streaming -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 如果是将sample.csv放到input下,这个命令就应该这么写,不过反正我也没试过,出错了不关我的事 hadoop jar /usr/local/hadoop/hadoop-streaming-0.23.6.jar -D mapred.job.name="testhadoop" -D mapred.job.queue.name=testhadoopqueue -D mapred.map.tasks=50 -D mapred.min.split.size=1073741824 -D mapred.reduce.tasks=10 -D stream.num.map.output.key.fields=1 -D num.key.fields.for.partition=1 -input input/sample.csv -output output-streaming -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 命令的解释如下
(1)-input:输入文件路径
接下来就是激动人心的一刻了,要非常用力地跪着按下enter键 如果有报错output-streaming already exists就用命令dfs -rm -R /user/root/output-streaming 然后跳起来按下enter键 即使出现奇怪的刷屏也不要惊奇恩妈妈是这么教我的 如果出现以下字样就是成功了 16/08/18 18:35:20 INFO mapreduce.Job: map 100% reduce 100% 16/08/18 18:35:20 INFO mapreduce.Job: Job job_local926114196_0001 completed successfully 之后使用如下命令将结果取回本地,使用cat命令就能查看 dfs -get /user/root/output-streaming/* ./output-streaming cat ./output-streaming/* 很惭愧,只做了一点微小的工作 |
|
来自: jasonbetter > 《Hadoop》