(点击上方蓝字,可快速关注我们)
背景 某平行世界程序猿小张接到Boss一项任务,统计用户反馈内容中的单词出现次数,以便分析用户主要习惯。文本如下:
小张作为蓝翔高材生,很快就实现了:
作为有上进心的青年,小张决心对算法进行抽象封装,并支持多节点计算。小张把这个统计次数程序分成两个大步骤:分解和计算。 第一步:先把文本以某维度分解映射成最小独立单元。 (段落、单词、字母维度)。 第二部:把最小单元重复的做合并计算。 小张参考MapReduce论文设计Map、Reduce如下: Map实现 Mapping Mapping函数把文本分解映射key,value形式的最小单元,即<单词,出现次数(1)>、 public IEnumerable<> { foreach (T sourceVal in list) yield return Tuple.Create(sourceVal, 1); } 使用,输出为(brow, 1), (brow, 1), (sorrow, 1), (sorrow, 1): var spit = hamlet.Split(new[] { ' ', Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries); var mp = new MicroMapReduce var result= mp.Mapping(spit); Combine 为了减少数据通信开销,mapping出的键值对数据在进入真正的reduce前,进行重复键合并。也相对于提前进行预计算一部分,加快总体计算速度。 输出格式为(brow, 2), (sorrow, 2): public Dictionary { Dictionary foreach (var val in list) { if (dt.ContainsKey(val.Item1)) dt[val.Item1] += val.Item2; else dt.Add(val.Item1, val.Item2); } return dt; } Partitioner Partitioner主要用来分组划分,把不同节点的统计数据按照key进行分组。 其输出格式为: (brow, {(brow,2)},(brow,3)), (sorrow, {(sorrow,10)},(brow,11)): public IEnumerable<> { var dict = new Dictionary foreach (var val in list) { if (!dict.ContainsKey(val.Key)) dict[val.Key] = new Group dict[val.Key].Values.Add(val.Value); } return dict.Values; } Group定义: public class Group { public Group(TKey key) : base(key, new List { } public TKey Key { get { return base.Item1; } } public List { get { return base.Item2; } } } Reduce实现 Reducing函数接收,分组后的数据进行最后的统计计算。 public Dictionary { Dictionary foreach (var sourceVal in groups) { result.Add(sourceVal.Key, sourceVal.Values.Sum()); } return result; } 封装调用如下: public IEnumerable<> { var step1 = Mapping(list); var step2 = Combine(step1); var step3 = Partitioner(step2); return step3; } public Dictionary { var step1 = Reducing(groups); return step1; } public Dictionary { var map = Map(list); var reduce = Reduce(map); return reduce; } 整体计算步骤图如下: 支持分布式 小张抽象封装后,虽然复杂度上去了。但暴露给使用者是非常清晰的接口,满足MapReduce的数据格式要求,即可使用。 var spit = hamlet.Split(new[] { ' ', Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries); var mp = new MicroMapReduce var result1= mp.MapReduce(spit); 小张完成后脑洞大开,考虑到以后文本数据量超大。 所以fork了个分支,准备支持分布式计算,以后可以在多个服务器节点跑。 数据分片 数据分片就是把大量数据拆成一块一块的,分散到各个节点上,方便我们的mapReduce程序去计算。 分片主流的有mod、consistent hashing、vitual Buckets、Range Partition等方式。 关于consistent hashing上篇有介绍(探索c#之一致性Hash详解)。在Hadoop中Hdfs和mapreduce是相互关联配合的,一个存储和一个计算。如果自行实现的话还需要个统一的存储。所以这里的数据源可以是数据库也可以是文件。小张只是满足boss需求,通用计算框架的话可以直接用现成的。 模拟分片 public List<> { var temp =new List<> temp.Add(list); temp.Add(list); return temp; } Worker节点 小张定义了Master,worker角色。 master负责汇集输出,即我们的主程序。 每一个worker我们用一个线程来模拟,最后输出到master汇总,master最后可以写到数据库或其他。 public void WorkerNode(IEnumerable { new Thread(() => { var map = Map(list); var reduce = Reduce(map); master.Merge(reduce); }).Start(); } public class Master { public Dictionary public void Merge(Dictionary { foreach (var item in list) { lock (this) { if (Result.ContainsKey(item.Key)) Result[item.Key] += item.Value; else Result.Add(item.Key, item.Value); } } } } 分布式计算步骤图: 总结 MapReduce模型从性能速度来说并不是非常好的,它优势在于隐藏了分布式计算的细节、容灾错误、负载均衡及良好的编程API,包含HDFS、Hive等在内一整套大数据处理的生态框架体系。在数据量级不是很大的话,企业自行实现一套轻量级分布式计算会有很多优点,比如性能更好、可定制化、数据库也不需要导入导出。从成本上也节省不少,因为hadoop开发、运维、服务器都需要不少人力物力。 DotNet 微信号:iDotNet 打造东半球最好的 .Net 微信号 -------------------------------------- 商务合作QQ:2302462408 投稿网址:top.jobbole.com |
|