分享

Java 8并行流:必备技巧

 月影晓风 2015-10-15

原文:tobyhobson

译文:ImportNew - paddx

链接:http://www./16801.html


Java 8 并行流(parallel stream)采用共享线程池,对性能造成了严重影响。可以包装流来调用自己的线程池解决性能问题。


问题


Java 8 的并行流可以让我们相对轻松地执行并行任务。


myList.parallelStream.map(obj -> longRunningOperation())


但是这样存在一个严重的问题:在 JVM 的后台,使用通用的 fork/join 池来完成上述功能,该池是所有并行流共享的。默认情况,fork/join 池会为每个处理器分配一个线程。假设你有一台16核的机器,这样你就只能创建16个线程。对 CPU 密集型的任务来说,这样是有意义的,因为你的机器确实只能执行16个线程。但是真实情况下,不是所有的任务都是 CPU 密集型的。例如:


myList.parallelStream

.map(this::retrieveFromA)

.map(this::processUsingB)

.forEach(this::saveToC)


myList.parallelStream

.map(this::retrieveFromD)

.map(this::processUsingE)

.forEach(this::saveToD)


这两个流很大程度上是受限于IO操作,所以会等待其他系统。但这两个流使用相同的(小)线程池,因此会相互等待而被阻塞。这个非常不好,可以改进。我们以一个流为例:


final List<Integer> firstRange = buildIntRange();

firstRange.parallelStream().forEach((number) -> {

try {

// do something slow

Thread.sleep(5);

} catch (InterruptedException e) { }

});


完整的代码可以在gist上查看。


在执行期间,我获取了一份线程dump的文件。这是相关的线程(在我的Macbook上):


ForkJoinPool.commonPool-worker-1

ForkJoinPool.commonPool-worker-2

ForkJoinPool.commonPool-worker-3

ForkJoinPool.commonPool-worker-4


现在,我要并行的执行这两个并行流(对于那些不是以英语为母语的人士,我感到非常抱歉!)


Runnable firstTask = () -> {

firstRange.parallelStream().forEach((number) -> {

try {

// do something slow

Thread.sleep(5);

} catch (InterruptedException e) { }

});

};


Runnable secondTask = () -> {

secondRange.parallelStream().forEach((number) -> {

try {

// do something slow

Thread.sleep(5);

} catch (InterruptedException e) { }

});

};

// run threads


完整的代码可以在gist上查看。


这次我们再看一下线程dump文件:


ForkJoinPool.commonPool-worker-1

ForkJoinPool.commonPool-worker-2

ForkJoinPool.commonPool-worker-3

ForkJoinPool.commonPool-worker-4


正如你所见,结果是一样的。我们只使用了4个线程。


一种变通方案


正如我所提到的,JVM 后台使用 fork/join 池,在 ForkJoinTask 的文档中,我们可以看到:


如果合适,安排一个异步执行的任务到当前正在运行的池中。如果任务不在inForkJoinPool()中,也可以调用ForkJoinPool.commonPool()获取新的池来执行。


让我试一试……


ForkJoinPool forkJoinPool = new ForkJoinPool(3);

forkJoinPool.submit(() -> {

firstRange.parallelStream().forEach((number) -> {

try {

Thread.sleep(5);

} catch (InterruptedException e) { }

});

});


ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);

forkJoinPool2.submit(() -> {

secondRange.parallelStream().forEach((number) -> {

try {

Thread.sleep(5);

} catch (InterruptedException e) {

}

});

});


完整的代码可以在gist上查看。


现在,我们再次查看线程池:


ForkJoinPool-1-worker-1

ForkJoinPool-1-worker-2

ForkJoinPool-1-worker-3

ForkJoinPool-1-worker-4

ForkJoinPool-2-worker-1

ForkJoinPool-2-worker-2

ForkJoinPool-2-worker-3

ForkJoinPool-1-worker-4


因为我们创建自己的线程池,所以可以避免共享线程池,如果有需要,甚至可以分配比处理机数量更多的线程。


ForkJoinPool forkJoinPool = new ForkJoinPool(<numThreads>);



 

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多