分享

c++11中信号量(semaphore)的实现 | 陆仁贾

 wtkc 2015-03-03

c++11中有 mutex (互斥量),有 condition_variable (条件变量),并没有 semaphore (信号量)。信号量,操作系统中一般都有提,后来 google 说可以使用 mutex+condition_variable 实现一个,后来写来写去,都死锁 (deadlock) ——,O__O"…,后来 google 了一个,整理了一下思路。

信号量

神马是信号量?信号量是一个整数 count,提供两个原子(atom,不可分割)操作:P 操作和 V 操作,或是说 wait 和 signal 操作。

  • P操作 (wait操作):count 减1;如果 count < 0 那么挂起执行线程;
  • V操作 (signal操作):count 加1;如果 count <= 0 那么唤醒一个执行线程;

如何理解这个信号量?为嘛信号量是这个东西?想想互斥量 mutex,相当于一把锁,如果一个人来了 lock 一下,其他人进不去了;最初的人 unlock 了,又可以进一个人了,进去一个又 lock 住。如果 mutex 锁在 unlock 状态下叫做 1 的话,lock 状态叫 0;1 实际反映的是锁的数量 !现在可以有多把锁,数量 count 最初为 n (可以设定);

  • 来一个人取一把锁(count减1),如果发现锁的数量(count)小于0,岂不言外之意,没有锁了 ? 这样的情况下就要等(wait),或是说悬挂(suspend),或是说阻塞(block);直到神马时候呢?有人释放一把锁给 me 为止。当然了,如果最初就有锁的话,直接拿一把进去就可以了,O__O"…。
  • me 的事情办完了,要出去了,还回一把锁(count加1),如果发现 count <=0,言外之意是神马呢?有人在等丫,O__O"…;好吧,me 把自己的锁给某一个人,唤醒一个等待的线程。当然如果最初就没有人等,me 就走 me 的,不用唤醒谁了。

下面的解释就对应上面的 wait 操作和 signal 操作,也算是它的真实意图了。wait 和 signal 有神马用呢?最典型的用法可能就是 count = 1 时候相当于一把互斥锁丫!当然很多时候,共享的资源有多个,比如有 n 个坑,每个线程占一个坑,这个时候使用信号量这个工具要比 mutex 更合适。算了,问题还集中在 wait 和 signal 的实现上。

问题和思路

  • 一个 int 或是 long 变量 count,很好设定;因为 wait 和 signal 都是原子操作,所以至少要一个 mutex 来保证互斥;
  • 有时候需要 suspend 一个线程,有时候要 wakeup 一个线程,条件变量是合适人选,所以需要一个 condition_variable;suspend 和 wakeup 都是在 condition_variable 上(这是载体);
  • wakeup 一个线程,肿么保证一定会 wakeup 一个 condition_variable 上的一个呢?me 们借助一个辅助变量 wakeups —— 要唤醒线程名额(初值为 0),也就是在 signal 的时候,发现有人在等,就 wakeups++(要唤醒人数+1) 然后通知一下条件变量可以唤醒一个。而 wait 操作如果发现 count 数量不够,就阻塞,直到 wakeups 有名额(大于0)为止(条件变量的 wait 就是等待该条件发生),当然唤醒一个之后 wakeups 减1。(条件变量设置的条件是 wakeups > 0。)

程序代码

  1. /*
  2. * modified by: ilovers
  3. */
  4. #include <mutex>
  5. #include <condition_variable>
  6. namespace ilovers{
  7.     class semaphore {
  8.     public:
  9.         semaphore(int value=1): count{value}, wakeups{0} {}
  10.         void wait(){
  11.             std::unique_lock<std::mutex> lock{mutex};
  12.             if (--count<0) { // count is not enough ?
  13.                 condition.wait(lock, [&]()->bool{ return wakeups>0;}); // suspend and wait ...
  14.                 --wakeups;  // ok, me wakeup !
  15.             }
  16.         }
  17.         void signal(){
  18.             std::lock_guard<std::mutex> lock{mutex};
  19.             if(++count<=0) { // have some thread suspended ?
  20.                 ++wakeups;
  21.                 condition.notify_one(); // notify one !
  22.             }
  23.         }
  24.     private:
  25.         int count;
  26.         int wakeups;
  27.         std::mutex mutex;
  28.         std::condition_variable condition;
  29.     };
  30. };

几点说明:

  • semaphore 组织在 ilovers 命名空间下,使用的时候包含头文件,然后 ilovers::semaphore sem; 方式使用;默认 count = 1,也就是一个 mutex 的效果;
  • sem.wait(); 是 P 操作;sem.signal(); 是 V 操作;
  • 程序中构造对象统一使用 c++11 的 {} 方式,虽然 () 依然使用,比如 count{value} 和 count(value),lock{mutex} 和 lock(mutex) 都是可以的;

信号量的用处

信号量的用法大体来说和 mutex m 一样;m.lock(); 和 m.unlock(); 将需要保护的代码上下围起来;mutex 的特点就是 count = 1,典型的(二分)互斥量;而 sem(n) 则是数量有 n 个,比如 n 个坑,都占完的情况下,阻塞当前线程,否则可以进去(占个坑,O__O"…)。

同步两个线程的顺序

如果 semaphore::count = 0 的情况下,一个线程 A 首先 wait 的话,就会被阻塞掉;而一个线程 B 如果不 wait 就直接干活,等干完 signal 一下的话,过一会 A 就会执行了(count = 1,已经有数量了)。这样就能保证 B 在 A 的前面执行了,类似的方法可以将多个线程 A、B、C、D 按照 D、C、B、A 排个顺序执行出来。

  1. #include <iostream>
  2. #include <thread>
  3. #include "ilovers/semaphore"
  4. std::mutex m;
  5. ilovers::semaphore ba(0), cb(0), dc(0);
  6. void a()
  7. {
  8.     ba.wait();  // b --> a
  9.     std::lock_guard<std::mutex> lock{m};
  10.     std::cout << "thread a" << '\n';
  11. }
  12. void b()
  13. {
  14.     cb.wait();  // c --> b
  15.     std::lock_guard<std::mutex> lock{m};
  16.     std::cout << "thread b" << '\n';
  17.     ba.signal();  // b --> a
  18. }
  19. void c()
  20. {
  21.     dc.wait();  // d --> c
  22.     std::lock_guard<std::mutex> lock{m};
  23.     std::cout << "thread c" << '\n';
  24.     cb.signal();  // c --> b
  25. }
  26. void d()
  27. {
  28.     std::lock_guard<std::mutex> lock{m};
  29.     std::cout << "thread d" << '\n';
  30.     dc.signal();  // d --> c
  31. }
  32. int main()
  33. {
  34.     std::thread th1{a}, th2{b}, th3{c}, th4{d};
  35.     th1.join();
  36.     th2.join();
  37.     th3.join();
  38.     th4.join();
  39.     std::cout << "thread main" << std::endl;
  40.     return 0;
  41. }

去掉上面的同步的信号量 ba、cb、dc 之后,程序的输出可能是 a b c d、a c d b、a c b d 等,几乎不可能是 d c b a 的顺序,然后加上三个控制顺序的信号量后,输出的顺序就是 d c b a main。对了 std::lock_guard 和 mutex 是为了互斥访问 std::cout 对象,这点要注意些(cout 一个共享对象/变量)。

后话

如果 u 没有注意到这个真相,me 这里提一下:count = -5,表明当前有 5 个线程阻塞了(really?),count = 10,表明当前有 10 个位置可以直接用(不用等)。如果 count = N0(初值),而当前 count = 0 表明 N0 个进程在干活,没有人在等;count = -10,表明 N0 个干活,N0+10 执行了 wait 操作,10 个在等待。

当初 me 认为不需要 wakeups 辅助变量就可以 wait 和 notify,而是直接使用 count,后来发现不对,notify_one 的时候 count 一定是负的,而不是 count > 0(有空闲位置,分配给一个挂起的线程?no !此时的已经没有挂起的线程了!),根据负的 count 则不能断定要不要真的从条件变量上卸掉一个线程,O__O"…。比如,假设当前有 5 个挂起来的线程,count = -5,此时有一个线程工作完了,signal/V 操作了一下,count = -4,这个时候 conditon_variable 怎么知道要不要唤醒一个线程呢?但是通过一个 wakeups 就可以:一个线程工作完了,V 操作一下,同时 wakeups++,这个时候 condition_variable 感知到 wakeups 有名额了,然后唤醒一个线程。唤醒的逻辑就是如此。

总赶脚有点撇脚

思来想去,wakeups 这个辅助变量真的有用么?或是说一定需要借助它吗?wakeups me 们说它的意义是:有一个 V 操作然后去唤醒一个挂起的线程。这里面为嘛要借助一个辅助变量呢?(虽然借助于 wakeups 然后设置条件变量的条件 wakeups > 0 可以。) V 操作的时候直接 notify_one 而 P 操作的时候只是简单的挂起线程(不设置条件),不是更自然么?好吧,至少 me 目前是这么想的,貌似也是行的通的:

简单版本

  1. /*
  2. * author: ilovers
  3. */
  4. #include <mutex>
  5. #include <condition_variable>
  6. namespace ilovers{
  7.     class semaphore {
  8.     public:
  9.         semaphore(int value=1): count{value}{}
  10.         void wait(){
  11.             std::unique_lock<std::mutex> lock{mutex};
  12.             if (--count<0) // count is not enough ?
  13.                 condition.wait(lock); // suspend and wait...
  14.         }
  15.         void signal(){
  16.             std::lock_guard<std::mutex> lock{mutex};
  17.             if(++count<=0) // have some thread suspended ?
  18.                 condition.notify_one(); // notify one !
  19.         }
  20.     private:
  21.         int count;
  22.         std::mutex mutex;
  23.         std::condition_variable condition;
  24.     };
  25. };

赶脚这个才是更自然的 semaphore 的实现,O__O"…。me 测试了一下 semaphore 作为 mutex 使用(默认),去同步四个线程,每个线程 ++sum 100000 次,发现不带 wakeups 的简单版本,速度很慢, 20秒左右;而上面带 wakeups 的版本则快一些,2 秒左右(当然结果是一致的),O__O"…可能是 condition_variable::wait 操作,带有条件则执行效率更高的缘故?现在只能这么猜测了。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多