分享

使用libevent进行多线程socket编程demo

 mediatv 2015-10-29
最近要对一个用libevent写的C C++项目进行修改,要改成多线程的,故做了一些学习和研究。libevent是一个用C语言写的开源的一个库。它对sock

最近要对一个用libevent写的C/C++项目进行修改,要改成多线程的,故做了一些学习和研究。

libevent是一个用C语言写的开源的一个库。它对socket编程里的epoll/select等功能进行了封装,并且使用了一些设计模式(比如反 应堆模式),用事件机制来简化了socket编程。libevent的好处网上有很多,但是初学者往往都看不懂。我打个比方吧, 1) 假设有N个客户端同时往服务端通过socket写数据,用了libevent之后,你的server程序里就不用再使用epoll或是select来判断 都哪些socket的缓冲区里已经收到了客户端写来的数据。当某个socket的缓冲区里有可读数据时,libevent会自动触发一个“读事件”,通过 这个“读事件”来调用相应的代码来读取socket缓冲区里的数据即可。换句话说,libevent自己调用select()或是epoll的函数来判断 哪个缓冲区可读了,只要可读了,就自动调用相应的处理程序。 2) 对于“写事件”,libevent会监控某个socket的缓冲区是否可写(一般情况下,只要缓冲区没满就可写),只要可写,就会触发“写事件”,通过“写事件”来调用相应的函数,将数据写到socket里。

以上两个例子分别从“读”和“写”两方面简介了一下,可能不十分准确(但十分准确的描述往往会让人看不懂)。

以下两个链接关于libevent的剖析比较详细,想学习libevent最好看一下。

1) sparkliang的专栏 2) 鱼思故渊的专栏

=========关于libevent使用多线程的讨论=========================

网上很多资料说libevent不支持多线程,也有很多人说libevent可以支持多线程。究竟值不支持呢?我的答案是: 得看你的多线程是怎么写的,如何跟libevent结合的。

1)可以肯定的是,libevent的 信号事件 是不支持多线程的(因为源码里用了个全局变量)。可以看这篇文章(http://blog.csdn.net/sparkliang/article/details/5306809)。(注:libevent里有“超时事件”,“IO事件”,“信号事件”。)

2)对于不同的线程,使用不同的base,是可以的。

3)如果不同的线程使用相同的base呢?——如果在不同的线程里的事件都注册到同一个base上,会有问题吗?

(http://www.cnblogs.com/zzyoucan/p/3970578.html)这篇博客里提到说,不行!即使加锁也不行。我最近稍 微看了部分源码,我的答案是:不加锁会有并发问题,但如果对每个event_add(),event_del()等这些操作event的动作都用同一个临 界变量来加锁,应该是没问题的。——貌似也有点问题,如果某个事件没有用event_set()设置为EV_PERSIST,当事件发生时,会被自动删 除。有可能线程a在删除事件的时候,线程b却在添加事件,这样还是会出现并发问题。 最后的结论是——不行!

========本次实验代码逻辑的说明==========================

我采取的方案是对于不同的线程,使用不同的base。——即每个线程对应一个base,将线程里的事件注册到线程的base上,而不是所有线程里的事件都用同一个base。

一 实验需求描述:

1)写一个client和server程序。多个client可以同时连接一个server;

2)client接收用户在标准输入的字符,发往server端;

3)server端收到后,再把收到的数据处理一下,返回给client;

4)client收到server返回的数据后,将其打印在终端上。

二 设计方案:

1. client:

1) client采用两个线程,主线程接收用户在终端上的输入,并通过socket将用户的输入发往server。

2) 派生一个子线程,接收server返回来的数据,如果收到数据,就打印出来。

2. server:

在主线程里监听client有没有连接连过来,如果有,立马accept出一个socket,并创建一个子线程,在子线程里接收client传过来的数据,并对数据进行一些修改,然后将修改后的数据写回到client端。

三 代码实现

1. client代码如下:

  1 #include <iostream>
  2 #include <sys/select.h>
  3 #include <sys/socket.h>
  4 #include <unistd.h>
  5 #include <pthread.h>
  6 #include <stdio.h>
  7 #include <stdlib.h>
  8 #include <sys/types.h>
  9 #include <netinet/in.h>
 10 #include <arpa/inet.h>
 11 #include <string>
 12 #include <string.h>
 13 #include <event.h>
 14 using namespace std;
 15 
 16 #define BUF_SIZE 1024
 17 
 18 /**
 19  * 连接到server端,如果成功,返回fd,如果失败返回-1
 20  */
 21 int connectServer(char* ip, int port){
 22     int fd = socket( AF_INET, SOCK_STREAM, 0 );
 23     cout<<"fd= "<<fd<<endl;
 24     if(-1 == fd){
 25         cout<<"Error, connectServer() quit"<<endl;
 26         return -1;
 27     }
 28     struct sockaddr_in remote_addr; //服务器端网络地址结构体
 29     memset(&remote_addr,0,sizeof(remote_addr)); //数据初始化--清零
 30     remote_addr.sin_family=AF_INET; //设置为IP通信
 31     remote_addr.sin_addr.s_addr=inet_addr(ip);//服务器IP地址
 32     remote_addr.sin_port=htons(port); //服务器端口号
 33     int con_result = connect(fd, (struct sockaddr*) &remote_addr, sizeof(struct sockaddr));
 34     if(con_result < 0){
 35         cout<<"Connect Error!"<<endl;
 36         close(fd);
 37         return -1;
 38     }
 39     cout<<"con_result="<<con_result<<endl;
 40     return fd;
 41 }
 42 
 43 void on_read(int sock, short event, void* arg)
 44 {
 45     char* buffer = new char[BUF_SIZE];
 46     memset(buffer, 0, sizeof(char)*BUF_SIZE);
 47     //--本来应该用while一直循环,但由于用了libevent,只在可以读的时候才触发on_read(),故不必用while了
 48     int size = read(sock, buffer, BUF_SIZE);
 49     if(0 == size){//说明socket关闭
 50         cout<<"read size is 0 for socket:"<<sock<<endl;
 51         return;
 52     }
 53     cout<<"Received from server---"<<buffer<<endl;
 54     delete[]buffer;
 55 }
 56 
 57 void* init_read_event(void* arg){
 58     long long_sock = (long)arg;
 59     int sock = (int)long_sock;
 60     //-----初始化libevent,设置回调函数on_read()------------
 61     struct event_base* base = event_base_new();
 62     struct event* read_ev = (struct event*)malloc(sizeof(struct event));//发生读事件后,从socket中取出数据
 63     event_set(read_ev, sock, EV_READ|EV_PERSIST, on_read, NULL);
 64     event_base_set(base, read_ev);
 65     event_add(read_ev, NULL);
 66     event_base_dispatch(base);
 67     //--------------
 68     event_del(read_ev);
 69     free(read_ev);
 70     event_base_free(base);
 71 }
 72 /**
 73  * 创建一个新线程,在新线程里初始化libevent读事件的相关设置,并开启event_base_dispatch
 74  */
 75 void init_read_event_thread(int sock){
 76     pthread_t thread;
 77     pthread_create(&thread,NULL,init_read_event,(void*)sock);
 78     pthread_detach(thread);
 79 }
 80 int main() {
 81     cout << "main started" << endl; // prints Hello World!!!
 82     cout << "Please input server IP:"<<endl;
 83     char ip[16];
 84     cin >> ip;
 85     cout << "Please input port:"<<endl;
 86     int port;
 87     cin >> port;
 88     cout << "ServerIP is "<<ip<<" ,port="<<port<<endl;
 89     int socket_fd = connectServer(ip, port);
 90     cout << "socket_fd="<<socket_fd<<endl;
 91     init_read_event_thread(socket_fd);
 92     //--------------------------
 93     char buffer[BUF_SIZE];
 94     bool isBreak = false;
 95     while(!isBreak){
 96         cout << "Input your data to server(\'q\' or \"quit\" to exit)"<<endl;
 97         cin >> buffer;
 98         if(strcmp("q", buffer)==0 || strcmp("quit", buffer)==0){
 99             isBreak=true;
100             close(socket_fd);
101             break;
102         }
103         cout << "Your input is "<<buffer<<endl;
104         int write_num = write(socket_fd, buffer, strlen(buffer));
105         cout << write_num <<" characters written"<<endl;
106         sleep(2);
107     }
108     cout<<"main finished"<<endl;
109     return 0;
110 }
client端的代码

1)在main()里先调用init_read_event_thread()来生成一个子线程,子线程里调用 init_read_event()来将socket的读事件注册到libevent的base上,并调用libevent的 event_base_dispatch()不断地进行轮询。一旦socket可读,libevent就调用“读事件”上绑定的on_read()函数来 读取数据。

2)在main()的主线程里,通过一个while循环来接收用户从终端的输入,并通过socket将用户的输入写到server端。

-------------------------------------------------------------

2. server端代码如下:

  1 #include <iostream>
  2 #include <sys/select.h>
  3 #include <sys/socket.h>
  4 #include <stdio.h>
  5 #include <unistd.h>
  6 #include <pthread.h>
  7 #include <stdio.h>
  8 #include <sys/types.h>
  9 #include <netinet/in.h>
 10 #include <arpa/inet.h>
 11 #include <string>
 12 #include <string.h>
 13 #include <event.h>
 14 #include <stdlib.h>
 15 using namespace std;
 16 
 17 #define SERVER_IP "127.0.0.1"
 18 #define SERVER_PORT 9090
 19 #define BUF_SIZE 1024
 20 
 21 struct sock_ev_write{//用户写事件完成后的销毁,在on_write()中执行
 22     struct event* write_ev;
 23     char* buffer;
 24 };
 25 struct sock_ev {//用于读事件终止(socket断开)后的销毁
 26     struct event_base* base;//因为socket断掉后,读事件的loop要终止,所以要有base指针
 27     struct event* read_ev;
 28 };
 29 
 30 /**
 31  * 销毁写事件用到的结构体
 32  */
 33 void destroy_sock_ev_write(struct sock_ev_write* sock_ev_write_struct){
 34     if(NULL != sock_ev_write_struct){
 35 //        event_del(sock_ev_write_struct->write_ev);//因为写事件没用EV_PERSIST,故不用event_del
 36         if(NULL != sock_ev_write_struct->write_ev){
 37             free(sock_ev_write_struct->write_ev);
 38         }
 39         if(NULL != sock_ev_write_struct->buffer){
 40             delete[]sock_ev_write_struct->buffer;
 41         }
 42         free(sock_ev_write_struct);
 43     }
 44 }
 45 
 46 
 47 /**
 48  * 读事件结束后,用于销毁相应的资源
 49  */
 50 void destroy_sock_ev(struct sock_ev* sock_ev_struct){
 51     if(NULL == sock_ev_struct){
 52         return;
 53     }
 54     event_del(sock_ev_struct->read_ev);
 55     event_base_loopexit(sock_ev_struct->base, NULL);//停止loop循环
 56     if(NULL != sock_ev_struct->read_ev){
 57         free(sock_ev_struct->read_ev);
 58     }
 59     event_base_free(sock_ev_struct->base);
 60 //    destroy_sock_ev_write(sock_ev_struct->sock_ev_write_struct);
 61     free(sock_ev_struct);
 62 }
 63 int getSocket(){
 64     int fd =socket( AF_INET, SOCK_STREAM, 0 );
 65     if(-1 == fd){
 66         cout<<"Error, fd is -1"<<endl;
 67     }
 68     return fd;
 69 }
 70 
 71 void on_write(int sock, short event, void* arg)
 72 {
 73     cout<<"on_write() called, sock="<<sock<<endl;
 74     if(NULL == arg){
 75         cout<<"Error! void* arg is NULL in on_write()"<<endl;
 76         return;
 77     }
 78     struct sock_ev_write* sock_ev_write_struct = (struct sock_ev_write*)arg;
 79 
 80     char buffer[BUF_SIZE];
 81     sprintf(buffer, "fd=%d, received[%s]", sock, sock_ev_write_struct->buffer);
 82 //    int write_num0 = write(sock, sock_ev_write_struct->buffer, strlen(sock_ev_write_struct->buffer));
 83 //    int write_num = write(sock, sock_ev_write_struct->buffer, strlen(sock_ev_write_struct->buffer));
 84     int write_num = write(sock, buffer, strlen(buffer));
 85     destroy_sock_ev_write(sock_ev_write_struct);
 86     cout<<"on_write() finished, sock="<<sock<<endl;
 87 }
 88 
 89 void on_read(int sock, short event, void* arg)
 90 {
 91     cout<<"on_read() called, sock="<<sock<<endl;
 92     if(NULL == arg){
 93         return;
 94     }
 95     struct sock_ev* event_struct = (struct sock_ev*) arg;//获取传进来的参数
 96     char* buffer = new char[BUF_SIZE];
 97     memset(buffer, 0, sizeof(char)*BUF_SIZE);
 98     //--本来应该用while一直循环,但由于用了libevent,只在可以读的时候才触发on_read(),故不必用while了
 99     int size = read(sock, buffer, BUF_SIZE);
100     if(0 == size){//说明socket关闭
101         cout<<"read size is 0 for socket:"<<sock<<endl;
102         destroy_sock_ev(event_struct);
103         close(sock);
104         return;
105     }
106     struct sock_ev_write* sock_ev_write_struct = (struct sock_ev_write*)malloc(sizeof(struct sock_ev_write));
107     sock_ev_write_struct->buffer = buffer;
108     struct event* write_ev = (struct event*)malloc(sizeof(struct event));//发生写事件(也就是只要socket缓冲区可写)时,就将反馈数据通过socket写回客户端
109     sock_ev_write_struct->write_ev = write_ev;
110     event_set(write_ev, sock, EV_WRITE, on_write, sock_ev_write_struct);
111     event_base_set(event_struct->base, write_ev);
112     event_add(write_ev, NULL);
113     cout<<"on_read() finished, sock="<<sock<<endl;
114 }
115 
116 
117 /**
118  * main执行accept()得到新socket_fd的时候,执行这个方法
119  * 创建一个新线程,在新线程里反馈给client收到的信息
120  */
121 void* process_in_new_thread_when_accepted(void* arg){
122     long long_fd = (long)arg;
123     int fd = (int)long_fd;
124     if(fd<0){
125         cout<<"process_in_new_thread_when_accepted() quit!"<<endl;
126         return 0;
127     }
128     //-------初始化base,写事件和读事件--------
129     struct event_base* base = event_base_new();
130     struct event* read_ev = (struct event*)malloc(sizeof(struct event));//发生读事件后,从socket中取出数据
131 
132     //-------将base,read_ev,write_ev封装到一个event_struct对象里,便于销毁---------
133     struct sock_ev* event_struct = (struct sock_ev*)malloc(sizeof(struct sock_ev));
134     event_struct->base = base;
135     event_struct->read_ev = read_ev;
136     //-----对读事件进行相应的设置------------
137     event_set(read_ev, fd, EV_READ|EV_PERSIST, on_read, event_struct);
138     event_base_set(base, read_ev);
139     event_add(read_ev, NULL);
140     //--------开始libevent的loop循环-----------
141     event_base_dispatch(base);
142     cout<<"event_base_dispatch() stopped for sock("<<fd<<")"<<" in process_in_new_thread_when_accepted()"<<endl;
143     return 0;
144 }
145 
146 /**
147  * 每当accept出一个新的socket_fd时,调用这个方法。
148  * 创建一个新线程,在新线程里与client做交互
149  */
150 void accept_new_thread(int sock){
151     pthread_t thread;
152     pthread_create(&thread,NULL,process_in_new_thread_when_accepted,(void*)sock);
153     pthread_detach(thread);
154 }
155 
156 /**
157  * 每当有新连接连到server时,就通过libevent调用此函数。
158  *    每个连接对应一个新线程
159  */
160 void on_accept(int sock, short event, void* arg)
161 {
162     struct sockaddr_in remote_addr;
163     int sin_size=sizeof(struct sockaddr_in);
164     int new_fd = accept(sock,  (struct sockaddr*) &remote_addr, (socklen_t*)&sin_size);
165     if(new_fd < 0){
166         cout<<"Accept error in on_accept()"<<endl;
167         return;
168     }
169     cout<<"new_fd accepted is "<<new_fd<<endl;
170     accept_new_thread(new_fd);
171     cout<<"on_accept() finished for fd="<<new_fd<<endl;
172 }
173 
174 int main(){
175     int fd = getSocket();
176     if(fd<0){
177         cout<<"Error in main(), fd<0"<<endl;
178     }
179     cout<<"main() fd="<<fd<<endl;
180     //----为服务器主线程绑定ip和port------------------------------
181     struct sockaddr_in local_addr; //服务器端网络地址结构体
182     memset(&local_addr,0,sizeof(local_addr)); //数据初始化--清零
183     local_addr.sin_family=AF_INET; //设置为IP通信
184     local_addr.sin_addr.s_addr=inet_addr(SERVER_IP);//服务器IP地址
185     local_addr.sin_port=htons(SERVER_PORT); //服务器端口号
186     int bind_result = bind(fd, (struct sockaddr*) &local_addr, sizeof(struct sockaddr));
187     if(bind_result < 0){
188         cout<<"Bind Error in main()"<<endl;
189         return -1;
190     }
191     cout<<"bind_result="<<bind_result<<endl;
192     listen(fd, 10);
193     //-----设置libevent事件,每当socket出现可读事件,就调用on_accept()------------
194     struct event_base* base = event_base_new();
195     struct event listen_ev;
196     event_set(&listen_ev, fd, EV_READ|EV_PERSIST, on_accept, NULL);
197     event_base_set(base, &listen_ev);
198     event_add(&listen_ev, NULL);
199     event_base_dispatch(base);
200     //------以下语句理论上是不会走到的---------------------------
201     cout<<"event_base_dispatch() in main() finished"<<endl;
202     //----销毁资源-------------
203     event_del(&listen_ev);
204     event_base_free(base);
205     cout<<"main() finished"<<endl;
206 }
server端的代码

1)在main()里(运行在主线程中),先设置服务端的socket,然后为主线程生成一个libevent的base,并将一个“读事件”注 册到base上。“读事件”绑定了一个on_accept(),每当client有新连接连过来时,就会触发这个“读事件”,进而调用 on_accept()方法。

2)在on_accept()里(运行在主线程中),每当有新连接连过来时,就会accept出一个新的new_fd,并调用 accept_new_thread()来创建一个新的子线程。子线程里会调用 process_in_new_thread_when_accepted()方法。

3)process_in_new_thread_when_accepted()方法里(运行在子线程中),创建一个子线程的base,并创建 一个“读事件”,注册到“子线程的base”上。并调用event_base_dispatch(base)进入libevent的loop中。当发现 new_fd的socket缓冲区中有数据可读时,就触发了这个“读事件”,继而调用on_read()方法。

4)on_read()方法里(运行在子线程中),从socket缓冲区里读取数据。读完数据之后,将一个“写事件”注册到“子线程的base”上。一旦socket可写,就调用on_write()函数。

5)on_write()方法(运行在子线程中),对数据进行修改,然后通过socket写回到client端。

注:其实可以不用注册“写事件”,在on_read()方法中直接修改数据,然后写回到client端也是可以的——但这有个问题。就是如果 socket的写缓冲区是满的,那么这时候 write(sock, buffer, strlen(buffer))会阻塞的。这会导致整个on_read()方法阻塞掉,而无法读到接下来client传过来的数据了。而用了 libevent的”写事件“之后,虽然 write(sock, buffer, strlen(buffer))仍然会阻塞,但如果write操作是在另外的线程里,就不会影响on_read()函数里的流程了。(本例中write操 作没有在另外的线程里,所以注不注册“写事件”效果都一样)。

相关热词搜索: libevent 线程

下一篇:排错经历:全局变量被多次析构
上一篇:微博推荐静态数据存储方案: lushan

相关文章

关键词:
  • 1
  • 0
  • 0
  • 0
  • 0
  • 0
  • 0
  • 0
    爱好Linux技术网-Linux操作系统命令安装下载视频教程学习网站!
    当前位置:首页 > 编程 > c&c++ > 使用libevent进行多线程socket编程demo

    使用libevent进行多线程socket编程demo

    发布时间:2015-03-08 21:26:38   编辑:AHLinux.com
    最近要对一个用libevent写的C C++项目进行修改,要改成多线程的,故做了一些学习和研究。libevent是一个用C语言写的开源的一个库。它对sock

    最近要对一个用libevent写的C/C++项目进行修改,要改成多线程的,故做了一些学习和研究。

    libevent是一个用C语言写的开源的一个库。它对socket编程里的epoll/select等功能进行了封装,并且使用了一些设计模式 (比如反 应堆模式),用事件机制来简化了socket编程。libevent的好处网上有很多,但是初学者往往都看不懂。我打个比方吧, 1) 假设有N个客户端同时往服务端通过socket写数据,用了libevent之后,你的server程序里就不用再使用epoll或是select来判断 都哪些socket的缓冲区里已经收到了客户端写来的数据。当某个socket的缓冲区里有可读数据时,libevent会自动触发一个“读事件”,通过 这个“读事件”来调用相应的代码来读取socket缓冲区里的数据即可。换句话说,libevent自己调用select()或是epoll的函数来判断 哪个缓冲区可读了,只要可读了,就自动调用相应的处理程序。 2) 对于“写事件”,libevent会监控某个socket的缓冲区是否可写(一般情况下,只要缓冲区没满就可写),只要可写,就会触发“写事件”,通过“写事件”来调用相应的函数,将数据写到socket里。

    以上两个例子分别从“读”和“写”两方面简介了一下,可能不十分准确(但十分准确的描述往往会让人看不懂)。

    以下两个链接关于libevent的剖析比较详细,想学习libevent最好看一下。

    1) sparkliang的专栏 2) 鱼思故渊的专栏

    =========关于libevent使用多线程的讨论=========================

    网上很多资料说libevent不支持多线程,也有很多人说libevent可以支持多线程。究竟值不支持呢?我的答案是: 得看你的多线程是怎么写的,如何跟libevent结合的。

    1)可以肯定的是,libevent的 信号事件 是不支持多线程的(因为源码里用了个全局变量)。可以看这篇文章(http://blog.csdn.net/sparkliang/article/details/5306809)。(注:libevent里有“超时事件”,“IO事件”,“信号事件”。)

    2)对于不同的线程,使用不同的base,是可以的。

    3)如果不同的线程使用相同的base呢?——如果在不同的线程里的事件都注册到同一个base上,会有问题吗?

    (http://www.cnblogs.com/zzyoucan/p/3970578.html)这篇博客里提到说,不行!即使加锁也不行。我 最近稍 微看了部分源码,我的答案是:不加锁会有并发问题,但如果对每个event_add(),event_del()等这些操作event的动作都用同一个临 界变量来加锁,应该是没问题的。——貌似也有点问题,如果某个事件没有用event_set()设置为EV_PERSIST,当事件发生时,会被自动删 除。有可能线程a在删除事件的时候,线程b却在添加事件,这样还是会出现并发问题。 最后的结论是——不行!

    ========本次实验代码逻辑的说明==========================

    我采取的方案是对于不同的线程,使用不同的base。——即每个线程对应一个base,将线程里的事件注册到线程的base上,而不是所有线程里的事件都用同一个base。

    一 实验需求描述:

    1)写一个client和server程序。多个client可以同时连接一个server;

    2)client接收用户在标准输入的字符,发往server端;

    3)server端收到后,再把收到的数据处理一下,返回给client;

    4)client收到server返回的数据后,将其打印在终端上。

    二 设计方案:

    1. client:

    1) client采用两个线程,主线程接收用户在终端上的输入,并通过socket将用户的输入发往server。

    2) 派生一个子线程,接收server返回来的数据,如果收到数据,就打印出来。

    2. server:

    在主线程里监听client有没有连接连过来,如果有,立马accept出一个socket,并创建一个子线程,在子线程里接收client传过来的数据,并对数据进行一些修改,然后将修改后的数据写回到client端。

    三 代码实现

    1. client代码如下:

      1 #include <iostream>
      2 #include <sys/select.h>
      3 #include <sys/socket.h>
      4 #include <unistd.h>
      5 #include <pthread.h>
      6 #include <stdio.h>
      7 #include <stdlib.h>
      8 #include <sys/types.h>
      9 #include <netinet/in.h>
     10 #include <arpa/inet.h>
     11 #include <string>
     12 #include <string.h>
     13 #include <event.h>
     14 using namespace std;
     15 
     16 #define BUF_SIZE 1024
     17 
     18 /**
     19  * 连接到server端,如果成功,返回fd,如果失败返回-1
     20  */
     21 int connectServer(char* ip, int port){
     22     int fd = socket( AF_INET, SOCK_STREAM, 0 );
     23     cout<<"fd= "<<fd<<endl;
     24     if(-1 == fd){
     25         cout<<"Error, connectServer() quit"<<endl;
     26         return -1;
     27     }
     28     struct sockaddr_in remote_addr; //服务器端网络地址结构体
     29     memset(&remote_addr,0,sizeof(remote_addr)); //数据初始化--清零
     30     remote_addr.sin_family=AF_INET; //设置为IP通信
     31     remote_addr.sin_addr.s_addr=inet_addr(ip);//服务器IP地址
     32     remote_addr.sin_port=htons(port); //服务器端口号
     33     int con_result = connect(fd, (struct sockaddr*) &remote_addr, sizeof(struct sockaddr));
     34     if(con_result < 0){
     35         cout<<"Connect Error!"<<endl;
     36         close(fd);
     37         return -1;
     38     }
     39     cout<<"con_result="<<con_result<<endl;
     40     return fd;
     41 }
     42 
     43 void on_read(int sock, short event, void* arg)
     44 {
     45     char* buffer = new char[BUF_SIZE];
     46     memset(buffer, 0, sizeof(char)*BUF_SIZE);
     47     //--本来应该用while一直循环,但由于用了libevent,只在可以读的时候才触发on_read(),故不必用while了
     48     int size = read(sock, buffer, BUF_SIZE);
     49     if(0 == size){//说明socket关闭
     50         cout<<"read size is 0 for socket:"<<sock<<endl;
     51         return;
     52     }
     53     cout<<"Received from server---"<<buffer<<endl;
     54     delete[]buffer;
     55 }
     56 
     57 void* init_read_event(void* arg){
     58     long long_sock = (long)arg;
     59     int sock = (int)long_sock;
     60     //-----初始化libevent,设置回调函数on_read()------------
     61     struct event_base* base = event_base_new();
     62     struct event* read_ev = (struct event*)malloc(sizeof(struct event));//发生读事件后,从socket中取出数据
     63     event_set(read_ev, sock, EV_READ|EV_PERSIST, on_read, NULL);
     64     event_base_set(base, read_ev);
     65     event_add(read_ev, NULL);
     66     event_base_dispatch(base);
     67     //--------------
     68     event_del(read_ev);
     69     free(read_ev);
     70     event_base_free(base);
     71 }
     72 /**
     73  * 创建一个新线程,在新线程里初始化libevent读事件的相关设置,并开启event_base_dispatch
     74  */
     75 void init_read_event_thread(int sock){
     76     pthread_t thread;
     77     pthread_create(&thread,NULL,init_read_event,(void*)sock);
     78     pthread_detach(thread);
     79 }
     80 int main() {
     81     cout << "main started" << endl; // prints Hello World!!!
     82     cout << "Please input server IP:"<<endl;
     83     char ip[16];
     84     cin >> ip;
     85     cout << "Please input port:"<<endl;
     86     int port;
     87     cin >> port;
     88     cout << "ServerIP is "<<ip<<" ,port="<<port<<endl;
     89     int socket_fd = connectServer(ip, port);
     90     cout << "socket_fd="<<socket_fd<<endl;
     91     init_read_event_thread(socket_fd);
     92     //--------------------------
     93     char buffer[BUF_SIZE];
     94     bool isBreak = false;
     95     while(!isBreak){
     96         cout << "Input your data to server(\'q\' or \"quit\" to exit)"<<endl;
     97         cin >> buffer;
     98         if(strcmp("q", buffer)==0 || strcmp("quit", buffer)==0){
     99             isBreak=true;
    100             close(socket_fd);
    101             break;
    102         }
    103         cout << "Your input is "<<buffer<<endl;
    104         int write_num = write(socket_fd, buffer, strlen(buffer));
    105         cout << write_num <<" characters written"<<endl;
    106         sleep(2);
    107     }
    108     cout<<"main finished"<<endl;
    109     return 0;
    110 }
    client端的代码

    1)在main()里先调用init_read_event_thread()来生成一个子线程,子线程里调用 init_read_event()来将socket的读事件注册到libevent的base上,并调用libevent的 event_base_dispatch()不断地进行轮询。一旦socket可读,libevent就调用“读事件”上绑定的on_read()函数来 读取数据。

    2)在main()的主线程里,通过一个while循环来接收用户从终端的输入,并通过socket将用户的输入写到server端。

    -------------------------------------------------------------

    2. server端代码如下:

      1 #include <iostream>
      2 #include <sys/select.h>
      3 #include <sys/socket.h>
      4 #include <stdio.h>
      5 #include <unistd.h>
      6 #include <pthread.h>
      7 #include <stdio.h>
      8 #include <sys/types.h>
      9 #include <netinet/in.h>
     10 #include <arpa/inet.h>
     11 #include <string>
     12 #include <string.h>
     13 #include <event.h>
     14 #include <stdlib.h>
     15 using namespace std;
     16 
     17 #define SERVER_IP "127.0.0.1"
     18 #define SERVER_PORT 9090
     19 #define BUF_SIZE 1024
     20 
     21 struct sock_ev_write{//用户写事件完成后的销毁,在on_write()中执行
     22     struct event* write_ev;
     23     char* buffer;
     24 };
     25 struct sock_ev {//用于读事件终止(socket断开)后的销毁
     26     struct event_base* base;//因为socket断掉后,读事件的loop要终止,所以要有base指针
     27     struct event* read_ev;
     28 };
     29 
     30 /**
     31  * 销毁写事件用到的结构体
     32  */
     33 void destroy_sock_ev_write(struct sock_ev_write* sock_ev_write_struct){
     34     if(NULL != sock_ev_write_struct){
     35 //        event_del(sock_ev_write_struct->write_ev);//因为写事件没用EV_PERSIST,故不用event_del
     36         if(NULL != sock_ev_write_struct->write_ev){
     37             free(sock_ev_write_struct->write_ev);
     38         }
     39         if(NULL != sock_ev_write_struct->buffer){
     40             delete[]sock_ev_write_struct->buffer;
     41         }
     42         free(sock_ev_write_struct);
     43     }
     44 }
     45 
     46 
     47 /**
     48  * 读事件结束后,用于销毁相应的资源
     49  */
     50 void destroy_sock_ev(struct sock_ev* sock_ev_struct){
     51     if(NULL == sock_ev_struct){
     52         return;
     53     }
     54     event_del(sock_ev_struct->read_ev);
     55     event_base_loopexit(sock_ev_struct->base, NULL);//停止loop循环
     56     if(NULL != sock_ev_struct->read_ev){
     57         free(sock_ev_struct->read_ev);
     58     }
     59     event_base_free(sock_ev_struct->base);
     60 //    destroy_sock_ev_write(sock_ev_struct->sock_ev_write_struct);
     61     free(sock_ev_struct);
     62 }
     63 int getSocket(){
     64     int fd =socket( AF_INET, SOCK_STREAM, 0 );
     65     if(-1 == fd){
     66         cout<<"Error, fd is -1"<<endl;
     67     }
     68     return fd;
     69 }
     70 
     71 void on_write(int sock, short event, void* arg)
     72 {
     73     cout<<"on_write() called, sock="<<sock<<endl;
     74     if(NULL == arg){
     75         cout<<"Error! void* arg is NULL in on_write()"<<endl;
     76         return;
     77     }
     78     struct sock_ev_write* sock_ev_write_struct = (struct sock_ev_write*)arg;
     79 
     80     char buffer[BUF_SIZE];
     81     sprintf(buffer, "fd=%d, received[%s]", sock, sock_ev_write_struct->buffer);
     82 //    int write_num0 = write(sock, sock_ev_write_struct->buffer, strlen(sock_ev_write_struct->buffer));
     83 //    int write_num = write(sock, sock_ev_write_struct->buffer, strlen(sock_ev_write_struct->buffer));
     84     int write_num = write(sock, buffer, strlen(buffer));
     85     destroy_sock_ev_write(sock_ev_write_struct);
     86     cout<<"on_write() finished, sock="<<sock<<endl;
     87 }
     88 
     89 void on_read(int sock, short event, void* arg)
     90 {
     91     cout<<"on_read() called, sock="<<sock<<endl;
     92     if(NULL == arg){
     93         return;
     94     }
     95     struct sock_ev* event_struct = (struct sock_ev*) arg;//获取传进来的参数
     96     char* buffer = new char[BUF_SIZE];
     97     memset(buffer, 0, sizeof(char)*BUF_SIZE);
     98     //--本来应该用while一直循环,但由于用了libevent,只在可以读的时候才触发on_read(),故不必用while了
     99     int size = read(sock, buffer, BUF_SIZE);
    100     if(0 == size){//说明socket关闭
    101         cout<<"read size is 0 for socket:"<<sock<<endl;
    102         destroy_sock_ev(event_struct);
    103         close(sock);
    104         return;
    105     }
    106     struct sock_ev_write* sock_ev_write_struct = (struct sock_ev_write*)malloc(sizeof(struct sock_ev_write));
    107     sock_ev_write_struct->buffer = buffer;
    108     struct event* write_ev = (struct event*)malloc(sizeof(struct event));//发生写事件(也就是只要socket缓冲区可写)时,就将反馈数据通过socket写回客户端
    109     sock_ev_write_struct->write_ev = write_ev;
    110     event_set(write_ev, sock, EV_WRITE, on_write, sock_ev_write_struct);
    111     event_base_set(event_struct->base, write_ev);
    112     event_add(write_ev, NULL);
    113     cout<<"on_read() finished, sock="<<sock<<endl;
    114 }
    115 
    116 
    117 /**
    118  * main执行accept()得到新socket_fd的时候,执行这个方法
    119  * 创建一个新线程,在新线程里反馈给client收到的信息
    120  */
    121 void* process_in_new_thread_when_accepted(void* arg){
    122     long long_fd = (long)arg;
    123     int fd = (int)long_fd;
    124     if(fd<0){
    125         cout<<"process_in_new_thread_when_accepted() quit!"<<endl;
    126         return 0;
    127     }
    128     //-------初始化base,写事件和读事件--------
    129     struct event_base* base = event_base_new();
    130     struct event* read_ev = (struct event*)malloc(sizeof(struct event));//发生读事件后,从socket中取出数据
    131 
    132     //-------将base,read_ev,write_ev封装到一个event_struct对象里,便于销毁---------
    133     struct sock_ev* event_struct = (struct sock_ev*)malloc(sizeof(struct sock_ev));
    134     event_struct->base = base;
    135     event_struct->read_ev = read_ev;
    136     //-----对读事件进行相应的设置------------
    137     event_set(read_ev, fd, EV_READ|EV_PERSIST, on_read, event_struct);
    138     event_base_set(base, read_ev);
    139     event_add(read_ev, NULL);
    140     //--------开始libevent的loop循环-----------
    141     event_base_dispatch(base);
    142     cout<<"event_base_dispatch() stopped for sock("<<fd<<")"<<" in process_in_new_thread_when_accepted()"<<endl;
    143     return 0;
    144 }
    145 
    146 /**
    147  * 每当accept出一个新的socket_fd时,调用这个方法。
    148  * 创建一个新线程,在新线程里与client做交互
    149  */
    150 void accept_new_thread(int sock){
    151     pthread_t thread;
    152     pthread_create(&thread,NULL,process_in_new_thread_when_accepted,(void*)sock);
    153     pthread_detach(thread);
    154 }
    155 
    156 /**
    157  * 每当有新连接连到server时,就通过libevent调用此函数。
    158  *    每个连接对应一个新线程
    159  */
    160 void on_accept(int sock, short event, void* arg)
    161 {
    162     struct sockaddr_in remote_addr;
    163     int sin_size=sizeof(struct sockaddr_in);
    164     int new_fd = accept(sock,  (struct sockaddr*) &remote_addr, (socklen_t*)&sin_size);
    165     if(new_fd < 0){
    166         cout<<"Accept error in on_accept()"<<endl;
    167         return;
    168     }
    169     cout<<"new_fd accepted is "<<new_fd<<endl;
    170     accept_new_thread(new_fd);
    171     cout<<"on_accept() finished for fd="<<new_fd<<endl;
    172 }
    173 
    174 int main(){
    175     int fd = getSocket();
    176     if(fd<0){
    177         cout<<"Error in main(), fd<0"<<endl;
    178     }
    179     cout<<"main() fd="<<fd<<endl;
    180     //----为服务器主线程绑定ip和port------------------------------
    181     struct sockaddr_in local_addr; //服务器端网络地址结构体
    182     memset(&local_addr,0,sizeof(local_addr)); //数据初始化--清零
    183     local_addr.sin_family=AF_INET; //设置为IP通信
    184     local_addr.sin_addr.s_addr=inet_addr(SERVER_IP);//服务器IP地址
    185     local_addr.sin_port=htons(SERVER_PORT); //服务器端口号
    186     int bind_result = bind(fd, (struct sockaddr*) &local_addr, sizeof(struct sockaddr));
    187     if(bind_result < 0){
    188         cout<<"Bind Error in main()"<<endl;
    189         return -1;
    190     }
    191     cout<<"bind_result="<<bind_result<<endl;
    192     listen(fd, 10);
    193     //-----设置libevent事件,每当socket出现可读事件,就调用on_accept()------------
    194     struct event_base* base = event_base_new();
    195     struct event listen_ev;
    196     event_set(&listen_ev, fd, EV_READ|EV_PERSIST, on_accept, NULL);
    197     event_base_set(base, &listen_ev);
    198     event_add(&listen_ev, NULL);
    199     event_base_dispatch(base);
    200     //------以下语句理论上是不会走到的---------------------------
    201     cout<<"event_base_dispatch() in main() finished"<<endl;
    202     //----销毁资源-------------
    203     event_del(&listen_ev);
    204     event_base_free(base);
    205     cout<<"main() finished"<<endl;
    206 }
    server端的代码

    1)在main()里(运行在主线程中),先设置服务端的socket,然后为主线程生成一个libevent的base,并将一个“读事件”注 册到base上。“读事件”绑定了一个on_accept(),每当client有新连接连过来时,就会触发这个“读事件”,进而调用 on_accept()方法。

    2)在on_accept()里(运行在主线程中),每当有新连接连过来时,就会accept出一个新的new_fd,并调用 accept_new_thread()来创建一个新的子线程。子线程里会调用 process_in_new_thread_when_accepted()方法。

    3)process_in_new_thread_when_accepted()方法里(运行在子线程中),创建一个子线程的base,并创建 一个“读事件”,注册到“子线程的base”上。并调用event_base_dispatch(base)进入libevent的loop中。当发现 new_fd的socket缓冲区中有数据可读时,就触发了这个“读事件”,继而调用on_read()方法。

    4)on_read()方法里(运行在子线程中),从socket缓冲区里读取数据。读完数据之后,将一个“写事件”注册到“子线程的base”上。一旦socket可写,就调用on_write()函数。

    5)on_write()方法(运行在子线程中),对数据进行修改,然后通过socket写回到client端。

    注:其实可以不用注册“写事件”,在on_read()方法中直接修改数据,然后写回到client端也是可以的——但这有个问题。就是如果 socket的写缓冲区是满的,那么这时候 write(sock, buffer, strlen(buffer))会阻塞的。这会导致整个on_read()方法阻塞掉,而无法读到接下来client传过来的数据了。而用了 libevent的”写事件“之后,虽然 write(sock, buffer, strlen(buffer))仍然会阻塞,但如果write操作是在另外的线程里,就不会影响on_read()函数里的流程了。(本例中write操 作没有在另外的线程里,所以注不注册“写事件”效果都一样)。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多