C#高性能TCP服务的多种实现方式
哎~~想想大部分园友应该对"高性能"字样更感兴趣,为了吸引眼球所以标题中一定要突出,其实我更喜欢的标题是《猴赛雷,C#编写TCP服务的花样姿势!》。
本篇文章的主旨是使用.NET/C#实现TCP高性能服务的不同方式,包括但不限于如下内容:
APM方式,即AsynchronousProgrammingModel
TAP方式,即Task-basedAsynchronousPattern
SAEA方式,即SocketAsyncEventArgs
RIO方式,即RegisteredI/O
在.NET/C#中对于Socket的支持均是基于WindowsI/OCompletionPorts完成端口技术的封装,通过不同的Non-Blocking封装结构来满足不同的编程需求。以上方式均已在Cowboy.Sockets中有完整实现,并且APM和TAP方式已经在实际项目中应用。Cowboy.Sockets还在不断的进化和完善中,如有任何问题请及时指正。
虽然有这么多种实现方式,但抽象的看,它们是一样一样的,用两个Loop即可描述:AcceptLoop和ReadLoop,如下图所示。(这里提及的"Loop"指的是一种循环方式,而非特指while/for等关键字。)
在任何TCPServer的实现中,一定存在一个AcceptSocketLoop,用于接收Client端的Connect请求以建立TCPConnection。
在任何TCPServer的实现中,一定存在一个ReadSocketLoop,用于接收Client端Write过来的数据。
如果Accept循环阻塞,则会导致无法快速的建立连接,服务端PendingBacklog满,进而导致Client端收到ConnectTimeout的异常。如果Read循环阻塞,则显然会导致无法及时收到Client端发过来的数据,进而导致Client端SendBuffer满,无法再发送数据。
从实现细节的角度看,能够导致服务阻塞的位置可能在:
Accept到新的Socket,构建新的Connection需要分配各种资源,分配资源慢;
Accept到新的Socket,没有及时触发下一次Accept;
Read到新的Buffer,判定Payload消息长度,判定过程长;
Read到新的Buffer,发现Payload还没有收全,继续Read,则"可能"会导致一次BufferCopy;
Payload接收完毕,进行De-Serialization转成可识别的ProtocolMessage,反序列化慢;
由BusinessModule来处理相应的ProtocolMessage,处理过程慢;
1-2涉及到Accept过程和Connection的建立过程,3-4涉及到ReceiveBuffer的处理过程,5-6涉及到应用逻辑侧的实现。
Java中著名的Netty网络库从4.0版本开始对于Buffer部分做了全新的尝试,采用了名叫ByteBuf的设计,实现BufferZeroCopy以减少高并发条件下Buffer拷贝带来的性能损失和GC压力。DotNetty,Orleans,Helios等项目正在尝试在C#中进行类似的ByteBuf的实现。
APM方式:TcpSocketServer
TcpSocketServer的实现是基于.NETFramework自带的TcpListener和TcpClient的更进一步的封装,采用基于APM的BeginXXX和EndXXX接口实现。
TcpSocketServer中的AcceptLoop指的就是,
BeginAccept->EndAccept->BeginAccept->EndAccept->BeginAccept->...
每一个建立成功的Connection由TcpSocketSession来处理,所以TcpSocketSession中会包含ReadLoop,
BeginRead->EndRead->BeginRead->EndRead->BeginRead->...
TcpSocketServer通过暴露Event来实现Connection的建立与断开和数据接收的通知。
eventEventHandlerClientConnected;
eventEventHandlerClientDisconnected;
eventEventHandlerClientDataReceived;
使用也是简单直接,直接订阅事件通知。
复制代码
privatestaticvoidStartServer()
{
_server=newTcpSocketServer(22222);
_server.ClientConnected+=server_ClientConnected;
_server.ClientDisconnected+=server_ClientDisconnected;
_server.ClientDataReceived+=server_ClientDataReceived;
_server.Listen();
}
staticvoidserver_ClientConnected(objectsender,TcpClientConnectedEventArgse)
{
Console.WriteLine(string.Format("TCPclient{0}hasconnected{1}.",e.Session.RemoteEndPoint,e.Session));
}
staticvoidserver_ClientDisconnected(objectsender,TcpClientDisconnectedEventArgse)
{
Console.WriteLine(string.Format("TCPclient{0}hasdisconnected.",e.Session));
}
staticvoidserver_ClientDataReceived(objectsender,TcpClientDataReceivedEventArgse)
{
vartext=Encoding.UTF8.GetString(e.Data,e.DataOffset,e.DataLength);
Console.Write(string.Format("Client:{0}{1}-->",e.Session.RemoteEndPoint,e.Session));
Console.WriteLine(string.Format("{0}",text));
_server.Broadcast(Encoding.UTF8.GetBytes(text));
}
复制代码
TAP方式:AsyncTcpSocketServer
AsyncTcpSocketServer的实现是基于.NETFramework自带的TcpListener和TcpClient的更进一步的封装,采用基于TAP的async/await的XXXAsync接口实现。
然而,实际上XXXAsync并没有创建什么神奇的效果,其内部实现只是将APM的方法转换成了TAP的调用方式。
复制代码
//Task-basedasyncpublicmethods
[HostProtection(ExternalThreading=true)]
publicTaskAcceptSocketAsync()
{
returnTask.Factory.FromAsync(BeginAcceptSocket,EndAcceptSocket,null);
}
[HostProtection(ExternalThreading=true)]
publicTaskAcceptTcpClientAsync()
{
returnTask.Factory.FromAsync(BeginAcceptTcpClient,EndAcceptTcpClient,null);
}
复制代码
AsyncTcpSocketServer中的AcceptLoop指的就是,
while(IsListening)
{
vartcpClient=await_listener.AcceptTcpClientAsync();
}
每一个建立成功的Connection由AsyncTcpSocketSession来处理,所以AsyncTcpSocketSession中会包含ReadLoop,
while(State==TcpSocketConnectionState.Connected)
{
intreceiveCount=await_stream.ReadAsync(_receiveBuffer,0,_receiveBuffer.Length);
}
为了将async/await异步到底,AsyncTcpSocketServer所暴露的接口也同样是Awaitable的。
复制代码
publicinterfaceIAsyncTcpSocketServerMessageDispatcher
{
TaskOnSessionStarted(AsyncTcpSocketSessionsession);
TaskOnSessionDataReceived(AsyncTcpSocketSessionsession,byte[]data,intoffset,intcount);
TaskOnSessionClosed(AsyncTcpSocketSessionsession);
}
复制代码
使用时仅需将一个实现了该接口的对象注入到AsyncTcpSocketServer的构造函数中即可。
复制代码
publicclassSimpleMessageDispatcher:IAsyncTcpSocketServerMessageDispatcher
{
publicasyncTaskOnSessionStarted(AsyncTcpSocketSessionsession)
{
Console.WriteLine(string.Format("TCPsession{0}hasconnected{1}.",session.RemoteEndPoint,session));
awaitTask.CompletedTask;
}
publicasyncTaskOnSessionDataReceived(AsyncTcpSocketSessionsession,byte[]data,intoffset,intcount)
{
vartext=Encoding.UTF8.GetString(data,offset,count);
Console.Write(string.Format("Client:{0}-->",session.RemoteEndPoint));
Console.WriteLine(string.Format("{0}",text));
awaitsession.SendAsync(Encoding.UTF8.GetBytes(text));
}
publicasyncTaskOnSessionClosed(AsyncTcpSocketSessionsession)
{
Console.WriteLine(string.Format("TCPsession{0}hasdisconnected.",session));
awaitTask.CompletedTask;
}
}
复制代码
当然,对于接口的实现也不是强制了,也可以在构造函数中直接注入方法的实现。
复制代码
publicAsyncTcpSocketServer(
IPEndPointlistenedEndPoint,
FunconSessionDataReceived=null,
FunconSessionStarted=null,
FunconSessionClosed=null,
AsyncTcpSocketServerConfigurationconfiguration=null)
{}
复制代码
SAEA方式:TcpSocketSaeaServer
SAEA是SocketAsyncEventArgs的简写。SocketAsyncEventArgs是.NETFramework3.5开始支持的一种支持高性能Socket通信的实现。SocketAsyncEventArgs相比于APM方式的主要优点可以描述如下:
Themainfeatureoftheseenhancementsistheavoidanceoftherepeatedallocationandsynchronizationofobjectsduringhigh-volumeasynchronoussocketI/O.TheBegin/EnddesignpatterncurrentlyimplementedbytheSocketclassforasynchronoussocketI/OrequiresaSystem.IAsyncResultobjectbeallocatedforeachasynchronoussocketoperation.
也就是说,优点就是无需为每次调用都生成IAsyncResult等对象,向原生Socket更靠近一些。
使用SocketAsyncEventArgs的推荐步骤如下:
AllocateanewSocketAsyncEventArgscontextobject,orgetafreeonefromanapplicationpool.
Setpropertiesonthecontextobjecttotheoperationabouttobeperformed(thecallbackdelegatemethodanddatabuffer,forexample).
Calltheappropriatesocketmethod(xxxAsync)toinitiatetheasynchronousoperation.
Iftheasynchronoussocketmethod(xxxAsync)returnstrueinthecallback,querythecontextpropertieswww.wang027.comforcompletionstatus.
Iftheasynchronoussocketmethod(xxxAsync)returnsfalseinthecallback,theoperationcompletedsynchronously.Thecontextpropertiesmaybequeriedfortheoperationresult.
Reusethecontextforanotheroperation,putitbackinthepool,ordiscardit.
重点在于池化(Pooling),池化的目的就是为了重用和减少运行时分配和垃圾回收的压力。
TcpSocketSaeaServer即是对SocketAsyncEventArgs的应用和封装,并实现了Pooling技术。TcpSocketSaeaServer中的重点是SaeaAwaitable类,SaeaAwaitable中内置了一个SocketAsyncEventArgs,并通过GetAwaiter返回SaeaAwaiter来支持async/await操作。同时,通过SaeaExtensions扩展方法对来扩展SocketAsyncEventArgs的Awaitable实现。
publicstaticSaeaAwaitableAcceptAsync(thisSocketsocket,SaeaAwaitableawaitable)
publicstaticSaeaAwaitableConnectAsync(thisSocketsocket,SaeaAwaitableawaitable)
publicstaticSaeaAwaitableDisonnectAsync(thisSocketsocket,SaeaAwaitableawaitable)
publicstaticSaeaAwaitableReceiveAsync(thisSocketsocket,SaeaAwaitableawaitable)
publicstaticSaeaAwaitableSendAsync(thisSocketsocket,SaeaAwaitableawaitable)
SaeaPool则是一个QueuedObjectPool的衍生实现,用于池化SaeaAwaitable实例。同时,为了减少TcpSocketSaeaSession的构建过程,也实现了SessionPool即QueuedObjectPool。
TcpSocketSaeaServer中的AcceptLoop指的就是,
复制代码
while(IsListening)
{
varsaea=_acceptSaeaPool.Take();
varsocketError=await_listener.AcceptAsync(saea);
if(socketError==SocketError.Success)
{
varacceptedSocket=saea.Saea.AcceptSocket;
}
_acceptSaeaPool.Return(saea);
}
复制代码
每一个建立成功的Connection由TcpSocketSaeaSession来处理,所以TcpSocketSaeaSession中会包含ReadLoop,
复制代码
varsaea=_saeaPool.Take();
saea.Saea.SetBuffer(_receiveBuffer,0,_receiveBuffer.Length);
while(State==TcpSocketConnectionState.Connected)
{
saea.Saea.SetBuffer(0,_receiveBuffer.Length);
varsocketError=await_socket.ReceiveAsync(saea);
if(socketError!=SocketError.Success)
break;
varreceiveCount=saea.Saea.BytesTransferred;
if(receiveCount==0)
break;
}
复制代码
同样,TcpSocketSaeaServer对外所暴露的接口也同样是Awaitable的。
publicinterfaceITcpSocketSaeaServerMessageDispatcher
{
TaskOnSessionStarted(TcpSocketSaeaSessionsession);
TaskOnSessionDataReceived(TcpSocketSaeaSessionsession,byte[]data,intoffset,intcount);
TaskOnSessionClosed(TcpSocketSaeaSessionsession);
}
使用起来也是简单直接:
复制代码
publicclassSimpleMessageDispatcher:ITcpSocketSaeaServerMessageDispatcher
{
publicasyncTaskOnSessionStarted(TcpSocketSaeaSessionsession)
{
Console.WriteLine(string.Format("TCPsession{0}hasconnected{1}.",session.RemoteEndPoint,session));
awaitTask.CompletedTask;
}
publicasyncTaskOnSessionDataReceived(TcpSocketSaeaSessionsession,byte[]data,intoffset,intcount)
{
vartext=Encoding.UTF8.GetString(data,offset,count);
Console.Write(string.Format("Client:{0}-->",session.RemoteEndPoint));
Console.WriteLine(string.Format("{0}",text));
awaitsession.SendAsync(Encoding.UTF8.GetBytes(text));
}
publicasyncTaskOnSessionClosed(TcpSocketSaeaSessionsession)
{
Console.WriteLine(string.Format("TCPsession{0}hasdisconnected.",session));
awaitTask.CompletedTask;
}
}
复制代码
RIO方式:TcpSocketRioServer
从Windows8.1/WindowsServer2012R2开始,微软推出了RegisteredI/ONetworkingExtensions来支持高性能Socket服务的实现,简称RIO。
ThefollowingfunctionsaresupportedforWindowsStoreappsonWindows8.1,WindowsServer2012R2,andlater.MicrosoftVisualStudio2013Update3orlaterisrequiredforWindowsStoreapps.
RIOCloseCompletionQueue
RIOCreateCompletionQueue
RIOCreateRequestQueue
RIODequeueCompletion
RIODeregisterBuffer
RIONotify
RIOReceive
RIOReceiveEx
RIORegisterBuffer
RIOResizeCompletionQueue
RIOResizeRequestQueue
RIOSend
RIOSendEx
到目前为止,.NETFramework还没有推出对RIO的支持,所以若想在C#中实现RIO则只能通过P/Invoke方式,RioSharp是开源项目中的一个比较完整的实现。
Cowboy.Sockets直接引用了RioSharp的源代码,放置在Cowboy.Sockets.Experimental名空间下,以供实验和测试使用。
同样,通过TcpSocketRioServer来实现AcceptLoop,
复制代码
_listener.OnAccepted=(acceptedSocket)=>
{
Task.Run(async()=>
{
awaitProcess(acceptedSocket);
})
.Forget();
};
复制代码
通过TcpSocketRioSession来处理ReadLoop,
while(State==TcpSocketConnectionState.Connected)
{
intreceiveCount=await_stream.ReadAsync(_receiveBuffer,0,_receiveBuffer.Length);
if(receiveCount==0)
break;
}
测试代码一如既往的类似:
复制代码
publicclassSimpleMessageDispatcher:ITcpSocketRioServerMessageDispatcher
{
publicasyncTaskOnSessionStarted(TcpSocketRioSessionsession)
{
//Console.WriteLine(string.Format("TCPsession{0}hasconnected{1}.",session.RemoteEndPoint,session));
Console.WriteLine(string.Format("TCPsessionhasconnected{0}.",session));
awaitTask.CompletedTask;
}
publicasyncTaskOnSessionDataReceived(TcpSocketRioSessionsession,byte[]data,intoffset,intcount)
{
vartext=Encoding.UTF8.GetString(data,offset,count);
//Console.Write(string.Format("Client:{0}-->",session.RemoteEndPoint));
Console.Write(string.Format("Client:-->"));
Console.WriteLine(string.Format("{0}",text));
awaitsession.SendAsync(Encoding.UTF8.GetBytes(text));
}
publicasyncTaskOnSessionClosed(TcpSocketRioSessionsession)
{
Console.WriteLine(string.Format("TCPsession{0}hasdisconnected.",session));
awaitTask.CompletedTask;
}
}
|
|