分享

Flume NG源码分析(六)应用程序使用的RpcClient设计

 WindySky 2017-10-25 发布于广东

上一篇Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志 介绍了ThriftSource利用Thrfit服务ThriftSourceProtocol来收集日志。这篇说说flume-ng-sdk中提供给应用层序使用的RpcClient的设计和实现。继续使用ThriftRpcClient来作例子。


先看看ThriftSourceProtocol提供的原生的客户端,它是Thrfit通过flume.thrift文件定义的Thrfit服务默认生成。这个原生的Client提供了网络传输和协议编解码等RPC客户端的基本功能。关于Thrift客户端可以参考这篇Thrift源码分析(三)-- IDL和生成代码分析

  1. public static class Client extends org.apache.thrift.TServiceClient implements Iface {  
  2.     public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {  
  3.       public Factory() {}  
  4.       public Client getClient(org.apache.thrift.protocol.TProtocol prot) {  
  5.         return new Client(prot);  
  6.       }  
  7.       public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {  
  8.         return new Client(iprot, oprot);  
  9.       }  
  10.     }  
  11.   
  12.     public Client(org.apache.thrift.protocol.TProtocol prot)  
  13.     {  
  14.       super(prot, prot);  
  15.     }  
  16.   
  17.     public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {  
  18.       super(iprot, oprot);  
  19.     }  
  20.   
  21.     public Status append(ThriftFlumeEvent event) throws org.apache.thrift.TException  
  22.     {  
  23.       send_append(event);  
  24.       return recv_append();  
  25.     }  
  26.   
  27.     public void send_append(ThriftFlumeEvent event) throws org.apache.thrift.TException  
  28.     {  
  29.       append_args args = new append_args();  
  30.       args.setEvent(event);  
  31.       sendBase("append", args);  
  32.     }  
  33.   
  34.     public Status recv_append() throws org.apache.thrift.TException  
  35.     {  
  36.       append_result result = new append_result();  
  37.       receiveBase(result, "append");  
  38.       if (result.isSetSuccess()) {  
  39.         return result.success;  
  40.       }  
  41.       throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "append failed: unknown result");  
  42.     }  
  43.   
  44.     public Status appendBatch(List<ThriftFlumeEvent> events) throws org.apache.thrift.TException  
  45.     {  
  46.       send_appendBatch(events);  
  47.       return recv_appendBatch();  
  48.     }  
  49.   
  50.     public void send_appendBatch(List<ThriftFlumeEvent> events) throws org.apache.thrift.TException  
  51.     {  
  52.       appendBatch_args args = new appendBatch_args();  
  53.       args.setEvents(events);  
  54.       sendBase("appendBatch", args);  
  55.     }  
  56.   
  57.     public Status recv_appendBatch() throws org.apache.thrift.TException  
  58.     {  
  59.       appendBatch_result result = new appendBatch_result();  
  60.       receiveBase(result, "appendBatch");  
  61.       if (result.isSetSuccess()) {  
  62.         return result.success;  
  63.       }  
  64.       throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "appendBatch failed: unknown result");  
  65.     }  
  66.   
  67.   }  

来看看Flume NG是如何封装Thrift客户端的。Flume NG支持Avro,Thrfit等多种RPC实现,它的RpcClient层次结构如下



RpcClient接口定义了给应用程序使用的RPC客户端的基本功能

  1. public interface RpcClient {  
  2.   
  3.   
  4.   public int getBatchSize();  
  5.   
  6.     
  7.   public void append(Event event) throws EventDeliveryException;  
  8.   
  9.     
  10.   public void appendBatch(List<Event> events) throws  
  11.       EventDeliveryException;  
  12.   
  13.   public boolean isActive();  
  14.   
  15.     
  16.   public void close() throws FlumeException;  
  17.   
  18. }  

AbstractRpcClient抽象类实现了RPCClient接口,提供了getBatchSize的默认实现,并增加了configure接口来支持配置

  1. public abstract class AbstractRpcClient implements RpcClient {  
  2.   
  3.   protected int batchSize =  
  4.       RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;  
  5.   protected long connectTimeout =  
  6.       RpcClientConfigurationConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS;  
  7.   protected long requestTimeout =  
  8.       RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;  
  9.   
  10.   @Override  
  11.   public int getBatchSize(){  
  12.     return batchSize;  
  13.   }  
  14.   @Override  
  15.   public abstract void append(Event event) throws EventDeliveryException;  
  16.   
  17.   @Override  
  18.   public abstract void appendBatch(List<Event> events)  
  19.       throws EventDeliveryException;  
  20.   
  21.   @Override  
  22.   public abstract boolean isActive();  
  23.   
  24.   @Override  
  25.   public abstract void close() throws FlumeException;  
  26.   
  27.   protected abstract void configure(Properties properties)  
  28.       throws FlumeException;  
  29.   
  30. }  

对于一个设计良好的服务框架的客户端来说,有几个基本的特性

1. 服务寻址

2. 连接池管理

3. 客户端实现RPC调用的负载均衡

4. 缓存

5. 容灾处理,失效转移


我们来看看Flume NG是如何设计它的服务客户端的。基本的组件如下:



服务寻址

Flume NG的RPC客户端的服务寻址实现比较简单,只是在Properties配置文件里设置Thrift服务器的IP和端口,然后用这个值来创建TSocket。这里是一个可以扩展点,使服务寻址的能力更强,更灵活

  1.       HostInfo host = HostInfo.getHostInfoList(properties).get(0);  
  2.       hostname = host.getHostName();  
  3.       port = host.getPortNumber();  
  4.   
  5. // ClientWrapper  
  6.       public ClientWrapper() throws Exception{  
  7.       // 使用hostname, port来构建TSocket  
  8.       transport = new TFastFramedTransport(new TSocket(hostname, port));  
  9.       transport.open();  
  10.       client = new ThriftSourceProtocol.Client(new TCompactProtocol  
  11.         (transport));  
  12.       // Not a great hash code, but since this class is immutable and there  
  13.       // is at most one instance of the components of this class,  
  14.       // this works fine [If the objects are equal, hash code is the same]  
  15.       hashCode = random.nextInt();  
  16.     }  


连接池管理

首先是使用ClientWrapper类来封装Thrift生成的原生的Client,可以通过Properties配置来设置Client的值,设置socket连接和protocol编解码协议

  1. private class ClientWrapper {  
  2.     public final ThriftSourceProtocol.Client client;  
  3.     public final TFastFramedTransport transport;  
  4.     private final int hashCode;  
  5.   
  6.     public ClientWrapper() throws Exception{  
  7.       transport = new TFastFramedTransport(new TSocket(hostname, port));  
  8.       transport.open();  
  9.       client = new ThriftSourceProtocol.Client(new TCompactProtocol  
  10.         (transport));  
  11.       // Not a great hash code, but since this class is immutable and there  
  12.       // is at most one instance of the components of this class,  
  13.       // this works fine [If the objects are equal, hash code is the same]  
  14.       hashCode = random.nextInt();  
  15.     }  
  16. }   

ConnectionPoolManager实现了一个简单的连接池管理类,提供了checkOut和checkIn两个方法来借出和归还连接对象ClientWrapper。使用ReentrantLock和它的条件队列Condition来实现管程的功能,自管理同步操作。当availableClients为空,并且已经达到连接池的最大值时,checkOut操作会阻塞。当checkIn归还连接对象时,唤醒在checkOut上阻塞的线程。

  1. private class ConnectionPoolManager {  
  2.    private final Queue<ClientWrapper> availableClients;  
  3.    private final Set<ClientWrapper> checkedOutClients;  
  4.    private final int maxPoolSize;  
  5.    private int currentPoolSize;  
  6.    private final Lock poolLock;  
  7.    private final Condition availableClientsCondition;  
  8.   
  9.    public ConnectionPoolManager(int poolSize) {  
  10.      this.maxPoolSize = poolSize;  
  11.      availableClients = new LinkedList<ClientWrapper>();  
  12.      checkedOutClients = new HashSet<ClientWrapper>();  
  13.      poolLock = new ReentrantLock();  
  14.      availableClientsCondition = poolLock.newCondition();  
  15.      currentPoolSize = 0;  
  16.    }  
  17.   
  18.    public ClientWrapper checkout() throws Exception {  
  19.   
  20.      ClientWrapper ret = null;  
  21.      poolLock.lock();  
  22.      try {  
  23.        if (availableClients.isEmpty() && currentPoolSize < maxPoolSize) {  
  24.          ret = new ClientWrapper();  
  25.          currentPoolSize++;  
  26.          checkedOutClients.add(ret);  
  27.          return ret;  
  28.        }  
  29.        while (availableClients.isEmpty()) {  
  30.          availableClientsCondition.await();  
  31.        }  
  32.        ret = availableClients.poll();  
  33.        checkedOutClients.add(ret);  
  34.      } finally {  
  35.        poolLock.unlock();  
  36.      }  
  37.      return ret;  
  38.    }  
  39.   
  40.    public void checkIn(ClientWrapper client) {  
  41.      poolLock.lock();  
  42.      try {  
  43.        availableClients.add(client);  
  44.        checkedOutClients.remove(client);  
  45.        availableClientsCondition.signal();  
  46.      } finally {  
  47.        poolLock.unlock();  
  48.      }  
  49.    }  
  50.   
  51.    public void destroy(ClientWrapper client) {  
  52.      poolLock.lock();  
  53.      try {  
  54.        checkedOutClients.remove(client);  
  55.        currentPoolSize--;  
  56.      } finally {  
  57.        poolLock.unlock();  
  58.      }  
  59.      client.transport.close();  
  60.    }  
  61.   
  62.    public void closeAll() {  
  63.      poolLock.lock();  
  64.      try {  
  65.        for (ClientWrapper c : availableClients) {  
  66.          c.transport.close();  
  67.          currentPoolSize--;  
  68.        }  
  69.      /* 
  70.       * Be cruel and close even the checked out clients. The threads writing 
  71.       * using these will now get an exception. 
  72.       */  
  73.        for (ClientWrapper c : checkedOutClients) {  
  74.          c.transport.close();  
  75.          currentPoolSize--;  
  76.        }  
  77.      } finally {  
  78.        poolLock.unlock();  
  79.      }  
  80.    }  
  81.  }  

客户端负载均衡

LoadBalancingRpcClient继承了AbstractRpcClient类,提供了RPC客户端的负载均衡。这是一个装饰器模式的实现。

HostSelector接口定义了负载均衡的接口,它是对HostInfo进行负载均衡,再由HostInfo找到对应的RpcClient对象。

  1. public interface HostSelector {  
  2.   
  3.     void setHosts(List<HostInfo> hosts);  
  4.   
  5.     Iterator<HostInfo> createHostIterator();  
  6.   
  7.     void informFailure(HostInfo failedHost);  
  8.   }  

HostSelector有两个默认的实现

RoundRobinHostSelector是轮询方式的负载均衡实现

RandomOrderHostSelector是随机方式的负载均衡实现


看下RoundRobinHostSelector的实现,它的逻辑主要在OrderSelector这个类中实现

  1. private static class RoundRobinHostSelector implements HostSelector {  
  2.   
  3.    private OrderSelector<HostInfo> selector;  
  4.   
  5.    RoundRobinHostSelector(boolean backoff, long maxBackoff){  
  6.      selector = new RoundRobinOrderSelector<HostInfo>(backoff);  
  7.      if(maxBackoff != 0){  
  8.        selector.setMaxTimeOut(maxBackoff);  
  9.      }  
  10.    }  
  11.    @Override  
  12.    public synchronized Iterator<HostInfo> createHostIterator() {  
  13.      return selector.createIterator();  
  14.    }  
  15.   
  16.    @Override  
  17.    public synchronized void setHosts(List<HostInfo> hosts) {  
  18.      selector.setObjects(hosts);  
  19.    }  
  20.   
  21.    public synchronized void informFailure(HostInfo failedHost){  
  22.      selector.informFailure(failedHost);  
  23.    }  
  24.  }  

OrderSelector是一个支持回退backoff算法的顺序选择容器,它的类层次结构如下


父类OrderSelector是抽象类,定义了回退算法,子类RoundRobinOrderSelector和RandomOrderSelector实现了创建迭代器的算法。

RoundRobinOrderSelector的代码如下

1. getIndexList()返回状态正常的对象列表

2. nextHead索引指向当前位置,作为轮询的起点

  1. public class RoundRobinOrderSelector<T> extends OrderSelector<T> {  
  2.   
  3.   private int nextHead = 0;  
  4.   
  5.   public RoundRobinOrderSelector(boolean shouldBackOff) {  
  6.     super(shouldBackOff);  
  7.   }  
  8.   
  9.   @Override  
  10.   public Iterator<T> createIterator() {  
  11.     List<Integer> activeIndices = getIndexList();  
  12.     int size = activeIndices.size();  
  13.     // possible that the size has shrunk so gotta adjust nextHead for that  
  14.     if (nextHead >= size) {  
  15.       nextHead = 0;  
  16.     }  
  17.     int begin = nextHead++;  
  18.     if (nextHead == activeIndices.size()) {  
  19.       nextHead = 0;  
  20.     }  
  21.   
  22.     int[] indexOrder = new int[size];  
  23.   
  24.     for (int i = 0; i < size; i++) {  
  25.       indexOrder[i] = activeIndices.get((begin + i) % size);  
  26.     }  
  27.   
  28.     return new SpecificOrderIterator<T>(indexOrder, getObjects());  
  29.   }  
  30. }  

对于LoadBalanceRpcClient来说,它的配置文件里,同一个RPC服务的服务器列表至少有两个服务端信息才能使用负载均衡。在配置文件中还配置了回退算法和负载均衡算法相关的配置

  1.  protected void configure(Properties properties) throws FlumeException {  
  2.     clientMap = new HashMap<String, RpcClient>();  
  3.     configurationProperties = new Properties();  
  4.     configurationProperties.putAll(properties);  
  5.     hosts = HostInfo.getHostInfoList(properties);  
  6.     if (hosts.size() < 2) {  
  7.       throw new FlumeException("At least two hosts are required to use the "  
  8.           + "load balancing RPC client.");  
  9.     }  
  10.      String lbTypeName = properties.getProperty(  
  11.         RpcClientConfigurationConstants.CONFIG_HOST_SELECTOR,  
  12.         RpcClientConfigurationConstants.HOST_SELECTOR_ROUND_ROBIN);  
  13.   
  14.     boolean backoff = Boolean.valueOf(properties.getProperty(  
  15.             RpcClientConfigurationConstants.CONFIG_BACKOFF,  
  16.             String.valueOf(false)));  
  17.   
  18.     String maxBackoffStr = properties.getProperty(  
  19.         RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF);  
  20.   
  21.     long maxBackoff = 0;  
  22.     if(maxBackoffStr != null) {  
  23.       maxBackoff     = Long.parseLong(maxBackoffStr);  
  24.     }  
  25.   
  26.     if (lbTypeName.equalsIgnoreCase(  
  27.         RpcClientConfigurationConstants.HOST_SELECTOR_ROUND_ROBIN)) {  
  28.       selector = new RoundRobinHostSelector(backoff, maxBackoff);  
  29.     } else if (lbTypeName.equalsIgnoreCase(  
  30.         RpcClientConfigurationConstants.HOST_SELECTOR_RANDOM)) {  
  31.       selector = new RandomOrderHostSelector(backoff, maxBackoff);  
  32.     } else {  
  33.       try {  
  34.         @SuppressWarnings("unchecked")  
  35.         Class<? extends HostSelector> klass = (Class<? extends HostSelector>)  
  36.             Class.forName(lbTypeName);  
  37.   
  38.         selector = klass.newInstance();  
  39.       } catch (Exception ex) {  
  40.         throw new FlumeException("Unable to instantiate host selector: "  
  41.             + lbTypeName, ex);  
  42.       }  
  43.     }  
  44.   
  45.     selector.setHosts(hosts);  
  46.     isOpen = true;  
  47. }  


客户端负载均衡的主要组件如下



客户端缓存

客户端缓存比较简单,使用了一个Map结构,保存了HostInfo和对应的RPCClient对象,这样可以复用RPCClient对象,这是一个重对象,包含了一个连接池的实例。

  1. clientMap = new HashMap<String, RpcClient>();  
  2.   
  3.   
  4. private synchronized RpcClient getClient(HostInfo info)  
  5.       throws FlumeException, EventDeliveryException {  
  6.     throwIfClosed();  
  7.     String name = info.getReferenceName();  
  8.     RpcClient client = clientMap.get(name);  
  9.     if (client == null) {  
  10.       client = createClient(name);  
  11.       clientMap.put(name, client);  
  12.     } else if (!client.isActive()) {  
  13.       try {  
  14.         client.close();  
  15.       } catch (Exception ex) {  
  16.         LOGGER.warn("Failed to close client for " + info, ex);  
  17.       }  
  18.       client = createClient(name);  
  19.       clientMap.put(name, client);  
  20.     }  
  21.   
  22.     return client;  
  23.   }  

客户端容灾处理

FailoverRpcClient类实现了客户端的容灾处理,它也是装饰器模式的实现,基础了AbstractRpcClient,实现了RpcClient接口FailoverRpcClient主要是实现了失效转移,利用重试机制,当一个RpcClient失效,就使用下一个RpcClient重试RPC请求,直到成功,或者全部失败

FailoverRpcClient也维护了一个HostInfo列表,由HostInfo再找到对应的RpcClient。还维护了一个最大的重试次数maxTries

  1. private synchronized void configureHosts(Properties properties)  
  2.       throws FlumeException {  
  3.     if(isActive){  
  4.       logger.error("This client was already configured, " +  
  5.           "cannot reconfigure.");  
  6.       throw new FlumeException("This client was already configured, " +  
  7.           "cannot reconfigure.");  
  8.     }  
  9.     hosts = HostInfo.getHostInfoList(properties);  
  10.     String tries = properties.getProperty(  
  11.         RpcClientConfigurationConstants.CONFIG_MAX_ATTEMPTS);  
  12.     if (tries == null || tries.isEmpty()){  
  13.       maxTries = hosts.size();  
  14.     } else {  
  15.       try {  
  16.         maxTries = Integer.parseInt(tries);  
  17.       } catch (NumberFormatException e) {  
  18.         maxTries = hosts.size();  
  19.       }  
  20.     }  
  21. ......  
  22. }  

看一下它的append方法,实现了重试机制来做失效转移

  1. public void append(Event event) throws EventDeliveryException {  
  2.     //Why a local variable rather than just calling getClient()?  
  3.     //If we get an EventDeliveryException, we need to call close on  
  4.     //that specific client, getClient in this case, will get us  
  5.     //the next client - leaving a resource leak.  
  6.     RpcClient localClient = null;  
  7.     synchronized (this) {  
  8.       if (!isActive) {  
  9.         logger.error("Attempting to append to an already closed client.");  
  10.         throw new EventDeliveryException(  
  11.             "Attempting to append to an already closed client.");  
  12.       }  
  13.     }  
  14.     // Sit in an infinite loop and try to append!  
  15.     int tries = 0;  
  16.     while (tries < maxTries) {  
  17.       try {  
  18.         tries++;  
  19.         localClient = getClient();  
  20.         localClient.append(event);  
  21.         return;  
  22.       } catch (EventDeliveryException e) {  
  23.         // Could not send event through this client, try to pick another client.  
  24.         logger.warn("Client failed. Exception follows: ", e);  
  25.         localClient.close();  
  26.         localClient = null;  
  27.       } catch (Exception e2) {  
  28.         logger.error("Failed to send event: ", e2);  
  29.         throw new EventDeliveryException(  
  30.             "Failed to send event. Exception follows: ", e2);  
  31.       }  
  32.     }  
  33.     logger.error("Tried many times, could not send event."  
  34.     throw new EventDeliveryException("Failed to send the event!");  
  35.   }  


    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多