一、生产者消费者问题
做为苦逼的程序员的我们基本没有不知道生产者消费者问题的,这个经典的问题充分体现了进程同步的问题,还是简单的说下它的概念,生产者和消费者是两个线程,生产者线程生产物品放到空的缓冲区内(可能是一个list),消费者线程从缓冲区内取出物品进行消费并释放缓冲区,缓冲区有个固定大小,当生产者线程将缓冲区填充满时,生产者线程处于等待状态,等待消费者线程消费;当缓冲区消费空了后,消费者线程处于等待状态,等待生产者线程进行生产。当然生产者和消费者也可以有多个线程充当,但是操作的进程地址空间却只能是同一个。
这个经典的问题体现了多线程编程的一些要注意的地方,比如对同一资源进行访问所产生的互斥和同步问题。
下面看下对生产者消费者问题的实现。
物品类:
Java代码
- package com.lifanghu.procon;
-
- /**
- * 食物
- * @author lifh
- * @mail wslfh2005@163.com
- * @since 2012-6-22 上午08:13:34
- * @name com.lifanghu.procon.Food.java
- * @version 1.0
- */
-
- public class Food {
-
- private String name;
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- }
缓冲区:
Java代码
- package com.lifanghu.procon;
-
- import java.util.ArrayList;
- import java.util.List;
-
- /**
- * 容器,缓冲区
- * @author lifh
- * @mail wslfh2005@163.com
- * @since 2012-6-22 上午08:33:56
- * @name com.lifanghu.procon.Container.java
- * @version 1.0
- */
-
- public class Container {
-
- //缓冲区大小
- private int size;
- private List foods;
-
- public Container(int size) {
- this.size = size;
- foods = new ArrayList(size);
- }
-
- public synchronized void poll(Food food) {
- while (foods.size() >= size) {
- try {
- wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- foods.add(food);
- notifyAll();
- }
- public synchronized Food offer() {
- Food food = null;
- while (foods.size() == 0) {
- try {
- wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- food = foods.remove(foods.size() - 1);
- notifyAll();
- return food;
- }
- }
生产者:
Java代码
- package com.lifanghu.procon;
-
- /**
- * 生产者
- * @author lifh
- * @mail wslfh2005@163.com
- * @since 2012-6-22 上午08:13:26
- * @name com.lifanghu.procon.Producer.java
- * @version 1.0
- */
-
- public class Producer implements Runnable {
-
- private Container container;
-
- public Producer(Container container) {
- super();
- this.container = container;
- }
-
- public void run() {
- for (int i = 0; i < 10; i++) {
- Food food = new Food();
- food.setName("馒头" + i);
- System.out.println("生产者生产出" + food.getName());
- container.poll(food);
- try {
- Thread.sleep((long) (Math.random() * 3000));
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
消费者:
Java代码
- package com.lifanghu.procon;
-
- /**
- * 消费者
- * @author lifh
- * @mail wslfh2005@163.com
- * @since 2012-6-22 上午08:13:52
- * @name com.lifanghu.procon.Consumer.java
- * @version 1.0
- */
-
- public class Consumer implements Runnable {
-
- private Container container;
-
- public Consumer(Container container) {
- super();
- this.container = container;
- }
-
- public void run() {
- for (;;) {
- Food food = container.offer();
- try {
- Thread.sleep((long) (Math.random() * 3000));
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- if (food != null) {
- System.out.println(food.getName() + "被消费!");
- }
- }
- }
- }
测试类:
Java代码
- package com.lifanghu.procon;
-
- /**
- * 客户端测试类
- * @author lifh
- * @mail wslfh2005@163.com
- * @since 2012-6-22 上午08:13:59
- * @name com.lifanghu.procon.Client.java
- * @version 1.0
- */
-
- public class Client {
-
- public static void main(String[] args) {
- Container container = new Container(5);
- Thread producer1 = new Thread(new Producer(container));
- // Thread producer2 = new Thread(new Producer(container));
- // producer2.start();
- Thread consumer1 = new Thread(new Consumer(container));
- producer1.start();
- consumer1.start();
- }
- }
输出结果:
生产者生产出馒头0 馒头0被消费! 生产者生产出馒头1 馒头1被消费! 生产者生产出馒头2 生产者生产出馒头3 生产者生产出馒头4 馒头2被消费! 生产者生产出馒头5 馒头4被消费! 馒头5被消费! 生产者生产出馒头6 生产者生产出馒头7 馒头3被消费! 馒头7被消费! 馒头6被消费! 生产者生产出馒头8 馒头8被消费! 生产者生产出馒头9 馒头9被消费!
二、 线程池及实现
上面我们讲到了生产者消费者的问题,那么这和线程池有什么关系呢?其实线程池的实现就是生产者消费者问题的实现,理解了生产者消费者问题就不会对线程池的实现感到神秘了,线程池在很多地方会用到,比如tomcat等各种中间容器的实现,Spring对线程池的支持等,当然mina中也使用到了线程池的概念。至于为什么要用到线程池,网上文章很多,基本是操作系统支持的线程数有限,线程的创建关闭有很大的系统开销,线程的切换也会影响系统性能等等。
下面这个图就是线程池的基本原理图,看看是不是和生产者消费者问题一样。
看下简单对线程池的实现代码,主要包括三个类,一个是线程池,一个是工作任务,一个是客户端进行任务添加。
任务类,比较简单,实现Runnable接口:
Java代码
- package com.lifanghu.threadpool;
- //任务类,具体要执行的操作
- public class Worker implements Runnable {
- private int id;
- public Worker(int id) {
- this.id = id;
- }
- public void run() {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("线程:" + Thread.currentThread().getName() + " 执行任务" + id);
- }
- }
线程池,相对复杂一些,但是原理是很简单的:
Java代码
- package com.lifanghu.threadpool;
- import java.util.LinkedList;
- /**
- * 线程池实现
- * @author lifh
- * @mail wslfh2005@163.com
- * @since 2012-6-22 下午03:31:47
- * @name com.lifanghu.threadpool.ThreadPool.java
- * @version 1.0
- */
- public class ThreadPool {
- // 线程池大小
- private final int nThreads;
- // 线程池工作者(具体线程)
- private final PoolWorker[] threads;
- // 任务队列
- private final LinkedList queue;
- public ThreadPool(int nThreads) {
- // 初始线程池,并启动线程池里面的线程
- this.nThreads = nThreads;
- queue = new LinkedList();
- threads = new PoolWorker[nThreads];
- for (int i = 0; i < nThreads; i++) {
- threads[i] = new PoolWorker();
- threads[i].start();
- }
- }
- // 提交工作任务,实际将任务放入队列,并通知线程进行消费
- public void execute(Runnable r) {
- synchronized (queue) {
- queue.addLast(r);
- queue.notify();
- }
- }
-
- private class PoolWorker extends Thread {
- public void run() {
- Runnable r;
- // 循环取出任务队列里的任务进行消费,如果没有任务,就等待任务到来。
- while (true) {
- synchronized (queue) {
- while (queue.isEmpty()) {
- try {
- queue.wait();
- } catch (InterruptedException ignored) {
- }
- }
- r = queue.removeFirst();
- }
- try {
- r.run();
- } catch (RuntimeException e) {
- }
- }
- }
- }
- }
看下客户端的调用代码:
Java代码
- package com.lifanghu.threadpool;
- /**
- * 客户端测试类
- * @author lifh
- * @mail wslfh2005@163.com
- * @since 2012-6-22 下午03:25:36
- * @name .Client.java
- * @version 1.0
- */
- public class Client {
- public static void main(String[] args) {
- ThreadPool queue = new ThreadPool(10);
- // 提交工作任务。
- queue.execute(new Worker(1));
- queue.execute(new Worker(2));
- queue.execute(new Worker(3));
- }
- }
观察输出结果:
线程:Thread-1 执行任务1 线程:Thread-5 执行任务3 线程:Thread-3 执行任务2
怎么样,感觉是不是很easy呢?咱们的线程池实现其实比较简单的,但是实际应用中我们用线程池比较常见的方式还是使用JDK中对线程池的实现,它提供了ExecutorService,Executor等类实现了对线程池的支持,不过线程池的实现原理其实是和我们的一样的,只不过它更多的考虑了实现细节,功能更强一些,关于它的使用网上有很多文章讲的已经很清楚了,可以参考:http://mshijie./blog/366591。
三、Mina中的线程池模型
前面讲了生产者消费者问题以及由此引出的线程池的实现问题,那么现在我们来看下实际开源项目mina中是怎么使用线程池模型的。
Mina中的线程池使用主要有四个地方:
1、IoAcceptor线程池。
2、IoConnector线程池。
3、IoProcessor线程池。
4、过滤器类ExecutorFilter线程池。
一、先说下IoAcceptor和IoConnector线程池,它俩的实现类都继承了AbstractIoService类,而Executor也是定义在这个类里面的,所以使用线程池的方式是一样的。
先看下AbstractIoService类关于线程池的初始化,它的初始化是在构造方法里面进行的:
Java代码
- if (executor == null) {
- //默认的线程池:可缓存的线程池
- this.executor = Executors.newCachedThreadPool();
- createdExecutor = true;
- } else {
- this.executor = executor;
- createdExecutor = false;
- }
-
- //重新设定的线程名称
- threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
下面是提交作业任务的方法:
Java代码
- protected final void executeWorker(Runnable worker, String suffix) {
- String actualThreadName = threadName;
- if (suffix != null) {
- actualThreadName = actualThreadName + '-' + suffix;
- }
- // 向线程池中提交任务。
- executor.execute(new NamePreservingRunnable(worker, actualThreadName));
- }
对于IoAcceptor的任务提交调用是在bind和unbind方法实现中的,看下bind最终调用,在类AbstractPollingIoAcceptor的 startupAcceptor方法中:
Java代码
- // start the acceptor if not already started
- Acceptor acceptor = acceptorRef.get();
-
- if (acceptor == null) {
- acceptor = new Acceptor();
-
- if (acceptorRef.compareAndSet(null, acceptor)) {
- //放入工作线程池中,供异步执行。
- executeWorker(acceptor);
- }
- }
再来看内部Acceptor,它作为接收者任务类,执行端口的绑定,通道的注册操作等。
Java代码
- //实际的注册端口方法
- nHandles += registerHandles();
registerHandles方法中关于注册端口的方法:
Java代码
- try {
- // Process all the addresses
- for (SocketAddress a : localAddresses) {
- //注册端口,最终调用低层的注册方法,参考类NioSocketAcceptor
- H handle = open(a);
- newHandles.put(localAddress(handle), handle);
- }
unbind方法和bind方法的调用很类似,这里就不说了。
再看下IoConnector,它最终是在方法connect时会提交任务,看下AbstractPollingIoConnector类的startupWorker方法:
Java代码
- if (connector == null) {
- connector = new Connector();
-
- if (connectorRef.compareAndSet(null, connector)) {
- //提交执行任务
- executeWorker(connector);
- }
- }
对于IoAcceptor和IoConnector线程池的线程池大小,一般来说一个对象里面只有一个线程池,一个线程池里面一般有一个线程,当然如果你的连接或者监听比较多时可能会自动增加线程,这个就看线程池自己分配了。
二、关于IoProcessor线程池。IoProcessor里面使用线程池的方式和上面两个使用方式很相似,代码都非常类似,看下AbstractPollingIoProcessor类的startupProcessor方法:
Java代码
- private void startupProcessor() {
- Processor processor = processorRef.get();
-
- if (processor == null) {
- processor = new Processor();
-
- if (processorRef.compareAndSet(null, processor)) {
- //添加执行任务。
- executor.execute(new NamePreservingRunnable(processor, threadName));
- }
- }
- // Just stop the select() and start it again, so that the processor
- // can be activated immediately.
- wakeup();
- }
它的大小是在SimpleIoProcessorPool中定义的,默认是CPU核数加1,代码如下:
Java代码
- //默认的大小为CPU核数加1
- private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
最终调用其实还是调用的AbstractPollingIoProcessor里面的执行线程,可以看下SimpleIoProcessorPool的构造方法:
Java代码
- try {
- processorConstructor = processorType.getConstructor(ExecutorService.class);
- //最终还是调用AbstractPollingIoProcessor进行数据处理的。
- pool[0] = processorConstructor.newInstance(this.executor);
- ……
- // Constructor found now use it for all subsequent instantiations
- for (int i = 1; i < pool.length; i++) {
- try {
- if (usesExecutorArg) {
- pool[i] = processorConstructor.newInstance(this.executor);
- } else {
- pool[i] = processorConstructor.newInstance();
- }
- } catch (Exception e) {
- // Won't happen because it has been done previously
- }
- }
我们可以看到在有个这样的变量:
Java代码
- /** The pool table */
- private final IoProcessor
[] pool;
从这个变量我们可以发现mina的线程池模型是以多个newCachedThreadPool存在的,至于mina为什么要这样处理,这里我也不得而知,如果哪位知道的话可以一起讨论……
三、ExecutorFilter类中的线程池。这是一个可选的线程池,是加在过滤器当中的。我们一般选择加在过滤器的最后面,这样Handler里面的业务处理就可以在线程池里面进行处理了。它的默认大小是16。
Java代码
- /** The default pool size */
- private static final int DEFAULT_MAX_POOL_SIZE = 16;
看下Executor的创建方式:
Java代码
- private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
- TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
- // Create a new Executor
- Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize,
- keepAliveTime, unit, threadFactory, queueHandler);
-
- return executor;
- }
类OrderedThreadPoolExecutor是一个继承了ThreadPoolExecutor的类,覆盖了一些方法的实现。看下任务提交的代码:
Java代码
- protected void fireEvent(IoFilterEvent event) {
- //将事件提交给线程池执行
- executor.execute(event);
- }
里面的实现细节相对比较复杂,感兴趣的童鞋可以再自行深入研究。
四、推荐文章
1. java并发编程-Executor框架
http://mshijie./blog/366591
2. java.util.concurrent介绍
http://www.cnblogs.com/sarafill/archive/2011/05/18/2049461.html
3. 原子变量(AtomicLong, AtomicInteger, AtomicReference)
http://meng-lin./blog/485281
五、总结
上面的文章基本上讲的比较简单,粒度比较粗,线程池的应用是mina的核心之一,里面有很多细节的地方其实是很值得学习的,当然本人到现在也不能完全吃透。还需要以后在交流和学习中与大家一起成长。
|