配色: 字号:
Java并发程序设计教程-2010-08-10
2017-03-31 | 阅:  转:  |  分享 
  
Java并发程序设计教程

温绍锦

旧时王谢堂前燕,飞入寻常百姓家。

邮箱:szujobs@hotmail.com

旺旺:shaojinwensj

QQ:1420452

Blog:http://www.cnblogs.com/jobs/

版本:2010-08-09

内容列表

1、使用线程的经验:设置名称、响应中断、使用ThreadLocal

2、Executor:ExecutorService和Future☆☆☆

3、阻塞队列:put和take、offer和poll、drainTo

4、线程间的协调手段:lock、condition、wait、notify、notifyAll☆☆☆

5、Lock-free:atomic、concurrentMap.putIfAbsent、CopyOnWriteArrayList☆☆☆

6、关于锁使用的经验介绍

7、并发流程控制手段:CountDownlatch、Barrier

8、定时器:ScheduledExecutorService、大规模定时器TimerWheel

9、并发三大定律:Amdahl、Gustafson、Sun-Ni

10、神人和图书、相关网络资源

11、业界发展情况:GPGPU、OpenCL

12、复习题

学习的过程,着重注意红星标识☆的内容,学完之后,要求能够回

答复习题。

启动线程的注意事项

无论何种方式,启动一个线程,就要给它一个名字!这对排错诊断

系统监控有帮助。否则诊断问题时,无法直观知道某个线程的用途。

Threadthread=newThread("threadname"){

publicvoidrun(){

//doxxx

}

};

thread.start();

Threadthread=newThread(){

publicvoidrun(){

//doxxx

}

};

thread.setName("threadname");

thread.start();

publicclassMyThreadextendsThread{

publicMyThread(){

super("threadname");

}

publicvoidrun(){

//doxxx

}

}

MyThreadthread=newMyThread();

thread.start();

Threadthread=newThread(task);//传入任务

thread.setName(“threadname");

thread.start();

Threadthread=newThread(task,“threadname");

thread.start();

13

24

5

要响应线程中断

程序应该对线程中断作出恰当的响应。

Threadthread=newThread("interrupttest"){

publicvoidrun(){

for(;;){

doXXX();

if(Thread.interrupted()){

break;

}

}

}

};

thread.start();

Threadthread=newThread("interrupttest"){

publicvoidrun(){

for(;;){

try{

doXXX();

}catch(InterruptedExceptione){

break;

}catch(Exceptione){

//handleException

}

}

}

};

thread.start();publicvoidfoo()throwsInterruptedException{

if(Thread.interrupted()){

thrownewInterruptedException();

}

}

1

2

3

thread.interrupt();

ThreadLocal

ThreadLocal

initialValue():T

get():T

set(Tvalue)

remove()

顾名思义它是localvariable(线程局部变量)。它的功用非

常简单,就是为每一个使用该变量的线程都提供一个变量值

的副本,是每一个线程都可以独立地改变自己的副本,而不

会和其它线程的副本冲突。从线程的角度看,就好像每一个

线程都完全拥有该变量。

使用场景

Tokeepstatewithathread(user-id,transaction-id,logging-id)

Tocacheobjectswhichyouneedfrequently

隐式传参

注意:使用ThreadLocal,一般都是声明在静态变量中,如果不断的

创建ThreadLocal而且没有调用其remove方法,将会导致内存泄露。

同时请注意,如果是static的ThreadLocal,一般不需要调用remove。

ExecutorService

Executor

Thread

Executor

Thread

Executor

Thread

TaskTaskTaskTaskTask

TaskSubmitter

任务的提交者和执行者

TaskSubmitter

TaskSubmitter

为了方便并发执行任务,出现了一种专门用来执行任务的实现,也就是Executor。

由此,任务提交者不需要再创建管理线程,使用更方便,也减少了开销。

java.util.concurrent.Executors是Executor的工厂类,通过Executors可以创建你所需要的

Executor。

TaskExecutorTaskSubmitter

任务的提交者和执行者之间的通讯手段

ExecutorServiceexecutor=Executors.newSingleThreadExecutor();

Callabletask=newCallable(){

publicObjectcall()throwsException{

Objectresult="...";

returnresult;

}

};

Futurefuture=executor.submit(task);

future.get();//等待至完成

Futurefuture=executor.submit(task);

//等待到任务被执行完毕返回结果

//如果任务执行出错,这里会抛ExecutionException

future.get();

//等待3秒,超时后会抛TimeoutException

future.get(3,TimeUnit.SECONDS);

Callabletask=newCallable(){

publicObjectcall()throwsException{

Objectresult=…;

returnresult;

}

};

TaskSubmitter把任务提交给Executor执行,他们之间需要一种通

讯手段,这种手段的具体实现,通常叫做Future。Future通常包括

get(阻塞至任务完成),cancel,get(timeout)(等待一段时间)

等等。Future也用于异步变同步的场景。

有两种任务:

Runnable

Callable

Callable是需要返回值的任务

Future

cancel(boolean):boolean

isCancelled():boolean

isDone():boolean

get():T

get(long,TimeUnit):T

阻塞队列

producerconsumer

for(;;){

blockingQ.take();//如果队列空则阻塞

}

//如果队列满则阻塞

blockingQ.put(object);

阻塞队列,是一种常用的并发数据结构,常用于生产者-消费者模式。

在Java中,有很多种阻塞队列:

ArrayBlockingQueue

LinkedBlockingQueue

SynchronousQueue

PriorityBlockingQueue

CompletionService(BlockingQueue+Executor)

TransferQueue(JDK7中更快的SynchronousQueue)

最常用

不会满的

size为0

BlockingQueue

put(E)

take():E

offer(E,long,TimeUnit):boolean

poll(long,TimeUnit):E

remainingCapacity()

drainTo(Collection):int

drainTo(Collection,int):int

在BlockingQueue中,要使用put和take,而非offer和poll。如果

要使用offer和poll,也是要使用带等待时间参数的offer和poll。

使用drainTo批量获得其中的内容,能够减少锁的次数。

使用阻塞队列

Queue

add(E):boolean

remove():E

offer():boolean

poll():E

element():E

peek():E

使用BlockingQueue的时候,尽量不要使用从Queue继承下来的

方法,否则就失去了Blocking的特性了。

继承自

finalBlockingQueueblockingQ=newArrayBlockingQueue(10);

Threadthread=newThread("consumerthread"){

publicvoidrun(){

for(;;){

Objectobject=blockingQ.poll();//杯具,不等待就会直接返回

handle(object);

}

}

};

使用阻塞队列

finalBlockingQueueblockingQ=newArrayBlockingQueue(10);

Threadthread=newThread("consumerthread"){

publicvoidrun(){

for(;;){

try{

Objectobject=blockingQ.take();//等到有数据才继续

handle(object);

}catch(InterruptedExceptione){

break;

}catch(Exceptione){

//handleexception

}

}

}

};

X1

2

使用阻塞队列

finalBlockingQueueblockingQ=newArrayBlockingQueue(10);

Threadthread=newThread("consumerthread"){

publicvoidrun(){

for(;;){

try{

Objectobject=blockingQ.poll(1,TimeUnit.SECONDS);//防止死等

if(object==null){

continue;//或者做其他处理

}

}catch(InterruptedExceptione){

break;

}catch(Exceptione){

//handleexception

}

}

}

};3

classBlockingQ{

privateObjectnotEmpty=newObject();

privateQueuelinkedList=newLinkedList();

publicObjecttake()throwsInterruptedException{

synchronized(notEmpty){

if(linkedList.size()==0){

notEmpty.wait();

}

returnlinkedList.poll();

}

}

publicvoidoffer(Objectobject){

synchronized(notEmpty){

if(linkedList.size()==0){

notEmpty.notifyAll();

}

linkedList.add(object);

}

}

}

实现一个简单的阻塞队列(1)

要执行wait操作,必须先取得该对象的锁。

执行wait操作之后,锁会释放。

被唤醒之前,需要先获得锁。

要执行notify和notifyAll操作,都必须先取

得该对象的锁。

未取得锁就直接执行wait、notfiy、notifyAll会抛异常

通过实现简单的阻塞

队列来学习并发知识

classBlockingQ{

privateObjectnotEmpty=newObject();

privateObjectnotFull=newObject();

privateQueuelinkedList=newLinkedList();

privateintmaxLength=10;

publicObjecttake()throwsInterruptedException{

synchronized(notEmpty){

if(linkedList.size()==0){

notEmpty.wait();

}

synchronized(notFull){

if(linkedList.size()==maxLength){

notFull.notifyAll();

}

returnlinkedList.poll();

}

}

}

publicvoidoffer(Objectobject)throwsInterruptedException{

synchronized(notEmpty){

if(linkedList.size()==0){

notEmpty.notifyAll();

}

synchronized(notFull){

if(linkedList.size()==maxLength){

notFull.wait();

}

linkedList.add(object);

}

}

}

}

实现一个简单的阻塞队列(2)

分别需要对notEmpty和notFull加锁

分别需要对notEmpty和notFull加锁

通过实现简单的阻塞

队列来学习并发知识

classBlockingQ{

privateLocklock=newReentrantLock();

privateConditionnotEmpty=lock.newCondition();

privateConditionnotFull=lock.newCondition();

privateQueuelinkedList=newLinkedList();

privateintmaxLength=10;

publicObjecttake()throwsInterruptedException{

lock.lock();

try{

if(linkedList.size()==0){

notEmpty.await();

}

if(linkedList.size()==maxLength){

notFull.signalAll();

}

returnlinkedList.poll();

}finally{

lock.unlock();

}

}

publicvoidoffer(Objectobject)throwsInterruptedException{

lock.lock();

try{

if(linkedList.size()==0){

notEmpty.signalAll();

}

if(linkedList.size()==maxLength){

notFull.await();

}

linkedList.add(object);

}finally{

lock.unlock();

}

}

}

实现一个简单的阻塞队列(3)

要执行await操作,必须先取得该Condition的锁。

执行await操作之后,锁会释放。

被唤醒之前,需要先获得锁。

一个锁可以创建多个Condition

注意:未锁就直接执行await、signal、siganlAll会抛异常

要执行signal和signalAll操作,都必须先取

得该对象的锁。

通过实现简单的阻塞

队列来学习并发知识

ReentrantLock和Synchronized

Synchronized是Lock的一种简化实现,一个Lock可以对应多个

Condition,而synchronized把Lock和Condition合并了,一个

synchronizedLock只对应一个Condition,可以说Synchronized是Lock

的简化版本。

在JDK5,Synchronized要比Lock慢很多,但是在JDK6中,它们的效

率差不多。

Lock

lock();

tryLock()

Unlock();

Condition

await();

signal();

signalAll();

synchronzied

lock();

unlock();

wait();

notify();

notifyAll();

10..

awati->wait

singal->notify

singalAll->notifyAll

不要在Lock和Condition上使用wait、notiffy、notifyAll方法!

Inconcurrentprogramming,amonitorisanobjectintendedtobeusedsafelybymorethanonethread.Thedefiningcharacteristicofamonitor

isthatitsmethodsareexecutedwithmutualexclusion.Thatis,ateachpointintime,atmostonethreadmaybeexecutinganyofitsmethods.

Thismutualexclusiongreatlysimplifiesreasoningabouttheimplementationofmonitorscomparedwithcodethatmaybeexecutedinparallel.

Monitorsalsoprovideamechanismforthreadstotemporarilygiveupexclusiveaccess,inordertowaitforsomeconditiontobemet,before

regainingexclusiveaccessandresumingtheirtask.Monitorsalsohaveamechanismforsignalingotherthreadsthatsuchconditionshavebeen

met.

MonitorswereinventedbyC.A.R.Hoare[1]andPerBrinchHansen,[2]andwerefirstimplementedinBrinchHansen''sConcurrentPascal

language.

http://en.wikipedia.org/wiki/Monitor_(synchronization)

Monitor的理论模型

AMesastyle

monitorwith

twocondition

variablesa

andb

AJava

synchronized

stylemonitor

a

b

使用AtomicInteger

classCounter{

privateAtomicIntegercount=newAtomicInteger();

publicvoidincrement(){

count.incrementAndGet();

}

publicintgetCount(){

returncount.get();

}

}

classCounter{

privatevolatileintcount=0;

publicsynchronizedvoidincrement(){

count++;

}

publicintgetCount(){

returncount;

}

}

若要线程安全执行执行count++,需要加锁

使用AtomicInteger之后,不需要加锁,也可以

实现线程安全。

这是由硬件提供原子操作指令实现的。在非激烈竞争的情况下,开

销更小,速度更快。java.util.concurrent中实现的原子操作类包

括:

AtomicBoolean、AtomicInteger、AtomicLong、AtomicReference

1

2

classCounter{

privatevolatileintmax=0;

publicsynchronizedvoidset(intvalue){

if(value>max){

max=value;

}

}

publicintgetMax(){

returnmax;

}

}

classCounter{

privateAtomicIntegermax=newAtomicInteger();

publicvoidset(intvalue){

for(;;){

intcurrent=max.get();

if(value>current){

if(max.compareAndSet(current,value)){

break;

}else{

continue;

}

}else{break;}

}

}

publicintgetMax(){

returnmax.get();

}

}

1

2

若要线程安全,需要加锁

LockFree算法,不需要加锁。

通常都是三个部分组成:

①循环

②CAS(CompareAndSet)

③回退

循环

CAS

回退

使用Lock-Free算法

classCounter{

privateAtomicIntegermax=newAtomicInteger();

publicvoidset(intvalue){

intcurrent;

do{

current=max.get();

if(value<=current){

break;

}

}while(!max.compareAndSet(current,value));

}

publicintgetMax(){

returnmax.get();

}

}

3

使用Lock-Free算法

方案3和方案2的实现是一样

的,只不过把for(;;)换成了

do…while。

非阻塞型同步(Non-blockingSynchronization)

如何正确有效的保护共享数据是编写并行程序必须面临的一个难题,通常的手段就是同步。

同步可分为阻塞型同步(BlockingSynchronization)和非阻塞型同步(Non-blocking

Synchronization)。

阻塞型同步是指当一个线程到达临界区时,因另外一个线程已经持有访问该共享数据的锁,

从而不能获取锁资源而阻塞,直到另外一个线程释放锁。常见的同步原语有mutex、

semaphore等。如果同步方案采用不当,就会造成死锁(deadlock),活锁(livelock)和

优先级反转(priorityinversion),以及效率低下等现象。

为了降低风险程度和提高程序运行效率,业界提出了不采用锁的同步方案,依照这种设计

思路设计的算法称为非阻塞型算法,其本质特征就是停止一个线程的执行不会阻碍系统中

其他执行实体的运行。

当今比较流行的Non-blockingSynchronization实现方案有三种:

Wait-free

Wait-free是指任意线程的任何操作都可以在有限步之内结束,而不用关心其它线程的

执行速度。Wait-free是基于per-thread的,可以认为是starvation-free的。非常遗憾的是

实际情况并非如此,采用Wait-free的程序并不能保证starvation-free,同时内存消耗也随

线程数量而线性增长。目前只有极少数的非阻塞算法实现了这一点。

Lock-free

Lock-Free是指能够确保执行它的所有线程中至少有一个能够继续往下执行。由于每个

线程不是starvation-free的,即有些线程可能会被任意地延迟,然而在每一步都至少有一

个线程能够往下执行,因此系统作为一个整体是在持续执行的,可以认为是system-wide

的。所有Wait-free的算法都是Lock-Free的。

Obstruction-free

Obstruction-free是指在任何时间点,一个孤立运行线程的每一个操作可以在有限步之

内结束。只要没有竞争,线程就可以持续运行。一旦共享数据被修改,Obstruction-free要

求中止已经完成的部分操作,并进行回滚。所有Lock-Free的算法都是Obstruction-free的。

Wait-Free

Lock-Free

Obstruction-Free

Atomic

Lockless-based

Lock-based

















进一步使用Lock-Free数据结构

classBeanManager{

privateMapmap=newHashMap();

publicObjectgetBean(Stringkey){

synchronized(map){

Objectbean=map.get(key);

if(bean==null){

map.put(key,createBean());

bean=map.get(key);

}

returnbean;

}

}

}

classBeanManager{

privateConcurrentMapmap=newConcurrentHashMap();

publicObjectgetBean(Stringkey){

Objectbean=map.get(key);

if(bean==null){

map.putIfAbsent(key,createBean());

bean=map.get(key);

}

returnbean;

}

}

使用ConcurrentMap,避免直

接使用锁,锁由数据结构来

管理。

ConcurrentHashMap并没有实现Lock-Free,只是使用了分离锁的办

法使得能够支持多个Writer并发。ConcurrentHashMap需要使用更

多的内存。

publicclassSequenceDaoextendsSqlMapClientDaoSupport{

publicbooleancompareAndSet(Stringname,intvalue,intexpect){

Mapparameters=newHashMap();

parameters.put("name",name);

parameters.put("value",value);

parameters.put("expect",expect);

//UPDATEt_sequenceSETvalue=#value#WHEREname=#name#ANDvalue=#expect#

intupdateCount=getSqlMapClientTemplate().update("Sequence.compareAndSet",parameters);

returnupdateCount==1;

}

}

publicclassSequenceService{

@Transactional(propagation=Propagation.NOT_SUPPORTED)

publicsynchronizedvoidincrement(StringsequenceName){

for(;;){

intvalue=dao.getValue(sequenceName);

if(dao.compareAndSet(sequenceName,value+1,value)){

break;

}

}

}

}

同样的思路用于更新数据库-乐观锁

通过UpdateCount来实现

CompareAndSet

三个部分:

①循环

②CAS(CompareAndSet)

③回退

注意,乐观锁时必须使用:@Transactional(propagation=Propagation.NOT_SUPPORTED)

publicclassSequenceDaoextendsSqlMapClientDaoSupport{

publicintgetValueForUpdate(Stringname){

//SELECTvalueFROMt_sequenceWHEREname=#name#FORUPDATE

return(Integer)getSqlMapClientTemplate().queryForObject("Sequence.getValueForUpdate",name);

}

publicvoidset(Stringname,intvalue){

Mapparameters=newHashMap();

parameters.put("name",name);

parameters.put("value",value);

//UPDATEt_sequenceSETvalue=#value#WHEREname=#name#

getSqlMapClientTemplate().update("Sequence.set",parameters);

}

}

publicclassSequenceService{

@Transactional(propagation=Propagation.REQUIRED)

publicsynchronizedvoidincrement2(StringsequenceName){

intvalue=dao.getValueForUpdate(sequenceName);

dao.set(sequenceName,value+1);

}

}

对比,使用悲观锁版本

读取时,就开始加锁。

Lock-Free算法,可以说是乐观锁,如果非激烈竞争的时候,不需要

使用锁,从而开销更小,速度更快。

使用CopyOnWriteArrayList

classEngine{

privateListlisteners=newArrayList();

publicbooleanaddListener(Listenerlistener){

synchronized(listeners){

returnlisteners.add(listener);

}

}

publicvoiddoXXX(){

synchronized(listeners){

for(Listenerlistener:listeners){

listener.handle();

}

}

}

}

classEngine{

privateListlisteners=newCopyOnWriteArrayList();

publicbooleanaddListener(Listenerlistener){

returnlisteners.add(listener);

}

publicvoiddoXXX(){

for(Listenerlistener:listeners){

listener.handle();

}

}

}

适当使用CopyOnWriteArrayList,能够提高

读操作时的效率。

1

2

COW是一种很古老的技术,

类似的并发数据结构有:

ConcurrentSkipListMap

ConcurrentSkipListSet

CopyOnWriteArrayList

CopyOnWriteArraySet

锁的使用

①使用支持CAS的数据结构,避免使用锁,如:

AtomicXXX、ConcurrentMap、CopyOnWriteList、ConcurrentLinkedQueue

②一定要使用锁的时候,注意获得锁的顺序,相反顺序获得锁,就容易产生死锁。

③死锁经常是无法完全避免的,鸵鸟策略被很多基础框架所采用。

④通过Dump线程的StackTrace,例如linux下执行命令kill-3,或者jstack–l,或

者使用Jconsole连接上去查看线程的StackTrace,由此来诊断死锁问题。

⑤外部锁常被忽视而导致死锁,例如数据库的锁

⑥存在检测死锁的办法

⑦存在一些预防死锁的手段,比如Lock的tryLock,JDK7中引入的Phaser等。

finalintCOUNT=10;

finalCountDownLatchcompleteLatch=newCountDownLatch(COUNT);

for(inti=0;i
Threadthread=newThread("workerthread"+i){

publicvoidrun(){

//doxxxx

completeLatch.countDown();

}

};

thread.start();

}

completeLatch.await();

并发流程控制-使用CoutDownLatch

当你启动了一个线程,你需要等它执行结束,

此时,CountDownLatch也许是一个很好的选择。

finalCountDownLatchstartLatch=newCountDownLatch(1);

for(inti=0;i<10;++i){

Threadthread=newThread("workerthread"+i){

publicvoidrun(){

try{

startLatch.await();

}catch(InterruptedExceptione){

return;

}

//doxxxx

}

};

thread.start();

}

//doxxx

startLatch.countDown();

当你启动很多线程,你需要这些线程等到通知

后才真正开始,CountDownLatch也许是一个很

好的选择。

finalCountDownLatchlatch=newCountDownLatch(1);

Threadthread=newThread(“worker-thread"){

publicvoidrun(){

//doxxx

latch.countDown();

}

};

thread.start();

latch.await();

finalLocklock=newReentrantLock();

finalConditioncompleteSignal=lock.newCondition();

Threadthread=newThread("worker-thread"){

publicvoidrun(){

//doxxx

lock.lock();

try{

completeSignal.signalAll();

}finally{

lock.unlock();

}

}

};

lock.lock();

try{

thread.start();

completeSignal.await();

}finally{

lock.unlock();

}

finalObjectcompleteSignal=newObject();

Threadthread=newThread("worker-thread"){

publicvoidrun(){

//doxxx

synchronized(completeSignal){

completeSignal.notifyAll();

}

}

};

synchronized(completeSignal){

thread.start();

completeSignal.wait();

}

为什么要使用CountDownLatch

1

2

3

以上三种实现功能是一样的

方案1使用CountDownLatch最简单

Barrier

A

B

C

D

Bar

rer

A

B

C

D

Bar

rer

A

B

C

D

Bar

rer

time

Abarrier:Abarrierisacoordinationmechanosm(analgorithm)thatforces

processwhichparticipateinaconcurrent(ordistributed)algorithmto

waituntileachoneofthemhasreachedacertainpointinitsprogram.The

collectionofthesecoordinationpointsiscalledthebarrier.Onceallthe

processeshavereachedthebarrier,theyareallpermittedtocontinuepast

thebarrier.

并发流程控制-使用CycliBarrier

classPerformaceTest{

privateintthreadCount;

privateCyclicBarrierbarrier;

privateintloopCount=10;

publicPerformaceTest(intthreadCount){

this.threadCount=threadCount;

barrier=newCyclicBarrier(threadCount,newRunnable(){

publicvoidrun(){

collectTestResult();

}

});

for(inti=0;i
Threadthread=newThread("test-thread"+i){

publicvoidrun(){

for(intj=0;j
doTest();

try{

barrier.await();

}catch(InterruptedExceptione){

return;

}catch(BrokenBarrierExceptione){

return;

}

}

}

};

thread.start();

}

}

privatevoiddoTest(){/doxxx/}

privatevoidcollectTestResult(){/doxxx/}

}

使用Barrier来实现并发性能

测试的聚合点。

JDK7是中包括一个类似

的流程控制手段Phaser

使用定时器

ScheduledExecutorService

schedule(Runnablecommand,longdelay,TimeUnitunit):ScheduledFuture

schedule(Callablecallable,longdelay,TimeUnitunit):ScheduledFuture

scheduleAtFixedRate(Runnablecomand,longinitDelay,longperiod,TimeUnitunit):ScheduledFuture

scheduleWithFixedDelay(Runnablecommand,longinitDelay,longdelay,TimeUnitunit):ScheduledFuture

java.util.concurrent.Executors是ScheduledExecutorService的工厂类,通过Executors,你可

以创建你所需要的ScheduledExecutorService。

JDK1.5之后有了ScheduledExecutorService,不建议你再使用java.util.Timer,因为它无论

功能性能都不如ScheduledExecutorService。

ScheduledExecutorServiceScheduledTaskSubmitter

ScheduleFuturefuture=scheduler.schedule(task,1,TimeUnit.SECONDS);

//等待到任务被执行完毕返回结果

//如果任务执行出错,这里会抛ExecutionException

future.get();

//取消调度任务

future.cancel();

TaskTaskTaskTaskTask

DelayedWorkQueue

大规模定时器TimerWheel

存在一种算法TimerWheel,适用于大规模的定时器实现。这个算法最早是被设计用来实

现BSD内核中定时器的,后来被广泛移植到诸如ACE等框架中,堪称BSD中经典算法之

一,能针对定时器的各类常见操作提供接近常数时间的响应,且能根据需要很容易进行

扩展。

1

2

3

45

6

7

0

在网络通讯领域,常见有TimerWheel的实现,包括ACE、MicrosoftUnifiedComunication

ManagedAPI等,因为大规模定时器目前只能使用TimerWheel。

硬件的发展趋势非常清晰;Moore定律表明不会出现更高的时钟频率,但是每个芯片上会

集成更多的内核。很容易想象让十几个处理器繁忙地处理一个粗粒度的任务边界(比如一

个用户请求),但是这项技术不会扩大到数千个处理器——在这种环境下短时间内流量可

能会呈指数级增长,但最终硬件趋势将会占上风。当跨入多内核时代时,我们需要找到更

细粒度的并行性,否则将面临即便有许多工作需要去做而处理器却仍处于空闲的风险。如

果希望跟上技术发展的脚步,软件平台也必须配合主流硬件平台的转变。最终,Java7包

含一种框架,用于表示某种更细粒度级别的并行算法:fork-join框架。

Fork-join融合了分而治之技术;获取问题后,递归地将它分成多个子问题,直到每个子问

题都足够小,以至于可以高效地串行地解决它们。递归的过程将会把问题分成两个或者多

个子问题,然后把这些问题放入队列中等待处理(fork步骤),接下来等待所有子问题的

结果(join步骤),把多个结果合并到一起。

JDK7任务分解工具Fork/Join

任务分解不同的行为分配给不同的线程

数据分解多个线程对不同的数据集执行同样的操作

数据流分解一个线程的输出是第二个线程的输入

publicstaticvoidmain(String[]args)throwsException{

finalForkJoinPoolmainPool=newForkJoinPool();

intlen=1000100010;

int[]array=newint[len];

mainPool.invoke(newSortTask(array,0,len-1));

}

publicstaticclassSortTaskextendsRecursiveAction{

privateint[]array;

privateintfromIndex;

privateinttoIndex;

privatefinalintchunksize=1024;

publicSortTask(int[]array,intfromIndex,inttoIndex){

this.array=array;

this.fromIndex=fromIndex;

this.toIndex=toIndex;

}

@Override

protectedvoidcompute(){

intsize=toIndex-fromIndex+1;

if(size
Arrays.sort(array,fromIndex,toIndex);

}else{

intleftSize=size/2;

SortTaskleftTask=newSortTask(array,fromIndex,fromIndex+leftSize);

SortTaskrightTask=newSortTask(array,fromIndex+leftSize+1,toIndex);

this.invokeAll(leftTask,rightTask);

}

}

}

Fork/Join使用示例

任务分解

?Amdahl定律

–GeneAmdahl发现在计算机体系架构设计过程中,

某个部件的优化对整个架构的优化和改善是有

上限的。这个发现后来成为知名的Amdahl定律。

?Gustafson定律

–Gustafson假设随着处理器个数的增加,并行与

串行的计算总量也是可以增加的。Gustafson定

律认为加速系数几乎跟处理器个数成正比,如

果现实情况符合Gustafson定律的假设前提的话,

那么软件的性能将可以随着处理个数的增加而

增加。

?Sun-Ni定律

–充分利用存储空间等计算资源,尽量增大问题

规模以产生更好/更精确的解。

即使你有10个老婆,也不能

一个月把孩子生下来。

当你有10个老婆,就会要生

更多的孩子。

你要设法让每个老婆都在干

活,别让她们闲着。

并发三大定律





DougLea-Mr.concurrency,当今世界上并发程序设计领域的先驱,著名学者。他是

util.concurrent包的作者,JSR166规范的制定者。图书著作《ConcurrentProgramming

inJava:DesignPrinciplesandPatterns》。其”AScalableElimination-basedExchange

Channel”和”ScalableSynchronousQueues”两篇论文列为非阻塞同步算法的经典文章

推荐图书

维基百科并发控制专题

http://en.wikipedia.org/wiki/Category:Concurrency_control

维基百科并行计算专题

http://en.wikipedia.org/wiki/Parallel_computing

维基百科非阻塞同步专题

http://en.wikipedia.org/wiki/Non-blocking_synchronization

HerbSutter的个人主页

http://www.gotw.ca

DougLea的个人主页

http://g.oswego.edu/

非阻塞同步算法论文

http://www.cs.wisc.edu/trans-memory/biblio/swnbs.html

ACE关于并发和网络的指南

http://www.cs.wustl.edu/~schmidt/tutorials-patterns.html

透过Linux内核看无锁编程

http://www.ibm.com/developerworks/cn/linux/l-cn-lockfree/

http://www.khronos.org/opencl/OpenCL官方网站

并发相关网络资源

我们今天没有10GHz芯片!

在IDF05(IntelDeveloperForum2005)上,Intel首席执行官Craig

Barrett就取消4GHz芯片计划一事,半开玩笑当众单膝下跪致歉。

DonaldKnuth

世界顶级计算机科学家

在我看来,这种现象(并发)或多或少是由于硬件设

计者已经无计可施了导致的,他们将Moore定律失

效的责任推脱给软件开发者。

DonaldKnuth2008年7月接受AndrewBinstock访谈

IntroducedAdoptedin

mainstream

GUIs1973(XeroxAlto)~1984-89(Mac)

~1990-95(Win3.x)

Objects1967(Simular)~1993-98(C++,Java)

GarbageCollection1958(Lisp)~1995-2000(Java)

GenericTypes1967(Strachey)~198x(USDoD,Ada)

~1995-2000(C++)

Internet1967+(ARPAnet)~1995-2000

Concurrency1964(CDC6600)~2007-2012

ABriefHistoryofTime

2010年最快的CPU

主要参数POWER7皓龙6100至强7500

首次发布2010-2-82010-3-292010-3-31

生产工艺45nm45nm45nm

晶体管数量12亿19亿23亿

晶体面积567mm2692mm2684mm2

最多核心数量8128

每核心最大线程数量412

CPU最大核心/线程数量8/3212/128/16

系统最高支持插槽数量3248

系统最大核心/线程数量256核/1024线程48核/48线程64核/128线程

最高主频3.8G(8核)2.3G(12核)2.26G(8核)

最高主频时CPU浮点性能243.2GFLOPS105.6GFLOPS73.32GFLOPS

CPU最高内存带宽68.2GB/s43GB/s34.1GB/s

平均每核内存带宽8.5GB/s3.58GB/s4.26GB/s

平均每内核IO带宽42GB/s8.13GB/s12.8GB/s

中国的超级计算机:天河1号1.206P,星云3P,都是异构计算体系。

AMDRadeonHD5970

Radeon?HD5970

$299

928GFLOPS

725MHz

3200

GDDR5

4GB

294W

双精度浮点处理能力

核心频率

处理器核心数量

内存类型

显存容量

最大功耗

显存带宽256GB/sec

44

单精度浮点处理能力4.64TFLOPS

天河一号

CPU:3072x2颗IntelQuadCoreXeonE55403.0GHz

GPU:2560颗ATIRadeon4870x2575MHz

内存:98TB

世界排名第七

星云

CPU:9280颗IntelX56502.6GHz

GPU:4640颗NvidiaC2050

内存:98TB

世界排名第二

“星云”在Linpack测试中的效能表现为每秒1.271petaflops,略逊于美国能源部旗下橡树

岭国家实验室(OakRidgeLab)所有的超级计算机“Jaguar”的每秒1.75petaflops。不过由美

国厂商Cray所打造、采用AMD六核心处理器Istanbul的“Jaguar”,理论峰值为每秒2.3

petaflops,逊于“星云”每秒2.98petaflops的理论峰值。星云”功耗仅2.55MW,低于

“Jaguar”的7MW。

中国的超级计算机

GPU大规模并行计算

ProcessorParallelism

CPUs

Multiplecoresdriving

performanceincreases

GPUs

Increasinglygeneralpurpose

data-parallelcomputing

Improvingnumerical

precision

GraphicsAPIs

andShading

Languages

Multi-processor

programming–

e.g.OpenMP

Emerging

Intersection

OpenCL

Heterogenous

Computing

OpenCL–OpenComputingLanguage

Open,royalty-freestandardforportable,parallelprogrammingofheterogeneous

parallelcomputingCPUs,GPUs,andotherprocessors

?操作系统

?递归算法

?桌面应用

例如MSWord

?交互性应用

例如

Debugger



CPUGPU

?油气勘探

?金融分析

?医疗成像

?有限元

?基因分析

?物理模拟

?地理信息系统

热点

?搜索引擎

?数据库、数据挖掘

?数理统计分析

?生物医药工程

?导航识别

?军事模拟

?无线射频模拟

?图像语音识别

?…

潜在

适用的应用范围

内容回顾

1、使用线程的经验:设置名称、响应中断、使用ThreadLocal

2、Executor:ExecutorService和Future☆☆☆

3、阻塞队列:put和take、offer和poll、drainTo

4、线程间的协调手段:lock、condition、wait、notify、notifyAll☆☆☆

5、Lock-free:atomic、concurrentMap.putIfAbsent、CopyOnWriteArrayList☆☆☆

6、关于锁使用的经验介绍

7、并发流程控制手段:CountDownlatch、Barrier、Phaser

8、定时器:ScheduledExecutorService、大规模定时器TimerWheel

9、JDK7的Fork/Join介绍

10、并发三大定律:Amdahl、Gustafson、Sun-Ni

11、神人和图书

12、业界发展情况:GPGPU、OpenCL

复习题

请回答以下问题:

1.Future是做什么用的?

2.Lock和synchronized的区别是什么?

3.什么是CAS?

4、Lock-Free算法的三个组成部分是什么?

谢谢!

献花(0)
+1
(本文系关平藏书首藏)