配色: 字号:
创建基于MSMQ的Responsive Service
2016-10-05 | 阅:  转:  |  分享 
  
创建基于MSMQ的ResponsiveService

一、One-wayMEPV.S.ResponsibleService

我们知道MSMQ天生就具有异步的特性,它只能以One-way的MEP(MessageExchangePattern)进行通信。Client和Service之间采用One-wayMEP的话就意味着Client调用Service之后立即返回,它无法获得Service的执行结果,也无法捕捉Service运行的Exception。下图简单表述了基于MSMQ的WCFService中Client和Service的交互。





但是在有些场景中,这是无法容忍的。再拿我在上一篇文章的OrderDelivery的例子来说。Client向Service提交了Order,却无法确认该Order是否被Service正确处理,这显然是不能接受的。我们今天就来讨论一下,如何创建一个ResponsiveService来解决这个问题:Client不再是对Service的执行情况一无所知,它可以获知Order是否被Service正确处理了。



二、Solution



虽然我们的目的很简单:当Client向Service递交了Order之后,能以某种方式获知Order的执行结果;对于Service端来说,在正确把Order从MessageQueue中获取出来、并正确处理之后,能够向Order的递交者发送一个AcknowledgeMessage。为了简单起见,这个AcknowledgeMessage包含两组信息:



OrderNo.:被处理的Order的一个能够为一标志它的ID。

Exception:如果处理失败的Exception,如果成功处理为null。

要在WCF中实现这样的目的,对于Request/ReplyMEP来说是简单而直接的:Client向Service递交Order,并等待Service的Response,Service在处理接收到Order之后直接将处理结果返回给Client就可以了。但是我们说过MSMQ天生就是异步的,我们只有采取一种间接的方式实现“曲线救国”。



我们的解决方案是:在每个ClientDomain也创建一个基于MSMQ的本地的WCFService,用于接收来自Order处理端发送的AcknowledgeMessage。对于处理Order的Service来说,在正确处理Order之后,想对应的Client发送AcknowledgeMessage。下图简单演示整个过程:





三、Implementation



了解了上面的Solution之后,我们来看看该Solution在真正实现过程中有什么样的困难。对于处理Order的Service来说,在向Client端发送AcknowledgeMessage的时候,它必须要知道该Order对应的Client的ResponseService的MSMQ的Address以及其他和Operation相关的Context信息(在这里我们不需要,不过考虑到扩展性,我们把包括了address的Context的信息封装到一个了Class中,在这里叫做:OrderResponseContext)。而这些Context却不能在Configuration中进行配置,因为他可以同时面临着很多个Client:比如每个Client用于接收Response的MessageQueue的address都不一样。所以这个OrderResponseContext必须通过对应的Client来提供。基于此,我们具有两面两种解决方式:



方式一、修改ServiceContract,把OrderResponseContext当成是Operation的一个参数



这是我们最容易想到的,比如我们原来的Operation这样定义:



namespaceArtech.ResponsiveQueuedService.Contract

{

[ServiceContract]

[ServiceKnownType(typeof(Order))]

publicinterfaceIOrderProcessor

{

[OperationContract(IsOneWay=true)]

voidSubmit(Orderorder);

}

}



现在变成:



namespaceArtech.ResponsiveQueuedService.Contract

{

[ServiceContract]

[ServiceKnownType(typeof(Order))]

publicinterfaceIOrderProcessor

{

[OperationContract(IsOneWay=true)]

voidSubmit(Orderorder,OrderResponseContextresponseContext);

}

}



虽然这种方式看起来不错,但是却不值得推荐。在一般情况下,我们的Contract需要是很稳定的,一经确定就不能轻易更改,因为Contract是被交互的多方共同支持的,牵一发动全身;此外,从ServiceContract代表的是Service的一个Interface,他是对业务逻辑的抽象、和具体实现无关,而对于我们的例子来说,我们仅仅是定义一个递交Order的Operation,从业务逻辑来看,OrderResponseContext和抽象的业务逻辑毫无关系。基于此,我们需要寻求一种和ServiceContract无关的解决方式:



方式二、将OrderResponseContext放到SoapMessage的Header中



其实我们要解决的问题很简单,就是要把OrderResponseContext的信息置于SoapMessage中发送到Service。而我们知道,Soap的Header具有极强的可伸缩性,原则上,我们可以把任何控制信息置于Header中。基于WCF的编程模式很容易地帮助我们实现对SoapHeader的插入和获取:



我们可以通过下面的方式获得当前OperationContext的IncomingMessageHeaders和OutgoingMessageHeaders



OperationContext.Current.IncomingMessageHeaders

OperationContext.Current.OutgoingMessageHeaders



如果我们要把一个OrderResponseContext对象插入到当前OperationContext的OutgoingMessageHeaders中,我们可以通过下面的代码来实现:



OrderResponseContextcontext=newOrderResponseContext();

MessageHeaderheader=newMessageHeader(context);

OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("name","namespace"));



相应的,我们可以通过下面的代码从OutgoingMessageHeadersOrderResponseContext的数据获取的内容:



OrderResponseContextcontext=OperationContext.Current.IncomingMessageHeaders.GetHeader("name","namespace"));

四、Sample



我们照例给出一个完整的Sample,下面是整个Solution的结构:





除了一贯使用的4层结构(Contract-Service-Hosting-Client),还为ResponseService增加了下面两层:



Localservice:作为ClientDomain的ResponseService。

LocalHosting:HostLocalservice。

1.Contract:Artech.ResponsiveQueuedService.Contract



ServiceContract:Artech.ResponsiveQueuedService.Contract.IOrderProcessor



usingSystem;

usingSystem.Collections.Generic;

usingSystem.Text;

usingSystem.ServiceModel;



namespaceArtech.ResponsiveQueuedService.Contract

{

[ServiceContract]

[ServiceKnownType(typeof(Order))]

publicinterfaceIOrderProcessor

{

[OperationContract(IsOneWay=true)]

voidSubmit(Orderorder);

}

}



ServiceContract:Artech.ResponsiveQueuedService.Contract.IOrderRessponse



usingSystem;

usingSystem.Collections.Generic;

usingSystem.Text;

usingSystem.ServiceModel;



namespaceArtech.ResponsiveQueuedService.Contract

{

[ServiceContract]

publicinterfaceIOrderRessponse

{

[OperationContract(IsOneWay=true)]

voidSubmitOrderResponse(GuidorderNo,FaultExceptionexception);

}

}



接收来自Orderprocessing端的Response:OrderNo.和Exception。



DataContract:Artech.ResponsiveQueuedService.Contract.Order





usingSystem;

usingSystem.Collections.Generic;

usingSystem.Text;

usingSystem.Runtime.Serialization;



namespaceArtech.ResponsiveQueuedService.Contract

{

[DataContract]

publicclassOrder

{

PrivateFields



Constructors



PublicProperties



PublicMethods

}

}



对Order的封装。



DataContract:Artech.ResponsiveQueuedService.Contract.OrderResponseContext



usingSystem;

usingSystem.Collections.Generic;

usingSystem.Text;

usingSystem.Runtime.Serialization;

usingSystem.ServiceModel;



namespaceArtech.ResponsiveQueuedService.Contract

{

[DataContract]

publicclassOrderResponseContext

{

privateUri_responseAddress;



[DataMember]

publicUriResponseAddress

{

get{return_responseAddress;}

set{_responseAddress=value;}

}



publicstaticOrderResponseContextCurrent

{

get

{

if(OperationContext.Current==null)

{

returnnull;

}



returnOperationContext.Current.IncomingMessageHeaders.GetHeader("OrderResponseContext","Artech.ResponsiveQueuedService.Contract");

}

set

{

MessageHeaderheader=newMessageHeader(value);

OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("OrderResponseContext","Artech.ResponsiveQueuedService.Contract"));

}

}

}

}



ResponseAddress代表Host在ClientDomain的ResponseService的Address。同过Current把OrderResponseContext插入到OutgoingMessageHeaders中、以及从IngoingMessageHeaders取出OrderResponseContext对象。



2.OrderProcessingService:Artech.ResponsiveQueuedService.Service



usingSystem;

usingSystem.Collections.Generic;

usingSystem.Text;

usingArtech.ResponsiveQueuedService.Contract;

usingSystem.ServiceModel;

usingSystem.Net.Security;



namespaceArtech.ResponsiveQueuedService.Service

{

publicclassOrderProcessorService:IOrderProcessor

{

privatevoidProcessOrder(Orderorder)

{



if(order.OrderDate
{

thrownewException();

}

}



IOrderProcessorMembers

}

}



在这里我们模拟了这样的场景:先通过OrderDate判断Order是否过期,如果过期创建一个FaultException,否则正确处理该Order,然后通过OrderResponseContext.Current从IncomingMessageHeader中获取封装在OrderResponseContext对象中的ResponseAddress,创建Binding并调用ResponseService.



3.OrderProcessingServiceHosting:Artech.ResponsiveQueuedService.Hosting



Configuration






































bindingConfiguration="MsmqBinding"contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor"/>











Program



usingSystem;

usingSystem.Collections.Generic;

usingSystem.Text;

usingArtech.ResponsiveQueuedService.Service;

usingSystem.ServiceModel;

usingSystem.Configuration;

usingSystem.Messaging;



namespaceArtech.ResponsiveQueuedService.Hosting

{

classProgram

{

staticvoidMain(string[]args)

{

stringpath=ConfigurationManager.AppSettings["msmqPath"];

if(!MessageQueue.Exists(path))

{

MessageQueue.Create(path);

}



using(ServiceHosthost=newServiceHost(typeof(OrderProcessorService)))

{

host.Opened+=delegate

{

Console.WriteLine("TheOrderProcessorservicehasbeguntolisten");

};



host.Open();



Console.Read();

}

}

}

}



4.ResponseService:Artech.ResponsiveQueuedService.LocalService.OrderRessponseService



usingSystem;

usingSystem.Collections.Generic;

usingSystem.Text;

usingArtech.ResponsiveQueuedService.Contract;

usingSystem.ServiceModel;



namespaceArtech.ResponsiveQueuedService.LocalService

{

publicclassOrderRessponseService:IOrderRessponse

{

IOrderRessponseMembers

}

}





5.ResponseServiceHosting:Artech.ResponsiveQueuedService.LocalhHosting



Configuration






































bindingConfiguration="msmqBinding"contract="Artech.ResponsiveQueuedService.Contract.IOrderRessponse"/>











Program



usingSystem;

usingSystem.Collections.Generic;

usingSystem.Text;

usingArtech.ResponsiveQueuedService.LocalService;

usingSystem.Configuration;

usingSystem.ServiceModel;

usingSystem.Messaging;



namespaceArtech.ResponsiveQueuedService.LocalhHosting

{

classProgram

{

staticvoidMain(string[]args)

{

stringpath=ConfigurationManager.AppSettings["msmqPath"];

if(!MessageQueue.Exists(path))

{

MessageQueue.Create(path);

}



using(ServiceHosthost=newServiceHost(typeof(OrderRessponseService)))

{

host.Opened+=delegate

{

Console.WriteLine("TheOrderResponseservicehasbeguntolisten");

};



host.Open();



Console.Read();

}

}

}

}



6.Client:Artech.ResponsiveQueuedService.Client



Configuration:




































bindingConfiguration="MsmqBinding"contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor"name="defaultEndpoint"/>









Program:



usingSystem;

usingSystem.Collections.Generic;

usingSystem.Text;

usingSystem.Configuration;

usingSystem.ServiceModel;

usingArtech.ResponsiveQueuedService.Contract;

usingSystem.Messaging;



namespaceArtech.ResponsiveQueuedService.Clinet

{

classProgram

{

staticvoidMain(string[]args)

{

Orderorder1=newOrder(Guid.NewGuid(),DateTime.Today.AddDays(5),Guid.NewGuid(),"SupplierA");

Orderorder2=newOrder(Guid.NewGuid(),DateTime.Today.AddDays(-5),Guid.NewGuid(),"SupplierA");



stringpath=ConfigurationManager.AppSettings["msmqPath"];

Uriaddress=newUri(path);

OrderResponseContextcontext=newOrderResponseContext();

context.ResponseAddress=address;



ChannelFactorychannelFactory=newChannelFactory("defaultEndpoint");

IOrderProcessororderProcessor=channelFactory.CreateChannel();



using(OperationContextScopecontextScope=newOperationContextScope(orderProcessorasIContextChannel))

{

Console.WriteLine("SubmittheorderoforderNo.:{0}",order1.OrderNo);

OrderResponseContext.Current=context;

orderProcessor.Submit(order1);

}



using(OperationContextScopecontextScope=newOperationContextScope(orderProcessorasIContextChannel))

{

Console.WriteLine("SubmittheorderoforderNo.:{0}",order2.OrderNo);

OrderResponseContext.Current=context;

orderProcessor.Submit(order2);

}



Console.Read();

}

}

}



我创建了两个Order对象,其中一个已经过期。从Configuration中取出ResponseAddress并购建一个OrderResponseContext,然后分两次将这两个Order向OrderProcessingService递交。在调用OrderProcessingOrder的OperationContextScope中,通过OrderResponseContext.Current将OrderResponseContext对象插入OutcomingMessageHeader中。

献花(0)
+1
(本文系thedust79首藏)