分享

一种读写可并发进行的队列的实现方法

 WindySky 2017-08-01

1 背景目前采用多线程的处理机制中,如下处理方式是比较常见的: 一个线程负责将上游数据放到一个公共队列中,另外一个线程从公共队列中取出数据进行处理。读取操作都需要共用一个互斥量来保证线程安全,这样写数据和取数据的操作实际上是串行的,有些时候,这个操作将对软件处理性能造成一定影响。如果我们能够实现一个队列,读取操作不需要任何互斥量保护就可以保证线程安全,那么读写线程的处理能力将得到明显提高。实际上就是保证队列的读取接口和写入接口之间不存在并发冲突,即一个线程只调用读取接口,一个线程只调用写入接口,这两个线程是不需要进行任何同步动作的;如果多个线程同时调用读取接口或者同时调用写入接口,那么读取接口和写入接口可以用不同的互斥量进行同步;最终达到我们的目的:多个线程中,读取速率不会影响写入的速率,反之亦然。除了读取操作外,很多地方可能还要知道队列的大小,比如内部调试信息,或者实现中需要限制队列的最大容量等,这在读写两个线程都可能用到的,也希望在任意处理线程中不用加锁就可以取到这个信息,这个接口和读写接口都不存在并发冲突的问题,从而提高执行效率。

1       实现方案

需要进行线程同步的操作都是因为大家需要修改(有的线程修改,也有的线程访问)公共资源造成的,只要我们能够保证忘列表中增加一个节点和删除一个节点都不需要都修改同一个资源,而且保证待访问的资源始终有效,那么就可以做到读取操作本身就是线程安全的。

下面是一个列表的简单实现。

struct ListNode

{

       int data;

       ListNode* next;

}

struct List

{

       ListNode* beg;

       ListNode* end;

       List():beg(0),end(0){}

       void push_back(int data)

{

if(!end)

beg = end = new ListNode(node);

else

{

end.next = new ListNode(node);

end = end->next;

}

}

       void pop_front()

{

ListNode* top = beg;

beg = beg->next;

delete top;

if(!beg)

end = beg;

}

       int front()

{

return beg->data;

}

}

上面的实现,因为增删操作都可能修改内部的beg,end变量,无法做到线程安全的。

如果push_back只修改end变量,pop_front只修改beg变量,那么这两个操作就可以做到线程安全的。如果列表为空的时候beg,end都以空指针来表示,就不可能做到这一点。如果列表为空的时候,beg,end也指向一个固定节点,那么就可能实现这个操作。如下所示:

struct List

{

       ListNode* beg;

       ListNode* end;

       List():beg(new ListNode(0)),end(beg){}

       …

}

当插入数据的时候都是用已有的end节点来保存数据,然后在生成一个新的表示结束的节点,如下

       void push_back(int data)

{

end.data = data;

end.next = new ListNode(0);

end = end->next;

}

       这样在插入数据的时候不需要修改beg变量了。在提取数据的时候也是做类似处理:

       void pop_front()

{

       if(!empty())

       {

                     ListNode* oldBeg = beg;

                     beg = beg->next;

                     delete oldBeg;

                     }

}

考虑实际得取值操作

int front()

{

       return beg->data;

}

如果调用front()操作得时候,beg刚好被删除,就可能造成很严重得问题。如果front()pop_front()都只在一个读线程中使用,问题不大。如果将这两个接口得功能合并,如下:

int pop_front()

{

       int ret = beg->data

       ListNode* oldBeg = beg;

       beg = beg->next;

       delete oldBeg;

       return ret;

}

好像使用起来更方便。但考虑到我们最终实现得列表应该可以保存任何类型得数据,如果自定义得类型在函数退出时候执行拷贝构造函数时出现异常,那么列表状态无法恢复了。也就是说这个接口不是异常安全的接口。考虑这一点,还是分为两个独立的接口更好一些。

当然在使用pop_frontfront之前必须确保列表不为空:

       boolean empty()

       {

              return beg == end;

       }

如果将非空判断放到front内,那么为空得时候应该返回什么值?不知道。最好是用户自己明白这种情况下的风险。

如果存在多个读线程,那么front()pop_front(),empty()都需要利用同一个互斥量保证线程同步。Push_back不应该和上面上个接口在同一个线程中调用。如果只是这么一个书面规范,实际应用中也很容易因为疏忽没有遵守这个规则。因此我们可以考虑分别提供两个不同的界面给读线程和写线程,这就使用Adaptor模式了,如下:

class WriteList

{

private:

       List& list;

public:

       WriteList(list& list): this.list(list){}

       void push_back(int data) { list.push_back(data); }

};

class ReadList

{

private:

       List& list;

public:

       ReadList(list& list): this.list(list){}

       void pop_front() { list.pop_front(); }

       int front() { return list.front(); }

       boolean empty() { return list.empty(); }

}

这样读取数据的线程只看到ReadList, 写数据的线程只能看到WriteList,就可以防止使用人员误用这4个接口。

下面考虑查询列表大小的接口:

int size()

{

       int size = 0;

       ListNode* ptmp = beg;

       for(ListNode* ptmp = beg; ptmp != end; ptmp = ptmp.next, ++size);

       return size;

}

这个操作过程需要访问beg,end指向的内存空间中保存的数据,但是pop_front的实现可能导致被访问的beg已经执行无效区域,为了确保代码安全,所有线程调试信息中需要输出列表大小的地方都要和pop_front一样使用同一个互斥量,这既造成性能问题,又有使用上的不便。很多时候写线程是需要调用这个接口的,比如要限制线程的最大元素数量的时候或者调试信息需要。为了解决不用加锁,可以访问size接口,需要保证已经删除的节点的内存区域的next数据可用,也就是说,删除节点只清除节点保存的数据区域,其它数据都是可用的。如果不释放内存区域,必然导致内存泄漏,我们可以考虑将这些已经释放的节点用于保存push_back写入的数据。

一个环行的链表可以解决这个问题。

Struct ListNode

{

       int data ;

       ListNode* next;

       ListNode(int v,ListNode* pnext = 0): data(v),next(pnext){}

}

struct List

{

       ListNode* beg;

       ListNode* end;

       List():beg(new ListNode(0)),end(beg){end->next = end;}

       …

}

首先构造一个环行链表,push_back的时候如果先前没有被释放的元素,则将新元素加到end后面,仍然维持环行链表的结构:

void push_back(int data)

{

end->data = data;

if(end->next == beg){

end->next = new ListNode(0,end->next);

              }

              end = end->next;

}

在这个处理过程中,beg指针可能被读线程移动到下一位,但对这里实现的逻辑没有任何影响,而且最终逻辑也是正确的。

void pop_front()

{

              if(beg != end) beg = beg->next;

}

同样在这个处理过程中,end指针可能被写线程移动到下一位,但对这里实现的逻辑没有任何影响,而且最终逻辑也是正确的。

其他接口的实现都是没有什么变化:

int front()

{

       return beg->data;

       }

boolean empty()

       {

              return beg == end;

       }

int size()

{

       int size = 0;

       ListNode* ptmp = beg;

       ListNode* pend = end;

       for(ListNode* ptmp = beg; ptmp != pend; ptmp = ptmp->next, ++size);

       return size;

}

注意函数实现过程中用了临时变量来保存当时的end指针,为了防止在size执行过程中,另一个线程不停的插入数据,导致这个循环无法终止,这样得到的值可以看作是列表某个时间的一个快照数据,和列表在size函数执行完毕后的真实大小可能有差异,这是多线程的特性所决定的,可以认为在这种处理方式下,出现这种误差是可以接受的。

可以为WriteList,ReadList加上size接口。多个线程都使用WriteList(或ReadList)那么是需要加锁进行线程同步的,但WriteListReadList的接口可以分别在多个线程中使用是不需要加锁进行线程同步的。

附件是实作代码,供参考。

/**************************
***
*** dept: test
*** author: htj
***
*************************/

#ifndef CIRCLE_LIST_H
#define CIRCLE_LIST_H

#include <memory>

template<class T,class Alloc = std::allocator<T> >
class CircleList
{
 struct ListNode;
 typedef ListNode* node_pointer;
 typedef ListNode node_type;
public:
 typedef Alloc allocator_type;
 typedef T value_type;
 typedef typename allocator_type::size_type  size_type;
 typedef typename allocator_type::difference_type difference_type;
 typedef typename allocator_type::reference reference;
 typedef typename allocator_type::const_reference const_reference;
 typedef typename allocator_type::pointer pointer;
 typedef typename allocator_type::const_pointer const_pointer;
 
        CircleList():m_end(allocate()),m_beg(m_end),m_size(0)
        {
        }
        ~CircleList()
        {
                for(node_pointer pbeg = m_beg; pbeg != m_end; pbeg->freeValue(), pbeg = pbeg->next);
                node_pointer end = m_end;
                node_pointer beg;
                do{
                        beg = m_end->next;
                        deallocate(m_end);
                        m_end = beg;

                }while(m_end != end);
        }
        void push_back(const_reference v)
        {
                m_end->assignValue(v);
                if(m_beg == m_end->next){
                        m_end->next = allocate(m_end->next);
                        ++m_size;
                }
                m_end = m_end->next;
        }
        bool empty() const
        {
                return m_beg == m_end;
        }
        const_reference front() const
        {
                return m_beg->value();
        }
        reference front()
        {
                return m_beg->value();
        }
        void pop_front()
        {
         if(m_beg != m_end)
         {
                 m_beg->freeValue();
                 m_beg = m_beg->next;
                }
        }
        size_type size() const
        {
                int ret = 0;
                node_pointer pend = m_end;
                for(node_pointer beg = m_beg; beg != m_end; ++ret, beg = beg->next);
                return ret;
        }
        size_type buffer_size() const
        {
                return m_size;
        }
private:
        node_pointer m_end;
        node_pointer m_beg;
        size_type m_size;
        //不明白allocator的使用方式,下面的表达式编译不通过
        //typedef typename allocator_type::template rebind<node_type>::other _node_alloc_type
        //暂时使用new,delete来分配释放内存
        node_pointer allocate()
        {
         return  new node_type();
        }
        node_pointer allocate(node_pointer pnext)
        {
         return new node_type(pnext);
        }
        void deallocate(node_pointer pnode)
        {
         delete pnode;
        }

 struct ListNode
 {
         char buffer[sizeof(T)];
         ListNode* next;
         ListNode():next(this)
  {
  }
         explicit ListNode(ListNode* pnext):next(pnext)
         {          
         }
         bool operator==(const ListNode& rhs) const
         {
                 return next == rhs->next;
         }
         bool operator != (const ListNode& rhs) const
         {
                 return !(*this == rhs);
         } 
         void freeValue()
         {
                 value().T::~T();
         }
         void assignValue(const T& data)
         {
                 new(buffer) T(data);
         }
         T& value()
         {
          return *(T*)buffer;
         }
         const T& value() const
         {
          return *(T*)buffer;
         }
 };
};

template<class LIST>
class WriteList
{
 LIST& m_list;
public:
 typedef typename LIST::value_type value_type;
 typedef typename LIST::size_type  size_type;
 typedef typename LIST::difference_type difference_type;
 typedef typename LIST::reference reference;
 typedef typename LIST::const_reference const_reference;
 typedef typename LIST::pointer pointer;
 typedef typename LIST::const_pointer const_pointer;
 WriteList(LIST& l): m_list(l)
 {
 }
 void push_back(const_reference data)
 {
  m_list.push_back(data);
 }
 bool empty()
 {
  return m_list.empty();
 }
 size_type size()
 {
  return m_list.size();
 }
 size_type buffer_size()
 {
  return m_list.buffer_size();
 }
};

template<class LIST>
class ReadList
{
 LIST& m_list;
public:
 typedef typename LIST::value_type value_type;
 typedef typename LIST::size_type  size_type;
 typedef typename LIST::difference_type difference_type;
 typedef typename LIST::reference reference;
 typedef typename LIST::const_reference const_reference;
 typedef typename LIST::pointer pointer;
 typedef typename LIST::const_pointer const_pointer;
 ReadList(LIST& l): m_list(l)
 {
 }
 void pop_front()
 {
  m_list.pop_front();
 }
 reference front()
 {
  return m_list.front();
 }
 const_reference front() const
 {
  return m_list.front();
 }
 bool empty()
 {
  return m_list.empty();
 }
 size_type size()
 {
  return m_list.size();
 }
 size_type buffer_size()
 {
  return m_list.buffer_size();
 }
};

#endif

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多