分享

在Hadoop集群上运行Mahout中的fpg算法

 whbsdu 2014-12-29
fpg是Mahout中实现的计算频繁项集的算法,下面我们来简单说一下在分布式环境下(Hadoop)利用fpg对交易数据进行频繁项集的挖掘。这里只是对fpg算法的简单应用,如果想对该算法进行深入的了解,可以下载Mahout的源码包.
首先要在集群上安装Hadoop和Mahout,配置好环境变量。运行mahout命令,会列出mahout包括的算法。运行mahout fpg会列出运行fpg算法所需要的参数
我们将交易数据存入HDFS中,这里我们以测试数据集accidents.dat为例。hadoop fs -put accident.dat /user/associationrules/data/
运行fpg算法,mahout fpg -i /user/associationrules/data/accidents.dat -o /user/associationrules/result  -method mapreduce -regex '[\ ]'  -s 2 
下面我们来看一下各个参数的含义:
-i:数据的输入路径,-o:是结果输出路径, -method:是处理方法,这里我们采用mapreduce并行的处理方式, -regex正则表达式模式,使用给定的字符串分割对数据进行分割,默认值分割符是逗号。-s :最小值支持度。
当然运行该算法时还可以添加其它的参数,不添加就采用默认值,具体各个参数的含义可以通过运行mahout fpg来进行查看。
当该运行完时,会将结果存到输出目录下,例如在HDFS中的目录:/user/associationrules/result/frequentpatterns/part-r-00000。part-r-00000是一个序列化的文件,我们需要通过mahout 的seqdumper算法进行转化。mahout seqdumper -i /user/associationrules/result/frequentpatterns/part-r-00000 -o /usr/local/result反序列化后的数据会存入result中。
当然我们在实际的应用中,数据处理部分占可很大的工作,因为你的数据很有可能,不是fpg算法所符合的格式,这就需要对原始数据进行清洗,最后生成fpg所符合的标准格式。
关于交易数据的处理
例如这样的数据格式
用户ID       .......    物品名称   ..........
10000001  .......      00001      ..........
10000002  .......      00001      ..........
10000001  .......      00002      ..........
10000003  .......      00002      ..........
我们最后要的是这样的数据结构
00001,00002
00001,00003
.........
每一条记录对应一个用户的所购买的物品
在应用中我通过一下脚本对原始交易数据集进行了处理
#coding=utf-8
'''
Created on 2012-8-27

@author: yuanzhen
'''

class ResolveFile():
    def removeSameRow(self):
        content = set()
        row = ''
        in_file = open('你的原始交易数据文件','r')
        out_file = open('中间临时文件','w')
        for line in in_file:
            line = line.strip().split('\t')
            row = '\t'.join(map(str,[line[0],line[10]]))
            content.add(row)
        for line in content:
            print line
            out_file.write(line + '\n')    
        out_file.close()
        in_file.close()   
    
    def merger(self): 
        in_file = open('中间临时文件','r')
        out_file = open('最终文件','w')
        dict = {}
        for line in in_file:
            line = line.strip().split('\t')
            dict[line[0]] = dict.get(line[0],'') + line[1] + ','
        for item in dict:    
            out_file.write(dict[item].strip(',') + '\n') 
        in_file.close()
        out_file.close()            
        
        
if __name__ == '__main__':
    resolveFile = ResolveFile()
    resolveFile.removeSameRow() 
    resolveFile.merger()
这里的最终文件就是fpg算法所需要的输入
注意不同版本的mahout在运行算法时可能需要的参数不同

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多