配色: 字号:
Java 线程池详解
2016-10-11 | 阅:  转:  |  分享 
  
Java线程池详解

系统启动一个线程的成本是比较高的,因为它涉及到与操作系统的交互,使用线程池的好处是提高性能,当系统中包含大量并发的线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池的最大线程数参数可以控制系统中并发线程数不超过次数。

一、Executors工厂类用来产生线程池,该工厂类包含以下几个静态工厂方法来创建对应的线程池。创建的线程池是一个ExecutorService对象,使用该对象的submit方法或者是execute方法执行相应的Runnable或者是Callable任务。线程池本身在不再需要的时候调用shutdown()方法停止线程池,调用该方法后,该线程池将不再允许任务添加进来,但是会直到已添加的所有任务执行完成后才死亡。

1、newCachedThreadPool(),创建一个具有缓存功能的线程池,提交到该线程池的任务(Runnable或Callable对象)创建的线程,如果执行完成,会被缓存到CachedThreadPool中,供后面需要执行的任务使用。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33 importjava.util.concurrent.ExecutorService;

importjava.util.concurrent.Executors;

publicclassCacheThreadPool{

staticclassTaskimplementsRunnable{

@Override

publicvoidrun(){

System.out.println(this+""+Thread.currentThread().getName()+"AllStackTracesmapsize:"

+Thread.currentThread().getAllStackTraces().size());

}

}

publicstaticvoidmain(String[]args){

ExecutorServicecacheThreadPool=Executors.newCachedThreadPool();

//先添加三个任务到线程池

for(inti=0;i<3;i++){

cacheThreadPool.execute(newTask());

}

//等三个线程执行完成后,再次添加三个任务到线程池

try{

Thread.sleep(3000);

}catch(InterruptedExceptione){

e.printStackTrace();

}

for(inti=0;i<3;i++){

cacheThreadPool.execute(newTask());

}

}

} 执行结果如下:

1

2

3

4

5

6 CacheThreadPool$Task@2d312eb9pool-1-thread-1AllStackTracesmapsize:7

CacheThreadPool$Task@59522b86pool-1-thread-3AllStackTracesmapsize:7

CacheThreadPool$Task@73dbb89fpool-1-thread-2AllStackTracesmapsize:7

CacheThreadPool$Task@5795cedcpool-1-thread-3AllStackTracesmapsize:7

CacheThreadPool$Task@256d5600pool-1-thread-1AllStackTracesmapsize:7

CacheThreadPool$Task@7d1c5894pool-1-thread-2AllStackTracesmapsize:7 线程池中的线程对象进行了缓存,当有新任务执行时进行了复用。但是如果有特别多的并发时,缓存线程池还是会创建很多个线程对象。

2、newFixedThreadPool(intnThreads)创建一个指定线程个数,线程可复用的线程池。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33 importjava.util.concurrent.ExecutorService;

importjava.util.concurrent.Executors;

publicclassFixedThreadPool{

staticclassTaskimplementsRunnable{

@Override

publicvoidrun(){

System.out.println(this+""+Thread.currentThread().getName()+"AllStackTracesmapsize:"

+Thread.currentThread().getAllStackTraces().size());

}

}

publicstaticvoidmain(String[]args){

ExecutorServicefixedThreadPool=Executors.newFixedThreadPool(3);

//先添加三个任务到线程池

for(inti=0;i<5;i++){

fixedThreadPool.execute(newTask());

}

//等三个线程执行完成后,再次添加三个任务到线程池

try{

Thread.sleep(3);

}catch(InterruptedExceptione){

e.printStackTrace();

}

for(inti=0;i<3;i++){

fixedThreadPool.execute(newTask());

}

}

} 执行结果:

1

2

3

4

5

6

7

8 FixedThreadPool$Task@7045c12dpool-1-thread-2AllStackTracesmapsize:7

FixedThreadPool$Task@50fa0befpool-1-thread-2AllStackTracesmapsize:7

FixedThreadPool$Task@ccb1870pool-1-thread-2AllStackTracesmapsize:7

FixedThreadPool$Task@7392b4e3pool-1-thread-1AllStackTracesmapsize:7

FixedThreadPool$Task@5bdeff18pool-1-thread-2AllStackTracesmapsize:7

FixedThreadPool$Task@7d5554e1pool-1-thread-1AllStackTracesmapsize:7

FixedThreadPool$Task@24468092pool-1-thread-3AllStackTracesmapsize:7

FixedThreadPool$Task@fa7b978pool-1-thread-2AllStackTracesmapsize:7 3、newSingleThreadExecutor(),创建一个只有单线程的线程池,相当于调用newFixedThreadPool(1)

4、newSheduledThreadPool(intcorePoolSize),创建指定线程数的线程池,它可以在指定延迟后执行线程。也可以以某一周期重复执行某一线程,知道调用shutdown()关闭线程池。

示例如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29 importjava.util.concurrent.Executors;

importjava.util.concurrent.ScheduledExecutorService;

importjava.util.concurrent.TimeUnit;

publicclassScheduledThreadPool{

staticclassTaskimplementsRunnable{

@Override

publicvoidrun(){

System.out.println("time"+System.currentTimeMillis()+""+Thread.currentThread().getName()+"AllStackTracesmapsize:"

+Thread.currentThread().getAllStackTraces().size());

}

}

publicstaticvoidmain(String[]args){

ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(3);

scheduledExecutorService.schedule(newTask(),3,TimeUnit.SECONDS);

scheduledExecutorService.scheduleAtFixedRate(newTask(),3,5,TimeUnit.SECONDS);

try{

Thread.sleep(301000);

}catch(InterruptedExceptione){

e.printStackTrace();

}

scheduledExecutorService.shutdown();

}

} 运行结果如下:

1

2

3

4

5

6

7 time1458921795240pool-1-thread-1AllStackTracesmapsize:6

time1458921795241pool-1-thread-2AllStackTracesmapsize:6

time1458921800240pool-1-thread-1AllStackTracesmapsize:7

time1458921805240pool-1-thread-1AllStackTracesmapsize:7

time1458921810240pool-1-thread-1AllStackTracesmapsize:7

time1458921815240pool-1-thread-1AllStackTracesmapsize:7

time1458921820240pool-1-thread-1AllStackTracesmapsize:7 由运行时间可看出,任务是按照5秒的周期执行的。

5、newSingleThreawww.visa158.comdScheduledExecutor()创建一个只有一个线程的线程池,同调用newScheduledThreadPool(1)。

二、ForkJoinPool和ForkJoinTask

ForkJoinPool是ExecutorService的实现类,支持将一个任务划分为多个小任务并行计算,在把多个小任务的计算结果合并成总的计算结果。它有两个构造函数

ForkJoinPool(intparallelism)创建一个包含parallelism个并行线程的ForkJoinPool。

ForkJoinPool(),以Runtime.avawww.hunanwang.netilableProcessors()方法返回值作为parallelism参数来创建ForkJoinPool。

ForkJoinTask代表一个可以并行,合并的任务。它是实现了Future接口的抽象类,它有两个抽象子类,代表无返回值任务的RecuriveAction和有返回值的RecursiveTask。可根据具体需求继承这两个抽象类实现自己的对象,然后调用ForkJoinPool的submit方法执行。

RecuriveAction示例如下,实现并行输出0-300的数字。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46 importjava.util.concurrent.ForkJoinPool;

importjava.util.concurrent.RecursiveAction;

importjava.util.concurrent.TimeUnit;

publicclassActionForkJoinTask{

staticclassPrintTaskextendsRecursiveAction{

privatestaticfinalintTHRESHOLD=50;

privateintstart;

privateintend;

publicPrintTask(intstart,intend){

this.start=start;

this.end=end;

}

@Override

protectedvoidcompute(){

if(end-start
for(inti=start;i
System.out.println(Thread.currentThread().getName()+""+i);

}

}else{

intmiddle=(start+end)/2;

PrintTaskleft=newPrintTask(start,middle);

PrintTaskright=newPrintTask(middle,end);

left.fork();

right.fork();

}

}

}

publicstaticvoidmain(String[]args){

ForkJoinPoolpool=newForkJoinPool();

pool.submit(newPrintTask(0,300));

try{

pool.awaitTermination(2,TimeUnit.SECONDS);

}catch(InterruptedExceptione){

e.printStackTrace();

}

pool.shutdown();

}

} 在拆分小任务后,调用任务的fork()方法,加入到ForkJoinPool中并行执行。

RecursiveTask示例,实现并行计算100个整数求和。拆分为每20个数求和后获取结果,在最后合并为最后的结果。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69 importjava.util.Random;

importjava.util.concurrent.ExecutionException;

importjava.util.concurrent.ForkJoinPool;

importjava.util.concurrent.Future;

importjava.util.concurrent.RecursiveTask;

publicclassTaskForkJoinTask{

staticclassCalTaskextendsRecursiveTask{

privatestaticfinalintTHRESHOLD=20;

privateintarr[];

privateintstart;

privateintend;

publicCalTask(int[]arr,intstart,intend){

this.arr=arr;

this.start=start;

this.end=end;

}

@Override

protectedIntegercompute(){

intsum=0;

if(end-start
for(inti=start;i
sum+=arr[i];

}

System.out.println(Thread.currentThread().getName()+"sum:"+sum);

returnsum;

}else{

intmiddle=(start+end)/2;

CalTaskleft=newCalTask(arr,start,middle);

CalTaskright=newCalTask(arr,middle,end);

left.fork();

right.fork();

returnleft.join()+right.join();

}

}

}

publicstaticvoidmain(String[]args){

intarr[]=newint[100];

Randomrandom=newRandom();

inttotal=0;

for(inti=0;i
inttmp=random.nextInt(20);

total+=(arr[i]=tmp);

}

System.out.println("total"+total);

ForkJoinPoolpool=newForkJoinPool(4);

Futurefuture=pool.submit(newCalTask(arr,0,arr.length));

try{

System.out.println("calresult:"+future.get());

}catch(InterruptedExceptione){

e.printStackTrace();

}catch(ExecutionExceptione){

e.printStackTrace();

}

pool.shutdown();

}

} 执行结果如下:

1

2

3

4

5

6

7

8

9

10 total912

ForkJoinPool-1-worker-2sum:82

ForkJoinPool-1-worker-2sum:123

ForkJoinPool-1-worker-2sum:144

ForkJoinPool-1-worker-3sum:119

ForkJoinPool-1-worker-2sum:106

ForkJoinPool-1-worker-2sum:128

ForkJoinPool-1-worker-2sum:121

ForkJoinPool-1-worker-3sum:89

calresult:912 子任务执行完后,调用任务的join()方法获取子任务执行结果,再相加获得最后的结果。





















献花(0)
+1
(本文系白狐一梦首藏)