配色: 字号:
Openfire集群源码分析
2016-11-04 | 阅:  转:  |  分享 
  
Openfire集群源码分析

如果用户量增加后为了解决吞吐量问题,需要引入集群,在openfire中提供了集群的支持,另外也实现了两个集群插件:hazelcast和clustering。为了了解情况集群的工作原理,我就沿着openfire的源代码进行了分析,也是一次学习的过程。





首先理解集群的一些简单概念

集群的目的是让多个实例像一个实例一样运行,这样就可以通过增长实例来增长计算能力。也就是所谓的分布式计算问题,这其中最为关注的一个特性就是——CAP理论,也就是所谓的一致性、可用性、分区容错性。集群中最核心解决的问题就是CAP。

CAP综合理解就是我上面写的,多个实例像一个实例一样运行。



所以所谓集群就是把一些数据共享或者同步到不同的实例上,这样系统使用同样的算法,取的结果当然应该是相同啦。所以一些数据库的主从复制,缓存数据集群都是类似这种解决方法。只是代码实现质量和处理规模的问题。



有了这个基础我们再来看看openfire是怎么解决这个问题的。



openfire的集群设计



1、哪些需要进行集群间的同步

对于openfire而言,有这几方面的数据需要进行保证集群间的同步:数据库存的数据、缓存数据、session。貌似就这些吧?

数据库

因为对于openfire来说基本上是透明的,所以这块就交给数据库本身来实现。

缓存数据

缓存是存在内存里的,所以这部分是要同步的

session

session在openfire并不需要所有实例同步,但是需要做用户路由缓存,否则发消息时找不到对应的会话。由此用户路由还是要同步的。



2、缓存的设计

缓存接口

openfire里对缓存的数据容器提供了一个包装接口,这个接口提供了缓存数据的基本方法,用于统一数据操作。

publicinterfaceCacheextendsjava.util.Map

如果不开启集群时缓存的默认缓存容器类是:publicclassDefaultCache,实际上DefaultCache就是用一个Hashmap来存数据的。





缓存工厂类

为了保证缓存是可以扩展的,提供了一个工厂类:

publicclassCacheFactory





CacheFactory类中会管理所有的缓存容器,如下代码:

复制代码

/

Returnsthenamedcache,creatingitasnecessary.



@paramnamethenameofthecachetocreate.

@returnthenamedcache,creatingitasnecessary.

/

@SuppressWarnings("unchecked")

publicstaticsynchronizedTcreateCache(Stringname){

Tcache=(T)caches.get(name);

if(cache!=null){

returncache;

}

cache=(T)cacheFactoryStrategy.createCache(name);



log.info("Createdcache["+cacheFactoryStrategy.getClass().getName()+"]for"+name);



returnwrapCache(cache,name);

}



复制代码

上面代码中会通过缓存工厂策略对象来创建一个缓存容器,最后warpCache方法会将此容器放入到caches中。





缓存工厂类的策略

在CacheFactory中默认是使用一个DefaultLocalCacheStrategy来完成缓存创建的。另外还提供了在集群条件下的缓存策略接入。也就是通过实例化不同的策略来切换缓存管理方案。比如后面要提到的hazelcast就是通过这个来替换了本地缓存策略的。从接口的设计上来看,openfire的缓存策略也就是为了集群与非集群的实现。



3、集群的设计

在openfire中的集群主要包括:集群管理、数据同步管理、集群计算任务。



集群管理者

在openfire中主要是一个类来实现:ClusterManager,在ClusterManager中实现了集群实例的加入、退出管理,因为没有使用主从结构,所以ClusterManager实现了一个无中心管理,不知道我理解的对不对。因为只要当前实实例启用了集群,ClusterManager就会主动的加载集群管理并与其他的集群进行同步。



startup

startup是启动集群的方法,代码:



publicstaticsynchronizedvoidstartup(){

if(isClusteringEnabled()&&!isClusteringStarted()){

initEventDispatcher();

CacheFactory.startClustering();

}

}

首先要判断是否开启了集群并且当前集群实例未运行时才去启动。

先是初始化了事件分发器,用于处理集群的同步事情。



然后就是调用CacheFactory的startClustering来运行集群。在startClustering方法中主要是这几个事情:

会使用集群的缓存工厂策略来启动,同时使自己加入到集群中。

开启一个线程用于同步缓存的状态



在前面startup中的initEventDispatcher方法,在这里会注册一个分发线程监听到集群事件,收到事件后会执行joinedCluster或者leftCluster的操作,joinedCluster就是加入到集群中的意思。



在joinedCluster时会将本地的缓存容器都转换为集群缓存。由此便完成了集群的初始化并加入到集群中了。



shutdown

shutdown相对简单点就是退出集群,并且将缓存工厂恢复为本地缓存。



同步管理

上面主要是讲了如何管理集群,接着比较重要的就是如何在集群间同步数据呢?这部分主要是看具体的分布式计算系统的实现了,从openfire来说就是将数据放到集群缓存中,然后通过集群组件来完成的,比如使用hazelcast。



因为使用缓存来解决,所以在CacheFactory中才会有这些么多关于集群的处理代码,特别是对于缓存策略的切换,以及集群任务处理都在CacheFactory作为接口方法向外公开。这样也把集群的实现透明了。



集群计算任务

在这之前一直没有提到集群中的计算问题,因为既然有了集群是不是可以利用集群的优势进行一些并行计算呢?这部分我倒没有太过确定,只是看到相关的代码所以简单列一下。



在CacheFactory类中有几个方法:doClusterTask、doSynchronousClusterTask,这两个都是overload方法,参数有所不同而已。这几个方法就是用于执行一些计算任务的。就看一下doClusterTask:

publicstaticvoiddoClusterTask(finalClusterTasktask){

cacheFactoryStrategy.doClusterTask(task);

}

这里有个限定就是必须是ClusterTask派生的类才行,看看它的定义:



publicinterfaceClusterTaskextendsRunnable,Externalizable{



VgetResult();



}

主要是为了异步执行和序列化,异步是因为不能阻塞,而序列化当然就是为了能在集群中传送。





再看CacheFactory的doClusterTask方法可以发现,它只不过是代理了缓存策略工厂的doClusterTask,具体的实现还是要看集群实现的。



看一看hazelcast的实现简单理解openfire集群

在openfire中有集群的插件实现,这里就以hazelcast为例子简单的做一下分析与学习。



缓存策略工厂类(ClusteredCacheFactory)



ClusteredCacheFactory实现了CacheFactoryStrategy,代码如下:

publicclassClusteredCacheFactoryimplementsCacheFactoryStrategy{

首先是startCluster方法用于启动集群,主要完成几件事情:



设置缓存序列化工具类,ClusterExternalizableUtil。这个是用于集群间数据复制时的序列化工具

设置远程session定位器,RemoteSessionLocator,因为session不同步,所以它主要是用于多实例间的session读取

设置远程包路由器ClusterPacketRouter,这样就可以在集群中发送消息了

加载Hazelcast的实例设置NodeID,以及设置ClusterListener



在前面说起集群启动时提到了缓存切换,那具体实现时是如何做的呢?



因为集群启动后就要是CacheFactory.joinedCluster方法来加入集群的。看一下加入的代码:



复制代码

/

NotificationmessageindicatingthatthisJVMhasjoinedacluster.

/

@SuppressWarnings("unchecked")

publicstaticsynchronizedvoidjoinedCluster(){

cacheFactoryStrategy=clusteredCacheFactoryStrategy;

//Loopthroughlocalcachesandswitchthemtoclusteredcache(copycontent)

for(Cachecache:getAllCaches()){

//skiplocal-onlycaches

if(localOnly.contains(cache.getName()))continue;

CacheWrappercacheWrapper=((CacheWrapper)cache);

CacheclusteredCache=cacheFactoryStrategy.createCache(cacheWrapper.getName());

clusteredCache.www.wang027.computAll(cache);

cacheWrapper.setWrappedCache(clusteredCache);

}

clusteringStarting=false;

clusteringStarted=true;

log.info("Clusteringstarted;cachemigrationcomplete");

}



复制代码

这里可以看到会读取所有的缓存容器并一个个的使用Wrapper包装一下,然后用同样的缓存名称去createCache一个新的Cache,这步使用的是切换后的集群缓存策略工厂,也就是说会使用ClusteredCacheFactory去创建新的缓存容器。最后再将cache写入到新的clusteredCache里,这样就完成了缓存的切换。





当然这里还是要看一下ClusteredCacheFactory的createCache实现:

复制代码

publicCachecreateCache(Stringname){

//Checkifclusterisbeingstartedup

while(state==State.starting){

//Waituntilclusterisfullystarted(orfailed)

try{

Thread.sleep(250);

}

catch(InterruptedExceptione){

//Ignore

}

}

if(state==State.stopped){

thrownewIllegalStateException("Cannotcreateclusteredcachewhennotinacluster");

}

returnnewClusteredCache(name,hazelcast.getMap(name));

}



复制代码

这里使用的是ClusteredCache,而且最重要的是传入的第二个map参数换成了hazelcast的了,这样之后再访问这个缓存容器时已经不再是原先的本地Cache了,已经是hazelcast的map对象。hazelcast会自动对map的数据进行同步管理,这也就完成了缓存同步的功能。







集群计算

那就看hazelcast的实现吧,在ClusteredCacheFactory中doClusterTask举个例子吧:



复制代码

publicvoiddoClusterTask(finalClusterTasktask){

if(cluster==null){return;}

Setmembers=newHashSet();

Membercurrent=cluster.www.baiyuewang.netgetLocalMember();

for(Membermember:cluster.getMembers()){

if(!member.getUuid().equals(current.getUuid())){

members.add(member);

}

}

if(members.size()>0){

//Asynchronouslyexecutethetaskontheotherclustermembers

logger.debug("ExecutingasynchronousMultiTask:"+task.getClass().getName());

hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers(

newCallableTask(task),members);

}else{

logger.warn("Noclustermembersselectedforclustertask"+task.getClass().getName());

}

}



复制代码

过程就是,先获取到集群中的实例成员,当然要排除自己。然后hazelcast提供了ExecutorService来执行这个task,方法就是submiteToMembers。这样就提交了一个运算任务。只不过具体是如何分配计算并汇集结果倒真不太清楚。

献花(0)
+1
(本文系thedust79首藏)