分享

【技术总结】:Spark内核架构设计和实现原理(一)

 看风景D人 2016-10-07

Spark是一个针对超大数据集合的集群分布式计算系统,比MapReducer快。我们从三个方面来引出和介绍Spark。

1. How to find the most similar movies?

2. How to scale?

3. How to survive?

1. How to find the most similar movies?


我们的目标是算电影之间的相似度。这里我们讲一个非常基本的相似度算法,它的基本思想是电影的并发出现率越高,相似度越高。如图所示,三个用户分别有一个喜欢的电影的列表,为了算电影相似度,我们先找出电影的并发出现,比如通过u1的列表,先找到6个电影的并发pair,表示m2和m5同时出现,m2和m7同时出现……(例中并发pair和顺序有关,我们稍后会将),再把属于不同用户的相同pair求和,比如(m2,m5)出现一次,(m2,m7)出现三次。这样可以得出m2相似度最高的是m7的结论。


1)   What if the data is large?

仍然使用上述例子,数据很大时可以先将用户喜欢电影的列表(data1) 拆解成并发pair,再把相同pair merge在一起。


运用数据的partition,将不同用户的数据放在不同机器上运算,每一部分叫一个worker,比如worker1处理u1的数据,worker2处理u2的数据,worker3处理u3的数据。如果用户很多,对每个用户分配一个机器并不高效,我们需要改变parition方式。比如可以将u1到u100的数据放在worker1里处理。每个worker不仅要存data,还要存算法。比如用分解的算法,input为存电影的vector,通过n^2的复杂度得到并发pair。每个worker得到新的data2(并发pair)。

Merge过程也需要多个worker,每个worker存有data2(并发pair)和一个新的算法(算法2)。这个过程要输出pair出现的次数。并发pair第一个值相同的数据放在相同的worker里,用算法2进行统计,就可以输出并发pair出现次数并找到有最大相似度的电影。


这里有几个问题需要思考:

(1) merge过程的worker开始工作时,是否要求partition的worker结束工作?

不要求。因为即使partition的worker没有处理完数据,worker仍可以得到部分数据,这就是流处理(streaming)。


(2) partition的数据是否要拷贝到merge的worker上?

不建议数据拷贝。因为数据拷贝非常耗时。如果worker在同一个机器上,可以传递引用/指针。


(3) 是否需要格外的worker从merge过程读取数据,输出best one?(比如对于m2,输出相似度最高的m7)?

不需要。我们可以在merge过程中增加一个算法2.5来选择最高相似度的电影,并且不用在worker间传数据。


(4) 分布式系统中,数据变成immutable性能变好还是坏?

immutable的数据是指只读的数据,它虽然天然并发,但使得写操作变多。


(5)(高阶)既然可以用拆解的方式做,为什么仍需要map reduce?(留作思考)


总结下来,程序 = 数据 + 算法。计算复杂度 = 数据 + 算法。数据部分指的数据的移动,它在分布式系统中很显著。降低数据移动的复杂度可以少移动(传引用或组合操作,比如算法组合)或者多用memory。


2) How to run faster?

可以通过组合算法(比如算法2和算法2.5进行组合)减少数据移动。这种组合就是DAG分析(DAG有向无环图,反应弹性分布式数据集之间的依赖关系)。


还可以用lazy evaluation。所有计算过程都是data + 算法 = new data,我们先找出所有的算法关系,再进行化简,这就是lazy evaluation。虽然算法复杂度不一定变低,但是数据移动变少。

Spark的中间数据放到内存中,对迭代运算效率更高。(根据spark官网给出的对比测试结果,当spark所有的计算都在内存中进行时,spark要比hadoop快两个数量级100多倍;当spark计算应用到磁盘时,spark的计算速度也是hadoop的10x倍)

(from: http://student-lp./blog/2158969)


说到hadoop,很多人不清楚spark和map reduce区别。我们用品牌名和功能名来解释。

品牌名:spark, ec2, hadoop, apache, nginx, tomcat, kafka

功能名:mapreduce, filter, storage, handle HTTP, load balance, cache


一个品牌可以有很多功能,比如apache, nginx都可以handle HTTP, load balance, cache


2. How to scale?

1) What is the architecture?

首先client,server互相连接,client需要driver,server需要driver。同时有controller/master处理各种机器。在建立连接时,可以client和master连接,master负责分发,也可以client直接和server连接。这两种方法都可以。如果client全都找master,master会成为瓶颈。


master有很多种(比如yarn),master的功能有很多种,比如负责协调,负责回滚


除了driver,client还需要scheduler,把数据拆解成可调度的序列。除了recevier,server还需要exectuer和data management。exectuer之间的数据尽量共享,这样数据不用存储多份。存储的数据是immutable,这样不用担心数据被改动,增加数据共享率。


如果数据太多,可以使用MMAP(一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。)或者调整算法。


此外还有shuffle/transfer,它使得从用户角度看,只有一个server。

(from:http://www.cnblogs.com/huxiao-tee/p/4660352.html)


3. How to survive?

1) What if a worker is broken?

spark使用重算而不是replica,因为replica会使数据移动,增加时间复杂度。重算的前提是系统还存有故障部分前面的数据,这需要有选择性的做cache/snapshot,从而可以用log来进行数据恢复。


这个过程在spark里叫lineage:spark里数据恢复原状。RDD可以通过不断寻找父类,找到最原始的那个RDD。这条继承路径就认为是RDD的血统。


这里有一个tricky的问题:如果几个worker在同一台机器,一个worker失败,会增加很多复杂度。如果在不同机器,运算又会很慢,因为要传递数据。至此,我们已经大致了解Spark的架构了。


One more thing

回到相似度的问题上。之前我们讨论的是item之间的similarity。如何计算user之间的similarity?


(1) 首先得到各个user的所有喜爱电影的列表。

(2) 然后filter数据,比如选择喜欢电影数量在10~100的用户。因为如果用户小于10,历史记录太少,大于100说明数据太广泛,也不适于进行相似度分析。

(3) 进行倒排索引,得到(user,movie)的pair

(4) 针对每个电影,得到针对电影的倒排索引(可以partition)。比如m1 = {u1, u3, u7}。

(5) 通过filter得到用户最新的10个电影列表,比如u1 = {m1, m5, m9…...}。

(6) 两种数据进行组合(4和5中)。对于u1,找m1, m5, m9, …...的用户列表,找到出现次数最多的用户。


↓↓↓ 长按识别图中二维码 【论码农的自我修养】


    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多