分享

关于C#线程,线程池和并行运算的简单使用和对比

 scholes_goal 2011-03-24

关于C#线程,线程池和并行运算的简单使用和对比

前言:看了书上两个使用C#4.0并行编程的demo,又对照以前收藏的网上几篇讲述线程池的雄文,一并整理,写个示例总结一下。写这篇文章的时候,发现关于线程的好几个基础的重要的知识点自己都不熟悉,而且可能习惯性认知浅薄,所以痛苦的无以复加,不知道到底要说什么。不想看文章的可以直接下载最后的示例,本文代码主要参考Marc Clifton的“.NET's ThreadPool Class - Behind The Scenes”,对新手也许有帮助。

参考:

http://msdn.microsoft.com/zh-cn/library/system.threading.threadpool(VS.80).aspx

http://www./KB/threads/threadtests.aspx

http://www./KB/threads/smartthreadpool.aspx

http://blog./2009/07/thread-pool-1-the-goal-and-the-clr-thread-pool.html  (老赵的浅谈线程池上中下三篇)

Jeffrey Richter <<CLR via C#>> 3rd Edition

 

先大概看一下控制台应用程序的Main方法的主要代码:

 
001 static bool done = false;
002 static decimal count2 = 0;
003 static int threadDone = 0;//标志启用线程数?
004 static System.Timers.Timer timer = new System.Timers.Timer(1000);
005  
006 static decimal[] threadPoolCounters = new decimal[10];
007 static Thread[] threads = new Thread[10];
008 static System.Timers.Timer[] threadTimers = new System.Timers.Timer[10];
009  
010 static void Main(string[] args)
011 {
012     timer.Stop();
013     /*当 AutoReset 设置为 false 时,Timer 只在第一个 Interval 过后引发一次 Elapsed 事件。
014      若要保持以 Interval 时间间隔引发 Elapsed 事件,请将 AutoReset 设置为 true。*/
015     timer.AutoReset = false;
016     timer.Elapsed += new ElapsedEventHandler(OnTimerEvent);//当timer.Start()时,触发事件
017     decimal total = 0;
018  
019     // raw test
020     decimal count1 = SingleThreadTest();//单一线程,一跑到底
021     Console.WriteLine("Single thread count = " + count1.ToString());
022  
023     // create one thread, increment counter, destroy thread, repeat
024     Console.WriteLine();
025     CreateAndDestroyTest();//创建一个线程,运算,然后销毁该线程 重复前面的动作
026     Console.WriteLine("Create and destroy per count = " + count2.ToString());
027  
028     // Create 10 threads and run them simultaneously
029     //一次性创建10个线程,然后遍历使线程执行运算
030     Console.WriteLine();
031     InitThreadPoolCounters();
032     InitThreads();
033     StartThreads();
034     while (threadDone != 10) { };
035     Console.WriteLine("10 simultaneous threads:");
036     for (int i = 0; i < 10; i++)
037     {
038         Console.WriteLine("T" + i.ToString() + " = " + threadPoolCounters[i].ToString() + "   ");
039         total += threadPoolCounters[i];
040     }
041     Console.WriteLine("Total = " + total.ToString());
042     Console.WriteLine();
043  
044     Console.WriteLine("///////////////////////////////////////////////////");
045  
046     // using ThreadPool
047     //直接通过线程池的QueueUserWorkItem方法,按队列执行10个任务
048     Console.WriteLine();
049     Console.WriteLine("ThreadPool:");
050     InitThreadPoolCounters();
051     QueueThreadPoolThreads();
052     while (threadDone != 10) { };
053     Console.WriteLine("ThreadPool: 10 simultaneous threads:");
054     total = 0;
055     for (int i = 0; i < 10; i++)
056     {
057         //              threadTimers[i].Stop();
058         //              threadTimers[i].Dispose();
059         Console.WriteLine("T" + i.ToString() + " = " + threadPoolCounters[i].ToString() + "   ");
060         total += threadPoolCounters[i];
061     }
062     Console.WriteLine("Total = " + total.ToString());
063  
064     // using SmartThreadPool
065     //通过Amir Bar的SmartThreadPool线程池,利用QueueUserWorkItem方法,按队列执行10个任务
066     Console.WriteLine();
067     Console.WriteLine("SmartThreadPool:");
068     InitThreadPoolCounters();
069     QueueSmartThreadPoolThreads();
070     while (threadDone != 10) { };
071     Console.WriteLine("SmartThreadPool: 10 simultaneous threads:");
072     total = 0;
073     for (int i = 0; i < 10; i++)
074     {
075         Console.WriteLine("T" + i.ToString() + " = " + threadPoolCounters[i].ToString() + "   ");
076         total += threadPoolCounters[i];
077     }
078     Console.WriteLine("Total = " + total.ToString());
079  
080     // using ManagedThreadPool
081     //通过Stephen Toub改进后的线程池,利用QueueUserWorkItem方法,按队列执行10个任务
082     Console.WriteLine();
083     Console.WriteLine("ManagedThreadPool:");
084     InitThreadPoolCounters();
085     QueueManagedThreadPoolThreads();
086     while (threadDone != 10) { };
087     Console.WriteLine("ManagedThreadPool: 10 simultaneous threads:");
088     total = 0;
089     for (int i = 0; i < 10; i++)
090     {
091         Console.WriteLine("T" + i.ToString() + " = " + threadPoolCounters[i].ToString() + "   ");
092         total += threadPoolCounters[i];
093     }
094     Console.WriteLine("Total = " + total.ToString());
095  
096     // using C#4.0 Parallel
097     //通过Tasks.Parallel.For进行并行运算
098     Console.WriteLine();
099     Console.WriteLine("Parallel:");
100     InitThreadPoolCounters();
101     UseParallelTasks();
102     while (threadDone != 10) { };
103     Console.WriteLine("Parallel: 10 simultaneous threads:");
104     total = 0;
105     for (int i = 0; i < 10; i++)
106     {
107         Console.WriteLine("T" + i.ToString() + " = " + threadPoolCounters[i].ToString() + "   ");
108         total += threadPoolCounters[i];
109     }
110     Console.WriteLine("Total = " + total.ToString());
111 }

我们可以先熟悉一下大致思路。代码中,我们主要依靠输出的数字count或者total来判断哪个方法执行效率更高(原文是How Hign Can I Count?),通常输出的数字越大,我们就认为它”干的活越多“,效率越高。主要实现过程就是通过一个静态的System.Timers.Timer对象的timer实例,设置它的Interval属性和ElapsedEventHandler事件:

 
1         static System.Timers.Timer timer = new System.Timers.Timer(1000);
2 /*当 AutoReset 设置为 false 时,Timer 只在第一个 Interval 过后引发一次 Elapsed 事件。
3              若要保持以 Interval 时间间隔引发 Elapsed 事件,请将 AutoReset 设置为 true。*/
4 timer.AutoReset = false;
5 timer.Elapsed += new ElapsedEventHandler(OnTimerEvent);//当timer.Start()时,触发事件

其中,timer的事件触发的函数:

 
1 static void OnTimerEvent(object src, ElapsedEventArgs e)
2    {
3        done = true;
4    }

每次timer.Start执行的时候,一次测试就将开始,这样可以确保测试的不同方法都在1000毫秒内跑完。

下面开始具体介绍几个方法:

A、线程

这个非常简单,就是通过主线程计算在1000毫秒内,count从0递增加到了多少:

 
01 /// <summary>
02 /// 单一线程,一跑到底
03 /// </summary>
04 /// <returns></returns>
05 static decimal SingleThreadTest()
06 {
07     done = false;
08     decimal counter = 0;
09     timer.Start();
10     while (!done)
11     {
12         ++counter;
13     }
14     return counter;
15 }

while判断可以保证方法在1000毫秒内执行完成。

 

B、多线程

这个多线程方法比较折腾,先创建线程,然后运行,最后销毁线程,这就是一个线程执行单元,重复10次这个线程执行单元。

 
01 /// <summary>
02   /// 创建一个线程,运算,然后销毁该线程 重复前面的动作
03   /// </summary>
04   static void CreateAndDestroyTest()
05   {
06       done = false;
07       timer.Start();
08       while (!done)
09       {
10           Thread counterThread = new Thread(new ThreadStart(Count1Thread));
11           counterThread.IsBackground = true;//后台线程
12           counterThread.Start();
13           while (counterThread.IsAlive) { };
14       }
15   }

那个ThreadStart委托对应的方法Count1Thread如下:

 
1 static void Count1Thread()
2 {
3     ++count2; //静态字段count2自增
4 }

从表面上看,大家估计都可以猜到,效果可能不佳。

 

C、还是多线程

这个方法不判断线程的执行状态,不用等到一个线程销毁后再创建一个线程,然后执行线程方法。线程执行的方法就是根据线程的Name找到一个指定数组的某一索引,并累加改变数组的值:

 
01 /// <summary>
02     /// 将数组和线程数标志threadDone回到初始状态
03     /// </summary>
04     static void InitThreadPoolCounters()
05     {
06         threadDone = 0;
07         for (int i = 0; i < 10; i++)
08         {
09             threadPoolCounters[i] = 0;
10         }
11     }
12  
13     /// <summary>
14     /// 初始化10个线程
15     /// </summary>
16     static void InitThreads()
17     {
18         for (int i = 0; i < 10; i++)
19         {
20             threads[i] = new Thread(new ThreadStart(Count2Thread));
21             threads[i].IsBackground = true;
22             threads[i].Name = i.ToString();//将当前线程的Name赋值为数组索引,在Count2Thread方法中获取对应数组
23         }
24     }
25  
26     /// <summary>
27     /// 开始多线程运算
28     /// </summary>
29     static void StartThreads()
30     {
31         done = false;
32         timer.Start();
33         for (int i = 0; i < 10; i++)
34         {
35             threads[i].Start();
36         }
37     }

其中,每一个线程需要执行的委托方法

 
1 static void Count2Thread()
2 {
3     int n = Convert.ToInt32(Thread.CurrentThread.Name);//取数组索引
4     while (!done)
5     {
6         ++threadPoolCounters[n];
7     }
8     Interlocked.Increment(ref threadDone);//以原子操作的形式保证threadDone递增
9 }

在测试过程中,我们看代码:

 
01 // Create 10 threads and run them simultaneously
02       //一次性创建10个线程,然后遍历使线程执行运算
03       Console.WriteLine();
04       InitThreadPoolCounters();
05       InitThreads();
06       StartThreads();
07       while (threadDone != 10) { };
08       Console.WriteLine("10 simultaneous threads:");
09       for (int i = 0; i < 10; i++)
10       {
11           Console.WriteLine("T" + i.ToString() + " = " + threadPoolCounters[i].ToString() + "   ");
12           total += threadPoolCounters[i];
13       }
14       Console.WriteLine("Total = " + total.ToString());
15       Console.WriteLine();

最后算出这个数组的所有元素的总和,就是这10个线程在1000毫秒内所做的事情。其中, while (threadDone != 10) { };这个判断非常重要。这个方法看上去没心没肺,线程创建好就不管它的死活了(还是管活不管死?),所以效率应该不低。

实际上,我在本地测试并看了一下输出,表面看来,按count大小逆序排列:C>A>B,这就说明多线程并不一定比单线程运行效率高。其实B之所以效率不佳,主要是由于这个方法大部分的”精力“花在线程的执行状态和销毁处理上。

注意,其实C和A、B都没有可比性,因为C计算的是数组的总和,而A和B只是简单的对一个数字进行自加。

ps:C这一块说的没有中心,想到哪写到哪,所以看起来写得很乱,如果看到这里您还觉着不知所云,建议先下载最后的demo,先看代码,再对照这篇文章。

好了,到这里,我们对线程的创建和使用应该有了初步的了解。细心的人可能会发现,我们new一个Thread,然后给线程实例设置属性,比如是否后台线程等等,其实这部分工作可以交给下面介绍的线程池ThreadPool来做(D、E和F主要介绍线程池)。

 

D、线程池ThreadPool

在实际的项目中大家可能使用最多最熟悉的就是这个类了,所以没什么可说的:

 
01 /// <summary>
02 /// ThreadPool测试
03 /// </summary>
04 static void QueueThreadPoolThreads()
05 {
06     done = false;
07     for (int i = 0; i < 10; i++)
08     {
09         ThreadPool.QueueUserWorkItem(new WaitCallback(Count3Thread), i);
10     }
11  
12     timer.Start();
13 }
14  
15 static void Count3Thread(object state)
16 {
17     int n = (int)state;
18     while (!done)
19     {
20         ++threadPoolCounters[n];
21     }
22     Interlocked.Increment(ref threadDone);
23 }

我们知道线程池里的线程默认都是后台线程,所以它实际上简化了线程的属性设置,更方便异步编程。

需要说明的是,线程池使用过程中会有这样那样的缺陷(虽然本文的几个线程池任务都不会受这种缺陷影响)。比如,我们一次性向线程池中加入100个任务,但是当前的系统可能只支持25个线程,并且每个线程正处于”忙碌“状态,如果一次性加入池中系统会处理不过来,那么多余的任务必须等待,这就造成等待的时间过长,系统无法响应。还好,ThreadPool提供了GetAvailableThreads方法,可以让你知道当前可用的工作线程数量。

 
01 static void QueueThreadPoolThreads()
02 {
03     done = false;
04     for (int i = 0; i < 10; i++)
05     {
06         //ThreadPool.QueueUserWorkItem(new WaitCallback(Count3Thread), i); //直接给程序池添加任务有时是很草率的
07  
08         WaitCallback wcb = new WaitCallback(Count3Thread);
09         int workerThreads, availabeThreads;
10         ThreadPool.GetAvailableThreads(out workerThreads, out availabeThreads);
11         if (workerThreads > 0)//可用线程数>0
12         {
13             ThreadPool.QueueUserWorkItem(wcb, i);
14         }
15         else
16         {
17             //to do 可以采取一种策略,让这个任务合理地分配给线程
18         }
19     }

如果没有可用的工作线程数,必须设计一定的策略,让这个任务合理地分配给线程。

也许就是类似于上面那样的限制,很多开发者都自己创建自己的线程池,同时也就有了后面的SmartThreadPoolManagedThreadPool大展身手的机会。

 

E、线程池SmartThreadPool

大名鼎鼎的SmartThreadPool,但是我从来没在项目中使用过,所以只是找了一段简单的代码测试一下:

 
01 /// <summary>
02  /// SmartThreadPool测试
03  /// </summary>
04  static void QueueSmartThreadPoolThreads()
05  {
06      SmartThreadPool smartThreadPool = new SmartThreadPool();
07      // Create a work items group that processes 
08      // one work item at a time
09      IWorkItemsGroup wig = smartThreadPool.CreateWorkItemsGroup(1);
10  
11      done = false;
12      timer.Start();
13      for (int i = 0; i < 10; i++)
14      {
15          wig.QueueWorkItem(new WorkItemCallback(Count4Thread), i);
16      }
17      // Wait for the completion of all work items in the work items group
18      wig.WaitForIdle();
19      smartThreadPool.Shutdown();
20  }
21  
22 static object Count4Thread(object state)
23  {
24      int n = (int)state;
25      while (!done)
26      {
27          ++threadPoolCounters[n];
28      }
29      Interlocked.Increment(ref threadDone);
30      return null;
31  }

自从收藏这个SmartThreadPool.dll后,我还从没有在项目中使用过。查看它的源码注释挺少也挺乱的,不知道有没有高人知道它的一个效率更好的方法。您也可以看看英文原文,自己尝试体验一下。如果您熟悉使用SmartThreadPool,欢迎讨论。

 

F、线程池ManagedThreadPool

Stephen Toub这个完全用C#托管代码实现的线程池也非常有名,在Marc Clifton的英文原文中,作者也不吝溢美之词,赞它“quite excellent”,用当前异军突起的一个词汇形容就是太给力了,于我心有戚戚焉:

01 /// <summary>
02    /// ManagedThreadPool测试
03    /// </summary>
04    static void QueueManagedThreadPoolThreads()
05    {
06        done = false;
07        timer.Start();
08        for (int i = 0; i < 10; i++)
09        {
10            Toub.Threading.ManagedThreadPool.QueueUserWorkItem(new WaitCallback(Count5Thread), i);
11        }
12    }
13    static void Count5Thread(object state)
14    {
15        int n = (int)state;
16        while (!done)
17        {
18            ++threadPoolCounters[n];
19        }
20        Interlocked.Increment(ref threadDone);
21    }

 

对于这个托管的线程池,我个人的理解,就是它在管理线程的时候,这个池里还有一个缓存线程的池,即一个ArrayList对象。它一开始就初始化了一定数量的线程,并通过ProcessQueuedItems方法保证异步执行进入池中的队列任务(那个死循环有时可能导致CPU过分忙碌),这样在分配异步任务的时候,就省去了频繁去创建(new)一个线程。同时它在实现信号量(Semaphore)的同步和线程出入队列的设计上都可圈可点,非常巧妙,强烈推荐您阅读它的源码。

 

G、并行运算

下面的示例,我只使用了简单的System.Threading.Tasks.Parallel.For 对应的for 循环的并行运算:

 
01 /// <summary>
02 /// 并行运算测试
03 /// </summary>
04 static void UseParallelTasks()
05 {
06     done = false;
07     timer.Start();
08     // System.Threading.Tasks.Parallel.For - for 循环的并行运算
09     System.Threading.Tasks.Parallel.For(0, 10, (i) => { Count6Thread(i); });
10 }
11 static void Count6Thread(object state)
12 {
13     int n = (int)state;
14     while (!done)
15     {
16         ++threadPoolCounters[n];
17     }
18     Interlocked.Increment(ref threadDone);
19 }

没有什么要特殊说明的,就是新类库的使用。看代码,好像比使用线程或线程池更加简单直接,有机会争取多用一用。我在本地测试的时候,在Release版本下,按照count的大小逆序排列,总体上G>D>F>E。需要注意到一件事,就是SmartThreadPool中排入队列的任务是一个返回值为Object的委托类型,这和其他的几个没有返回的(void类型)不同。SmartThreadPool口碑还是不错的,也许是我没有正确使用它。

 

最后小结一下:本文主要列举了C#中我所知道的几种常见的异步处理的方法,欢迎大家纠错或补充。

 

示例下载:demo

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多