分享

基于curl的异步http实现

 wtkc 2015-03-03

简述用于windows客户端的一个异步http模块的实现

1.需要实现的feature

1.1 很容易地发起异步http请求,然后回调。
1.2 能够管理http并发数。
1.3 能够支持http超时:不依赖于curl中实现的连接超时及其它超时。
1.4 请求可以取消。


2.参与者和简要分析:
Manager:接收http请求,调用curl。
Request:封装http请求。
Response:封装http回应。


线程模型:
这里实现异步一般会开线程,假定有一个UI(主)线程,可能有这些模式:
Manager在UI线程中管理若干个工作线程,curl_easy接口。
Manager在自己新起的http线程中管理若干个工作线程,curl_easy接口。
Manager在自己新起的http线程中调用curl_multi接口。


Manager在UI线程中调用curl_multi接口不合适,因为需要占用UI线程时间去select。
开多个work线程浪费资源,线程管理难度大,线程并发度的减少不会造成性能瓶颈,
因为主要耗时在网络IO上。


所以选定模型是:开一个http线程,在上面调用curl_multi接口。


解决回调问题:
这里会遇到两个问题,一个是回调的线程问题:在http中检测到IO完成,
如果直接在http中进行回调,会使得使用者要考虑多线程使用问题,回调中可能的崩溃,
耗时操作会影响异步http模块性能。另一个是回调对象的生命周期问题:如果回调
到对象的成员函数,在回调时有可能该对象已经析构。


考虑到这些,使用了chromium的线程模型:chromium中的base模块提供了几个抽象
层次的线程调度接口。其中一个层次是:知道对方线程的标识符,即可以向对应的线程
派发任务。


于是设计要求调用者是在某个被管理的线程上,那么在发起请求后,在Manager的SendRequest
方法中可以检测到主调线程的MessageLoopProxy,将该代理作为请求的一部分放到请求
队列中。在请求完成后,将response和SendRequest中的callback对象绑定到一起作为
一个任务由MessageLoopProxy派发,这样就回到调用者所在线程上了。


在此同时,对象的生命周期问题也解决,base中有弱指针。


回调接口被设计为void XXX::CallBack(WeakPtr<XXX> ptr, scoped_refptr<Response> ptr);
在绑定到当前对象的弱引用时得到一个签名为 void (scoped_refptr<Response>) 的Runnable Object。
这个签名的对象才可以交给Manager。
在IO完成时,再将这个Runnable Object的参数绑定到对应的Response上,就得到签名为 void (void)的
Runnable Object,同时也是线程池能派发的对象。


在这里引入chromium的线程池,加上了调用者的约束,解决回调问题。


3.参与者设计
Request设计:
包含一堆参数,比如url,HTTP_VERB,HTTP头操作,HTTP的BODY操作,超时设置。
为了简化使用,可以提供一个MakeGetRequest的函数,用于生成一个不需要那么多复杂设置的Request。


Reponse设计:
包含自己定义的错误码,curl的http错误码,http状态码,response body buffer,
response header,原来的url,甚至一个void* user的不透明指针。


Manager设计:
作为单件存在,拥有初始化和反初始化接口。这样,Manager得和所在模块的生命周期
绑定在一起,在合适的地方初始化和反初始化。


在初始化时内部创建http工作线程,整个模块处于就绪状态。


请求队列是少不了的,在访问时注意互斥。
为每个请求分配一个handle,交给调用者。
维护一个当前工作队列,即:加到CURLM中的所有请求。


可以预见http就是不断循环地处理一些事件。


如何及时响应添加的请求。
如何及时退出。
如何处理超时。
如何避免轮询。


如果每次循环都需要主动检测请求队列,可能比较低效。因为一方面肯定要从
curl的select中退出,然后去检测请求队列,而检测时可能发现没有请求。另外一方面
访问队列得加锁。


一般而言,这个问题的解决方案是用个Event。
如果这里用Event,那么退出,删除操作是不是也得有个Event。另外Event的自动切换状态
或手动切换状态会不会切出问题。Event的信息量是1,处理多个请求足够吗。
其实Event的信息量处理多个请求是够的,只要添加请求就触发一次事件能保证work的。


然而在这里不使用Event解决这个问题,使用windows消息,为添加,删除,结束分别定义消息。
用一个状态变量记录上次处理请求队列时是否有未处理的请求,否则需要加锁地读请求队列状态。
当状态变量为true时,请求队列一定非空。当状态变量为false时,请求队列可能是空,也可以有
请求,这说明在上一次读请求队列后请求队列中又加了请求,会通过消息来唤起当前线程。


先来看IO循环,在这些条件下应该退出IO循环:
1.still running的handle数为0,不需要进入IO循环。
2.still running的handle数目发生变化时应该退出IO循环,这样,外部就有可能处理IO完成。
3.still running数没有达到最大,且有未处理请求时。
4.有消息时应该退出IO循环,这个不用说。


这里select是在socket上等待,而其它4个退出条件需要轮询,所以在实现上有点违背curl_multi
的设计初衷。如果每次都额外select一个socket,在发线程消息时,往这个socket发点数据,
就可以将对线程消息的检测放到select中来。前三个退出条件和当前的still running的handle数
相关,可以在select前检测一次。select退出前still running的handle数不会发生变化,所以
可以放心select同时保持敏感。不过这里没有再创建socket了,还是用轮询吧。


在IO循环外是http线程的主循环,主循环可能干这些事:
A.处理IO完成。
B.处理消息。
C.在没有任务时放弃自己的线程执行,通过GetMessage进入休眠。
D.请求队列非空,且当前执行的handle没有达到最大时,需要处理请求队列。
E.设置TIMER(需要建立一个消息窗口),用于检测超时。消息时间是最先可能造时的请求超时的时间。


这些事如何安排?
最开始两个是事件派发类:B和C是同一类逻辑,一量地B或C需要处理,即使进入了IO循环也会
马上退出。
接着是D,检测一下是否能进入GetMessage的调用状态,如果能,则一直等待有消息,重新进入BC的逻辑。
然后是A,表示没有外部的事要处理,专心做IO吧。

而E,在任何可能影响超时时间的后面都加一个。


  1. for (;;)  
  2. {  
  3.     if (m_UnHandleRequest > 0 && m_RunningRequest < MAX_RUNNING_REQUEST)  
  4.     {  
  5.         HandleQueueingRequest();  
  6.         ModifyTimer();  
  7.     }  
  8.     while (::PeekMessage())  
  9.     {  
  10.         ProcessMessage();  
  11.         if (m_QuitFlag)  
  12.         {  
  13.             ClearResource();  
  14.             break;  
  15.         }  
  16.     }  
  17.     if (m_QuitFlag)  
  18.     {  
  19.         break;  
  20.     }  
  21.     ModifyTimer();  
  22.       
  23.     if (!HaveRequestAndIO())  
  24.     {  
  25.         GetMessage();  
  26.         ProcessMessage();  
  27.         if (m_QuitFlag)  
  28.         {  
  29.             ClearResource();  
  30.             break;  
  31.         }  
  32.     }  
  33.     if (m_QuitFlag)  
  34.     {  
  35.         break;  
  36.     }  
  37.     ModifyTimer();  
  38.       
  39.     IOLoop();  
  40.     IOComplete();  
  41.     ModifyTimer();  
  42. }  
  43.   
  44. void ProcessMessage()  
  45. {  
  46.     case 退出:m_QuitFlag = 1; break;  
  47.     case 取消Request:HandleCanceledRequest(); break;  
  48.     case 添加Request:HandleQueueingRequest();break;  
  49. }  
  50.   
  51. void HandleCanceledRequest()  
  52. {  
  53.     在请求队列中,直接remove掉。  
  54.     在running中,对应的remove操作。  
  55.     // bad case,在处理请求时,完成的task已经进入调用者线程的task队列  
  56. }  
  57.   
  58. void HandleQueueingRequest()  
  59. {  
  60.     将请求队列中的请求放到Running队列中。  
  61.     调用curl对应接口开始处理对应的请求。  
  62.     更新m_UnHandleRequest。  
  63. }  
  64.   
  65. void ModifyTimer()  
  66. {  
  67.     计算最近超时,如果存在则更新timer。  
  68. }  
  69.   
  70. void IOLoop()  
  71. {  
  72.     int m_OldRunningRequest = m_RunningRequest;  
  73.     while (m_RunningRequest)  
  74.     {  
  75.         if (m_UnHandleRequest > 0 && m_RunningRequest < MAX_RUNNING_REQUEST)  
  76.         {  
  77.             break;  
  78.         }  
  79.         if (PeekMessage())  
  80.         {  
  81.             break;  
  82.         }  
  83.         if (m_OldRunningRequest != m_RunningRequest)  
  84.         {  
  85.             break;  
  86.         }  
  87.           
  88.         curl_multi_timeout();  
  89.         修改超时时间,如果超时时间超过100毫秒。  
  90.           
  91.         如果超时时间是0,需要curl_multi_perform一次。  
  92.         curl_multi_fdset();  
  93.         select();  
  94.     }  
  95. }  




    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多