分享

Java 线程池的理论与实践(二)

 太原中软 2017-05-15

四、如果线程死掉了怎么办

  几乎所有Executors中生成线程池的方法的注释上,都有代表相同意思的一句话,表示如果线程池中的某个线程死掉了,线程池会生成一个新的线程代替它。下面是方法java.util.concurrent.Executors.newFixedThreadPool(int)上的注释。

  If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.

线程死亡的原因

  我们都知道守护线程(daemon)会在所有的非守护线程都死掉之后也死掉,除此之外导致一个非守护线程死掉有以下几种可能:

  1. 自然死亡,Runnable.run()方法执行完后返回。
  2. 执行过程中有未捕获异常,被抛到了Runnable.run()之外,导致线程死亡。
  3. 其宿主死亡,进程关闭或者机器死机。在Java中通常是System.exit()方法被调用
  4. 其他硬件问题。

  线程池要保证其高可用性,就必须保证线程的可用。如一个固定容量的线程池,其中一个线程死掉了,它必须要能监控到线程的死亡并生成一个新的线程来代替它。ThreadPoolExecutor中与线程相关的有这样几个概念:

  1. java.util.concurrent.ThreadFactory,在Executors中有两种ThreadFactory,但其提供的线程池只使用了一种java.util.concurrent.Executors.DefaultThreadFactory,它是简单的使用ThreadGroup来实现。
  2. java.lang.ThreadGroup,从Java1开始就存在的类,用来建立一个线程的树形结构,可以用它来组织线程间的关系,但其并没有对其包含的子线程的监控。
  3. java.util.concurrent.ThreadPoolExecutor.Worker,ThreadPoolExecutor对线程的封装,其中还包含了一些统计功能。
ThreadPoolExecutor中如何保障线程的可用

  在ThreadPoolExecutor中使用了一个很巧妙的方法实现了对线程池中线程健康状况的监控,代码2是从ThreadPoolExecutor类源码中截取的一段代码,它们在一起说明了其对线程的监控。

  可以看到,在ThreadPoolExecutor中的线程被封装成一个对象Worker,而将其中的run()代理到ThreadPoolExecutor中的runWorker(),在runWorker()方法中是一个获取任务并执行的死循环。如果任务的运行出了什么问题(如抛出未捕获异常),processWorkerExit()方法会被执行,同时传入的completedAbruptly参数为true,会重新添加一个初始任务为null的Worker,并随之启动一个新的线程。

//代码2//ThreadPoolExecutor的动态内部类privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{ /** 对象中封装的线程 */final Thread thread; /** 第一个要运行的任务,可能为null. */ Runnable firstTask; /** 任务计数器 */volatilelong completedTasks; //省略其他代码 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */publicvoidrun(){ runWorker(this); } }finalvoidrunWorker(Worker w){ Thread wt = Thread.currentThread(); boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); try { beforeExecute(wt, task); try { task.run(); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }privatevoidprocessWorkerExit(Worker w, boolean completedAbruptly){ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }privatebooleanaddWorker(Runnable firstTask, boolean core){ retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary.if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) returnfalse; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) returnfalse; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startablethrownew IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } 五、回到我的问题

  由于各种各样的原因,我们并没有使用数据库自带的主从机制来做数据的复制,而是将主库的所有DML语句作为消息发送到读库(DTS),同时自己实现了数据的重放。第一版的数据同步服务十分简单,对于主库的DML消息处理和消费(写入读库)都是在一个线程内完成的.这么实现的优点是简单,但缺点是直接导致了表与表之间的数据同步会受到影响,如果有一个表A忽然来了很多的消息(往往是批量修改数据造成的),则会占住消息处理通道,影响其他业务数据的及时同步,同时单线程写库吞吐太小。

  上文说到,首先想到的是使用线程池来做消息的消费,但是不能直接套用上边说的Executor框架,由于以下几个原因:

  1. ThreadPoolExecutor中默认所有的任务之间是不互相影响的,然而对于数据库的DML来说,消息的顺序不能被打乱,至少单表的消息顺序必须有序,不然会影响最终的数据一致。
  2. ThreadPoolExecutor中所有的线程共享一个等待队列,然而为了防止表与表之间的影响,每个线程应该有自己的任务等待队列。
  3. 写库操作的吞吐直接受到提交事务数的影响,所以此多线程框架要可以支持任务的合并。

  重复造轮子是没有意义的,但是在我们这种场景下JDK中现有的Executor框架不符合要求,只能自己造轮子。

我的实现

  首先把线程抽象成「DML语句的执行器(Executor)」。其中包含了一个Thread的实例,维护了自己的等待队列(限定容量的阻塞队列),和对应的消息执行逻辑。

  除此之外还包含了一些简单的统计、线程健康监控、合并事务等处理。

  Executor的对象实现了Thread.UncaughtExceptionHandler接口,并绑定到其工作线程上。同时ExecutorGroup也会再生成一个守护线程专门来守护池内所有线程,作为额外的保险措施。

  把线程池的概念抽象成执行器组(ExecutorGroup),其中维护了执行器的数组,并维护了目标表到特定执行器的映射关系,并对外提供执行消息的接口,其主要代码如下:

//代码3publicclassExecutorGroup { Executor[] group = new Executor[NUM]; Thread boss = null; Map<String, Integer> registeredTables = new HashMap<>(32);// AtomicInteger cursor = new AtomicInteger();volatileint cursor = 0; publicExecutorGroup(String name) { //init groupfor(int i = 0; i < NUM; i++) { logger.debug("启动线程{},{}", name, i); group[i] = new Executor(this, String.format("sync-executor-%s-%d", name, i), i / NUM_OF_FIRST_CLASS); } startDaemonBoss(String.format("sync-executor-%s-boss", name)); } //额外的保险privatevoidstartDaemonBoss(String name) { if (boss != null) { boss.interrupt(); } boss = new Thread(() -> { while(true) { //休息一分钟。。。if (this.group != null) { for (int i = 0; i < group.length; i++) { Executor executor = group[i]; if (executor != null) { executor.checkThread(); } } } } }); boss.setName(name); boss.setDaemon(true); boss.start(); } publicvoidexecute(Message message){ logger.debug("执行消息"); //省略消息合法性验证if (!registeredTables.containsKey(taskKey)) { //已注册// registeredTables.put(taskKey, cursor.getAndIncrement()); registeredTables.put(taskKey, cursor++ % NUM); } int index = registeredTables.get(taskKey); logger.debug("执行消息{},注册索引{}", taskKey, index); try { group[index].schedule(message); } catch (InterruptedException e) { logger.error("准备消息出错", e); } } }

  完成后整体的线程模型如下图所示:

  新的线程模型

Java1.7新加入的TransferQueue

  Java1.7中提供了新的队列类型TransferQueue,但只提供了一个它的实现java.util.concurrent.LinkedTransferQueue<E>,它有更好的性能表现,可它是一个无容量限制的队列,而在我们的这个场景下必须要限制队列的容量,所以要自己实现一个有容量限制的队列。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多