分享

快速掌握并发编程---Semaphore原理和实战

 田维常 2020-10-23

生活案例

停车场:停车场只有2个车位,即同时只能容纳2辆车,车辆都是停一会再走的,如何保证同一时刻最多有2个车停在停车位?请用代码实现。

女厕所:女厕所里只有五个位置,即最多只能有五位女性同时上厕所,如何保证同一时刻最多有五位女性在上厕所?请用java代码实现。(原本就只想用厕所来说,没想用女厕所来举例,但是之前遇到杠精,说男厕可以两个人一起上,无比尴尬)。

多线程读某个文件:实现一个文件允许同一时刻的并发访问数。

Semaphore入场

上面的两个案例可以用JDK1.5出的Semaphore来实现。实现停车场案例后,第二个案例也就很轻松实现了。公司大楼来了6辆车,怎么办?保安大哥手里只有两张停车卡(一个车位一张卡)。

import java.util.Random;
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    //车位个数
    private static final int PARK_LOT = 2;
    //车辆数
    private static final int CARS = 6;

    private static Semaphore semaphore = new Semaphore(PARK_LOT);

    private static void park() {
        for (int i = 1; i <= CARS; i++) {
            int finalI = i;
            new Thread(() -> {
                try {
                    //看看有没有空车位
                    if (semaphore.availablePermits() == 0) {
                        System.out.println("第" + finalI + "辆司机看了看,哎,还没有空停车位,继续排队");
                    }
                    //尝试进入停车位
                    semaphore.acquire();
                    System.out.println("第" + finalI + "成功进入停车场");
                    Thread.sleep(new Random().nextInt(10000));//模拟车辆在停车场停留的时间
                    System.out.println("第" + finalI + "驶出停车场");
                    //离开停车场
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }).start();
        }
    }

    public static void main(String[] args) {
        park();
    }
}

运行输出

四辆车等待,两辆车进入停车位。然后后面就是出来一辆进入一辆,这个停车场确实保证了任何时可最多可以停两辆车了。

可能前面排队等着别人开出来,可是有的人不按规矩出牌,看到有车位一下就进去把车位抢了。所以这个停车也分公平和非公平的。

整个过程:




假设上面的场景是:司机们能看到车位是否为空,而且只能看到自己前面的车,更前面的车看不见。

通常有六种做法:

1.看到有空车位,便直接开过去、不用排队了(非公平

2.看到有车位了,还得看看前面还有没有车在排队(公平

3.不然,就一直看车位是否为空,以及自己前面还有没有车(死循环--自旋

4.能够停车了,尝试往空车位开去,但却被插队(不公平)的那家伙抢了车位(CAS

5.有时候保安也会说:今天车位满了,不让停进来了,回去吧(中断

6.保安叫你回去了,你却傻傻地不走,想再等等看(中断不响应

7.汽车从车位上离开,每离开一个,空车位就多一个(资源释放

以上是自己对生活和技术的理解,如有不对的地方希望指正。

深入Semaphore

Semaphore翻译

Semaphore是Doug Lea在JDK1.5时候搞出来的。Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。

UML类图

方法和主要属性

上面案例中我们一共使用了如下几个关键方法:

new Semaphore(2);
availablePermits() 
acquire();
release();

构造方法

从构造方法new Semaphore(PARK_LOT)为入口,咱们一步一步把这个Semaphore的源码给看一遍,然后梳理一下,最后总结一下。

   //两个车位 permits=2
   public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

有两个构造方法,并且这里我们看到了两个熟悉的面孔:

FairSync 公平的

NonfairSync 非公平的

上面的案例中,使用的是一个单个参数的构造方法,即此时使用的是非公平的。就想上面停车的场景,你排队在前面也没用,后面的直接插队,停到车位上去了。

我们来看看这个非公平NonfairSync 是怎么实现的

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
        //两个车位 permits=2
        NonfairSync(int permits) {
            super(permits);
        }
    }

这里面什么逻辑,通过super(permits)到父类中

 abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        //两个车位 permits=2
        Sync(int permits) {
            //就是给AQS设置state=2
            setState(permits);
        }
        //返回AQS中state的值
        final int getPermits() {
            return getState();
        }
}

构造方法就是创建一个Semaphore对象,给AQS中的state赋值。

availablePermits() 方法--检测是否有可用凭证(资源)

司机看看有没有空车位

semaphore.availablePermits() 

方法的源码

  public int availablePermits() {
        return sync.getPermits();
  }

其实这里就是获取AQS中的sate的值。如果state==0则证明已满。否则semaphore.acquire();

acquire方法--中断式

在Semaphore中获取资源的有两种方式:中断式和不中断式。

  1. 当前线程调用该方法的目的是希望获取一个信号量资源。如果当前信号量个数大于0,则当前信号量的计数会减1,然后该方法直接返回。

  2. 否则如果当前信号量个数等于0,则当前线程会被放入AQS的阻塞队列。

  3. 当其他线程调用了当前线程的interrupt()方法中断了当前线程时,则当前线程会抛出InterruptedException异常返回。

其中acquire()为中断式

public void acquire() throws InterruptedException {
    //入参是证明只需要一个信号
    sync.acquireSharedInterruptibly(1);
}

是AQS中的方法

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException 
{
        //判断是否中断
        if (Thread.interrupted()) throw new InterruptedException();
        //tryAcquireShared 和 doAcquireSharedInterruptibly 方法
        if (tryAcquireShared(arg) < 0){
            doAcquireSharedInterruptibly(arg);
        }
    }
    //空方法,有子类去实现
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
tryAcquireShared方法

该方法是在AQS中没有实现,是一个空方法,这里在Semaphore中有两个实现类

由于我们前面使用就是非公平模式,所以这里我们进入NonfairSync

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }
        //这方法也没有什么逻辑,直接调用nonfairTryAcquireShared
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

而nonfairTryAcquireShared方法则是在其父类里实现的

    abstract static class Sync extends AbstractQueuedSynchronizer {       
        //acquires=1
        final int nonfairTryAcquireShared(int acquires) {
            //死循环--自旋
            for (;;) {
                //获取AQS中的state,就是我们前面给的permits=2
                //也称之为获取剩余许可数
                int available = getState();
                int remaining = available - acquires;
                // 剩余的许可小于0或者比较设置成功
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

这里是一个CAS自旋。因为Semaphore是一个共享锁,可能有多个线程同时申请共享资源,因此CAS操作可能失败。直到成功获取返回剩余资源数目,或者发现没有剩余资源返回负值代表申请失败。在这里我们看看公平模式

    static final class FairSync extends Sync {
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                //公平模式多了这个方法
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

公平模式下的tryAcquireShared方法在试图获取之前做了一个判断,如果发现等对队列中有线程在等待获取资源,就直接返回-1表示获取失败。当前线程会被上层的acquireSharedInterruptibly方法调用doAcquireShared方法放入等待队列中。这正是“公平”模式的语义:如果有线程先于我进入等待队列且正在等待,就直接进入等待队列,效果便是各个线程按照申请的顺序获得共享资源,具有公平性。

hasQueuedPredecessors这个方法我们在之前的ReentranLock文章中说过了,这里就不在多说了。

doAcquireSharedInterruptibly方法

这个方法我们在之前的Reentranlock文章中也都讲过了快速掌握并发编程---细说ReentrantLock和AQS,在这里的主要是当获取锁失败后,他就是将当前线程放入等待队列并开始自旋检测获取资源。

acquireUnInterruptibly方法-- 不中断式

该方法与acquire()方法类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了acquireUninterruptibly获取资源时(包含被阻塞后),其他线程调用了当前线程的interrupt()方法设置了当前线程的中断标志,此时当前线程并不会抛出InterruptedException异常而返回。

    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

release()方法--释放资源

不管是在ReentranLock还是Semaphore里,释放锁和释放资源都不会公平性。

该方法的作用是把当前Semaphore对象的信号量值增加1,如果当前有线程因为调用aquire方法被阻塞而被放入了AQS的阻塞队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活,激活的线程会尝试获取刚增加的信号量。

    public void release() {
        //注意入参是1
        sync.releaseShared(1);
    }
   //释放掉资源后,唤醒后继
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

而在这个方法中一共调用了两个方法tryReleaseShareddoReleaseShared

`tryReleaseShared`

tryReleaseShared方法的实现

既然不分公平和不公平,那么这个实现类就肯定是在他们两的父类里实现的,点进去,果然是在java.util.concurrent.Semaphore.Sync中实现的

        protected final boolean tryReleaseShared(int releases) {
            //自旋
            for (;;) {
                int current = getState();
                //记得前面aquire方法是state-1
                //而这里是+1,表示把凭证归还到池子里了,下一个人来就可以获取凭证了
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                //CAS设置state的值=之前state的值+release
                if (compareAndSetState(current, next))
                    return true;
            }
        }
`doReleaseShared`

此方法主要用于唤醒后继

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

希望结合源码的梳理和文章前面的说的几种做法,能让您更好地理解Semaphore。

总结

核心知识点:利用了AQS中的state和同步阻塞队列、CAS、死循环

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多