配色: 字号:
Cowboy 开源 WebSocket 网络库
2016-09-07 | 阅:  转:  |  分享 
  
Cowboy开源WebSocket网络库

Cowboy.WebSockets是一个托管在GitHub上的基于.NET/C#实现的开源WebSocket网络库,其完整地实现了RFC 6455(The WebSocket Protocol)协议标准,并部分实现了RFC 7692(Compression Extensions for WebSocket)协议标准。



WebSocket可理解为建立在TCP连接通道上的更进一步的握手,并确定了消息封装格式。



通过定义控制帧(Control Frame)和数据帧(Data Frame)来控制通道内的通信和数据传输,下图使用ABNF格式描述了帧头部的格式。





Cowboy.WebSockets中对于WebSocket的Client/Server分别做了实现,分别对应代码中的:



AsyncWebSocketClient

AsyncWebSocketServer

Cowboy.WebSockets的内部实现是基于Cowboy.Sockets中的TAP模式的AsyncTcpSocketServer和AsyncTcpSocketClient。关于Cowboy.Sockets可以参考文章《C#高性能TCP服务的多种实现方式》。



可通过NuGet查找Cowboy来获取nuget包。



WebSocket服务端应用

实现AsyncWebSocketServerModule抽象类,其中ModulePath对应着"ws://host:port/path"中的path部分。可以实现多个Module,将多个Module注入到AsyncWebSocketServerModuleCatalog中,或者采用反射机制等自动发现Module。



复制代码

/// <summary>
/// Sample server module registered under the path "/test". It logs each
/// session event to the console and echoes every received message back to
/// the session that sent it.
/// </summary>
public class TestWebSocketModule : AsyncWebSocketServerModule
{
    /// <summary>
    /// Creates the module and binds it to the "/test" module path.
    /// </summary>
    public TestWebSocketModule()
        : base(@"/test")
    {
    }

    /// <summary>
    /// Logs the remote endpoint of the session that has just started.
    /// </summary>
    /// <param name="session">The session this callback is raised for.</param>
    public override async Task OnSessionStarted(AsyncWebSocketSession session)
    {
        Console.WriteLine("WebSocket session [{0}] has connected.", session.RemoteEndPoint);
        await Task.CompletedTask;
    }

    /// <summary>
    /// Logs a received text message and sends the same text back.
    /// </summary>
    /// <param name="session">The session the text was received on.</param>
    /// <param name="text">The received text.</param>
    public override async Task OnSessionTextReceived(AsyncWebSocketSession session, string text)
    {
        // A single WriteLine keeps the prefix and payload on one line even
        // when several sessions log at the same time.
        Console.WriteLine("WebSocket session [{0}] received Text --> {1}", session.RemoteEndPoint, text);

        await session.SendTextAsync(text);
    }

    /// <summary>
    /// Decodes the received bytes as UTF-8, logs the resulting text and sends
    /// its UTF-8 encoding back as a binary message.
    /// </summary>
    /// <param name="session">The session the data was received on.</param>
    /// <param name="data">Buffer containing the payload.</param>
    /// <param name="offset">Index in <paramref name="data"/> where the payload starts.</param>
    /// <param name="count">Number of payload bytes.</param>
    public override async Task OnSessionBinaryReceived(AsyncWebSocketSession session, byte[] data, int offset, int count)
    {
        var text = Encoding.UTF8.GetString(data, offset, count);
        Console.WriteLine("WebSocket session [{0}] received Binary --> {1}", session.RemoteEndPoint, text);

        // NOTE(review): the echo is the re-encoded text, so payloads that are
        // not valid UTF-8 will not round-trip byte-for-byte.
        await session.SendBinaryAsync(Encoding.UTF8.GetBytes(text));
    }

    /// <summary>
    /// Logs the remote endpoint of the session that has closed.
    /// </summary>
    /// <param name="session">The session this callback is raised for.</param>
    public override async Task OnSessionClosed(AsyncWebSocketSession session)
    {
        Console.WriteLine("WebSocket session [{0}] has disconnected.", session.RemoteEndPoint);
        await Task.CompletedTask;
    }
}

复制代码

实例化AsyncWebSocketServer,并将AsyncWebSocketServerModuleCatalog实例注入,即可启动WebSocket的服务端监听。



复制代码

/// <summary>
/// Console host for the WebSocket server demo: registers
/// <c>TestWebSocketModule</c>, listens on port 22222 and broadcasts every
/// line typed on the console to the connected clients as a binary message.
/// Typing "quit" (or reaching end of input) shuts the server down.
/// </summary>
class Program
{
    static AsyncWebSocketServer _server;

    static void Main(string[] args)
    {
        NLogLogger.Use();

        try
        {
            var catalog = new AsyncWebSocketServerModuleCatalog();
            catalog.RegisterModule(new TestWebSocketModule());

            var config = new AsyncWebSocketServerConfiguration();
            // Uncomment to serve over TLS (wss://):
            //config.SslEnabled = true;
            //config.SslServerCertificate = new System.Security.Cryptography.X509Certificates.X509Certificate2(@"D:\Cowboy.pfx", "Cowboy");
            //config.SslPolicyErrorsBypassed = true;

            _server = new AsyncWebSocketServer(22222, catalog, config);
            _server.Listen();

            Console.WriteLine("WebSocket server has been started on [{0}].", _server.ListenedEndPoint);
            Console.WriteLine("Type something to send to clients...");
            while (true)
            {
                string text = Console.ReadLine();

                // ReadLine returns null once standard input is closed; treat
                // that like "quit" instead of looping forever on null input.
                if (text == null || text == "quit")
                {
                    break;
                }

                // Fire-and-forget broadcast. The try/catch has to live inside
                // the lambda: an exception thrown after the first await never
                // reaches a handler wrapped around Task.Run.
                Task.Run(async () =>
                {
                    try
                    {
                        // Text alternative to the binary broadcast below:
                        //await _server.BroadcastText(text);
                        //Console.WriteLine("WebSocket server [{0}] broadcasts text -> [{1}].", _server.ListenedEndPoint, text);
                        await _server.BroadcastBinaryAsync(Encoding.UTF8.GetBytes(text));
                        Console.WriteLine("WebSocket server [{0}] broadcasts binary -> [{1}].", _server.ListenedEndPoint, text);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }
                });
            }

            _server.Shutdown();
            Console.WriteLine("WebSocket server has been stopped on [{0}].", _server.ListenedEndPoint);
        }
        catch (Exception ex)
        {
            Logger.Get().Error(ex.Message, ex);
        }

        // Keep the console window open until a key is pressed.
        Console.ReadKey();
    }
}

复制代码

WebSocket客户端应用

客户端侧在实例化AsyncWebSocketClient时有两种方式:



实现IAsyncWebSocketClientMessageDispatcher接口;

通过构造函数直接注入接收各种事件的Func<>实现;

复制代码

/// <summary>
/// Callback contract for receiving events from an
/// <c>AsyncWebSocketClient</c>. Implementing it is one of the two ways of
/// wiring up a client; the other is passing delegates to the constructor.
/// </summary>
/// <remarks>
/// NOTE(review): the per-member descriptions below are inferred from the
/// member names and from the demo handlers; confirm the exact timing of each
/// callback against the library.
/// </remarks>
public interface IAsyncWebSocketClientMessageDispatcher
{
    /// <summary>Presumably raised once the client has connected to the server.</summary>
    /// <param name="client">The client raising the event.</param>
    Task OnServerConnected(AsyncWebSocketClient client);

    /// <summary>Presumably raised when a text message arrives from the server.</summary>
    /// <param name="client">The client raising the event.</param>
    /// <param name="text">The received text.</param>
    Task OnServerTextReceived(AsyncWebSocketClient client, string text);

    /// <summary>Presumably raised when a binary message arrives from the server.</summary>
    /// <param name="client">The client raising the event.</param>
    /// <param name="data">Buffer containing the payload.</param>
    /// <param name="offset">Index in <paramref name="data"/> where the payload starts.</param>
    /// <param name="count">Number of payload bytes.</param>
    Task OnServerBinaryReceived(AsyncWebSocketClient client, byte[] data, int offset, int count);

    /// <summary>Presumably raised when the connection to the server has ended.</summary>
    /// <param name="client">The client raising the event.</param>
    Task OnServerDisconnected(AsyncWebSocketClient client);

    /// <summary>Presumably raised for the first fragment of a fragmented message (TODO confirm).</summary>
    /// <param name="client">The client raising the event.</param>
    /// <param name="data">Buffer containing the fragment payload.</param>
    /// <param name="offset">Index in <paramref name="data"/> where the payload starts.</param>
    /// <param name="count">Number of payload bytes.</param>
    Task OnServerFragmentationStreamOpened(AsyncWebSocketClient client, byte[] data, int offset, int count);

    /// <summary>Presumably raised for each continuation fragment of a fragmented message (TODO confirm).</summary>
    /// <param name="client">The client raising the event.</param>
    /// <param name="data">Buffer containing the fragment payload.</param>
    /// <param name="offset">Index in <paramref name="data"/> where the payload starts.</param>
    /// <param name="count">Number of payload bytes.</param>
    Task OnServerFragmentationStreamContinued(AsyncWebSocketClient client, byte[] data, int offset, int count);

    /// <summary>Presumably raised for the final fragment of a fragmented message (TODO confirm).</summary>
    /// <param name="client">The client raising the event.</param>
    /// <param name="data">Buffer containing the fragment payload.</param>
    /// <param name="offset">Index in <paramref name="data"/> where the payload starts.</param>
    /// <param name="count">Number of payload bytes.</param>
    Task OnServerFragmentationStreamClosed(AsyncWebSocketClient client, byte[] data, int offset, int count);
}

复制代码

下面的DEMO采用了方式二。



复制代码

/// <summary>
/// Console host for the WebSocket client demo: connects to
/// ws://127.0.0.1:22222/test, sends every line typed on the console to the
/// server as a binary message and logs whatever the server sends back.
/// Typing "quit" (or reaching end of input) closes the connection.
/// </summary>
class Program
{
    static AsyncWebSocketClient _client;

    static void Main(string[] args)
    {
        NLogLogger.Use();

        // Main is synchronous, so the async workflow runs on the thread pool
        // and is blocked on here. Every exception is handled inside the
        // lambda, so Wait() does not rethrow.
        Task.Run(async () =>
        {
            try
            {
                var config = new AsyncWebSocketClientConfiguration();
                // Uncomment to connect over TLS (wss://):
                //config.SslTargetHost = "Cowboy";
                //config.SslClientCertificates.Add(new System.Security.Cryptography.X509Certificates.X509Certificate2(@"D:\Cowboy.cer"));
                //config.SslPolicyErrorsBypassed = true;

                //var uri = new Uri("ws://echo.websocket.org/");
                //var uri = new Uri("wss://127.0.0.1:22222/test");
                var uri = new Uri("ws://127.0.0.1:22222/test");
                _client = new AsyncWebSocketClient(uri,
                    OnServerTextReceived,
                    OnServerBinaryReceived,
                    OnServerConnected,
                    OnServerDisconnected,
                    config);
                await _client.Connect();

                Console.WriteLine("WebSocket client has connected to server [{0}].", uri);
                Console.WriteLine("Type something to send to server...");
                while (_client.State == WebSocketState.Open)
                {
                    string text = Console.ReadLine();

                    // ReadLine returns null once standard input is closed;
                    // treat that like "quit" instead of sending null.
                    if (text == null || text == "quit")
                    {
                        break;
                    }

                    // Fire-and-forget send. The try/catch has to live inside
                    // the lambda: an exception thrown after the first await
                    // never reaches a handler wrapped around Task.Run.
                    Task.Run(async () =>
                    {
                        try
                        {
                            // Text alternative to the binary send below:
                            //await _client.SendText(text);
                            //Console.WriteLine("Client [{0}] send text -> [{1}].", _client.LocalEndPoint, text);
                            await _client.SendBinaryAsync(Encoding.UTF8.GetBytes(text));
                            Console.WriteLine("Client [{0}] send binary -> [{1}].", _client.LocalEndPoint, text);
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex.Message);
                        }
                    }).Forget();
                }

                await _client.Close(WebSocketCloseCode.NormalClosure);
                Console.WriteLine("WebSocket client has disconnected from server [{0}].", uri);
            }
            catch (Exception ex)
            {
                Logger.Get().Error(ex.Message, ex);
            }
        }).Wait();

        // Keep the console window open until a key is pressed.
        Console.ReadKey();
    }

    /// <summary>
    /// Logs the endpoint of the server the client has connected to.
    /// </summary>
    /// <param name="client">The client raising the event.</param>
    private static async Task OnServerConnected(AsyncWebSocketClient client)
    {
        Console.WriteLine("WebSocket server [{0}] has connected.", client.RemoteEndPoint);
        await Task.CompletedTask;
    }

    /// <summary>
    /// Logs a text message received from the server.
    /// </summary>
    /// <param name="client">The client raising the event.</param>
    /// <param name="text">The received text.</param>
    private static async Task OnServerTextReceived(AsyncWebSocketClient client, string text)
    {
        // A single WriteLine keeps the prefix and payload together when
        // several callbacks log at the same time.
        Console.WriteLine("WebSocket server [{0}] received Text --> {1}", client.RemoteEndPoint, text);

        await Task.CompletedTask;
    }

    /// <summary>
    /// Decodes a binary message received from the server as UTF-8 and logs it.
    /// </summary>
    /// <param name="client">The client raising the event.</param>
    /// <param name="data">Buffer containing the payload.</param>
    /// <param name="offset">Index in <paramref name="data"/> where the payload starts.</param>
    /// <param name="count">Number of payload bytes.</param>
    private static async Task OnServerBinaryReceived(AsyncWebSocketClient client, byte[] data, int offset, int count)
    {
        var text = Encoding.UTF8.GetString(data, offset, count);
        Console.WriteLine("WebSocket server [{0}] received Binary --> {1}", client.RemoteEndPoint, text);

        await Task.CompletedTask;
    }

    /// <summary>
    /// Logs the endpoint of the server the client has disconnected from.
    /// </summary>
    /// <param name="client">The client raising the event.</param>
    private static async Task OnServerDisconnected(AsyncWebSocketClient client)
    {
        Console.WriteLine("WebSocket server [{0}] has disconnected.", client.RemoteEndPoint);
        await Task.CompletedTask;
    }
}

复制代码

献花(0)
+1
(本文系thedust79首藏)