1、juc入门概念
并发包,并发原子包,锁包
1、什么是进程,什么是线程?
进程隶属于操作系统,是一个独立可运行的应用程序,线程隶属于进程,享有这个进程的系统资源,一个进程最少要有一个或者N个线程。
2、什么是并发、并行
并发:多个线程同时访问一个系统资源。
并行:多个线程同时做不同的事情。
线程的6种状态,在thread.state中,是一个枚举类。
NEW RUNNABLE BLOCKED WAITING TIMED_WAITING TERMINATED
wait和sleep的区别
wait会将锁资源释放,进入沉睡状态直到唤醒,sleep会占住锁直到到时。
lambda表达式
当接口中只有一个抽象方法时可以使用lambda表达式简化创建内部类的代码
new Thread(()->{for (int i=0;i<40;i++) tickets.saleTickets();},"C").start();
jdk8之后,接口可以有默认实现和静态方法。
在多线程场景下,判断只能使用while 不能使用if ,防止虚假唤醒。
1、消费者与生产者模式
public class SignalOnly {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
new Thread(() -> {
shareResource.print(1,5);
}, "A").start();
new Thread(() -> {
shareResource.print(2,10);
}, "B").start();
new Thread(() -> {
shareResource.print(3,15);
}, "C").start();
}
}
class ShareResource{
private int num = 1;
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void print(int count,int runCount){
lock.lock();
try {
while (num!=count){
if (count==1) condition1.await();
if (count==2) condition2.await();
if (count==3) condition3.await();
}
num+=1;
for (int i=1;i<=runCount;i++){
out.println(Thread.currentThread().getName()+"\t"+i);
}
if (count==1) condition2.signal();
if (count==2) condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
2、锁
在非静态同步方法中,synchronized锁的整个实例对象,此时所有的同步方法不允许其他线程访问,直到抢到了锁的线程执行完毕之后,其他线程才能通过cpu调度获得运行权力来获取锁,然后执行。
sleep和await方法是有区别的,前者不会释放锁会进入阻塞状态,后者会释放锁进入等待状态直到被唤醒重新回到就绪状态通过cpu调度来运行。
而静态同步方法,锁的是这个类对象,所有的静态同步方法也只允许一个抢到了这把类锁的对象进行访问。
可重入锁:
用可重入锁ReentrantLock代替synchronized 优化具体的逻辑 synchronized只能加在方法和代码体中
用Condition类代替wait和notify,可以实现精准控制线程之间的通信。
3、ArrayList是非线程安全的
在主线程中,使用ArrayList不会出现错误,但是如果多线程访问,那么会出现异常
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < 20; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,4));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
此时会出现一个异常
java.util.ConcurrentModificationException 多线程修改异常
解决的思路:
1、使用vector类 所有的方法都是同步方法、不允许使用,性能较低
2、使用collections工具类
Collections.synchronizedList(new ArrayList<String>())
3、使用java.util.concurrent.CopyOnWriteArrayList作为list使用。
copyonwritearraylist源码
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
底层使用了加锁,并且拷贝一份然后将长度加1,然后在拷贝的这个数组中添加完元素后再放回去,并释放锁。
写时复制:
即只对写的操作进行加锁,可以并发的读,写的时候将容器复制一份然后在新容器中进行写,写完后再将原容器的引用指向新容器。
4、HashMap与concurrenthashmap
hashset底层就是一个hashmap,hashset的构造方法就是创建了一个hashmap,并且add方法就是调用了hashmap的put方法,hashset中的元素作为hashmap的node的节点,并且value是一个写死的object常量。
hashmap默认大小是16,负载因子是0.75,达到12后就会进行自动扩容,并且扩容量左移一位,即x2
hashmap是线程不安全的。JDK7之前是数组+链表,8之后是数组+链表+红黑树,并且存放的是node对象,node对象有两个属性,key和value。
在多线程场景下的解决思路:
用Hashtable 不推荐。
collections.synchronizedmap方法 不推荐。
JUC包下的 ConcurrentHashmap 推荐使用。
hashmap
jdk7 hashmap以数组加链表的方法实现,通过获取一个key的hashcode和数组的lenght进行取模,这样可以保证计算结果肯定会落到这个数组的下标中,这个数组中保存的是entry对象的内存地址,entry对象就是一个单向链表,其中有一个属性就是next,保存了指向它下一个链表节点的内存地址,假如hashcode相等落到了同一个下标中,在JDK7中使用头插法,只需要将entry对象的next属性设置为原来的头结点,然后再将数组中保存的内存地址指向这个entry对象。
并且key可以为NULL,每put一个kv的时候会进行判断,遍历这个entry对象,通过它的next属性获取所有的key,对put进来的元素的hash值和key进行判断,如果都一样那么就会替换掉value值,然后将原来的value返回
使用头插法的好处是,只需要将新存放进来的元素的next指向原来的头结点就可以搞定,并且可以从头开始遍历获取所有的entry,而尾插法的话就会造成无法遍历获取key,甚至在put一个新元素的时候还得来一次遍历获取最末尾的节点。
在jdk7中,会对传入进来的初始化数组大小进行一次减1然后乘2运算,然后将结果进行高16位或运算,一直右移然后保证所有的比特位上只有一个1,然后获取这个数来作为hashmap的初始化容量,这个数一定是2的幂次方。
高16位的原因是int类型占32个bite位
减1是为了确保默认初始容量为16的情况下返回的是32。
要使用2的幂次方的数字作为数组的容量是因为2的幂次方以二进制表示只有一个bite位上是1,当这个数减去1的时候低位bite位全是1,这样就保证了在与hash值进行与运算的时候,最终结果区间都是0-这个数字之间,就控制住了数组下标越界的问题,但是此时两个hash值假如高位的数字不同低位的数字相同那么他们会落在同一个下标之上,此时会造成链表过长,所以再获取hashcode值之后还会将这个hashcode的值进行右移和异或运算,尽量将高位的bite位的数字影响到最终结果。
总结:首先将传入的自定义容量减1乘2,然后进行高16位移位后再通过或运算得到的结果作为数组初始容量,通过获取key的hashcode,然后再进行一轮哈希算法,最终控制落到数组中的下标上的元素均匀且不越界。
ConcurrentHashmap
线程安全。
JDK 1.7版
维护一个agment()数组,agment()中又有一个entry数组。
5、实现多线程的第三种方式
实现callable接口。
与实现runable接口不同,这个接口的方法是call()方法,并且能抛出异常,而且带有返回值。
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println(" hello ,world");
return Integer.parseInt("0615");
}
});
new Thread(futureTask).start();
System.out.println(futureTask.get());
}
}
get方法应该放在最后,不然也会阻塞当前线程。
future不管几个线程调用,都只会执行一次。
我们可以在业务逻辑中将处理时长比较耗时的方法另起一个futuretask对象来进行处理,在我们的主业务处理完毕后再将运行结果获取,这样就不会因为当前线程什么都不干阻塞的等待这个方法运行完毕返回结果。
6、强大的JUC辅助类
CountDownLatch
有两个方法,countDown和await;
当某个线程调用await方法时,会阻塞;
当一个线程调用countDown时会使计数器-1
当计数器为0时,被await方法阻塞的线程会被唤醒继续执行。
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
System.out.println("离开了"+Thread.currentThread().getName());
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("老大关门");
}
CyclicBarrier类
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(9, () -> {
System.out.println("召唤加里奥");
});
for (int i = 1; i < 10; i++) {
int finalI = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"收集到第"+ finalI +"个羁绊");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
semaphore类
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(4);
for (int i = 0; i < 8; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到了资源");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+"释放了资源");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
适用于高并发场景下,控制线程访问的一个api,无论多少线程,只提供4份资源供访问。
假设构造方法中传入一个1,就达到了synchronized的效果,并且可以自由控制锁住的时间。
7、ReentrantReadWriteLock类
读写锁,只允许同时读,只要有写的操作就需要保证原子性和数据的一致性,只允许写,在写的过程中不允许另外的线程写。
又被称为共享锁和排他锁
8、BlockQueue
阻塞队列
这个接口是为了调度阻塞的线程和资源而出的。
当队列为空时,从队列中获取元素的的操作将会被阻塞。
当队列满了时,往队列中添加元素的操作将会被阻塞。
Queue也是Collection的一个子接口,并且有8个具体的实现类。
9、线程池
池化技术
不用我们手动创建线程,而是创建一个线程池对象,它里面维护了N条线程,我们需要用线程就从池中拿用完就放回去,提升了线程的复用性以及减少系统创建线程后用完线程之后销毁的这一步开销。
提高响应速度,不需要手动创建线程,直接获取即可用,省去了创建线程对象这一步。
方便管理,所有的线程都存放在线程池中,即听从线程池的调度。
线程池的顶级接口
Executor,在它之下的子接口是ExecutorServer 但是不允许new来创建
获取线程池的方式是通过Executors工具类的静态API来获取。
Executors能帮我们创建的五种线程池:
// ExecutorService threadPool = Executors.newFixedThreadPool(5); // 自定义线程数量
// ExecutorService threadPool = Executors.newSingleThreadExecutor(); // 只有一条线程 单例的
// ExecutorService threadPool = Executors.newCachedThreadPool(); // 根据访问的线程数量动态的创建线程
//public static ScheduledExecutorService newScheduledThread(int corePoolSize)
返回的是ScheduledExecutorService 对象,可以指定线程数量。和FixedXxx 不同的是,该对象可以根据时间需要对线程进行调度
内部典型方法为: public ScheduledFuture scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
// new SingleThreadScheduledExecutor() 和上述类似,只不过单池单线程,依旧可以进行时间调度;
(在某个固定的延时过后进行执行,或者周期性执行)
public static void main(String[] args) {
// ExecutorService threadPool = Executors.newFixedThreadPool(5); // 自定义线程数量
// ExecutorService threadPool = Executors.newSingleThreadExecutor(); // 只有一条线程 单例的
ExecutorService threadPool = Executors.newCachedThreadPool(); // 根据访问的线程数量动态的创建线程
try {
for (int i = 0; i < 11; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"正在工作");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown(); // 关闭线程
}
}
本质上都是通过
new ThreadPoolExecutor();
这个方式来创建线程。
上面的三个线程池只不过是传入的参数不同,返回的线程池也不同。
阿里巴巴代码规范不允许我们使用工具类创建线程池,推荐手动创建。
七大参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize | 即当前线程池的常驻线程数量 |
---|---|
maximumPoolSize | 能够同时运行的线程数的最大值 |
keepAliveTime | 指超过corePoolSize值的线程在挂起时,在设置的参数值后自动销毁 |
unit | 配合keepAliveTime,作为时间单位 |
BlockingQueue | 线程的任务队列,执行已提交还未执行的任务 |
threadFactory | 创建线程的工厂,一般使用默认设置 |
handler | 拒绝策略,表示当线程任务队列已满并且工作线程达到了maxiumPoolSize的峰值后执行的拒绝策略 |
线程池的工作原理
- 一个请求来到线程池中请求执行,首先会查看corepoolsize的默认线程有没有在工作,如果没有就让默认线程工作。
- 如果默认线程都在工作,那么就会进入阻塞队列进行等待。
- 如果阻塞队列已满,那么线程池就会额外创建临时线程,直到线程数等于maxmumpoolsize
- 如果此时再进来请求,继续以上步骤,如果已经达到了maximumpoolsize可运行的线程数峰值,此时执行决绝策略。
但是不允许使用自带的工具类创建线程池,只能够手动创建。
因为会发生OOM错误,在源码中默认使用integer的最大值作为maximumPoolSize和阻塞队列的大小。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
线程池最多允许maximumPoolSize+阻塞队列大小同时运行。
超过会报RejectedExecutionException
四种拒绝策略
maximumpoolsize的值一般是看cpu密集型还是io密集型,如果是前者的话可以获取当前系统的cpu核心数,然后+1作为线程池的最大运行线程数。
第一种:抛异常
第二种:不放弃执行,将无法调度线程处理的任务交给调用者线程
第三种:抛弃阻塞队列中等待最久的任务
第四种:放弃无法调度线程进行执行的任务,不处理也不抛异常,如果业务允许这是一种最好的策略。
public static void main(String[] args) {
// ExecutorService threadPool = Executors.newFixedThreadPool(5); // 自定义线程数量
// ExecutorService threadPool = Executors.newSingleThreadExecutor(); // 只有一条线程 单例的
// ExecutorService threadPool = Executors.newCachedThreadPool(); // 根据访问的线程数量动态的创建线程
int coreCount = Runtime.getRuntime().availableProcessors() + 1;
System.out.println(coreCount);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4,
coreCount,
500,
MILLISECONDS,
new LinkedBlockingQueue<Runnable>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
try {
int i = 0;
while (i<30){
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"正在工作");
});
i++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown(); // 关闭线程
}
}
10、函数式接口与stream计算
四大函数式接口
consumer消费者, 可以配合lombda表达式声明一个方法,只有一个参数且无返回值
supplier 提供者, 可以配合lombda表达式使用,不需要参数,返回泛型声明的类型。
Function
predicate 判断者,传入泛型指定的参数,返回布尔类型的值。
Stream
在JDK8后,可以将list转为stream,这个stream和IO流不是同一个概念,可以操作数组、集合生成的数据序列。
集合关注的是数据,stream关注的是计算。
Stream流式计算代码:
ForkJoinFuture类
分之合并,递归调用,可以将一个复杂的任务以拆分的形式分给不同的线程去做,完成后再将结果合并返回。
分之合并 ForkJoin代码:
CompletaTableFuture
异步调用
….未完待续