1 介绍
方法名 | 作用 |
---|---|
final void wait() | 表示线程一直等待,直到其它线程通知 |
void wait(long timeout) | 线程等待指定毫秒参数的时间 |
final void wait(long timeout,intnanos) | 线程等待指定毫秒、微妙的时间 |
final void notify() | 唤醒一个处于等待状态的线程 |
final void notify() | 唤醒一个处于等待状态的线程 |
final void notifyAll() | 唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先运行 |
均是java.lang.Object类的方法都只能在同步方法或者同步代码块中使用,否则会抛出异常
2 生产者/消费者
2.1 synchronize实现
public class ProductConsumeBySyncTest {
public static void main(String[] args) {
DealData data = new DealData();
new Thread(() -> {
data.product();
}, "线程1").start();
new Thread(() -> {
data.consume();
}, "线程2").start();
new Thread(() -> {
data.product();
}, "线程3").start();
new Thread(() -> {
data.consume();
}, "线程4").start();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
data.stop();
}
}
class DealData {
private volatile boolean flag = true;
public static final int MAX_COUNT = 10;
public static AtomicInteger atomicInteger = new AtomicInteger();
private static final List<Integer> queue = new ArrayList<>();
public void product() {
while (flag) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (queue) {
//池子满了,生产者停止生产
//埋个坑,这里用的if
//TODO 判断
while (queue.size() == MAX_COUNT) {
System.out.println("生产已达上限, 等待消费中...");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//干活
queue.add(atomicInteger.incrementAndGet());
System.out.println("生产者生产了:" + atomicInteger.get() + "\t" + "生产队列剩余:" + queue.size());
//通知
queue.notifyAll();
}
}
}
public void consume() {
while (flag) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (queue) {
while (queue.size() == 0) {
System.out.println("没有可消费产品,等待生产中...");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int c = queue.get(0);
queue.remove(0);
System.out.println("消费者消费了" + c + ";消费队列队列剩余:" + queue.size());
queue.notifyAll();
}
}
}
public void stop() {
this.flag = false;
}
}
输出:
没有可消费产品,等待生产中...
生产者生产了:1 生产队列剩余:1
生产者生产了:2 生产队列剩余:2
消费者消费了1;消费队列队列剩余:1
消费者消费了2;消费队列队列剩余:0
生产者生产了:3 生产队列剩余:1
生产者生产了:4 生产队列剩余:2
消费者消费了3;消费队列队列剩余:1
消费者消费了4;消费队列队列剩余:0
生产者生产了:5 生产队列剩余:1
消费者消费了5;消费队列队列剩余:0
没有可消费产品,等待生产中...
生产者生产了:6 生产队列剩余:1
消费者消费了6;消费队列队列剩余:0
生产者生产了:7 生产队列剩余:1
消费者消费了7;消费队列队列剩余:0
没有可消费产品,等待生产中...
生产者生产了:8 生产队列剩余:1
消费者消费了8;消费队列队列剩余:0
生产者生产了:9 生产队列剩余:1
生产者生产了:10 生产队列剩余:2
消费者消费了9;消费队列队列剩余:1
消费者消费了10;消费队列队列剩余:0
没有可消费产品,等待生产中...
没有可消费产品,等待生产中...
生产者生产了:11 生产队列剩余:1
生产者生产了:12 生产队列剩余:2
消费者消费了11;消费队列队列剩余:1
消费者消费了12;消费队列队列剩余:0
没有可消费产品,等待生产中...
生产者生产了:13 生产队列剩余:1
消费者消费了13;消费队列队列剩余:0
生产者生产了:14 生产队列剩余:1
消费者消费了14;消费队列队列剩余:0
生产者生产了:15 生产队列剩余:1
消费者消费了15;消费队列队列剩余:0
没有可消费产品,等待生产中...
生产者生产了:16 生产队列剩余:1
消费者消费了16;消费队列队列剩余:0
生产者生产了:17 生产队列剩余:1
消费者消费了17;消费队列队列剩余:0
生产者生产了:18 生产队列剩余:1
消费者消费了18;消费队列队列剩余:0
没有可消费产品,等待生产中...
没有可消费产品,等待生产中...
生产者生产了:19 生产队列剩余:1
生产者生产了:20 生产队列剩余:2
消费者消费了19;消费队列队列剩余:1
消费者消费了20;消费队列队列剩余:0
2.2 通过Lock实现
public class ProductConsumeByLockTest {
public static void main(String[] args) {
DealDataByLock data = new DealDataByLock();
new Thread(() -> {
Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.product());
}, "线程1").start();
new Thread(() -> {
Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.consume());
}, "线程2").start();
new Thread(() -> {
Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.product());
}, "线程3").start();
new Thread(() -> {
Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.consume());
}, "线程4").start();
}
}
class DealDataByLock {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void product() {
lock.lock();
try {
while (number != 0)
condition.await();
number++;
System.out.println(Thread.currentThread().getName() + " 生产者生产了number=" + number);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void consume() {
lock.lock();
try {
while (number == 0)
condition.await();
number--;
System.out.println(Thread.currentThread().getName() + " 消费者消费了number=" + number);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
输出:
线程3 生产者生产了number=1
线程4 消费者消费了number=0
线程1 生产者生产了number=1
线程4 消费者消费了number=0
线程1 生产者生产了number=1
线程4 消费者消费了number=0
线程1 生产者生产了number=1
线程4 消费者消费了number=0
线程1 生产者生产了number=1
线程4 消费者消费了number=0
线程1 生产者生产了number=1
线程2 消费者消费了number=0
线程3 生产者生产了number=1
线程2 消费者消费了number=0
线程3 生产者生产了number=1
线程2 消费者消费了number=0
线程3 生产者生产了number=1
线程2 消费者消费了number=0
线程3 生产者生产了number=1
线程2 消费者消费了number=0
2.3 通过阻塞队列
public class ProductConsumeByBlockTest {
public static void main(String[] args) {
DealDataByBlock data = new DealDataByBlock();
new Thread(() -> {
data.product();
}, "线程1").start();
new Thread(() -> {
data.consume();
}, "线程2").start();
/* new Thread(() -> {
data.product();
}, "线程3").start();*/
new Thread(() -> {
data.consume();
}, "线程4").start();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
data.stop();
}
}
class DealDataByBlock {
public static final int MAX_COUNT = 10;
private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(MAX_COUNT);
private static boolean flag = true;
private AtomicInteger atomicInteger = new AtomicInteger();
public void product() {
while (flag) {
//offer:在该队列的尾部插入指定的元素,在队列已满的情况下等待指定的等待时间,等待空间变为可用。
boolean retvalue = false;
try {
retvalue = queue.offer(atomicInteger.incrementAndGet(), 2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (retvalue == true) {
System.out.println(Thread.currentThread().getName() + "\t 插入队列" + atomicInteger.get() + "成功" + "生产队列大小= " + queue.size());
} else {
System.out.println(Thread.currentThread().getName() + "\t 插入队列" + atomicInteger.get() + "失败" + "生产队列大小= " + queue.size());
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void consume() {
Integer result = null;
while (flag) {
try {
//poll:检索并删除此队列的头,如果需要元素变为可用,则等待指定的等待时间
result = queue.poll(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (null == result) {
System.out.println("超过两秒没有取道数据,消费者即将退出");
return;
}
System.out.println(Thread.currentThread().getName() + "\t 消费" + result + "成功" + "\t\t" + "消费队列大小= " + queue.size());
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stop() {
this.flag = false;
}
}
输出(一个生产者俩消费者):
线程1 插入队列1成功生产队列大小= 1
线程2 消费1成功 消费队列大小= 0
线程4 消费2成功 消费队列大小= 0
线程1 插入队列2成功生产队列大小= 0
线程1 插入队列3成功生产队列大小= 1
线程2 消费3成功 消费队列大小= 0
线程1 插入队列4成功生产队列大小= 1
线程4 消费4成功 消费队列大小= 0
线程2 消费5成功 消费队列大小= 0
线程1 插入队列5成功生产队列大小= 0
超过两秒没有取道数据,消费者即将退出