配色: 字号:
C#高性能TCP服务的多种实现方式
2016-09-08 | 阅:  转:  |  分享 
  
C#高性能TCP服务的多种实现方式

哎~~想想大部分园友应该对"高性能"字样更感兴趣,为了吸引眼球所以标题中一定要突出,其实我更喜欢的标题是《猴赛雷,C#编写TCP服务的花样姿势!》。



本篇文章的主旨是使用.NET/C#实现TCP高性能服务的不同方式,包括但不限于如下内容:



APM方式,即AsynchronousProgrammingModel

TAP方式,即Task-basedAsynchronousPattern

SAEA方式,即SocketAsyncEventArgs

RIO方式,即RegisteredI/O

在.NET/C#中对于Socket的支持均是基于WindowsI/OCompletionPorts完成端口技术的封装,通过不同的Non-Blocking封装结构来满足不同的编程需求。以上方式均已在Cowboy.Sockets中有完整实现,并且APM和TAP方式已经在实际项目中应用。Cowboy.Sockets还在不断的进化和完善中,如有任何问题请及时指正。



虽然有这么多种实现方式,但抽象的看,它们是一样一样的,用两个Loop即可描述:AcceptLoop和ReadLoop,如下图所示。(这里提及的"Loop"指的是一种循环方式,而非特指while/for等关键字。)







在任何TCPServer的实现中,一定存在一个AcceptSocketLoop,用于接收Client端的Connect请求以建立TCPConnection。

在任何TCPServer的实现中,一定存在一个ReadSocketLoop,用于接收Client端Write过来的数据。

如果Accept循环阻塞,则会导致无法快速的建立连接,服务端PendingBacklog满,进而导致Client端收到ConnectTimeout的异常。如果Read循环阻塞,则显然会导致无法及时收到Client端发过来的数据,进而导致Client端SendBuffer满,无法再发送数据。



从实现细节的角度看,能够导致服务阻塞的位置可能在:



Accept到新的Socket,构建新的Connection需要分配各种资源,分配资源慢;

Accept到新的Socket,没有及时触发下一次Accept;

Read到新的Buffer,判定Payload消息长度,判定过程长;

Read到新的Buffer,发现Payload还没有收全,继续Read,则"可能"会导致一次BufferCopy;

Payload接收完毕,进行De-Serialization转成可识别的ProtocolMessage,反序列化慢;

由BusinessModule来处理相应的ProtocolMessage,处理过程慢;

1-2涉及到Accept过程和Connection的建立过程,3-4涉及到ReceiveBuffer的处理过程,5-6涉及到应用逻辑侧的实现。



Java中著名的Netty网络库从4.0版本开始对于Buffer部分做了全新的尝试,采用了名叫ByteBuf的设计,实现BufferZeroCopy以减少高并发条件下Buffer拷贝带来的性能损失和GC压力。DotNetty,Orleans,Helios等项目正在尝试在C#中进行类似的ByteBuf的实现。



APM方式:TcpSocketServer

TcpSocketServer的实现是基于.NETFramework自带的TcpListener和TcpClient的更进一步的封装,采用基于APM的BeginXXX和EndXXX接口实现。



TcpSocketServer中的AcceptLoop指的就是,



BeginAccept->EndAccept->BeginAccept->EndAccept->BeginAccept->...

每一个建立成功的Connection由TcpSocketSession来处理,所以TcpSocketSession中会包含ReadLoop,



BeginRead->EndRead->BeginRead->EndRead->BeginRead->...

TcpSocketServer通过暴露Event来实现Connection的建立与断开和数据接收的通知。



eventEventHandlerClientConnected;

eventEventHandlerClientDisconnected;

eventEventHandlerClientDataReceived;

使用也是简单直接,直接订阅事件通知。



复制代码

privatestaticvoidStartServer()

{

_server=newTcpSocketServer(22222);

_server.ClientConnected+=server_ClientConnected;

_server.ClientDisconnected+=server_ClientDisconnected;

_server.ClientDataReceived+=server_ClientDataReceived;

_server.Listen();

}



staticvoidserver_ClientConnected(objectsender,TcpClientConnectedEventArgse)

{

Console.WriteLine(string.Format("TCPclient{0}hasconnected{1}.",e.Session.RemoteEndPoint,e.Session));

}



staticvoidserver_ClientDisconnected(objectsender,TcpClientDisconnectedEventArgse)

{

Console.WriteLine(string.Format("TCPclient{0}hasdisconnected.",e.Session));

}



staticvoidserver_ClientDataReceived(objectsender,TcpClientDataReceivedEventArgse)

{

vartext=Encoding.UTF8.GetString(e.Data,e.DataOffset,e.DataLength);

Console.Write(string.Format("Client:{0}{1}-->",e.Session.RemoteEndPoint,e.Session));

Console.WriteLine(string.Format("{0}",text));

_server.Broadcast(Encoding.UTF8.GetBytes(text));

}

复制代码

TAP方式:AsyncTcpSocketServer

AsyncTcpSocketServer的实现是基于.NETFramework自带的TcpListener和TcpClient的更进一步的封装,采用基于TAP的async/await的XXXAsync接口实现。



然而,实际上XXXAsync并没有创建什么神奇的效果,其内部实现只是将APM的方法转换成了TAP的调用方式。



复制代码

//Task-basedasyncpublicmethods

[HostProtection(ExternalThreading=true)]

publicTaskAcceptSocketAsync()

{

returnTask.Factory.FromAsync(BeginAcceptSocket,EndAcceptSocket,null);

}



[HostProtection(ExternalThreading=true)]

publicTaskAcceptTcpClientAsync()

{

returnTask.Factory.FromAsync(BeginAcceptTcpClient,EndAcceptTcpClient,null);

}

复制代码

AsyncTcpSocketServer中的AcceptLoop指的就是,



while(IsListening)

{

vartcpClient=await_listener.AcceptTcpClientAsync();

}

每一个建立成功的Connection由AsyncTcpSocketSession来处理,所以AsyncTcpSocketSession中会包含ReadLoop,



while(State==TcpSocketConnectionState.Connected)

{

intreceiveCount=await_stream.ReadAsync(_receiveBuffer,0,_receiveBuffer.Length);

}

为了将async/await异步到底,AsyncTcpSocketServer所暴露的接口也同样是Awaitable的。



复制代码

publicinterfaceIAsyncTcpSocketServerMessageDispatcher

{

TaskOnSessionStarted(AsyncTcpSocketSessionsession);

TaskOnSessionDataReceived(AsyncTcpSocketSessionsession,byte[]data,intoffset,intcount);

TaskOnSessionClosed(AsyncTcpSocketSessionsession);

}

复制代码

使用时仅需将一个实现了该接口的对象注入到AsyncTcpSocketServer的构造函数中即可。



复制代码

publicclassSimpleMessageDispatcher:IAsyncTcpSocketServerMessageDispatcher

{

publicasyncTaskOnSessionStarted(AsyncTcpSocketSessionsession)

{

Console.WriteLine(string.Format("TCPsession{0}hasconnected{1}.",session.RemoteEndPoint,session));

awaitTask.CompletedTask;

}



publicasyncTaskOnSessionDataReceived(AsyncTcpSocketSessionsession,byte[]data,intoffset,intcount)

{

vartext=Encoding.UTF8.GetString(data,offset,count);

Console.Write(string.Format("Client:{0}-->",session.RemoteEndPoint));

Console.WriteLine(string.Format("{0}",text));



awaitsession.SendAsync(Encoding.UTF8.GetBytes(text));

}



publicasyncTaskOnSessionClosed(AsyncTcpSocketSessionsession)

{

Console.WriteLine(string.Format("TCPsession{0}hasdisconnected.",session));

awaitTask.CompletedTask;

}

}

复制代码

当然,对于接口的实现也不是强制了,也可以在构造函数中直接注入方法的实现。



复制代码

publicAsyncTcpSocketServer(

IPEndPointlistenedEndPoint,

FunconSessionDataReceived=null,

FunconSessionStarted=null,

FunconSessionClosed=null,

AsyncTcpSocketServerConfigurationconfiguration=null)

{}

复制代码

SAEA方式:TcpSocketSaeaServer

SAEA是SocketAsyncEventArgs的简写。SocketAsyncEventArgs是.NETFramework3.5开始支持的一种支持高性能Socket通信的实现。SocketAsyncEventArgs相比于APM方式的主要优点可以描述如下:



Themainfeatureoftheseenhancementsistheavoidanceoftherepeatedallocationandsynchronizationofobjectsduringhigh-volumeasynchronoussocketI/O.TheBegin/EnddesignpatterncurrentlyimplementedbytheSocketclassforasynchronoussocketI/OrequiresaSystem.IAsyncResultobjectbeallocatedforeachasynchronoussocketoperation.



也就是说,优点就是无需为每次调用都生成IAsyncResult等对象,向原生Socket更靠近一些。



使用SocketAsyncEventArgs的推荐步骤如下:



AllocateanewSocketAsyncEventArgscontextobject,orgetafreeonefromanapplicationpool.

Setpropertiesonthecontextobjecttotheoperationabouttobeperformed(thecallbackdelegatemethodanddatabuffer,forexample).

Calltheappropriatesocketmethod(xxxAsync)toinitiatetheasynchronousoperation.

Iftheasynchronoussocketmethod(xxxAsync)returnstrueinthecallback,querythecontextpropertieswww.wang027.comforcompletionstatus.

Iftheasynchronoussocketmethod(xxxAsync)returnsfalseinthecallback,theoperationcompletedsynchronously.Thecontextpropertiesmaybequeriedfortheoperationresult.

Reusethecontextforanotheroperation,putitbackinthepool,ordiscardit.

重点在于池化(Pooling),池化的目的就是为了重用和减少运行时分配和垃圾回收的压力。



TcpSocketSaeaServer即是对SocketAsyncEventArgs的应用和封装,并实现了Pooling技术。TcpSocketSaeaServer中的重点是SaeaAwaitable类,SaeaAwaitable中内置了一个SocketAsyncEventArgs,并通过GetAwaiter返回SaeaAwaiter来支持async/await操作。同时,通过SaeaExtensions扩展方法对来扩展SocketAsyncEventArgs的Awaitable实现。



publicstaticSaeaAwaitableAcceptAsync(thisSocketsocket,SaeaAwaitableawaitable)

publicstaticSaeaAwaitableConnectAsync(thisSocketsocket,SaeaAwaitableawaitable)

publicstaticSaeaAwaitableDisonnectAsync(thisSocketsocket,SaeaAwaitableawaitable)

publicstaticSaeaAwaitableReceiveAsync(thisSocketsocket,SaeaAwaitableawaitable)

publicstaticSaeaAwaitableSendAsync(thisSocketsocket,SaeaAwaitableawaitable)

SaeaPool则是一个QueuedObjectPool的衍生实现,用于池化SaeaAwaitable实例。同时,为了减少TcpSocketSaeaSession的构建过程,也实现了SessionPool即QueuedObjectPool



TcpSocketSaeaServer中的AcceptLoop指的就是,



复制代码

while(IsListening)

{

varsaea=_acceptSaeaPool.Take();



varsocketError=await_listener.AcceptAsync(saea);

if(socketError==SocketError.Success)

{

varacceptedSocket=saea.Saea.AcceptSocket;

}



_acceptSaeaPool.Return(saea);

}

复制代码

每一个建立成功的Connection由TcpSocketSaeaSession来处理,所以TcpSocketSaeaSession中会包含ReadLoop,



复制代码

varsaea=_saeaPool.Take();

saea.Saea.SetBuffer(_receiveBuffer,0,_receiveBuffer.Length);



while(State==TcpSocketConnectionState.Connected)

{

saea.Saea.SetBuffer(0,_receiveBuffer.Length);



varsocketError=await_socket.ReceiveAsync(saea);

if(socketError!=SocketError.Success)

break;



varreceiveCount=saea.Saea.BytesTransferred;

if(receiveCount==0)

break;

}

复制代码

同样,TcpSocketSaeaServer对外所暴露的接口也同样是Awaitable的。



publicinterfaceITcpSocketSaeaServerMessageDispatcher

{

TaskOnSessionStarted(TcpSocketSaeaSessionsession);

TaskOnSessionDataReceived(TcpSocketSaeaSessionsession,byte[]data,intoffset,intcount);

TaskOnSessionClosed(TcpSocketSaeaSessionsession);

}

使用起来也是简单直接:



复制代码

publicclassSimpleMessageDispatcher:ITcpSocketSaeaServerMessageDispatcher

{

publicasyncTaskOnSessionStarted(TcpSocketSaeaSessionsession)

{

Console.WriteLine(string.Format("TCPsession{0}hasconnected{1}.",session.RemoteEndPoint,session));

awaitTask.CompletedTask;

}



publicasyncTaskOnSessionDataReceived(TcpSocketSaeaSessionsession,byte[]data,intoffset,intcount)

{

vartext=Encoding.UTF8.GetString(data,offset,count);

Console.Write(string.Format("Client:{0}-->",session.RemoteEndPoint));

Console.WriteLine(string.Format("{0}",text));



awaitsession.SendAsync(Encoding.UTF8.GetBytes(text));

}



publicasyncTaskOnSessionClosed(TcpSocketSaeaSessionsession)

{

Console.WriteLine(string.Format("TCPsession{0}hasdisconnected.",session));

awaitTask.CompletedTask;

}

}

复制代码

RIO方式:TcpSocketRioServer

从Windows8.1/WindowsServer2012R2开始,微软推出了RegisteredI/ONetworkingExtensions来支持高性能Socket服务的实现,简称RIO。



ThefollowingfunctionsaresupportedforWindowsStoreappsonWindows8.1,WindowsServer2012R2,andlater.MicrosoftVisualStudio2013Update3orlaterisrequiredforWindowsStoreapps.



RIOCloseCompletionQueue

RIOCreateCompletionQueue

RIOCreateRequestQueue

RIODequeueCompletion

RIODeregisterBuffer

RIONotify

RIOReceive

RIOReceiveEx

RIORegisterBuffer

RIOResizeCompletionQueue

RIOResizeRequestQueue

RIOSend

RIOSendEx

到目前为止,.NETFramework还没有推出对RIO的支持,所以若想在C#中实现RIO则只能通过P/Invoke方式,RioSharp是开源项目中的一个比较完整的实现。



Cowboy.Sockets直接引用了RioSharp的源代码,放置在Cowboy.Sockets.Experimental名空间下,以供实验和测试使用。



同样,通过TcpSocketRioServer来实现AcceptLoop,



复制代码

_listener.OnAccepted=(acceptedSocket)=>

{

Task.Run(async()=>

{

awaitProcess(acceptedSocket);

})

.Forget();

};

复制代码

通过TcpSocketRioSession来处理ReadLoop,



while(State==TcpSocketConnectionState.Connected)

{

intreceiveCount=await_stream.ReadAsync(_receiveBuffer,0,_receiveBuffer.Length);

if(receiveCount==0)

break;

}

测试代码一如既往的类似:



复制代码

publicclassSimpleMessageDispatcher:ITcpSocketRioServerMessageDispatcher

{

publicasyncTaskOnSessionStarted(TcpSocketRioSessionsession)

{

//Console.WriteLine(string.Format("TCPsession{0}hasconnected{1}.",session.RemoteEndPoint,session));

Console.WriteLine(string.Format("TCPsessionhasconnected{0}.",session));

awaitTask.CompletedTask;

}



publicasyncTaskOnSessionDataReceived(TcpSocketRioSessionsession,byte[]data,intoffset,intcount)

{

vartext=Encoding.UTF8.GetString(data,offset,count);

//Console.Write(string.Format("Client:{0}-->",session.RemoteEndPoint));

Console.Write(string.Format("Client:-->"));

Console.WriteLine(string.Format("{0}",text));



awaitsession.SendAsync(Encoding.UTF8.GetBytes(text));

}



publicasyncTaskOnSessionClosed(TcpSocketRioSessionsession)

{

Console.WriteLine(string.Format("TCPsession{0}hasdisconnected.",session));

awaitTask.CompletedTask;

}

}

献花(0)
+1
(本文系thedust79首藏)