分享

我写的多线程环形缓冲,没有用锁。大家看看有没有错误和优化的地方? - C/C - Ch...

 jijo 2008-08-29

要做分析网络数据包的项目,我写好的个多线程的环形缓冲,自己试了半天,好像没错误,大家帮忙看下。我没有用锁,锁对性能影响好像很大。但我觉得还有可以优化的地方。
背景要求:将捕获到的包存入环形缓冲的一个单元,如果缓冲缓冲满了(写线程快读线程一圈),就开一个是原来两倍大小新环形缓冲。读线程在处理完旧缓冲缓冲后,释放旧缓冲空间。
两个线程,生产者和消费者问题。
先说数据结构:
环形缓冲中有很多单元,每一个单元用于储存一个包的数据。线程可以同时访问环形缓冲这个大数据结构,但不能同时访问某一个缓冲单元。所以我在每个单元里加了个标志,0表明可写,1表明可读。初始为0.这样就不需要用到锁或信号量。 当标志为0,写线程可以访问,写完数据后,将标志改为1(都没使用原子操作),并跳入下一个单元。读线程在标志为1的时候,读取其中的数据,读完后将标志改为0.

/*Cirbuf chain 写线程可能很快 以至于不只有1个或2个环形缓冲,可能有多个*/
struct Cirbuf* packetCirbuf;

/*环形缓冲的一个单元*/
struct CirUnit
{
    int flag; /*0 表明可写,1表明可读*/
    char unit[UNITSIZE];
};

/*环形缓冲*/
struct Cirbuf
{
    struct Cirbuf * newBuf; /*point to new Cirbuf*/
    int readPoint; /*for reading thread*/
    int writePoint; /*for writing thread*/
    int bufSize;    /*buf char array's size*/
    struct CirUnit *buf;        /*point to unit array*/
};


读写线程
对于写线程:
如果写线程发现标志为0,这此缓冲单元可写,将数据写入。
如果写线程发现当前标志为1,就表明已经追上读线程,于是要新开缓冲空间。并把新空间加入到环形缓冲区链表中。
对于读线程:
如果发现标志位1,则单元可读,读出数据,并把标志改为0.
如果读线程发现当前标志为0,就表明 (1).追上了写线程 (2)或旧缓冲区所有单元已经被读线程处理完,而此时写线程已经在处理新缓冲区。 对于这两种情况的分辨,只要看下面Cirbuf结构的Cirbuf * newBuf 指针,如果指针为空,表明为第一种情况,否则为第二种。对于第二种情况,需要释放旧的环形缓冲区空间,并将指针指向新的环形缓冲区。
读线程在数据单元不可用时(读线程追上写线程),我最先写代码时是让他continue while的循环,后来改成了让他delay 50ms,这里不知道怎么处理好。


extern "C" void* ReadThread(void *arg)
{
    while(1)
    {
        if(packetCirbuf->buf[packetCirbuf->readPoint].flag == 0)
        {
            if(packetCirbuf->newBuf != NULL)
            {
                /*表明readthread已经处理完旧的缓冲区并且已经有新的缓冲区,这时应该释放旧缓冲*/
                struct Cirbuf *temp = packetCirbuf;
                packetCirbuf = packetCirbuf->newBuf;
                FreeCirbuf(temp);
                continue;
            }
            /* delay*/
            pthread_cond_t mycond = PTHREAD_COND_INITIALIZER;
            pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
            struct timespec ts;
            int rv;
            ts.tv_sec = 0;
            ts.tv_nsec = 50000; /* 500,00 nanoseconds = 50 ms */
            pthread_mutex_lock(&mymutex);
            rv = pthread_cond_timedwait(&mycond, &mymutex, &ts);
            pthread_mutex_unlock(&mymutex);

            continue;
        }
        DoRead();
        packetCirbuf->buf[packetCirbuf->readPoint].flag = 0;
        if(++packetCirbuf->readPoint == packetCirbuf->bufSize)
            packetCirbuf->readPoint = 0;
    }
    return NULL;
}

extern "C" void* WriteThread(void *arg)
{
    struct Cirbuf* latestBuf;
    /*进写线程后,初始化latestBuf指针 使其指向packetCirbuf,这时的packetCirbuf是由主线程函数创建的初始buf*/
    latestBuf = packetCirbuf;
    if(latestBuf == NULL)
    {
        cout << "packet capture buffer NULL error" << endl;
        exit(0);
    }
    /*进入循环*/
    char test=0;
    while(1)
    {
        if (latestBuf->buf[latestBuf->writePoint].flag == 1)
        {
            /*we need a larger buf*/
            latestBuf = GetNewCirbuf(latestBuf->bufSize * 2,latestBuf);
        }
        else
        {
            DoWrite(latestBuf,test++);
            latestBuf->buf[latestBuf->writePoint].flag = 1;
            if(++latestBuf->writePoint == latestBuf->bufSize)
                latestBuf->writePoint = 0;
        }
    }
    return NULL;
}


不需要用锁的原因是 数据竞争发生在flag内存单元,
读线程如果取得flag为 1,但线程调度或另一个CPU核上的线程将flag改为0,就会发生错误。但只有读线程自己会把flag从1变为0,所以不会出错 同理写进程也是。

程序应该有优化的地方 比如读进程追上写进程的时候,是阻塞50ms还是直接continue还是有其他方法之类。

我测试是写进程往缓冲写数字1 2 3 读进程打印出来,打印的结果没有错序,也没有缺数发生。

整个程序代码:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <iostream>
#include <thread.h>
#include <time.h>

#define INITBUFSIZE        10 /*环型缓冲区初始单元个数*/
#define UNITSIZE        200/*每单元大小*/

using namespace std;
/*global data*/
/*Cirbuf chain 写进程可能很快 以至于不只有1个或2个环形缓冲,可能有多个*/
struct Cirbuf* packetCirbuf;

/*环形缓冲的一个单元*/
struct CirUnit
{
        int flag; /*0 表明可写,1表明可读*/
        char unit[UNITSIZE];
};

/*环形缓冲*/
struct Cirbuf
{
        struct Cirbuf * newBuf; /*point to new Cirbuf*/
        int readPoint; /*for reading thread*/
        int writePoint; /*for writing thread*/
        int bufSize;        /*buf char array's size*/
        struct CirUnit *buf;                /*point to unit array*/
};

struct Cirbuf* GetNewCirbuf(int bufSize,struct Cirbuf* oldBuf)
{
        if(bufSize>30000)
        {
                printf("oh my god,the bufSize is out of my league,I cannot handle it,exit");
                exit(1);
        }
        struct Cirbuf* newBuf = new Cirbuf();
        if(oldBuf != NULL)
                oldBuf->newBuf = newBuf;
        newBuf->newBuf = NULL;
        newBuf->readPoint = newBuf->writePoint = 0;
        newBuf->bufSize = bufSize;
        newBuf->buf = new CirUnit[bufSize];
        memset(newBuf->buf,0,sizeof(CirUnit)*bufSize); /*初始化单元为0*/
        return newBuf;
}

int FreeCirbuf(struct Cirbuf* bufPoint)
{
        delete bufPoint->buf;
        delete bufPoint;
        return 1;
}

void DoWrite(struct Cirbuf* latestBuf,char flag)
{
        latestBuf->buf[latestBuf->writePoint].unit[0] = flag;
        //printf("%d ",flag);
}

void DoRead()
{
        //cout<< packetCirbuf->buf[packetCirbuf->readPoint].unit[0] << endl;
        //printf("%d ",packetCirbuf->buf[packetCirbuf->readPoint].unit[0]);
        //printf(".");
}

extern "C" void* ReadThread(void *arg)
{
        while(1)
        {
                if(packetCirbuf->buf[packetCirbuf->readPoint].flag == 0)
                {
                        if(packetCirbuf->newBuf != NULL)
                        {
                                /*表明readthread已经处理完旧的缓冲区并且已经有新的缓冲区,这时应该释放旧缓冲*/
                                struct Cirbuf *temp = packetCirbuf;
                                packetCirbuf = packetCirbuf->newBuf;
                                FreeCirbuf(temp);
                                continue;
                        }
                        /* delay*/
                        pthread_cond_t mycond = PTHREAD_COND_INITIALIZER;
                        pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
                        struct timespec ts;
                        int rv;
                        ts.tv_sec = 0;
                        ts.tv_nsec = 50000; /* 500,00 nanoseconds = 50 ms */
                        pthread_mutex_lock(&mymutex);
                        rv = pthread_cond_timedwait(&mycond, &mymutex, &ts);
                        pthread_mutex_unlock(&mymutex);

                        continue;
                }
                DoRead();
                packetCirbuf->buf[packetCirbuf->readPoint].flag = 0;
                if(++packetCirbuf->readPoint == packetCirbuf->bufSize)
                        packetCirbuf->readPoint = 0;
        }
        return NULL;
}

extern "C" void* WriteThread(void *arg)
{
        struct Cirbuf* latestBuf;
        /*进写进程后,初始化latestBuf指针 使其指向packetCirbuf,这时的packetCirbuf是由主线程函数创建的初始buf*/
        latestBuf = packetCirbuf;
        if(latestBuf == NULL)
        {
                cout << "packet capture buffer NULL error" << endl;
                exit(0);
        }
        /*进入循环*/
        char test=0;
        while(1)
        {
                if (latestBuf->buf[latestBuf->writePoint].flag == 1)
                {
                        /*we need a larger buf*/
                        latestBuf = GetNewCirbuf(latestBuf->bufSize * 2,latestBuf);
                }
                else
                {
                        DoWrite(latestBuf,test++);
                        latestBuf->buf[latestBuf->writePoint].flag = 1;
                        if(++latestBuf->writePoint == latestBuf->bufSize)
                                latestBuf->writePoint = 0;
                }
        }
        return NULL;
}

int main(int argc, char *argv[])
{
        pthread_t threadNum[2];
        packetCirbuf = GetNewCirbuf(INITBUFSIZE,NULL);
        thr_create(0,0,WriteThread,0,0,&threadNum[0]);
        thr_create(0,0,ReadThread,0,0,&threadNum[1]);
        thr_join(threadNum[0],NULL,NULL);
        thr_join(threadNum[1],NULL,NULL);
        FreeCirbuf(packetCirbuf);
}





    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多