分享

JAVA线程间通信的几种方式

 liang1234_ 2019-01-24
今天在群里面看到一个很有意思的面试题:
“编写两个线程,一个线程打印1~25,另一个线程打印字母A~Z,打印顺序为12A34B56C……5152Z,要求使用线程间的通信。”

这是一道非常好的面试题,非常能彰显被面者关于多线程的功力,一下子就勾起了我的兴趣。这里抛砖引玉,给出7种想到的解法。

通用代码:

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. /**
  4. * Created by Edison Xu on 2017/3/2.
  5. */
  6. public enum Helper {
  7. instance;
  8. private static final ExecutorService tPool = Executors.newFixedThreadPool(2);
  9. public static String[] buildNoArr(int max) {
  10. String[] noArr = new String[max];
  11. for(int i=0;i<max;i++){
  12. noArr[i] = Integer.toString(i+1);
  13. }
  14. return noArr;
  15. }
  16. public static String[] buildCharArr(int max) {
  17. String[] charArr = new String[max];
  18. int tmp = 65;
  19. for(int i=0;i<max;i++){
  20. charArr[i] = String.valueOf((char)(tmp+i));
  21. }
  22. return charArr;
  23. }
  24. public static void print(String... input){
  25. if(input==null)
  26. return;
  27. for(String each:input){
  28. System.out.print(each);
  29. }
  30. }
  31. public void run(Runnable r){
  32. tPool.submit(r);
  33. }
  34. public void shutdown(){
  35. tPool.shutdown();
  36. }
  37. }

1. 第一种解法,包含多种小的不同实现方式,但一个共同点就是靠一个共享变量来做控制

a. 利用最基本的synchronizednotifywait

  1. public class MethodOne {
  2. private final ThreadToGo threadToGo = new ThreadToGo();
  3. public Runnable newThreadOne() {
  4. final String[] inputArr = Helper.buildNoArr(52);
  5. return new Runnable() {
  6. private String[] arr = inputArr;
  7. public void run() {
  8. try {
  9. for (int i = 0; i < arr.length; i=i+2) {
  10. synchronized (threadToGo) {
  11. while (threadToGo.value == 2)
  12. threadToGo.wait();
  13. Helper.print(arr[i], arr[i + 1]);
  14. threadToGo.value = 2;
  15. threadToGo.notify();
  16. }
  17. }
  18. } catch (InterruptedException e) {
  19. System.out.println("Oops...");
  20. }
  21. }
  22. };
  23. }
  24. public Runnable newThreadTwo() {
  25. final String[] inputArr = Helper.buildCharArr(26);
  26. return new Runnable() {
  27. private String[] arr = inputArr;
  28. public void run() {
  29. try {
  30. for (int i = 0; i < arr.length; i++) {
  31. synchronized (threadToGo) {
  32. while (threadToGo.value == 1)
  33. threadToGo.wait();
  34. Helper.print(arr[i]);
  35. threadToGo.value = 1;
  36. threadToGo.notify();
  37. }
  38. }
  39. } catch (InterruptedException e) {
  40. System.out.println("Oops...");
  41. }
  42. }
  43. };
  44. }
  45. class ThreadToGo {
  46. int value = 1;
  47. }
  48. public static void main(String args[]) throws InterruptedException {
  49. MethodOne one = new MethodOne();
  50. Helper.instance.run(one.newThreadOne());
  51. Helper.instance.run(one.newThreadTwo());
  52. Helper.instance.shutdown();
  53. }
  54. }

b. 利用LockCondition

  1. public class MethodTwo {
  2. private Lock lock = new ReentrantLock(true);
  3. private Condition condition = lock.newCondition();
  4. private final ThreadToGo threadToGo = new ThreadToGo();
  5. public Runnable newThreadOne() {
  6. final String[] inputArr = Helper.buildNoArr(52);
  7. return new Runnable() {
  8. private String[] arr = inputArr;
  9. public void run() {
  10. for (int i = 0; i < arr.length; i=i+2) {
  11. try {
  12. lock.lock();
  13. while(threadToGo.value == 2)
  14. condition.await();
  15. Helper.print(arr[i], arr[i + 1]);
  16. threadToGo.value = 2;
  17. condition.signal();
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. } finally {
  21. lock.unlock();
  22. }
  23. }
  24. }
  25. };
  26. }
  27. public Runnable newThreadTwo() {
  28. final String[] inputArr = Helper.buildCharArr(26);
  29. return new Runnable() {
  30. private String[] arr = inputArr;
  31. public void run() {
  32. for (int i = 0; i < arr.length; i++) {
  33. try {
  34. lock.lock();
  35. while(threadToGo.value == 1)
  36. condition.await();
  37. Helper.print(arr[i]);
  38. threadToGo.value = 1;
  39. condition.signal();
  40. } catch (Exception e) {
  41. e.printStackTrace();
  42. } finally {
  43. lock.unlock();
  44. }
  45. }
  46. }
  47. };
  48. }
  49. class ThreadToGo {
  50. int value = 1;
  51. }
  52. public static void main(String args[]) throws InterruptedException {
  53. MethodTwo two = new MethodTwo();
  54. Helper.instance.run(two.newThreadOne());
  55. Helper.instance.run(two.newThreadTwo());
  56. Helper.instance.shutdown();
  57. }
  58. }
c. 利用volatile:
volatile修饰的变量值直接存在main memory里面,子线程对该变量的读写直接写入main memory,而不是像其它变量一样在local thread里面产生一份copy。volatile能保证所修饰的变量对于多个线程可见性,即只要被修改,其它线程读到的一定是最新的值。
  1. public class MethodThree {
  2. private volatile ThreadToGo threadToGo = new ThreadToGo();
  3. class ThreadToGo {
  4. int value = 1;
  5. }
  6. public Runnable newThreadOne() {
  7. final String[] inputArr = Helper.buildNoArr(52);
  8. return new Runnable() {
  9. private String[] arr = inputArr;
  10. public void run() {
  11. for (int i = 0; i < arr.length; i=i+2) {
  12. while(threadToGo.value==2){}
  13. Helper.print(arr[i], arr[i + 1]);
  14. threadToGo.value=2;
  15. }
  16. }
  17. };
  18. }
  19. public Runnable newThreadTwo() {
  20. final String[] inputArr = Helper.buildCharArr(26);
  21. return new Runnable() {
  22. private String[] arr = inputArr;
  23. public void run() {
  24. for (int i = 0; i < arr.length; i++) {
  25. while(threadToGo.value==1){}
  26. Helper.print(arr[i]);
  27. threadToGo.value=1;
  28. }
  29. }
  30. };
  31. }
  32. public static void main(String args[]) throws InterruptedException {
  33. MethodThree three = new MethodThree();
  34. Helper.instance.run(three.newThreadOne());
  35. Helper.instance.run(three.newThreadTwo());
  36. Helper.instance.shutdown();
  37. }
  38. }

d. 利用AtomicInteger

  1. public class MethodFive {
  2. private AtomicInteger threadToGo = new AtomicInteger(1);
  3. public Runnable newThreadOne() {
  4. final String[] inputArr = Helper.buildNoArr(52);
  5. return new Runnable() {
  6. private String[] arr = inputArr;
  7. public void run() {
  8. for (int i = 0; i < arr.length; i=i+2) {
  9. while(threadToGo.get()==2){}
  10. Helper.print(arr[i], arr[i + 1]);
  11. threadToGo.set(2);
  12. }
  13. }
  14. };
  15. }
  16. public Runnable newThreadTwo() {
  17. final String[] inputArr = Helper.buildCharArr(26);
  18. return new Runnable() {
  19. private String[] arr = inputArr;
  20. public void run() {
  21. for (int i = 0; i < arr.length; i++) {
  22. while(threadToGo.get()==1){}
  23. Helper.print(arr[i]);
  24. threadToGo.set(1);
  25. }
  26. }
  27. };
  28. }
  29. public static void main(String args[]) throws InterruptedException {
  30. MethodFive five = new MethodFive();
  31. Helper.instance.run(five.newThreadOne());
  32. Helper.instance.run(five.newThreadTwo());
  33. Helper.instance.shutdown();
  34. }
  35. }

2. 第二种解法,是利用CyclicBarrierAPI;

CyclicBarrier可以实现让一组线程在全部到达Barrier时(执行await()),再一起同时执行,并且所有线程释放后,还能复用它,即为Cyclic。
CyclicBarrier类提供两个构造器:

  1. public CyclicBarrier(int parties, Runnable barrierAction) {
  2. }
  3. public CyclicBarrier(int parties) {
  4. }
  1. public class MethodFour{
  2. private final CyclicBarrier barrier;
  3. private final List<String> list;
  4. public MethodFour() {
  5. list = Collections.synchronizedList(new ArrayList<String>());
  6. barrier = new CyclicBarrier(2,newBarrierAction());
  7. }
  8. public Runnable newThreadOne() {
  9. final String[] inputArr = Helper.buildNoArr(52);
  10. return new Runnable() {
  11. private String[] arr = inputArr;
  12. public void run() {
  13. for (int i = 0, j=0; i < arr.length; i=i+2,j++) {
  14. try {
  15. list.add(arr[i]);
  16. list.add(arr[i+1]);
  17. barrier.await();
  18. } catch (InterruptedException | BrokenBarrierException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. }
  23. };
  24. }
  25. public Runnable newThreadTwo() {
  26. final String[] inputArr = Helper.buildCharArr(26);
  27. return new Runnable() {
  28. private String[] arr = inputArr;
  29. public void run() {
  30. for (int i = 0; i < arr.length; i++) {
  31. try {
  32. list.add(arr[i]);
  33. barrier.await();
  34. } catch (InterruptedException | BrokenBarrierException e) {
  35. e.printStackTrace();
  36. }
  37. }
  38. }
  39. };
  40. }
  41. private Runnable newBarrierAction(){
  42. return new Runnable() {
  43. @Override
  44. public void run() {
  45. Collections.sort(list);
  46. list.forEach(c->System.out.print(c));
  47. list.clear();
  48. }
  49. };
  50. }
  51. public static void main(String args[]){
  52. MethodFour four = new MethodFour();
  53. Helper.instance.run(four.newThreadOne());
  54. Helper.instance.run(four.newThreadTwo());
  55. Helper.instance.shutdown();
  56. }
  57. }
这里多说一点,这个API其实还是利用lockcondition,无非是多个线程去争抢CyclicBarrier的instance的lock罢了,最终barrierAction执行时,是在抢到CyclicBarrierinstance的那个线程上执行的。

3. 第三种解法,是利用PipedInputStreamAPI;

这里用流在两个线程间通信,但是Java中的Stream是单向的,所以在两个线程中分别建了一个input和output。这显然是一种很搓的方式,不过也算是一种通信方式吧……-_-T,执行的时候那种速度简直。。。请不要BS我。
  1. public class MethodSix {
  2. private final PipedInputStream inputStream1;
  3. private final PipedOutputStream outputStream1;
  4. private final PipedInputStream inputStream2;
  5. private final PipedOutputStream outputStream2;
  6. private final byte[] MSG;
  7. public MethodSix() {
  8. inputStream1 = new PipedInputStream();
  9. outputStream1 = new PipedOutputStream();
  10. inputStream2 = new PipedInputStream();
  11. outputStream2 = new PipedOutputStream();
  12. MSG = "Go".getBytes();
  13. try {
  14. inputStream1.connect(outputStream2);
  15. inputStream2.connect(outputStream1);
  16. } catch (IOException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. public void shutdown() throws IOException {
  21. inputStream1.close();
  22. inputStream2.close();
  23. outputStream1.close();
  24. outputStream2.close();
  25. }
  26. public Runnable newThreadOne() {
  27. final String[] inputArr = Helper.buildNoArr(52);
  28. return new Runnable() {
  29. private String[] arr = inputArr;
  30. private PipedInputStream in = inputStream1;
  31. private PipedOutputStream out = outputStream1;
  32. public void run() {
  33. for (int i = 0; i < arr.length; i=i+2) {
  34. Helper.print(arr[i], arr[i + 1]);
  35. try {
  36. out.write(MSG);
  37. byte[] inArr = new byte[2];
  38. in.read(inArr);
  39. while(true){
  40. if("Go".equals(new String(inArr)))
  41. break;
  42. }
  43. } catch (IOException e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. }
  48. };
  49. }
  50. public Runnable newThreadTwo() {
  51. final String[] inputArr = Helper.buildCharArr(26);
  52. return new Runnable() {
  53. private String[] arr = inputArr;
  54. private PipedInputStream in = inputStream2;
  55. private PipedOutputStream out = outputStream2;
  56. public void run() {
  57. for (int i = 0; i < arr.length; i++) {
  58. try {
  59. byte[] inArr = new byte[2];
  60. in.read(inArr);
  61. while(true){
  62. if("Go".equals(new String(inArr)))
  63. break;
  64. }
  65. Helper.print(arr[i]);
  66. out.write(MSG);
  67. } catch (IOException e) {
  68. e.printStackTrace();
  69. }
  70. }
  71. }
  72. };
  73. }
  74. public static void main(String args[]) throws IOException {
  75. MethodSix six = new MethodSix();
  76. Helper.instance.run(six.newThreadOne());
  77. Helper.instance.run(six.newThreadTwo());
  78. Helper.instance.shutdown();
  79. six.shutdown();
  80. }

4. 第四种解法,是利用BlockingQueue

顺便总结下BlockingQueue的一些内容。
BlockingQueue定义的常用方法如下:

  • add(Object):把Object加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则抛出异常。
  • offer(Object):表示如果可能的话,将Object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。
  • put(Object):把Object加到BlockingQueue里,如果BlockingQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里有空间再继续。
  • poll(time):获取并删除BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。当不传入time值时,立刻返回。
  • peek():立刻获取BlockingQueue里排在首位的对象,但不从队列里删除,如果队列为空,则返回null。
  • take():获取并删除BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止。

BlockingQueue有四个具体的实现类:

  • ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小。其所含的对象是以FIFO(先入先出)顺序排序的。
  • LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。其所含的对象是以FIFO顺序排序的。
  • PriorityBlockingQueue:类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。
  • SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。

这里我用了两种玩法:

  • 一种是共享一个queue,根据peekpoll的不同来实现;
  • 第二种是两个queue,利用take()会自动阻塞来实现。
  1. public class MethodSeven {
  2. private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
  3. public Runnable newThreadOne() {
  4. final String[] inputArr = Helper.buildNoArr(52);
  5. return new Runnable() {
  6. private String[] arr = inputArr;
  7. public void run() {
  8. for (int i = 0; i < arr.length; i=i+2) {
  9. Helper.print(arr[i], arr[i + 1]);
  10. queue.offer("TwoToGo");
  11. while(!"OneToGo".equals(queue.peek())){}
  12. queue.poll();
  13. }
  14. }
  15. };
  16. }
  17. public Runnable newThreadTwo() {
  18. final String[] inputArr = Helper.buildCharArr(26);
  19. return new Runnable() {
  20. private String[] arr = inputArr;
  21. public void run() {
  22. for (int i = 0; i < arr.length; i++) {
  23. while(!"TwoToGo".equals(queue.peek())){}
  24. queue.poll();
  25. Helper.print(arr[i]);
  26. queue.offer("OneToGo");
  27. }
  28. }
  29. };
  30. }
  31. private final LinkedBlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
  32. private final LinkedBlockingQueue<String> queue2 = new LinkedBlockingQueue<>();
  33. public Runnable newThreadThree() {
  34. final String[] inputArr = Helper.buildNoArr(52);
  35. return new Runnable() {
  36. private String[] arr = inputArr;
  37. public void run() {
  38. for (int i = 0; i < arr.length; i=i+2) {
  39. Helper.print(arr[i], arr[i + 1]);
  40. try {
  41. queue2.put("TwoToGo");
  42. queue1.take();
  43. } catch (InterruptedException e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. }
  48. };
  49. }
  50. public Runnable newThreadFour() {
  51. final String[] inputArr = Helper.buildCharArr(26);
  52. return new Runnable() {
  53. private String[] arr = inputArr;
  54. public void run() {
  55. for (int i = 0; i < arr.length; i++) {
  56. try {
  57. queue2.take();
  58. Helper.print(arr[i]);
  59. queue1.put("OneToGo");
  60. } catch (InterruptedException e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. }
  65. };
  66. }
  67. public static void main(String args[]) throws InterruptedException {
  68. MethodSeven seven = new MethodSeven();
  69. Helper.instance.run(seven.newThreadOne());
  70. Helper.instance.run(seven.newThreadTwo());
  71. Thread.sleep(2000);
  72. System.out.println("");
  73. Helper.instance.run(seven.newThreadThree());
  74. Helper.instance.run(seven.newThreadFour());
  75. Helper.instance.shutdown();
  76. }

原文链接:http:///2017/03/02/java-thread-communication.html






    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多