客户端负载均衡技术是,客户端维护一组服务器引用,每次客户端请求的时候,会根据负载均衡算法选中一个节点,发送请求。常用的负载算法有Random,Round robin,Hash,StaticWeighted等。ES的客户端负载使用了Round robin算法。(另外Hash一致性算法还会在另一地方遇见的)一个Count请求的整个客户端模块的调用流程是
简化的调用流程
client 提供了客户端的操作接口,比如count()
nodesService的execute()随机一个节点出来
Proxy代理通过transportService发送请求
一些初始化的事情
我们先看下创建客户端的代码,这里配置了几个配置项
1 2 3 4 5 | Settings settings = ImmutableSettings.settingsBuilder()
.put( "cluster.name" , "myClusterName" )
.put( "client.transport.sniff" , true ).build();
client= new TransportClient(settings)
.addTransportAddress( new InetSocketTransportAddress( "localhost" , 9300 ));
|
谁会在乎这些配置项呢,就是TransportClientNodesService类。它负责着嗅探,维护集群节点列表。选举节点。它的构造方法里做了一些初始化工作
1 2 3 4 5 6 7 8 9 10 | this .nodesSamplerInterval = componentSettings.getAsTime( "nodes_sampler_interval" , timeValueSeconds( 5 ));
this .pingTimeout = componentSettings.getAsTime( "ping_timeout" , timeValueSeconds( 5 )).millis();
this .ignoreClusterName = componentSettings.getAsBoolean( "ignore_cluster_name" , false );
//.....
if (componentSettings.getAsBoolean( "sniff" , false )) {
this .nodesSampler = new SniffNodesSampler();
} else {
this .nodesSampler = new SimpleNodeSampler();
}
this .nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());
|
nodes_sampler_interval 嗅探集群节点的间隔,默认5秒
pingTimeout Ping节点的超时时间,默认是5秒
ignoreClusterName 忽略集群名称,集群验证的时候
sniff 是否开启集群嗅探
另外TransportClientNodesService维护着2个列表,集群节点列表nodes和监听列表listedNodes。其中监听列表就是通过TransportClient.addTransportAddress()增加的节点列表。
嗅探集群节点
1 2 3 | interface NodeSampler {
void sample();
}
|
NodeSampler接口很简单,只有一个sample()方法,它的实现类有2个SniffNodesSampler和SimpleNodeSampler,我们在初始化里已经看到了,如果"sniff"配置项是true的话使用SniffNodesSampler类。它们2个的实现逻辑是
SimpleNodeSampler
SniffNodesSampler
创建一个listedNodes和nodes去重后的列表nodesToPing
循环nodesToPing里的每一个节点
没有连接就连接到这个节点,如果是nodes列表的就正常连接,listedNode列表的建立个轻连接就好了
发送"cluster/state"请求获取节点的状态,这里会有集群所有的数据节点dataNodes
再次确认下已经和所有节点建立连接
增加到nodes节点列表
我们可以发现SimpleNodeSampler最终的节点列表还是listedNodes,如果我们建立客户端的时候,只添加了一个localhost,那它所有的请求都会发送到localhost。只有SniffNodesSampler才去探测集群的所有节点。也就是SimpleNodeSampler的意图是让集群中的某些个节点,专门用于接受用户请求。SniffNodesSampler的话,所有节点都会参与负载。
1 2 3 4 5 6 7 8 9 10 11 12 13 | class ScheduledNodeSampler implements Runnable {
@Override
public void run() {
try {
nodesSampler.sample();
if (!closed) {
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this );
}
} catch (Exception e) {
logger.warn( "failed to sample" , e);
}
}
}
|
ScheduledNodeSampler线程启动后,Sampler就开始忙碌起来了。
选举节点
有了集群节点列表后execute()方法就可以通过轮询调度算法Round robin,选举节点了。算法的特点是实现起来简单优雅,请求会被均衡的发送到各个节点上。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | public <T> T execute(NodeCallback<T> callback) throws ElasticSearchException {
ImmutableList<DiscoveryNode> nodes = this .nodes;
if (nodes.isEmpty()) {
throw new NoNodeAvailableException();
}
int index = randomNodeGenerator.incrementAndGet();
if (index < 0 ) {
index = 0 ;
randomNodeGenerator.set( 0 );
}
for ( int i = 0 ; i < nodes.size(); i++) {
DiscoveryNode node = nodes.get((index + i) % nodes.size());
try {
return callback.doWithNode(node);
} catch (ElasticSearchException e) {
if (!(e.unwrapCause() instanceof ConnectTransportException)) {
throw e;
}
}
}
throw new NoNodeAvailableException();
}
|
关键是这一行代码,说那么多话,其实,我只是了为了这么,一行代码啊 ?
1 | DiscoveryNode node = nodes.get((index + i) % nodes.size());
|
|