Cowboy开源WebSocket网络库
Cowboy.WebSockets 是一个托管在 GitHub 上的基于 .NET/C# 实现的开源 WebSocket 网络库,其完整地实现了 RFC 6455(The WebSocket Protocol)协议标准,并部分实现了 RFC 7692(Compression Extensions for WebSocket)协议标准。
WebSocket可理解为建立在TCP连接通道上的更进一步的握手,并确定了消息封装格式。
通过定义控制帧(Control Frame)和数据帧(Data Frame)来控制通道内的通信和数据传输,下图使用 ABNF 格式描述了帧头部的格式。
Cowboy.WebSockets中对于WebSocket的Client/Server分别做了实现,分别对应代码中的:
AsyncWebSocketClient
AsyncWebSocketServer
Cowboy.WebSockets 的内部实现基于 Cowboy.Sockets 中采用 TAP(Task-based Asynchronous Pattern)模式的 AsyncTcpSocketServer 和 AsyncTcpSocketClient。关于 Cowboy.Sockets 可以参考文章《C# 高性能 TCP 服务的多种实现方式》。
可通过 NuGet 搜索 Cowboy 来获取对应的 NuGet 包。
WebSocket服务端应用
实现AsyncWebSocketServerModule抽象类,其中ModulePath对应着"ws://host:port/path"中的path部分。可以实现多个Module,将多个Module注入到AsyncWebSocketServerModuleCatalog中,或者采用反射机制等自动发现Module。
复制代码
/// <summary>
/// Sample server module registered under the "/test" module path. It logs session
/// events to the console and echoes every received text or binary message back to
/// the session it came from.
/// </summary>
// NOTE(review): the console message literals look like they lost their spaces
// somewhere upstream; they are kept exactly as found - confirm the intended text.
public class TestWebSocketModule : AsyncWebSocketServerModule
{
    /// <summary>Creates the module and binds it to the "/test" module path.</summary>
    public TestWebSocketModule()
        : base(@"/test")
    {
    }

    /// <summary>Logs the remote endpoint of the session passed to this callback.</summary>
    /// <param name="session">The session whose remote endpoint is printed.</param>
    public override async Task OnSessionStarted(AsyncWebSocketSession session)
    {
        Console.WriteLine(string.Format("WebSocketsession[{0}]hasconnected.", session.RemoteEndPoint));
        await Task.CompletedTask;
    }

    /// <summary>Logs a received text message and sends the same text back.</summary>
    /// <param name="session">The session the text arrived on; also the echo target.</param>
    /// <param name="text">The received text.</param>
    public override async Task OnSessionTextReceived(AsyncWebSocketSession session, string text)
    {
        Console.Write(string.Format("WebSocketsession[{0}]receivedText-->", session.RemoteEndPoint));
        Console.WriteLine(string.Format("{0}", text));
        await session.SendTextAsync(text);
    }

    /// <summary>
    /// Decodes a received binary message as UTF-8 for logging, then sends the
    /// re-encoded UTF-8 bytes back as a binary message.
    /// </summary>
    /// <param name="session">The session the data arrived on; also the echo target.</param>
    /// <param name="data">Buffer holding the received bytes.</param>
    /// <param name="offset">Index in <paramref name="data"/> where decoding starts.</param>
    /// <param name="count">Number of bytes to decode.</param>
    public override async Task OnSessionBinaryReceived(AsyncWebSocketSession session, byte[] data, int offset, int count)
    {
        // The echo is built from the decoded string, not from the raw buffer slice.
        var text = Encoding.UTF8.GetString(data, offset, count);
        Console.Write(string.Format("WebSocketsession[{0}]receivedBinary-->", session.RemoteEndPoint));
        Console.WriteLine(string.Format("{0}", text));
        await session.SendBinaryAsync(Encoding.UTF8.GetBytes(text));
    }

    /// <summary>Logs the remote endpoint of the session passed to this callback.</summary>
    /// <param name="session">The session whose remote endpoint is printed.</param>
    public override async Task OnSessionClosed(AsyncWebSocketSession session)
    {
        Console.WriteLine(string.Format("WebSocketsession[{0}]hasdisconnected.", session.RemoteEndPoint));
        await Task.CompletedTask;
    }
}
复制代码
实例化AsyncWebSocketServer,并将AsyncWebSocketServerModuleCatalog实例注入,即可启动WebSocket的服务端监听。
复制代码
/// <summary>
/// Console host for the sample WebSocket server. It registers
/// <c>TestWebSocketModule</c>, listens on port 22222, and broadcasts every console
/// line to the connected clients as UTF-8 binary until "quit" is entered or
/// console input ends.
/// </summary>
// NOTE(review): the console message literals look like they lost their spaces
// somewhere upstream; they are kept exactly as found - confirm the intended text.
class Program
{
    static AsyncWebSocketServer _server;

    /// <summary>Starts the server, runs the console broadcast loop, then shuts down.</summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static void Main(string[] args)
    {
        NLogLogger.Use();

        try
        {
            var catalog = new AsyncWebSocketServerModuleCatalog();
            catalog.RegisterModule(new TestWebSocketModule());

            var config = new AsyncWebSocketServerConfiguration();
            //config.SslEnabled = true;
            //config.SslServerCertificate = new System.Security.Cryptography.X509Certificates.X509Certificate2(@"D:\\Cowboy.pfx", "Cowboy");
            //config.SslPolicyErrorsBypassed = true;

            _server = new AsyncWebSocketServer(22222, catalog, config);
            _server.Listen();

            Console.WriteLine("WebSocketserverhasbeenstartedon[{0}].", _server.ListenedEndPoint);
            Console.WriteLine("Typesomethingtosendtoclients...");
            while (true)
            {
                try
                {
                    string text = Console.ReadLine();

                    // ReadLine returns null once console input is closed or redirected
                    // input is exhausted. Without this check the loop would spin forever
                    // and hand null to Encoding.UTF8.GetBytes.
                    if (text == null || text == "quit")
                    {
                        break;
                    }

                    Task.Run(async () =>
                    {
                        // The task is not awaited, so the enclosing catch never sees its
                        // exceptions; report broadcast failures from inside the task.
                        try
                        {
                            //await _server.BroadcastText(text);
                            //Console.WriteLine("WebSocketserver[{0}]broadcaststext->[{1}].", _server.ListenedEndPoint, text);
                            await _server.BroadcastBinaryAsync(Encoding.UTF8.GetBytes(text));
                            Console.WriteLine("WebSocketserver[{0}]broadcastsbinary->[{1}].", _server.ListenedEndPoint, text);
                        }
                        catch (Exception broadcastError)
                        {
                            Console.WriteLine(broadcastError.Message);
                        }
                    });
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }

            _server.Shutdown();
            Console.WriteLine("WebSocketserverhasbeenstoppedon[{0}].", _server.ListenedEndPoint);
        }
        catch (Exception ex)
        {
            Logger.Get().Error(ex.Message, ex);
        }

        Console.ReadKey();
    }
}
复制代码
WebSocket客户端应用
客户端侧在实例化AsyncWebSocketClient时有两种方式:
实现IAsyncWebSocketClientMessageDispatcher接口;
直接通过构造函数注入处理各种事件的 Func<> 实现。
复制代码
/// <summary>
/// Callback contract that can be implemented to receive events from an
/// <see cref="AsyncWebSocketClient"/>. Every callback is passed the client
/// instance and returns a <see cref="Task"/>.
/// </summary>
/// <remarks>
/// NOTE(review): when each callback is raised is inferred from the member names
/// only; confirm against the AsyncWebSocketClient implementation. For the
/// callbacks carrying a buffer, the payload presumably occupies <c>count</c> bytes
/// of <c>data</c> starting at <c>offset</c> - TODO confirm.
/// </remarks>
public interface IAsyncWebSocketClientMessageDispatcher
{
    // Connection lifecycle and whole-message callbacks.
    Task OnServerConnected(AsyncWebSocketClient client);
    Task OnServerTextReceived(AsyncWebSocketClient client, string text);
    Task OnServerBinaryReceived(AsyncWebSocketClient client, byte[] data, int offset, int count);
    Task OnServerDisconnected(AsyncWebSocketClient client);

    // Fragmentation stream callbacks (presumably for fragmented messages - confirm).
    Task OnServerFragmentationStreamOpened(AsyncWebSocketClient client, byte[] data, int offset, int count);
    Task OnServerFragmentationStreamContinued(AsyncWebSocketClient client, byte[] data, int offset, int count);
    Task OnServerFragmentationStreamClosed(AsyncWebSocketClient client, byte[] data, int offset, int count);
}
复制代码
下面的DEMO采用了方式二。
复制代码
/// <summary>
/// Console host for the sample WebSocket client. It connects to
/// ws://127.0.0.1:22222/test, sends every console line to the server as UTF-8
/// binary, and prints whatever the server sends back. The loop ends when "quit"
/// is entered, console input ends, or the client leaves the Open state.
/// </summary>
// NOTE(review): the console message literals look like they lost their spaces
// somewhere upstream; they are kept exactly as found - confirm the intended text.
class Program
{
    static AsyncWebSocketClient _client;

    /// <summary>Connects the client, runs the console send loop, then closes the connection.</summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static void Main(string[] args)
    {
        NLogLogger.Use();

        Task.Run(async () =>
        {
            try
            {
                var config = new AsyncWebSocketClientConfiguration();
                //config.SslTargetHost = "Cowboy";
                //config.SslClientCertificates.Add(new System.Security.Cryptography.X509Certificates.X509Certificate2(@"D:\\Cowboy.cer"));
                //config.SslPolicyErrorsBypassed = true;

                //var uri = new Uri("ws://echo.websocket.org/");
                //var uri = new Uri("wss://127.0.0.1:22222/test");
                var uri = new Uri("ws://127.0.0.1:22222/test");

                // Event handlers are supplied directly as method groups instead of
                // through an IAsyncWebSocketClientMessageDispatcher implementation.
                _client = new AsyncWebSocketClient(uri,
                    OnServerTextReceived,
                    OnServerBinaryReceived,
                    OnServerConnected,
                    OnServerDisconnected,
                    config);
                await _client.Connect();

                Console.WriteLine("WebSocketclienthasconnectedtoserver[{0}].", uri);
                Console.WriteLine("Typesomethingtosendtoserver...");
                while (_client.State == WebSocketState.Open)
                {
                    try
                    {
                        string text = Console.ReadLine();

                        // ReadLine returns null once console input is closed or redirected
                        // input is exhausted. Without this check the loop would keep spinning
                        // while the socket is open and hand null to Encoding.UTF8.GetBytes.
                        if (text == null || text == "quit")
                        {
                            break;
                        }

                        Task.Run(async () =>
                        {
                            // The task is not awaited, so the enclosing catch never sees its
                            // exceptions; report send failures from inside the task.
                            try
                            {
                                //await _client.SendText(text);
                                //Console.WriteLine("Client[{0}]sendtext->[{1}].", _client.LocalEndPoint, text);
                                await _client.SendBinaryAsync(Encoding.UTF8.GetBytes(text));
                                Console.WriteLine("Client[{0}]sendbinary->[{1}].", _client.LocalEndPoint, text);
                            }
                            catch (Exception sendError)
                            {
                                Console.WriteLine(sendError.Message);
                            }
                        }).Forget();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }
                }

                await _client.Close(WebSocketCloseCode.NormalClosure);
                Console.WriteLine("WebSocketclienthasdisconnectedfromserver[{0}].", uri);
            }
            catch (Exception ex)
            {
                Logger.Get().Error(ex.Message, ex);
            }
        }).Wait();

        Console.ReadKey();
    }

    /// <summary>Logs the remote endpoint of the client passed to this handler.</summary>
    /// <param name="client">The client whose remote endpoint is printed.</param>
    private static async Task OnServerConnected(AsyncWebSocketClient client)
    {
        Console.WriteLine(string.Format("WebSocketserver[{0}]hasconnected.", client.RemoteEndPoint));
        await Task.CompletedTask;
    }

    /// <summary>Prints a text message received from the server.</summary>
    /// <param name="client">The client the text arrived on.</param>
    /// <param name="text">The received text.</param>
    private static async Task OnServerTextReceived(AsyncWebSocketClient client, string text)
    {
        Console.Write(string.Format("WebSocketserver[{0}]receivedText-->", client.RemoteEndPoint));
        Console.WriteLine(string.Format("{0}", text));
        await Task.CompletedTask;
    }

    /// <summary>Decodes a received binary message as UTF-8 and prints it.</summary>
    /// <param name="client">The client the data arrived on.</param>
    /// <param name="data">Buffer holding the received bytes.</param>
    /// <param name="offset">Index in <paramref name="data"/> where decoding starts.</param>
    /// <param name="count">Number of bytes to decode.</param>
    private static async Task OnServerBinaryReceived(AsyncWebSocketClient client, byte[] data, int offset, int count)
    {
        var text = Encoding.UTF8.GetString(data, offset, count);
        Console.Write(string.Format("WebSocketserver[{0}]receivedBinary-->", client.RemoteEndPoint));
        Console.WriteLine(string.Format("{0}", text));
        await Task.CompletedTask;
    }

    /// <summary>Logs the remote endpoint of the client passed to this handler.</summary>
    /// <param name="client">The client whose remote endpoint is printed.</param>
    private static async Task OnServerDisconnected(AsyncWebSocketClient client)
    {
        Console.WriteLine(string.Format("WebSocketserver[{0}]hasdisconnected.", client.RemoteEndPoint));
        await Task.CompletedTask;
    }
}
复制代码
|
|