标签: java springboot

判断文件服务器中文件是否存在

  1. String path="http://127.0.0.1/upload/cardpic/aa.jpg";
  2. try {
  3. URL url =new URL(path);//服务器路径用URL
  4. HttpURLConnection urlcon = (HttpURLConnection) url.openConnection();
  5. urlcon.setRequestMethod("HEAD"); //请求方法用head
  6. if (urlcon.getResponseCode() == HttpURLConnection.HTTP_OK){ 根据返回code值判断是否存在,也可以用404代表不存在
  7. mm.setHeadimgurl(path);
  8. }else{
  9. mm.setHeadimgurl("");
  10. }
  11. } catch (IOException e) {
  12. // TODO Auto-generated catch block
  13. e.printStackTrace();
  14. }

java.net.URL 和 java.net.HttpURLConnection

Java URL处理

  1. URL(Uniform Resource Locator)中文名为统一资源定位符,有时也被俗称为网页地址。表示为互联网上的资源,如网页或者FTP地址。
  2. 介绍Java是如处理URL的,URL可以分为如下几个部分:
  • protocol(协议)可以是 HTTP、HTTPS、FTP 和 File,port 为端口号,path为文件路径及文件名。
    URL 解析:

协议为(protocol):http
主机为(host:port):www.runoob.com
端口号为(port): 80 ,以上URL实例并未指定端口,因为 HTTP 协议默认的端口号为 80。
文件路径为(path):/index.html
请求参数(query):language=cn
定位位置(fragment):j2se,定位到网页中 id 属性为 j2se 的 HTML 元素位置 。

前面提到了从空间角度,Java 对象要比原始数据类型开销大的。你知道对象的内存结构是什么样的吗?比如,对象头的结构。如何计算或者获取某个 Java 对象的大小?

CAS机制
  • CAS是英文单词Compare and Swap的缩写,翻译过来就是比较并替换。
  • CAS机制中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B。
  • 更新一个变量的时候,只有当变量的预期值A和内存地址V当中的实际值相同时,才会将内存地址V对应的值修改为B。
  • 在java中除了上面提到的Atomic系列类,以及Lock系列类夺得底层实现,甚至在JAVA1.6以上版本,synchronized转变为重量级锁之前,也会采用CAS机制。
    CAS缺点:
    1) CPU开销过大``` 在并发量比较高的情况下,如果许多线程反复尝试更新某一个变量,却又一直更新不成功,循环往复,会给CPU带来很到的压力。
    1. <br />2) 不能保证代码块的原子性
    CAS机制所保证的知识一个变量的原子性操作,而不能保证整个代码块的原子性。比如需要保证3个变量共同进行原子性的更新,就不得不使用synchronized了。 ```
    3) ABA问题

HashMap中的哈希碰撞问题?及解决?

HashMap的结构

  1. 在jdk7中hashMap的基础结构是数组+链表,在jdk8中引入了红黑树结构,在使用时,存储数据长度小于8时使用的是数据和链表的存储形式,当数据量超过8时,HashMap会将数据已红黑树的形式存储。因为HashMap是的形式进行存储,一个key对应一个Value,而key是通过hashmap中的hash算法,使用hashcode()获得hashCode值,这样会发生一个问题会出现key的值相同的状况发生,又因hashMap存储数据是按照hashCode值将数据存到对应的bucket,即被称为哈希碰撞。
  • 解决哈希碰撞通常有两种方法:链表法和开放地址法;
  • 链表法(使用单向链表):将相同hash值的对象组织成一个链表存放在hash值对应的槽位;
  • 开放地址法:通过一个探测算法,当某个槽位已被占据的情况下继续查找下一个使用的槽位。

    JAVA 网络编程模型

  • 阻塞I/O模型:常见的I/O模型,在读写的时候客户端会发生阻塞。其工作流程为,在用户线程发出I/O请求之后,内核会检查数据是否就绪,此时用户线程一直处于阻塞等待内存数据就绪;在内存数据就绪后,内核将数据复制到用户线程中,并返回I/O执行结果到用户线程,此时用户线程将解除阻塞状态并开始处理数据。典型的阻塞I/O例子:data=socket.read(),如果内核数据没有就绪,socket线程就会一直阻塞在read()中等待内核数据就绪。

  • 非阻塞I/O模型:指用户线程发起一个I/O操作后,无需阻塞就可以得到内核返回的一个结果。如果内核返回的结果为false,表示内核数据还没有准备好,需要稍后再次发起I/O操作。一旦内核数据准备好了,并且再次收到用户请求,内核就会立刻将数据复制到用户线程中并将复制结果通知给用户线程。
  • 多路复用I/O模型:是多线程并发编程用的较多的模型,Java NIO就是基于多路复用I/O模型实现的。在多路复用I/O模型中会有一个被称为selector的线程不断的询问多个socket的状况,只有在socket读写事件时,才会通知用户线程进行I/O读写操作。
    • 非阻塞I/O模型是在每个用户线程中都进行socket状态检查,而在多路复用I/O模型是在系统内核中进行socket状态检查,所以效率比非阻塞高;
    • 多路复用I/O模型通过在一个selector线程上以轮询的方式检测多个socket上是否有事件到达,并逐个处理和响应。因此在事件响应体很大时,selector线程就会成为性能瓶颈。在实践应用中最后在事件处理中不多复杂的操作,只做数据的接收和转发,将具体的业务转发给后面的业务线程处理。
  • 信号驱动I/O模型:该模型是,在用户线程发起一个I/O请求操作时,系统会为该请求对应的socket注册一个信号函数,然后用户线程可以继续执行其他业务;在内核数据准备就绪后,会发一个信号到用户线程,用户线程接收到信号后会在信号函数中调用对应的I/O操作;
  • 异步I/O模型:在异步线程中,用户不需要关心整个I/O操作是如何进行的,只需要发起一个请求,在接收到内核返回的信号(成功或失败),表明I/O操作已经完成。在异步I/O模型中,I/O操作的两个阶段(请求的发起、数据的读取)都是在内核中自动完成的,最终发送一个信号告知用户线程I/O操作已经完成。与信号模型的区别是,信号模型在接收到信号后还需要用户线程调用I/O函数进行实际的I/O读写操作,在异步I/O中是,在接受的信号后,用户线程已经接收到I/O操作完的数据,可以直接使用数据。异步I/O需要操作系统的支持,需要有Asynchronous I/O的JDK版本。
  • Java I/O:在整个java.io包中有重要的五个类和一个接口,五个类有File、outputstream、inputstream、writer、reader,一个接口是指serializable。
  • Java NIO:主要包括三大块,selector(选择器)、channel(通道)和buffer(缓冲区)。
    • selector:用于监听多个channel事件,例如连接打开和数据到达;selector能够检测多个注册通道上是否有事件发生,若有事件发生,会获取事件然后针对每个事件进行响应处理。
    • channel:即为通道,与stream流类似,但与之不同,stream流是单向的,channel是双向的,从channel中读取数据或将数据写入channel中。channel中有四个类型:FileChannel(文件流通道)、DatagramChannel(UDP)、SocketChannel(TCP client端)、ServerSocketChannel(TCP server端)
    • buffer:表示缓冲区,实际上是一个容器,是一个连续数组。channel提供从文件或网络读取数据的渠道,数据的读写必须经过buffer,常用的buffer实现类有:ByteBuffer、IntBuffer、CharBuffer、LongBuffer、DoublebBuffer、FloatBuffer、ShortBuffer;

Java NIO 与BIO的区别:

  • BIO是面向流的操作,NIO是面向缓冲区的;在面向流的操作中,数据只能在一个流中连续进行读写,数据没有缓冲,因此字节流无法前后移动。在NIO中,每次都是将数据从一个channel读取到一个buffer中,再从buffer中写入到channel中,因此可以方便的在缓冲区中进行移动等操作。
  • BIO的流是阻塞模式的,NIO的流是非阻塞模式的;

Java基础

  1. 重载与重写的区别

重载:方法名相同,但是参数必须不同,返回类型也可修改,异常类型也可以修改
重写:类上是父子类、接口与实现类,方法名相同,且参数不可修改,返回类型也不可修改,异常可减少或删除,但不能扩展。

  1. StringBuffer 和 StringBuilder
  • 线程安全:StringBuffer是线程安全的(有关键字synchronized),stringbuilder没有是使用synchroized修饰
  • 缓冲区:StringBuffer的toString()方法中,每次获取toString都会使用缓存区中的toStringCache值来构造一个字符串;StringBuilder每次都需要复制一次字符数组,再造一个字符串。
  • 性能:StringBuffer是线程安全的,又是所有方法都公开的且是同步的,StringBuilder是没有对方法加锁同步的,所以StringBuilder性能优先于StringBuffer。

Java集合

ArrayList与LinkedList的区别是什么?

  1. 底层使用的数据结构
    • ArrayList底层使用的是Object数组,初始化时就会指向的会是一个static修饰的空数组,数组的长度一开始为0,插入第一个元素是数组长度会初始化为10,之后每次数组空间不够,进行扩容时都是原来的1.5倍。ArrayList的空间浪费主要体现在list列表的结尾会预留一定的容量空间(为了避免添加元素时,数组空间不够导致频繁申请内存),而LinkedList的空间花费则体现在,其每一个元素浪费的空间更多(存放后继指针next和前驱指针pre以及数据)
    • LinkedList底层使用的数据结构是双向链表,每个节点都保存了指向前驱节点和后继节点的指针。初始化时,不执行任何操作,添加第一个元素时,再去构造链表中的节点。
  2. 是否保证线程安全

ArrayList和LinkedList都是不同步的,即都是线程不安全的。
因为ArrayList的插入元素的方法就是裸奔,直接将原数组index及后面的元素拷贝到index+1及后面的位置上,然后将index位置设置为插入的值,并发修改时保证不了数据安全性,所以也不允许并发修改,一旦检测到并发修改,会抛出ConcurrentModificationException异常;

  1. /**
  2. * Inserts the specified element at the specified position in this
  3. * list. Shifts the element currently at that position (if any) and
  4. * any subsequent elements to the right (adds one to their indices).
  5. * ArrayList的插入元素的方法
  6. * @param index index at which the specified element is to be inserted
  7. * @param element element to be inserted
  8. * @throws IndexOutOfBoundsException {@inheritDoc}
  9. */
  10. public void add(int index, E element) {
  11. rangeCheckForAdd(index);
  12. ensureCapacityInternal(size + 1); // Increments modCount!!
  13. //将原数组index之后的元素拷贝到元素组index+1后面的位置上
  14. System.arraycopy(elementData, index, elementData, index + 1,
  15. size - index);
  16. elementData[index] = element;
  17. size++;
  18. }
  1. 插入和删除的复杂度
    • ArrayList采用数组存储,元素的物理存储地址是连续的,支持以O(1)的时间复杂度对元素的快速访问。插入和删除元素后,需要将后面的元素进行移动,所以插入和删除元素的时间复杂度受元素位置的影响。复杂度是O(1)。
    • LinkedList采用链表存储,所以不能快速随机访问。所以首尾插入,删除元素时间复杂度不受元素位置影响,都是近似O(1)(如果是插入到中间位置还需要考虑寻找插入位置的时间复杂度)。而数组为近似O(n)。
  2. 继承树
    • ArrayList继承AbstractList,实现了List,RandomAccess,Cloneable,java.io.Serializable接口。
    • LinkedList继承自AbstractSequentialList,实现了List,Deque,Cloneable,java.io.Serializable接口。

AbstractSequuentList是AbstractList类的子类,实现了根据下标来访问元素的一些方法,主要是通过ListIterator便利获取特定元素。
List接口代表的是有序集合,与Set相反,List的元素是按照移动的顺序进行排列。
Cloneable接口代表类会重写父类Object的clone()方法,支持对实例对象的clone操作。
java.io.Serializable接口代表类支持序列化。
RandomAccess是一个标示性接口,代表ArrayList支持快速访问,而LinkedList不支持。
Deque接口是双端队列的意思,代表LinkedList支持两端元素插入和移动。

怎么使ArrayList,LinkedList变成线程安全的?

  1. 使用SynchronizedList

SynchronizedList是一个线程安全的包装类。继承于SynchronizedCollection,SynchronizedCollection实现了Collection接口,SynchronizedList包含一个List对象,对List的访问修改方法进行了一些封装,在封装的方法中会对list使用同步锁加锁,然后再进行存取和修改操作。使用方法如下:

  1. LinkedList<Integer> linkedList = new LinkedList<Integer>();
  2. //调用Collections的synchronizedList方法,传入一个linkedList,会返回一个SynchronizedList实例对象
  3. List<Integer> synchronizedList = Collections.synchronizedList(linkedList);
  4. //调用Collections的synchronizedList方法,ArrayList,返回一个SynchronizedRandomAccessList实例对象
  5. ArrayList<Integer> arrayList = new ArrayList<Integer>();
  6. List<Integer> synchronizedRandomAccessList = Collections.synchronizedList(arrayList);
  1. 使用CopyOnWriteArrayList

CopyOnWriteArrayList与ArrayList类似,都实现了List接口,只不过它的父类是Object,而不是AbstractList。CopyOnWriteArrayList与ArrayList不同:

  • 内部持有一个ReentrantLock类型的lock锁,用于控制并发访问(在对数组进行修改的方法中,都会先获取lock,获取成功才能进行修改,修改完释放锁,保证每次只允许一个线程对数组进行修改)

    1. /** The lock protecting all mutators */
    2. final transient ReentrantLock lock = new ReentrantLock();
  • 使用volatile修饰Object数组,使得变量具备内存可见性

    1. /** The array, accessed only via getArray/setArray. */
    2. //CopyOnWriteArrayList
    3. private transient volatile Object[] array;
    4. /**
    5. * The array buffer into which the elements of the ArrayList are stored.
    6. * The capacity of the ArrayList is the length of this array buffer. Any
    7. * empty ArrayList with elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA
    8. * will be expanded to DEFAULT_CAPACITY when the first element is added.
    9. */
    10. //ArrayList
    11. transient Object[] elementData; // non-private to simplify nested class access

    可以看到区别主要在于CopyOnWriteArrayList的Object是使用volatile来修饰,volatile可以使变量具备内存可见性,一个线程在工作内存中对变量进行修改后,会立即更新到物理内存,并且使得其他线程中的这个变量缓存失效,其它线程在读取时,会去物理内存中读取最新的值。(volatile修饰的是指向数组的引用变量,所以对数组添加元素,删除元素不会改变引用,只有对数组变量array重新赋值才会改变。所以为了保证内存可见性,CopyOnWriteArrayList.add()方法在添加元素时,都是复制出一个新数组,进行修改操作后,在设置到数组上)
    注意事项:Object数组都使用transient修饰,是因为transient修饰的属性不会参与序列化,ArrayList通过实现writeObject()和readObject()方法来自定义序列化方法(基于反序列化时节约空间考虑,如果用默认的序列方法,源elementData数组长度为100,实际上只有10个元素,反序列化时也会出现分配长度为100的数组,造成内存浪费)。下面是copyOpyOnWriteArrayList的add()方法

    1. /**
    2. * Inserts the specified element at the specified position in this
    3. * list. Shifts the element currently at that position (if any) and
    4. * any subsequent elements to the right (adds one to their indices).
    5. *
    6. * @throws IndexOutOfBoundsException {@inheritDoc}
    7. */
    8. public void add(int index, E element) {
    9. //使用ReetrantLock,保证写线程在同时刻只有一个
    10. final ReentrantLock lock = this.lock;
    11. lock.lock();
    12. try {
    13. //获取旧数组引用
    14. Object[] elements = getArray();
    15. int len = elements.length;
    16. if (index > len || index < 0)
    17. throw new IndexOutOfBoundsException("Index: "+index+
    18. ", Size: "+len);
    19. //创建新数组,并将就数组的数据复制到新数组中
    20. Object[] newElements;
    21. int numMoved = len - index;
    22. if (numMoved == 0)
    23. newElements = Arrays.copyOf(elements, len + 1);
    24. else {
    25. newElements = new Object[len + 1];
    26. System.arraycopy(elements, 0, newElements, 0, index);
    27. System.arraycopy(elements, index, newElements, index + 1,
    28. numMoved);
    29. }
    30. //向新数组中添加新的数组
    31. newElements[index] = element;
    32. //将旧数组引用指向新的数组
    33. setArray(newElements);
    34. } finally {
    35. lock.unlock();
    36. }
    37. }
  1. SynchronizedList和CopyOnWriteArrayList优缺点
  • SynchronizedList:读写都加锁,SynchronizedList是通过在方法上使用synchronized修饰来实现方法同步,即便只是多个线程在读数据,也不能进行,如果是读比较多的的场景下,会使性能不高,所以适合读写均匀的情况下;
  • CopyOnWriteArrayList:读不加锁,写加锁,因CopyOnWriteArrayList是读写分离的,只有对写操作加锁,但是每次写操作(添加和删除元素)时都会复制出一个新的数组,完成修改后将新数组设置到旧数组的引用上,所以在写比较多的情况写,会有很大的性能开销,所以适合读比较多的应用场景。

    ArrayList遍历时删除元素有哪些方法?

    首先结论如下:

  • 普通for循环正序删除(结果:会漏掉元素判断)

  • 普通for循环倒序删除(结果:正确删除)
  • for-each循环删除(结果:抛出异常)
  • Iterator遍历,使用ArrayList.remove()删除元素(结果:抛出异常)
  • Iterator遍历,使用Iterator的remove删除元素(结果:正确删除)

首先创建一个数组

  1. public static void main(String[] args) {
  2. ArrayList<Integer> arrayList = new ArrayList();
  3. arrayList.add(1);
  4. arrayList.add(2);
  5. arrayList.add(3);
  6. arrayList.add(3);
  7. arrayList.add(4);
  8. arrayList.add(5);
  9. removeWayOne(arrayList);
  10. }
  1. 普通for循环正序删除(结果:会漏掉对后一个元素的判断)

    1. for (int i = 0; i < arrayList.size(); i++) {
    2. if (arrayList.get(i) == 3) {//3是要删除的元素
    3. arrayList.remove(i);
    4. //解决方案: 加一行代码i = i - 1; 删除元素后,下标减1
    5. }
    6. System.out.println("当前arrayList是"+arrayList.toString());
    7. }
    8. //原ArrayList是[1, 2, 3, 3, 4, 5]
    9. //删除后是[1, 2, 3, 4, 5]

    原因在于调用remove删除元素时,remove方法调用System.arraycopy()方法将后面的元素移动到前面的位

  2. 普通for循环倒序删除(结果:正确删除)

    1. for (int i = arrayList.size() -1 ; i>=0; i--) {
    2. if (arrayList.get(i).equals(3)) {
    3. arrayList.remove(i);
    4. }
    5. System.out.println("当前arrayList是"+arrayList.toString());
    6. }

    这用方法可以正确删除元素,是因为调用remove删除元素时,remove方法调用System.arraycopy()将删除的元素后面的元素向前移动,而不会影响删除元素之前的元素,所以倒序遍历可以正常删除元素。

  3. for-each循环删除(结果:抛出异常)

抛出异常的根本原因在于for-each是使用Iterator来实现遍历的,调用ArrayList.remove()方法会将modCount+1,而Iterator内部的expectedModCount却没有更新,这样在下次循环时调用Iterator.next()会对modCount和expectedModCount进行比较,不一致就会抛出ConcurrentModificationException异常。

  1. public static void removeWayThree(ArrayList<Integer> arrayList) {
  2. for (Integer value : arrayList) {
  3. if (value.equals(3)) {//3是要删除的元素
  4. arrayList.remove(value);
  5. }
  6. System.out.println("当前arrayList是"+arrayList.toString());
  7. }
  8. }

Iterator.hasNext()来判断是否还有下一个元素和Iterator.next()方法来获取下一个元素。而因为在删除元素时,remove()方法会调用fastRemove()方法,其中会调用modCount+1,代表对数组进行修改,将修改次数+1。

  1. /**
  2. * Removes the first occurrence of the specified element from this list,
  3. * if it is present. If the list does not contain the element, it is
  4. * unchanged. More formally, removes the element with the lowest index
  5. * <tt>i</tt> such that
  6. * <tt>(o==null&nbsp;?&nbsp;get(i)==null&nbsp;:&nbsp;o.equals(get(i)))</tt>
  7. * (if such an element exists). Returns <tt>true</tt> if this list
  8. * contained the specified element (or equivalently, if this list
  9. * changed as a result of the call).
  10. *
  11. * @param o element to be removed from this list, if present
  12. * @return <tt>true</tt> if this list contained the specified element
  13. */
  14. public boolean remove(Object o) {
  15. if (o == null) {
  16. for (int index = 0; index < size; index++)
  17. if (elementData[index] == null) {
  18. fastRemove(index);
  19. return true;
  20. }
  21. } else {
  22. for (int index = 0; index < size; index++)
  23. if (o.equals(elementData[index])) {
  24. fastRemove(index);
  25. return true;
  26. }
  27. }
  28. return false;
  29. }
  30. /*
  31. * Private remove method that skips bounds checking and does not
  32. * return the value removed.
  33. */
  34. private void fastRemove(int index) {
  35. modCount++;
  36. int numMoved = size - index - 1;
  37. if (numMoved > 0)
  38. System.arraycopy(elementData, index+1, elementData, index,
  39. numMoved);
  40. elementData[--size] = null; // clear to let GC do its work
  41. }

HashMap添加一个键值对过程?

流程图如下:
JAVA - 图1

  1. 初始化table

判断table是否为空或为null,否则执行resize()方法(resize方法一般是扩容时调用,也可以调用来初始化table)

  1. 计算hash值

根据键值key计算hash值。(因为hashCode是一个int类型的变量,是4字节,32位,所以这里会将hashCode的低16位与高16位进行一个异或运算,来保留高位的运算,以便于得到的hash值更加均匀分布)

  1. /**
  2. * Computes key.hashCode() and spreads (XORs) higher bits of hash
  3. * to lower. Because the table uses power-of-two masking, sets of
  4. * hashes that vary only in bits above the current mask will
  5. * always collide. (Among known examples are sets of Float keys
  6. * holding consecutive whole numbers in small tables.) So we
  7. * apply a transform that spreads the impact of higher bits
  8. * downward. There is a tradeoff between speed, utility, and
  9. * quality of bit-spreading. Because many common sets of hashes
  10. * are already reasonably distributed (so don't benefit from
  11. * spreading), and because we use trees to handle large sets of
  12. * collisions in bins, we just XOR some shifted bits in the
  13. * cheapest possible way to reduce systematic lossage, as well as
  14. * to incorporate impact of the highest bits that would otherwise
  15. * never be used in index calculations because of table bounds.
  16. */
  17. static final int hash(Object key) {
  18. int h;
  19. return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
  20. }
  1. 插入或更新节点

根据(n-1)& hash计算得到插入的数组下标i,然后进行判断
3.1 数组为空(table[i] == null)
那么说明当前数组下标下,没有hash冲突的元素,直接新建节点添加。
3.2 等于下标首元素,table[i].hash hash && (table[i] key || (key != null && key.equals(table[i].key)))
判断table[i]的首个元素是否和key,如果相同直接更新value。
3.3 数组下标存的是红黑树,table[i] instanceof TreeNode
判断table[i]是否为treeNode,即table[i]是否是红黑树,如果是红黑树,则直接在树中插入键值对。
3.4 数组下标存的是链表
上面的判断条件都不满足,说明table[i]存储的是一个链表,那么遍历链表,判断是否存在已有元素的key与插入键值对key相等,如果是,那么更新value,如果没有,那么在链表末尾插入一个新节点。插入之后判断链表长度链表长度是否大于8,大于8的话把链表转换为红黑树。

  1. 扩容

插入成功后,判断实际存在的键值对数量size是否超过了最大容量threshold(一般是数组长度*负载因子0.75),如果超过,进行扩容。
源码如下:

  1. /**
  2. * Associates the specified value with the specified key in this map.
  3. * If the map previously contained a mapping for the key, the old
  4. * value is replaced.
  5. *
  6. * @param key key with which the specified value is to be associated
  7. * @param value value to be associated with the specified key
  8. * @return the previous value associated with <tt>key</tt>, or
  9. * <tt>null</tt> if there was no mapping for <tt>key</tt>.
  10. * (A <tt>null</tt> return can also indicate that the map
  11. * previously associated <tt>null</tt> with <tt>key</tt>.)
  12. */
  13. public V put(K key, V value) {
  14. return putVal(hash(key), key, value, false, true);
  15. }
  16. /**
  17. * Implements Map.put and related methods
  18. *
  19. * @param hash hash for key
  20. * @param key the key
  21. * @param value the value to put
  22. * @param onlyIfAbsent if true, don't change existing value
  23. * @param evict if false, the table is in creation mode.
  24. * @return previous value, or null if none
  25. */
  26. final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
  27. boolean evict) {
  28. Node<K,V>[] tab; Node<K,V> p; int n, i;
  29. //1.tab为空则创建
  30. if ((tab = table) == null || (n = tab.length) == 0)
  31. n = (tab = resize()).length;
  32. //2.计算index,并对null做处理
  33. //3.插入元素
  34. //(n - 1) & hash 确定元素存放在哪个数组下标下
  35. //下标没有元素,新生成节点放入中(此时,这个节点是放在数组中)
  36. if ((p = tab[i = (n - 1) & hash]) == null)
  37. tab[i] = newNode(hash, key, value, null);
  38. //下标中已存在元素
  39. else {
  40. Node<K,V> e; K k;
  41. //节点key存在,直接覆盖value
  42. //比较桶中第一个元素(数组中的节点)的hash值相等,key相等
  43. if (p.hash == hash &&
  44. ((k = p.key) == key || (key != null && key.equals(k))))
  45. //将第一个元素直接赋值给e,用e来记录
  46. e = p;
  47. //判断该链为红黑树
  48. //hash值不相等,即key不相等,为红黑树节点
  49. else if (p instanceof TreeNode)
  50. //放入树中
  51. e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
  52. //该链为链表
  53. //为链表节点
  54. else {
  55. //在链表最末插入结点
  56. for (int binCount = 0; ; ++binCount) {
  57. //到达链表的尾部
  58. if ((e = p.next) == null) {
  59. //到达尾部插入新结点
  60. p.next = newNode(hash, key, value, null);
  61. //结点数量达到阈值,转化为红黑树
  62. if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
  63. treeifyBin(tab, hash);
  64. //跳出循环
  65. break;
  66. }
  67. //判断链表中结点的key值与插入的元素的key值是否相等
  68. if (e.hash == hash &&
  69. ((k = e.key) == key || (key != null && key.equals(k))))
  70. //相等,跳出循环
  71. break;
  72. //用于遍历桶中的链表,与前面的e = p.next组合,可以遍历链表
  73. p = e;
  74. }
  75. }
  76. //表示在桶中找到了key值、hash值与插入元素相等的结点
  77. if (e != null) { // existing mapping for key
  78. //纪录e的value
  79. V oldValue = e.value;
  80. //onlyIfAbsent为false或者旧值为null
  81. if (!onlyIfAbsent || oldValue == null)
  82. //用新值代替旧值
  83. e.value = value;
  84. //访问后回调
  85. afterNodeAccess(e);
  86. //返回旧值
  87. return oldValue;
  88. }
  89. }
  90. ++modCount;
  91. //超过最大容量就扩容,实际大小大于阈值则扩容
  92. if (++size > threshold)
  93. resize();
  94. //插入后回调
  95. afterNodeInsertion(evict);
  96. return null;
  97. }

扩容方法:

  1. /**
  2. * Initializes or doubles table size. If null, allocates in
  3. * accord with initial capacity target held in field threshold.
  4. * Otherwise, because we are using power-of-two expansion, the
  5. * elements from each bin must either stay at same index, or move
  6. * with a power of two offset in the new table.
  7. *
  8. * @return the table
  9. */
  10. final Node<K,V>[] resize() {
  11. //记录表值
  12. Node<K,V>[] oldTab = table;
  13. //判断oldTab的空间大小
  14. int oldCap = (oldTab == null) ? 0 : oldTab.length;
  15. int oldThr = threshold;
  16. int newCap, newThr = 0;
  17. if (oldCap > 0) {
  18. if (oldCap >= MAXIMUM_CAPACITY) {
  19. threshold = Integer.MAX_VALUE;
  20. return oldTab;
  21. }
  22. else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
  23. oldCap >= DEFAULT_INITIAL_CAPACITY)
  24. newThr = oldThr << 1; // double threshold
  25. }
  26. else if (oldThr > 0) // initial capacity was placed in threshold
  27. newCap = oldThr;
  28. else { // zero initial threshold signifies using defaults
  29. newCap = DEFAULT_INITIAL_CAPACITY;
  30. newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
  31. }
  32. if (newThr == 0) {
  33. float ft = (float)newCap * loadFactor;
  34. newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
  35. (int)ft : Integer.MAX_VALUE);
  36. }
  37. threshold = newThr;
  38. @SuppressWarnings({"rawtypes","unchecked"})
  39. Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
  40. table = newTab;
  41. if (oldTab != null) {
  42. for (int j = 0; j < oldCap; ++j) {
  43. Node<K,V> e;
  44. if ((e = oldTab[j]) != null) {
  45. oldTab[j] = null;
  46. if (e.next == null)
  47. newTab[e.hash & (newCap - 1)] = e;
  48. else if (e instanceof TreeNode)
  49. ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
  50. else { // preserve order
  51. Node<K,V> loHead = null, loTail = null;
  52. Node<K,V> hiHead = null, hiTail = null;
  53. Node<K,V> next;
  54. do {
  55. next = e.next;
  56. if ((e.hash & oldCap) == 0) {
  57. if (loTail == null)
  58. loHead = e;
  59. else
  60. loTail.next = e;
  61. loTail = e;
  62. }
  63. else {
  64. if (hiTail == null)
  65. hiHead = e;
  66. else
  67. hiTail.next = e;
  68. hiTail = e;
  69. }
  70. } while ((e = next) != null);
  71. if (loTail != null) {
  72. loTail.next = null;
  73. newTab[j] = loHead;
  74. }
  75. if (hiTail != null) {
  76. hiTail.next = null;
  77. newTab[j + oldCap] = hiHead;
  78. }
  79. }
  80. }
  81. }
  82. }
  83. return newTab;
  84. }

ConcurrentHashMap添加一个键值对的工程是怎样的?

ConcurrentHashMap的工作流程图:
JAVA - 图2

  1. 判断null值

判断key null 或者 value == null,如果是,抛出空指针异常。

  1. 计算hash

根基key计算hash值(计算结果跟hashMap是一致的,写法不同)。

  1. 进入for循环,插入或更新元素

3.1 如果tab null || tab.length == 0,说明数组未初始化
说明当前数组tab还没有初始化。
那么调用initTable()方法初始化tab。(在initTable方法中,为了控制只有一个线程对table进行初始化,当前线程会通过CAS操作对SIZECTL变量赋值为-1,如果赋值成功,线程才能初始化table,否则会调用Thread.yield()方法让出时间片。
3.2 如果f == null,说明当前下标没有哈希冲突的键值对
说明当前数组下标还没有哈希冲突的键值对。
Node f是根据key的hash值计算得到数组下标,下标存储的元素,f可能是null,也有可能是链表头节点,红黑树根节点或迁移标志节点ForwardNode,那么根据key和value创建一个Node,使用CAS操作设置在当前数组下标下,并且break出for循环。
3.3 如果 f != null && f.hash = -1,说明存储的是标志节点,表示在扩容
说明ConcurrentHashMap正在扩容,当前的节点f是一个标志节点,当前下标存储的hash冲突的元素已经迁移了。那么当前线程会调用helpTransfer()方法来辅助扩容,扩容完成后会将tab指向新的table,然后继续执行for循环。
3.4 除了上面三种以外情况,说明是下标存储链表或者是红黑树
说明f节点是一个链表的头结点或者是红黑树的根节点,那么对f加synchronize同步锁,然后进行以下判断:

  1. - f.hash > 0

如果是f的hash值大于0,当前数组下标存储的是一个链表,f是链表的头结点。
对链表进行遍历,如果有节点跟当前需要插入的节点的hash值相同,那么节点的value进行更新,否则根据key,value创建一个Node,添加到链表末尾

  1. - f instanceof TreeBin

如果f是TreeBin类型,那么说明当前数组下标存储的是一个红黑树,f是红黑树的根节点,调用putTreeVal方法,插入或更新节点。

  • 插入完成后,判断binCount(数组下标存储是一个链表时,binCount是链表长度),当binCount超过8时,并且数组的长度大于64时,那么调用treeifyBin方法将链表转换为红黑树。最后break出for循环。

PS:
当原来的链表长度超过8时,确实会调用treeifyBin方法,但是在treeifyBin方法中会判断当前tab是否为空或者数组长度是否小于64,如果满足条件,那么调用resize方法对tab初始化或者扩容。就不会将链表转换为红黑树了。
treeifyBin方法源码:

  1. /**
  2. * Replaces all linked nodes in bin at given index unless table is
  3. * too small, in which case resizes instead.
  4. */
  5. private final void treeifyBin(Node<K,V>[] tab, int index) {
  6. Node<K,V> b; int n, sc;
  7. if (tab != null) {
  8. if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
  9. tryPresize(n << 1);
  10. else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
  11. synchronized (b) {
  12. if (tabAt(tab, index) == b) {
  13. TreeNode<K,V> hd = null, tl = null;
  14. for (Node<K,V> e = b; e != null; e = e.next) {
  15. TreeNode<K,V> p =
  16. new TreeNode<K,V>(e.hash, e.key, e.val,
  17. null, null);
  18. if ((p.prev = tl) == null)
  19. hd = p;
  20. else
  21. tl.next = p;
  22. tl = p;
  23. }
  24. setTabAt(tab, index, new TreeBin<K,V>(hd));
  25. }
  26. }
  27. }
  28. }
  29. }
  1. 判断是否需要扩容

调用addCount()方法对当前数组长度加一,在addCount()方法中,会判断当前元素个数是否超过sizeCtl(扩容阈值,总长度0.75),如果是,那么会进行扩容,如果正处于扩容中,当前线程会辅助扩容。
*ConcurrentHashMap源码:

  1. /** Implementation for put and putIfAbsent */
  2. final V putVal(K key, V value, boolean onlyIfAbsent) {
  3. //ConcurrentHashMap不允许key和value为null,否则抛出异常
  4. if (key == null || value == null) throw new NullPointerException();
  5. //计算hash值
  6. int hash = spread(key.hashCode());
  7. int binCount = 0;
  8. for (Node<K,V>[] tab = table;;) { //进入循环
  9. Node<K,V> f; int n, i, fh;
  10. if (tab == null || (n = tab.length) == 0) //数组如果为空
  11. tab = initTable(); //初始化数组
  12. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  13. //如果发现此数组下标下没有哈希冲突的元素,就直接使用CAS操作将Node设置到此下标下
  14. if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
  15. break; // no lock when adding to empty bin
  16. } else if ((fh = f.hash) == MOVED) //代表当前下标的头结点是标识结点,代表数组在扩容
  17. tab = helpTransfer(tab, f); //协助扩容
  18. else { //这种是普通情况,存在的是链表或者红黑树,进行插入
  19. V oldVal = null;
  20. synchronized (f) { //加同步锁,避免多线程进行插入
  21. if (tabAt(tab, i) == f) {
  22. if (fh >= 0) { //头结点hash值大于0,此数组下标下代表存的一个链表
  23. binCount = 1;
  24. for (Node<K,V> e = f;; ++binCount) { //遍历链表
  25. K ek;
  26. if (e.hash == hash &&
  27. ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
  28. oldVal = e.val;
  29. if (!onlyIfAbsent)
  30. e.val = value;
  31. break;
  32. }
  33. Node<K,V> pred = e;
  34. if ((e = e.next) == null) { //键值对是新添加的,在链表尾部插入新节点
  35. pred.next = new Node<K,V>(hash, key,value, null);
  36. break;
  37. }
  38. }
  39. } else if (f instanceof TreeBin) { //下标下存的是红黑树
  40. Node<K,V> p;
  41. binCount = 2;
  42. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
  43. oldVal = p.val;
  44. if (!onlyIfAbsent)
  45. p.val = value;
  46. }
  47. }
  48. }
  49. }
  50. if (binCount != 0) {
  51. if (binCount >= TREEIFY_THRESHOLD) //链表长度>=8,转换为红黑树
  52. treeifyBin(tab, i);
  53. if (oldVal != null)
  54. return oldVal;
  55. break;
  56. }
  57. }
  58. }
  59. addCount(1L, binCount);
  60. return null;
  61. }

HashMap与HashTable,ConcurrentHashMap的区别是什么

主要从底层数据结构,线程安全,执行效率,是否允许Null值,初始容量及扩容,hash值计算来进行分析。

  1. 底层数据结构
    1. transient Node<K,V>[] table; //HashMap
    2. transient volatile Node<K,V>[] table; //ConcurrentHashMap
    3. private transient Entry<?,?>[] table; //HashTable
    HashMap = 数组+链表+红黑树
    HashMap的底层数据结构是一个数组+链表+红黑树,数组的每个元素存储是一个链表的头结点,链表中存储了一组哈希冲突的键值对,通过链地址法来解决哈希冲突的。为了避免链表长度过长,影响查找元素的效率,当链表的长度大于8时,会将链表转换为红黑树,链表的长度小于6时,将红黑树转换为链表(但是红黑树转换为链表的时机不是在删除链表时,而是在扩容时,发现红黑树分解后的两个链表小于6,就按链表处理,否则就建立两个小的红黑树,设置到扩容后的位置)。之所以临界点为8是因为红黑树的查找时间复杂度为logN,链表的平均时间查找复杂度为N/2,当N为8时,logN为3是小于N/2的,正好可以通过转换红黑树减少查找的时间复杂度。

ConcurrentHashMap = 数组+链表+红黑树
ConcurrentHashMap底层数据结构跟HashMap一致,底层数据结构是一个数组+链表+红黑树。只不过使用了volatile来进行修饰他的属性,来保证内存可见性(一个线程修改了这些属性后,会使其他线程对于该属性的缓存失效,以便下次读取时取最新的值)。
HashTable = 数组+链表
HashTable底层数据结构跟HashMap一致,底层数据结构是数组+链表,也是通过链地址法来解决冲突,只不过链表过长时,不会转为红黑树来减少查找的时间复杂度。HashTable属于历史遗留类,实际开发中很少使用。

  1. 线程安全

HashMap 非线程安全
HashMap是非线程安全的。(例如多个线程插入多个键值对,如果两个键值对的key哈希冲突,可能使得两个线程在操作同一个链表中的节点,导致一个键值对的value被覆盖)
HashTable 线程安全
HashTable是线程安全的,主要通过使用synchronize关键字修饰大部分方法,使得每次只能一个线程对HashTable进行同步修改,性能开销较大。
ConcurrentHashMap 线程安全
ConcurrentHashMap 是线程安全的,主要通过CAS操作+synchronize来保证线程安全的。
CAS操作
往ConcurrentHashMap中插入新的键值对是,如果对应的数组下标元素为null,那么通过CAS操作原子性的将节点设置到数组中。

  1. //这是添加新的键值对的方法
  2. final V putVal(K key, V value, boolean onlyIfAbsent) {
  3. ...其他代码
  4. if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  5. if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
  6. break; // 因为对应的数组下标元素为null,所以null作为预期值,new Node<K,V>(hash, key, value, null)作为即将更新的值,只有当内存中的值与即将预期值一致时,才会进行更新,保证原子性。
  7. }
  8. ...其他代码
  9. }
  10. static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
  11. Node<K,V> c, Node<K,V> v) {
  12. return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
  13. }

多线程

Java中创建线程有哪些方式

继承Thread类,重写run方法

这种方式通过自定义类继承Thread类,重写run()方法,然后在创建自定义类的对象,然后调用start()方法,JVM会创建一个新线程。并且为线程创建方法调用栈和程序计数器,此时线程处于就绪状态,当线程获取CPU时间片后,线程会进入到运行状态,回去调用run()方法。并且创建自定义类对象的线程与调用run()方法的线程之间是并发的。

  1. class CustomThread extends Thread {
  2. public static void main(String[] args) {
  3. System.out.println(Thread.currentThread().getName()+"线程调用了main方法");
  4. for (int i = 0; i < 10; i++) {
  5. if (i == 1) {
  6. CustomThread customThread = new CustomThread();
  7. customThread.start();
  8. System.out.println(Thread.currentThread().getName()+"线程--i是"+i);
  9. }
  10. }
  11. System.out.println("main()方法执行完毕!");
  12. }
  13. void run() {
  14. System.out.println(Thread.currentThread().getName()+"线程调用了run()方法");
  15. for (int j = 0; j < 5; j++) {
  16. System.out.println(Thread.currentThread().getName()+"线程--j是"+j);
  17. }
  18. System.out.println("run()方法执行完毕!");
  19. }
  20. }

执行原理
首先看start方法的源码,在源码中首先会判断threadStatus是否为0,如果为0会抛出异常。然后会将当前对象添加到线程组,最后调用start()方法。

  1. /**
  2. * Causes this thread to begin execution; the Java Virtual Machine
  3. * calls the <code>run</code> method of this thread.
  4. * <p>
  5. * The result is that two threads are running concurrently: the
  6. * current thread (which returns from the call to the
  7. * <code>start</code> method) and the other thread (which executes its
  8. * <code>run</code> method).
  9. * <p>
  10. * It is never legal to start a thread more than once.
  11. * In particular, a thread may not be restarted once it has completed
  12. * execution.
  13. *
  14. * @exception IllegalThreadStateException if the thread was already
  15. * started.
  16. * @see #run()
  17. * @see #stop()
  18. */
  19. public synchronized void start() {
  20. /**
  21. * This method is not invoked for the main method thread or "system"
  22. * group threads created/set up by the VM. Any new functionality added
  23. * to this method in the future may have to also be added to the VM.
  24. *
  25. * A zero status value corresponds to state "NEW".
  26. */
  27. if (threadStatus != 0)
  28. throw new IllegalThreadStateException();
  29. /* Notify the group that this thread is about to be started
  30. * so that it can be added to the group's list of threads
  31. * and the group's unstarted count can be decremented. */
  32. //将对象添加到线程组
  33. group.add(this);
  34. boolean started = false;
  35. try {
  36. start0(); //这是一个native方法,调用后JVM会新建一个线程来调用run方法
  37. started = true;
  38. } finally {
  39. try {
  40. if (!started) {
  41. group.threadStartFailed(this);
  42. }
  43. } catch (Throwable ignore) {
  44. /* do nothing. If start0 threw a Throwable then
  45. it will be passed up the call stack */
  46. }
  47. }
  48. }
  49. private native void start0();

拓展问题:多次调用Thread对象的start()方法会怎么样?
会抛出IllegalThreadStateException异常。其实在Thread#start()方法里面的注释中有提到,多次调用start()方法是非法的,所以在上面的start()方法源码中一开始就对threadStatus进行判断,不为0就会抛出IllegalThreadStateException异常。
注意:start()方法中判断threadStatus是否为0,是判断当前线程是否新建态,0是代表新建态(上图中源码注释里有提到),而不是就绪态,因为Java的Thread类中,Thread的Runnable状态包括了线程的就绪态和运行态,(Thread的state的RUNNABLE时(也就是threadStatus为4时),代表线程为就绪态或运行态)。执行start()方法的线程还不是JVM新建线程,所以不是就绪态

实现Runnable接口

这种方式就是创建一个类(target类),实现Runnable接口的Run()方法,然后将target类的实例对象作为Thread的构造器入参target,实际上的线程对象还是Thread实例,只不过线程Thread与线程执行体(target类的run方法)分离了,耦合度更低一些。

  1. class ThreadTarget implements Runnable {
  2. void run() {
  3. System.out.println(Thread.currentThread().getName()+"线程执行了run方法");
  4. }
  5. public static void main(String[] args) {
  6. System.out.println(Thread.currentThread().getName()+"线程执行了main方法");
  7. ThreadTarget target = new ThreadTarget();
  8. Thread thread = new Thread(target);
  9. thread.start();
  10. }
  11. }

原理:因为Thread类的run()方法中会判断成员变量target是否为空,不为空就会调用target类的run方法。

  1. /* What will be run. */
  2. private Runnable target;
  3. /**
  4. * If this thread was constructed using a separate
  5. * <code>Runnable</code> run object, then that
  6. * <code>Runnable</code> object's <code>run</code> method is called;
  7. * otherwise, this method does nothing and returns.
  8. * <p>
  9. * Subclasses of <code>Thread</code> should override this method.
  10. *
  11. * @see #start()
  12. * @see #stop()
  13. * @see #Thread(ThreadGroup, Runnable, String)
  14. */
  15. @Override
  16. public void run() {
  17. if (target != null) {
  18. target.run();
  19. }
  20. }

另外一种写法
匿名内部类
可以不用创建target类,可以使用匿名内部类的方式实现,因此上面的代码也可以按照一下方式写:

  1. Thread thread = new Thread(new Runnable() {
  2. @Override
  3. public void run() {
  4. System.out.println(Thread.currentThread().getName()+"线程执行了run方法");
  5. }
  6. });
  7. thread.start();

lamda表达式
在Java8后使用了@FunctionalInterface注解来修饰Runnable接口,表明Runnable接口是一个函数式接口,有且只有一个抽象方法,也可以Lamda方式来创建Runnable对象,比匿名类的方式更简洁。

  1. @FunctionalInterface
  2. public interface Runnable {
  3. /**
  4. * When an object implementing interface <code>Runnable</code> is used
  5. * to create a thread, starting the thread causes the object's
  6. * <code>run</code> method to be called in that separately executing
  7. * thread.
  8. * <p>
  9. * The general contract of the method <code>run</code> is that it may
  10. * take any action whatsoever.
  11. *
  12. * @see java.lang.Thread#run()
  13. */
  14. public abstract void run();
  15. }

因此上面的的代码也可以按以下方式写:

  1. Thread thread = new Thread(()->{
  2. System.out.println(Thread.currentThread().getName()+"线程执行了run方法");
  3. })
  4. thread.start()

总结
这种写法不用继承Thread类,但同样也有缺点,就是线程方法体(Run()方法)不能设置返回值。

实现Callable接口

Runnable接口中的run方法是没有返回值,如果需要执行的任务带返回值就不能使用Runnable接口。创建一个类CallableTarget,实现Callable接口,实现带有返回值的call()方法,然后根据CallableTarget创建一个任务FutureTask,然后根据FutureTask来创建一个线程Thread,调用Thread的start方法执行任务。

  1. public class CallableTarget implements Callable<Integer> {
  2. public Integer call() throws InterruptedException {
  3. System.out.println(Thread.currentThread().getName()+"线程执行了call方法");
  4. Thread.sleep(5000);
  5. return 1;
  6. }
  7. public static void main(String[] args) throws ExecutionException, InterruptedException {
  8. System.out.println(Thread.currentThread().getName()+"线程执行了main方法");
  9. CallableTarget callableTarget = new CallableTarget();
  10. FutureTask<Integer> task = new FutureTask<Integer>(callableTarget);
  11. Thread thread = new Thread(task);
  12. thread.start();
  13. Integer result = task.get();//当前线程会阻塞,一直等到结果返回。
  14. System.out.println("执行完毕,打印result="+result);
  15. System.out.println("执行完毕");
  16. }
  17. }

原理就是Thread类默认的run()方法实现会去调自身的实例变量target的run()方法,(target就是构造器Thread传入的FutureTask),而FutureTask的run方法中就会调用Callable接口的实例的call()方法。

  1. //Thread类的run方法实现
  2. /**
  3. * If this thread was constructed using a separate
  4. * <code>Runnable</code> run object, then that
  5. * <code>Runnable</code> object's <code>run</code> method is called;
  6. * otherwise, this method does nothing and returns.
  7. * <p>
  8. * Subclasses of <code>Thread</code> should override this method.
  9. *
  10. * @see #start()
  11. * @see #stop()
  12. * @see #Thread(ThreadGroup, Runnable, String)
  13. */
  14. @Override
  15. public void run() {
  16. //这里target就是在创建Thread时传入的FutureTask实例变量
  17. if (target != null) {
  18. target.run();
  19. }
  20. }
  21. //FutureTask类的run方法实现
  22. public void run() {
  23. if (state != NEW ||
  24. !UNSAFE.compareAndSwapObject(this, runnerOffset,
  25. null, Thread.currentThread()))
  26. return;
  27. try {
  28. Callable<V> c = callable;
  29. if (c != null && state == NEW) {
  30. V result;
  31. boolean ran;
  32. try {
  33. //这里调用Callable实例的call方法
  34. result = c.call();
  35. ran = true;
  36. } catch (Throwable ex) {
  37. result = null;
  38. ran = false;
  39. setException(ex);
  40. }
  41. if (ran)
  42. set(result);
  43. }
  44. } finally {
  45. // runner must be non-null until state is settled to
  46. // prevent concurrent calls to run()
  47. runner = null;
  48. // state must be re-read after nulling runner to prevent
  49. // leaked interrupts
  50. int s = state;
  51. if (s >= INTERRUPTING)
  52. handlePossibleCancellationInterrupt(s);
  53. }
  54. }

Java中Runnable、Callable、Future、FutureTask的区别和联系?

最原始的通过新建线程执行任务的方法就是我们创建一个新类,继承Thread,然后重写run方法,因为限制太大了,Java不支持多继承,所以有了Runnable接口。

Runnable

Runnable接口,只需要新建一个类实现这个接口,然后重写run方法,将该类的实例作为参数创建Thread。线程运行时调用该类的run方法。
Thread.start()方法->Thread.run()方法->target.run()方法

Callable

Callable是跟Runnable类似,也是一个接口。只不过它的call方法有返回值,可以供程序接受任务执行的结果。

  1. @FunctionalInterface
  2. public interface Callable<V> {
  3. /**
  4. * Computes a result, or throws an exception if unable to do so.
  5. *
  6. * @return computed result
  7. * @throws Exception if unable to compute a result
  8. */
  9. V call() throws Exception;
  10. }

Future

Future也是一个接口,Future就像是一个管理容器一样,进一步对Runnable和Callable的实例进行封装,提供了取消任务的cancel()方法,查询任务是否完成的isDone()方法,获取执行结果的get方法,带有超时时间来获取执行结果的get()方法。

  1. public interface Future<V> {
  2. /**
  3. * Attempts to cancel execution of this task. This attempt will
  4. * fail if the task has already completed, has already been cancelled,
  5. * or could not be cancelled for some other reason. If successful,
  6. * and this task has not started when {@code cancel} is called,
  7. * this task should never run. If the task has already started,
  8. * then the {@code mayInterruptIfRunning} parameter determines
  9. * whether the thread executing this task should be interrupted in
  10. * an attempt to stop the task.
  11. *
  12. * <p>After this method returns, subsequent calls to {@link #isDone} will
  13. * always return {@code true}. Subsequent calls to {@link #isCancelled}
  14. * will always return {@code true} if this method returned {@code true}.
  15. *
  16. * @param mayInterruptIfRunning {@code true} if the thread executing this
  17. * task should be interrupted; otherwise, in-progress tasks are allowed
  18. * to complete
  19. * @return {@code false} if the task could not be cancelled,
  20. * typically because it has already completed normally;
  21. * {@code true} otherwise
  22. */
  23. boolean cancel(boolean mayInterruptIfRunning);
  24. /**
  25. * Returns {@code true} if this task was cancelled before it completed
  26. * normally.
  27. *
  28. * @return {@code true} if this task was cancelled before it completed
  29. */
  30. boolean isCancelled();
  31. /**
  32. * Returns {@code true} if this task completed.
  33. *
  34. * Completion may be due to normal termination, an exception, or
  35. * cancellation -- in all of these cases, this method will return
  36. * {@code true}.
  37. *
  38. * @return {@code true} if this task completed
  39. */
  40. boolean isDone();
  41. /**
  42. * Waits if necessary for the computation to complete, and then
  43. * retrieves its result.
  44. *
  45. * @return the computed result
  46. * @throws CancellationException if the computation was cancelled
  47. * @throws ExecutionException if the computation threw an
  48. * exception
  49. * @throws InterruptedException if the current thread was interrupted
  50. * while waiting
  51. */
  52. V get() throws InterruptedException, ExecutionException;
  53. /**
  54. * Waits if necessary for at most the given time for the computation
  55. * to complete, and then retrieves its result, if available.
  56. *
  57. * @param timeout the maximum time to wait
  58. * @param unit the time unit of the timeout argument
  59. * @return the computed result
  60. * @throws CancellationException if the computation was cancelled
  61. * @throws ExecutionException if the computation threw an
  62. * exception
  63. * @throws InterruptedException if the current thread was interrupted
  64. * while waiting
  65. * @throws TimeoutException if the wait timed out
  66. */
  67. V get(long timeout, TimeUnit unit)
  68. throws InterruptedException, ExecutionException, TimeoutException;
  69. }

FutureTask

因为Future只是一个接口,并不能实例化,可以认为FutureTask就是Future接口的实现类,FutureTask实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable接口和Future接口。

  1. public class FutureTask<V> implements RunnableFuture<V> {
  2. ...
  3. }
  4. public interface RunnableFuture<V> extends Runnable, Future<V> {
  5. void run();
  6. }

volatile关键字有什么用?怎么理解可见性?一般什么场景去用可见性?

当线程进行一个volatile变量写操作时,JIT编译器生成的汇编指令会在写操作的指令后面加上一个lock指令,代码如下:

  1. instance = new Singleton(); // instance是volatile变量
  2. 转变成汇编代码,如下。
  3. 0x01a3de1d: movb $0×0,0×1104800(%esi);0x01a3de24: lock addl $0×0,(%esp);

“lock”有三个作用:

  1. 将当前CPU缓存行的数据会写回到系统内存。
  2. 这个写回内存的操作会使得其他CPU里缓存该内存地址的数据无效。
  3. 确保指令重排序时,内存屏障前的指令不会排到后面去,内存屏障后面的指令不会排到前面去。

可见性可以理解为一个线程的写操作可以立即被其他线程得知。为了提高CPU处理速度,CPU一般不直接与内存进行通信,而是将系统内存的数据读取到内存缓存,再进行操作。对于普通的变量,修改完不知道何时会更新到系统内存。但是如果是对volatile修饰的变量进行写操作,JVM会向处理器发送一条Lock前缀的指令,将这个变量所在的缓存行的数据立即写回到系统内存。但是即便写回到系统内存,其他CPU的缓存行数据还是旧数据,为了保证数据一致性,其他CPU会嗅探在总线上传播的数据来检查自己缓存的行的值是否过期,当CPU发现缓存行对应的内存地址被修改,那么就会将当前缓存行设置为无效,下次当CPU对这个缓存进行修改时,会重新从系统内存中把数据读到处理器缓存里。

使用场景
  1. 读写锁
    如果需要实现一个读写锁,每次只能一个线程去写数据,但是有多个线程来读取数据,就synchronize同步锁来对set方法加锁,get方法不加锁,使用volatile修饰变量,保存内存可见性,不然多个线程可能会在变量修改后还读到一个旧值。

    1. volatile Integer a;
    2. //可以实现一写多读的场景,保证并发修改数据时的正确。
    3. set(Integer c) {
    4. synchronized(this.a) {
    5. this.a = c;
    6. }
    7. }
    8. get() {
    9. return a;
    10. }
  2. 状态位

用于做状态位标志,如果多个线程去需要根据一个状态位来执行一些操作,使用volatile修饰可以保存内存可见性。用于单例模式,保证1内存可见性,以防止指令重排序。

Java中线程的状态是怎么样的?

在传统操作系统中,线程等于轻量级的进程。
JAVA - 图3
所以传统的操作系统线程一般有以下状态:

  1. 新建状态:使用new关键字和Thread类或其子类建立一个线程对象后,该线程对象就处于新建状态。它保持这个状态直到程序start()这个线程;
  2. 就绪状态:当线程对象调用了start()方法之后,该线程就进入就绪状态。该线程就处于就绪队列中,要等待JVM里线程调度器的调度。
  3. 运行状态:如果就绪状态的线程获取CPU资源,就可以执行run()方法,此时线程便处于运行状态。处于运行状态的线程最为复杂,它可以变为阻塞状态、就绪状态和死亡状态。
  4. 阻塞状态:如果一个线程执行了sleep(睡眠)、suspend(挂起)等方法,失去所占用资源之后,该线程就从运行状态进入阻塞状态。在睡眠时间已到或获得设备资源后可以重新进入就绪状态。可以分为三种:
    • 等待阻塞:运行状态中的线程执行wait()方法,使线程进入到等待阻塞状态。
    • 同步阻塞:线程在获取synchronized同步锁失败(因为同步锁被其他线程占用)。
    • 其他阻塞:通过调用线程的sleep()或join()发出I/O请求时,线程就会进入到阻塞状态。当sleep()状态超时,join()等待线程终止或超时,或者I/O处理完毕,线程重新转入就绪状态。
  5. 死亡状态:一个运行状态的线程完成任务或者在其他终止条件发生时,该线程进入到终止状态。

但是Java中Thread对象的状态划分跟传统的操作系统线程状态有一些区别。

  1. /**
  2. * A thread state. A thread can be in one of the following states:
  3. * <ul>
  4. * <li>{@link #NEW}<br>
  5. * A thread that has not yet started is in this state.
  6. * </li>
  7. * <li>{@link #RUNNABLE}<br>
  8. * A thread executing in the Java virtual machine is in this state.
  9. * </li>
  10. * <li>{@link #BLOCKED}<br>
  11. * A thread that is blocked waiting for a monitor lock
  12. * is in this state.
  13. * </li>
  14. * <li>{@link #WAITING}<br>
  15. * A thread that is waiting indefinitely for another thread to
  16. * perform a particular action is in this state.
  17. * </li>
  18. * <li>{@link #TIMED_WAITING}<br>
  19. * A thread that is waiting for another thread to perform an action
  20. * for up to a specified waiting time is in this state.
  21. * </li>
  22. * <li>{@link #TERMINATED}<br>
  23. * A thread that has exited is in this state.
  24. * </li>
  25. * </ul>
  26. *
  27. * <p>
  28. * A thread can be in only one state at a given point in time.
  29. * These states are virtual machine states which do not reflect
  30. * any operating system thread states.
  31. *
  32. * @since 1.5
  33. * @see #getState
  34. */
  35. public enum State {
  36. /**
  37. * Thread state for a thread which has not yet started.
  38. */
  39. NEW, //新建状态
  40. /**
  41. * Thread state for a runnable thread. A thread in the runnable
  42. * state is executing in the Java virtual machine but it may
  43. * be waiting for other resources from the operating system
  44. * such as processor.
  45. */
  46. RUNNABLE, //运行态
  47. /**
  48. * Thread state for a thread blocked waiting for a monitor lock.
  49. * A thread in the blocked state is waiting for a monitor lock
  50. * to enter a synchronized block/method or
  51. * reenter a synchronized block/method after calling
  52. * {@link Object#wait() Object.wait}.
  53. */
  54. BLOCKED, //阻塞状态
  55. /**
  56. * Thread state for a waiting thread.
  57. * A thread is in the waiting state due to calling one of the
  58. * following methods:
  59. * <ul>
  60. * <li>{@link Object#wait() Object.wait} with no timeout</li>
  61. * <li>{@link #join() Thread.join} with no timeout</li>
  62. * <li>{@link LockSupport#park() LockSupport.park}</li>
  63. * </ul>
  64. *
  65. * <p>A thread in the waiting state is waiting for another thread to
  66. * perform a particular action.
  67. *
  68. * For example, a thread that has called <tt>Object.wait()</tt>
  69. * on an object is waiting for another thread to call
  70. * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
  71. * that object. A thread that has called <tt>Thread.join()</tt>
  72. * is waiting for a specified thread to terminate.
  73. */
  74. WAITING, //等待态
  75. /**
  76. * Thread state for a waiting thread with a specified waiting time.
  77. * A thread is in the timed waiting state due to calling one of
  78. * the following methods with a specified positive waiting time:
  79. * <ul>
  80. * <li>{@link #sleep Thread.sleep}</li>
  81. * <li>{@link Object#wait(long) Object.wait} with timeout</li>
  82. * <li>{@link #join(long) Thread.join} with timeout</li>
  83. * <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
  84. * <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
  85. * </ul>
  86. */
  87. TIMED_WAITING, //有时间限制的等待态
  88. /**
  89. * Thread state for a terminated thread.
  90. * The thread has completed execution.
  91. */
  92. TERMINATED; //死亡态
  93. }

JAVA - 图4
NEW 新建态:处于NEW状态的线程此时尚未启动,还没有调用Thread实例的start()方法。
RUNNABLE 运行态:表示当前线程正在运行中。处于RUNNABLE状态的线程可能在Java虚拟机中运行,也有可能在等待其他系统资源(比如I/O)。Java线程的RUNNABLE状态其实是包括了传统操作系统线程的ready和running两个状态。
BLOCKED 阻塞态:阻塞状态,处于BLOCKED状态的线程等待锁的释放以进入同步区。
WAITING 等待态:等待状态,处于等待状态的线程变成RUNNABLE状态需要其他线程唤醒。调用如下三个方法会使线程进入等待状态:

  • Object.wait():使当前线程处于等待状态直到另一个线程调用notify唤醒它;
  • Thread.join():等待线程执行完毕,底层调用的是Object实例的wait()方法;
  • LockSupport.park():除非获得调用许可,否则禁用当前线程进行线程调用。

TIMED_WAITING 超时等待状态:超时等待状态,线程等待一个具体时间,时间到后悔被自动唤醒。调用如下方法会是线程进入超时等待状态:

  • Thread.sleep(long millis):使当前线程睡眠指定时间;
  • Object.wait(long timeout):线程休眠指定时间,等待期间可以通过notify()/notifyAll()唤醒;
  • Thread.join(long millis):等待当前线程最多执行millis毫秒,如果millis为0,则会一直执行;
  • LockSupport.parkNanos(long nanos): 除非获得调用许可,否则禁用当前线程进行线程调度指定时间;
  • LockSupport.parkUntil(long deadline):同上,也是禁止线程进行调度指定时间;

TERMINATED 终止态:终止状态,此时线程已执行完毕。
状态转换
1. BLOCKED与RUNNABLE状态转换
处于BLOCKED状态的线程是因为在等待锁的释放,当获得锁之后就转换为RUNNABLE状态。

  1. WAITING状态与RUNNABLE状态转换

Object.wait()Thread.join()LockSupport.park()这3个方法可以使线程从RUNNABLE状态转为WAITING状态。

  1. TIMED_WAITING与RUNNABLE状态转换

TIMED_WAITING与WAITING状态类似,只是TIMED_WAITING状态等待的时间是指定的。调用Thread.sleep(long)Object.wait(long)Thread.join(long)会使得RUNNABLE状态转换为TIMED_WAITING状态

wait()、join()、sleep()方法有什么作用

Object.wait()方法是什么?

调用wait()方法前线程必须持有对象Object锁。线程调用wait()方法后,会释放当前的Object锁,进入锁的monitor对象的等待队列,知道有其他线程调用notify()/notifyAll()方法唤醒等待锁的线程。
需要注意的是,其他线程调用notify()方法只会唤醒单个等待锁的线程,也不一定马上把时间片分给刚才放弃锁的那个线程,具体看系统调度。

Thread.join()方法是什么?

join()方法是Thread类的一个实例方法。它的作用是让当前线程陷入“等待”状态。等join的这个线程threadA执行完成后,再继续执行当前线程。实现原理是join()方法本身是一个sychronized修饰的方法,也就是调用join()这个方法需要先获取threadA的锁,获得锁之后再调用wait()方法来进行等待,一直到threadA执行完成后,threadA会调用notify_all()方法,唤醒所有等待的线程,当前线程才会结束等待。
join()源码如下:

  1. public final void join() throws InterruptedException {
  2. join(0);//0的话代表没有超时时间一直等下去
  3. }
  4. public final synchronized void join(long millis)
  5. throws InterruptedException {
  6. long base = System.currentTimeMillis();
  7. long now = 0;
  8. if (millis < 0) {
  9. throw new IllegalArgumentException("timeout value is negative");
  10. }
  11. if (millis == 0) {
  12. while (isAlive()) {
  13. wait(0);
  14. }
  15. } else {
  16. while (isAlive()) {
  17. long delay = millis - now;
  18. if (delay <= 0) {
  19. break;
  20. }
  21. wait(delay);
  22. now = System.currentTimeMillis() - base;
  23. }
  24. }
  25. }

sleep()方法是什么

sleep方法是thread类的一个静态方法,它的作用是让当前线程休眠一段时间。sleep方法不会释放当前线程持有的锁,而wait方法会。
sleep与wait区别:

  • wait可以指定时间,也可以不指定;而sleep必须指定时间。
  • wait释放CPU资源,同时释放锁;sleep释放CPU资源,不会释放锁,所以易死锁。(调用join方法也不会释放锁)
  • wait必须放在同步块或同步方法中,而sleep可以在任意位置。

    Thread.sleep(),Object.wait(),LockSupport.part()有什么区别?

  1. 这三个方法都会让线程挂起,释放CPU时间片,进入到阻塞态。但是Object.wait()需要释放锁,所以必须早synchronized同步锁中使用,同理配套的Object.notify()也是。而Thread.sleep(),LockSupport.part()方法不需要在synchronized中使用,并且调用时也不会释放锁。
  2. 由于Thread.sleep()没有对应的唤醒线程的方法,所以必须指定超时时间,超过时间后,线程恢复。所以调用Thread.sleep()后的线程一般是出于TIME_WAITING状态,而调用了Object.wait(),LockSupport.park()的方法是进入到WAITING状态。
  3. Object.wait()对应的唤醒方法为Object.notify(),LockSupport.park()对应的唤醒方法为LockSupport.unpark()。
  4. 在代码中必须能保证wait方法比notify方法先执行,如果notify方法比wait方法早执行的话,就会导致因wait方法进入休眠的线程接收不到唤醒通知的问题。而park、unpark则不会有这个问题,我们可以先调用unpark方法释放一个许可证,这样后面线程调用park方法时,发现已经许可证了,就可以直接获取许可证而不用进入休眠状态了。(LockSupport.park() 的实现原理是通过二元信号量做的阻塞,要注意的是,这个信号量最多只能加到1,也就是无论执行多少次unpark()方法,也最多只会有一个许可证。
  5. 三种方法让线程进入阻塞态后,都可以响应中断,也就是调用Thread.interrupt()方法会设置中断标志位,之前执行Thread.sleep(),Object.wait()了的线程会抛出InterruptedException异常,然后需要代码进行处理。而调用了park()方法的线程在响应中断只会相当于一次正常的唤醒操作(等价于调用unpark()方法),让线程唤醒,继续执行后面的代码,不会抛出InterruptedException异常。

JAVA - 图5

怎么实现一个生产者消费者?

  1. 使用Object.wait()和Object.notify()实现

使用一个queue作为一个队列,存放数据,并且使用Synchronized同步锁,每次只能同时存在一个线程来生产或者消费数据,生成线程发现队列容量大于10,生产者线程进入waiting状态,,一旦成功往队列添加数据,那么就唤醒所有线程(主要是消费者线程起来消费)。消费线程消费时,当队列容量等于0,也会主动1进入waiting状态。
伪代码如下:

  1. LinkedList<Integer> queue = new LinkedList<>();
  2. void produce(Integer value) {
  3. synchronized(queue) {//加锁控制,保证同一时间点,只能有一个线程生成或者消费
  4. while(queue.size()>10) {
  5. queue.waiting();
  6. }
  7. queue.add(value);
  8. //唤醒消费者线程
  9. queue.notifyAll();
  10. }
  11. }
  12. Integer consumer() {
  13. synchronized(queue) {//加锁控制,保证同一时间点,只能有一个线程生成或者消费
  14. while(queue.size()==0) {
  15. queue.waiting();
  16. }
  17. Integer value = queue.poll();
  18. //唤醒生产者线程
  19. queue.notifyAll();
  20. return value;
  21. }
  22. }

完整代码如下:

  1. public static void main(String[] args) {
  2. Queue<Integer> queue = new LinkedList<>();
  3. final Customer customer = new Customer(queue);
  4. final Producer producer = new Producer(queue);
  5. ExecutorService pool = Executors.newCachedThreadPool();
  6. for (int i = 0; i < 1000; i++) {
  7. pool.execute(new Runnable() {
  8. @Override
  9. public void run() {
  10. try {
  11. Thread.sleep(1000);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. Integer a = customer.removeObject();
  16. System.out.println("消费了数据 "+a);
  17. }
  18. });
  19. pool.execute(new Runnable() {
  20. @Override
  21. public void run() {
  22. try {
  23. Thread.sleep(1000);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. Random random = new Random();
  28. Integer a = random.nextInt(1000);
  29. System.out.println("生成了数据 "+a);
  30. producer.addObject(a);
  31. }
  32. });
  33. }
  34. }
  35. private static class Customer {
  36. Queue<Integer> queue;
  37. Customer(Queue<Integer> queue) { this.queue = queue; }
  38. public Integer removeObject() {
  39. synchronized (queue) {
  40. try {
  41. while (queue.size()==0) {
  42. System.out.println("队列中没有元素了,进行等待");
  43. queue.wait();
  44. }
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. Integer number = queue.poll();
  49. System.out.println("唤醒所有生产线程,当前queue大小是" + queue.size());
  50. queue.notifyAll();
  51. return number;
  52. }
  53. }
  54. }
  55. private static class Producer {
  56. Queue<Integer> queue;
  57. Producer(Queue<Integer> queue) { this.queue = queue; }
  58. public void addObject(Integer number) {
  59. synchronized (queue) {
  60. try {
  61. while (queue.size()>10) {
  62. queue.wait();
  63. }
  64. } catch (InterruptedException e) {
  65. e.printStackTrace();
  66. }
  67. queue.add(number);
  68. queue.notifyAll();
  69. System.out.println("唤醒所有消费线程,当前queue大小是"+queue.size());
  70. }
  71. }
  72. }
  1. 使用Lock和Condition来实现

调用Object.wait()方法可以让线程进入等待状态,被添加到Object的Monitor监视器的等待队列,Object.notifyAll()可以唤醒monitor监视器等待队列中所有的线程。而调用lock的newCondition()方法,可以返回一个ConditionObject实例对象,每个ConditionObject包含一个链表,存储等待队列。可以认为一个ReentrantLock有一个同步队列(存放没有获得锁的线程)和多个等待队列(存放调用wait()方法的线程)。使用Condition.singal()和Condition.singalAll()可以更加精准的唤醒线程,唤醒的都是这个Condition对应的等待队列里面的线程,而Object.notify()和Object.notifyAll()只能唤醒等待队列中的所有的线程。

  1. ReentrantLock lock = new ReentrantLock();
  2. Condition customerQueue = lock.newCondition();

ReentrantLock的Condition相关的实现
JAVA - 图6

  1. abstract static class Sync extends AbstractQueuedSynchronizer {
  2. final ConditionObject newCondition() {
  3. return new ConditionObject();
  4. }
  5. }
  6. //AQS内部类 ConditionObject
  7. public class ConditionObject implements Condition, java.io.Serializable {
  8. private static final long serialVersionUID = 1173984872572414699L;
  9. //链表头结点
  10. private transient Node firstWaiter;
  11. //链表尾结点
  12. private transient Node lastWaiter;
  13. //真正的创建Condition对象
  14. public ConditionObject() { }
  15. }

消费者-生产者实现

  1. public static void main(String[] args) {
  2. ReentrantLock lock = new ReentrantLock();
  3. Condition customerQueue = lock.newCondition();
  4. Condition producerQueue = lock.newCondition();
  5. Queue<Integer> queue = new LinkedList<>();
  6. final Customer customer = new Customer(lock,customerQueue, producerQueue,queue);
  7. final Producer producer = new Producer(lock,customerQueue, producerQueue,queue);
  8. ExecutorService pool = Executors.newCachedThreadPool();
  9. for (int i = 0; i < 1000; i++) {
  10. pool.execute(new Runnable() {
  11. @Override
  12. public void run() {
  13. try {
  14. Thread.sleep(1000);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. Integer a = customer.take();
  19. // System.out.println("消费了数据 "+a);
  20. }
  21. });
  22. pool.execute(new Runnable() {
  23. @Override
  24. public void run() {
  25. try {
  26. Thread.sleep(1000);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. Random random = new Random();
  31. Integer a = random.nextInt(1000);
  32. // System.out.println("生成了数据 "+a);
  33. producer.add(a);
  34. }
  35. });
  36. }
  37. }
  38. private static class Customer {
  39. private ReentrantLock lock;
  40. private Condition customer;
  41. private Condition producer;
  42. private Queue<Integer> queue;
  43. Customer(ReentrantLock lock, Condition customer, Condition producer,Queue<Integer> queue) {
  44. this.lock = lock;
  45. this.customer = customer;
  46. this.producer = producer;
  47. this.queue = queue;
  48. }
  49. public Integer take() {
  50. lock.lock();
  51. Integer element = null;
  52. try {
  53. while (queue.size() == 0) {
  54. customer.await();
  55. }
  56. element = queue.poll();
  57. System.out.println("消费者线程取出来元素"+element);
  58. producer.signalAll();
  59. } catch (InterruptedException e) {
  60. e.printStackTrace();
  61. } finally {
  62. lock.unlock();
  63. }
  64. return element;
  65. }
  66. }
  67. private static class Producer {
  68. private ReentrantLock lock;
  69. private Condition customer;
  70. private Condition producer;
  71. private Queue<Integer> queue;
  72. Producer(ReentrantLock lock, Condition customer, Condition producer,Queue<Integer> queue) {
  73. this.lock = lock;
  74. this.customer = customer;
  75. this.producer = producer;
  76. this.queue = queue;
  77. }
  78. public void add( Integer element) {
  79. lock.lock();
  80. try {
  81. while (queue.size() > 10) {
  82. producer.await();
  83. }
  84. queue.add(element);
  85. System.out.println("生成和线程添加元素"+element);
  86. customer.signalAll();
  87. } catch (InterruptedException e) {
  88. e.printStackTrace();
  89. } finally {
  90. lock.unlock();
  91. }
  92. }
  93. }
  1. 使用阻塞队列BlockingQueue实现

利用阻塞队列BlockingQueue的特征进行生产和消费的同步(其实阻塞队列内部也是基于Lock,condition实现的 )

  1. public class BlockQueueRepository<T> extends AbstractRepository<T> implements Repository<T> {
  2. public BlockQueueRepository() {
  3. products = new LinkedBlockingQueue<>(cap);
  4. }
  5. @Override
  6. public void put(T t) {
  7. if (isFull()) {
  8. log.info("repository is full, waiting for consume.....");
  9. }
  10. try {
  11. //如果队列长度已满,那么会阻塞等待
  12. ((BlockingQueue) products).put(t);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. @Override
  18. public T take() {
  19. T product = null;
  20. if (isEmpty()) {
  21. log.info("repository is empty, waiting for produce.....");
  22. }
  23. try {
  24. //如果队列元素为空,那么也会阻塞等待
  25. product = (T) ((BlockingQueue) products).take();
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. return product;
  30. }
  31. }

线程池理解

线程池作用
  • 提高响应速度
  • 减少资源占用
  • 控制并发数,可以通过设置线程池的最大线程数量来控制最大并发数,如果每次都是创建新线程,来了大量的请求,可能会因为创建的线程过多,造成内存溢出。
  • 方便管理线程资源
    线程池中的参数
    1. /**
    2. * Creates a new {@code ThreadPoolExecutor} with the given initial
    3. * parameters and default thread factory.
    4. *
    5. * @param corePoolSize the number of threads to keep in the pool, even
    6. * if they are idle, unless {@code allowCoreThreadTimeOut} is set
    7. * @param maximumPoolSize the maximum number of threads to allow in the
    8. * pool
    9. * @param keepAliveTime when the number of threads is greater than
    10. * the core, this is the maximum time that excess idle threads
    11. * will wait for new tasks before terminating.
    12. * @param unit the time unit for the {@code keepAliveTime} argument
    13. * @param workQueue the queue to use for holding tasks before they are
    14. * executed. This queue will hold only the {@code Runnable}
    15. * tasks submitted by the {@code execute} method.
    16. * @param handler the handler to use when execution is blocked
    17. * because the thread bounds and queue capacities are reached
    18. * @throws IllegalArgumentException if one of the following holds:<br>
    19. * {@code corePoolSize < 0}<br>
    20. * {@code keepAliveTime < 0}<br>
    21. * {@code maximumPoolSize <= 0}<br>
    22. * {@code maximumPoolSize < corePoolSize}
    23. * @throws NullPointerException if {@code workQueue}
    24. * or {@code handler} is null
    25. */
    26. public ThreadPoolExecutor(int corePoolSize,
    27. int maximumPoolSize,
    28. long keepAliveTime,
    29. TimeUnit unit,
    30. BlockingQueue<Runnable> workQueue,
    31. RejectedExecutionHandler handler) {
    32. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    33. Executors.defaultThreadFactory(), handler);
    34. }
    从上面代码中可知线程池中的参数有corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、handler六个参数
  1. corePoolSize 核心线程数

该线程池中核心线程数最大值,添加任务时,即便有空闲线程,只要当前线程池数小于corePoolSize,都是会新建线程来执行这个任务。并且核心线程空闲时间超过keepAliveTime也不会被回收。(从阻塞队列取任务时,如果阻塞队列为空:核心线程的会一直卡在workQueue.take方法,被阻塞并挂起,不会占用CPU资源,非核心线程会调用workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)方法取任务,如果超过keepAliveTime时间后还没有拿到,下一次循环判断compareAndDecrementWorkerCount就会返回null,Worker对象的run()方法循环体的判断为null,任务结束,然后线程被系统回收)。

  1. maximumPoolSize 最大线程数

该线程池中线程总数最大值,一般用于当线程池中线程都在执行任务,并且等待队列满时,如果当前线程数小于maximumPoolSize,可以创建一个新线程来执行任务。maximumPoolSize一般也可以用来现在最大线程并发执行量。

  1. workerQueue 等待队列

等待队列,一般是抽象类BlockingQueue的子类。

ArrayBlockingQueue 有界队列,一种使用数组实现的先进先出的有界阻塞队列。支持公平锁和非公平锁,底层数据结构是数组,需要指定队列大小。
LinkedBlockingQueue 无界队列,一种使用链表实现的先进先出的有界阻塞队列。默认的容量是Interge.MAX_VALUE,相比于ArrayBlockingQueue具有更高的吞吐量,但是却失去了快速随机存取的特性。默认大小是Interge.MAX_VALUE,也可以指定大小。newFixedThreadPool()和newSingleThreadExecutor()的等待队列都是这个阻塞队列。
LinkedBlockingDeque 一种使用链表实现的具有双向存取功能的有界阻塞队列。在高并发下,相比于LinkedBlockingQueue可以将锁竞争降低最多一半
PriorityBlockingQueue 一种提供了优先级排序的无解阻塞队列。如果没有提供具体的排列方法,那么将会使用自然排序进行,会抛出OOM异常。
SynchronousQueue SynchronousQueue一种不存储任务的同步队列,内部没有使用AQS。如果是公平锁模式每次调用put方法往队列中添加一个线程后,线程先进行自旋,然后超时后就阻塞等待直到有提取线程调用take方法把操作取出,这样之前调用put方法的线程才能继续执行。如果是非公平锁模式,每次添加任务就是把任务添加到栈中,这样就是先进后出,所以非公平
LinkedTransferQueue 一种使用链表实现的无解阻塞队列
DelayQueue 一种无界的延时队列,可以设置每个元素需要等待多久才能从队列中取出。延时队列,该队列中的元素只有当其指定的延时时间到了,才能够从队列中获取到该元素。底层数据结构是数组实现的
  1. keepAliveTime 非核心线程限制超时时长

非核心线程如果处于闲置状态超过该值,就会被销毁。如果设置allowCoreThreadTimeOut(true),则会也作用于核心线程。

  1. RejectedExecutionHandler 拒绝处理策略

拒绝处理策略,核心线程已经满了,等待队列也满了,并且线程数量大于最大线程数时,就会采用拒绝处理策略进行处理。
四种拒绝策略为:

  • ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常;
  • ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常;
  • ThreadPoolExecutorDiscardOldestPolicy:丢弃等待队列头部(最旧的)任务,然后重新尝试执行程序,将任务添加到队列中(如果再次失败,重试此过程);
  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
    线程池执行任务的过程?
    JAVA - 图7
    Executors提供的四种线程池的使用场景
    Executors提供的四种线程池,分别为:
  1. newFixedThreadPool 定长线程池
    1. /**
    2. * 创建一个线程池,该线程池重用固定数量的线程*操作一个共享的无界队列。在任何时候,最多
    3. * {@code nThreads}线程将是正在处理任务的活动线程。如果在所有线程都处于活动状态时提交额外的任 * 务,那么它们将在队列中等待,直到有一个线程可用。如果任何线程在执行过程中由于失败而终止
    4. * 在关闭之前,如果需要*执行后续任务,一个新的线程将取代它。线程池中的线程将存在
    5. * 直到显式地{@link ExecutorService#shutdown shutdown}。
    6. *
    7. * @param nThreads the number of threads in the pool
    8. * @return the newly created thread pool
    9. * @throws IllegalArgumentException if {@code nThreads <= 0}
    10. */
    11. public static ExecutorService newFixedThreadPool(int nThreads) {
    12. return new ThreadPoolExecutor(nThreads, nThreads,
    13. 0L, TimeUnit.MILLISECONDS,
    14. new LinkedBlockingQueue<Runnable>());
    15. }
    一句话总结:线程数固定等待队列无限长。创建一个线程池,核心线程数与最大线程数都是传入参数nThreads。可控制线程最大并发数,超出的线程会在队列中等待(比较适合需要控制并发量的情况)。主要通过将核心线程数设置为与最大线程数相等实现的。缺点是LinkedBlockingQueue队列的默认长度是Interge.MAX_VALUE,存在内存溢出的风险。
    与CacheThreadPool区别:
  • 因为corePoolSize == maximumPoolSize,所以newFixedThreadPool只会创建核心线程。而CachedThreadPool因为corePoolSize = 0,所以只会创建非核心线程。
  • 在getTask()方法,如果队列里没有任务可取,线程会一直阻塞在LinkedBlockingQueue.take(),线程不会被回收。CacheThreadPool的线程会在60s后被回收。
  • 由于线程不会被回收,会一直卡在阻塞,所以在没有任务的情况下,FixedThreadPool占用资源更多。
  • 都几乎不会触发拒绝策略,但是原理不同。FixedThreadPool是因为阻塞队列可以很大(最大为Integer最大值),故几乎不会触发拒绝策略;CacheThreadPool是因为线程池很大(最大为Integer最大值),几乎不会导致线程数量大于最大线程数,故几乎不会触发拒绝策略。
  1. newSingleThreadExecutor 单线程池

    1. /**
    2. * Creates an Executor that uses a single worker thread operating
    3. * off an unbounded queue. (Note however that if this single
    4. * thread terminates due to a failure during execution prior to
    5. * shutdown, a new one will take its place if needed to execute
    6. * subsequent tasks.) Tasks are guaranteed to execute
    7. * sequentially, and no more than one task will be active at any
    8. * given time. Unlike the otherwise equivalent
    9. * {@code newFixedThreadPool(1)} the returned executor is
    10. * guaranteed not to be reconfigurable to use additional threads.
    11. *
    12. * @return the newly created single-threaded Executor
    13. */
    14. public static ExecutorService newSingleThreadExecutor() {
    15. return new FinalizableDelegatedExecutorService
    16. (new ThreadPoolExecutor(1, 1,
    17. 0L, TimeUnit.MILLISECONDS,
    18. new LinkedBlockingQueue<Runnable>()));
    19. }

    单线程池,等待队列无限长。创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO,LIFO,优先级)执行。主要通过将核心线程数和最大线程数都设置为1来实现。

  2. newCacheThreadPool 可缓存线程池

    1. /**
    2. * Creates a thread pool that creates new threads as needed, but
    3. * will reuse previously constructed threads when they are
    4. * available. These pools will typically improve the performance
    5. * of programs that execute many short-lived asynchronous tasks.
    6. * Calls to {@code execute} will reuse previously constructed
    7. * threads if available. If no existing thread is available, a new
    8. * thread will be created and added to the pool. Threads that have
    9. * not been used for sixty seconds are terminated and removed from
    10. * the cache. Thus, a pool that remains idle for long enough will
    11. * not consume any resources. Note that pools with similar
    12. * properties but different details (for example, timeout parameters)
    13. * may be created using {@link ThreadPoolExecutor} constructors.
    14. *
    15. * @return the newly created thread pool
    16. */
    17. public static ExecutorService newCachedThreadPool() {
    18. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    19. 60L, TimeUnit.SECONDS,
    20. new SynchronousQueue<Runnable>());
    21. }

    创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程。若无可回收,则新建线程。但是由于最大线程数设置的是Integer.MAX_VALUE,存在内存溢出的风险。最大线程数无限大,线程超时被回收
    CacheThreadPool运行流程如下:

  3. 提交任务进线程池。

  4. 因corePoolSize为0,所以不创建核心线程,线程池最大为Integer.MAX_VALUE。
  5. 尝试将任务添加到SynchronousQueue队列。(需要注意SynchronousQueue本身不存储任务,只是将添加任务的线程加入一个栈中,进行阻塞等待,然后线程池中的线程空闲时,会从栈中取出线程,取出线程携带的任务,进行执行)。
  6. 如果SynchronousQueue入列成功,等待被当前运行的线程空闲后拉取执行。如果当前没有空闲线程,那么就创建一个非核心线程,然后从SynchronousQueue拉取任务并在当前线程执行。
  7. 如果SynchronousQueue已有任务在等待,入列操作将会阻塞。

当需要执行很多段时间的任务,CacheThreadPool的线程复用率比较高,会显著提升性能。而且线程60s后会回收,意味着即使没有任务进来,CacheThreadPool并不会占用很多资源。

  1. newScheduleThreadPool 定时执行线程池
    1. /**
    2. * Creates a new {@code ScheduledThreadPoolExecutor} with the
    3. * given initial parameters.
    4. *
    5. * @param corePoolSize the number of threads to keep in the pool, even
    6. * if they are idle, unless {@code allowCoreThreadTimeOut} is set
    7. * @param threadFactory the factory to use when the executor
    8. * creates a new thread
    9. * @throws IllegalArgumentException if {@code corePoolSize < 0}
    10. * @throws NullPointerException if {@code threadFactory} is null
    11. */
    12. public ScheduledThreadPoolExecutor(int corePoolSize,
    13. ThreadFactory threadFactory) {
    14. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    15. new DelayedWorkQueue(), threadFactory);
    16. }
    创建一个定时执行的线程池,主要是通过DelayedWorkQueue来实现(该队列中的元素只有当其指定的延时时间到了,才能够从队列中获取到该元素)。支持定时及周期性任务执行。但是由于最大线程数设置的是Integer.MAX_VALUE,存在内存溢出风险。线程数无限大,定时执行。
    为什么不建议使用Executors四种线程池?主要是newFixedThreadPool和newSingleThreadExecutor的等待队列是LinkedBlockingQueue,长度是Integer.MAX_VALUE,可以认为是无限大的,如果创建的任务特别多,可能会造成内存溢出。而newCacheThreadPool和newScheduleThreadPool的最大线程数是Integer.MAX_VALUE,如果创建的任务过多,可能会导致创建的线程过多,从而导致内存溢出。
    线程池的状态
    线程池生命周期:
  • RUNNING:表示线程池处于运行状态,这时候线程池可以接受和处理任务,值是-1;
  • SHUTDOWN:表示线程池不接受新任务,但仍然可以处理队列中的任务,二进制值是0。调用showdown()方法会进入到SHUTDOWN状态。
  • STOP:表示线程池不接受新任务,也不处理队列中的任务,同时中断正在执行任务的线程,值是1。调用showdownNow()方法会进入到STOP状态。
  • TIDYING:表示所有的任务都已经终止,并且工作线程的数量为0。值是2。SHUTDOWN和STOP状态的线程池任务执行完了,工作线程也为0了就会进入到TIDYING状态。
  • TERMINATED:表示线程池处于终止状态。值是3

JAVA - 图8

怎么根据业务场景确定线程池的参数corePoolSize和maximumPoolSize?

方法一 计算密集型任务

因为是计算密集型任务,可以理解为每个任务在执行期间基本没有IO操作,全部都在CPU时间片中执行。所以可以理解为CPU就是满载的,CPU利用率就是100%,其实线程数等于CPU数就可以的,但是由于需要考虑到计算密集型的线程恰好在某时因为发生一个页错误或者因其他原因而暂停,此时应该需要有一个“额外”的空闲线程来获得时间片,然后执行,可以确保在这种情况下CPU周期不会中断工作,充分利用CPU。

  1. 最佳线程数=CPU的数量+1

方法二 IO密集型任务

任务在执行时,需要进行一些IO操作,所以为了充分利用CPU,应该在线程进行IO操作时,就让出时间片,CPU进行上下文切换,执行其他线程的任务,保证CPU利用率的100%。
如果任务有50%的时间需要CPU执行状态,其他时间进行IO操作,则程序所需线程数为CPU数量的1除以0.5,也就是2倍。如果任务有20%的时时间需要CPU执行,其他时间需要进行IO操作,最佳线程数也就是1除以0.2,也就是CPU数的5倍 所以公式为

  1. 最佳线程数 = CPU数量/(每个任务中需要CPU执行的时间的比例)
  2. = CPU数量/(CPU运行时间/任务执行总时间)=CPU数量/(CPU运行时间/(CPU运行时间+IO操作时间))
  3. 所以最终公式为
  4. 最佳线程数/CPU数量 = CPU运行时间/(CPU运行时间+IO操作时间)

方法三 动态化线程池
  1. 追求响应时间的任务

一种是追求响应时间的任务,例如使用线程池对发起多个网络请求,然后对结果进行计算。 这种任务的最大线程数需要设置大一点,然后队列使用同步队列,队列中不缓存任务,任务来了就会被执行。判断线程池资源不够用时,一般是发现活跃线程数/最大线程数>阀值(默认是0.8)时,或者是线程池抛出的RejectedExecut异常次数达到阀值,就会进行告警。然后程序员收到告警后,动态发送修改核心线程数,最大线程数,队列相关的指令,服务器进行动态修改。

  1. 追求高吞吐量的任务

假设说需要定期自动化生成一些报表,不需要考虑响应时间,只是希望如何使用有限的资源,尽可能在单位时间内处理更多的任务,也就是吞吐量优先的问题。 这种就是使用有界队列,对任务进行缓存,然后线程进行并发执行。判断线程池资源不够用时,一般是发现等待队列中的任务数量/等待队列的长度>阀值(默认是0.8)时,或者是线程池抛出的RejectedExecut异常次数达到阀值,就会进行告警。然后程序员收到告警后,动态发送修改核心线程数,最大线程数,队列相关的指令,服务器进行动态修改。
ThreadPoolExecutor提供了如下几个public的setter方法
JAVA - 图9
调用corePoolSize方法之后,线程池会直接覆盖原来的corePoolSize值,并且基于当前值和原始值的比较结果采取不同的处理策略。(总得来说就是,多退少补的策略)
对于新corePoolSize 小于 当前工作线程数的情况:
说明有多余的worker线程,此时会向当前idle状态的worker线程发起中断请求以实现回收,多余的worker在下次idel的时候也会被回收。
对于新corePoolSize 大于 当前工作线程数的情况:
如果当前队列中有待执行任务,则线程池会创建新的worker线程来执行队列任务。
setCorePoolSize的方法的执行流程入下图所示:
JAVA - 图10

ThreadLocal是什么?如何避免内存泄漏

ThreadLocal表示 线程本地存储,对于代码中的一个变量,每个线程都拥有这个变量的一个副本,访问和修改它时都是对副本进行操作。

使用场景

ThreadLocal 适用于每个线程需要自己独立的实例且该实例需要在多个方法中被使用,也即变量在线程间隔离而在方法或类间共享的场景。(例如:方法直接调用时传递的变量过多,为了代码简洁性,可以使用ThreadLocal,在前一个方法中,将变量进行存储,后一个方法中取,进行使用。)

  1. public class A {
  2. // 每个线程本地副本初始化
  3. private static ThreadLocal <UserData> threadLocal = new ThreadLocal <>(). withInitial (() -> new UserData ());
  4. public static void setUser (UserLogin user){
  5. if (user == null )
  6. return ;
  7. UserData userData = threadLocal.get();
  8. userData. setUserLogin (user);
  9. }
  10. public static UserLogin getUser (){
  11. return threadLocal.get(). getUserLogin ();
  12. }
  13. }

实现原理

每个Thread有一个ThreadLocalMap,类似HashMap,当调用ThreadLocal#set()方法进行存值时,实际上是先获取到当前的线程,然后获取线程的map,是一个ThreadLocalMap类型,然后会在这个map中添加一个新的键值对,key就是我们ThreadLocalMap变量的地址,value就是我们存的值。ThreadLocalMap与HashMap不同的是,解决HashMap使用的是开放定址法,也就是当发现hashCode计算得到数组下标已经存储了元素后,会继续往后找,直到找到一个空的数组下标,存储键值对。

  1. /**
  2. * Sets the current thread's copy of this thread-local variable
  3. * to the specified value. Most subclasses will have no need to
  4. * override this method, relying solely on the {@link #initialValue}
  5. * method to set the values of thread-locals.
  6. * ThreadLocal实例的复制方法
  7. * @param value the value to be stored in the current thread's copy of
  8. * this thread-local.
  9. */
  10. public void set(T value) {
  11. //获取当前线程
  12. Thread t = Thread.currentThread();
  13. //获取线程对应的map
  14. ThreadLocalMap map = getMap(t);
  15. //将值存入线程特有的Map中
  16. if (map != null)
  17. //key 为this就是当前ThreadLocal引用变量的地址,value就是存储的值
  18. map.set(this, value);
  19. else
  20. createMap(t, value);
  21. }
  22. /**
  23. * Get the map associated with a ThreadLocal. Overridden in
  24. * InheritableThreadLocal.
  25. *
  26. * @param t the current thread
  27. * @return the map
  28. */
  29. ThreadLocalMap getMap(Thread t) {
  30. //线程的threadLocals实例变量就是Map
  31. return t.threadLocals;
  32. }
  33. /**
  34. * Create the map associated with a ThreadLocal. Overridden in
  35. * InheritableThreadLocal.
  36. *
  37. * @param t the current thread
  38. * @param firstValue value for the initial entry of the map
  39. */
  40. void createMap(Thread t, T firstValue) {
  41. t.threadLocals = new ThreadLocalMap(this, firstValue);
  42. }

ThreadLocal 中的Entry的key使用了弱引用,为什么使用弱引用?

JAVA - 图11
首先上面类A的代码中,类A中有一个ThreadLocal类型的变量
它们的引用链如下:

ThreadLocal变量所在的类的实例(代码中A的实例)->ThreadLocal 执行代码的线程->线程独有的ThreadLocalMap->引用的key就是ThreadLocal

可以看到ThreadLocal变量不仅被所在的类A的实例所引用,还被执行的线程所引用,

  1. 如果使用强引用,也就是线程对ThreadLocal变量是强引用,那么类A的实例即便已经不被其他变量所引用了,不会被访问到了,需要被回收了,类A被回收了,但是ThreadLocal变量由于有被执行线程所引用,只要线程还在,ThreadLocal变量也不能被被JVM回收,所以会造成内存泄露。
  2. 如果使用弱引用,不会影响key的回收,也就是不会影响引用了ThreadLocal的实例对象的回收。

但是即便使用弱引用,ThreadLocalMap对value的引用是强引用(一边value是局部变量,也不能用弱引用,那样在用到的时候就会被),但是value依然不会被回收,会造成内存泄露。
通常来说,value回收的时机有两个:
1.我们在用完ThreadLocal后,应该遵循规范手动调用ThreadLocal#remove()对键值对value释放,这样可以使value被回收。
2.此线程在其他对象中使用ThreadLocal对线程ThreadLocalMap进行set()和get()时,由于需要进行开放定址法进行探测,会对沿途过期的键值对(就是key为null的键值对)进行清除。以及set()方法触发的cleanSomeSlots()方法对过期键值对进行清除。

锁相关

Sychronize实现原理是怎么样的?

对于synchronized关键字而言,在javac编译时,会生成对应的monitorenter和monitorexit指令分别对应synchronized同步块的进入和退出,有两个指令的原因是为了保证抛异常的情况下也能释放锁,所以javac为同步代码块添加了一个隐式的try-finally,在finally中会调用monitorexit命令释放锁。

而对于synchronized方法而言,javac为其生成一个ACC_SYNCHRONIZED关键字,在JVM进行方法调用时,发现调用的方法被ACC_SYNCHRONIZED修饰,则会先尝试获得锁。
synchronized锁执行流程图:
JAVA - 图12

Java对象的内存布局,由对象头+实例数据+对其填充三部分组成,而对象头主要包括Mark Word + 指向对象所属的类指针组成。Mark Word 主要用于存储对象自身的运行时数据,哈希码,GC分代年龄,锁标志等。
JAVA - 图13
Mark word的数据映射表
JAVA - 图14

AbstractQueuedSynchronizer(AQS)是什么?

AQS就是AbstractQueuedSynchronizer,抽象队列同步器,是一个可以用于实现基于先进先出等待队列的锁和同步器的框架。实现锁ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等都是基于AQS的。

ReentrantLock有一个变量Sync,Sync父类是AbstractQueuedSynchronizer

  1. public class ReentrantLock implements Lock, java.io.Serializable {
  2. private static final long serialVersionUID = 7373984872572414699L;
  3. /** Synchronizer providing all implementation mechanics */
  4. private final Sync sync;
  5. }

ReentrantLock的非公平锁和公平锁区别在于非公平锁在CAS更新state失败后会调用tryAcquire()来判断是否需要进入同步队列,会再次判断state的值是否为0,为0会去CAS更新state值,更新成功就直接获得锁,否则就进入等待队列。(进入等待队列前会抢锁)
而公平锁首先会判断state是否为0,为0并且等待队列为空,才会去使用CAS操作抢占锁,抢占成功就获得锁,没有成功并且当前线程不是获得锁的线程,都会被加入到等待队列。

  1. /**
  2. * 公平锁
  3. * Sync object for fair locks
  4. */
  5. static final class FairSync extends Sync {
  6. private static final long serialVersionUID = -3000897897090466540L;
  7. final void lock() {
  8. acquire(1);
  9. }
  10. /**
  11. * Fair version of tryAcquire. Don't grant access unless
  12. * recursive call or no waiters or is first.
  13. */
  14. protected final boolean tryAcquire(int acquires) {
  15. final Thread current = Thread.currentThread();
  16. int c = getState();
  17. if (c == 0) {
  18. if (!hasQueuedPredecessors() &&
  19. compareAndSetState(0, acquires)) {
  20. setExclusiveOwnerThread(current);
  21. return true;
  22. }
  23. }
  24. else if (current == getExclusiveOwnerThread()) {
  25. int nextc = c + acquires;
  26. if (nextc < 0)
  27. throw new Error("Maximum lock count exceeded");
  28. setState(nextc);
  29. return true;
  30. }
  31. return false;
  32. }
  33. }

IO

如何实现零拷贝?

实现零拷贝,关键在于DMA技术,在DMA技术出现之前I/O过程如下:
IO过程.png

什么是DMA技术?

在进行I/O设备和内存的数据传输的时候,数据搬运工作全部交给DMA控制器,而CPU不再参与任何与数据搬运相关的事情,这样CPU就可以去处理别的事务。DMA控制器进行数据传输过程图如下:
DMA控制器数据传输过程.png

传统文件传输

传统的I/O工作方式是,数据读取和写入是从用户空间到内核来回复制,而内核空间的数量是通过操作系统层面的I/O接口从磁盘读取和写入。过程如下:
传统文件传输底层操作过程.png
从上图可以看到,共发生了4次用户态与内核态的上下文切换,因为发生了两次系统调用。一次是read(),一次是write(),每次系统调用都得先从用户态切换到内核态,等内核完成任务后,又从内核态切回用户态。
在上图过程中还发生了四次拷贝,两次DMA的拷贝,两次CPU拷贝:

  • 第一次,把磁盘上的文件通过DMA搬运将数据拷贝到操作系统内核的缓冲区;
  • 第二次,把内核缓冲区通过CPU拷贝将数据拷贝到用户的缓冲区;
  • 第三次,使用CPU拷贝将用户缓冲区的数据拷贝到内核的socket的缓冲区;
  • 第四次,把内核的socket缓冲区里的数据,使用DMA搬运,拷贝到网卡的缓冲区内。

要想提高文件的传输必须从下面两个方面入手:

  • 减少用户态与内核态的上下文切换;
  • 减少内存拷贝次数。
    如何优化文件传输的性能?
  1. 减少用户态与内核态的上下文切换次数

因为在读取磁盘的时候,发生上下文切换是因为用户空间没有权限操作磁盘和网卡,所以一次系统调用必然会发生两次上下文切换。

  1. 减少内存拷贝次数

传统的文件传输方式会用4次数据拷贝,在这里面,从内核的读缓冲区拷贝到用户的缓冲区,再从用户的缓冲区拷贝到socket的缓冲区里这个过程是不必要的。因为将数据拷贝到用户缓冲区并不会对数据进行再加工,所以用户的缓冲区是没必要存在的。

如何实现零拷贝?
  • mmap + write
  • sendfile

    mmap + write

    由于read()会将数据拷贝到用户缓冲区,为了减小这一开销,用mmap()替换read()操作

    1. buf = mmap(file, len); //mmap()系统调用函数会直接把内核缓冲区里的数据映射到用户空间,操作系统内核与用户空间就不需要在进行任何数据拷贝操作
    2. write(sockfd, buf, len);

    零拷贝mmap+write.png

    sendfile

    在linux内核版本2.1中,有一个专门发送文件的系统调用函数sendfile(),函数形式如下:

    1. #include <sys/socket.h>
    2. #前两个参数分别是目的端和源端的文件描述符,后面两个参数是源端的偏移量和复制数据的长度,返回值是实际复制数据长度。这个调用函数可以替代read()和#write()两个系统调用,这样就可以减少一次系统调用,同时也减少了2次上下文切换的开销
    3. ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

    这样,该系统调用可以直接把内核缓冲区里的数据拷贝到socket缓冲区里,不再拷贝到用户态,这样就只有2次上下文切换和3次数据拷贝,过程如下图:
    零拷贝sendfile.png
    上面两种方式都还不是真正的零拷贝技术,如果网卡支持SG-DMA(the scatter-gather direct memory access)技术(与普通的DMA有所不同),就可以进一步减少通过CPU把内核缓冲区里的数据拷贝到socket缓冲区的过程。

    1. #查看linux系统的网卡是否支持scatter-gather特性:
    2. $ ethtool -k eth0 | grep scatter-gather
    3. scatter-gather on

    从linux内核2.4版本开始起,对于支持网卡SG-DMA技术的情况下,sendfile()系统调用过程发生了变化,具体过程如下:

  • 第一步,通过DMA将磁盘上的数据拷贝到内核缓冲区;

  • 第二步,缓冲区描述符和数据长度传到socket缓冲区,这样网卡的SG-DMA控制器就可以直接将内核缓存中的数据拷贝到网卡的缓冲区中,此过程不需要将数据从操作西荣内核缓冲区拷贝到socket缓冲区,这样就减少了一次数据拷贝,所有在这个过程只进行了2次数据拷贝,如下图:

SG-DMA零拷贝.png
以上就是所谓的零拷贝技术,因为没有在内存层面去拷贝数据,也就是没有通过CPU来搬运数据,所有的数据都是通过DMA来进行传输的。
零拷贝技术的文件传输方式比传统的文件传输方式,减少了两次上下文切换和数据拷贝次数,只需要两次上下文切换和数据拷贝次数,就可以完成文件的传输,而且两次的数据拷贝过程,都不需要通过CPU,两次都是有DMA来搬运。
所以总体来看零拷贝技术可以把文件传输的性能提高至少一倍以上。

使用零拷贝技术的项目

kafka开源项目,使用了零拷贝技术,在kafka文件传输的代码,可以找到调用了Java NIO库里的transferTo方法

  1. @Override
  2. public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
  3. return fileChannel.transferTo(postion, count, socketChannel);
  4. }

JVM

类加载机制、双亲委派机制和双亲委派模型

类加载

JAVA - 图21
以上是类加载的过程,在java中类是指java源代码通过编译后的class文件。
类加载器有:

  • 系统级别:启动类加载器、扩展类加载器、系统类加载器
  • 用户级别:自定义类加载器(继承了ClassLoader)
  • 层级结构:

JAVA - 图22
在类加载器加载class的时候要遵循双亲委派模型

双亲委派机制

双亲委派机制:除了顶层的根加载器外,所有的加载器按照父子关系形成树形结构,每个加载器有且只有一个父加载器
JAVA - 图23
双亲委派机制执行过程,根据源码分析:

  1. protected Class loadClass(String class_name, boolean resolve) throws ClassNotFoundException {
  2. Class cl = null;
  3. /* First try: lookup hash table.
  4. */
  5. if((cl=(Class)classes.get(class_name)) == null) {
  6. /* Second try: Load system class using system class loader. You better
  7. * don't mess around with them.
  8. */
  9. for(int i=0; i < ignored_packages.length; i++) {
  10. if(class_name.startsWith(ignored_packages[i])) {
  11. cl = deferTo.loadClass(class_name);
  12. break;
  13. }
  14. }
  15. if(cl == null) {
  16. JavaClass clazz = null;
  17. /* Third try: Special request?
  18. */
  19. if(class_name.indexOf("$$BCEL$$") >= 0)
  20. clazz = createClass(class_name);
  21. else { // Fourth try: Load classes via repository
  22. if ((clazz = repository.loadClass(class_name)) != null) {
  23. clazz = modifyClass(clazz);
  24. }
  25. else
  26. throw new ClassNotFoundException(class_name);
  27. }
  28. if(clazz != null) {
  29. byte[] bytes = clazz.getBytes();
  30. cl = defineClass(class_name, bytes, 0, bytes.length);
  31. } else // Fourth try: Use default class loader
  32. cl = Class.forName(class_name);
  33. }
  34. if(resolve)
  35. resolveClass(cl);
  36. }
  37. classes.put(class_name, cl);
  38. return cl;
  39. }

Java反射机制

反射机制概念:是指在程序运行过程中,对任意一个类都能获取其所有的属性和方法,并且对任意一个方法。这种动态获取类和对象的信息,以及动态调用对象的方法的功能被称为Java语言的反射机制。

反射的应用

Java中的对象有两种类型:编译时类型和运行时类型;

  • 编译时类型:指在声明对象时所采用的类型;
  • 运行时类型:指为对象赋值时所采用的类型;

    Java的反射API

    Java的反射API主要在运行过程中动态的生成类、接口或对象。常用的API,如下:

  • class类:用于获取类的属性、方法等信息;

  • field类:表示类的成员变量,用于获取和设置类中的属性值;
  • method类:表示类的的方法,用于获取方法的描述信息或执行某个方法;
  • constructor类:表示类的构造方法。

    反射的步骤

  1. 获取想要操作类的class对象,该class对象时反射的核心,通过它可以调用类的任意方法;
  2. 调用class对象所对应的类中定义的方法,这是反射的使用阶段;
  3. 使用反射API来获取并调用类的属性和方法等信息;

获取class对象的三种方法:

  • 调用某个对象的getClass方法以获取该类对应的对象;

    1. Person p = new Person();
    2. Class class = p.getClass();
  • 调用某个类的class属性以获取该类对应的class对象;

    1. Class class = Person.getClass();
  • 调用class类中的forName静态方法以获取该类对应的class对象,这是最安全、性能也最好的方法;

    1. Class class = Class.forName("fullClassPath"); //fullClassPath为类的包路径及名称

    创建对象的两种方式

  • 使用class对象的newInstance方法创建该class对象对应类的实例,这种方法要求class对象对应的类有默认的空构造器;

  • 先使用class对象获取指定的constructor对象,再调用constructor对象的newInstance方法创建class对象对应类的实例,通过这种方法可以选定构造方法创建实例;

    注解

    注解概念

    注解(Annotation)是Java提供的设置程序中元素的关联信息和元数据(MetaData)的方法,它是一个接口,程序可以通过反射获取指定程序中元素的注解对象,然后通过该注解对象获取注解中的元数据信息。

标准元注解

标准元注解:@Target 、@Retention、@Documented、@Inherited 元注解(Meta-Annotation)负责注解其他注解。在Java中定义了4个标准的元注解类型,用于定义不同类型的注解

  • @Target :说明了注解所修饰的对象范围。注解可被用于packages、types(类、接口、枚举、注解类型)、类型成员(方法、构造方法、成员变量、枚举值)、方法参数和本地变量(循环变量、catch参数等)。在注解类型的声明中使用了target,可更加明确其修饰的目标,target取值如下

注解target值.jpg

  • @Retention:定义了该注解被保留的级别,即被描述的注解在什么级别有效,如下三种类型:
    • SOURCE:在源文件中有效,即在源文件中被保留;
    • CLASS:在class文件中有效,即在class文件中被保留;
    • RUNTIME:在运行时有效,即运行时被保留;
  • @Documented:表明这个注解应该被javadoc工具记录,因此可以被javadoc类的工具文档化;
  • @Inherited:是一个标记注解,表明某个被标注的类型是被继承的。如果有一个使用了@Inherited修饰的Annotation被用于一个Class,则这个注解将被用于该Class的子类。

    内部类

    内部类是定义在类内部的类。内部类根据不同的定义方式,可分为静态内部类、成员内部类、局部内部类和匿名内部类这四种。

静态内部类

定义在类内部的静态类被称为静态内部类。静态内部类可以访问外部;类的静态变量和方法;在静态内部类中可以定义静态变量、方法、构造函数等;静态内部类通过“外部类.静态内部类”的方式调用。 在Java集合HashMap在内部维护了一个静态内部类Node数据用于存放元素,但Node数组对使用者是透明的。像这种和外部关系密切且不依赖外部类实例的类,可以使用静态内部类实现。

成员内部类

定义在类内部的非静态类叫作成员内部类。成员内部类不能定义静态方法和变量(final修饰除外),因为成员内部类是非静态的,而在Java的非静态代码块中不能定义静态方法和变量。 成员内部类的调用方式与静态内部类的调用方式一样。

局部内部类

定义在方法中的类叫作局部内部类。当一个类只需要在某个方法中使用某个特定的类时,可以通过局部类实现。

匿名内部类

匿名内部类是指通过继承一个父类或者实现一个接口的方式直接定义并使用的类。匿名内部类没有class关键字,这是因为匿名内部类直接使用new生成一个对象的引用。

Java并发编程

Java线程的创建

常见的Java线程的4种创建方式分别:

  • 继承Thread类
  • 实现Runnable接口
  • 通过ExecutorService和Callable实现有返回值的线程
  • 基于线程池
    继承Thread类

实现Runnable接口

基于线程池

线程池的工作原理

Java线程池主要用于管理线程组及其运行状态,以便Java虚拟机更好的利用CPU资源。 Java线程池工作原理:JVM先根据用户参数创建一定数量的可运行的线程任务,并将其放入队列中,在线程创建后启动这些任务,如果线程数量超过了最大线程数量(用户设置线程池的大小),则超出数量的线程排队等候,在有任务执行完毕后,线程池调度器会发现有可用的线程,进而再次从队列中取出任务。 线程池主要作用:线程复用、线程资源管理、控制操作系统的最大并发数,以保证系统高效且安全的运行。

线程复用

线程池核心组件和核心类

Java线程池主要由以下4个核心组件组成:

  • 线程池管理器:用于创建并管理线程池;
  • 工作线程:线程池中执行具体任务的线程;
  • 任务接口:用于定义工作线程的调度和执行策略,只有线程实现该接口,线程中的任务才能够被线程池调度。
  • 任务队列:存放待处理的任务,新的任务将会不断被加入队列中,执行完成的任务将被从队列中移除。

Java线程池是通过Executor框架实现,在该框架中用到了Executor、Excutors、ExecutorService、ThreadPoolExecutor、Callable、Future、FutureTask几个核心类。

作者 @zzxhub
2018 年 07月 07日