JUC
1.JUC是什么
java.util.concurrent 在并发编程中使用的工具<br /> java.util.concurrent.atomic<br /> java.util.concurrent.locks
2进程/线程回顾
2.1什么是进程/线程
**进程**:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。 是程序中的控制单元,执行路径。每一个进程执行都是一个执行顺序,该顺序就是一个执行路径。或者叫控制单元 操作系统会以进程为单位,分配系统资源(CPU时间片、内存等资源),进程是资源分配的最小单位。<br /> **线程**:通常在一个进程中可以包含若干个线程,当然一个进程中至少有一个线程,不然没有存在的意义。线程可以利用进程所拥有的资源,在引入线程的操作系统中,通常都是把进程作为分配资源的基本单位,而把线程作为独立运行和独立调度的基本单位,由于线程比进程更小,基本上不拥有系统资源,故对它的调度所付出的开销就会小得多,能更高效的提高系统多个程序间并发执行的程度。 操作系统调度(CPU调度)执行的最小单位。
2.2并发/并行
并发:多个线程同一时间争夺同一份资源
并行:同一时间,安排不同的线程做不同的事情
2.Lock接口(买票)
wait放开手去睡,放开手里的锁 sleep握紧手去睡,醒了手里还有锁 代码演示
wati sleep
xxxxxxxxxx
class Ticket{
private int number=300;
public synchronized void sale(){
if(number>0){
System.out.println(Thread.currentThread().getName()+”\t 卖出第几张票”+number—+”\t 还剩多少张”+number);
}
try {
Thread.sleep(3000);
this.wait(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class SaleTicket {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(new Runnable() {
@Override
public void run() {
for (int i =0;i<40;i++)
{
ticket.sale();
}
}
},”A”).start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i =0;i<40;i++)
{
ticket.sale();
}
}
},”B”).start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i =0;i<40;i++)
{
ticket.sale();
}
}
},”C”).start();
}
}
线程的四个状态:new runnable blocked waiting timed_waiting terminated
xxxxxxxxxx
/*
题目:三个售票员 卖出30 张票
多线程编程的企业级套路+模版
在高内聚低耦合的前提下,线程 操作(对外暴露的调用方法) 资源类
private Lock lock = new ReentrantLock();
lock.lock();
try {
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
/
// 资源类
class Ticket2{
private int number=300;
private Lock lock = new ReentrantLock();
public void sale(){
lock.lock();
try {
if(number>0){
System.out.println(Thread.currentThread().getName()+”\t 卖出第几张票”+number—+”\t 还剩多少张”+number);
}
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
public class SaleTicket2 {
public static void main(String[] args) {
Ticket2 ticket2 = new Ticket2();
new Thread(new Runnable() {
@Override
public void run() {
for (int i =0;i<300;i++)
{
ticket2.sale();
}
}
},”A”).start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i =0;i<300;i++)
{
ticket2.sale();
}
}
},”B”).start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i =0;i<300;i++)
{
ticket2.sale();
}
}
},”C”).start();
}
}
3. LambdaExpress
接口中只有一个方法, 面向函数接口. 其中的变量应该为final修饰的
1.拷贝小括号 写死右箭头 落地大括号
xxxxxxxxxx
@FunctionalInterface //只有一个方法 默认加上
interface Foo{
void sayHello(String s);
}
xxxxxxxxxx
Foo foo = (s)->{ //方法类型可以省略
System.out.println(“hello”+s);
};
String s1 = “132”;
foo.sayHello(s1);
2.@FunctionalInterface //只有一个方法 默认加上 可以有多个default实现
3.default 方法的默认实现 则不会引起上述注解错误
xxxxxxxxxx
@FunctionalInterface //只有一个未实现方法 默认加上
interface Foo{
void sayHello(String s);
default int div(int x,int y){
System.out.println(“hello”);
return x+y;
}
default int div2(int x,int y){
System.out.println(“hello”);
return x+y;
}
}
System.out.println(foo.div(1,2));
4.静态方法实现 可以有多个静态方法实现
xxxxxxxxxx
@FunctionalInterface //只有一个未实现方法 默认加上
interface Foo{
void sayHello(String s);
static int mv(int x,int y){
return xy;
}
static int mv2(int x,int y){
return xy*2;
}
}
System.out.println(FOO.mv2(3,5));
xxxxxxxxxx
public class SaleTicket3 {
public static void main(String[] args) {
Ticket3 ticket2 = new Ticket3();
new Thread(()->{
for (int i =0;i<300;i++)
ticket2.sale();
},”A”).start();
new Thread(()->{
for (int i =0;i<300;i++)
ticket2.sale();
},”B”).start();
new Thread(()->{
for (int i =0;i<300;i++)
ticket2.sale();
},”C”).start();
}
}
4.线程间的通信(生产者 消费者)
1.高内聚低耦合线 程操作资源类
2.判断/干活/通知
3.多线程交互中,必须防止多线程的虚假唤醒,也即(判断只用while 不用if)
4.注意标志位
synchronized
xxxxxxxxxx
class AirConditioner{
private int number = 0;
public synchronized void increment() throws InterruptedException {
//1.判断
//if(number!=0){
while(number!=0)
this.wait();
}
//2.干活
number++;
System.out.println(Thread.currentThread().getName()+”\t”+number);
//3.唤醒
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException {
while(number==0){
this.wait();
}
number—;
System.out.println(Thread.currentThread().getName()+”\t”+number);
this.notifyAll();
}
}
public class ThreadWaitNotifyDemo {
public static void main(String[] args) {
AirConditioner airConditioner = new AirConditioner();
new Thread(()->{
for (int i=0;i<10;i++)
{
try {
airConditioner.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, “A”).start();
new Thread(()->{
for (int i=0;i<10;i++)
{
try {
airConditioner.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, “B”).start();
new Thread(()->{
for (int i=0;i<10;i++)
{
try {
airConditioner.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, “C”).start();
new Thread(()->{
for (int i=0;i<10;i++)
{
try {
airConditioner.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, “D”).start();
}
}
Lock ReentrantLock
xxxxxxxxxx
class AirConditioner2{
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public void increment() throws InterruptedException {
//1.判断
//if
lock.lock();
try {
while (number!=0){
notFull.await();
}
//2.干活
number++;
System.out.println(Thread.currentThread().getName()+”\t”+number);
//3.唤醒
notEmpty.signalAll();
}catch (Exception e){
}finally {
lock.unlock();
}
}
public void decrement() throws InterruptedException {
lock.lock();
try {
while (number!=0){
notEmpty.await();
}
//2.干活
number—;
System.out.println(Thread.currentThread().getName()+”\t”+number);
//3.唤醒
notFull.signalAll();
}catch (Exception e){
}finally {
lock.unlock();
}
}
}
public class ThreadWaitNotifyDemo2 {
public static void main(String[] args) {
AirConditioner airConditioner = new AirConditioner();
new Thread(()->{
for (int i=0;i<10;i++)
{
try {
airConditioner.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, “A”).start();
new Thread(()->{
for (int i=0;i<10;i++)
{
try {
airConditioner.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, “B”).start();
new Thread(()->{
for (int i=0;i<10;i++)
{
try {
airConditioner.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, “C”).start();
new Thread(()->{
for (int i=0;i<10;i++)
{
try {
airConditioner.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, “D”).start();
}
}
5.例题:
1.多线程之间按顺序调用,实现A->B->C三个线程启动,要求如下:
AA打印5次,BB打印10次,CC打印15次 接着, AA打印5次,BB打印10次,CC打印15次 循环10次。
xxxxxxxxxx
class TPrint{
Lock lock = new ReentrantLock();
Condition a= lock.newCondition();
Condition b= lock.newCondition();
Condition c= lock.newCondition();
private int number = 1;
private int count=0;
public void printA() throws InterruptedException {
lock.lock();
try {
while (number!=1){
a.await();
}
System.out.println(“========”+count+++”========”);
for (int i=0;i<5;i++)
System.out.println(“AA”);
number=2;
b.signalAll();
}catch (Exception e){
}finally {
lock.unlock();
}
}
public void printB() throws InterruptedException {
lock.lock();
try {
while (number!=2){
b.await();
}
for (int i=0;i<10;i++)
System.out.println(“BB”);
number=3;
c.signalAll();
}catch (Exception e){
}finally {
lock.unlock();
}
}
public void printC() throws InterruptedException {
lock.lock();
try {
while (number!=3)
{
c.await();
}
for (int i=0;i<15;i++)
System.out.println(“CC”);
number=1;
a.signalAll();
}catch (Exception e){
}finally {
lock.unlock();
}
}
}
public class ThreadPrint {
public static void main(String[] args) {
TPrint p = new TPrint();
new Thread(()->{
for (int i=0;i<10;i++){
try {
p.printA();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},”A”).start();
new Thread(()->{
for (int i=0;i<10;i++) {
try {
p.printB();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},”B”).start();
new Thread(()->{
for (int i=0;i<10;i++) {
try {
p.printC();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},”C”).start();
}
}
2.多线程8锁:
//前提 邮件和短信线程间隔两秒1.标准访问,清问先打印邮件还是短信? //不确定2.邮件方法暂停4秒钟,请问先打印邮件还是短信? //先邮件后短信3.新增一个普通方法hello(),请问先打印邮件还是hello ? //hello4.两部手机,请问先打印邮件还是短信? //短信5.两个静态同步方法,同一部手机,请问先打印邮件还是短信? // 邮件6.两个静态同步方法,2部手机,请问先打印邮件还是短信?//邮件7.一个普通同步方法,1个静态同步方法,1部手机,请问先打印邮件还是短信? //短信8.一个普通同步方法,1个静态同步方法,2部手机,请问先打印邮件还是短信? //短信
xxxxxxxxxx
class Phone{
//1.标准访问,清问先打印邮件还是短信
public synchronized void email(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“sentEmail”);
}
public synchronized void sms(){
System.out.println(“sentSMS”);
}
//2.邮件方法暂停4秒钟,请问先打印邮件还是短信?
//3.新增一个普通方法hello(),请问先打印邮件还是hello ?
public void hello(){
System.out.println(“hello”);
}
//4.两部手机,请问先打印邮件还是短信?
//5.两个静态同步方法,同一部手机,请问先打印邮件还是短信?
public static synchronized void email_static(){
System.out.println(“sentEmail_static”);
}
public static synchronized void sms_static(){
System.out.println(“sentSMS_static”);
}
//6.两个静态同步方法,2部手机,请问先打印邮件还是短信?
//7.一个普通同步方法,1个静态同步方法,1部手机,请问先打印邮件还是短信?
//8.一个普通同步方法,1个静态同步方法,2部手机,请问先打印邮件还是短信?
}
public class Lock8 {
public static void main(String[] args) {
Phone phone =new Phone();
Phone phone1 =new Phone();
new Thread(()->{
// phone.email();
phone.email_static();
}).start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
// phone.sms();
// phone.hello();
// phone1.sms();
// Phone.sms_static();
phone1.sms_static();
}).start();
}
}
3.举例说明集合类是不安全的
java.util.ConcurrentModificationException<br /> Copyonwrite容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容涨object[]添加,而是先将当前容删object[]进rcop,复制出一个新的容器object[] newELements,然后新的容器object[] newELements里添加元素,添加完元素之后,再将原容器的引用指向新的容器setArray(newElements);。 这样做的好处是可以对CopyonMrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
3.1ArrayList
xxxxxxxxxx
/*
题目:举例说明集合类是不安全的
Arraylist线程不安全,单机版。 HashSet,HashMap,线程不安全
1.故障现象
java.util.ConcurrentModificationException
2.导致原因
3.解决方案
3.1 Vector 线程安全但是效率低
3.2 Collections.synchronizedList(new ArrayList<>()); 小数据量可以使用
3.3 new CopyOnWriteArrayList<>();
4.优化建议(同样的错误,不出现第二次)
/
public class NotSafeDemo {
public static void main(String[] args) {
// List
//1.
// List
//2.
// List
List
for (int i =1 ;i<=30;i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
xxxxxxxxxx
//CopyOnWriteArrayList 添加底层代码
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
3.2Hashset
HashSet底层源码是HashMap 的key value为一个不变的Object类 PRESENT
xxxxxxxxxx
// Set
Set
// HashSet
for (int i =1 ;i<=30;i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(set);
},String.valueOf(i)).start();
}
xxxxxxxxxx
//源码
private static final Object PRESENT = new Object();
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
3.3HashMap
HashMap底层为 node类型的数组,node类型的链表,红黑树默认容量16,负载因子0.75(可以修改)new ConcurrentHashMap<>();
xxxxxxxxxx
public static void MapDemo(){
// Map
Map
for (int i =1 ;i<=30;i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,8));
System.out.println(map);
},String.valueOf(i)).start();
}
}
3.4补充
Arraylist扩容为原来的一半,HashMap扩容为原来的一倍
HashMap遍历方式 以及底层原理
6.Callable 线程接口
和Runnable 接口的差别 方法名,异常,返回值1.get 方法一般请放在最后一行,如果放在主线程之间会造成线程阻塞。2.同一个FutureTask对象 只能执行一个线程,多次线程无效。 new Thread(futureTask,”B”).start(); new Thread(futureTask,”C”).start();System.out.println(futureTask.get());System.out.println(futureTask.get());
xxxxxxxxxx
class MyThread2 implements Callable
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName()+”come in here”);
return 1024;
}
}
/
call run
抛异常
返回值
/
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask(new MyThread2());
FutureTask futureTask1 = new FutureTask(new MyThread2());
new Thread(futureTask,”B”).start();
new Thread(futureTask1,”A”).start();
System.out.println(Thread.currentThread().getName()+”main is over”);
System.out.println(futureTask.get());
System.out.println(futureTask1.get());
new Thread(futureTask,”C”).start();
System.out.println(futureTask.get());
}
}
7.JUC辅助工具类
7.1CountDownLatch
CountDownLatch countDownLatch = new CountDownLatch(6); countDownLatch.countDown(); countDownLatch.await();<br /> CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。<br />xxxxxxxxxx<br />public class CountDownLatchDemo {<br /> public static void main(String[] args) throws InterruptedException {<br />// CloseDoorDemo1();<br /> CountDownLatch countDownLatch = new CountDownLatch(6);<br /> for (int i =1;i<=6;i++)<br /> {<br /> new Thread(()->{<br /> System.out.println(Thread.currentThread().getName()+"离开教室");<br /> countDownLatch.countDown();<br /> },String.valueOf(i)).start();<br /> }<br /> countDownLatch.await();<br /> System.out.println(Thread.currentThread().getName()+"\t关门");<br /> }<br /> public static void CloseDoorDemo1() {<br /> for (int i =1;i<=6;i++)<br /> {<br /> new Thread(()->{<br /> System.out.println(Thread.currentThread().getName()+"离开教室");<br /> },String.valueOf(i)).start();<br /> }<br /> System.out.println(Thread.currentThread().getName()+"\t关门");<br /> }<br />}
7.2 CyclicBarrier
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{ System.out.println("召唤神龙"); }); cyclicBarrier.await(); 需要等到7个线程全部执行完后才执行完毕 不影响主线程执行。对与此例而言。<br />xxxxxxxxxx<br />public class CyclicBarrierDemo {<br /> public static void main(String[] args) {<br /> //CyclicBarrier(int parties,Runnable barrierAction);<br /> CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{<br /> System.out.println("召唤神龙");<br /> });<br /> for (int i =0;i<7;i++) {<br /> final int templInt = i;<br /> new Thread(()->{<br /> System.out.println(Thread.currentThread().getName()+"\t收集到第:"+templInt+"颗龙珠");<br /> try {<br /> cyclicBarrier.await();<br /> } catch (InterruptedException e) {<br /> e.printStackTrace();<br /> } catch (BrokenBarrierException e) {<br /> e.printStackTrace();<br /> }<br /> },String.valueOf(i)).start();<br /> }<br /> System.out.println("123");<br /> }<br />}
7.3Semaphore
在信号量上我们定义两种操作:
1.acquire(获取)当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1)要么一直等下去,直到有线 程释放信号量,或超时。2.release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。 Semaphore semaphore = new Semaphore(3); // 如果写1 相当于synchronized 加锁 semaphore.acquire(); semaphore.release();
xxxxxxxxxx
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for(int i =0;i<6 ;i++){
new Thread(()->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+”抢占了资源”);
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+”释放了资源”);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
8.ReadWriteLock
多个线程同时读一个资源类没有任何问题,所以为了满足并发量,读取共享资源应该可以同时进行。但是,如果有一个线程想去写共亨资源来,就不应该再有其它线程可以对该资源进行读或写I小总结: 读-读能共存 读-写不能共存 写-写不能共存 private ReadWriteLock readWriteLock=new ReentrantReadWriteLock(); readWriteLock.writeLock().lock();readWriteLock.readLock().lock(); 与ReentrantLock 相同的用法
xxxxxxxxxx
class MyCache{
private volatile Map
private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
public void put(String key,Object value){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+”开始写入 “+value);
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key,value);
System.out.println(Thread.currentThread().getName()+”写入成功”);
}catch (Exception e){
e.printStackTrace();
}finally {
readWriteLock.writeLock().unlock();
}
}
public void get(String key){
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+”开始读取”);
Object result = map.get(key);
System.out.println(Thread.currentThread().getName()+”读取成功”+ result);
}catch (Exception e){
e.printStackTrace();
}finally {
readWriteLock.readLock().unlock();
}
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for(int i =0;i<10 ;i++){
final int tempInt =i;
if (i%2==0){
new Thread(()->{
myCache.put(tempInt/2+””,tempInt/2+””);
},String.valueOf(i/2)).start();
}else {
new Thread(()->{
myCache.get(tempInt/2+””);
},String.valueOf(i/2)).start();
}
}
/for(int i =0;i<5 ;i++){
final int tempInt =i;
new Thread(()->{
myCache.put(tempInt+””,tempInt+””);
},String.valueOf(i)).start();
}/
/for(int i =0;i<5 ;i++){
final int tempInt =i;
new Thread(()->{
myCache.get(tempInt+””);
},String.valueOf(i)).start();
}/
}
}
9.阻塞队列BlockingQueue
9.1阻塞队列简介
当队列是空的,从队列中获取元素的操作将会被阻塞 当队列是满的,从队列中添加元素的操作将会被阻塞<br /> 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素<br /> 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
9.2阻塞队列好处
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起<br /> 为什么需要BlockingQueue 好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了<br /> 在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
9.3.阻塞队列体系结构
9.4种类分析
ArrayBlockingQueue:由数组结构组成的有界阻塞队列。 LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。<br /> PriorityBlockingQueue:支持优先级排序的无界阻塞队列。 DelayQueue:使用优先级队列实现的延迟无界阻塞队列。 SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。 LinkedTransferQueue:由链表组成的无界阻塞队列。 LinkedBlockingDeque:由链表组成的双向阻塞队列。
9.5BlockingQueue核心方法
抛出异常 | 当阻塞队列满时,再往队列里add插入元素会抛IlegalStateException:Queue full 当阻塞队列空时,再往队列里remove移除元素会抛NoSuchElementException |
---|---|
特殊值 | 插入方法,成功ture失败false 移除方法,成功返回出队列的元素,队列里没有就返回null。 |
一直阻塞 | 当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产者线程直到put数据or响应中断退出 当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用 |
超时退出 | 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出 |
element | 检查元素,如果有则返回队列中第一个元素,如果没有则抛出NoSuchElementException。 |
---|---|
peek | 检查元素,如果有则返回队列中第一个元素,如果没有返回null |
xxxxxxxxxx
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
// List list= new ArrayList();
BlockingQueue
//1.add
/ System.out.println(blockingQueue.add(“a”));
System.out.println(blockingQueue.add(“b”));
System.out.println(blockingQueue.add(“c”));/
// System.out.println(blockingQueue.add(“d”));
//2.remover
/ System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());/
//3.element
/ System.out.println(blockingQueue.add(“a”));
System.out.println(blockingQueue.add(“b”));/
/ System.out.println(blockingQueue.element());
System.out.println(blockingQueue.element());
System.out.println(blockingQueue.element());/
//4.offer(e)
/System.out.println(blockingQueue.offer(“a”));
System.out.println(blockingQueue.offer(“b”));
System.out.println(blockingQueue.offer(“c”));
System.out.println(blockingQueue.offer(“c”));/
//5.poll(e)
/System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());/
//6.peek()
/ System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.peek());/
//7.put
/blockingQueue.put(“a”);
blockingQueue.put(“a”);
blockingQueue.put(“a”);/
// blockingQueue.put(“a”);
//8.take
/System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());/
//9.offer(element,timeout,timeUnit)
System.out.println(blockingQueue.offer(“a”, 3L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer(“a”, 3L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer(“a”, 3L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer(“a”, 3L, TimeUnit.SECONDS));
//10 poll
System.out.println(blockingQueue.poll(3L,TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(3L,TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(3L,TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(3L,TimeUnit.SECONDS));
}
}
10.ThreadPool线程池
10.1线程池的优势
线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
10.2特点
**它的主要特点为**:线程复用;控制最大并发数;管理线程。<br /> 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。 第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。 第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
10.3Executor框架实现
10.4分类
10.4.1Executors.newFixedThreadPool(int)
执行长期任务性能好,创建一个线程池,一池有N个固定的线程,有固定线程数的线程。 线程拥挤不会影响功能的正常执行,线程执行没有固定的顺序。
10.4.2Executors.newSingleThreadExecutor()
一个线程 单线程
10.4.3Executors.newCachedThreadPool()
执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强<br />xxxxxxxxxx<br />public class MyThreadPoolDemo {<br /> public static void main(String[] args) {<br />// ExecutorService threadPool = Executors.newFixedThreadPool(5);// 池中有五个工作线程<br />// ExecutorService threadPool = Executors.newSingleThreadExecutor(); //一个线程<br /> ExecutorService threadPool = Executors.newCachedThreadPool(); //一个线程 可扩容可伸缩<br /> try{<br /> //模拟有10个顾客来银行办理业务,目前尺子里有五个工作人员提供服务<br /> for (int i=0;i<=10;i++){<br /> threadPool.execute(()->{<br /> System.out.println(Thread.currentThread().getName()+"办理业务");<br /> /*try {<br /> TimeUnit.SECONDS.sleep(1);<br /> } catch (InterruptedException e) {<br /> e.printStackTrace();<br /> }*/<br /> });<br /> //暂停几秒<br />// TimeUnit.SECONDS.sleep(1);<br /> }<br /> }catch(Exception e){<br /> }finally{<br /> threadPool.shutdown();<br /> }<br /> }<br />}
10.5底层原理
xxxxxxxxxx
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue
}
7个参数
xxxxxxxxxx
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQ ueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.hand ler = handler;
}
1 corePoolSize 线程池的常驻核心线程数 2 maximumPoolSize 线程池中能够容纳同时执行的最大线程数,此值必须大于等于1 3 keepAliveTime 多余的空闲线程的存活时间。当前池中线程数量超过corePoolSize时,当空闲时间达到KeepAliveTime时,多余线程会被销毁知道只剩下corePoolSize个线程为止。 4 TimeUnit unit ,keepAliveTime 的单位 5 BlockingQueue workQueue 任务队列,被提价但尚未被执行的任务 阻塞队列满后才会开非核心线程数 6 ThreadFactory threadFactory ,表示生成线程池中工作线程的线程工厂,用于创建线程,一般默认的即可。 7 RejectedExecutionHandler handler,拒绝策略,表示当队列满了,并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时,如何来拒绝请求执行的runnable的策略。
运行原理:
1、在创建了线程池后,开始等待请求。2、当调用execute()方法添加一个请求任务时,线程池会做出如下判断: 2.1.如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务; 2.2.如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列; 2.3.如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建 非核心线程立刻运行这个任务; 2.4.如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱 和拒绝策略来执行。3、当一个线程完成任务时,它会从队列中取下一个任务来执行。4、当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断: 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
线程池用那个?生产中如何设置合理的参数:
为什么?
【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。说明: Executors返回的线程池对象的弊端如下: 1.FixedThreadPool和SingleThreadPool: 允许的请求队列长度为Integer.MAx_VALUE,可能会堆积大量的请求,从而导致OOM。 2.CachedThreadPool和scheduledThreadPool: 允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM.
10.6线程池的拒绝策略
AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行 CallerRunsPolicy:“调用者运行"—种调节机制,该策略既不会抛弃任务,也不 会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。 谁让你来你找谁去。 DiscardPolicy():抛弃无法处理的任务,不予任何处理,不抛出异常 DiscardOldestPolicy();抛弃队列中等待最久的任务,然后把当前任务加入队列中,尝试再次提交当前任务。
10.7参数设置
maximumPoolSize设置参数: 如果是CPU密集型的 则设置为cpu核数+1 (Runtime.getRuntime.availableProcessors()+1) IO密集型的 则为 CPU核数/ (1-阻塞系数)
11.java8之流式计算复习
11.1 java内置核心四大函数式接口。
函数式接口 | 参数类型 | 返回值 | 用途 |
---|---|---|---|
Consumer 消费型接口 |
T | void | 对类型为T的对象应用操 作,包含方法: void accept(T t) |
Supplier 供给型接口 |
无 | T | 返回类型为T的对象,包 含方法:T get(); |
Function 函数型接口 |
T | R | 对类型为T的对象应用操 作,并返回结果。结果 是R类型的对象。包含方 法:Rapply(T t); |
Predicate 断定型接口 |
T | boolean | 确定类型为T的对象是否 满足某约束,并返回 boolean值。包含方法 boolean test(T t); |
xxxxxxxxxx
Function
return s.length();
};
System.out.println(function.apply(“132”));
xxxxxxxxxx
Predicate
return s.isEmpty();
};
System.out.println(predicate.test(“”));
xxxxxxxxxx
Consumer
System.out.println(s);
};
consumer.accept(“accept”);
xxxxxxxxxx
Supplier
return “supplier”;
};
System.out.println(supplier.get());
11.2stream流式计算
1.流是什么?
是数据渠道,用于操作数据源(集合、数组等)所生成的元素序列。 “集合讲的是数据,流讲的是计算!”
2.特点
1.Stream自己不会存储元素 2.Stream不会改变源对象。相反他们会返回一个持有结果的新stream 3.Stream操作是延迟执行的。这意味着他们会等到需要结果的时候才执行。
3.阶段
1.创建一个Stream:一个数据源(集合,数组) 2.中间操作:处理数据源数据 3.终止操作:执行中间操作链,产生结果。<br /> 源头=>中间流水线=>结果
xxxxxxxxxx
class User{
private int id;
private String name;
private int age;
public User(int id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return “User{“ +
“id=” + id +
“, name=’” + name + ‘\’’ +
“, age=” + age +
‘}’;
}
}
public class StreamDemo {
public static void main(String[] args) {
User u1 = new User(11,”a”,23);
User u2 = new User(12,”b”,24);
User u3 = new User(13,”c”,22);
User u4 = new User(14,”d”,28);
User u5 = new User(16,”e”,26);
List
list.stream().filter((u)->{
return u.getId()%2==0;
}).filter((u)->{
return u.getAge()>24;
}).map((u)->{
return u.getName().toUpperCase();
}).sorted((o1,o2)->{
return o2.compareTo(o1);
}).limit(1).forEach(System.out::println);
list.stream().filter((u)-> u.getId()%2==0).filter((u)-> u.getAge()>24).map((u)-> u.getName().toUpperCase()).sorted((o1,o2)-> o2.compareTo(o1)).limit(1).forEach(System.out::println);
// collect(Collectors.toList()).
}
}
xxxxxxxxxx
list.stream().filter((u)->{
return u.getId()%2==0;
}).filter((u)->{
return u.getAge()>24;
}).map((u)->{
return u.getName().toUpperCase();
}).sorted((o1,o2)->{
return o2.compareTo(o1);
}).limit(1).forEach(System.out::println);
list.stream().filter((u)-> u.getId()%2==0).filter((u)-> u.getAge()>24).map((u)-> u.getName().toUpperCase()).sorted((o1,o2)-> o2.compareTo(o1)).limit(1).forEach(System.out::println);
12.ForkJoinPool
线程的分支算法
xxxxxxxxxx
class MyTask extends RecursiveTask
private static final Integer ADJUST_VALUE=10;
private int begin;
private int end;
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
if((end-begin)<=ADJUST_VALUE)
{
for (int i=begin;i<=end;i++)
{
result = result + i;
}
}else {
int middle = (end+begin)/2;
MyTask task1 = new MyTask(begin,middle);
MyTask task2 = new MyTask(middle+1,end);
task1.fork();
task2.fork();
result=task1.join()+task2.join();
}
return result;
}
}
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask myTask = new MyTask(0,100);
ForkJoinPool threadPool = new ForkJoinPool();
ForkJoinTask
System.out.println(forkJoinTask.get());
threadPool.shutdown();
}
}
13.异步回调
xxxxxxxxxx
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception{
CompletableFuture
CompletableFuture.runAsync(()->{
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+”模拟 更新数据库”);
});
// completableFuture.get();//执行完后 主线程才执行
CompletableFuture
CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+”模拟 insert”);
return 1024;
});
completableFuture2.whenCompleteAsync((t,u)->{
System.out.println(“T=”+t);
System.out.println(“U=”+u);
System.out.println(“执行完成”);
}).exceptionally(f->{
System.out.println(“exception:”+f.getMessage());
return 444;
});
// completableFuture2.get();
CompletableFuture<?>[] completableFutures = {completableFuture,completableFuture2};
CompletableFuture.allOf(completableFuture,completableFuture2).get();
System.out.println(“main is over”);
}
}