分享

SignalR 实时通知消息并行推送和批量存储实现

 WindySky 2017-09-15

前言:SignalR是基于.NET平台Web应用的实时消息通讯框架,有人称之为.NET平台的.NodeJS;可用于Web页面聊天,消息推送等功能实现。本文摘取部分代码,利用.NET平台的Parallel功能实现通知消息的并行推送和批量存储。


1. 接收通知消息的Api接口:

MVC 控制器代码:

[csharp] view plain copy
  1. [HttpPost]  
  2. [AllowAnonymous]  
  3. public void Notify(Message message)  
  4. {  
  5.     var msgModel = new MessageModel();  
  6.     msgModel.Notify(message);  
  7. }  
  8.   
  9. [HttpPost]  
  10. [AllowAnonymous]  
  11. public void Notify(List<Message> msgList)  
  12. {  
  13.     var msgModel = new MessageModel();  
  14.     msgModel.BatchNotify(msgList);  
  15. }     


2. 通知消息的并行推送:

2.1 批量推送接口

首先调用到Task.Factory.StartNew()方法创建新任务,用BulkCopy()方法将数据批量插入到数据库;然后并行推送消息,因为消息接收人有多个,根据消息接收人的消息实体对象列表,Parallel.ForEach()对每条消息并行推送到Web用户的前端。

[csharp] view plain copy
  1. /// <summary>  
  2. /// 通知单条消息  
  3. /// </summary>  
  4. /// <param name="msg"></param>  
  5. public void Notify(Message msg)  
  6. {  
  7.     List<Message> msgList = new List<Message>();  
  8.     msgList.Add(msg);  
  9.     BatchNotify(msgList);  
  10. }  
  11.   
  12. /// <summary>  
  13. /// 多条消息列表推送  
  14. /// </summary>  
  15. /// <param name="msgList"></param>  
  16. public void BatchNotify(List<Message> msgList)  
  17. {  
  18.     //批量插入消息  
  19.     var msgService = new MessageService();  
  20.     Task.Factory.StartNew(() =>  
  21.     {  
  22.         msgService.BulkInsert(msgList);  
  23.     });  
  24.   
  25.     IList<string> onlineConnectionIds = new List<string>();  
  26.     var hub = GlobalHost.ConnectionManager.GetHubContext<ClientPushHub>();  
  27.     var onlineUsers = ChatHub.GetOnlineUsersOnHub();  
  28.   
  29.     //发送消息  
  30.     Parallel.ForEach<Message>(msgList, (msg) =>   
  31.     {   
  32.         PushMessage(msg, onlineUsers, hub);   
  33.     });  
  34. }  



2.2 调用SignalR 接口给在线用户发送通知消息

每条消息有多个接收人,每个接收人在前端的Connection连接存储在ConnectionIds集合中,遍历进行推送。

[csharp] view plain copy
  1. /// <summary>  
  2. /// 单条消息推送服务  
  3. /// </summary>  
  4. /// <param name="message">消息数据</param>  
  5. private void PushMessage(Message msg, List<User> onlineUsers, IHubContext hub)  
  6. {  
  7.     User user = null;  
  8.     List<RecieverEntity> recievers = msg.Recievers;  
  9.     foreach (var reciever in recievers)  
  10.     {  
  11.         user = onlineUsers.SingleOrDefault(a => a.Name == reciever.Reciever);  
  12.         if (user != null)  
  13.         {  
  14.             foreach (var connId in user.ConnectionIds)  
  15.             {  
  16.                 hub.Clients.Client(connId).onPushingMessage(new  
  17.                 {  
  18.                     sender = msg.Sender,  
  19.                     msgTitle = msg.MsgTitle,  
  20.                     msgContent = msg.MsgContent,  
  21.                     billUrl = msg.BillUrl,  
  22.                     iframeTxt = msg.IframeTxt,  
  23.                     iframeCode = msg.IframeCode,  
  24.                     sendTime = DateTime.Now.ToString(),  
  25.                     sendType = "notification"  
  26.                 });  
  27.             }  
  28.         }  
  29.     }  
  30. }  


2.3 客户端接收通知消息

[javascript] view plain copy
  1. //接收系统服务通知消息  
  2. clientPushHub.client.onPushingMessage = function (message) {  
  3.     popNewMsgHintWindow(message);  
  4. }  

3. 批量消息的批量存储

3.1 批量插入

[csharp] view plain copy
  1. /// <summary>  
  2. /// 批量插入  
  3. /// </summary>  
  4. /// <param name="msgList"></param>  
  5. public void BulkInsert(List<Message> msgList)  
  6. {  
  7.     var msgTable = MsgBatchUtility.GetTableSchema();  
  8.     foreach (var msg in msgList)  
  9.     {  
  10.         Insert(msgTable, msg);  
  11.     }  
  12.     MsgBatchUtility.BulkCopy(msgTable);  
  13. }  

3.2  获取消息表结构

[csharp] view plain copy
  1. /// <summary>  
  2. /// 获取数据表Schema  
  3. /// </summary>  
  4. /// <returns></returns>  
  5. public static DataTable GetTableSchema()  
  6. {  
  7.     DataTable dt = new DataTable();  
  8.     dt.Columns.AddRange(new DataColumn[]{  
  9.         new DataColumn("MsgID", typeof(int)),  
  10.         new DataColumn("MsgType", typeof(byte)),  
  11.         new DataColumn("MsgTitle", typeof(string)),  
  12.         new DataColumn("MsgContent", typeof(string)),  
  13.         new DataColumn("Status", typeof(byte)),  
  14.         new DataColumn("SenderID", typeof(string)),  
  15.         new DataColumn("Sender", typeof(string)),  
  16.         new DataColumn("SendTime", typeof(DateTime)),  
  17.         new DataColumn("RecieverID", typeof(string)),  
  18.         new DataColumn("Reciever", typeof(string)),  
  19.         new DataColumn("RecievedTime", typeof(DateTime)),  
  20.         new DataColumn("AppName", typeof(string)),  
  21.         new DataColumn("AppInstanceID", typeof(string)),  
  22.     });  
  23.     return dt;  
  24. }  



3.3 数据库批量拷贝方法BulkCopy()

[csharp] view plain copy
  1. /// <summary>  
  2. /// 批量插入方法  
  3. /// </summary>  
  4. /// <param name="dt"></param>  
  5. public static void BulkCopy(DataTable dt)  
  6. {  
  7.     SqlConnection sqlConn = new SqlConnection(connectionString);  
  8.     SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlConn);  
  9.     bulkCopy.DestinationTableName = "dbo.im_message";  
  10.     bulkCopy.BatchSize = dt.Rows.Count;  
  11.     try  
  12.     {  
  13.         sqlConn.Open();  
  14.         if (dt != null && dt.Rows.Count != 0)  
  15.             bulkCopy.WriteToServer(dt);  
  16.     }  
  17.     catch (Exception)  
  18.     {  
  19.         throw;  
  20.     }  
  21.     finally  
  22.     {  
  23.         sqlConn.Close();  
  24.         if (bulkCopy != null)  
  25.             bulkCopy.Close();  
  26.     }  
  27. }  



总结:


本文暂时没有对SignalR的用法和在线用户列表维护做出代码示例,会在后期文章中单独列出,SiganlR功能在做WebQQ聊天功能,网站页面聊天,聊天室功能时非常有用,有兴趣的开发人员可以以此为框架,搭建自己的实时通讯的工具,至于性能方法,SignalR评价是非常不错的,完全可以做企业级的分布式实时应用,有兴趣的读者可以访问官方站点了解。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多