序言:当使用线程来同时运行多个任务的时候,可以通过使用锁(互斥)来同步两个对象的行为。从而使得另一个对象不会干涉另一个任务的资源(线程安全)。下面要使线程之间相互协作,从而使多个任务可以一起工作去解决某个问题。

一、wait()和notifyAll();

注意事项:wait和notify()/notifyAll()只能在加锁的方法中去使用,不然会抛出IllegalMonitorStateException

1、Wait():

wait使当前线程在等待某个特定的条件前被挂起,同时释放锁,只有在notify或notidyAll发生的时候,当前线程才会被唤醒。有两种形式的wait:
①:接受毫秒作为参数,在规定时间后唤醒线程。

  1. class Blocked{
  2. synchronized void waitinfo2(){
  3. try {
  4. System.out.println("线程被挂起");
  5. wait(2000);//2000毫秒后线程自动唤醒
  6. System.out.println("当前线程为:"+Thread.currentThread());
  7. }catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. }
  11. class WaitTest implements Runnable{
  12. Blocked blocked=new Blocked();
  13. @Override
  14. public void run() {
  15. blocked.waitinfo2();
  16. }
  17. }
  18. public class WaitDemo {
  19. public static void main(String[] args) throws InterruptedException {
  20. ExecutorService executorService = Executors.newCachedThreadPool();
  21. executorService.execute(new WaitTest());
  22. }
  23. }

②:不接受任何参数,将无限等待下去,直到线程收到notify或者notifyAll()消息

  1. class Blocked{
  2. synchronized void waitinfo(){
  3. try {
  4. System.out.println("线程被挂起");
  5. wait();
  6. System.out.println("当前线程为:"+Thread.currentThread());
  7. }catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. synchronized void prod(){
  11. notify();
  12. }
  13. }
  14. class WaitTest implements Runnable{
  15. Blocked blocked=new Blocked();
  16. @Override
  17. public void run() {
  18. blocked.waitinfo();
  19. }
  20. }
  21. public class WaitDemo {
  22. public static void main(String[] args) throws InterruptedException {
  23. WaitTest waitTest = new WaitTest();
  24. ExecutorService executorService = Executors.newCachedThreadPool();
  25. executorService.execute(waitTest);
  26. TimeUnit.SECONDS.sleep(2);
  27. new Thread(){
  28. @Override
  29. public void run() {
  30. waitTest.blocked.prod();
  31. }
  32. }.start();
  33. }
  34. }

注意:对于wait来说,只有同一个对象锁的notify或者notifyAll才能将其唤醒

  1. class Blocked{
  2. synchronized void waitinfo(){
  3. try {
  4. System.out.println("线程被挂起");
  5. wait();
  6. System.out.println("当前线程为:"+Thread.currentThread());
  7. }catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. synchronized void prod(){
  11. notify();
  12. }
  13. }
  14. class WaitTest implements Runnable{
  15. Blocked blocked=new Blocked();
  16. @Override
  17. public void run() {
  18. blocked.waitinfo();
  19. }
  20. }
  21. public class WaitDemo {
  22. public static void main(String[] args) throws InterruptedException {
  23. WaitTest waitTest = new WaitTest();
  24. WaitTest waitTest2 = new WaitTest();
  25. ExecutorService executorService = Executors.newCachedThreadPool();
  26. executorService.execute(waitTest);
  27. TimeUnit.SECONDS.sleep(2);
  28. new Thread(){
  29. @Override
  30. public void run() {
  31. // waitTest2.blocked.prod();//此时不是同一个对象锁,则无法唤醒线程
  32. waitTest.blocked.prod();
  33. }
  34. }.start();
  35. }
  36. }

2、notify和notifyAll的区别:

  1. notify在众多等待**同一个锁**的任务**只有一个**会被唤醒,所有在使用notify时要保证被唤醒的是恰当的任务。而notifyAll会将众多等**待同一个锁**的任务**一起唤醒。notify如果因某个特定的锁而被调用时,只有等待这个特定锁的任务才会被唤醒。**
  1. class Blocker {
  2. synchronized void waitingCall() {
  3. try {
  4. while (!Thread.interrupted()) {
  5. wait();
  6. System.out.println("唤醒的线程为: " + Thread.currentThread() + " ");
  7. }
  8. } catch (InterruptedException e) {
  9. }
  10. }
  11. synchronized void prod() {
  12. notify();
  13. }
  14. synchronized void prodAll() {
  15. notifyAll();
  16. }
  17. }
  18. class Task implements Runnable {
  19. static Blocker blocker = new Blocker();
  20. public void run() {
  21. blocker.waitingCall();
  22. }
  23. }
  24. class Task2 implements Runnable {
  25. static Blocker blocker = new Blocker();
  26. public void run() {
  27. blocker.waitingCall();
  28. }
  29. }
  30. public class NotifyVsNotifyAll {
  31. public static void main(String[] args) throws Exception {
  32. ExecutorService exec = Executors.newCachedThreadPool();
  33. for (int i = 0; i < 5; i++)
  34. exec.execute(new Task());
  35. exec.execute(new Task2());
  36. Timer timer = new Timer();//开启一个定时任务
  37. timer.scheduleAtFixedRate(new TimerTask() {
  38. boolean prod = true;
  39. @Override
  40. public void run() {
  41. if (prod) {
  42. System.out.println("唤醒一个线程 notify()");
  43. Task.blocker.prod();
  44. prod = false;
  45. } else {
  46. System.out.println("唤醒所以的线程 notifyAll()");
  47. Task.blocker.prodAll();
  48. prod = true;
  49. }
  50. }
  51. }, 400, 400);
  52. TimeUnit.SECONDS.sleep(2); // Run for a while...
  53. timer.cancel();//关闭计时器
  54. TimeUnit.SECONDS.sleep(2);
  55. System.out.println("=============");
  56. Task2.blocker.prodAll();//唤醒特定的notifyAll();
  57. exec.shutdownNow();
  58. }
  59. }

3.wait和notify的示例(生产者与消费者):

  1. 在下面的列子中生产者是厨师,消费者是服务员,两者相互工作互不干扰。
  1. class Meal {//食物
  2. private final int orderNum;
  3. Meal(int orderNum) {
  4. this.orderNum = orderNum;
  5. }
  6. @Override
  7. public String toString() {
  8. return "Meal " + orderNum;
  9. }
  10. }
  11. class WaitPerson implements Runnable {//消费者
  12. private Restaurnt restaurnt;
  13. WaitPerson(Restaurnt restaurnt) {
  14. this.restaurnt = restaurnt;
  15. }
  16. @Override
  17. public void run() {
  18. try {
  19. while (!Thread.interrupted()) {
  20. synchronized (this) {
  21. while (restaurnt.meal == null)
  22. wait();//此时服务员线程的锁被释放
  23. }
  24. System.out.println("Waitperson got " + restaurnt.meal);
  25. synchronized (restaurnt.chef) {//先捕获对应的锁,解开对应chef的wait
  26. restaurnt.meal = null;
  27. restaurnt.chef.notifyAll();
  28. }
  29. }
  30. } catch (InterruptedException e) {
  31. System.out.println(e);
  32. }
  33. }
  34. }
  35. class Chef implements Runnable {
  36. private Restaurnt restaurnt;
  37. private int count = 0;
  38. Chef(Restaurnt restaurnt) {
  39. this.restaurnt = restaurnt;
  40. }
  41. @Override
  42. public void run() {
  43. try {
  44. while (!Thread.interrupted()) {
  45. synchronized (this) {
  46. while (restaurnt.meal != null)//如果有食物则进入挂起状态
  47. wait();
  48. }
  49. if (++count == 10) {
  50. System.out.println("以达到销售目标,停止售卖");
  51. restaurnt.service.shutdownNow();//关闭线程
  52. }
  53. System.out.println("Order up!");
  54. synchronized (restaurnt.waitPerson) {//先捕获对应的锁,解开对应waitPerson的wait
  55. restaurnt.meal = new Meal(count);
  56. restaurnt.waitPerson.notifyAll();
  57. }
  58. TimeUnit.MILLISECONDS.sleep(100);
  59. }
  60. } catch (InterruptedException e) {
  61. System.out.println("Chef interrupted");
  62. }
  63. }
  64. }
  65. public class Restaurnt {
  66. Meal meal;
  67. Chef chef = new Chef(this);
  68. WaitPerson waitPerson = new WaitPerson(this);
  69. ExecutorService service = Executors.newCachedThreadPool();
  70. Restaurnt() {
  71. service.execute(chef);
  72. service.execute(waitPerson);
  73. }
  74. public static void main(String[] args) {
  75. new Restaurnt();
  76. }
  77. }
  1. 在上面的列子中,保证了使用了同一个对象锁,并且当需要调用notifyAll的时候要先获得对应的锁才可以唤醒对应的线程

二、condition的await、signal

  1. 可以通过Condition中的await()方法来挂起一个任务,同时通过signal()/signalAll()来唤醒被挂起的线程。

1.示例一:源码中的典型示例

  1. //来自Condition源码中的示例
  2. public class BoundedBufferDemo {
  3. final Lock lock = new ReentrantLock();
  4. final Condition full = lock.newCondition();
  5. final Condition empty = lock.newCondition();
  6. final Object[] items = new Object[100];
  7. int putptr, takeptr, count;
  8. public void put(Object x) throws InterruptedException {
  9. lock.lock();
  10. try {
  11. while (count == items.length)
  12. full.await();
  13. items[putptr] = x;
  14. if (++putptr == items.length) putptr = 0;
  15. ++count;
  16. empty.signal();
  17. } finally {
  18. lock.unlock();
  19. }
  20. }
  21. //只要有锁的操作(lock、synchronized都可以直接调signal或await或wait、notify)
  22. public void test(){
  23. lock.lock();
  24. empty.signal();
  25. lock.unlock();
  26. }
  27. public Object take() throws InterruptedException {
  28. lock.lock();
  29. try {
  30. while (count == 0) {
  31. empty.await();
  32. }
  33. Object x = items[takeptr];
  34. if (++takeptr == items.length) takeptr = 0;
  35. --count;
  36. full.signal();
  37. return x;
  38. } finally {
  39. lock.unlock();
  40. }
  41. }
  42. public static void main(String[] args) throws InterruptedException {
  43. BoundedBufferDemo demo = new BoundedBufferDemo();
  44. new BoundedBufferDemo().test();
  45. new Thread(()->{
  46. for (int i = 0; i < 100; i++) {
  47. try {
  48. demo.put(i+"");
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. },"write").start();
  54. TimeUnit.SECONDS.sleep(10);
  55. new Thread(()->{
  56. while (true) {
  57. Object o = null;
  58. try {
  59. o = demo.take();
  60. } catch (InterruptedException e) {
  61. e.printStackTrace();
  62. }
  63. System.out.println(o);
  64. }
  65. },"reader").start();
  66. }
  67. }

2.示例二:重写汽车涂料示例

  1. class Car {
  2. private Lock lock = new ReentrantLock();
  3. private Condition condition = lock.newCondition();
  4. private boolean waxOn = false;
  5. public void waxed() {
  6. lock.lock();
  7. try {
  8. waxOn = true; // Ready to buff
  9. condition.signalAll();
  10. } finally {
  11. lock.unlock();
  12. }
  13. }
  14. public void buffed() {
  15. lock.lock();
  16. try {
  17. waxOn = false; // Ready for another coat of wax
  18. condition.signalAll();
  19. } finally {
  20. lock.unlock();
  21. }
  22. }
  23. public void waitForWaxing() throws InterruptedException {
  24. lock.lock();
  25. try {
  26. while(waxOn == false)
  27. condition.await();
  28. } finally {
  29. lock.unlock();
  30. }
  31. }
  32. public void waitForBuffing() throws InterruptedException{
  33. lock.lock();
  34. try {
  35. while(waxOn == true)
  36. condition.await();
  37. } finally {
  38. lock.unlock();
  39. }
  40. }
  41. }
  42. class WaxOn implements Runnable {
  43. private Car car;
  44. public WaxOn(Car c) { car = c; }
  45. public void run() {
  46. try {
  47. while(!Thread.interrupted()) {
  48. printnb("Wax On! ");
  49. TimeUnit.MILLISECONDS.sleep(200);
  50. car.waxed();
  51. car.waitForBuffing();
  52. }
  53. } catch(InterruptedException e) {
  54. print("Exiting via interrupt");
  55. }
  56. print("Ending Wax On task");
  57. }
  58. }
  59. class WaxOff implements Runnable {
  60. private Car car;
  61. public WaxOff(Car c) { car = c; }
  62. public void run() {
  63. try {
  64. while(!Thread.interrupted()) {
  65. car.waitForWaxing();
  66. printnb("Wax Off! ");
  67. TimeUnit.MILLISECONDS.sleep(200);
  68. car.buffed();
  69. }
  70. } catch(InterruptedException e) {
  71. print("Exiting via interrupt");
  72. }
  73. print("Ending Wax Off task");
  74. }
  75. }
  76. public class WaxOMatic2 {
  77. public static void main(String[] args) throws Exception {
  78. Car car = new Car();
  79. ExecutorService exec = Executors.newCachedThreadPool();
  80. exec.execute(new WaxOff(car));
  81. exec.execute(new WaxOn(car));
  82. TimeUnit.SECONDS.sleep(5);
  83. exec.shutdownNow();
  84. }
  85. }

三、同步队列

  1. 我们可以使用同步对列来解决任务协作的问题,同步对列在任何时刻都值允许一个任务插入或移除元素。

1.LinkedBlockingQueue:无限容量,添加多少取出多少

  1. class LiftOffRunner implements Runnable{//生产者
  2. private BlockingQueue<LiftOff> rockets;
  3. public LiftOffRunner(BlockingQueue<LiftOff> queue){
  4. rockets=queue;
  5. }
  6. public void add(LiftOff lo){//生产方法
  7. try {
  8. rockets.put(lo);
  9. System.out.println("添加时对列的大小:"+rockets.size());
  10. } catch (InterruptedException e) {
  11. System.out.println("Interrupted during put()");
  12. }
  13. }
  14. @Override
  15. public void run() {//消费方法
  16. try {
  17. while (!Thread.interrupted()) {
  18. LiftOff liftOff = rockets.take();//消费者消费
  19. System.out.println("取出后对列的大小:"+rockets.size());
  20. liftOff.run();
  21. }
  22. } catch (Exception e) {
  23. System.out.println("Interrupted take()");
  24. }
  25. System.out.println("Exiting LiftOffRunner");
  26. }
  27. }
  28. public class TestBlockingQueue {
  29. static void test(String msg,BlockingQueue<LiftOff> queue) throws InterruptedException {
  30. System.out.println("对列名称:"+msg);
  31. LiftOffRunner runner = new LiftOffRunner(queue);//创建消费者
  32. Thread thread = new Thread(runner);
  33. for (int i = 0; i <5 ; i++) {
  34. runner.add(new LiftOff(5));//添加了五个LiftOff对象
  35. }
  36. thread.start();
  37. TimeUnit.SECONDS.sleep(5);
  38. thread.interrupt();
  39. System.out.println("Finshed "+msg+" test");
  40. }
  41. public static void main(String[] args) throws InterruptedException {
  42. test("LinkdedBlockQueue",new LinkedBlockingQueue<LiftOff>());
  43. }
  44. }

2.ArrayBlockingQueue,

可以设置固定的尺寸,当该对列为空时,消费者将被挂起,当生产者达到设定的最大目标的时候,如果没有消费,此时生成者将被挂起。

  1. class LiftOffRunner implements Runnable{
  2. private BlockingQueue<LiftOff> rockets;
  3. public LiftOffRunner(BlockingQueue<LiftOff> queue){
  4. rockets=queue;
  5. }
  6. public void add(LiftOff lo){
  7. try {
  8. rockets.put(lo);
  9. System.out.println("添加时对列的大小:"+rockets.size());
  10. } catch (InterruptedException e) {
  11. System.out.println("Interrupted during put()");
  12. }
  13. }
  14. @Override
  15. public void run() {
  16. try {
  17. while (!Thread.interrupted()) {
  18. LiftOff liftOff = rockets.take();//消费者消费
  19. System.out.println("取出后对列的大小:"+rockets.size());
  20. liftOff.run();
  21. }
  22. } catch (Exception e) {
  23. System.out.println("Interrupted take()");
  24. }
  25. System.out.println("Exiting LiftOffRunner");
  26. }
  27. }
  28. public class TestBlockingQueue {
  29. static void test(String msg,BlockingQueue<LiftOff> queue) throws InterruptedException {
  30. System.out.println("对列名称:"+msg);
  31. LiftOffRunner runner = new LiftOffRunner(queue);//创建消费者
  32. Thread thread = new Thread(runner);
  33. //将消费者放在生产前面,此时的生产者设定的目标为三,这个时候生产者到达生产目标之后,将会被消费之后在进行生产
  34. thread.start();
  35. for (int i = 0; i <5 ; i++) {
  36. runner.add(new LiftOff(5));//添加了五个LiftOff对象
  37. }
  38. // thread.start();//如果将消费者放在后面,此时的生产者设定的目标为三,这个时候生产者将被挂起
  39. TimeUnit.SECONDS.sleep(5);
  40. System.out.println("Finshed "+msg+" test");
  41. }
  42. public static void main(String[] args) throws InterruptedException {
  43. test("ArrayBlockingQueu",new ArrayBlockingQueue<LiftOff>(3));
  44. }
  45. }

3.SynchronousQueue:

由于该对列本身没用任何容量,所以该对列放入一个消费一个,如果没用去消费。则任何元素都无法添加到该对列中去

  1. class LiftOffRunner implements Runnable{
  2. private BlockingQueue<LiftOff> rockets;
  3. public LiftOffRunner(BlockingQueue<LiftOff> queue){
  4. rockets=queue;
  5. }
  6. public void add(LiftOff lo){
  7. try {
  8. rockets.put(lo);
  9. System.out.println("添加时对列的大小:"+rockets.size());
  10. } catch (InterruptedException e) {
  11. System.out.println("Interrupted during put()");
  12. }
  13. }
  14. @Override
  15. public void run() {
  16. try {
  17. while (!Thread.interrupted()) {
  18. LiftOff liftOff = rockets.take();//消费者消费
  19. System.out.println("取出后对列的大小:"+rockets.size());
  20. liftOff.run();
  21. }
  22. } catch (Exception e) {
  23. System.out.println("Interrupted take()");
  24. }
  25. System.out.println("Exiting LiftOffRunner");
  26. }
  27. }
  28. public class TestBlockingQueue {
  29. static void test(String msg,BlockingQueue<LiftOff> queue) throws InterruptedException {
  30. System.out.println("对列名称:"+msg);
  31. LiftOffRunner runner = new LiftOffRunner(queue);//创建消费者
  32. Thread thread = new Thread(runner);
  33. thread.start();
  34. for (int i = 0; i <5 ; i++) {
  35. runner.add(new LiftOff(5));//添加了五个LiftOff对象
  36. }
  37. TimeUnit.SECONDS.sleep(5);
  38. System.out.println("Finshed "+msg+" test");
  39. }
  40. public static void main(String[] args) throws InterruptedException {
  41. test("Synchronous",new SynchronousQueue<LiftOff>());
  42. }
  43. }

4. 同步队示例——烤土司

1.吐司类

  1. public class Toast {
  2. public enum Status {DRY,BUTTERED,JAMMED}
  3. private Status status=Status.DRY;//默认烤吐司
  4. private final int id;
  5. public Toast(int i){
  6. id=i;
  7. }
  8. public void butter(){
  9. status=Status.BUTTERED;//涂黄油
  10. }
  11. public void jammed(){//涂果酱
  12. status=Status.JAMMED;
  13. }
  14. public Status getStatus(){
  15. return status;
  16. }
  17. public int getId() {
  18. return id;
  19. }
  20. @Override
  21. public String toString() {
  22. return "Toast"+id+":"+status;
  23. }
  24. }
  25. class ToastQueue extends LinkedBlockingQueue<Toast>{
  26. }

2.拷吐司之后摸黄油然后涂果酱

  1. class Toaster implements Runnable{
  2. private ToastQueue toasts;//该对列放置烤好的吐司
  3. private int count=0;
  4. public Toaster(ToastQueue queue){
  5. toasts=queue;
  6. }
  7. @Override
  8. public void run() {
  9. try {
  10. while (!Thread.interrupted()) {
  11. Toast toast = new Toast(count++);
  12. TimeUnit.MILLISECONDS.sleep(200);
  13. System.out.println(toast);
  14. //把烤好的吐司放入对列中
  15. toasts.put(toast);
  16. }
  17. } catch (InterruptedException e) {
  18. System.out.println("Toaster interrupted");
  19. }
  20. System.out.println("Toaster off");
  21. }
  22. }
  23. //涂果酱
  24. class Butterer implements Runnable{
  25. private ToastQueue dryQueue,//放置烤好的吐司
  26. butteredQueue;//放置吐完黄油的吐司
  27. Butterer(ToastQueue dryQueue,ToastQueue butteredQueue){
  28. this.dryQueue=dryQueue;
  29. this.butteredQueue=butteredQueue;
  30. }
  31. @Override
  32. public void run() {
  33. try {
  34. while (!Thread.interrupted()){
  35. Toast take = dryQueue.take();//先获得烤好的吐司
  36. take.butter();//给吐司涂黄油
  37. System.out.println(take);
  38. butteredQueue.put(take);//放入对列
  39. }
  40. } catch (InterruptedException e) {
  41. System.out.println("Butter Interrupted");
  42. }
  43. System.out.println("Butter off");
  44. }
  45. }
  46. class Jammer implements Runnable{
  47. private ToastQueue butteredQueue,//放置好涂完黄油的吐司
  48. jammerQueue;//放置吐完果酱的吐司
  49. Jammer(ToastQueue butteredQueue,ToastQueue jammerQueue){
  50. this.butteredQueue=butteredQueue;
  51. this.jammerQueue=jammerQueue;
  52. }
  53. @Override
  54. public void run() {
  55. try {
  56. while (!Thread.interrupted()){
  57. Toast take = butteredQueue.take();//先获得涂完黄油的吐司
  58. take.jammed();
  59. System.out.println(take);
  60. jammerQueue.put(take);//涂好的果酱吐司放入对列
  61. }
  62. }catch (InterruptedException e){
  63. System.out.println("Jammer Interrupted");
  64. }
  65. System.out.println("Jammer off");
  66. }
  67. }

3.吃烤好的吐司

  1. public class Eater implements Runnable{
  2. private ToastQueue jammerQueue;
  3. private int count=0;
  4. Eater(ToastQueue jammerQueue){
  5. this.jammerQueue=jammerQueue;
  6. }
  7. @Override
  8. public void run() {
  9. try {
  10. while (!Thread.interrupted()) {
  11. Toast take = jammerQueue.take();
  12. if (take.getId()!=count++||take.getStatus()!=Toast.Status.JAMMED){
  13. System.out.println("吐司还没玩成"+take);
  14. }else{
  15. System.out.println("吃!!! "+take);
  16. System.out.println();
  17. }
  18. }
  19. } catch (InterruptedException e) {
  20. System.out.println("Eater InterruptedException");
  21. }
  22. }
  23. }
  24. public class ToastMatic {
  25. public static void main(String[] args) throws InterruptedException {
  26. ToastQueue dryQueue = new ToastQueue(),butteredQueue = new ToastQueue(),jammerQueue = new ToastQueue();
  27. ExecutorService service = Executors.newCachedThreadPool();
  28. service.execute(new Toaster(dryQueue));//先烤吐司
  29. service.execute(new Butterer(dryQueue,butteredQueue));
  30. service.execute(new Jammer(butteredQueue,jammerQueue));
  31. service.execute(new Eater(jammerQueue));
  32. TimeUnit.SECONDS.sleep(5);
  33. service.shutdownNow();
  34. }
  35. }

扩展:因为该实例使用的是linked同步对列,具有无限容量,所以当休眠取消后就会有许多土司放入该队列。所以更应该去使用SynchronousQueue。

四、任务间使用管道进行输入输出

  1. 通过输入输出流在线程中进行通信是很有用的,线程中提供“管道”的形式进行支持。两者通过管道进行传输,管道基本上是一个阻塞队列。**PiedWtiter**(允许任务线程管道写)、**PipedReader**(允许不同任务从同一个管道中读取)<br />
  1. class Sender implements Runnable{
  2. private PipedWriter pipedWriter=new PipedWriter();
  3. public PipedWriter getPipedWriter() {
  4. return pipedWriter;
  5. }
  6. @Override
  7. public void run() {
  8. try {
  9. while (true)
  10. for (char c = 'A'; c <'c' ; c++) {
  11. pipedWriter.write(c);//不断进行写入
  12. TimeUnit.MILLISECONDS.sleep(100);
  13. }
  14. } catch (IOException | InterruptedException e) {
  15. System.out.println("输入流被打断");
  16. }
  17. }
  18. }
  19. class Receiver implements Runnable{
  20. private PipedReader in;
  21. int i=0;
  22. Receiver(Sender sender) throws IOException {
  23. in=new PipedReader(sender.getPipedWriter());
  24. }
  25. @Override
  26. public void run() {
  27. try{
  28. while (true){
  29. i++;
  30. int read = in.read();
  31. System.out.print((char)read);
  32. if (i%5==0)
  33. System.out.println();
  34. }
  35. }
  36. catch (Exception e){
  37. System.out.println("输出流被打断");
  38. }
  39. }
  40. }
  41. public class PipedIo {
  42. public static void main(String[] args) throws IOException, InterruptedException {
  43. Sender sender = new Sender();
  44. Receiver receiver = new Receiver(sender);
  45. ExecutorService service = Executors.newCachedThreadPool();
  46. service.execute(sender);
  47. service.execute(receiver);
  48. TimeUnit.SECONDS.sleep(4);
  49. service.shutdownNow();
  50. }
  51. }