分享

ddsdcx

 昵称9574 2006-07-04

13: 并发编程

面向对象使我们能将程序划分成相互独立的模块。但是你时常还会碰到,不但要把程序分解开来,而且还要让它的各个部分都能独立运行的问题。

这种能独立运行的子任务就是线程(thread)。编程的时候,你可以认为线程都是能独立运行的,有自己CPU的子任务。实际上,是一些底层机制在为你分割CPU的时间,只是你不知道罢了。这种做法能简化多线程的编程。

进程(process)是一种有专属地址空间的"自含式(self-contained)"程序。通过在不同的任务之间定时切换CPU,多任务(multitasking)操作系统营造出一种同一个时点可以有多个进程(程序)在同时运行的效果。线程是进程内部的独立的,有序的指令流。由此,一个进程能包含多个并发执行的线程。

多线程的用途很广,但归纳起来不外乎,程序的某一部分正在等一个事件或资源,而你又不想让它把整个程序都给阻塞了。因此你可以创建一个与该事件或资源相关的线程,让它与主程序分开来运行。

学习并发编程就像是去探访一个新的世界,同时学习一种新的编程语言,最起码也得接受一套新的理念。随着绝大多数的微电脑操作系统提供了多线程支持,编程语言和类库也做了相应的扩展。总而言之,多线程编程:

  1. 看上去不但神秘,而且还要求你改变编程的观念
  2. 各种语言对多线程的支持大同小异,所以理解线程就等于掌握了一种通用语言

虽然支持多线程会让Java变得更复杂,但这并不全是Java的错——不是Java无能,而是线程太狡猾了。

理解并发编程的难度不亚于理解多态性。如果只想了解其最基本的运行机制,那么只要花一点工夫就行了,但是要想真正做到融会贯通,那就非得下苦功了。本章的宗旨是,让你能牢固地掌握并发编程方面的基础知识,写出较为合理的多线程程序。要提醒你,多线程看着容易其实很难,因此你很可能会过于自信了。所以如果你要写一些比较复杂的东西,务必先去研究一下这方面的专著。

动机

并发编程的一个最主要的用途就是创建反应灵敏的用户界面。试想有这么一个程序,由于要进行大量的CPU密集的运算,它完全忽略了用户输入,以致于变得非常迟钝了。要解决这种问题,关键在于,程序在进行运算的同时,还要时不时地将控制权交还给用户界面,这样才能对用户的操作做出及时的响应。假设有一个"quit"按钮,你总不会希望每写一段代码就做一次轮询的吧,你要的是"quit"能及时响应用户的操作,就像你在定时检查一样。

常规的方法是不可能在运行指令的同时还把控制权交给其他程序的。这听上去简直就是在天方夜谭,就好像CPU能同时出现在两个地方,但是多线程所营造的正是这个效果。

并发编程还能用来优化吞吐率。比方说,你可以一边在等数据,一边做其他事情。要是没有多线程,唯一可行的办法就是作轮询,但是这种做法不但笨拙而且很难。

如果是多处理器的系统,线程还会被分到多个处理器上,这样就能大大提高(指令的)吞吐率了。通常多处理器的Web服务器就是这样做的,它会为每个请求分配一个线程,这样就把多个请求分摊给各个CPU了。

有一点要记住,那就是多线程程序也必须能运行在单CPU系统上。因此,即便不用多线程也应该能写出相同的程序。但是多线程提供了一种非常重要的,程序结构方面的优势,因而能大大简化软件的设计。对于像仿真模拟之类的视频游戏,如果不用多线程,真不知道该怎么解决。

多线程模型大大简化了"让一个程序同时做几件事"这个难题。在多线程环境下,CPU会不断地在线程之间进行切换,给它们分配时间。在线程看来,它能随时获得CPU,但实际上CPU的时间已经被分配给了所有线程。这里有个例外,如果程序是运行在多CPU的系统上,那么线程就真有可能独占整个CPU了。但是多线程最值得称道的还是它的底层抽象,即代码无需知道它是运行在单CPU还是多CPU的系统上。由此,你可以用多线程来创建可透明扩展的程序——如果程序运行得太慢了,直接在机器上加CPU就行了。多任务与多线程是充分利用多处理器系统的好办法。

在单CPU系统里,线程会多少影响程序的运算效率,但是通算下来,它在程序结构,平衡资源以及用户的舒适度方面的优势还是远远超过了性能方面的损失。总之,多线程能令你设计出更为松散耦合(more loosely-coupled)的应用程序;否则,你就得多写很多代码,把那些本应交由多线程去处理的事情揽到自己头上了。

基本线程

要想创建线程,最简单的办法就是继承java.lang.Thread。这个类已经为线程的创建和运行做了必要的配置。run( )Thread最重要的方法,要想让线程替你办事,你就必须覆写这个方法。由此可知,run( )所包含的就是要和程序里其它线程"同时"执行的代码。

下面这段程序会创建五个线程,每个线程都包含一个由static变量生成的唯一标识符。我们覆写了Threadrun( )方法,让它每做一次循环就减一,并且在数到零的时候返回(什么时候run( )返回了,线程也就中止了)。

//: c13:SimpleThread.java
// Very simple Threading example.
import com.bruceeckel.simpletest.*;
public class SimpleThread extends Thread {
private static Test monitor = new Test();
private int countDown = 5;
private static int threadCount = 0;
public SimpleThread() {
super("" + ++threadCount); // Store the thread name
start();
}
public String toString() {
return "#" + getName() + ": " + countDown;
}
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
}
}
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new SimpleThread();
monitor.expect(new String[] {
"#1: 5",
"#2: 5",
"#3: 5",
"#5: 5",
"#1: 4",
"#4: 5",
"#2: 4",
"#3: 4",
"#5: 4",
"#1: 3",
"#4: 4",
"#2: 3",
"#3: 3",
"#5: 3",
"#1: 2",
"#4: 3",
"#2: 2",
"#3: 2",
"#5: 2",
"#1: 1",
"#4: 2",
"#2: 1",
"#3: 1",
"#5: 1",
"#4: 1"
}, Test.IGNORE_ORDER + Test.WAIT);
}
} ///:~

我们用Thread的构造函数给线程起名字。接着又在toString( )里面用getName( )把这个名字提取出来。

实际上Threadrun( )里面总是会有循环,这些循环会一直运行下去,直到线程结束。所以你必须设定条件打破循环(或者像上面那样,直接在run( )里面return)。通常run( )是个无限循环,也就是说,刨开那些会让run( )停下来的意外情况,线程会一直运行下去(本章的后面会讲怎样安全地通知线程停下来)。

main( )创建并启动了多个线程。Threadstart( )方法会先对线程做一些初始化,再调用run( )。所以整个步骤应该是:调用构造函数创建一个对象,并且在构造函数里面调用start( )来配置这个线程,然后让线程的执行机制去调用run( )。如果你不调用start( )(你也可以这么做,后面会举例的),那么线程永远也不会启动。

线程的调度机制是非决定性,因此程序每次运行的时候,输出都不一样。实际上,这么简单的一个程序,在不同版本的JDK里,输出也是天差地别。比方说早期的JDK不能把时间片分得很细,因此线程1会第一个结束,接着是线程2,线程3,等等。实际上这跟调用立即进入循环的子程序没什么两样,只是启动线程的代价更高了。但是在JDK 1.4里,你会发现程序输出会比较接近SimpleThread.java所给出的,这表示线程调度机制能更好地进行时间分片了——这样每个线程就都能经常得到服务了。通常Sun是不会告诉你这种JDK运行方式的变化的,所以别去指望线程的运行方式会始终如一。写多线程程序的时候,最好还是保守一些。

main( )创建了Thread,但是却没去拿它的reference。如果是普通对象,这一点就足以让它成为垃圾,但Thread不会。Thread都会为它自己"注册",所以实际上reference还保留在某个地方。除非run( )退出,线程中止,否则垃圾回收器不能动它。

Yielding

如果你知道run( )已经告一段落了,你就可以给线程调度机制作一个暗示,告诉它你干完了,可以让别的线程来使用CPU了。这个暗示(注意,只是暗示——无法保证你用的这个JVM会不会对此作出反映)是用yield( )形式给出的。

下面我们就用这个办法来修改程序:

//: c13:YieldingThread.java
// Suggesting when to switch threads with yield().
import com.bruceeckel.simpletest.*;
public class YieldingThread extends Thread {
private static Test monitor = new Test();
private int countDown = 5;
private static int threadCount = 0;
public YieldingThread() {
super("" + ++threadCount);
start();
}
public String toString() {
return "#" + getName() + ": " + countDown;
}
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
yield();
}
}
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new YieldingThread();
monitor.expect(new String[] {
"#1: 5",
"#2: 5",
"#4: 5",
"#5: 5",
"#3: 5",
"#1: 4",
"#2: 4",
"#4: 4",
"#5: 4",
"#3: 4",
"#1: 3",
"#2: 3",
"#4: 3",
"#5: 3",
"#3: 3",
"#1: 2",
"#2: 2",
"#4: 2",
"#5: 2",
"#3: 2",
"#1: 1",
"#2: 1",
"#4: 1",
"#5: 1",
"#3: 1"
}, Test.IGNORE_ORDER + Test.WAIT);
}
} ///:~

用了yield( )之后,程序的输出变得更整齐了。但是如果字符串更长一些的话,效果就会同SimpleThread.java的差不多了(试试看,逐步增加toString( )字符串的长度,看看输出的效果)。Java的线程调度机制是抢占式的(preemptive),也就是说,只要它认为有必要,它会随时中断当前线程,并且切换到其它线程。因此,如果I/O(通过main( )线程执行)占用的时间太长了,线程调度机制就会在run( )运行到yield( )之前把它给停下来。总之yield( )只会在很少的情况下起作用,而且不能用来进行很严肃的调校。

Sleeping

还有一种控制线程的办法,就是用sleep( )让它停一段以毫秒计的时间。如果把上面那段程序里的yield( )换成sleep( ),程序就变成这样了:

//: c13:SleepingThread.java
// Calling sleep() to wait for awhile.
import com.bruceeckel.simpletest.*;
public class SleepingThread extends Thread {
private static Test monitor = new Test();
private int countDown = 5;
private static int threadCount = 0;
public SleepingThread() {
super("" + ++threadCount);
start();
}
public String toString() {
return "#" + getName() + ": " + countDown;
}
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
try {
sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public static void
main(String[] args) throws InterruptedException {
for(int i = 0; i < 5; i++)
new SleepingThread().join();
monitor.expect(new String[] {
"#1: 5",
"#1: 4",
"#1: 3",
"#1: 2",
"#1: 1",
"#2: 5",
"#2: 4",
"#2: 3",
"#2: 2",
"#2: 1",
"#3: 5",
"#3: 4",
"#3: 3",
"#3: 2",
"#3: 1",
"#4: 5",
"#4: 4",
"#4: 3",
"#4: 2",
"#4: 1",
"#5: 5",
"#5: 4",
"#5: 3",
"#5: 2",
"#5: 1"
});
}
} ///:~

sleep( )一定要放在try域里,这是因为有可能会出现时间没到sleep( )就被中断的情况。如果有人拿到了线程的reference,并且调用了它的interrupt( ),这种事就发生了。(interrupt( )也会影响处于wait( )join( )状态的线程,所以这两个方法也要放在try域里。后面我们会再讲。)如果你准备用interrupt( )唤醒线程,那最好是用wait( )而不是sleep( ),因为这两者的catch语句是不一样的。这里我们所遵循的原则是:"除非知道该怎样去处理异常,否则别去捕捉"。所以,我们把它当作RuntimeException往外面抛。

你会发现输出变得有规律了——每个线程都会在其它线程开始之前倒数到零。这是因为我们对每个线程都用了join( )(马上就要讲),于是main( )会在继续下一步的执行之前,先等这个线程结束。如果没有join( ),你会看到,线程还是在以随机的方式运行,也就是说sleep( )也不是什么控制线程执行的办法。它只是暂停线程。唯一能保证的事情是,它会休眠至少100毫秒,但是它恢复运行所花的时间可能更长,因为在休眠结束之后,线程调度机制还要花时间来接管。

如果你一定要控制线程的执行顺序,那最彻底的办法还是不用线程。你可以自己写一个协作程序,让它按一定顺序交换程序的运行权。

优先级

线程的优先级(priority)的作用是,告诉线程调度机制这个线程的重要程度的高低。虽然CPU伺候线程的顺序是非决定性的,但是如果有很多线程堵在那里等着启动,线程调度机制会倾向于首先启动优先级最高的线程。但这并不意味着低优先级的线程就没机会运行了(也就是说优先级不会造成死锁)。优先级低只表示运行的机会少而已。

下面我们用优先级来修改SimpleThread.java。优先级是通过ThreadsetPriority( )方法调整的。

//: c13:SimplePriorities.java
// Shows the use of thread priorities.
import com.bruceeckel.simpletest.*;
public class SimplePriorities extends Thread {
private static Test monitor = new Test();
private int countDown = 5;
private volatile double d = 0; // No optimization
public SimplePriorities(int priority) {
setPriority(priority);
start();
}
public String toString() {
return super.toString() + ": " + countDown;
}
public void run() {
while(true) {
// An expensive, interruptable operation:
for(int i = 1; i < 100000; i++)
d = d + (Math.PI + Math.E) / (double)i;
System.out.println(this);
if(--countDown == 0) return;
}
}
public static void main(String[] args) {
new SimplePriorities(Thread.MAX_PRIORITY);
for(int i = 0; i < 5; i++)
new SimplePriorities(Thread.MIN_PRIORITY);
monitor.expect(new String[] {
"Thread[Thread-1,10,main]: 5",
"Thread[Thread-1,10,main]: 4",
"Thread[Thread-1,10,main]: 3",
"Thread[Thread-1,10,main]: 2",
"Thread[Thread-1,10,main]: 1",
"Thread[Thread-2,1,main]: 5",
"Thread[Thread-2,1,main]: 4",
"Thread[Thread-2,1,main]: 3",
"Thread[Thread-2,1,main]: 2",
"Thread[Thread-2,1,main]: 1",
"Thread[Thread-3,1,main]: 5",
"Thread[Thread-4,1,main]: 5",
"Thread[Thread-5,1,main]: 5",
"Thread[Thread-6,1,main]: 5",
"Thread[Thread-3,1,main]: 4",
"Thread[Thread-4,1,main]: 4",
"Thread[Thread-5,1,main]: 4",
"Thread[Thread-6,1,main]: 4",
"Thread[Thread-3,1,main]: 3",
"Thread[Thread-4,1,main]: 3",
"Thread[Thread-5,1,main]: 3",
"Thread[Thread-6,1,main]: 3",
"Thread[Thread-3,1,main]: 2",
"Thread[Thread-4,1,main]: 2",
"Thread[Thread-5,1,main]: 2",
"Thread[Thread-6,1,main]: 2",
"Thread[Thread-4,1,main]: 1",
"Thread[Thread-3,1,main]: 1",
"Thread[Thread-6,1,main]: 1",
"Thread[Thread-5,1,main]: 1"
}, Test.IGNORE_ORDER + Test.WAIT);
}
} ///:~

这里我们用Thread.toString( )方法覆写了SimplePrioritytoString( )Thread.toString( )会打印线程的名字(你可以在构造函数里设置这个名字;不过这里,我们用了线程自动生成的Thread-1, Thread-2等),优先级,及其所属的"线程组"。由于线程是自我标识的,因此我们没用threadNumber。覆写后的toString( )还会显示countDown的值。

你会发现,线程1的优先级被设到了最高水平,而其它线程的优先级都设在最低水平。

run( )进行了100,000次非常耗时的double型加法和除法运算。把变量d设成volatile是为了排除优化。要不然,你根本没法看出优先级的效果(可以试试把运算用的for循环去掉)。通过运算,你能看出,线程调度机制为线程1安排了最多的机会。虽然往控制台输出也是一种很"昂贵"的操作,但是你却不能用它来观察优先级的效果,因为线程往控制台打印的时候是不会被中断的(否则控制台的显示就乱了),但是数学运算是可以被打断的。对线程调度机制来说,这次运算的耗时长得足以让它注意到线程1的高优先级。

此外,你还可以用getPriority( )来读取线程的优先级,用setPriority( )随时修改线程的优先级(而不是像SimplePriorities.java那样,在构造函数里面设置)。

虽然JDK提供了10级优先级,但是却不能很好地映射到很多操作系统上。比方说,Windows 2000平台上有7个等级还没固定下来,因此映射是不确定的(虽然Sun的Solaris有231个等级)。要想保持可移植性,唯一的办法就是,在调整优先级的时候,盯住MIN_PRIORITY, NORM_PRIORITY, 和MIN_PRORITY

守护线程

所谓"守护线程(daemon thread)"是指,只要程序还在运行,它就应该在后台提供某种公共服务的线程,但是守护线程不属于程序的核心部分。因此,当所有非守护线程都运行结束的时候,程序也结束了。相反,只要还有非守护线程在运行,程序就不能结束。比如,运行main( )的线程就属于非守护线程。

//: c13:SimpleDaemons.java
// Daemon threads don‘t prevent the program from ending.
public class SimpleDaemons extends Thread {
public SimpleDaemons() {
setDaemon(true); // Must be called before start()
start();
}
public void run() {
while(true) {
try {
sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(this);
}
}
public static void main(String[] args) {
for(int i = 0; i < 10; i++)
new SimpleDaemons();
}
} ///:~

要想创建守护线程,必须在它启动之前就setDaemon( )。我们让SimpleDaemonsrun( )睡了一小会儿,但是当所有的线程全都启动了,程序也就结束了,甚至连让线程打印的时间都没留下。这是因为已经没有非守护线程(这里只有main( )是)来维持程序的运行了。于是程序什么都没打印就结束了。

可以用isDaemon( )来判断一个线程是不是守护线程。守护线程所创建的线程也自动是守护线程。请看下面这个例子:

//: c13:Daemons.java
// Daemon threads spawn other daemon threads.
import java.io.*;
import com.bruceeckel.simpletest.*;
class Daemon extends Thread {
private Thread[] t = new Thread[10];
public Daemon() {
setDaemon(true);
start();
}
public void run() {
for(int i = 0; i < t.length; i++)
t[i] = new DaemonSpawn(i);
for(int i = 0; i < t.length; i++)
System.out.println("t[" + i + "].isDaemon() = "
+ t[i].isDaemon());
while(true)
yield();
}
}
class DaemonSpawn extends Thread {
public DaemonSpawn(int i) {
start();
System.out.println("DaemonSpawn " + i + " started");
}
public void run() {
while(true)
yield();
}
}
public class Daemons {
private static Test monitor = new Test();
public static void main(String[] args) throws Exception {
Thread d = new Daemon();
System.out.println("d.isDaemon() = " + d.isDaemon());
// Allow the daemon threads to
// finish their startup processes:
Thread.sleep(1000);
monitor.expect(new String[] {
"d.isDaemon() = true",
"DaemonSpawn 0 started",
"DaemonSpawn 1 started",
"DaemonSpawn 2 started",
"DaemonSpawn 3 started",
"DaemonSpawn 4 started",
"DaemonSpawn 5 started",
"DaemonSpawn 6 started",
"DaemonSpawn 7 started",
"DaemonSpawn 8 started",
"DaemonSpawn 9 started",
"t[0].isDaemon() = true",
"t[1].isDaemon() = true",
"t[2].isDaemon() = true",
"t[3].isDaemon() = true",
"t[4].isDaemon() = true",
"t[5].isDaemon() = true",
"t[6].isDaemon() = true",
"t[7].isDaemon() = true",
"t[8].isDaemon() = true",
"t[9].isDaemon() = true"
}, Test.IGNORE_ORDER + Test.WAIT);
}
} ///:~

Daemon先将自己的守护标志设成"true",再创建了一些线程。它并没有去设置这些线程的daemon状态,但是测试的结果却告诉我们,它们都是守护线程。接着它进入了一个无限循环,不断地调用yield( )把控制权交给别的线程。

一旦main( )完成了任务,程序也就结束了。这是因为除了守护线程,已经没别的东西在运行了。我们让main( )的线程睡上一秒钟,这样你就能看清楚守护线程是怎样启动的了。如果不这么做,你只能看到部分线程的创建。(试着调整sleep( )的时间,看看会有什么结果。)

连接线程

线程还能调用另一个线程的join( ),等那个线程结束之后再继续运行。如果线程调用了调用了另一个线程tt.join( ),那么在线程t结束之前(判断标准是,t.isAlive( )等于false),主叫线程会被挂起。

调用join( )的时候可以给一个timeout参数,(可以是以毫秒,也可以是以纳秒作单位),这样如果目标线程在时限到期之后还没有结束,join( )就会强制返回了。

join( )调用可以被主叫线程的interrupt( )打断,所以join( )也要用try-catch括起来。

所有这些事情,下面这段代码里全有了:

//: c13:Joining.java
// Understanding join().
import com.bruceeckel.simpletest.*;
class Sleeper extends Thread {
private int duration;
public Sleeper(String name, int sleepTime) {
super(name);
duration = sleepTime;
start();
}
public void run() {
try {
sleep(duration);
} catch (InterruptedException e) {
System.out.println(getName() + " was interrupted. " +
"isInterrupted(): " + isInterrupted());
return;
}
System.out.println(getName() + " has awakened");
}
}
class Joiner extends Thread {
private Sleeper sleeper;
public Joiner(String name, Sleeper sleeper) {
super(name);
this.sleeper = sleeper;
start();
}
public void run() {
try {
sleeper.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(getName() + " join completed");
}
}
public class Joining {
private static Test monitor = new Test();
public static void main(String[] args) {
Sleeper
sleepy = new Sleeper("Sleepy", 1500),
grumpy = new Sleeper("Grumpy", 1500);
Joiner
dopey = new Joiner("Dopey", sleepy),
doc = new Joiner("Doc", grumpy);
grumpy.interrupt();
monitor.expect(new String[] {
"Grumpy was interrupted. isInterrupted(): false",
"Doc join completed",
"Sleepy has awakened",
"Dopey join completed"
}, Test.AT_LEAST + Test.WAIT);
}
} ///:~

Sleeper是一个会睡上一段时间的Thread,至于睡多长时间,这要由构造函数的参数决定。Sleeperrun( )sleep( )可以因时限到期而返回,也可以被interrupt( )打断。catch语句在报告中断的同时,会一并报告isInterrupted( )。当有别的线程调用了本线程的interrupt( )时,会设置一个标记以表示这个这个线程被打断了。当本线程捕获这个异常的时候,会清除这个标志。所以catch语句会永远报告说isInterrupted( )是false。这个标记是用来应付其它情况的,或许在没出异常的情况下,线程要用它来检查自己是不是被中断了。

Joiner是另一个线程,它调用了Sleeperjoin( ),所以它要等Sleeper醒过来。main( )创建了两个Sleeper分派给两个Joiner。你会发现,不论Sleeper是被打断还是正常结束,Joiner都会随Sleeper一道结束。

另外一种方式

迄今为止,你所看到的都是些很简单的例子。这些线程都继承了Thread,这种做法很很明智,对象只是作为线程,不做别的事情。但是类可能已经继承了别的类,这样它就不能再继承Thread了(Java不支持多重继承)。这时,你就要用Runnable接口了。Runnable的意思是,这个类实现了run( )方法,而Thread就是Runnable的。

下面我们来演示一下这种方法的基本步骤:

//: c13:RunnableThread.java
// SimpleThread using the Runnable interface.
public class RunnableThread implements Runnable {
private int countDown = 5;
public String toString() {
return "#" + Thread.currentThread().getName() +
": " + countDown;
}
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
}
}
public static void main(String[] args) {
for(int i = 1; i <= 5; i++)
new Thread(new RunnableThread(), "" + i).start();
// Output is like SimpleThread.java
}
} ///:~

Runnable接口只有一个方法,那就是run( ),但是如果你想对它做一些Thread对象才能做的事情(比方说toString( )里面的getName( )),你就必须用Thread.currentThread( )去获取其reference。Thread类有一个构造函数,可以拿Runnable和线程的名字作参数。

如果对象是Runnable的,那只说明它有run( )方法。这并没有什么特别的,也就是说,不会因为它是Runnable的,就使它具备了线程的先天功能,这一点同Thread的派生类不同的。所以你必须像例程那样,用Runnable对象去创建线程。把Runnable对象传给Thread的构造函数,创建一个独立的Thread对象。接着再调用那个线程的start( ),由它来进行初始化,然后线程的调度机制就能调用run( )了。

Runnable interface的好处在于,所有东西都属于同一个类;也就是说Runnable能让你创建基类和其它接口的mixin(混合类)。如果你要访问其它东西,直接用就是了,不用再一个一个地打交道。但是内部类也有这个功能,它也可以直接访问宿主类的成员。所以这个理由不足以说服我们放弃Thread的内部类而去使用Runnable的mixin。

Runnable的意思是,你要用代码——也就是run( )方法——来描述一个处理过程,而不是创建一个表示这个处理过程的对象。在如何理解线程方面,一直存在着争议。这取决于,你是将线程看作是对象还是处理过程[68]。如果你认为它是一个处理过程,那么你就摆脱了"万物皆对象"的OO教条。但与此同时,如果你只想让这个处理过程掌管程序的某一部分,那你就没理由让整个类都成为Runnable的。有鉴于此,用内部类的形式将线程代码隐藏起来,通常是个更明智的选择。就像下面这段代码:

//: c13:ThreadVariations.java
// Creating threads with inner classes.
import com.bruceeckel.simpletest.*;
// Using a named inner class:
class InnerThread1 {
private int countDown = 5;
private Inner inner;
private class Inner extends Thread {
Inner(String name) {
super(name);
start();
}
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
try {
sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public String toString() {
return getName() + ": " + countDown;
}
}
public InnerThread1(String name) {
inner = new Inner(name);
}
}
// Using an anonymous inner class:
class InnerThread2 {
private int countDown = 5;
private Thread t;
public InnerThread2(String name) {
t = new Thread(name) {
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
try {
sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public String toString() {
return getName() + ": " + countDown;
}
};
t.start();
}
}
// Using a named Runnable implementation:
class InnerRunnable1 {
private int countDown = 5;
private Inner inner;
private class Inner implements Runnable {
Thread t;
Inner(String name) {
t = new Thread(this, name);
t.start();
}
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public String toString() {
return t.getName() + ": " + countDown;
}
}
public InnerRunnable1(String name) {
inner = new Inner(name);
}
}
// Using an anonymous Runnable implementation:
class InnerRunnable2 {
private int countDown = 5;
private Thread t;
public InnerRunnable2(String name) {
t = new Thread(new Runnable() {
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public String toString() {
return Thread.currentThread().getName() +
": " + countDown;
}
}, name);
t.start();
}
}
// A separate method to run some code as a thread:
class ThreadMethod {
private int countDown = 5;
private Thread t;
private String name;
public ThreadMethod(String name) { this.name = name; }
public void runThread() {
if(t == null) {
t = new Thread(name) {
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
try {
sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public String toString() {
return getName() + ": " + countDown;
}
};
t.start();
}
}
}
public class ThreadVariations {
private static Test monitor = new Test();
public static void main(String[] args) {
new InnerThread1("InnerThread1");
new InnerThread2("InnerThread2");
new InnerRunnable1("InnerRunnable1");
new InnerRunnable2("InnerRunnable2");
new ThreadMethod("ThreadMethod").runThread();
monitor.expect(new String[] {
"InnerThread1: 5",
"InnerThread2: 5",
"InnerThread2: 4",
"InnerRunnable1: 5",
"InnerThread1: 4",
"InnerRunnable2: 5",
"ThreadMethod: 5",
"InnerRunnable1: 4",
"InnerThread2: 3",
"InnerRunnable2: 4",
"ThreadMethod: 4",
"InnerThread1: 3",
"InnerRunnable1: 3",
"ThreadMethod: 3",
"InnerThread1: 2",
"InnerThread2: 2",
"InnerRunnable2: 3",
"InnerThread2: 1",
"InnerRunnable2: 2",
"InnerRunnable1: 2",
"ThreadMethod: 2",
"InnerThread1: 1",
"InnerRunnable1: 1",
"InnerRunnable2: 1",
"ThreadMethod: 1"
}, Test.IGNORE_ORDER + Test.WAIT);
}
} ///:~

InnerThread1创建了一个继承Thread的非匿名内部类,然后在构造函数里创建了一个这个内部类的实例。如果你要用宿主类的方法访问内部类的特殊功能(新方法)的话,这种做法会比较好。但是绝大多数情况下,创建线程的目的,只是要用它的Thread功能,所以不必创建非匿名的内部类。InnerThread2就演示了这个方案:在构造函数里面创建一个继承Thread的匿名内部类,然后把它上传给Thread的reference t。宿主类的其它方法可以通过t访问Thread的接口,它们无需知道这个对象的确切类型。

第三和第四个类重复了前两个类,不过它们都不是Thread,而是Runnable。这么做是想告诉你,用Runnable什么便宜也没占到,相反代码还稍微复杂了一些(也更难读了一些)。所以结论是,除非迫不得已只能用Runnable,否则我会选Thread

ThreadMethod演示了怎样在方法的内部创建线程。要运行线程的时候就调用方法,线程启动之后方法就返回。如果这个线程不那么重要,只是做一些辅助操作的话,那么相比在构造函数里启动线程,这个方法或许会更好一些。

创建反应敏捷的用户界面

正如我们前面所讲的,创建反映敏捷的用户界面是多线程的主要用途之一。虽然我们要到第14章才讲讲图形用户界面,但是现在我们可以做一个简单的控制台用户界面的程序。下面这段程序有两个版本,一个专注于计算从不理会控制台的输入,而另一个将计算任务放到线程里面,因此能在进行计算的同时还监听控制台的输入。

//: c13:ResponsiveUI.java
// User interface responsiveness.
import com.bruceeckel.simpletest.*;
class UnresponsiveUI {
private volatile double d = 1;
public UnresponsiveUI() throws Exception {
while(d > 0)
d = d + (Math.PI + Math.E) / d;
System.in.read(); // Never gets here
}
}
public class ResponsiveUI extends Thread {
private static Test monitor = new Test();
private static volatile double d = 1;
public ResponsiveUI() {
setDaemon(true);
start();
}
public void run() {
while(true) {
d = d + (Math.PI + Math.E) / d;
}
}
public static void main(String[] args) throws Exception {
//! new UnresponsiveUI(); // Must kill this process
new ResponsiveUI();
Thread.sleep(300);
System.in.read(); // ‘monitor‘ provides input
System.out.println(d); // Shows progress
}
} ///:~

UnresponsiveUI将计算工作放进一个无限循环,所以很明显,它永远也不会去理会控制台的输入(编译器受骗了,它以为过了while循环就能读到输入了)。如果你把创建UnresponsiveUI对象的那行代码注释回来,再重新运行,那么除非杀掉进程否则没法退出程序。

要想让程序反应灵敏,可以把运算放进run( )里面,然后让抢占式的调度程序来管理它,这样当你按下Enter键的时候,就能看到,它在一边等用户的输入一边做后台的计算(处于测试的目的,我们让com.bruceeckel.simpletest.Test对象自动地往System.in.read( )控制台写。关于这个对象,我们到第15章再讲)。

共享有限的资源

你可以认为单线程程序是一个在问题空间里游走的,一次只作一件事的孤独的个体。由于只有它一个,因此你无需考虑两个实体同时申请同一项资源的问题。这个问题有点像两个人同时把车停在一个车位上,同时穿一扇门,甚至是同时发言。

但是在多线程环境下,事情就不那么简单了,你必须考虑两个或两个以上线程同时申请同一资源的问题。必须杜绝资源访问方面的冲突,否则你就会碰到两个线程同时访问一个银行账号,同时往一台打印机上输出,或者同时调整一个阀值之类的问题。

用不正确的方法访问资源

试看下面这段例程。AlwaysEven会"保证",每次调用getValue( )的时候都会返回一个偶数。此外还有一个 "Watcher"线程,它会不时地调用getValue( ),然后检查这个数是不是真的是偶数。这么做看上去有些多余,因为从代码上看,很明显这个值肯定是偶数。但是意外来了。下面是源代码:

//: c13:AlwaysEven.java
// Demonstrating thread collision over resources by
// reading an object in an unstable intermediate state.
public class AlwaysEven {
private int i;
public void next() { i++; i++; }
public int getValue() { return i; }
public static void main(String[] args) {
final AlwaysEven ae = new AlwaysEven();
new Thread("Watcher") {
public void run() {
while(true) {
int val = ae.getValue();
if(val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}
}.start();
while(true)
ae.next();
}
} ///:~

main( )创建了一个AlwaysEven对象,由于要供匿名的Thread内部类访问,因此它必须是final的。如果这个线程读出的值不是偶数,它会把这个值打印出来(以此证明它捕获了一个正处于不稳定状态的对象),然后退出程序。

这个例子很形象地说明了多线程环境的最本质的问题:永远也不会知道线程会在什么时候启动。想想,你正坐在餐桌边上,拿着一把叉子,对准了盘子里最后一片火腿,正当你的叉子要碰到火腿的时候,它突然消失了(因为你已经被挂起来了,另一个线程进来偷走了那片火腿)。这就是写并发程序的时候要处理的问题。

有些时候,你不用关心别人是不是正在用那个资源(从别人的盘子里叉火腿)。但是对多线程环境,你必须要有办法能防止两个线程同时访问同一个资源,至少别在关键的时候。

要防止这种冲突很简单,只要在线程运行的时候给资源上锁就行了。第一个访问这个资源的线程给它上锁,在它解锁之前,其它线程都不能访问这个资源,接着另一个线程给这个资源上锁然后再使用,如此循环。如果轿车的前座是一个有限的资源,那么哪个小孩想坐,它就得大喊一声"是我的"。

测试框架

为了简化问题,在开始讲解之前,我们先写一个测试线程程序的小框架。我们的做法是,把这些例程里的公用代码分离出来。先说"watcher"线程,它的任务是观察对象的"invariant"是不是被破坏了。也就是说,我们假设对象的内部状态应该遵守某种规则,但是如果你从外部观察到对象正处在一个无效的中间状态的话,那么从用户的角度来看它的"invariant"就已经遭到破坏了(不是说对象不能处在这种无效的中间状态,只是这种状态不应该被客户观察到)。我们要有办法能观察它的invariant是不是被破坏了,同时还要知道这个值是多少。为了让方法能同时返回这两个信息,我们把它合并成一个标记接口(tagging interface)。所谓标记接口是指只有名字没有方法的接口。

//: c13:InvariantState.java
// Messenger carrying invariant data
public interface InvariantState {} ///:~

为了提高代码的可读性,根据我们的设计,成功或失败的信息已经包含在类的名字和类的类型里了。表示成功的类是:

//: c13:InvariantOK.java
// Indicates that the invariant test succeeded
public class InvariantOK implements InvariantState {} ///:~

表示失败的类是InvariantFailure。不过失败对象通常要携带原因,因此:

//: c13:InvariantFailure.java
// Indicates that the invariant test failed
public class InvariantFailure implements InvariantState {
public Object value;
public InvariantFailure(Object value) {
this.value = value;
}
} ///:~

现在我们可以定义要进行测试的类的接口了:

//: c13:Invariant.java
public interface Invariant {
InvariantState invariant();
} ///:~

在创建通用的"watcher"线程之前,先提醒大家,本章的某些例程,在某些平台下运行结果可能会与预想的不一样。这里的很多例子都想说明,在多线程的环境下,单个线程的运行已经收到了侵犯,但是这种侵犯不是每次都发生。[69]还有一些例子,它是想演示这种侵犯(但老是不成工),因此我们可以证明这种侵犯不会发生。碰到这种情况,我们就得有办法让程序过几秒停下来。下面这个类做的就是这件事,它继承了标准类库的Timer类:

//: c13:Timeout.java
// Set a time limit on the execution of a program
import java.util.*;
public class Timeout extends Timer {
public Timeout(int delay, final String msg) {
super(true); // Daemon thread
schedule(new TimerTask() {
public void run() {
System.out.println(msg);
System.exit(0);
}
}, delay);
}
} ///:~

delay以毫秒计,时间到了就打印消息。注意,调用了super(true)之后,它就是一个守护线程了,因此如果其它线程都退出了,它也就结束了。我们给Timer.schedule( )两个参数,一个是TimerTask(这里用匿名内部类的方式创建),另一个delay。经过delay毫秒,Timeout就开始执行TimerTaskrun( )方法。用Timer要比直接用sleep( )更简单也清楚。此外,Timer的扩展性很好,它能支持大量的(数以千计的)并发任务,因此它还是一个非常有用的工具。

现在我们该把Invariant接口和Timeout类用进InvariantWatcher线程了:

//: c13:InvariantWatcher.java
// Repeatedly checks to ensure invariant is not violated
public class InvariantWatcher extends Thread {
private Invariant invariant;
public InvariantWatcher(Invariant invariant) {
this.invariant = invariant;
setDaemon(true);
start();
}
// Stop everything after awhile:
public
InvariantWatcher(Invariant invariant, final int timeOut){
this(invariant);
new Timeout(timeOut,
"Timed out without violating invariant");
}
public void run() {
while(true) {
InvariantState state = invariant.invariant();
if(state instanceof InvariantFailure) {
System.out.println("Invariant violated: "
+ ((InvariantFailure)state).value);
System.exit(0);
}
}
}
} ///:~

构造函数需要一个测试用的Invariant对象,此外线程也是在构造函数里面启动的。第二个构造函数调用了第一个,此外它还创建了一个Timeout对象。Timeout的任务是过一段时间停止当前程序的运行——这是给invariant不会收到侵犯的程序准备的,否则程序就停不出来了。run( )会捕捉当前的InvariantState,并对它进行测试,如果失败,它会打印value的值。注意,我们不能从线程内部往外面抛异常,因为这只会中止线程而不是程序。

现在我们可以用这个框架来重写AlwaysEven.java了:

//: c13:EvenGenerator.java
// AlwaysEven.java using the invariance tester
public class EvenGenerator implements Invariant {
private int i;
public void next() { i++; i++; }
public int getValue() { return i; }
public InvariantState invariant() {
int val = i; // Capture it in case it changes
if(val % 2 == 0)
return new InvariantOK();
else
return new InvariantFailure(new Integer(val));
}
public static void main(String[] args) {
EvenGenerator gen = new EvenGenerator();
new InvariantWatcher(gen);
while(true)
gen.next();
}
} ///:~

定义invariant( )的时候一定要把要观察的值存到本地变量里面。这样才能观测到真实的值,而不是会变的值(被其它线程修改了)。

这里,问题不在对象会处于无效状态,而在于当对象正处于这种无效状态的时候,别的线程在调用它的方法。

资源访问的冲突

EvenGenerator来说,最糟糕的事情就是客户线程观察到它正处于不稳定的中间状态。但是这个对象的内部数据的一致性还是有保证的,最终还是会回到其正常的状态。但是如果真有两个线程在同时修改一个对象,那么这种资源冲突的后果就严重了,对象的状态很可能会被改错。

先简单地介绍一下semaphore的概念。Semaphore是一种用于线程间通信的标志对象。如果semaphore的值是零,则线程可以获得它所监视的资源,如果不是零,那么线程就无法获取这个资源,于是线程必须等。如果申请到了资源,线程会先对semaphore作递增,再使用这个资源。递增和递减是原子操作(atomic operation,也就是说不会被打断的操作),由此semaphore就防止两个线程同时使用同一项资源。

如果semaphore能妥善的看护它所监视的资源,那么对象就永远也不会陷入不稳定状态。下面我们先简单地实践一下semaphore的思想:

//: c13:Semaphore.java
// A simple threading flag
public class Semaphore implements Invariant {
private volatile int semaphore = 0;
public boolean available() { return semaphore == 0; }
public void acquire() { ++semaphore; }
public void release() { --semaphore; }
public InvariantState invariant() {
int val = semaphore;
if(val == 0 || val == 1)
return new InvariantOK();
else
return new InvariantFailure(new Integer(val));
}
} ///:~

这个类的核心部分由available( )acquire( ),以及release( )组成,还是比较简单明了的。由于线程会在获取对象之前先测试其是否available( ),因此semaphore的值只可能是0或1,因此我们在invariant( )里面就测试了这两个值。

但是看看Semaphore的效果吧:

//: c13:SemaphoreTester.java
// Colliding over shared resources
public class SemaphoreTester extends Thread {
private volatile Semaphore semaphore;
public SemaphoreTester(Semaphore semaphore) {
this.semaphore = semaphore;
setDaemon(true);
start();
}
public void run() {
while(true)
if(semaphore.available()) {
yield(); // Makes it fail faster
semaphore.acquire();
yield();
semaphore.release();
yield();
}
}
public static void main(String[] args) throws Exception {
Semaphore sem = new Semaphore();
new SemaphoreTester(sem);
new SemaphoreTester(sem);
new InvariantWatcher(sem).join();
}
} ///:~

SemaphoreTester是一个会不断测试Semaphore对象是否available的线程,如果可以,它就先acquire,再release。注意semaphore字段是volatile的,这样就能确保编译器不对读做优化了。

main( )创建了两个SemaphoreTester线程。你会发现,不用多长时间,invariant就被篡改了。这是因为,当SemaphoreTester调用available( )的时候,对象确实是可得的,但是等到它调用acquire( )的时候,另一个线程已经获取并修改了这个对象的semaphore了。InvariantWater会发觉这个值太大了,或者两个线程同时release( )之后,发现这个值变负的了。要注意InvariantWatcher已经join( )到main线程了,因此程序会一直运行到故障发生。

我发现在我的机器上加了yield( )之后,错误提前了很多,不过这与操作系统和JVM有关。你可以试试把yield( )去掉;可能要花很长时间才会产生错误,这也证明了多线程程序的排错有多难。

这个例子再次强调了并发编程的风险:如果这么简单的程序都会有问题,你还能对并发环境报什么侥幸心理呢。

解决共享资源的冲突

实际上所有的多线程架构都采用串行访问的方式来解决共享资源的冲突问题。也就是说,同一时刻只有一个线程可以访问这个共享资源。通常是这样实现的,在代码的前后设一条加锁和解锁的语句,这样同一时刻只有一个线程能够执行这段代码。由于锁定语句会产生"互斥(mutual exclusion)"的效果,因此这一机制通常也被称为mutex。

就拿你们家的浴室打比方;浴室(共享资源)要供很多人(线程)使用,而且每个人用的时候都是独占的。要用浴室之前,先要敲门,看看里面是不是有人。如果每人,就进去把门锁上。这样其它要用浴室的线程,就被堵在门外了,于是它必须等在门外,直到浴室的门重新打开。

等我们讲到浴室的门打开,另一个线程进去换走原先那个线程的时候,这个比方就有些不贴切了。实际上等在外面的线程并没有排成一列,相反由于线程的调度机制是非决定性的,因此谁都不知道谁会是下一个。这些线程更像是在围着浴室的门绕圈,当浴室的门开的那一刹那,最靠近门的那个线程会挤进去换走原来的那个。前面已经讲过了,我们可以用yield( )setPriority( )来给线程调度机制提一些建议,但究竟能起多大作用,还要看平台和JVM。

Java提供了内置的防止资源冲突的解决方案,这就是synchronized关键词。它的工作原理很像Semaphore类:当线程想执行由synchronized看护的代码时,它会先检查其semaphore是否可得,如果是,它会先获取semaphore,再执行代码,用完之后再释放semaphore。但是和我们写的Semaphore不同,synchronized是语言内置的,因此不会有什么问题。

通常共享资源就是一段内存,其表现形式就是对象,不过也可以是文件,I/O端口或打印机之类的。要想控制对共享资源的访问,先把它放进对象里面。然后把所有要访问这个资源的方法都作成synchronized的。也就是说,只要有一个线程还在调用synchronized方法,其它线程就不允许访问所有的synchronized方法。

通常你会把类的成员设成private的,然后用方法进行访问,因此你可以把方法做成synchronized。下面就是synchronized方法的声明:

synchronized void f() { /* ... */ }
synchronized void g(){ /* ... */ }

每个对象都有一个锁(也称监控器monitor),它是对象生来就有的东西(因此你不必为此写任何代码)。当你调用synchronized方法时,这个对象就被锁住了。在方法返回并且解锁之前,谁也不能调用同一个对象的其它synchronized方法。就说上面那两个方法,如果你调用了f( ),那么在f( )返回并且解锁之前,你是不能调用同一个对象的g( )的。因此对任何一个特定的对象,所有的synchronized方法都会共享一个锁,而这个锁能防止两个或两个以上线程同时读写一块共用内存。

一个线程能多次获得对象的锁。也就是说,一个synchronized方法调用了另一个synchronized方法,而后者又调用了另一synchronized方法,诸如此类。JVM会跟踪对象被上锁的次数。如果对象没有被锁住,那么它的计数器应该为零。当线程第一次获得对象的锁时,计数器为一。线程每获一次对象的锁,计数器就加一。当然,只有第一次获得对象锁的线程才能多次获得锁。线程每退出一个synchronized方法,计数器就减一。等减到零了,对象也就解锁了,这时其它线程就可以使用这个对象了。

此外每个类还有一个锁(它属于类的Class对象),这样当类的synchronized static方法读取static数据的时候,就不会相互干扰了。

Synchronized改写EvenGenerator

只要用synchronized稍加处理,EvenGenerator.java的线程冲突问题就能迎刃而解:

//: c13:SynchronizedEvenGenerator.java
// Using "synchronized" to prevent thread collisions
public
class SynchronizedEvenGenerator implements Invariant {
private int i;
public synchronized void next() { i++; i++; }
public synchronized int getValue() { return i; }
// Not synchronized so it can run at
// any time and thus be a genuine test:
public InvariantState invariant() {
int val = getValue();
if(val % 2 == 0)
return new InvariantOK();
else
return new InvariantFailure(new Integer(val));
}
public static void main(String[] args) {
SynchronizedEvenGenerator gen =
new SynchronizedEvenGenerator();
new InvariantWatcher(gen, 4000); // 4-second timeout
while(true)
gen.next();
}
} ///:~

你会发现next( )getValue( )都是synchronized。如果你只对其中一个做synchronized,那么另一个就被忽略了,于是其它线程就能肆无忌惮地调用它了。一定要记住:所有访问共享资源的方法都必须是synchronized的,否则程序肯定会出错。而invariant( )(译者注:原文为InvariantState,疑有误)倒不是synchronized的,这是因为它是供测试线程用的,因此我们希望它能随时被调用,只有这样才能算是真正的测试。

原子操作

"原子操作(atomic operation)是不需要synchronized",这是Java多线程编程的老生常谈了。所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行倒结束,中间不会有任何context switch(切换到另一个线程)。

通常所说的原子操作包括对非longdouble型的primitive进行赋值,以及返回这两者之外的primitive。之所以要把它们排除在外是因为它们都比较大,而JVM的设计规范又没有要求读操作和赋值操作必须是原子操作(JVM可以试着去这么作,但并不保证)。不过如果你在longdouble前面加了volatile,那么它就肯定是原子操作了。

如果你一知半解地把这条规则用到SynchronizedEvenGenerator.java上,就会发觉:

public synchronized int getValue() { return i; }

好像很符合原子操作的定义嘛。但是把synchronized去掉试试看,程序很快就出了错。这是因为,虽然return i是原子操作,但删掉synchronized之后,别的线程就能在它还处于不稳定状态的时候读到它了。在做这种优化之前,先得真正弄懂这么做的后果是什么。这里没有现成的经验。

下面,我们来看一个更简单的例子:一个返回序列号的类。[70]每次调用nextSerialNumber( )的时候,它都会返回一个唯一的值:

//: c13:SerialNumberGenerator.java
public class SerialNumberGenerator {
private static volatile int serialNumber = 0;
public static int nextSerialNumber() {
return serialNumber++;
}
} ///:~

SerialNumberGenerator大概是你能想到的最简单的例子了。如果你是从C++转过来的,或者有其它低级语言的经验,你会认为递增肯定是一个原子操作,因为它通常都是用CPU的指令来实现的。但是在JVM里,递增不是原子操作,它涉及到了读和写。所以即便是这么简单的一个操作,多线程也有机可乘。

serialNumber字段是volatile的,这是因为每个线程都有一个保存变量副本的本地栈,如果你把变量定义为volatile的,那么编译器就不会做任何优化了。而优化的意思就是减少数据同步的读写。

为了防止发现问题的用时过长所造成的数据量太大的问题,我们得先准备一个set。它重复利用了同一块内存来存储int,因此不会有内存不够的问题。这就是CircularSet。这么做的依据是,当我们重新绕回来的时候,冲突的可能性已经非常小了。为了防止线程冲突,我们把add( )contains( )方法做成synchronized的。

//: c13:SerialNumberChecker.java
// Operations that may seem safe are not,
// when threads are present.
// Reuses storage so we don‘t run out of memory:
class CircularSet {
private int[] array;
private int len;
private int index = 0;
public CircularSet(int size) {
array = new int[size];
len = size;
// Initialize to a value not produced
// by the SerialNumberGenerator:
for(int i = 0; i < size; i++)
array[i] = -1;
}
public synchronized void add(int i) {
array[index] = i;
// Wrap index and write over old elements:
index = ++index % len;
}
public synchronized boolean contains(int val) {
for(int i = 0; i < len; i++)
if(array[i] == val) return true;
return false;
}
}
public class SerialNumberChecker {
private static CircularSet serials =
new CircularSet(1000);
static class SerialChecker extends Thread {
SerialChecker() { start(); }
public void run() {
while(true) {
int serial =
SerialNumberGenerator.nextSerialNumber();
if(serials.contains(serial)) {
System.out.println("Duplicate: " + serial);
System.exit(0);
}
serials.add(serial);
}
}
}
public static void main(String[] args) {
for(int i = 0; i < 10; i++)
new SerialChecker();
// Stop after 4 seconds:
new Timeout(4000, "No duplicates detected");
}
} ///:~

SerialNumberChecker包含一个保存着所有已经被提取出来的序列号static CircularSet。此外它还内嵌了一个会提取序列号,并保证其唯一性的SerialChecker线程。我们创建了很多线程,因此问题出现的还算比较快(在你的机器上,很可能跑很长时间也不会出问题,但是在多CPU的系统上,它确实冲突了)。要解决这个问题,只要把synchronized关键词加到nextSerialNumber( )前面就行了。

最安全的原子操作只有读取和对primitive赋值这两种。但是正如EvenGenerator.java所揭示的,原子操作也能访问正处于无效状态的对象,所以绝对不能想当然。我们一开头就讲了,longdouble型的操作不一定时原子操作(虽然有些JVM能保证longdouble也是原子操作,但是如果你真的用了这个特性的话,代码就没有可移植性了。)

最安全的做法还是遵循如下的方针:

  1. 如果你要synchronize类的一个方法,索性把所有的方法全都synchronize了。要判断,哪个方法该synchronize,哪个方法可以不synchronize,通常是很难的,而且也没什么把握。
  2. 删除synchronized的时候要绝对小心。通常这么做是为了性能,但是synchronized的开销在JDK1.3和1.4里已经大为降低了。此外,只有在用profiler分析过,确认synchronized确实是瓶颈的前提下才能这么作。

修订Semaphore类

现在,我们再来看看Semaphore.java。看上去,只要把这三个方法做成sychronized就能解决问题了,就像这样:

//: c13:SynchronizedSemaphore.java
// Colliding over shared resources
public class SynchronizedSemaphore extends Semaphore {
private volatile int semaphore = 0;
public synchronized boolean available() {
return semaphore == 0;
}
public synchronized void acquire() { ++semaphore; }
public synchronized void release() { --semaphore; }
public InvariantState invariant() {
int val = semaphore;
if(val == 0 || val == 1)
return new InvariantOK();
else
return new InvariantFailure(new Integer(val));
}
public static void main(String[] args) throws Exception {
SynchronizedSemaphore sem =new SynchronizedSemaphore();
new SemaphoreTester(sem);
new SemaphoreTester(sem);
new InvariantWatcher(sem).join();
}
} ///:~

乍看上去SynchronizedSemaphore实在是太怪了——它继承自Semaphore,但是与原来的方法相比,覆写的方法只是多了个synchronized。Java覆写方法的时候是不允许修改方法的特征签名的,但是这里为什么不会报错。这是因为synchronized关键词不属于方法的特征签名,所以你可以随便加。

之所以要继承Semaphore是为了能复用SemaphoreTester。但是等程序开始运行,你还是会看到InvariantFailure

为什么还是不行?这是因为,线程要根据available( )的返回值来判断Semaphore是不是可得,因此它得先释放对象的锁再知道它是不是可得。这时另一个线程冲了进来,抢在这个线程的前头给Semaphore加了一。对此,第一个线程一无所知,在它看来Semaphore仍然是可得的,因此它会傻乎乎地调用acquire( ),从而将Semaphore置于无效状态。又一个血淋淋的教训。所以千万要牢记并发编程的最高法则:绝对不能想当然。

要解决这个问题,唯一的办法就是将可得性测试与获取合并成一个原子操作——也就是对象锁和synchronized关键词所提供的效果。也就是说,对象锁和synchronized关键词是Java内置的semaphore,因此没必要再去搞一套了。

关键段

有时你只需要防止多个线程同时访问方法中的某一部分,而不是整个方法。这种需要隔离的代码就被称为关键段(critical section)。创建关键段需要用到synchronized关键词。这里,synchronized的作用是,指明执行下列代码需获得哪个对象的锁。

synchronized(syncObject) {
// This code can be accessed 
// by only one thread at a time
}


关键段又被称为"同步块(synchronized block)";线程在执段代码之前,必须先获得syncObject的锁。如果其它线程已经获得这个锁了,那么在它解锁之前,线程不能运行关键段中的代码。

同步分两种,代码的同步和方法的同步。下面这段例程比较了这两种方案。通过对比可以看出,相比同步整个方法,同步一段代码能显著增加其它线程获得这个对象的机会。此外,它还演示了如何用wrapper使用和保护非线程安全的类:

//: c13:CriticalSection.java
// Synchronizing blocks instead of entire methods. Also
// demonstrates protection of a non-thread-safe class
// with a thread-safe one.
import java.util.*;
class Pair { // Not thread-safe
private int x, y;
public Pair(int x, int y) {
this.x = x;
this.y = y;
}
public Pair() { this(0, 0); }
public int getX() { return x; }
public int getY() { return y; }
public void incrementX() { x++; }
public void incrementY() { y++; }
public String toString() {
return "x: " + x + ", y: " + y;
}
public class PairValuesNotEqualException
extends RuntimeException {
public PairValuesNotEqualException() {
super("Pair values not equal: " + Pair.this);
}
}
// Arbitrary invariant -- both variables must be equal:
public void checkState() {
if(x != y)
throw new PairValuesNotEqualException();
}
}
// Protect a Pair inside a thread-safe class:
abstract class PairManager {
protected Pair p = new Pair();
private List storage = new ArrayList();
public synchronized Pair getPair() {
// Make a copy to keep the original safe:
return new Pair(p.getX(), p.getY());
}
protected void store() { storage.add(getPair()); }
// A "template method":
public abstract void doTask();
}
// Synchronize the entire method:
class PairManager1 extends PairManager {
public synchronized void doTask() {
p.incrementX();
p.incrementY();
store();
}
}
// Use a critical section:
class PairManager2 extends PairManager {
public void doTask() {
synchronized(this) {
p.incrementX();
p.incrementY();
}
store();
}
}
class PairManipulator extends Thread {
private PairManager pm;
private int checkCounter = 0;
private class PairChecker extends Thread {
PairChecker() { start(); }
public void run() {
while(true) {
checkCounter++;
pm.getPair().checkState();
}
}
}
public PairManipulator(PairManager pm) {
this.pm = pm;
start();
new PairChecker();
}
public void run() {
while(true) {
pm.doTask();
}
}
public String toString() {
return "Pair: " + pm.getPair() +
" checkCounter = " + checkCounter;
}
}
public class CriticalSection {
public static void main(String[] args) {
// Test the two different approaches:
final PairManipulator
pm1 = new PairManipulator(new PairManager1()),
pm2 = new PairManipulator(new PairManager2());
new Timer(true).schedule(new TimerTask() {
public void run() {
System.out.println("pm1: " + pm1);
System.out.println("pm2: " + pm2);
System.exit(0);
}
}, 500); // run() after 500 milliseconds
}
} ///:~

你也看到了,Pair不是一个线程安全的类,它的invariant(我们随便定了一个)要求XY的值相同。此外,正如我们前面所看到的,递增不是线程安全的操作,而这些方法也都不是synchronized的,所以在多线程环境下,Pair肯定会出问题。

PariManager包含一个Pair对象,与此同时它还控制了外部对这个对象的所有访问。注意,它的两个public方法,getPair( )abstract doTash( )都是synchronized。只是后者的同步化处理放到了实现里面。

PairManager的功能,有的是在abstract的基类里实现的,有的是在派生类里定义的。用设计模式的话来说,这就是是Template Method[71]设计模式要求你把会变的代码封装起来;这里会变的就是模板方法doTask( )PairManager1把整个doTask( )同步化了,而PairManager2用关键段部分地同步化了这个方法。注意,synchronized关键词不属于方法的特征签名,因此覆写的时候可以加上去。

PairManager2值得关注。store( )是一个protected方法,也就是说只有子类才能访问,一般的客户根本是看不到的。所以我们就无需再把它放到synchronized方法里了。这里它被置于关键段之外。

关键段必须要有对象来同步,最常见的就是这个方法自己所属的对象:synchronized(this)PairManager2就用了这个方法。这样,当关键段获得对象锁的时候,其他线程就不能调用这个对象的synchronized方法了。而这样做的效果就是,大大缩小了同步的范围。

有时你还会碰到要创建一个单独的对象,然后对它进行同步化的情形,这时这个方案就不合适了。下面我们来演示一下,怎样让对象的方法针对不同的锁做同步化,这样两个线程就可以同时进入同一个对象了:

//: c13:SyncObject.java
// Synchronizing on another object
import com.bruceeckel.simpletest.*;
class DualSynch {
private Object syncObject = new Object();
public synchronized void f() {
System.out.println("Inside f()");
// Doesn‘t release lock:
try {
Thread.sleep(500);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Leaving f()");
}
public void g() {
synchronized(syncObject) {
System.out.println("Inside g()");
try {
Thread.sleep(500);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Leaving g()");
}
}
}
public class SyncObject {
private static Test monitor = new Test();
public static void main(String[] args) {
final DualSynch ds = new DualSynch();
new Thread() {
public void run() {
ds.f();
}
}.start();
ds.g();
monitor.expect(new String[] {
"Inside g()",
"Inside f()",
"Leaving g()",
"Leaving f()"
}, Test.WAIT + Test.IGNORE_ORDER);
}
} ///:~

DualSyncf( )方法针对this作了同步(整个方法的同步),而g( )里面则有一个针对syncObject的同步段。所以这两个同步是相互独立的。这一点已经在main( )里得到了证明。main( )启动了一个线程调用f( ),而它自己又调用了g( )。可以从程序的输出看到这两个方法在同时运行,它们谁也没有在等另一个。

回到CriticalSection.java。为了测试这两个PairManager,我们创建了一个PairManipulator。这个类会在线程里运行doTask( ),然后在PairChecker内部类的实例里运行另一个线程(检查invariant)。为了能知道究竟做了多少检查,我们让PairChecker每做一次检查就递增一次checkCountermain( )创建了两个PairManipulator。它会让这两个线程运行一段时间,等到Timer到期,它会执行run( ),也就是打印这两个PairManipulator,然后再退出。程序的输出大致是这样的:

pm1: Pair: x: 58892, y: 58892 checkCounter = 44974
pm2: Pair: x: 73153, y: 73153 checkCounter = 100535

虽然每次运行的结果都不一样,但是总的来看PairManager1.doTask( )的运行次数不会多过PairManager2.doTask( )的,因为后者用的是同步段,因此访问的机会较多。通常这就是你选择使用同步段的原因:让线程有更多的机会访问(在保证安全的前提下)。

当然,最后还是要靠程序员:所有访问共享资源的代码都必须被包进同步段里。

线程的状态

线程的状态可归纳为以下四种:

  1. New: 线程对象已经创建完毕,但尚未启动(start),因此还不能运行。
  2. Runnable: 处在这种状态下的线程,只要分时机制分配给它CPU周期,它就能运行。也就是说,具体到某个时点,它可能正在运行,也可能没有运行,但是轮到它运行的时候,谁都不能阻止它;它没有dead,也没有被阻塞。
  3. Dead: 要想中止线程,正常的做法是退出run( )。在Java 2以前,你也可以调用stop( ),不过现在不建议用这个办法了,因为它很可能会造成程序运行状态的不稳定。此外还有一个destroy( )(不过它还没有实现,或许将来也不会了,也就是说已经被放弃了)。后面我们会讲怎样用其它办法来实现stop( )的功能。
  4. Blocked: 就线程本身而言,它是可以运行的,但是有什么别的原因在阻止它运行。线程调度机制会直接跳过blocked的线程,根本不给它分配CPU的时间。除非它重新进入runnable状态,否则什么都干不了。

进入阻塞状态

如果线程被阻塞了,那肯定是出了什么问题。问题可能有以下几种:

  1. 你用sleep(milliseconds)方法叫线程休眠。在此期间,线程是不能运行的。
  2. 你用wait( )方法把线程挂了起来。除非收到notify( )notifyAll( )消息,否则线程无法重新进入runnable状态。这部分内容会在后面讲。
  3. 线程在等I/O结束。
  4. 线程要调用另一个对象的synchronized方法,但是还没有得到对象的锁。

或许你还在旧代码里看到过suspend( )resume( ),不过Java 2已经放弃了这两个方法(因为很容易造成死锁),所以这里就不作介绍了。

线程间的协作

理解了线程会相互冲突以及该如何防止这种冲突之后,下一步就该学习怎样让线程协同工作了。要做到这一点,关键是要让线程能相互"协商(handshaking)"。而这个任务要由Objectwait( )notify( )来完成。

waitnotify

首先要强调,线程sleep( )的时候并不释放对象的锁,但是wait( )的时候却会释放对象的锁。也就是说在线程wait( )期间,别的线程可以调用它的synchronized方法。当线程调用了某个对象wait( )方法之后,它就中止运行并释放那个对象锁了。

Java有两种wait( )。第一种需要一个以毫秒记的时间作参数,它的意思和sleep( )一样,都是:"暂停一段时间。"区别在于:

  1. wait( )会释放对象的锁。
  2. 除了时间到了,wait( )还可以用notify( )notifyAll( )来中止

第二种wait( )不需要任何参数;它的用途更广。线程调用了这种wait( )之后,会一直等下去,直到(有别的线程调用了这个对象的)notify( )notifyAll( )

sleep( )属于Thread不同,wait( )notify( ), 和notifyAll( )是根Object的方法。虽然这样做法(把专为多线程服务的方法放到通用的根类里面)看上去有些奇怪,但却是必要的。因为它们所操控的是每个对象都会有的锁。所以结论就是,你可以在类的synchronized方法里调用wait( ),至于它继不继承Thread,实没实现Runnable已经无所谓了。实际上你也只能在synchronized方法里或synchronized段里调用wait( )notify( )notifyAll( )(sleep( )则没有这个限制,因为它不对锁进行操作)。如果你在非synchronized方法里调用了这些方法,程序还是可以编译的,但是一运行就会出一个IllegalMonitorStateException。这个异常带着一个挺让人费解的"current thread not owner"消息。这个消息的意思是,如果线程想调用对象的wait( )notify( ),或notifyAll( )方法,必须先"拥有"(得到)这个对象的锁。

你可以让另一个对象来操控这个对象的锁。要想这么做,第一步是先获取对象的锁。比方说要想调用对象xnotify( ),可以在xsynchronized段里:

synchronized(x) {
x.notify();
}

通常情况下,如果条件是由方法之外的其他力量所控制的(最常见的就是要由其他线程修改),那么你就应该用wait( )。你总不希望闲着的时候还让线程不停地作测试吧;这种"瞎忙活"是对CPU资源的巨大浪费。所以wait( )能让你在等待世道改变的同时让线程休眠,当(其他线程调用了对象的)notify( )notifyAll( )的时候,线程自会醒来,然后检查条件是不是改变了。所以说wait( )提供了一种同步线程间的活动的方法。

举个例子,假设有这么一间饭店,它有一个厨师和一个服务员。服务员必须等厨师烧菜。厨师做完一道菜就通知服务员,服务员上完菜接着等。这是一个很精彩的线程合作的例子:厨师就是producer,而服务员就是consumer。下面我们用代码来为这个故事建模:

//: c13:Restaurant.java
// The producer-consumer approach to thread cooperation.
import com.bruceeckel.simpletest.*;
class Order {
private static int i = 0;
private int count = i++;
public Order() {
if(count == 10) {
System.out.println("Out of food, closing");
System.exit(0);
}
}
public String toString() { return "Order " + count; }
}
class WaitPerson extends Thread {
private Restaurant restaurant;
public WaitPerson(Restaurant r) {
restaurant = r;
start();
}
public void run() {
while(true) {
while(restaurant.order == null)
synchronized(this) {
try {
wait();
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(
"Waitperson got " + restaurant.order);
restaurant.order = null;
}
}
}
class Chef extends Thread {
private Restaurant restaurant;
private WaitPerson waitPerson;
public Chef(Restaurant r, WaitPerson w) {
restaurant = r;
waitPerson = w;
start();
}
public void run() {
while(true) {
if(restaurant.order == null) {
restaurant.order = new Order();
System.out.print("Order up! ");
synchronized(waitPerson) {
waitPerson.notify();
}
}
try {
sleep(100);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
public class Restaurant {
private static Test monitor = new Test();
Order order; // Package access
public static void main(String[] args) {
Restaurant restaurant = new Restaurant();
WaitPerson waitPerson = new WaitPerson(restaurant);
Chef chef = new Chef(restaurant, waitPerson);
monitor.expect(new String[] {
"Order up! Waitperson got Order 0",
"Order up! Waitperson got Order 1",
"Order up! Waitperson got Order 2",
"Order up! Waitperson got Order 3",
"Order up! Waitperson got Order 4",
"Order up! Waitperson got Order 5",
"Order up! Waitperson got Order 6",
"Order up! Waitperson got Order 7",
"Order up! Waitperson got Order 8",
"Order up! Waitperson got Order 9",
"Out of food, closing"
}, Test.WAIT);
}
} ///:~

Order是一个会自己计数的类,同时它还负责中止程序;数到10的时候,它会调用System.exit( )

WaitPerson必须知道它是为哪家Restaurant服务的,所以他得从那家饭店的"帐台",也就是restaurant.order,拿单子。WaitPersonrun( )调用了wait( )方法,所以除非Chefnotify( )叫醒他,否则他会一直等下去。这是个非常简单的程序,所以我们知道只会有一个线程在等WaitPerson的锁:就是WaitPerson线程自己,所以notify( )是安全的。如果情况比较复杂,有很多线程在等一个对象的锁,那么你可能就不知道该叫醒哪个线程了。遇到这种情况,可以用notifyAll( ),这个方法会叫醒所有在等这个对象的锁的线程。这样线程就能自行判断本次唤醒是否与自己有关。

请注意,wait( )所在的while循环,它的测试条件正是wait要等的东西。乍看上去真有点奇怪——如果你等的是订单,那么醒过来之后肯定会有订单,难道不是吗?问题在于在多线程环境下,还没等WaitPerson醒过来,订单就可能被其他线程给抢走了。所以唯一安全的做法就是套用下面这个wait( )定式:

while(conditionIsNotMet)
wait( );

用管道进行线程间的I/O操作

在很多情况下,线程也可以利用I/O来进行通信。多线程类库会提供一种"管道(pipes)"来实现线程间的I/O。对Java I/O类库而言,这个类就是PipedWriter(可以让线程往管道里写数据)和PipedReader(让另一个线程从这个管道里读数据)。你可以把它理解成"producer-consumer"问题的一个变型,而管道则提供了一个现成的解决方案。

下面我们来举一个简单的例子。两个线程利用管道来进行通信:

//: c13:PipedIO.java
// Using pipes for inter-thread I/O
import java.io.*;
import java.util.*;
class Sender extends Thread {
private Random rand = new Random();
private PipedWriter out = new PipedWriter();
public PipedWriter getPipedWriter() { return out; }
public void run() {
while(true) {
for(char c = ‘A‘; c <= ‘z‘; c++) {
try {
out.write(c);
sleep(rand.nextInt(500));
} catch(Exception e) {
throw new RuntimeException(e);
}
}
}
}
}
class Receiver extends Thread {
private PipedReader in;
public Receiver(Sender sender) throws IOException {
in = new PipedReader(sender.getPipedWriter());
}
public void run() {
try {
while(true) {
// Blocks until characters are there:
System.out.println("Read: " + (char)in.read());
}
} catch(IOException e) {
throw new RuntimeException(e);
}
}
}
public class PipedIO {
public static void main(String[] args) throws Exception {
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
sender.start();
receiver.start();
new Timeout(4000, "Terminated");
}
} ///:~

SenderReceiver表示两个需要相互通信才能完成任务的线程。SenderPipedWriter是一个独立的对象;但是ReceiverPipedReader必须与构造函数里的PipedWriter相关联。Sender把数据写入Writer,然后休眠一段时间。Receiver则不sleep( )wait( ),它read( )不到数据的时候,自动就阻塞了。这样一来你就能不用wait( )循环就获得producer-consumer的效果了。

注意,main( )先创建了senderreceiver,再调用了start( )。如果你没创建完对象就启动线程,那么管道在不同的平台上的行为就有可能会不一致。

更复杂的协同

这里只讲了最基本的协同方式(即通常籍由wait( )notify( )/notifyAll( )来实现的producer-consumer模式)。它已经能解决绝大多数的线程协同问题了,但是在高级的教科书里还有很多更复杂协同方式(特别是本章末尾所推荐的,由Doug Lea编著的Concurrent Programming in Java, 2nd Edition)

死锁

由于线程能被阻塞,更由于synchronized方法能阻止其它线程访问本对象,因此有可能会出现如下这种情况:线程一在等线程二(释放某个对象),线程二又在等线程三,这样依次排下去直到有个线程在等线程一。这样就形成了一个环,每个线程都在等对方释放资源,而它们谁都不能运行。这就是所谓的死锁(deadlock)。

如果程序一运行就死锁,那倒也简单了。你可以马上着手解决这个问题。但真正的麻烦在于,程序看上去能正常运行,但是却潜伏着会引起死锁的隐患。或许你认为这里根本就不可能会有死锁,而bug也就这样潜伏下来了。直到有一天,让某个用户给撞上了(而且这种bug还很可能是不可重复的)。所以对并发编程来说,防止死锁是设计阶段的一个重要任务。

下面我们来看看由Dijkstra发现的经典的死锁场景:哲学家吃饭问题。原版的故事里有五个哲学家(不过我们的例程里允许有任意数量)。这些哲学家们只做两件事,思考和吃饭。他们思考的时候,不需要任何共享资源,但是吃饭的时候,就必须坐到餐桌旁。餐桌上的餐具是有限的。原版的故事里,餐具是叉子,吃饭的时候要用两把叉子把面条从碗里捞出来。但是很明显,把叉子换成筷子会更合理,所以:一个哲学家需要两根筷子才能吃饭。

现在引入问题的关键:这些哲学家很穷,只买得起五根筷子。他们坐成一圈,两个人的中间放一根筷子。哲学家吃饭的时候必须同时得到左手边和右手边的筷子。如果他身边的任何一位正在使用筷子,那他只有等着。

这个问题之所以有趣就在于,它演示了这么一个程序,它看上去似乎能正常运行,但是却容易引起死锁。你可以自己试试,用命令行参数调节哲学家的数量和思考的时间。如果有很多哲学家,而且/或者他们思考的时间很长,或许你永远也碰不到死锁,但是死锁的可能性总还是在的。默认的命令行参数会让它很快地死锁:

//: c13:DiningPhilosophers.java
// Demonstrates how deadlock can be hidden in a program.
// {Args: 5 0 deadlock 4}
import java.util.*;
class Chopstick {
private static int counter = 0;
private int number = counter++;
public String toString() {
return "Chopstick " + number;
}
}
class Philosopher extends Thread {
private static Random rand = new Random();
private static int counter = 0;
private int number = counter++;
private Chopstick leftChopstick;
private Chopstick rightChopstick;
static int ponder = 0; // Package access
public Philosopher(Chopstick left, Chopstick right) {
leftChopstick = left;
rightChopstick = right;
start();
}
public void think() {
System.out.println(this + " thinking");
if(ponder > 0)
try {
sleep(rand.nextInt(ponder));
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
}
public void eat() {
synchronized(leftChopstick) {
System.out.println(this + " has "
+ this.leftChopstick + " Waiting for "
+ this.rightChopstick);
synchronized(rightChopstick) {
System.out.println(this + " eating");
}
}
}
public String toString() {
return "Philosopher " + number;
}
public void run() {
while(true) {
think();
eat();
}
}
}
public class DiningPhilosophers {
public static void main(String[] args) {
if(args.length < 3) {
System.err.println("usage:\n" +
"java DiningPhilosophers numberOfPhilosophers " +
"ponderFactor deadlock timeout\n" +
"A nonzero ponderFactor will generate a random " +
"sleep time during think().\n" +
"If deadlock is not the string " +
"‘deadlock‘, the program will not deadlock.\n" +
"A nonzero timeout will stop the program after " +
"that number of seconds.");
System.exit(1);
}
Philosopher[] philosopher =
new Philosopher[Integer.parseInt(args[0])];
Philosopher.ponder = Integer.parseInt(args[1]);
Chopstick
left = new Chopstick(),
right = new Chopstick(),
first = left;
int i = 0;
while(i < philosopher.length - 1) {
philosopher[i++] =
new Philosopher(left, right);
left = right;
right = new Chopstick();
}
if(args[2].equals("deadlock"))
philosopher[i] = new Philosopher(left, first);
else // Swapping values prevents deadlock:
philosopher[i] = new Philosopher(first, left);
// Optionally break out of program:
if(args.length >= 4) {
int delay = Integer.parseInt(args[3]);
if(delay != 0)
new Timeout(delay * 1000, "Timed out");
}
}
} ///:~

ChopstickPhilosopher都包含一个能自动递增的static counter。每个Philosopher有两个reference,一个表示左边那根Chopstick,另一个表示右边那根;Philosopher吃饭之前必须拿到这两根筷子。

staticponder表示哲学家要花多长时间思考。如果这个值非零,则think( )会用它来生成一个随机的休眠时间。我们用这种方法证明,如果线程(也就是哲学家)花在其它事情上(思考)的时间多了,那么它们使用共享资源(筷子)的机会就少了,因而程序就不太容易死锁了,但事实并非如此。

请看eat( )Philosophersynchronized 左边那根筷子。如果得不到,他就等,这时他处于阻塞状态。得到左边那根筷子之后,他又用相同的方法去申请右边的筷子。吃完之后,他也是先放左边的,再放右边的。

Philosopherrun( )里不停的思考和吃饭。

main( )至少需要三个参数,如果没给够,它就打印使用的信息。第三个参数可以是"deadlock"字符串,这时程序会启动会引发死锁的版本。任何其它字符串都会运行其非死锁的版本。最后一个参数(可选的)是一个时限,程序运行这段时间之后(以秒为单位,不论是否死锁都会)都会退出。为了能自动地进行测试,这个时限参数是必须的。

除了创建Philosopher数组,设置ponder的值,main( )还创建了两个Chopstick对象,第一个对象放在first变量里。除了最后一个对象,其他Philosopher的初始化过程都一样:先新建一个Philosopher,把leftright Chopstick传给他,然后把rightChopstick传到left,再为right创建一个新的Chopstick。下一个Philosopher再继续用这两根筷子。

在会死锁版本里,我们把先前存在first里的那根筷子放在最后那个Philosopher的左边。由于最后一位Philosopher正好坐在第一位的旁边,这样他们就必须共享first筷子了。这种安排有可能会造成这种情况:每个哲学家手里都攥着一根筷子,他在等他旁边那位放下手里的筷子,这样他才能吃饭,但程序已经死锁了。

试着用各种命令行参数来运行程序,看看程序会怎样运行,特别是要注意在哪些情况下程序不会死锁。

在告诉你如何修补这个问题之前,先了解一下只有在下述四个条件同时满足的情况下,死锁才会发生:

  1. 互斥:也许线程会用到很多资源,但其中至少要有一项是不能共享的。这里,一根筷子同一时刻只能供一个哲学家使用。
  2. 至少要有一个进程会在占用一项资源的同时还在等另一项正被其它进程所占用的资源。也就是说,要想让死锁发生,哲学家必须攥着一根筷子等另一根。
  3. (调度系统或其他进程)不能从进程里抢资源。所有进程都必须正常的释放资源。我们的哲学家都彬彬有礼,不会从他的邻座手里抢筷子。
  4. 必需要有等待的环。一个进程在一个已经被另一进程抢占了的资源,而那个进程又在等另一个被第三个进程抢占了的资源,以此类推,直到有个进程正在等被第一个进程抢占了的资源,这样就形成了瘫痪性的阻塞了。这里,由于每个哲学家都是先左后右的拿筷子,所以有可能会造成等待的环。在例程中,我们修改了最后一位哲学家的构造函数,让他先右后左地拿筷子,从而破解了死锁。

由于死锁要同时满足这四个条件,所用只要去掉其中一个就能防止死锁。对于这个程序,预防死锁最简单的办法是破掉第四条。之所以会死锁,是因为每个哲学家都以固定的次序拿筷子:先左后右。因此就有可能会发生每个人的左手都捏着一根筷子,然后等右边那根,这样就形成了等待环了。如果初始化的时候让最后那位哲学家先右后左的拿筷子,那他就不会再碍着他左边那位去拿右边的筷子了,因此等待环就被破了。这只是这个问题的解决方法之一,你还可以用破解其它条件的办法来解决这个问题(要想学得更细,可以去看更高级的教课书)。

Java语言没有提供任何能预防死锁的机制,所以只能靠你来设计了。对于那些排错的人来说,这可不是什么好消息。

停止线程的正确方法

为了降低死锁的发生几率,Java 2放弃了Threadstop( )suspend( )resume( )方法。

之所以要放弃stop( )是因为,它不会释放对象的锁,因此如果对象正处于无效状态(也就是被破坏了),其它线程就可能会看到并且修改它了。这个问题的后果可能非常微秒,因此难以察觉。所以别再用stop( )了,相反你应该设置一个旗标(flag)来告诉线程什么时候该停止。下面是一个简单的例子:

//: c13:Stopping.java
// The safe way to stop a thread.
import java.util.*;
class CanStop extends Thread {
// Must be volatile:
private volatile boolean stop = false;
private int counter = 0;
public void run() {
while(!stop && counter < 10000) {
System.out.println(counter++);
}
if(stop)
System.out.println("Detected stop");
}
public void requestStop() { stop = true; }
}
public class Stopping {
public static void main(String[] args) {
final CanStop stoppable = new CanStop();
stoppable.start();
new Timer(true).schedule(new TimerTask() {
public void run() {
System.out.println("Requesting stop");
stoppable.requestStop();
}
}, 500); // run() after 500 milliseconds
}
} ///:~     

stop必须是volatile的,这样才能确保run( )方法能看到它(否则它会使用本地的缓存值)。这个线程的"任务"是打印10,000个数字,所以当counter >= 10000或有人要它停下来的时候,它就结束了。注意requestStop( )不是synchronized,因为stop既是boolean(改成true是一个原子操作)又是volatile的。

main( )启动完CanStop之后设置了一个Timer,让它过半秒自动调用requestStop( )Timer的构造函数里的true参数的任务是,把这个线程设成守护线程,这样它就不会阻止程序退出了。

打断受阻的线程

有时线程受阻之后就不能再做轮询了,比如在等输入,这时你就不能像前面那样去查询旗标了。碰到这种情况,你可以用Thread.interrupt( )方法打断受阻的线程:

//: c13:Interrupt.java
// Using interrupt() to break out of a blocked thread.
import java.util.*;
class Blocked extends Thread {
public Blocked() {
System.out.println("Starting Blocked");
start();
}
public void run() {
try {
synchronized(this) {
wait(); // Blocks
}
} catch(InterruptedException e) {
System.out.println("Interrupted");
}
System.out.println("Exiting run()");
}
}
public class Interrupt {
static Blocked blocked = new Blocked();
public static void main(String[] args) {
new Timer(true).schedule(new TimerTask() {
public void run() {
System.out.println("Preparing to interrupt");
blocked.interrupt();
blocked = null; // to release it
}
}, 2000); // run() after 2000 milliseconds
}
} ///:~

Blocked.run( )里面的wait( )使线程受阻。Timer期满之后,会调用blockedinterrupt( )方法。最后要把blocked的 reference设成null,这样垃圾回收器才能把它给回收了(在我们这个程序里,这一步不是必须的,但对会运行很长时间的程序来说,这一步很重要)。

线程组

线程组是一个装线程的容器(collection)。用Joshua Bloch[72],也就是负责修补和改进JDK 1.2的Java容器类库的那位Sun的软件架构师,的话来讲,它的意义可以概括为:

"最好把线程组看成是一次不成功的实验,或者就当它根本不存在。"

如果你(和我一样)曾经在线程组上花了很多时间和精力,你就会觉得很奇怪,为什么Sun没有在此之前就这个问题发表过更多的官方申明呢(过去几年里,Java方面的类似问题还有很多)。对于这种问题,诺贝尔经济学奖得主Joseph Stiglitz有一条人生哲学可以解释,[73] 这就是所谓的承诺升级理论:

"延续错误的代价是别人付的,但是承认错误的代价是由你付的。"

线程组还剩一个小用途。如果组里的线程抛出一个没有被(异常处理程序)捕捉到的异常,就会启动ThreadGroup.uncaughtException( )。而它会在标准错误流上打印出栈的轨迹。要想修改这个行为,你必须覆写这个方法。

总结

要懂得什么时候用什么时候用并发,什么时候不用并发,这点非常重要。使用并发的主要理由包括:要管理大量的任务,让它们同时运行以提高系统的利用率(包括在多CPU上透明的分配负载);更合理的组织代码;以及方便用户。平衡负载的一个经典案例是在等待I/O的同时做计算。方便用户的经典案例是在用户下载大文件的时候监控"stop"按钮。

线程还有一个额外的好处,那就是它提供了"轻型"(100个指令级的)运行环境(execution context)的切换,而进程环境(process context)的切换则是"重型"的(数千个指令)。由于所有线程会共享进程的内存空间,所以轻型的环境切换只会改变程序执行顺序和本地变量。而重型的进程环境切换则必须交换全部的内存空间。

多线程的主要缺点包括:

  1. 等待共享资源的时候,运行速度会慢下来。
  2. 线程管理需要额外的CPU开销。
  3. 如果设计得不不合理,程序会变得异常负责。
  4. 会引发一些不正常的状态,像饥饿(starving),竞争(racing),死锁(deadlock),活锁(livelock)。
  5. 不同平台上会有一些不一致。比如我在开发本书例程时发现,在有些平台下竞争很快就出现,但是换了台机器,它根本就不出现。如果你在后者搞开发,然后发布到前者,那可就惨了。

线程的难点在于多个线程会共享同一项资源——比如对象的内存——而你又必须确保同一时刻不会有两个或两个以上的线程去访问那项资源。这就需要合理地使用synchronized关键词了,但是用之前必须完全理解,否则它会悄悄地地把死锁了带进来。

此外线程的运用方面还有某种艺术。Java的设计思想是,让你能根据需要创建任意多的对象来解决问题,至少理论上如此。(对Java来说创建数以百万计的对象,比如工程方面的有限元分析,还不太现实。)但是你能创建的线程数量应该还是有一个上限的,因为到了这个数量,线程就僵掉了。这个临界点很难找,通常由OS和JVM决定;或许是一百以内,也可能是几千。不过通常你只需创建几个线程就能解决问题了,所以这还不算是什么限制;但是对于更为通用的设计,这就是一个限制了。

线程方面一个重要,但却不那么直观的结论。那就是,通常你可以在run( )的主循环里插上yield( ),然后让线程调度机制帮你加快程序的运行。这绝对是一种艺术,特别是当等待延长之后,性能却上升了。之所以会这样是因为,较短的延迟会使正在运行的线程还没准备好休眠就收到休眠结束的信号,这样为了能让线程干完工作之后再休眠,调度机制不得不先把它停下来再唤醒它。额外的运行环境的切换会导致运行速度的下降,而yield( )sleep( )则可以防止这种多余的切换。要理解这个问题有多麻烦还真得好好想想。

要想更深入地探讨多线程问题,推荐你看Concurrent Programming in Java, 2nd Edition,作者Doug Lea, Addison-Wesley, 2000出版。


[68] Runnable在Java 1.0里就有了,而内部类是Java 1.1新加的。这一点部分地解释了为什么会有Runnable。此外,传统的多线程架构较少涉及对象,它关注的主要是函数该如何运行。就我个人的习惯,只要有机会就继承Thread,我觉得这么做更有条理,灵活性也高。

[69] 有些例子是在双CPU的Win2K系统上开发的,所以冲突很快就产生了。但是在单CPU的机器上,很可能要等很长时间才会有冲突——多线程之所以会这么难,就是因为它经常会有这种让人很恼火的问题。试想你在单CPU的机器上开发了多线程的程序,测试下来一切正常,但是当你把程序部署到多CPU系统时,一运行它就崩溃了。

[70] 受Joshua Bloch的Effective Java, Addison-Wesley 2001,190页的启发。

[71] 参见由Gamma等著的Design Patterns, Addison-Wesley 1995.

[72] Effective Java,作者Joshua Bloch, Addison-Wesley 2001, 第 211页。

[73] 在Java的发展史上,类似的项目还有很多。为什么要半途而废?——我曾经不止一次地问过这个问题。看来这就是答案。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多