分享

深入APM,实现自己的APM

 scholes_goal 2011-03-22

     APM(Asynchronous Programming Model),异步编程模型。
     大家对这个应该不会陌生,甚至太过于熟悉了吧!因为在太多的地方看到过它,对于应用上也许更没的说了!呵呵。
     Control:BeginInvoke、EndInvoke
     Stream:BeginWrite、EndWrite
     ......
     上面这些都是FCL已经提供的可以使用APM的类,不过可能在一些特定的情况下:

1、构建和一些硬件(FCL没有提供支持的)直接交互的类。
2、BeginXXX、EndXXX这些都不是相同的名称,希望让调用者使用起来更加简单,对于这些现有的异步类进行下抽象。
3、在自定义的类上执行某些方法也许需要很多的时间,这些希望对这些方法进行异步处理,所以也加上APM这种方式。
4、一些Win32的函数(Win32注册表、日志...)目前只提供了同步处理,所以当使用这些的时候,希望对这些函数做些封装,加上异步支持。
............
     
     相信大家现在都明白为什么要自己的APM了吧!OK,那就让我们开始吧!


APM的核心(IAsyncResult)
     IAsyncResult接口是APM机制的核心:

public interface IAsyncResult {
   WaitHandle AsyncWaitHandle { 
get; } // For Wait-Until-Done technique
   Boolean    IsCompleted     { get; } // For Polling technique
   Object     AsyncState      { get; } // For Callback technique
   Boolean    CompletedSynchronously { get; } // Almost never used
}
     当任何一个BeginXXX方法被调用的时候,都会返回一个继承于该接口的对象。该对象可以用来监视异步动作的执行,例如,当调用BeginXXX后,我们可以通过该对象的属性判断异步动作是否完成,同时如果异步执行的时候有异常,那么就会将异常信息传给EndXXX方法,EndXXX方法等待异步动作执行完成后,将异常抛出。
     AsyncResultNoResult类继承于IAsyncResult接口,该类用于无特殊返回值的异步动作(执行成功或失败),例如stream的BeginWrite、EndWrite。
     public override void EndWrite(IAsyncResult asyncResult)

AsyncResultNoResult类:

internal class AsyncResultNoResult : IAsyncResult
{
   
// Fields set at construction which never change while 
   
// operation is pending
   private readonly AsyncCallback m_AsyncCallback;
   
private readonly Object m_AsyncState;

   
// Fields set at construction which do change after 
   
// operation completes
   private const Int32 c_StatePending = 0;
   
private const Int32 c_StateCompletedSynchronously = 1;
   
private const Int32 c_StateCompletedAsynchronously = 2;
   
private Int32 m_CompletedState = c_StatePending;

   
// Field that may or may not get set depending on usage
   private ManualResetEvent m_AsyncWaitHandle;

   
// Fields set when operation completes
   private Exception m_exception;

   
public AsyncResultNoResult(AsyncCallback asyncCallback, Object state)
   {
      m_AsyncCallback 
= asyncCallback;
      m_AsyncState 
= state;
   }

   
public void SetAsCompleted(
      Exception exception, Boolean completedSynchronously)
   {
      
// Passing null for exception means no error occurred. 
      
// This is the common case
      m_exception = exception;

      
// The m_CompletedState field MUST be set prior calling the callback
      Int32 prevState = Interlocked.Exchange(ref m_CompletedState,
         completedSynchronously 
? c_StateCompletedSynchronously : 
         c_StateCompletedAsynchronously);
      
if (prevState != c_StatePending)
         
throw new InvalidOperationException(
             
"You can set a result only once");

      
// If the event exists, set it
      if (m_AsyncWaitHandle != null) m_AsyncWaitHandle.Set();

      
// If a callback method was set, call it
      if (m_AsyncCallback != null) m_AsyncCallback(this);
   }

   
public void EndInvoke()
   {
      
// This method assumes that only 1 thread calls EndInvoke 
      
// for this object
      if (!IsCompleted)
      {
         
// If the operation isn't done, wait for it
         AsyncWaitHandle.WaitOne();
         AsyncWaitHandle.Close();
         m_AsyncWaitHandle 
= null;  // Allow early GC
      }

      
// Operation is done: if an exception occured, throw it
      if (m_exception != nullthrow m_exception;
   }

   
#region Implementation of IAsyncResult
   
public Object AsyncState { get { return m_AsyncState; } }

   
public Boolean CompletedSynchronously {
      
get { return Thread.VolatileRead(ref m_CompletedState) == 
                c_StateCompletedSynchronously; }
   }

   
public WaitHandle AsyncWaitHandle
   {
      
get
      {
         
if (m_AsyncWaitHandle == null)
         {
            Boolean done 
= IsCompleted;
            ManualResetEvent mre 
= new ManualResetEvent(done);
            
if (Interlocked.CompareExchange(ref m_AsyncWaitHandle, 
               mre, 
null!= null)
            {
               
// Another thread created this object's event; dispose 
               
// the event we just created
               mre.Close();
            }
            
else
            {
               
if (!done && IsCompleted)
               {
                  
// If the operation wasn't done when we created 
                  
// the event but now it is done, set the event
                  m_AsyncWaitHandle.Set();
               }
            }
         }
         
return m_AsyncWaitHandle;
      }
   }

   
public Boolean IsCompleted {
      
get { return Thread.VolatileRead(ref m_CompletedState) != 
                c_StatePending; }
   }
   
#endregion
}
     详细分析下,AsyncResultNoResult的构造函数接受两个参数(异步委托和执行参数),这两个参数被保存到私有属性中。定义了m_CompletedState字段,用于实现IAsyncResult的 IsCompleted和CompletedSynchronously属性,也定义了m_AsyncWaitHandle字段用于实现IAsyncResulte的AsyncWaitHandle,而且还有一个异常字段m_exception,当成功执行的时候,把该值设为null,失败的时候,将被设置导致失败的异常。
     还有一个重要的属性(m_AsyncWaitHandle),我们在EndInvoke方法可以看到m_AsyncWaitHandle.WaitOne()这行代码,执行到这里的时候,m_AsyncWaitHandle如果没有被释放的话,程序将一直等待,释放的程序在SetAsCompleted方法中m_AsyncWaitHandle.Set()。

AsyncResult类:

internal class AsyncResult<TResult> : AsyncResultNoResult
{
   
// Field set when operation completes
   private TResult m_result = default(TResult);

   
public AsyncResult(AsyncCallback asyncCallback, Object state) : 
      
base(asyncCallback, state) { }

   
public void SetAsCompleted(TResult result, 
      Boolean completedSynchronously)
   {
      
// Save the asynchronous operation's result
      m_result = result;

      
// Tell the base class that the operation completed 
      
// sucessfully (no exception)
      base.SetAsCompleted(null, completedSynchronously);
   }

   
new public TResult EndInvoke()
   {
      
base.EndInvoke(); // Wait until operation has completed 
      return m_result;  // Return the result (if above didn't throw)
   }
}
     看下代码应该很清晰了,该类继承与AsyncResultNoResult类,额外加了m_result属性,用于返回值。

利用自定义的IAsyncResult实现APM

     现在我们已经知道如何定义IAsyncResult,那么就开始展示如何使用AsyncResult<TResult>和AsyncResultNoResult。
     定义一个类:LongTask,该类对一些很耗时的方法进行异步处理,并且当执行完成的时候返回一个时间数据。

internal sealed class LongTask
{
   
private Int32 m_ms;  // Milliseconds;

   
public LongTask(Int32 seconds)
   {
      m_ms 
= seconds * 1000;
   }

   
// Synchronous version of time-consuming method
   public DateTime DoTask()
   {
      Thread.Sleep(m_ms);  
// Simulate time-consuming task
      return DateTime.Now; // Indicate when task completed
   }

   
// Asynchronous version of time-consuming method (Begin part)
   public IAsyncResult BeginDoTask(AsyncCallback callback, Object state)
   {
      
// Create IAsyncResult object identifying the 
      
// asynchronous operation
      AsyncResult<DateTime> ar = new AsyncResult<DateTime>(
         callback, state);

      
// Use a thread pool thread to perform the operation
      ThreadPool.QueueUserWorkItem(DoTaskHelper, ar);

      
return ar;  // Return the IAsyncResult to the caller
   }

   
// Asynchronous version of time-consuming method (End part)
   public DateTime EndDoTask(IAsyncResult asyncResult)
   {
      
// We know that the IAsyncResult is really an 
      
// AsyncResult<DateTime> object
      AsyncResult<DateTime> ar = (AsyncResult<DateTime>)asyncResult;

      
// Wait for operation to complete, then return result or 
      
// throw exception
      return ar.EndInvoke();
   }

   
// Asynchronous version of time-consuming method (private part 
   
// to set completion result/exception)
   private void DoTaskHelper(Object asyncResult)
   {
      
// We know that it's really an AsyncResult<DateTime> object
      AsyncResult<DateTime> ar = (AsyncResult<DateTime>)asyncResult;
      
try
      {
         
// Perform the operation; if sucessful set the result
         DateTime dt = DoTask();
         ar.SetAsCompleted(dt, 
false);
      }
      
catch (Exception e)
      {
         
// If operation fails, set the exception
         ar.SetAsCompleted(e, false);
      }
   }
}
     当调用BeginDoTask的时候,先实例化一个AsyncResult对象,然后把DoTaskHelper方法加到线程池上,而DoTaskHelper方法只是对执行动作进行了封装,如果该方法成功执行,则返回时间数据,如果出现异常,则设定异常信息,用于EndDoTask的时候抛出。

测试和性能
1、测试LongTask

private static void FunctionalTest()
{
  IAsyncResult ar;
  LongTask lt 
= new LongTask(5);

  
// Prove that the Wait-until-done technique works
  ar = lt.BeginDoTask(nullnull);
  Console.WriteLine(
"Task completed at: {0}", lt.EndDoTask(ar));

  
// Prove that the Polling technique works
  ar = lt.BeginDoTask(nullnull);
  
while (!ar.IsCompleted)
  {
     Console.WriteLine(
"Not completed yet.");
     Thread.Sleep(
1000);
  }
  Console.WriteLine(
"Task completed at: {0}", lt.EndDoTask(ar));

  
// Prove that the Callback technique works
  lt.BeginDoTask(TaskCompleted, lt);
  Console.ReadLine();
}

private static void TaskCompleted(IAsyncResult ar)
{
  LongTask lt 
= (LongTask)ar.AsyncState;
  Console.WriteLine(
"Task completed at: {0}", lt.EndDoTask(ar));
  Console.WriteLine(
"All done, hit Enter to exit app.");
}
执行结果如下:


2、比较自定义APM和delegate之间的性能

private const Int32 c_iterations = 100 * 1000// 100 thousand
private static Int32 s_numDone;
private delegate DateTime DoTaskDelegate();

private static void PerformanceTest()
{
   AutoResetEvent are 
= new AutoResetEvent(false);
   LongTask lt 
= new LongTask(0);

   Stopwatch sw;

   s_numDone 
= 0;
   sw 
= Stopwatch.StartNew();
   
for (Int32 n = 0; n < c_iterations; n++)
   {
      lt.BeginDoTask(
delegate(IAsyncResult ar)
      {
         
if (Interlocked.Increment(ref s_numDone) == c_iterations)
            are.Set();
      }, 
null);
   }
   are.WaitOne();
   Console.WriteLine(
"AsyncResult Time: {0}", sw.Elapsed);
   s_numDone 
= 0;
   DoTaskDelegate doTaskDelegate 
= lt.DoTask;

   sw 
= Stopwatch.StartNew();
   
for (Int32 n = 0; n < c_iterations; n++)
   {
      doTaskDelegate.BeginInvoke(
delegate(IAsyncResult ar)
      {
         
if (Interlocked.Increment(ref s_numDone) == c_iterations)
            are.Set();
      }, 
null);
   }
   are.WaitOne();
   Console.WriteLine(
"Delegate    Time: {0}", sw.Elapsed);
}
执行结果如下:

     从结果上看,既然自定义的APM性能要比FCL的APM好点,其实在这里大家仔细看下就可以清楚:
     区别就在于ManualResetEvent,FCL的APM是在创建IAsyncResult对象的同时,也创建了ManualResetEvent。而AsyncResultNoResult中,可以看下面的代码,只有在真正需要用到ManualResetEvent的时候,才去创建,一般不实例化。有个注意的地方:ManualResetEvent是内核对象,而创建或使用一个内核对象都是很耗资源的,所以只有当真正需要的时候才去使用。
public WaitHandle AsyncWaitHandle
   {
      
get
      {
         
if (m_AsyncWaitHandle == null)
         {
            Boolean done 
= IsCompleted;
            ManualResetEvent mre 
= new ManualResetEvent(done);
            
if (Interlocked.CompareExchange(ref m_AsyncWaitHandle, 
               mre, 
null!= null)
            {
               mre.Close();
            }
            
else
            {
               
if (!done && IsCompleted)
               {
                  m_AsyncWaitHandle.Set();
               }
            }
         }
         
return m_AsyncWaitHandle;
      }


    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多