这个专题算是尚硅谷的视频笔记

在什么情况下可以用最少的资源做最多的事情,这个我们要知道
不能想到多线程就加锁

volatile关键字与内存可见性

来看一下不用volatile的方式, 在ThreadDemo类中定义了flag, 多线程下修改了值为true,但是在main线程了没有打印出结果,就说明在main线程下flag的值还是false.那是什么原因呢?是因为内存可见性问题,当有多个线程访问共享数据的时候,JVM会为每一个线程分配一个独立的缓存来提高效率, 这也就带来了内存可见性问题.

  1. public class VolatileTest {
  2. public static void main(String[] args) {
  3. ThreadDemo td = new ThreadDemo();
  4. new Thread(td).start();
  5. while (true) {
  6. if(td.isFlag()){
  7. System.out.println("-----------------");
  8. break;
  9. }
  10. }
  11. }
  12. }
  13. class ThreadDemo implements Runnable {
  14. private boolean flag = false;
  15. @Override
  16. public void run() {
  17. try {
  18. Thread.sleep(200);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. flag = true;
  23. System.out.println("flag = "+flag);
  24. }
  25. public boolean isFlag() {
  26. return flag;
  27. }
  28. public void setFlag(boolean flag) {
  29. this.flag = flag;
  30. }
  31. }

当然,要解决内存可见性问题,使用 synchronized 来加锁也是可以的,但是如果使用 synchronized 加锁就会带来性能问题.

//这样使用synchronized也可以,但是影响性能
while (true) {
    synchronized(td){
        if(td.isFlag()){
            System.out.println("-----------------");
            break;
        }
    }
}

相较于synchronized来说, volatile是一种较为轻量级的同步策略.
只需要在定义flag的时候加上volatile关键字即可.

// 加上volatile    
private volatile boolean flag = false;

但是需要注意的是:

  1. volatile不具有互斥性.
  2. volatile不能保证变量的原子性

原子性

i++的操作,就不是原子性的.

java.util.concurrent.atomic包下面有很多常用的原子变量.

原子变量都是用volatile修饰的,保证了内存可见性,用了cas算法保证了数字的原子性.

CAS算法是硬件对于并发操作共享数据的支持.
CAS包含了三个操作数:
内存值V
预估值 E
更新值 N
当且仅当 V == E时,V=N,否则,什么都不做.

CAS算法

应用

java5开始,在java.util.concurrent包下提供了大量支持高效并发访问的集合接口和实现类. 如ConcurrentHashMap等线程安全集合.

引入概念

这些线程安全类底层实现用的就是CAS算法(Compare and Swap 比较交换 ).实现方式是基于硬件平台的汇编指令.也就是说,CAS是靠硬件实现的.

优点

这些算法相对于 synchronized 是比较乐观的, 它不会像 synchronized 一样,当一个线程访问数据的时候,其他线程都阻塞. synchronized 不管是否有线程冲突都会进行加锁,由于 CAS 是非阻塞的,所以不会有死锁问题.并且线程之间的相互影响也非常小,更重要的是,使用无锁的方式完全没有锁竞争带来的系统开销.也没有线程频繁调度带来的开销,所以它要比锁的方式拥有更优越的性能.

实现原理

image.png
假设现在有两个线程 T1和T2,在他们各自的运行环境中都有共享变量的副本 V1 , V2 , 预期只存中的值还没有被改变 , 假设现在在并发环境 , 并且T1先拿到了执行权限 , 失败的线程不会被挂起 , 而是被告知这次竞争失败 , 并可以再次发起尝试 .
此时T1线程比较主存中的V和T1线程中的E1,发现E1 = V,也就是说预期是正确的 , 所以执行N1 = V1+1 , 并且将 N1 的值传回主存 . 这时候主存中的V = 21.
线程T2拿到执行权的时候,也要将V和E2进行比较 , 此时因为主存已经被修改 , 所以T2线程再将主存中的值更新到自己的副本中,再发起重试. 直到满足条件.

CurrentHashMap锁分段机制

HashMap是线程不安全的,HashTable是线程安全的。
HashTable底层加了锁,是对整张表加了锁,线程安全,效率低。

CurrentHashMap采用锁分段机制。
CurrentLevel 锁分段级别。

每个段都是独立的锁,就是Segment
但是jdk1.8之后,segment也被取消了,CurrentHashMap底层开始使用CAS。

实现Callable接口

public class TestCallable {

    public static void main(String[] args) {

        ThreadDemo td = new ThreadDemo();
        //如果是实现Runnable接口,可以直接Thread.start(td)就行了
        //但是,我们这个有返回值,需要一个东西去接收返回结果
        // 执行Callable方式,使用FutureTask实现类去接收结果
        FutureTask<Integer> result = new FutureTask<>(td);
        new Thread(result).start();
        try {
            //可以使用get方法来获取返回值
            result.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }
}

class ThreadDemo implements Callable {
    // 实现Callable的接口,重写call方法
    // call方法和run方法的区别可以有返回值,并且可以抛出异常
    @Override
    public Object call() throws Exception {
        int sum = 0;
        for(int i = 0;i<=100;i++){
            sum+=i;
            System.out.println(i);
        }
        return sum;
    }
}

同步锁Lock

Synchronized是隐式锁
Lock是一个显示锁,需要通过lock来加锁,使用unlock来释放锁。
如果不加锁,就会产生线程安全问题

public class TestLock {
    public static void main(String[] args) {
        Ticket ticket = new Ticket();
        new Thread(ticket,"1号窗口").start();
        new Thread(ticket,"2号窗口").start();
        new Thread(ticket,"3号窗口").start();
    }
}
//这样就会产生多线程安全问题
class Ticket implements Runnable {

    private int tick = 100;
    @Override
    public void run() {
        while(tick>0){
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread().getName() + "完成售票,余票为"+ --tick);
        }
    }
}

加个锁

public class TestLock {
    public static void main(String[] args) {
        Ticket ticket = new Ticket();
        new Thread(ticket,"1号窗口").start();
        new Thread(ticket,"2号窗口").start();
        new Thread(ticket,"3号窗口").start();
    }
}
//这样就会产生多线程安全问题
class Ticket implements Runnable {

    private int tick = 100;

    private Lock lock = new ReentrantLock();
    @Override
    public void run() {
        while(tick>0){
            lock.lock();
            try {
                if(tick>0){
                    System.out.println(Thread.currentThread().getName() + "完成售票,余票为"+ --tick);
                }
            }catch (Exception e){
            }finally {
                lock.unlock();
            }
        }
    }
}

使用同步锁Lock来完成等待唤醒机制

完成等待唤醒机制就是使用synchronized锁时候的那个wait和notify。
完成等待唤醒最经典的案例就是生产者和消费者案例。

这个还没写完,后续再补充。

public class TestProductAndConsumer {
    public static void main(String[] args) {
        //一个店员
        Clerk clerk = new Clerk();

        Productor pro = new Productor(clerk);
        Concumer cus = new Concumer(clerk);

        new Thread(pro,"生产者A").start();
        new Thread(cus,"消费者A").start();
    }
}
//店员
class Clerk{
    private int product = 0;

    //进货
    public void  get(){
        if(product>=10){
            System.out.println("产品已满!");
        }else{
            System.out.println(Thread.currentThread().getName()+" :" + ++product);
        }
    }

    //卖货
    public void sale(){
        if(product<=0){
            System.out.println("缺货");
        }else{
            System.out.println(Thread.currentThread().getName()+" :" + --product);
        }
    }
}

//生产者
class Productor implements Runnable {

    private Clerk clerk;

    public Productor(Clerk clerk){
        this.clerk = clerk;
    }


    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            clerk.get();
        }
    }
}

//消费者
class Concumer implements Runnable{

    private Clerk clerk;

    public Concumer(Clerk clerk) {
        this.clerk = clerk;
    }

    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            clerk.sale();
        }
    }
}

Condition 线程通信

在Condition对象中,与wait、notify和notifyAll方法对应的分别是await,signal和signalAll。

编写一个程序,开启3个线程,这三个线程的ID分别是A B C,每个线程将自己的ID在屏幕上打印10遍,要求输出的结果必须按照顺序显示。如ABCABCABC… 依次递归。

public class TestABCAiternate {

    /**
     * 编写一个程序,开启3个线程,这三个线程的ID分别是A B C,每个线程将自己的ID在屏幕上打印10遍,要求输出的结果必须按照顺序显示。
     * 如ABCABCABC... 依次递归
     */
    public static void main(String[] args) {
        ABCDemo demo = new ABCDemo();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    demo.LoopA();
                }

            }
        },"A").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    demo.LoopB();
                }

            }
        },"B").start();


        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    demo.LoopC();
                }

            }
        },"C").start();
    }


}

class ABCDemo {

    private int number = 1; //当前正在执行线程的标记

    private Lock lock = new ReentrantLock();
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();

    public void LoopA(){
        lock.lock();
        try {
            //判断
            if(number!=1){
                condition1.await();
            }
            //打印

            System.out.println(Thread.currentThread().getName());

            //唤醒
            number = 2;
            condition2.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }


    public void LoopB(){
        lock.lock();
        try {
            //判断
            if(number!=2){
                condition2.await();
            }
            //打印

            System.out.println(Thread.currentThread().getName());

            //唤醒
            number = 3;
            condition3.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void LoopC(){
        lock.lock();
        try {
            //判断
            if(number!=3){
                condition3.await();
            }
            //打印

            System.out.println(Thread.currentThread().getName());

            //唤醒
            number = 1;
            condition1.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}

这就用到了线程通信。

ReadWriteLock 读写锁

可以多个线程读,只能一个线程写,和前面用lock是一样的,不过需要定义的是ReadWriteLock。

//定义的时候用ReadWriteLock
private ReadWriteLock lock = new ReentrantReadWriteLock();

// 使用的时候也加上readLock或者writeLock
lock.ReadLock().lock;

lock.WriteLock.lock();

线程池

首先,线程池是有类别的,有得是核心线程,有的是非核心线程。所以我们需要两个变量标识核心线程数量coreSize和最大线程数量maxSize。
其次,我们需要一个任务队列来存放任务,这个队列必须是线程安全的,一般使用BlockingQueue阻塞队列来充当。
最后,当任务越来越多而线程处理却不及时,队列满了,线程数也达到最大线程数了,这时候就需要走拒绝策略。常用的拒绝策略有丢弃当前任务,丢弃最老的任务,调用者自己处理等。

为什么要你区分核心线程呢?

这是为了控制系统中线程的数量。

  • 当线程池中的线程数没有达到核心线程数的时候,来了一个任务加一个线程是可以的,可以提高任务执行的效率。
  • 当线程池中的线程数达到核心线程数的时候,就要控制一下线程的数量了,来任务先进队列。
    • 如果任务执行足够快,这些核心线程很快就可以处理完队列中的任务,就没有必要新增线程。
    • 如果队列中的任务也满了,这时候只靠核心线程就没有办法处理了,就要增加新的线程,但是线程也不能无限制的增加,所以需要控制最大线程数量maxSize。

实现线程池

//拒绝策略接口
public interface RejectPolicy {
    void reject(Runnable task,MyThreadPoolExecutor myThreadPoolExecutor);
}

/**
 * 丢弃当前任务
 */
public class DiscardRejectPolicy implements RejectPolicy{
    @Override
    public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
        System.out.println("discard one task");
    }
}

实现一个线程池

public class MyThreadPoolExecutor implements Executor {
    //线程池的名字
    private String name;
    //线程序列号
    private AtomicInteger sequence = new AtomicInteger(0);

    //核心线程数
    private int coreSize;
    //最大线程数
    private int maxSize;
    //任务队列
    private BlockingQueue<Runnable> taskQueue;
    //拒绝策略
    private RejectPolicy rejectPolicy;

    //当前运行的线程数
    private AtomicInteger runningCount = new AtomicInteger(0);


    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }

    @Override
    public void execute(Runnable task) {
        //正在运行的线程数
        int count = runningCount.get();
        //如果正在运行的线程数小于核心线程数,直接加一个线程
        if (count<coreSize){
            // 这里不一定能添加成功,addWorker方法里面还要判断一次是不是确实小于核心线程
            if(addWorker(task,true)){
                return;
            }
            //如果添加核心线程失败
            //todo 创建核心线程失败的逻辑
        }

        //如果达到了核心线程数,尝试让任务入队
        //这里使用offer,是因为offer在队列满了的时候会返回false
        if(taskQueue.offer(task)){

        }else{
            //如果入队失败,说明队列满了,添加一个非核心线程
            if(!addWorker(task,false)){
                //如果添加非核心线程失败了,就执行拒绝策略
                rejectPolicy.reject(task,this);
            }
        }
    }

    private boolean addWorker(Runnable newTask,boolean core){
        //自旋判断是不是真的可以创建一个线程
        for (;;){
            // 正在运行的线程数
            int count = runningCount.get();
            //核心线程还是非核心线程
            int max = core?coreSize:maxSize;
            //如果不能满足创建线程的条件,直接返回false
            if(count>=max){
                return false;
            }
            //修改runningCount成功,可以创建线程
            if(runningCount.compareAndSet(count,count+1)){
                //线程的名字
                String threadName = (core?"core_":"")+name+sequence.incrementAndGet();
                //创建线程并启动
                new Thread(()->{
                    System.out.println("thread name : "+Thread.currentThread().getName());
                    //运行的任务
                    Runnable task = newTask;
                    // 不断从任务队列中取任务执行,如果取出来的任务为null,则跳出循环,线程也就结束了
                    while(task!=null || (task = getTask())!=null){
                        try{
                            //执行任务
                            task.run();
                        }finally {
                            //任务执行完成,置为空
                            task = null;
                        }
                    }
                },threadName).start();
                break;
            }
        }
        return true;
    }

    private Runnable getTask(){
        try{
            //task方法会一直阻塞直到取到任务为止
            return taskQueue.take();
        }catch(InterruptedException e){
            //线程中断了,返回null可以结束当前线程
            //当前线程要结束了,要把runningCount的数量减一
            runningCount.decrementAndGet();
            return null;
        }
    }

}

进行测试

public class MyThreadPoolExecutortest {

    public static void main(String[] args) {
        Executor threadPool = new MyThreadPoolExecutor("test",5,10,new ArrayBlockingQueue<>(15),new DiscardRejectPolicy());

        AtomicInteger num = new AtomicInteger(0);

        for (int i = 0; i < 100; i++) {
            threadPool.execute(()->{
                try{
                    Thread.sleep(1000);
                    System.out.println("running :"+System.currentTimeMillis()+" : "+num.incrementAndGet());
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            });
        }
    }

}