分享

C# 实现的多线程异步Socket数据包接收器框架

 glxym 2015-05-15
几天前在博问中看到一个C# Socket问题,就想到笔者2004年做的一个省级交通流量接收服务器项目,当时的基本求如下:

  • 接收自动观测设备通过无线网卡、Internet和Socket上报的交通量数据包
  • 全年365*24运行的自动观测设备5分钟上报一次观测数据,每笔记录约2K大小
  • 规划全省将有100个左右的自动观测设备(截止2008年10月还只有30个)

      当时,VS2003才发布年多,笔者也是接触C#不久。于是Google了国内国外网,希望找点应用C#解决Socket通信问题的思路和代码。最后,找到了两篇帮助最大的文章:一篇是国人写的Socket接收器框架,应用了独立的客户端Socket会话(Session)概念,给笔者提供了一个接收服务器的总体框架思路;另一篇是美国人写的,提出了多线程、分段接收数据包的技术方案,描述了多线程、异步Socket的许多实现细节,该文坚定了笔者采用多线程和异步方式处理Socket接收器的技术路线。

    具体实现和测试时笔者还发现,在Internet环境下的Socket应用中,需要系统有极强的容错能力:没有办法控制异常,就必须允许它们存在(附加源代码中可以看到,try{}catch{}语句较多)。对此,笔者设计了一个专门的检查和清理线程,完成无效或超时会话的清除和资源释放工作。

    依稀记得,国内框架作者的名称空间有ibm,认为是IBM公司职员,通过邮件后才知道其人在深圳。笔者向他请教了几个问题,相互探讨了几个技术关键点。可惜,现在再去找,已经查不到原文和邮件了。只好借此机会,将本文献给这两个素未谋面的技术高人和同行,也盼望拙文或源码能给读者一点有用的启发和帮助。

1、主要技术思路


    整个系统由三个核心线程组成,并由.NET线程池统一管理:


侦听客户端连接请求线程:ListenClientRequest(),循环侦听客户端连接请求。如果有,检测该客户端IP,看是否是同一观测设备,然后建立一个客户端TSession对象,并通过Socket异步调用方法BeginReceive()接收数据包、EndReceive()处理数据包 
数据包处理线程:HandleDatagrams(),循环检测数据包队列_datagramQueue,完成数据包解析、判断类型、存储等工作 
客户端状态检测线程:CheckClientState(),循环检查客户端会话表_sessionTable,判断会话对象是否有效,设置超时会话关闭标志,清楚无效会话对象及释放其资源 

2、主要类简介

    系统主要由3个类组成:

TDatagramReceiver(数据包接收服务器):系统的核心进程类,建立Socket连接、处理与存储数据包、清理系统资源,该类提供全部的public属性和方法 
TSession(客户端会话):由每个客户端的Socket对象组成,有自己的数据缓冲区,清理线程根据该对象的最近会话时间判断是否超时 
TDatagram(数据包类):判断数据包类别、解析数据包

3、关键函数和代码

    下面简介核心类TDatagramReceiver的关键实现代码。

3.1  系统启动

      系统启动方法StartReceiver()首先清理资源、创建数据库连接、初始化若干计数值,然后创建服务器端侦听Socket对象,最后调用静态方法ThreadPool.QueueUserWorkItem()在线程池中创建3个核心处理线程。

Code
  1. /// <summary>
  2. ///  启动接收器
  3. /// </summary>
  4. public bool StartReceiver()
  5. {
  6.     try
  7.     {
  8.         _stopReceiver = true;

  9.         this.Close();

  10.         if (!this.ConnectDatabase()) return false;

  11.         _clientCount = 0;
  12.         _datagramQueueCount = 0;
  13.         _datagramCount = 0;
  14.         _errorDatagramCount = 0;
  15.         _exceptionCount = 0;

  16.         _sessionTable = new Hashtable(_maxAllowClientCount);
  17.         _datagramQueue = new Queue<TDatagram>(_maxAllowDatagramQueueCount);

  18.         _stopReceiver = false;  // 循环中均要该标志

  19.         if (!this.CreateReceiverSocket())  //建立服务器端 Socket 对象
  20.         {
  21.             return false;
  22.         }

  23.         // 侦听客户端连接请求线程, 使用委托推断, 不建 CallBack 对象
  24.         if (!ThreadPool.QueueUserWorkItem(ListenClientRequest))
  25.         {
  26.             return false;
  27.         }

  28.         // 处理数据包队列线程
  29.         if (!ThreadPool.QueueUserWorkItem(HandleDatagrams))
  30.         {
  31.             return false;
  32.         }

  33.         // 检查客户会话状态, 长时间未通信则清除该对象
  34.         if (!ThreadPool.QueueUserWorkItem(CheckClientState))
  35.         {
  36.             return false;
  37.         }

  38.         _stopConnectRequest = false;  // 启动接收器,则自动允许连接
  39.     }
  40.     catch
  41.     {
  42.         this.OnReceiverException();
  43.         _stopReceiver = true;
  44.     }
  45.     return !_stopReceiver;
  46. }
复制代码
下面是创建侦听Socket对象的方法代码。

Code
  1. /// <summary>
  2. /// 创建接收服务器的 Socket, 并侦听客户端连接请求
  3. /// </summary>
  4. private bool CreateReceiverSocket()
  5. {
  6.     try
  7.     {
  8.         _receiverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  9.         _receiverSocket.Bind(new IPEndPoint(IPAddress.Any, _tcpSocketPort));  // 绑定端口
  10.         _receiverSocket.Listen(_maxAllowListenQueueLength);  // 开始监听

  11.         return true;
  12.     }
  13.     catch
  14.     {
  15.         this.OnReceiverException();
  16.         return false;
  17.     }
  18. }
复制代码
3.2  侦听客户端连接请求

      服务器端循环等待客户端连接请求。一旦有请求,先判断客户端连接数是否超限,接着检测该客户端IP地址,一切正常后建立TSession对象,并调用异步方法接收客户端Socket数据包。

      代码中,Socket读到数据时的回调AsyncCallback委托方法EndReceiveData()完成数据接收工作,正常情况下启动另一个异步BeginReceive()调用。

      .NET中,每个异步方法都有自己的独立线程,异步处理其实也基于多线程机制的。下面代码中的异步套异步调用,既占用较大的系统资源,也给处理带来意想不到的结果,更是出现异常时难以控制和处理的关键所在。

Code
  1. /// <summary>
  2. /// 循环侦听客户端请求,由于要用线程池,故带一个参数
  3. /// </summary>
  4. private void ListenClientRequest(object state)
  5. {
  6.     Socket client = null;
  7.     while (!_stopReceiver)
  8.     {
  9.         if (_stopConnectRequest)  //  停止客户端连接请求
  10.         {
  11.             if (_receiverSocket != null)
  12.             {
  13.                 try
  14.                 {
  15.                     _receiverSocket.Close();  // 强制关闭接收器
  16.                 }
  17.                 catch
  18.                 {
  19.                     this.OnReceiverException();
  20.                 }
  21.                 finally
  22.                 {
  23.                     // 必须为 null,否则 disposed 对象仍然存在,将引发下面的错误
  24.                     _receiverSocket = null;
  25.                 }
  26.             }
  27.             continue;
  28.         }
  29.         else
  30.         {
  31.             if (_receiverSocket == null)
  32.             {
  33.                 if (!this.CreateReceiverSocket())
  34.                 {
  35.                     continue;
  36.                 }
  37.             }
  38.         }

  39.         try
  40.         {
  41.             if (_receiverSocket.Poll(_loopWaitTime, SelectMode.SelectRead))
  42.             {
  43.                 // 频繁关闭、启动时,这里容易产生错误(提示套接字只能有一个)
  44.                 client = _receiverSocket.Accept();

  45.                 if (client != null && client.Connected)
  46.                 {
  47.                     if (this._clientCount >= this._maxAllowClientCount)
  48.                     {
  49.                         this.OnReceiverException();

  50.                         try
  51.                         {
  52.                             client.Shutdown(SocketShutdown.Both);
  53.                             client.Close();
  54.                         }
  55.                         catch { }
  56.                     }
  57.                     else if (CheckSameClientIP(client))  // 已存在该 IP 地址
  58.                     {
  59.                         try
  60.                         {
  61.                             client.Shutdown(SocketShutdown.Both);
  62.                             client.Close();
  63.                         }
  64.                         catch { }
  65.                     }
  66.                     else
  67.                     {
  68.                         TSession session = new TSession(client);
  69.                         session.LoginTime = DateTime.Now;

  70.                         lock (_sessionTable)
  71.                         {
  72.                             int preSessionID = session.ID;
  73.                             while (true)
  74.                             {
  75.                                 if (_sessionTable.ContainsKey(session.ID))  // 有可能重复该编号
  76.                                 {
  77.                                     session.ID = 100000 + preSessionID;
  78.                                 }
  79.                                 else
  80.                                 {
  81.                                     break;
  82.                                 }
  83.                             }
  84.                             _sessionTable.Add(session.ID, session);  // 登记该会话客户端
  85.                             Interlocked.Increment(ref _clientCount);
  86.                         }

  87.                         this.OnClientRequest();

  88.                         try  // 客户端连续连接或连接后立即断开,易在该处产生错误,系统忽略之
  89.                         {
  90.                             // 开始接受来自该客户端的数据
  91.                             session.ClientSocket.BeginReceive(session.ReceiveBuffer, 0, 
  92.                                 session.ReceiveBufferLength, SocketFlags.None, EndReceiveData, session);
  93.                         }
  94.                         catch
  95.                         {
  96.                             session.DisconnectType = TDisconnectType.Exception;
  97.                             session.State = TSessionState.NoReply;
  98.                         }
  99.                     }
  100.                 }
  101.                 else if (client != null)  // 非空,但没有连接(connected is false)
  102.                 {
  103.                     try
  104.                     {
  105.                         client.Shutdown(SocketShutdown.Both);
  106.                         client.Close();
  107.                     }
  108.                     catch { }
  109.                 }
  110.             }
  111.         }
  112.         catch
  113.         {
  114.             this.OnReceiverException();

  115.             if (client != null)
  116.             {
  117.                 try
  118.                 {
  119.                     client.Shutdown(SocketShutdown.Both);
  120.                     client.Close();
  121.                 }
  122.                 catch { }
  123.             }
  124.         }
  125.         // 该处可以适当暂停若干毫秒
  126.     }
  127.     // 该处可以适当暂停若干毫秒
  128. }

  129. /// <summary>
  130. /// 循环侦听客户端请求,由于要用线程池,故带一个参数
  131. /// </summary>
  132. private void ListenClientRequest(object state)
  133. {
  134.     Socket client = null;
  135.     while (!_stopReceiver)
  136.     {
  137.         if (_stopConnectRequest)  //  停止客户端连接请求
  138.         {
  139.             if (_receiverSocket != null)
  140.             {
  141.                 try
  142.                 {
  143.                     _receiverSocket.Close();  // 强制关闭接收器
  144.                 }
  145.                 catch
  146.                 {
  147.                     this.OnReceiverException();
  148.                 }
  149.                 finally
  150.                 {
  151.                     // 必须为 null,否则 disposed 对象仍然存在,将引发下面的错误
  152.                     _receiverSocket = null;
  153.                 }
  154.             }
  155.             continue;
  156.         }
  157.         else
  158.         {
  159.             if (_receiverSocket == null)
  160.             {
  161.                 if (!CreateReceiverSocket())
  162.                 {
  163.                     continue;
  164.                 }
  165.             }
  166.         }

  167.         try
  168.         {
  169.             if (_receiverSocket.Poll(_loopWaitTime, SelectMode.SelectRead))
  170.             {
  171.                 // 频繁关闭、启动时,这里容易产生错误(提示套接字只能有一个)
  172.                 client = _receiverSocket.Accept();

  173.                 if (client != null && client.Connected)
  174.                 {
  175.                     if (this._clientCount >= this._maxAllowClientCount)
  176.                     {
  177.                         this.OnReceiverException();

  178.                         try
  179.                         {
  180.                             client.Shutdown(SocketShutdown.Both);
  181.                             client.Close();
  182.                         }
  183.                         catch { }
  184.                     }
  185.                     else if (CheckSameClientIP(client))  // 已存在该 IP 地址
  186.                     {
  187.                         try
  188.                         {
  189.                             client.Shutdown(SocketShutdown.Both);
  190.                             client.Close();
  191.                         }
  192.                         catch { }
  193.                     }
  194.                     else
  195.                     {
  196.                         TSession session = new TSession(client);
  197.                         session.LoginTime = DateTime.Now;

  198.                         lock (_sessionTable)
  199.                         {
  200.                             int preSessionID = session.ID;
  201.                             while (true)
  202.                             {
  203.                                 if (_sessionTable.ContainsKey(session.ID))  // 有可能重复该编号
  204.                                 {
  205.                                     session.ID = 100000 + preSessionID;
  206.                                 }
  207.                                 else
  208.                                 {
  209.                                     break;
  210.                                 }
  211.                             }
  212.                             _sessionTable.Add(session.ID, session);  // 登记该会话客户端
  213.                             Interlocked.Increment(ref _clientCount);
  214.                         }

  215.                         OnClientRequest();

  216.                         try  // 客户端连续连接或连接后立即断开,易在该处产生错误,系统忽略之
  217.                         {
  218.                             // 开始接受来自该客户端的数据
  219.                             session.ClientSocket.BeginReceive(session.ReceiveBuffer, 0, 
  220.                                 session.ReceiveBufferLength, SocketFlags.None, EndReceiveData, session);
  221.                         }
  222.                         catch
  223.                         {
  224.                             session.DisconnectType = TDisconnectType.Exception;
  225.                             session.State = TSessionState.NoReply;
  226.                         }
  227.                     }
  228.                 }
  229.                 else if (client != null)  // 非空,但没有连接(connected is false)
  230.                 {
  231.                     try
  232.                     {
  233.                         client.Shutdown(SocketShutdown.Both);
  234.                         client.Close();
  235.                     }
  236.                     catch { }
  237.                 }
  238.             }
  239.         }
  240.         catch
  241.         {
  242.             this.OnReceiverException();

  243.             if (client != null)
  244.             {
  245.                 try
  246.                 {
  247.                     client.Shutdown(SocketShutdown.Both);
  248.                     client.Close();
  249.                 }
  250.                 catch { }
  251.             }
  252.         }
  253.         // 该处可以适当暂停若干毫秒
  254.     }
  255.     // 该处可以适当暂停若干毫秒
  256. }
复制代码
3.3  处理数据包


      该线程循环查看数据包队列,完成数据包的解析与存储等工作。具体实现时,如果队列中没有数据包,可以考虑等待若干毫秒,提高CPU利用率。

Code
  1. private void HandleDatagrams(object state)
  2. {
  3.     while (!_stopReceiver)
  4.     {
  5.         this.HandleOneDatagram();  // 处理一个数据包

  6.         if (!_stopReceiver)
  7.         {
  8.             // 如果连接关闭,则重新建立,可容许几个连接错误出现
  9.             if (_sqlConnection.State == ConnectionState.Closed)
  10.             {
  11.                 this.OnReceiverWork();

  12.                 try
  13.                 {
  14.                     _sqlConnection.Open();
  15.                 }
  16.                 catch
  17.                 {
  18.                     this.OnReceiverException();
  19.                 }
  20.             }
  21.         }
  22.     }
  23. }

  24. /// <summary>
  25. /// 处理一个包数据,包括:验证、存储
  26. /// </summary>
  27. private void HandleOneDatagram()
  28. {
  29.     TDatagram datagram = null;

  30.     lock (_datagramQueue)
  31.     {
  32.         if (_datagramQueue.Count > 0)
  33.         {
  34.             datagram = _datagramQueue.Dequeue();  // 取队列数据
  35.             Interlocked.Decrement(ref _datagramQueueCount);
  36.         }
  37.     }

  38.     if (datagram == null) return;

  39.     datagram.Clear();
  40.     datagram = null;  // 释放对象
  41. }
复制代码
3.4  检查与清理会话

      本线程负责处理建立连接后的客户端会话TSession或Socket对象的关闭与资源清理工作,其它方法中出现异常等情况,尽可能标记相关TSession对象的属性NoReply=true,表示该会话已经无效、需要清理。 

      检查会话队列并清理资源分3步:第一步,Shutdown()客户端Socket,此时可能立即触发某些Socket的异步方法EndReceive();第二步,Close()客户端Socket,释放占用资源;第三步,从会话表中清除该会话对象。其中,第一步完成后,某个TSession也许不会立即到第二步,因为可能需要处理其异步结束方法。

      需要指出, 由于涉及多线程处理,需要频繁加解锁操作,清理工作前先建立一个会话队列列副本sessionTable2,检查与清理该队副本列列的TSession对象。

Code
  1. /// <summary>
  2. /// 检查客户端状态(扫描方式,若长时间无数据,则断开)
  3. /// </summary>
  4. private void CheckClientState(object state)
  5. {
  6.     while (!_stopReceiver)
  7.     {
  8.         DateTime thisTime = DateTime.Now;

  9.         // 建立一个副本 ,然后对副本进行操作
  10.         Hashtable sessionTable2 = new Hashtable();
  11.         lock (_sessionTable)
  12.         {
  13.             foreach (TSession session in _sessionTable.Values)
  14.             {
  15.                 if (session != null)
  16.                 {
  17.                     sessionTable2.Add(session.ID, session);
  18.                 }
  19.             }
  20.         }

  21.         foreach (TSession session in sessionTable2.Values)  // 对副本进行操作
  22.         {
  23.             Monitor.Enter(session);
  24.             try
  25.             {
  26.                 if (session.State == TSessionState.NoReply)  // 分三步清除一个 Session
  27.                 {
  28.                     session.State = TSessionState.Closing;
  29.                     if (session.ClientSocket != null)
  30.                     {
  31.                         try
  32.                         {
  33.                             // 第一步:shutdown
  34.                             session.ClientSocket.Shutdown(SocketShutdown.Both);
  35.                         }
  36.                         catch { }
  37.                     }
  38.                 }
  39.                 else if (session.State == TSessionState.Closing)
  40.                 {
  41.                     session.State = TSessionState.Closed;
  42.                     if (session.ClientSocket != null)
  43.                     {
  44.                         try
  45.                         {
  46.                             // 第二步: Close
  47.                             session.ClientSocket.Close();
  48.                         }
  49.                         catch { }
  50.                     }
  51.                 }
  52.                 else if (session.State == TSessionState.Closed)
  53.                 {

  54.                     lock (_sessionTable)
  55.                     {
  56.                         // 第三步:remove from table
  57.                         _sessionTable.Remove(session.ID);
  58.                         Interlocked.Decrement(ref _clientCount);
  59.                     }

  60.                     this.OnClientRequest();
  61.                     session.Clear();  // 清空缓冲区
  62.                 }
  63.                 else if (session.State == TSessionState.Normal)  // 正常的会话 
  64.                 {

  65.                     TimeSpan ts = thisTime.Subtract(session.LastDataReceivedTime);
  66.                     if (Math.Abs(ts.TotalSeconds) > _maxSocketDataTimeout)  // 超时,则准备断开连接
  67.                     {
  68.                         session.DisconnectType = TDisconnectType.Timeout;
  69.                         session.State = TSessionState.NoReply;  // 标记为将关闭、准备断开
  70.                     }
  71.                 }
  72.             }
  73.             finally
  74.             {
  75.                 Monitor.Exit(session);
  76.             }
  77.         }  // end foreach

  78.         sessionTable2.Clear();
  79.     }  // end while
  80. }
复制代码
4 、结语

    基于多线程处理的系统代价是比较大的,需要经常调用加/解锁方法lock()或Monitor.Enter(),需要经常创建处理线程等。从实际运行效果看,笔者的实现方案有较好的稳定性:2005年4月到5月间,在一个普通PC机器上连续运行30多天不出一点故障。同时,笔者采用了时序区间判重等算法,有效地提高了系统处理与响应速度。测试表明,在普通的PC机器(P4 2.0)上,可以做到0.5秒处理一个数据包,如果优化代码和服务器,还有较大的性能提升空间。

    上面的代码是笔者实现的省级公路交通流量数据服务中心(DSC)项目中的接收服务器框架部分,整个系统还包括:数据转发交通部的转发服务器、数据远程查询客户端、综合报表数据处理系统、数据在线发布系统、系统运行监控系统等。

    实际的接收服务器类及其辅助类超过3K行,整个系统则超过了60K。因为是早期实现的程序,难免有代码粗糙、方法欠妥的感觉,只有留待下个版本完善扩充了。由于与甲方有保密合同和版权保护等,不可能公开全部源代码,删减也有不当之处,读者发现时请不吝指正。下面是带详细注释的代码下载URL。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多