前言:看了书上两个使用C#4.0并行编程的demo,又对照以前收藏的网上几篇讲述线程池的雄文,一并整理,写个示例总结一下。写这篇文章的时候,发现关于线程的好几个基础的重要的知识点自己都不熟悉,而且可能习惯性认知浅薄,所以痛苦的无以复加,不知道到底要说什么。不想看文章的可以直接下载最后的示例,本文代码主要参考Marc Clifton的“.NET's ThreadPool Class - Behind The Scenes”,对新手也许有帮助。
参考:
http://msdn.microsoft.com/zh-cn/library/system.threading.threadpool(VS.80).aspx
http://www./KB/threads/threadtests.aspx
http://www./KB/threads/smartthreadpool.aspx
http://blog./2009/07/thread-pool-1-the-goal-and-the-clr-thread-pool.html (老赵的浅谈线程池上中下三篇)
Jeffrey Richter <<CLR via C#>> 3rd Edition
先大概看一下控制台应用程序的Main方法的主要代码:
001 |
static bool done = false ; |
002 |
static decimal count2 = 0; |
003 |
static int threadDone = 0; |
004 |
static System.Timers.Timer timer = new System.Timers.Timer(1000); |
006 |
static decimal [] threadPoolCounters = new decimal [10]; |
007 |
static Thread[] threads = new Thread[10]; |
008 |
static System.Timers.Timer[] threadTimers = new System.Timers.Timer[10]; |
010 |
static void Main( string [] args) |
015 |
timer.AutoReset = false ; |
016 |
timer.Elapsed += new ElapsedEventHandler(OnTimerEvent); |
020 |
decimal count1 = SingleThreadTest(); |
021 |
Console.WriteLine( "Single thread count = " + count1.ToString()); |
025 |
CreateAndDestroyTest(); |
026 |
Console.WriteLine( "Create and destroy per count = " + count2.ToString()); |
031 |
InitThreadPoolCounters(); |
034 |
while (threadDone != 10) { }; |
035 |
Console.WriteLine( "10 simultaneous threads:" ); |
036 |
for ( int i = 0; i < 10; i++) |
038 |
Console.WriteLine( "T" + i.ToString() + " = " + threadPoolCounters[i].ToString() + " " ); |
039 |
total += threadPoolCounters[i]; |
041 |
Console.WriteLine( "Total = " + total.ToString()); |
044 |
Console.WriteLine( "///////////////////////////////////////////////////" ); |
049 |
Console.WriteLine( "ThreadPool:" ); |
050 |
InitThreadPoolCounters(); |
051 |
QueueThreadPoolThreads(); |
052 |
while (threadDone != 10) { }; |
053 |
Console.WriteLine( "ThreadPool: 10 simultaneous threads:" ); |
055 |
for ( int i = 0; i < 10; i++) |
059 |
Console.WriteLine( "T" + i.ToString() + " = " + threadPoolCounters[i].ToString() + " " ); |
060 |
total += threadPoolCounters[i]; |
062 |
Console.WriteLine( "Total = " + total.ToString()); |
067 |
Console.WriteLine( "SmartThreadPool:" ); |
068 |
InitThreadPoolCounters(); |
069 |
QueueSmartThreadPoolThreads(); |
070 |
while (threadDone != 10) { }; |
071 |
Console.WriteLine( "SmartThreadPool: 10 simultaneous threads:" ); |
073 |
for ( int i = 0; i < 10; i++) |
075 |
Console.WriteLine( "T" + i.ToString() + " = " + threadPoolCounters[i].ToString() + " " ); |
076 |
total += threadPoolCounters[i]; |
078 |
Console.WriteLine( "Total = " + total.ToString()); |
083 |
Console.WriteLine( "ManagedThreadPool:" ); |
084 |
InitThreadPoolCounters(); |
085 |
QueueManagedThreadPoolThreads(); |
086 |
while (threadDone != 10) { }; |
087 |
Console.WriteLine( "ManagedThreadPool: 10 simultaneous threads:" ); |
089 |
for ( int i = 0; i < 10; i++) |
091 |
Console.WriteLine( "T" + i.ToString() + " = " + threadPoolCounters[i].ToString() + " " ); |
092 |
total += threadPoolCounters[i]; |
094 |
Console.WriteLine( "Total = " + total.ToString()); |
099 |
Console.WriteLine( "Parallel:" ); |
100 |
InitThreadPoolCounters(); |
102 |
while (threadDone != 10) { }; |
103 |
Console.WriteLine( "Parallel: 10 simultaneous threads:" ); |
105 |
for ( int i = 0; i < 10; i++) |
107 |
Console.WriteLine( "T" + i.ToString() + " = " + threadPoolCounters[i].ToString() + " " ); |
108 |
total += threadPoolCounters[i]; |
110 |
Console.WriteLine( "Total = " + total.ToString()); |
我们可以先熟悉一下大致思路。代码中,我们主要依靠输出的数字count或者total来判断哪个方法执行效率更高(原文是How Hign Can I Count?),通常输出的数字越大,我们就认为它”干的活越多“,效率越高。主要实现过程就是通过一个静态的System.Timers.Timer对象的timer实例,设置它的Interval属性和ElapsedEventHandler事件:
1 |
static System.Timers.Timer timer = new System.Timers.Timer(1000); |
4 |
timer.AutoReset = false ; |
5 |
timer.Elapsed += new ElapsedEventHandler(OnTimerEvent); |
其中,timer的事件触发的函数:
1 |
static void OnTimerEvent( object src, ElapsedEventArgs e) |
每次timer.Start执行的时候,一次测试就将开始,这样可以确保测试的不同方法都在1000毫秒内跑完。
下面开始具体介绍几个方法:
A、线程
这个非常简单,就是通过主线程计算在1000毫秒内,count从0递增加到了多少:
04 |
/// <returns></returns> |
05 |
static decimal SingleThreadTest() |
while判断可以保证方法在1000毫秒内执行完成。
B、多线程
这个多线程方法比较折腾,先创建线程,然后运行,最后销毁线程,这就是一个线程执行单元,重复10次这个线程执行单元。
02 |
/// 创建一个线程,运算,然后销毁该线程 重复前面的动作 |
04 |
static void CreateAndDestroyTest() |
10 |
Thread counterThread = new Thread( new ThreadStart(Count1Thread)); |
11 |
counterThread.IsBackground = true ; |
12 |
counterThread.Start(); |
13 |
while (counterThread.IsAlive) { }; |
那个ThreadStart委托对应的方法Count1Thread如下:
1 |
static void Count1Thread() |
从表面上看,大家估计都可以猜到,效果可能不佳。
C、还是多线程
这个方法不判断线程的执行状态,不用等到一个线程销毁后再创建一个线程,然后执行线程方法。线程执行的方法就是根据线程的Name找到一个指定数组的某一索引,并累加改变数组的值:
02 |
/// 将数组和线程数标志threadDone回到初始状态 |
04 |
static void InitThreadPoolCounters() |
07 |
for ( int i = 0; i < 10; i++) |
09 |
threadPoolCounters[i] = 0; |
16 |
static void InitThreads() |
18 |
for ( int i = 0; i < 10; i++) |
20 |
threads[i] = new Thread( new ThreadStart(Count2Thread)); |
21 |
threads[i].IsBackground = true ; |
22 |
threads[i].Name = i.ToString(); |
29 |
static void StartThreads() |
33 |
for ( int i = 0; i < 10; i++) |
其中,每一个线程需要执行的委托方法
1 |
static void Count2Thread() |
3 |
int n = Convert.ToInt32(Thread.CurrentThread.Name); |
6 |
++threadPoolCounters[n]; |
8 |
Interlocked.Increment( ref threadDone); |
在测试过程中,我们看代码:
04 |
InitThreadPoolCounters(); |
07 |
while (threadDone != 10) { }; |
08 |
Console.WriteLine( "10 simultaneous threads:" ); |
09 |
for ( int i = 0; i < 10; i++) |
11 |
Console.WriteLine( "T" + i.ToString() + " = " + threadPoolCounters[i].ToString() + " " ); |
12 |
total += threadPoolCounters[i]; |
14 |
Console.WriteLine( "Total = " + total.ToString()); |
最后算出这个数组的所有元素的总和,就是这10个线程在1000毫秒内所做的事情。其中, while (threadDone != 10) { };这个判断非常重要。这个方法看上去没心没肺,线程创建好就不管它的死活了(还是管活不管死?),所以效率应该不低。
实际上,我在本地测试并看了一下输出,表面看来,按count大小逆序排列:C>A>B,这就说明多线程并不一定比单线程运行效率高。其实B之所以效率不佳,主要是由于这个方法大部分的”精力“花在线程的执行状态和销毁处理上。
注意,其实C和A、B都没有可比性,因为C计算的是数组的总和,而A和B只是简单的对一个数字进行自加。
ps:C这一块说的没有中心,想到哪写到哪,所以看起来写得很乱,如果看到这里您还觉着不知所云,建议先下载最后的demo,先看代码,再对照这篇文章。
好了,到这里,我们对线程的创建和使用应该有了初步的了解。细心的人可能会发现,我们new一个Thread,然后给线程实例设置属性,比如是否后台线程等等,其实这部分工作可以交给下面介绍的线程池ThreadPool来做(D、E和F主要介绍线程池)。
D、线程池ThreadPool
在实际的项目中大家可能使用最多最熟悉的就是这个类了,所以没什么可说的:
04 |
static void QueueThreadPoolThreads() |
07 |
for ( int i = 0; i < 10; i++) |
09 |
ThreadPool.QueueUserWorkItem( new WaitCallback(Count3Thread), i); |
15 |
static void Count3Thread( object state) |
20 |
++threadPoolCounters[n]; |
22 |
Interlocked.Increment( ref threadDone); |
我们知道线程池里的线程默认都是后台线程,所以它实际上简化了线程的属性设置,更方便异步编程。
需要说明的是,线程池使用过程中会有这样那样的缺陷(虽然本文的几个线程池任务都不会受这种缺陷影响)。比如,我们一次性向线程池中加入100个任务,但是当前的系统可能只支持25个线程,并且每个线程正处于”忙碌“状态,如果一次性加入池中系统会处理不过来,那么多余的任务必须等待,这就造成等待的时间过长,系统无法响应。还好,ThreadPool提供了GetAvailableThreads方法,可以让你知道当前可用的工作线程数量。
01 |
static void QueueThreadPoolThreads() |
04 |
for ( int i = 0; i < 10; i++) |
08 |
WaitCallback wcb = new WaitCallback(Count3Thread); |
09 |
int workerThreads, availabeThreads; |
10 |
ThreadPool.GetAvailableThreads( out workerThreads, out availabeThreads); |
11 |
if (workerThreads > 0) |
13 |
ThreadPool.QueueUserWorkItem(wcb, i); |
如果没有可用的工作线程数,必须设计一定的策略,让这个任务合理地分配给线程。
也许就是类似于上面那样的限制,很多开发者都自己创建自己的线程池,同时也就有了后面的SmartThreadPool和ManagedThreadPool大展身手的机会。
E、线程池SmartThreadPool
大名鼎鼎的SmartThreadPool,但是我从来没在项目中使用过,所以只是找了一段简单的代码测试一下:
04 |
static void QueueSmartThreadPoolThreads() |
06 |
SmartThreadPool smartThreadPool = new SmartThreadPool(); |
09 |
IWorkItemsGroup wig = smartThreadPool.CreateWorkItemsGroup(1); |
13 |
for ( int i = 0; i < 10; i++) |
15 |
wig.QueueWorkItem( new WorkItemCallback(Count4Thread), i); |
19 |
smartThreadPool.Shutdown(); |
22 |
static object Count4Thread( object state) |
27 |
++threadPoolCounters[n]; |
29 |
Interlocked.Increment( ref threadDone); |
自从收藏这个SmartThreadPool.dll后,我还从没有在项目中使用过。查看它的源码注释挺少也挺乱的,不知道有没有高人知道它的一个效率更好的方法。您也可以看看英文原文,自己尝试体验一下。如果您熟悉使用SmartThreadPool,欢迎讨论。
F、线程池ManagedThreadPool
Stephen Toub这个完全用C#托管代码实现的线程池也非常有名,在Marc Clifton的英文原文中,作者也不吝溢美之词,赞它“quite excellent”,用当前异军突起的一个词汇形容就是太给力了,于我心有戚戚焉:
02 |
/// ManagedThreadPool测试 |
04 |
static void QueueManagedThreadPoolThreads() |
08 |
for ( int i = 0; i < 10; i++) |
10 |
Toub.Threading.ManagedThreadPool.QueueUserWorkItem( new WaitCallback(Count5Thread), i); |
13 |
static void Count5Thread( object state) |
18 |
++threadPoolCounters[n]; |
20 |
Interlocked.Increment( ref threadDone); |
对于这个托管的线程池,我个人的理解,就是它在管理线程的时候,这个池里还有一个缓存线程的池,即一个ArrayList对象。它一开始就初始化了一定数量的线程,并通过ProcessQueuedItems方法保证异步执行进入池中的队列任务(那个死循环有时可能导致CPU过分忙碌),这样在分配异步任务的时候,就省去了频繁去创建(new)一个线程。同时它在实现信号量(Semaphore)的同步和线程出入队列的设计上都可圈可点,非常巧妙,强烈推荐您阅读它的源码。
G、并行运算
下面的示例,我只使用了简单的System.Threading.Tasks.Parallel.For 对应的for 循环的并行运算:
04 |
static void UseParallelTasks() |
08 |
// System.Threading.Tasks.Parallel.For - for 循环的并行运算 |
09 |
System.Threading.Tasks.Parallel.For(0, 10, (i) => { Count6Thread(i); }); |
11 |
static void Count6Thread(object state) |
16 |
++threadPoolCounters[n]; |
18 |
Interlocked.Increment(ref threadDone); |
没有什么要特殊说明的,就是新类库的使用。看代码,好像比使用线程或线程池更加简单直接,有机会争取多用一用。我在本地测试的时候,在Release版本下,按照count的大小逆序排列,总体上G>D>F>E。需要注意到一件事,就是SmartThreadPool中排入队列的任务是一个返回值为Object的委托类型,这和其他的几个没有返回的(void类型)不同。SmartThreadPool口碑还是不错的,也许是我没有正确使用它。
最后小结一下:本文主要列举了C#中我所知道的几种常见的异步处理的方法,欢迎大家纠错或补充。
示例下载:demo