一、One-way MEP V.S. Responsible Service
我们知道MSMQ天生就具有异步的特性,它只能以One-way的MEP(Message Exchange Pattern)进行通信。Client和Service之间采用One-way MEP的话就意味着Client调用Service之后立即返回,它无法获得Service的执行结果,也无法捕捉Service运行的Exception。下图简单表述了基于MSMQ的WCF Service中Client和Service的交互。
二、 Solution 虽然我们的目的很简单:当Client向Service递交了Order之后,能以某种方式获知Order的执行结果;对于Service端来说,在正确把Order从Message Queue中获取出来、并正确处理之后,能够向Order的递交者发送一个Acknowledge Message。为了简单起见,这个Acknowledge Message包含两组信息:
要在WCF中实现这样的目的,对于Request/Reply MEP来说是简单而直接的:Client向Service递交Order,并等待Service的Response,Service在处理接收到Order之后直接将处理结果 返回给Client就可以了。但是我们说过MSMQ天生就是异步的,我们只有采取一种间接的方式实现“曲线救国”。 我们的解决方案是:在每个Client Domain也创建一个基于MSMQ的本地的WCF Service,用于接收来自Order处理端发送的Acknowledge Message。对于处理Order 的Service来说,在正确处理Order之后,想对应的Client发送Acknowledge Message。下图简单演示整个过程:
了解了上面的Solution之后,我们来看看该Solution在真正实现过程中有什么样的困难。对于处理Order的Service来说,在向Client端发送Acknowledge Message的时候,它必须要知道该Order对应的Client的Response Service的MSMQ的Address以及其他和Operation相关的Context信息(在这里我们不需要,不过考虑到扩展性,我们把包括了address的Context的信息 封装到一个了Class中,在这里叫做:OrderResponseContext)。而这些Context却不能在Configuration中进行配置,因为他可以同时面临着很多个Client:比如每个Client用于接收Response 的Message Queue的address都不一样。所以这个OrderResponseContext必须通过对应的Client来提供。基于此,我们具有两面两种解决方式: 方式一、修改Service Contract,把OrderResponseContext当成是Operation的一个参数 这是我们最容易想到的,比如我们原来的Operation这样定义: namespace Artech.ResponsiveQueuedService.Contract
{ [ServiceContract] [ServiceKnownType(typeof(Order))] public interface IOrderProcessor { [OperationContract(IsOneWay = true)] void Submit(Order order); } } 现在变成: namespace Artech.ResponsiveQueuedService.Contract
{ [ServiceContract] [ServiceKnownType(typeof(Order))] public interface IOrderProcessor { [OperationContract(IsOneWay = true)] void Submit(Order order, OrderResponseContext responseContext); } } 虽然这种方式看起来不错,但是却不值得推荐。在一般情况下,我们的Contract需要是很稳定的,一经确定就不能轻易更改,因为Contract是被交互的多方共同支持的,牵一发动全身;此外,从Service Contract代表的是Service的一个Interface,他是对业务逻辑的抽象、和具体实现无关,而对于我们的例子来说,我们仅仅是定义一个递交Order的Operation,从业务逻辑来看,OrderResponseContext和抽象的业务逻辑毫无关系。基于此,我们需要寻求一种和Service Contract无关的解决方式: 方式二、将OrderResponseContext放到Soap Message 的Header中 其实我们要解决的问题很简单,就是要把OrderResponseContext的信息置于Soap Message中发送到Service。而我们知道,Soap的Header具有极强的可伸缩性,原则上,我们可以把任何控制信息置于Header中。基于WCF的编程模式很容易地帮助我们实现对Soap Header的插入和获取: 我们可以通过下面的方式获得当前Operation Context的Incoming Message Headers和Outgoing Message Headers OperationContext.Current.IncomingMessageHeaders
OperationContext.Current.OutgoingMessageHeaders 如果我们要把一个OrderResponseContext 对象插入到当前Operation Context的Outgoing Message Headers中,我们可以通过下面的代码来实现: OrderResponseContext context = new OrderResponseContext();
MessageHeader<OrderResponseContext> header = new MessageHeader<OrderResponseContext>( context); OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("name", "namespace")); 相应的,我们可以通过下面的代码从Outgoing Message Headers OrderResponseContext的数据获取的内容: OrderResponseContext context = OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("name", "namespace"));
四、Sample 我们照例给出一个完整的Sample,下面是整个Solution的结构:
1.Contract: Artech.ResponsiveQueuedService.Contract Service Contract: Artech.ResponsiveQueuedService.Contract. IOrderProcessor using System;
using System.Collections.Generic; using System.Text; using System.ServiceModel; namespace Artech.ResponsiveQueuedService.Contract { [ServiceContract] [ServiceKnownType(typeof(Order))] public interface IOrderProcessor { [OperationContract(IsOneWay = true)] void Submit(Order order); } } Service Contract: Artech.ResponsiveQueuedService.Contract.IOrderRessponse using System;
using System.Collections.Generic; using System.Text; using System.ServiceModel; namespace Artech.ResponsiveQueuedService.Contract { [ServiceContract] public interface IOrderRessponse { [OperationContract(IsOneWay =true)] void SubmitOrderResponse(Guid orderNo,FaultException exception); } } 接收来自Order processing端的Response:Order No.和Exception。 Data Contract: Artech.ResponsiveQueuedService.Contract.Order using System;
using System.Collections.Generic; using System.Text; using System.Runtime.Serialization; namespace Artech.ResponsiveQueuedService.Contract { [DataContract] public class Order { Private Fields#region Private Fields private Guid _orderNo; private DateTime _orderDate; private Guid _supplierID; private string _supplierName; #endregion Constructors#region Constructors public Order(Guid orderNo, DateTime orderDate, Guid supplierID, string supplierName) { this._orderNo = orderNo; this._orderDate = orderDate; this._supplierID = supplierID; this._supplierName = supplierName; } #endregion Public Properties#region Public Properties [DataMember] public Guid OrderNo { get { return _orderNo; } set { _orderNo = value; } } [DataMember] public DateTime OrderDate { get { return _orderDate; } set { _orderDate = value; } } [DataMember] public Guid SupplierID { get { return _supplierID; } set { _supplierID = value; } } [DataMember] public string SupplierName { get { return _supplierName; } set { _supplierName = value; } } #endregion Public Methods#region Public Methods public override string ToString() { string description = string.Format("Order No.\t: {0}\n\tOrder Date\t: {1}\n\tSupplier No.\t: {2}\n\tSupplier Name\t: {3}", this._orderNo, this._orderDate.ToString("yyyy/MM/dd"), this._supplierID, this._supplierName); return description; } #endregion } } 对Order的封装。 Data Contract:Artech.ResponsiveQueuedService.Contract. OrderResponseContext using System;
using System.Collections.Generic; using System.Text; using System.Runtime.Serialization; using System.ServiceModel; namespace Artech.ResponsiveQueuedService.Contract { [DataContract] public class OrderResponseContext { private Uri _responseAddress; [DataMember] public Uri ResponseAddress { get { return _responseAddress; } set { _responseAddress = value; } } public static OrderResponseContext Current { get { if (OperationContext.Current == null) { return null; } return OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("OrderResponseContext", "Artech.ResponsiveQueuedService.Contract"); } set { MessageHeader<OrderResponseContext> header = new MessageHeader<OrderResponseContext>(value); OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("OrderResponseContext", "Artech.ResponsiveQueuedService.Contract")); } } } } ResponseAddress代表Host在Client Domain的Response Service的Address。同过Current把OrderResponseContext插入到Outgoing Message Headers中、以及从Ingoing Message Headers取出OrderResponseContext对象。 2.Order Processing Service:Artech.ResponsiveQueuedService.Service using System;
using System.Collections.Generic; using System.Text; using Artech.ResponsiveQueuedService.Contract; using System.ServiceModel; using System.Net.Security; namespace Artech.ResponsiveQueuedService.Service { public class OrderProcessorService:IOrderProcessor { private void ProcessOrder(Order order) { if (order.OrderDate < DateTime.Today) { throw new Exception(); } } IOrderProcessor Members#region IOrderProcessor Members public void Submit(Order order) { Console.WriteLine("Begin to process the order of the order No.: {0}", order.OrderNo); FaultException exception= null; if (order.OrderDate < DateTime.Today) { exception = new FaultException(new FaultReason("The order has expried"), new FaultCode("sender")); Console.WriteLine("It‘s fail to process the order.\n\tOrder No.: {0}\n\tReason:{1}", order.OrderNo, "The order has expried"); } else { Console.WriteLine("It‘s successful to process the order.\n\tOrder No.: {0}", order.OrderNo); } NetMsmqBinding binding = new NetMsmqBinding(); binding.ExactlyOnce = false; binding.Security.Transport.MsmqAuthenticationMode = MsmqAuthenticationMode.None; binding.Security.Transport.MsmqProtectionLevel = ProtectionLevel.None; ChannelFactory<IOrderRessponse> channelFacotry = new ChannelFactory<IOrderRessponse>(binding); OrderResponseContext responseContext = OrderResponseContext.Current; IOrderRessponse channel = channelFacotry.CreateChannel(new EndpointAddress(responseContext.ResponseAddress)); using (OperationContextScope contextScope = new OperationContextScope(channel as IContextChannel)) { channel.SubmitOrderResponse(order.OrderNo, exception); } } #endregion } } 在这里我们模拟了这样的场景:先通过Order Date判断Order是否过期,如果过期创建一个FaultException,否则正确处理该Order,然后通过OrderResponseContext.Current从Incoming Message Header中获取封装在OrderResponseContext对象中的Response Address,创建Binding并调用Response Service. 3. Order Processing Service Hosting: Artech.ResponsiveQueuedService.Hosting Configuration <?xml version="1.0" encoding="utf-8" ?>
<configuration> <appSettings> <add key="msmqPath" value=".\private$\orderprocessor"/> </appSettings> <system.serviceModel> <bindings> <netMsmqBinding> <binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false"> <security> <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" /> </security> </binding> </netMsmqBinding> </bindings> <services> <service name="Artech.ResponsiveQueuedService.Service.OrderProcessorService"> <endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding" bindingConfiguration="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" /> </service> </services> </system.serviceModel> </configuration> Program using System;
using System.Collections.Generic; using System.Text; using Artech.ResponsiveQueuedService.Service; using System.ServiceModel; using System.Configuration; using System.Messaging; namespace Artech.ResponsiveQueuedService.Hosting { class Program { static void Main(string[] args) { string path = ConfigurationManager.AppSettings["msmqPath"]; if (!MessageQueue.Exists(path)) { MessageQueue.Create(path); } using (ServiceHost host = new ServiceHost(typeof(OrderProcessorService))) { host.Opened += delegate { Console.WriteLine("The Order Processor service has begun to listen"); }; host.Open(); Console.Read(); } } } } 4. Response Service: Artech.ResponsiveQueuedService.LocalService.OrderRessponseService using System;
using System.Collections.Generic; using System.Text; using Artech.ResponsiveQueuedService.Contract; using System.ServiceModel; namespace Artech.ResponsiveQueuedService.LocalService { public class OrderRessponseService : IOrderRessponse { IOrderRessponse Members#region IOrderRessponse Members public void SubmitOrderResponse(Guid orderNo, FaultException exception) { if (exception == null) { Console.WriteLine("It‘s successful to process the order!\n\tOrder No.: {0}",orderNo); } else { Console.WriteLine("It‘s fail to process the order!\n\tOrder No.: {0}\n\tReason: {1}", orderNo, exception.Message); } } #endregion } }
Configuration <?xml version="1.0" encoding="utf-8" ?>
<configuration> <appSettings> <add key="msmqPath" value=".\private$\orderresponse"/> </appSettings> <system.serviceModel> <bindings> <netMsmqBinding> <binding name="msmqBinding" exactlyOnce="false"> <security> <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" /> </security> </binding> </netMsmqBinding> </bindings> <services> <service name="Artech.ResponsiveQueuedService.LocalService.OrderRessponseService"> <endpoint address="net.msmq://localhost/private/orderresponse" binding="netMsmqBinding" bindingConfiguration="msmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderRessponse" /> </service> </services> </system.serviceModel> </configuration> Program using System;
using System.Collections.Generic; using System.Text; using Artech.ResponsiveQueuedService.LocalService; using System.Configuration; using System.ServiceModel; using System.Messaging; namespace Artech.ResponsiveQueuedService.LocalhHosting { class Program { static void Main(string[] args) { string path = ConfigurationManager.AppSettings["msmqPath"]; if (!MessageQueue.Exists(path)) { MessageQueue.Create(path); } using (ServiceHost host = new ServiceHost(typeof(OrderRessponseService))) { host.Opened += delegate { Console.WriteLine("The Order Response service has begun to listen"); }; host.Open(); Console.Read(); } } } } 6. Client: Artech.ResponsiveQueuedService.Client Configuration: <?xml version="1.0" encoding="utf-8" ?>
<configuration> <appSettings> <add key="msmqPath" value="net.msmq://localhost/private/orderresponse"/> </appSettings> <system.serviceModel> <bindings> <netMsmqBinding> <binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false"> <security> <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" /> </security> </binding> </netMsmqBinding> </bindings> <client> <endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding" bindingConfiguration="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" name="defaultEndpoint" /> </client> </system.serviceModel> </configuration> Program: using System;
using System.Collections.Generic; using System.Text; using System.Configuration; using System.ServiceModel; using Artech.ResponsiveQueuedService.Contract; using System.Messaging; namespace Artech.ResponsiveQueuedService.Clinet { class Program { static void Main(string[] args) { Order order1 = new Order(Guid.NewGuid(), DateTime.Today.AddDays(5), Guid.NewGuid(), "Supplier A"); Order order2 = new Order(Guid.NewGuid(), DateTime.Today.AddDays(-5), Guid.NewGuid(), "Supplier A"); string path = ConfigurationManager.AppSettings["msmqPath"]; Uri address = new Uri(path); OrderResponseContext context = new OrderResponseContext(); context.ResponseAddress = address; ChannelFactory<IOrderProcessor> channelFactory = new ChannelFactory<IOrderProcessor>("defaultEndpoint"); IOrderProcessor orderProcessor = channelFactory.CreateChannel(); using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel)) { Console.WriteLine("Submit the order of order No.: {0}", order1.OrderNo); OrderResponseContext.Current = context; orderProcessor.Submit(order1); } using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel)) { Console.WriteLine("Submit the order of order No.: {0}", order2.OrderNo); OrderResponseContext.Current = context; orderProcessor.Submit(order2); } Console.Read(); } } } 我创建了两个Order对象, 其中一个已经过期。从Configuration中取出Response Address并购建一个OrderResponseContext,然后分两次将这两个Order向Order Processing Service递交。在调用Order Processing Order的Operation Context Scope中,通过OrderResponseContext.Current将OrderResponseContext对象插入Outcoming Message Header中。 我们现在运行一下整个程序,看看最终的输出结果: Client:
|
|
来自: waywin > 《Technology》