分享

如何设计线程安全的可扩展类

 Bladexu的文库 2020-02-19

设计线程安全类,最主要问题是如何把数据拆分为多个独立的部分,并为这些部分确定合适的大小。如果每个部分太小,那么设计的类无法做到线程安全。如果每个部分太大,那么这个类无法扩展。

如何设计线程安全的可扩展类

让我们通过示例进一步说明:

一个例子

假设我们要追踪一个城市居住了多少人。需要提供两个方法,一个方法获取当前城市的居民人数,另一个方法把某个人从一个城市转移到另一个城市。接口设计如下:

public interface CityToCount { static final String[] ALL_CITIES = new String[] { 'Springfield' , 'South Park' }; static final int POPULATION_COUNT = 1000000; void move( String from, String to ); int count(String name);}

由于多个线程需要并行调用此接口,因此必须思考接口实现的方案。要么使用 java.util.concurrent.ConcurrentHashMap,要么使用 java.util.HashMap 和单锁。使用 java.util.concurrent.ConcurrentHashMap 类:

public class CityToCountUsingConcurrentHashMap    implements CityToCount {    private ConcurrentHashMap<String, Integer> map =        new ConcurrentHashMap<String, Integer>();    public CityToCountUsingConcurrentHashMap() {        for (String city : ALL_CITIES) {            map.put(city, POPULATION_COUNT);        }    }    public void move(String from, String to) {        map.compute(from, (key, value) -> {            if (value == null) {                return POPULATION_COUNT - 1;            }            return value - 1;        });        map.compute(to, (key, value) -> {            if (value == null) {                return POPULATION_COUNT + 1;            }            return value + 1;        });    }    public int count(String name) {        return map.get(name);    }}

move 方法调用线程安全的 compute 方法减小迁出城市中的居民数。然后,用 compute 增加迁入城市的居民数。count 方法中调用了线程安全的 get 方法。

下面是使用 java.util.HashMap 的实现:

public class CityToCountUsingSynchronizedHashMap implements CityToCount { private HashMap<String, Integer> map = new HashMap<String, Integer>(); private Object lock = new Object(); public CityToCountUsingSynchronizedHashMap() { for (String city : ALL_CITIES) { map.put(city, POPULATION_COUNT); } } public void move(String from, String to) { synchronized (lock) { map.compute(from, (key, value) -> { if (value == null) { return POPULATION_COUNT - 1; } return value - 1; }); map.compute(to, (key, value) -> { if (value == null) { return POPULATION_COUNT + 1; } return value + 1; }); } } public int count(String name) { synchronized (lock) { return map.get(name); } }}

move 方法同样使用了 compute 方法增加、减少迁出城市和迁入城市的居民数。而这一次,由于 compute方法不是线程安全的,因此这两种方法都被 synchronized 代码块包围。count 方法同样使用了 get 加 synchronized 代码块。

上面两种解决方案都是线程安全的。

但是,ConcurrentHashMap 方案可以用不同线程并行更新多个城市。反观 HashMap 方案,由于 HashMap 代码完全被锁包围,同一时间只能有一个线程更新 HashMap。因此,ConcurrentHashMap 方案应该扩展性更好。让我们来看看。

太大意味着无法扩展

为了比较两种实现的可扩展性,使用下面的基准测试:

import org.openjdk.jmh.annotations.Benchmark;import org.openjdk.jmh.annotations.State;import org.openjdk.jmh.annotations.Scope;@State(Scope.Benchmark)public class CityToCountBenchmark {    public CityToCount cityToCountUsingSynchronizedHashMap        = new CityToCountUsingSynchronizedHashMap();    public CityToCount cityToCountUsingConcurrentHashMap        = new CityToCountUsingConcurrentHashMap();    @Benchmark    public void synchronizedHashMap() {        String name = Thread.currentThread().getName();        cityToCountUsingSynchronizedHashMap.move(name, name + '2');     }    @Benchmark    public void concurrentHashMap() {        String name = Thread.currentThread().getName();        cityToCountUsingConcurrentHashMap.move(name, name + '2');     } }

基准测试使用 jmh,一种 OpenJDK 微基准测试框架。在基准测试中,我把人们从一个城市迁移到另一个城市。每个工作线程都会更新不同的城市。迁出城市的名称为线程 ID,迁入城市的名称为线程 ID 加2。在 Intel i5 4核CPU上运行基准测试,结果如下:

如何设计线程安全的可扩展类

如我们看到的那样,使用 ConcurrentHashMap 扩展性更好:当线程数大于两个,该方案性能要比单个锁更好。

太小意味着线程不安全

现在,需要增加另一个方法获取所有城市的居民总数。下面用 ConcurrentHashMap 方案实现:

public int completeCount() { int completeCount = 0; for (Integer value : map.values()) { completeCount += value; } return completeCount;}

要确认方案是否线程安全,可以使用以下测试:

import com.vmlens.api.AllInterleavings;public class TestCompleteCountConcurrentHashMap {    @Test    public void test() throws InterruptedException {        try (AllInterleavings allInterleavings =                new AllInterleavings('TestCompleteCountConcurrentHashMap');) {            while (allInterleavings.hasNext()) {                CityToCount cityToCount =                    new CityToCountUsingConcurrentHashMap();                Thread first = new Thread(() -> {                    cityToCount.move('Springfield', 'South Park');                });                 first.start();                assertEquals(2 * CityToCount.POPULATION_COUNT,                        cityToCount.completeCount());                first.join();             }        }    } }

需要两个线程测试 completeCount 方法是否线程安全。一个线程把某个人从 Springfield 移到 South Park。另一个线程获取 completeCount并检查结果是否符合预期。

为了测试所有线程交叉的情况,第7行对所有线程使用 while 循环对 AllInterleavingsvmlens 进行测试。执行测试可以看到以下错误:

expected:<2000000> but was:<1999999>

Vmlens 报告揭示了问题:

如何设计线程安全的可扩展类

正如看到的那样,这里的问题在于:人数统计已经完成,而另一个线程还在把人从 Springfield 移到 South Park。这时 Springfield 的人数已经减过了,但 South Park 的人数还没有增加。

允许并行更新不同城市的人数,在 completeCount 与 move 并行执行时,会导致错误的结果。如果提供的方法操作范围是所有城市,则需要在方法执行期间锁定所有城市。为了支持这样的方法,需要第二种单锁解决方案。我们可以实现一个线程安全的 countComplete 方法,像下面这样:

public int completeCount() {    synchronized (lock) {        int completeCount = 0;        for (Integer value : map.values()) {            completeCount += value;        }    return completeCount;    }}

总结

虽然这个简单的例子不能体现数据结构的复杂性,但是示例中体现的思想在现实世界中也同样适用。除了在单线程中逐个字段更新,没有其它方法可以线程安全地更新多个关联字段。因此,同时达成线程安全与可扩展的唯一方法是在数据中找到独立的部分,然后用多个线程并行更新。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多