分享

深入解析Apache Mina源码(3)——Mina的线程池模型

 集微笔记 2014-02-26

 

一、生产者消费者问题

做为苦逼的程序员的我们基本没有不知道生产者消费者问题的,这个经典的问题充分体现了进程同步的问题,还是简单的说下它的概念,生产者和消费者是两个线程,生产者线程生产物品放到空的缓冲区内(可能是一个list,消费者线程从缓冲区内取出物品进行消费并释放缓冲区,缓冲区有个固定大小,当生产者线程将缓冲区填充满时,生产者线程处于等待状态,等待消费者线程消费;当缓冲区消费空了后,消费者线程处于等待状态,等待生产者线程进行生产。当然生产者和消费者也可以有多个线程充当,但是操作的进程地址空间却只能是同一个。

这个经典的问题体现了多线程编程的一些要注意的地方,比如对同一资源进行访问所产生的互斥和同步问题。

下面看下对生产者消费者问题的实现。

物品类:

Java代码 复制代码 收藏代码
  1. package com.lifanghu.procon;   
  2.   
  3. /**  
  4.  * 食物  
  5.  * @author lifh  
  6.  * @mail wslfh2005@163.com  
  7.  * @since 2012-6-22 上午08:13:34  
  8.  * @name com.lifanghu.procon.Food.java  
  9.  * @version 1.0  
  10.  */  
  11.   
  12. public class Food {   
  13.   
  14.     private String name;   
  15.   
  16.     public String getName() {   
  17.         return name;   
  18.     }   
  19.   
  20.     public void setName(String name) {   
  21.         this.name = name;   
  22.     }   
  23.   
  24. }  

 

 缓冲区:

Java代码 复制代码 收藏代码
  1. package com.lifanghu.procon;   
  2.   
  3. import java.util.ArrayList;   
  4. import java.util.List;   
  5.   
  6. /**  
  7.  * 容器,缓冲区  
  8.  * @author lifh  
  9.  * @mail wslfh2005@163.com  
  10.  * @since 2012-6-22 上午08:33:56  
  11.  * @name com.lifanghu.procon.Container.java  
  12.  * @version 1.0  
  13.  */  
  14.   
  15. public class Container {   
  16.   
  17.     //缓冲区大小    
  18.     private int size;   
  19.     private List foods;   
  20.   
  21.     public Container(int size) {   
  22.         this.size = size;   
  23.         foods = new ArrayList(size);   
  24.     }   
  25.   
  26.     public synchronized void poll(Food food) {   
  27.         while (foods.size() >= size) {   
  28.             try {   
  29.                 wait();   
  30.             } catch (InterruptedException e) {   
  31.                 e.printStackTrace();   
  32.             }   
  33.         }   
  34.         foods.add(food);   
  35.         notifyAll();   
  36.     }   
  37.     public synchronized Food offer() {   
  38.         Food food = null;   
  39.         while (foods.size() == 0) {   
  40.             try {   
  41.                 wait();   
  42.             } catch (InterruptedException e) {   
  43.                 e.printStackTrace();   
  44.             }   
  45.         }   
  46.         food = foods.remove(foods.size() - 1);   
  47.         notifyAll();   
  48.         return food;   
  49.     }   
  50. }  

 

 生产者:

Java代码 复制代码 收藏代码
  1. package com.lifanghu.procon;   
  2.   
  3. /**  
  4.  * 生产者  
  5.  * @author lifh  
  6.  * @mail wslfh2005@163.com  
  7.  * @since 2012-6-22 上午08:13:26  
  8.  * @name com.lifanghu.procon.Producer.java  
  9.  * @version 1.0  
  10.  */  
  11.   
  12. public class Producer implements Runnable {   
  13.   
  14.     private Container container;   
  15.   
  16.     public Producer(Container container) {   
  17.         super();   
  18.         this.container = container;   
  19.     }   
  20.   
  21.     public void run() {   
  22.         for (int i = 0; i < 10; i++) {   
  23.             Food food = new Food();   
  24.             food.setName("馒头" + i);   
  25.             System.out.println("生产者生产出" + food.getName());   
  26.             container.poll(food);   
  27.             try {   
  28.                 Thread.sleep((long) (Math.random() * 3000));   
  29.             } catch (InterruptedException e) {   
  30.                 e.printStackTrace();   
  31.             }   
  32.         }   
  33.     }   
  34. }  

 

 消费者:

Java代码 复制代码 收藏代码
  1. package com.lifanghu.procon;   
  2.   
  3. /**  
  4.  * 消费者  
  5.  * @author lifh  
  6.  * @mail wslfh2005@163.com  
  7.  * @since 2012-6-22 上午08:13:52  
  8.  * @name com.lifanghu.procon.Consumer.java  
  9.  * @version 1.0  
  10.  */  
  11.   
  12. public class Consumer implements Runnable {   
  13.   
  14.     private Container container;   
  15.   
  16.     public Consumer(Container container) {   
  17.         super();   
  18.         this.container = container;   
  19.     }   
  20.   
  21.     public void run() {   
  22.         for (;;) {   
  23.             Food food = container.offer();   
  24.             try {   
  25.                 Thread.sleep((long) (Math.random() * 3000));   
  26.             } catch (InterruptedException e) {   
  27.                 e.printStackTrace();   
  28.             }   
  29.             if (food != null) {   
  30.                 System.out.println(food.getName() + "被消费!");   
  31.             }   
  32.         }   
  33.     }   
  34. }  

 

 测试类:

Java代码 复制代码 收藏代码
  1. package com.lifanghu.procon;   
  2.   
  3. /**  
  4.  * 客户端测试类  
  5.  * @author lifh  
  6.  * @mail wslfh2005@163.com  
  7.  * @since 2012-6-22 上午08:13:59  
  8.  * @name com.lifanghu.procon.Client.java  
  9.  * @version 1.0  
  10.  */  
  11.   
  12. public class Client {   
  13.   
  14.     public static void main(String[] args) {   
  15.         Container container = new Container(5);   
  16.         Thread producer1 = new Thread(new Producer(container));   
  17.         // Thread producer2 = new Thread(new Producer(container));   
  18.         // producer2.start();   
  19.         Thread consumer1 = new Thread(new Consumer(container));   
  20.         producer1.start();   
  21.         consumer1.start();   
  22.     }   
  23. }  

 

 输出结果: 

生产者生产出馒头0
馒头0被消费!
生产者生产出馒头1
馒头1被消费!
生产者生产出馒头2
生产者生产出馒头3
生产者生产出馒头4
馒头2被消费!
生产者生产出馒头5
馒头4被消费!
馒头5被消费!
生产者生产出馒头6
生产者生产出馒头7
馒头3被消费!
馒头7被消费!
馒头6被消费!
生产者生产出馒头8
馒头8被消费!
生产者生产出馒头9
馒头9被消费!

 

二、 线程池及实现

上面我们讲到了生产者消费者的问题,那么这和线程池有什么关系呢?其实线程池的实现就是生产者消费者问题的实现,理解了生产者消费者问题就不会对线程池的实现感到神秘了,线程池在很多地方会用到,比如tomcat等各种中间容器的实现,Spring对线程池的支持等,当然mina中也使用到了线程池的概念。至于为什么要用到线程池,网上文章很多,基本是操作系统支持的线程数有限,线程的创建关闭有很大的系统开销,线程的切换也会影响系统性能等等。

下面这个图就是线程池的基本原理图,看看是不是和生产者消费者问题一样。


 

看下简单对线程池的实现代码,主要包括三个类,一个是线程池,一个是工作任务,一个是客户端进行任务添加。

任务类,比较简单,实现Runnable接口:

Java代码 复制代码 收藏代码
  1. package com.lifanghu.threadpool;   
  2. //任务类,具体要执行的操作   
  3. public class Worker implements Runnable {   
  4.     private int id;   
  5.     public Worker(int id) {   
  6.         this.id = id;   
  7.     }   
  8.     public void run() {   
  9.         try {   
  10.             Thread.sleep(100);   
  11.         } catch (InterruptedException e) {   
  12.             e.printStackTrace();   
  13.         }   
  14.         System.out.println("线程:" + Thread.currentThread().getName() + " 执行任务" + id);   
  15.     }   
  16. }  

 

线程池,相对复杂一些,但是原理是很简单的:

Java代码 复制代码 收藏代码
  1. package com.lifanghu.threadpool;   
  2. import java.util.LinkedList;   
  3. /**  
  4.  * 线程池实现  
  5.  * @author lifh  
  6.  * @mail wslfh2005@163.com  
  7.  * @since 2012-6-22 下午03:31:47  
  8.  * @name com.lifanghu.threadpool.ThreadPool.java  
  9.  * @version 1.0  
  10.  */  
  11. public class ThreadPool {   
  12.     // 线程池大小   
  13.     private final int nThreads;   
  14.     // 线程池工作者(具体线程)   
  15.     private final PoolWorker[] threads;   
  16.     // 任务队列   
  17.     private final LinkedList queue;   
  18.     public ThreadPool(int nThreads) {   
  19.         // 初始线程池,并启动线程池里面的线程   
  20.         this.nThreads = nThreads;   
  21.         queue = new LinkedList();   
  22.         threads = new PoolWorker[nThreads];   
  23.         for (int i = 0; i < nThreads; i++) {   
  24.             threads[i] = new PoolWorker();   
  25.             threads[i].start();   
  26.         }   
  27.     }   
  28.     // 提交工作任务,实际将任务放入队列,并通知线程进行消费   
  29.     public void execute(Runnable r) {   
  30.         synchronized (queue) {   
  31.             queue.addLast(r);   
  32.             queue.notify();   
  33.         }   
  34.     }   
  35.   
  36.     private class PoolWorker extends Thread {   
  37.         public void run() {   
  38.             Runnable r;   
  39.             // 循环取出任务队列里的任务进行消费,如果没有任务,就等待任务到来。   
  40.             while (true) {   
  41.                 synchronized (queue) {   
  42.                     while (queue.isEmpty()) {   
  43.                         try {   
  44.                             queue.wait();   
  45.                         } catch (InterruptedException ignored) {   
  46.                         }   
  47.                     }   
  48.                     r = queue.removeFirst();   
  49.                 }   
  50.                 try {   
  51.                     r.run();   
  52.                 } catch (RuntimeException e) {   
  53.                 }   
  54.             }   
  55.         }   
  56.     }   
  57. }  
  

看下客户端的调用代码:

Java代码 复制代码 收藏代码
  1. package com.lifanghu.threadpool;   
  2. /**  
  3.  * 客户端测试类  
  4.  * @author lifh  
  5.  * @mail wslfh2005@163.com  
  6.  * @since 2012-6-22 下午03:25:36  
  7.  * @name .Client.java  
  8.  * @version 1.0  
  9.  */  
  10. public class Client {   
  11.     public static void main(String[] args) {   
  12.         ThreadPool queue = new ThreadPool(10);   
  13.         // 提交工作任务。   
  14.         queue.execute(new Worker(1));   
  15.         queue.execute(new Worker(2));   
  16.         queue.execute(new Worker(3));   
  17.     }   
  18. }  
  

观察输出结果:

线程:Thread-1 执行任务1
线程:Thread-5 执行任务3
线程:Thread-3 执行任务2

  怎么样,感觉是不是很easy呢?咱们的线程池实现其实比较简单的,但是实际应用中我们用线程池比较常见的方式还是使用JDK中对线程池的实现,它提供了ExecutorServiceExecutor等类实现了对线程池的支持,不过线程池的实现原理其实是和我们的一样的,只不过它更多的考虑了实现细节,功能更强一些,关于它的使用网上有很多文章讲的已经很清楚了,可以参考:http://mshijie./blog/366591

 

三、Mina中的线程池模型

前面讲了生产者消费者问题以及由此引出的线程池的实现问题,那么现在我们来看下实际开源项目mina中是怎么使用线程池模型的。

Mina中的线程池使用主要有四个地方:

1、IoAcceptor线程池。

2、IoConnector线程池。

3、IoProcessor线程池。

4、过滤器类ExecutorFilter线程池。

 

一、先说下IoAcceptorIoConnector线程池,它俩的实现类都继承了AbstractIoService类,而Executor也是定义在这个类里面的,所以使用线程池的方式是一样的。

先看下AbstractIoService类关于线程池的初始化,它的初始化是在构造方法里面进行的:

 

Java代码 复制代码 收藏代码
  1. if (executor == null) {   
  2.     //默认的线程池:可缓存的线程池   
  3.     this.executor = Executors.newCachedThreadPool();   
  4.     createdExecutor = true;   
  5. else {   
  6.     this.executor = executor;   
  7.     createdExecutor = false;   
  8. }   
  9.   
  10. //重新设定的线程名称   
  11. threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();  

 

下面是提交作业任务的方法:

Java代码 复制代码 收藏代码
  1. protected final void executeWorker(Runnable worker, String suffix) {   
  2.     String actualThreadName = threadName;   
  3.     if (suffix != null) {   
  4.         actualThreadName = actualThreadName + '-' + suffix;   
  5.     }   
  6.     // 向线程池中提交任务。   
  7.     executor.execute(new NamePreservingRunnable(worker, actualThreadName));   
  8. }  

对于IoAcceptor的任务提交调用是在bindunbind方法实现中的,看下bind最终调用,在类AbstractPollingIoAcceptor startupAcceptor方法中:

 

Java代码 复制代码 收藏代码
  1. // start the acceptor if not already started   
  2. Acceptor acceptor = acceptorRef.get();   
  3.   
  4. if (acceptor == null) {   
  5.     acceptor = new Acceptor();   
  6.   
  7.     if (acceptorRef.compareAndSet(null, acceptor)) {   
  8.         //放入工作线程池中,供异步执行。   
  9.         executeWorker(acceptor);   
  10.     }   
  11. }  

 

 再来看内部Acceptor,它作为接收者任务类,执行端口的绑定,通道的注册操作等。

Java代码 复制代码 收藏代码
  1. //实际的注册端口方法   
  2. nHandles += registerHandles();  

 registerHandles方法中关于注册端口的方法:

 

Java代码 复制代码 收藏代码
  1. try {   
  2.     // Process all the addresses   
  3.     for (SocketAddress a : localAddresses) {   
  4.         //注册端口,最终调用低层的注册方法,参考类NioSocketAcceptor   
  5.         H handle = open(a);   
  6.         newHandles.put(localAddress(handle), handle);   
  7.     }  
  

unbind方法和bind方法的调用很类似,这里就不说了。

再看下IoConnector,它最终是在方法connect时会提交任务,看下AbstractPollingIoConnector类的startupWorker方法:

 

Java代码 复制代码 收藏代码
  1. if (connector == null) {   
  2.     connector = new Connector();   
  3.        
  4.     if (connectorRef.compareAndSet(null, connector)) {   
  5.         //提交执行任务   
  6.         executeWorker(connector);   
  7.     }   
  8. }  

 对于IoAcceptorIoConnector线程池的线程池大小,一般来说一个对象里面只有一个线程池,一个线程池里面一般有一个线程,当然如果你的连接或者监听比较多时可能会自动增加线程,这个就看线程池自己分配了。

二、关于IoProcessor线程池。IoProcessor里面使用线程池的方式和上面两个使用方式很相似,代码都非常类似,看下AbstractPollingIoProcessor类的startupProcessor方法:

Java代码 复制代码 收藏代码
  1. private void startupProcessor() {   
  2.     Processor processor = processorRef.get();   
  3.   
  4.     if (processor == null) {   
  5.         processor = new Processor();   
  6.   
  7.         if (processorRef.compareAndSet(null, processor)) {   
  8.             //添加执行任务。   
  9.             executor.execute(new NamePreservingRunnable(processor, threadName));   
  10.         }   
  11.     }   
  12.     // Just stop the select() and start it again, so that the processor   
  13.     // can be activated immediately.   
  14.     wakeup();   
  15. }  

 它的大小是在SimpleIoProcessorPool中定义的,默认是CPU核数加1,代码如下:

Java代码 复制代码 收藏代码
  1. //默认的大小为CPU核数加1   
  2. private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;  

 

 最终调用其实还是调用的AbstractPollingIoProcessor里面的执行线程,可以看下SimpleIoProcessorPool的构造方法:

Java代码 复制代码 收藏代码
  1. try {   
  2.     processorConstructor = processorType.getConstructor(ExecutorService.class);   
  3.     //最终还是调用AbstractPollingIoProcessor进行数据处理的。   
  4. pool[0] = processorConstructor.newInstance(this.executor);   
  5. ……   
  6. // Constructor found now use it for all subsequent instantiations   
  7. for (int i = 1; i < pool.length; i++) {   
  8.     try {   
  9.         if (usesExecutorArg) {   
  10.             pool[i] = processorConstructor.newInstance(this.executor);   
  11.         } else {   
  12.             pool[i] = processorConstructor.newInstance();   
  13.         }   
  14.     } catch (Exception e) {   
  15.         // Won't happen because it has been done previously   
  16.     }   
  17. }  

 我们可以看到在有个这样的变量:

Java代码 复制代码 收藏代码
  1. /** The pool table */  
  2. private final IoProcessor[] pool;  

 从这个变量我们可以发现mina的线程池模型是以多个newCachedThreadPool存在的,至于mina为什么要这样处理,这里我也不得而知,如果哪位知道的话可以一起讨论……

三、ExecutorFilter类中的线程池。这是一个可选的线程池,是加在过滤器当中的。我们一般选择加在过滤器的最后面,这样Handler里面的业务处理就可以在线程池里面进行处理了。它的默认大小是16

 

Java代码 复制代码 收藏代码
  1. /** The default pool size */  
  2. private static final int DEFAULT_MAX_POOL_SIZE = 16;  

 

 看下Executor的创建方式:

 

Java代码 复制代码 收藏代码
  1. private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,   
  2.     TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {   
  3.     // Create a new Executor   
  4.     Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize,    
  5.         keepAliveTime, unit, threadFactory, queueHandler);   
  6.        
  7.     return executor;   
  8. }  

 

 OrderedThreadPoolExecutor是一个继承了ThreadPoolExecutor的类,覆盖了一些方法的实现。看下任务提交的代码:

 

Java代码 复制代码 收藏代码
  1. protected void fireEvent(IoFilterEvent event) {   
  2.     //将事件提交给线程池执行   
  3.     executor.execute(event);   
  4. }  

 

 里面的实现细节相对比较复杂,感兴趣的童鞋可以再自行深入研究。

 

     四、推荐文章

 

 

1. java并发编程-Executor框架

http://mshijie./blog/366591

 

2. java.util.concurrent介绍

http://www.cnblogs.com/sarafill/archive/2011/05/18/2049461.html

 

3. 原子变量(AtomicLong, AtomicInteger, AtomicReference)

http://meng-lin./blog/485281

 

 

    五、总结

 

上面的文章基本上讲的比较简单,粒度比较粗,线程池的应用是mina的核心之一,里面有很多细节的地方其实是很值得学习的,当然本人到现在也不能完全吃透。还需要以后在交流和学习中与大家一起成长。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多